feat: add content source sync pipeline and harden asr timeouts

This commit is contained in:
kris
2026-03-20 10:11:04 +08:00
parent 63af810236
commit 1c539abc6e
15 changed files with 636 additions and 34 deletions

View File

@@ -6,6 +6,7 @@ N8N_BASE_URL=http://127.0.0.1:5670
N8N_ANALYSIS_WEBHOOK_PATH=/webhook/storyforge-analysis
N8N_REAL_CUT_WEBHOOK_PATH=/webhook/storyforge-real-cut
N8N_AI_VIDEO_WEBHOOK_PATH=/webhook/storyforge-ai-video
N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH=/webhook/storyforge-content-source-sync
ORCHESTRATOR_SHARED_SECRET=storyforge-local-secret
CUTVIDEO_BASE_URL=
CUTVIDEO_API_KEY=
@@ -20,6 +21,10 @@ YTDLP_BIN=yt-dlp
FFMPEG_BIN=ffmpeg
WHISPER_BIN=
WHISPER_MODEL=./data/collector/models/ggml-base.en.bin
ASR_HTTP_BASE_URL=
ASR_HTTP_TRANSCRIBE_PATH=/transcribe
ASR_HTTP_FIELD_NAME=wav
ASR_HTTP_TIMEOUT_SEC=120
N8N_IMAGE=docker.n8n.io/n8nio/n8n:latest
WEBHOOK_URL=http://127.0.0.1:5670/
GENERIC_TIMEZONE=Asia/Shanghai

View File

@@ -57,6 +57,7 @@ docker compose up -d --build
- 持久化任务、分镜、分析结果、事件日志
- `n8n` 负责:
- 触发 `analysis_pipeline`
- 触发 `content_source_sync_pipeline`
- 触发 `real_cut_pipeline`
- 触发 `ai_video_pipeline`
- FastGPT 已从主流程设计中移除,不再作为运行时依赖
@@ -66,7 +67,7 @@ docker compose up -d --build
- 新注册账号默认 `pending`
- 主管理员审批后才可使用核心业务接口
- 支持 `user -> project -> knowledge base / assistant(agent) / job / content source` 的多租户边界
- 素材入口支持文字、视频链接、视频上传;内容源账号通过 `content_sources` 建模持久化
- 素材入口支持文字、视频链接、视频上传;内容源账号通过 `content_sources` 建模持久化,并可派生父子分析任务
- `cutvideo` 继续运行在 Windows 机器,本系统通过 API 调度
- `huobao-drama` 继续作为 AI 生成视频主链的核心引擎
- 详细审计、阶段计划和联调步骤见 `docs/`

View File

@@ -57,6 +57,9 @@ interface StoryForgeApiService {
@POST("v2/explore/text")
suspend fun createTextJob(@Body request: ExploreTextRequest): JobDto
@POST("v2/pipelines/content-source-sync")
suspend fun createContentSourceSyncJob(@Body request: ContentSourceSyncRequest): JobDto
@Multipart
@POST("v2/explore/upload-video")
suspend fun uploadVideo(

View File

@@ -159,11 +159,29 @@ data class ExploreTextRequest(
val analysis_model_profile_id: String? = null
)
/**
 * Request body for [StoryForgeApiService.createContentSourceSyncJob]
 * (`POST v2/pipelines/content-source-sync`).
 *
 * Property names are snake_case so they serialize to the JSON keys as written.
 * Every property has a default, so a caller can pass only the fields it needs
 * (for example just [source_url] or just [content_source_id]).
 *
 * NOTE(review): the collector's request model in this change set constrains
 * `max_items` to 1..20 — values outside that range are presumably rejected by
 * the server; confirm before exposing a free-form input in the UI.
 */
@Serializable
data class ContentSourceSyncRequest(
val project_id: String = "",
val knowledge_base_id: String = "",
val assistant_id: String = "",
// Identifier of an already stored content source to sync.
val content_source_id: String = "",
val platform: String = "",
val handle: String = "",
// URL of the creator account / playlist to sync.
val source_url: String = "",
val title: String = "",
val analysis_model_profile_id: String = "",
val language: String = "auto",
// Upper bound on how many videos one sync run should pick up.
val max_items: Int = 5,
val skip_existing: Boolean = true,
val auto_trigger_analysis: Boolean = true
)
@Serializable
data class JobDto(
val id: String,
val user_id: String,
val project_id: String = "",
val parent_job_id: String = "",
val assistant_id: String? = null,
val knowledge_base_id: String,
val content_source_id: String = "",

View File

@@ -1,5 +1,9 @@
FROM python:3.11-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends ffmpeg \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

View File

@@ -156,6 +156,7 @@ class Database:
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
parent_job_id TEXT,
assistant_id TEXT,
knowledge_base_id TEXT NOT NULL,
content_source_id TEXT,
@@ -255,6 +256,7 @@ class Database:
},
"jobs": {
"project_id": "TEXT",
"parent_job_id": "TEXT",
"content_source_id": "TEXT",
"line_type": "TEXT NOT NULL DEFAULT 'analysis'",
"workflow_key": "TEXT NOT NULL DEFAULT ''",

View File

@@ -134,6 +134,36 @@ class CutVideoClient:
return _unwrap_response(response.json())
class AsrHttpClient:
    """Small async client for an HTTP speech-to-text endpoint.

    The audio file is uploaded as a multipart form field to
    ``base_url + transcribe_path`` and the JSON response is passed through
    ``_unwrap_response``.

    Args:
        base_url: Service base address. Blank or whitespace-only disables the client.
        transcribe_path: Path of the transcription endpoint.
        field_name: Multipart field name for the audio file; blank falls back to ``"wav"``.
        timeout: Timeout in seconds handed to ``httpx.AsyncClient``.
    """

    def __init__(
        self,
        *,
        base_url: str,
        transcribe_path: str = "/transcribe",
        field_name: str = "wav",
        timeout: float = 120.0,
    ) -> None:
        # Strip surrounding whitespace first (same treatment as field_name) so a
        # value such as " " from an env file counts as "not configured" instead
        # of enabling the client with an unusable URL.
        self.base_url = base_url.strip().rstrip("/")
        self.transcribe_path = transcribe_path
        self.field_name = field_name.strip() or "wav"
        self.timeout = timeout

    @property
    def enabled(self) -> bool:
        """True when a non-empty base URL was configured."""
        return bool(self.base_url)

    async def transcribe_audio(self, audio_path: Path) -> dict[str, Any]:
        """Upload ``audio_path`` and return the unwrapped JSON response.

        Raises:
            httpx.HTTPStatusError: when the service answers with an error status
                (via ``raise_for_status``).
        """
        # Fall back to a generic binary type when the extension is unknown.
        content_type = mimetypes.guess_type(audio_path.name)[0] or "application/octet-stream"
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            with audio_path.open("rb") as handle:
                response = await client.post(
                    _join_url(self.base_url, self.transcribe_path),
                    files={self.field_name: (audio_path.name, handle, content_type)},
                )
            response.raise_for_status()
            return _unwrap_response(response.json())
class HuobaoDramaClient:
def __init__(self, *, base_url: str, timeout: float = 180.0) -> None:
self.base_url = base_url.rstrip("/")

View File

@@ -18,7 +18,7 @@ from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from .database import Database, utc_now
from .integrations import CutVideoClient, HuobaoDramaClient, N8NClient
from .integrations import AsrHttpClient, CutVideoClient, HuobaoDramaClient, N8NClient
from .openai_compat import OpenAICompatClient
BASE_DIR = Path(__file__).resolve().parents[2]
@@ -35,10 +35,15 @@ YTDLP_BIN = os.getenv("YTDLP_BIN", "yt-dlp")
FFMPEG_BIN = os.getenv("FFMPEG_BIN", "ffmpeg")
WHISPER_BIN = os.getenv("WHISPER_BIN", "")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", str(MODELS_DIR / "ggml-base.en.bin"))
ASR_HTTP_BASE_URL = os.getenv("ASR_HTTP_BASE_URL", "")
ASR_HTTP_TRANSCRIBE_PATH = os.getenv("ASR_HTTP_TRANSCRIBE_PATH", "/transcribe")
ASR_HTTP_FIELD_NAME = os.getenv("ASR_HTTP_FIELD_NAME", "wav")
ASR_HTTP_TIMEOUT_SEC = float(os.getenv("ASR_HTTP_TIMEOUT_SEC", "120"))
N8N_BASE_URL = os.getenv("N8N_BASE_URL", "http://127.0.0.1:5670")
N8N_ANALYSIS_WEBHOOK_PATH = os.getenv("N8N_ANALYSIS_WEBHOOK_PATH", "/webhook/storyforge-analysis")
N8N_REAL_CUT_WEBHOOK_PATH = os.getenv("N8N_REAL_CUT_WEBHOOK_PATH", "/webhook/storyforge-real-cut")
N8N_AI_VIDEO_WEBHOOK_PATH = os.getenv("N8N_AI_VIDEO_WEBHOOK_PATH", "/webhook/storyforge-ai-video")
N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH = os.getenv("N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH", "/webhook/storyforge-content-source-sync")
ORCHESTRATOR_SHARED_SECRET = os.getenv("ORCHESTRATOR_SHARED_SECRET", "")
CUTVIDEO_BASE_URL = os.getenv("CUTVIDEO_BASE_URL", "")
CUTVIDEO_API_KEY = os.getenv("CUTVIDEO_API_KEY", "")
@@ -55,12 +60,19 @@ for path in (DATA_DIR, DOWNLOADS_DIR, JOBS_DIR, MODELS_DIR):
db = Database(DB_PATH)
openai_client = OpenAICompatClient()
asr_http_client = AsrHttpClient(
base_url=ASR_HTTP_BASE_URL,
transcribe_path=ASR_HTTP_TRANSCRIBE_PATH,
field_name=ASR_HTTP_FIELD_NAME,
timeout=ASR_HTTP_TIMEOUT_SEC,
)
n8n_client = N8NClient(
base_url=N8N_BASE_URL,
workflow_paths={
"analysis_pipeline": N8N_ANALYSIS_WEBHOOK_PATH,
"real_cut_pipeline": N8N_REAL_CUT_WEBHOOK_PATH,
"ai_video_pipeline": N8N_AI_VIDEO_WEBHOOK_PATH,
"content_source_sync_pipeline": N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH,
},
shared_secret=ORCHESTRATOR_SHARED_SECRET,
)
@@ -187,6 +199,22 @@ class ContentSourceCreateRequest(BaseModel):
metadata: dict[str, Any] = Field(default_factory=dict)
class ContentSourceSyncRequest(BaseModel):
    """Request body of ``POST /v2/pipelines/content-source-sync``.

    Either ``source_url`` or ``content_source_id`` has to lead to a source URL;
    the endpoint answers 400 otherwise. Blank strings mean "not provided".
    """

    # Target project; when blank the endpoint falls back to the project of the
    # referenced content source (if any).
    project_id: str = ""
    knowledge_base_id: str = ""
    assistant_id: str = ""
    # Existing content source owned by the caller; when blank a new one is created.
    content_source_id: str = ""
    # When blank, taken from the content source or inferred from the URL.
    platform: str = ""
    handle: str = ""
    # Account / playlist URL; when blank, the stored URL of the content source is used.
    source_url: str = ""
    title: str = ""
    analysis_model_profile_id: str = ""
    language: str = "auto"
    # Number of videos to discover per sync run; validated to 1..20.
    max_items: int = Field(default=5, ge=1, le=20)
    # Skip videos for which a video_link job with the same URL already exists.
    skip_existing: bool = True
    # Immediately trigger the derived analysis jobs instead of only creating them.
    auto_trigger_analysis: bool = True
class RealCutJobRequest(BaseModel):
project_id: str = ""
title: str
@@ -532,6 +560,7 @@ def job_payload(row: dict[str, Any]) -> dict[str, Any]:
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"parent_job_id": row.get("parent_job_id", ""),
"assistant_id": row.get("assistant_id"),
"knowledge_base_id": row["knowledge_base_id"],
"content_source_id": row.get("content_source_id", ""),
@@ -633,6 +662,17 @@ def merge_json_field(current_raw: str | None, updates: dict[str, Any]) -> str:
return json.dumps(current, ensure_ascii=False)
def update_content_source_metadata(source_id: str, updates: dict[str, Any]) -> dict[str, Any]:
    """Merge ``updates`` into a content source's ``metadata_json`` and return the fresh row.

    Raises:
        HTTPException: 404 when no content source with ``source_id`` exists.
    """
    select_sql = "SELECT * FROM content_sources WHERE id = ?"
    existing = db.fetch_one(select_sql, (source_id,))
    if not existing:
        raise HTTPException(status_code=404, detail="Content source not found")

    # A missing or empty metadata column is treated as an empty JSON object.
    merged_metadata = merge_json_field(existing.get("metadata_json") or "{}", updates)
    touched_at = utc_now()
    db.execute(
        "UPDATE content_sources SET metadata_json = ?, updated_at = ? WHERE id = ?",
        (merged_metadata, touched_at, source_id),
    )
    # Re-read so the caller sees exactly what was persisted.
    return db.fetch_one(select_sql, (source_id,))
def update_job_state(
job_id: str,
*,
@@ -682,6 +722,8 @@ def update_job_state(
def job_context_payload(row: dict[str, Any]) -> dict[str, Any]:
payload = job_payload(row)
payload["parent_job"] = None
payload["child_jobs"] = []
payload["project"] = None
payload["assistant"] = None
payload["knowledge_base"] = None
@@ -707,6 +749,16 @@ def job_context_payload(row: dict[str, Any]) -> dict[str, Any]:
if source:
payload["content_source"] = content_source_payload(source)
if row.get("parent_job_id"):
parent = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (row["parent_job_id"],))
if parent:
payload["parent_job"] = job_payload(parent)
payload["child_jobs"] = [
job_payload(item)
for item in db.fetch_all("SELECT * FROM jobs WHERE parent_job_id = ? ORDER BY created_at ASC", (row["id"],))
]
payload["events"] = [
job_event_payload(item)
for item in db.fetch_all("SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (row["id"],))
@@ -863,13 +915,82 @@ def fallback_transcript_from_text(title: str, content: str) -> str:
return f"标题:{title}\n\n正文:\n{content.strip()}"
def infer_platform_from_url(source_url: str) -> str:
    """Guess the platform key from domain markers found anywhere in ``source_url``.

    Matching is a case-insensitive substring test, checked in the order
    bilibili, douyin, xiaohongshu, youtube. Returns ``""`` when nothing matches.
    """
    haystack = source_url.strip().lower()
    platform_markers = (
        ("bilibili", ("bilibili.com", "b23.tv")),
        ("douyin", ("douyin.com", "iesdouyin.com")),
        ("xiaohongshu", ("xiaohongshu.com", "xhslink.com")),
        ("youtube", ("youtube.com", "youtu.be")),
    )
    for platform, markers in platform_markers:
        if any(marker in haystack for marker in markers):
            return platform
    return ""
def command_exists(name: str) -> bool:
    """Return True when ``name`` resolves to an executable via ``shutil.which``."""
    resolved_path = shutil.which(name)
    return resolved_path is not None
def run_command(command: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
proc = subprocess.run(command, cwd=str(cwd) if cwd else None, capture_output=True, text=True)
return proc.returncode, proc.stdout, proc.stderr
def run_command(command: list[str], cwd: Path | None = None, timeout: float | None = None) -> tuple[int, str, str]:
try:
proc = subprocess.run(
command,
cwd=str(cwd) if cwd else None,
capture_output=True,
text=True,
timeout=timeout,
)
return proc.returncode, proc.stdout, proc.stderr
except subprocess.TimeoutExpired as exc:
stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode("utf-8", errors="ignore")
stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode("utf-8", errors="ignore")
detail = stderr or f"Command timed out after {timeout} seconds"
return 124, stdout, detail
def discover_account_video_links(source_url: str, max_items: int) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """List up to ``max_items`` videos behind an account/playlist URL using yt-dlp.

    yt-dlp is run with ``--flat-playlist`` and prints one tab-separated
    ``webpage_url<TAB>title<TAB>id`` line per entry. Duplicate URLs and entries
    without a URL are dropped; the literal ``"NA"`` is treated as "field missing".

    Returns:
        ``(items, debug_payload)``. Each item has ``video_url``, ``title`` and
        ``external_id``. ``debug_payload`` records the command, a stdout preview,
        truncated stderr and the exit code.

    Raises:
        HTTPException: 503 when the yt-dlp binary cannot be found; 502 when
            yt-dlp exits non-zero (this includes the 124 that ``run_command``
            reports for a timeout).
    """
    if not command_exists(YTDLP_BIN):
        raise HTTPException(status_code=503, detail="yt-dlp is not configured")

    discovery_cmd = [
        YTDLP_BIN,
        "--flat-playlist",
        "--playlist-end",
        str(max_items),
        "--print",
        "%(webpage_url)s\t%(title)s\t%(id)s",
        # End of options: source_url comes from the request, so a value that
        # starts with "-" must never be parsed as a yt-dlp option.
        "--",
        source_url,
    ]
    code, stdout, stderr = run_command(discovery_cmd, timeout=180)
    raw_lines = [line.strip() for line in stdout.splitlines() if line.strip()]

    items: list[dict[str, Any]] = []
    seen_urls: set[str] = set()
    for line in raw_lines:
        parts = line.split("\t")
        video_url = parts[0].strip() if parts else ""
        raw_title = parts[1].strip() if len(parts) > 1 else ""
        raw_external_id = parts[2].strip() if len(parts) > 2 else ""
        if not video_url or video_url == "NA" or video_url in seen_urls:
            continue
        seen_urls.add(video_url)
        items.append(
            {
                "video_url": video_url,
                "title": raw_title if raw_title and raw_title != "NA" else "短视频素材",
                "external_id": raw_external_id if raw_external_id != "NA" else "",
            }
        )

    debug_payload = {
        "discovery_command": discovery_cmd,
        "discovery_stdout_preview": raw_lines[: min(len(raw_lines), max_items)],
        "discovery_stderr": stderr.strip()[:1000],
        "discovery_exit_code": code,
    }
    if code != 0:
        raise HTTPException(status_code=502, detail=f"Failed to inspect content source: {stderr.strip()[:200] or 'yt-dlp error'}")
    return items, debug_payload
def validate_real_cut_source_job(source_job: dict[str, Any]) -> None:
@@ -931,6 +1052,7 @@ def create_job_record(
account_id: str,
project_id: str,
knowledge_base_id: str,
parent_job_id: str | None = None,
source_type: str,
line_type: str,
workflow_key: str,
@@ -947,16 +1069,17 @@ def create_job_record(
db.execute(
"""
INSERT INTO jobs (
id, user_id, project_id, assistant_id, knowledge_base_id, content_source_id,
id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
source_url, title, language, status, transcript_text, style_summary, upload_status,
error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'n8n', '', '', ?, ?, ?, 'pending', '', '', 'pending', '', ?, '{}', ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'n8n', '', '', ?, ?, ?, 'pending', '', '', 'pending', '', ?, '{}', ?, ?, ?)
""",
(
job_id,
account_id,
project_id,
parent_job_id,
assistant_id,
knowledge_base_id,
content_source_id,
@@ -1018,8 +1141,8 @@ def huobao_image_size_for_aspect_ratio(aspect_ratio: str) -> str:
return "1024x1536"
def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: str = "") -> tuple[str, dict[str, str]]:
artifacts: dict[str, str] = {}
async def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: str = "") -> tuple[str, dict[str, Any]]:
artifacts: dict[str, Any] = {}
transcript = ""
media_path = source_path
artifacts["source_path"] = str(media_path)
@@ -1041,6 +1164,21 @@ def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: s
elif err:
artifacts["ffmpeg_error"] = err.strip()[:500]
if asr_http_client.enabled and media_path.exists():
try:
asr_payload = await asr_http_client.transcribe_audio(media_path)
artifacts["asr_http_payload"] = {
"success": bool(asr_payload.get("success", True)),
"duration_ms": asr_payload.get("duration_ms"),
"error_message": str(asr_payload.get("error_message") or "")[:500],
}
transcript = str(asr_payload.get("text") or "").strip()
if transcript:
artifacts["asr_backend"] = "http"
except Exception as exc:
error_detail = str(exc).strip() or exc.__class__.__name__
artifacts["asr_http_error"] = error_detail[:500]
if WHISPER_BIN and Path(WHISPER_BIN).exists() and Path(WHISPER_MODEL).exists():
out_prefix = job_dir / "whisper"
code, stdout, stderr = run_command([
@@ -1055,8 +1193,11 @@ def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: s
])
txt_path = Path(str(out_prefix) + ".txt")
if code == 0 and txt_path.exists():
transcript = txt_path.read_text(encoding="utf-8", errors="ignore").strip()
artifacts["transcript_path"] = str(txt_path)
cli_transcript = txt_path.read_text(encoding="utf-8", errors="ignore").strip()
if cli_transcript:
transcript = cli_transcript
artifacts["transcript_path"] = str(txt_path)
artifacts["asr_backend"] = artifacts.get("asr_backend") or "whisper_cli"
else:
artifacts["whisper_stdout"] = stdout.strip()[:500]
artifacts["whisper_error"] = stderr.strip()[:500]
@@ -1120,11 +1261,11 @@ async def process_job(job_id: str) -> None:
artifacts["download_stdout"] = stdout.strip()[:500]
else:
artifacts["download_error"] = stderr.strip()[:500]
transcript_text, extra = transcribe_media(job_dir, downloaded if downloaded.exists() else job_dir / "placeholder.mp4", row["title"], row.get("source_url") or "")
transcript_text, extra = await transcribe_media(job_dir, downloaded if downloaded.exists() else job_dir / "placeholder.mp4", row["title"], row.get("source_url") or "")
artifacts.update(extra)
elif row["source_type"] == "upload_video":
source_path = Path(artifacts.get("uploaded_path", ""))
transcript_text, extra = transcribe_media(job_dir, source_path, row["title"], row.get("source_url") or "")
transcript_text, extra = await transcribe_media(job_dir, source_path, row["title"], row.get("source_url") or "")
artifacts.update(extra)
profile = model_profile_for_account(row["user_id"], row.get("analysis_model_profile_id") or None)
@@ -1223,6 +1364,7 @@ def healthz() -> dict[str, Any]:
"dbPath": DB_PATH,
"defaultExternalBaseUrl": DEFAULT_EXTERNAL_BASE_URL,
"localModelBaseUrl": LOCAL_OPENAI_BASE_URL,
"asrHttpBaseUrl": ASR_HTTP_BASE_URL,
"n8nBaseUrl": N8N_BASE_URL,
"cutvideoBaseUrl": CUTVIDEO_BASE_URL,
"cutvideoUploadTimeoutSec": CUTVIDEO_UPLOAD_TIMEOUT_SEC,
@@ -1448,6 +1590,88 @@ def create_content_source_api(request: ContentSourceCreateRequest, account: dict
return content_source_payload(row)
@app.post("/v2/pipelines/content-source-sync")
async def create_content_source_sync_job(
    request: ContentSourceSyncRequest,
    account: dict[str, Any] = Depends(require_approved),
) -> dict[str, Any]:
    """Create a ``content_source_sync`` parent job and hand it to the orchestrator.

    The source URL, platform, handle and title come from the request first and
    fall back to the referenced content source. When no ``content_source_id`` is
    given, a new ``creator_account`` content source is created.

    Raises:
        HTTPException: 400 when no source URL can be resolved, or when the
            referenced content source belongs to a different project than the
            resolved target project. Ownership and lookup errors come from the
            ``load_owned_*`` / ``resolve_target_*`` helpers.
    """
    source_row = None
    if request.content_source_id.strip():
        source_row = load_owned_content_source(request.content_source_id.strip(), account["id"])

    requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
    project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
    kb = resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
    assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
    profile = model_profile_for_account(account["id"], request.analysis_model_profile_id or None)

    source_url = (request.source_url or (source_row or {}).get("source_url") or "").strip()
    if not source_url:
        raise HTTPException(status_code=400, detail="source_url or content_source_id is required")

    platform = (request.platform or (source_row or {}).get("platform") or infer_platform_from_url(source_url)).strip()
    handle = (request.handle or (source_row or {}).get("handle") or "").strip()
    source_title = (
        request.title.strip()
        # `or ""` (not a .get default) so a NULL title column cannot raise on .strip(),
        # matching how source_url / platform / handle are read above.
        or ((source_row or {}).get("title") or "").strip()
        or handle
        or source_url
    )

    if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
        raise HTTPException(status_code=400, detail="Content source does not belong to target project")

    if not source_row:
        source_row = create_content_source(
            account_id=account["id"],
            project_id=project["id"],
            source_kind="creator_account",
            platform=platform,
            handle=handle,
            source_url=source_url,
            title=source_title,
            metadata={
                "sync_mode": "recent_uploads",
                "max_items": request.max_items,
                "analysis_model_profile_id": profile["id"],
            },
        )

    job_row = create_job_record(
        account_id=account["id"],
        project_id=project["id"],
        knowledge_base_id=kb["id"],
        source_type="content_source_sync",
        line_type="content_source_sync",
        workflow_key="content_source_sync_pipeline",
        title=f"{source_title} 内容源同步",
        language=request.language,
        source_url=source_url,
        assistant_id=(assistant or {}).get("id"),
        content_source_id=source_row["id"],
        # The internal sync step reads these options back from the job's artifacts.
        artifacts={
            "platform": platform,
            "handle": handle,
            "source_account_url": source_url,
            "source_title": source_title,
            "max_items": request.max_items,
            "skip_existing": request.skip_existing,
            "auto_trigger_analysis": request.auto_trigger_analysis,
        },
        analysis_model_profile_id=profile["id"],
    )
    update_content_source_metadata(
        source_row["id"],
        {
            "sync_mode": "recent_uploads",
            "max_items": request.max_items,
            "analysis_model_profile_id": profile["id"],
            "last_sync_job_id": job_row["id"],
            "last_sync_requested_at": utc_now(),
        },
    )
    return job_payload(await trigger_orchestrated_job(job_row))
@app.get("/v2/model-profiles")
def list_model_profiles(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]:
rows = db.fetch_all(
@@ -1521,8 +1745,25 @@ def list_knowledge_documents(knowledge_base_id: str, account: dict[str, Any] = D
@app.get("/v2/explore/jobs")
def list_jobs(
    parent_job_id: str | None = Query(default=None),
    line_type: str | None = Query(default=None),
    account: dict[str, Any] = Depends(require_approved),
) -> list[dict[str, Any]]:
    """List the caller's jobs, newest first, with optional filters.

    Args:
        parent_job_id: Omitted -> no parent filter. Blank -> only jobs without a
            parent. Any other value -> only children of that job.
        line_type: When non-blank, only jobs of that line type.
        account: The approved account; results are always limited to its jobs.
    """
    clauses = ["user_id = ?"]
    params: list[Any] = [account["id"]]

    if parent_job_id is not None:
        normalized_parent = parent_job_id.strip()
        if normalized_parent:
            clauses.append("parent_job_id = ?")
            params.append(normalized_parent)
        else:
            # Explicit empty value asks for top-level jobs; older rows may hold '' or NULL.
            clauses.append("(parent_job_id IS NULL OR parent_job_id = '')")

    # Normalize before testing so a whitespace-only value is ignored rather
    # than turned into a `line_type = ''` filter that matches nothing.
    normalized_line_type = (line_type or "").strip()
    if normalized_line_type:
        clauses.append("line_type = ?")
        params.append(normalized_line_type)

    # Only the fixed clause fragments above are interpolated; values stay bound parameters.
    sql = f"SELECT * FROM jobs WHERE {' AND '.join(clauses)} ORDER BY created_at DESC"
    return [job_payload(row) for row in db.fetch_all(sql, tuple(params))]
@app.get("/v2/explore/jobs/{job_id}")
@@ -1928,6 +2169,13 @@ def load_owned_job(job_id: str, account_id: str) -> dict[str, Any]:
return row
def load_owned_content_source(source_id: str, account_id: str) -> dict[str, Any]:
    """Fetch a content source by id, restricted to rows owned by ``account_id``.

    Raises:
        HTTPException: 404 when no matching row exists for that owner.
    """
    owned = db.fetch_one(
        "SELECT * FROM content_sources WHERE id = ? AND user_id = ?",
        (source_id, account_id),
    )
    if owned:
        return owned
    raise HTTPException(status_code=404, detail="Content source not found")
def load_internal_job(job_id: str) -> dict[str, Any]:
row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
if not row:
@@ -2003,6 +2251,159 @@ async def internal_run_analysis(
return job_context_payload(load_internal_job(row["id"]))
@app.post("/internal/jobs/steps/content-source-sync")
async def internal_content_source_sync(
    request: InternalStepRequest | None = Body(default=None),
    job_id: str = Query(default=""),
    _: bool = Depends(require_orchestrator),
) -> dict[str, Any]:
    """Orchestrator step: discover an account's videos and derive child analysis jobs.

    The sync job is marked ``processing``, the account URL is expanded into
    video links, and a ``video_link`` content source plus an ``analysis`` child
    job (``parent_job_id`` = this job) is created for each link that is not
    skipped. The job then ends as ``completed`` or ``failed``.

    Returns:
        The job context payload of the updated sync job, for both outcomes.

    Raises:
        HTTPException: 400 when the job carries no source URL. Errors raised
            inside the discovery/derivation phase are not re-raised; they are
            recorded on the job as a ``failed`` status instead.
    """
    row = load_step_job(request, job_id, "content_source_sync_pipeline")
    artifacts = parse_job_artifacts(row)
    # Options were stored in the job artifacts when the job was created.
    source_url = str(artifacts.get("source_account_url") or row.get("source_url") or "").strip()
    if not source_url:
        raise HTTPException(status_code=400, detail="Content source sync job is missing source URL")
    # Clamp to 1..20; a missing or zero value falls back to 5.
    max_items = max(1, min(int(artifacts.get("max_items") or 5), 20))
    skip_existing = bool(artifacts.get("skip_existing", True))
    auto_trigger_analysis = bool(artifacts.get("auto_trigger_analysis", True))

    update_job_state(
        row["id"],
        status="processing",
        provider_name="collector",
        provider_task_id=row["id"],
        result={"sync_started": True},
    )

    try:
        discovered_items, debug_payload = discover_account_video_links(source_url, max_items)
        child_jobs: list[dict[str, Any]] = []
        queued_jobs: list[dict[str, Any]] = []
        skipped_items: list[dict[str, Any]] = []

        for index, item in enumerate(discovered_items, start=1):
            video_url = str(item.get("video_url") or "").strip()
            if not video_url:
                continue

            # Most recent video_link job of the same user and project for this exact URL.
            existing_row = db.fetch_one(
                """
                SELECT * FROM jobs
                WHERE user_id = ? AND project_id = ? AND source_type = 'video_link' AND source_url = ?
                ORDER BY created_at DESC
                LIMIT 1
                """,
                (row["user_id"], row.get("project_id", ""), video_url),
            )
            if existing_row and skip_existing:
                skipped_items.append(
                    {
                        "video_url": video_url,
                        "title": item.get("title") or existing_row.get("title") or "短视频素材",
                        "existing_job_id": existing_row["id"],
                        "existing_status": existing_row.get("status", ""),
                    }
                )
                continue

            # Each derived video gets its own content source that points back to
            # the originating account source and this sync job.
            content_source = create_content_source(
                account_id=row["user_id"],
                project_id=row.get("project_id", ""),
                source_kind="video_link",
                platform=str(artifacts.get("platform") or infer_platform_from_url(video_url)),
                handle=str(artifacts.get("handle") or ""),
                source_url=video_url,
                title=str(item.get("title") or f"内容源视频 {index}"),
                metadata={
                    "origin_content_source_id": row.get("content_source_id", ""),
                    "origin_sync_job_id": row["id"],
                    "external_id": str(item.get("external_id") or ""),
                    "source_account_url": source_url,
                },
            )
            # The child inherits knowledge base, assistant, language and model
            # profile from the parent sync job.
            child_row = create_job_record(
                account_id=row["user_id"],
                project_id=row.get("project_id", ""),
                parent_job_id=row["id"],
                knowledge_base_id=row["knowledge_base_id"],
                source_type="video_link",
                line_type="analysis",
                workflow_key="analysis_pipeline",
                title=str(item.get("title") or f"内容源视频 {index}"),
                language=row.get("language", "auto"),
                source_url=video_url,
                assistant_id=row.get("assistant_id"),
                content_source_id=content_source["id"],
                artifacts={
                    "origin_content_source_id": row.get("content_source_id", ""),
                    "origin_sync_job_id": row["id"],
                    "source_account_url": source_url,
                },
                analysis_model_profile_id=row.get("analysis_model_profile_id", ""),
            )
            child_jobs.append(job_payload(child_row))
            if auto_trigger_analysis:
                queued_child = await trigger_orchestrated_job(child_row)
                queued_jobs.append(job_payload(queued_child))

        if row.get("content_source_id"):
            update_content_source_metadata(
                row["content_source_id"],
                {
                    "last_sync_job_id": row["id"],
                    "last_sync_completed_at": utc_now(),
                    "last_discovered_count": len(discovered_items),
                    # Falls back to the created children when nothing was queued.
                    "last_enqueued_job_ids": [item["id"] for item in queued_jobs] or [item["id"] for item in child_jobs],
                    "last_skipped_existing_count": len(skipped_items),
                    "last_source_account_url": source_url,
                    # Clears the error left behind by an earlier failed sync.
                    "last_sync_error": "",
                },
            )

        updated = update_job_state(
            row["id"],
            status="completed",
            provider_name="collector",
            provider_task_id=row["id"],
            artifacts={
                **debug_payload,
                "discovered_videos": discovered_items,
                "skipped_existing": skipped_items,
                "child_job_ids": [item["id"] for item in child_jobs],
                "queued_job_ids": [item["id"] for item in queued_jobs],
            },
            result={
                "discovered_count": len(discovered_items),
                "queued_count": len(queued_jobs) if auto_trigger_analysis else len(child_jobs),
                "skipped_count": len(skipped_items),
                "child_jobs": queued_jobs or child_jobs,
                "skipped_existing": skipped_items,
            },
        )
        return job_context_payload(updated)
    except HTTPException as exc:
        # Keep the HTTP detail text rather than the exception's repr.
        error = str(exc.detail)
    except Exception as exc:
        error = str(exc)

    # Failure path: reached only when one of the handlers above captured an error.
    if row.get("content_source_id"):
        update_content_source_metadata(
            row["content_source_id"],
            {
                "last_sync_job_id": row["id"],
                "last_sync_completed_at": utc_now(),
                "last_sync_error": error[:500],
                "last_source_account_url": source_url,
            },
        )
    updated = update_job_state(
        row["id"],
        status="failed",
        error=error[:500],
        provider_name="collector",
        provider_task_id=row["id"],
    )
    return job_context_payload(updated)
@app.post("/internal/jobs/steps/real-cut/submit")
async def internal_real_cut_submit(
request: InternalStepRequest | None = Body(default=None),

View File

@@ -3,3 +3,4 @@ uvicorn[standard]==0.34.0
httpx==0.28.1
python-multipart==0.0.20
pydantic==2.11.1
yt-dlp

View File

@@ -36,6 +36,7 @@ services:
N8N_ANALYSIS_WEBHOOK_PATH: ${N8N_ANALYSIS_WEBHOOK_PATH:-/webhook/storyforge-analysis}
N8N_REAL_CUT_WEBHOOK_PATH: ${N8N_REAL_CUT_WEBHOOK_PATH:-/webhook/storyforge-real-cut}
N8N_AI_VIDEO_WEBHOOK_PATH: ${N8N_AI_VIDEO_WEBHOOK_PATH:-/webhook/storyforge-ai-video}
N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH: ${N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH:-/webhook/storyforge-content-source-sync}
ORCHESTRATOR_SHARED_SECRET: ${ORCHESTRATOR_SHARED_SECRET:-storyforge-local-secret}
CUTVIDEO_BASE_URL: ${CUTVIDEO_BASE_URL:-}
CUTVIDEO_API_KEY: ${CUTVIDEO_API_KEY:-}
@@ -48,6 +49,10 @@ services:
FFMPEG_BIN: ${FFMPEG_BIN:-ffmpeg}
WHISPER_BIN: ${WHISPER_BIN:-}
WHISPER_MODEL: ${WHISPER_MODEL:-/data/collector/models/ggml-base.en.bin}
ASR_HTTP_BASE_URL: ${ASR_HTTP_BASE_URL:-}
ASR_HTTP_TRANSCRIBE_PATH: ${ASR_HTTP_TRANSCRIBE_PATH:-/transcribe}
ASR_HTTP_FIELD_NAME: ${ASR_HTTP_FIELD_NAME:-wav}
ASR_HTTP_TIMEOUT_SEC: ${ASR_HTTP_TIMEOUT_SEC:-120}
HUOBAO_POLL_INTERVAL_SEC: ${HUOBAO_POLL_INTERVAL_SEC:-10}
HUOBAO_MAX_WAIT_SEC: ${HUOBAO_MAX_WAIT_SEC:-900}
ports:

View File

@@ -67,12 +67,29 @@
- 已存在,不需要重写
- 现阶段通过 `yt-dlp` 命令集成
- 账号级内容源同步同样复用 `yt-dlp --flat-playlist`,不额外维护抓取器
### 2. ASR
- 现有实现已部署,入口未完全标准化
- 当前后端支持 `ffmpeg + whisper.cpp` 风格接入
- 若需要接现有常驻 ASR 服务,后续只需把 `transcribe_media()` 改成 HTTP/RPC 适配即可
- 现有实现已部署,入口现已标准化为“两级优先级”:
- 优先调用 HTTP ASR 服务
- HTTP 不可用或返回空结果时,回退到 `whisper.cpp` 命令行
- 本次已按 `mac-whisper-service` 的 `/transcribe` 协议完成接入,并用任务 `job_e95f9b5579fd4c5aa40f04de611e9fd0` 验证 `artifacts.asr_backend=http`
- 进一步联调发现真实长视频转写耗时约 44 秒,因此 `collector` 的 `ASR_HTTP_TIMEOUT_SEC` 默认值已提升到 120 秒;本机 `mac-whisper-service` 运行时也需要把 `WHISPER_TIMEOUT_MS` 提升到 `120000`
- 修复后再次验证成功,任务 `job_bb405e2e878849e38c4bb31f7781e1e3` 已写入真实 HTTP ASR 文本并记录 `artifacts.asr_http_payload`
- `collector` 运行镜像已补上 `ffmpeg` 和 `yt-dlp`,避免容器内缺依赖导致音频抽取或下载失效
### 2.1 内容源账号同步
- 已新增 `content_source_sync_pipeline`
- 用户可通过 `POST /v2/pipelines/content-source-sync` 提交创作者账号 URL
- 后端会创建父任务,使用 `yt-dlp --flat-playlist` 抓取最近 N 条视频 URL,再自动派生用户自己的 `video_link` 子分析任务
- `jobs.parent_job_id` 已加入数据模型,父子任务关系可持久化查询
- 已用 bilibili 账号 URL 联调验证:
- 父任务:`job_b02109cf9e8244fbb5b86f184a7c7574`
- 子任务:`job_7f169db61af441f8a7f186d03db2d91c`、`job_28c47774028441378a3974860c375ab7`
结论:账号级调度不再是空白能力,但目前只验证了 `bilibili` URL 形态,抖音 / 小红书仍需真链路核实。
### 3. Windows `cutvideo`
@@ -132,5 +149,6 @@
## 当前主要风险
1. `cutvideo` 的素材传输还未完整闭环
2. 本地 ASR 的“最终生产入口”仍需按你现有部署方式再绑一次
3. `huobao-drama` 已在本机旧改版实例上跑通,但兼容补丁尚未迁到 upstream 仓库并形成正式提交
2. Windows 机器还未部署支持 `POST /api/uploads` 的 `cutvideo` 新版本
3. 抖音 / 小红书账号级内容源还未做真实平台验证
4. `huobao-drama` 已在本机旧改版实例上跑通,但兼容补丁尚未迁到 upstream 仓库并形成正式提交

View File

@@ -19,11 +19,17 @@ cp .env.example .env
- `CUTVIDEO_API_KEY=` 如果 Windows 服务启用了鉴权
- `HUOBAO_BASE_URL=http://127.0.0.1:5678`
- `WHISPER_BIN=` 指向你现有本地 ASR 可执行文件时填写
- `ASR_HTTP_BASE_URL=` 如果你已有常驻 ASR 服务,填写它的基地址
- `ASR_HTTP_TRANSCRIBE_PATH=/transcribe`
- `ASR_HTTP_FIELD_NAME=wav`
- `ASR_HTTP_TIMEOUT_SEC=120`
说明:
- 如果你单独重建 `collector`,要确保运行时仍带上 `CUTVIDEO_BASE_URL`,否则容器会退回空值
- 当前已验证可用的 Windows `cutvideo` 地址是 `http://192.168.31.18:7860`
- 当前已验证可用的本机 HTTP ASR 入口是 `http://host.docker.internal:8088/transcribe`
- 如果你用的是本机 `mac-whisper-service`,建议同时以 `WHISPER_TIMEOUT_MS=120000` 启动,否则长视频会直接 504
## 2. 启动基础服务
@@ -46,6 +52,7 @@ docker compose up -d --build
- `storyforge-analysis.json`
- `storyforge-real-cut.json`
- `storyforge-ai-video.json`
- `storyforge-content-source-sync.json`
导入后:
@@ -88,13 +95,47 @@ docker compose up -d --build
- `ffmpeg` 可用
- ASR 可调用
已验证样例:
- `job_bb405e2e878849e38c4bb31f7781e1e3` (`artifacts.asr_backend=http`)
### 上传视频
调用 `POST /v2/explore/upload-video`
预期与视频链接类似,但素材来源为本地上传
## 6. `cutvideo` 实拍剪辑链路验证
## 6. 内容源账号同步验证
调用 `POST /v2/pipelines/content-source-sync`
推荐最小请求体:
```json
{
"source_url": "https://space.bilibili.com/546195/video",
"platform": "bilibili",
"title": "Bilibili Creator Sync Smoke",
"max_items": 2,
"skip_existing": true,
"auto_trigger_analysis": true
}
```
预期:
- 创建一个 `content_source_sync` 父任务
- `n8n` 触发 `content_source_sync_pipeline`
- 父任务写回 `discovered_videos / child_job_ids / queued_job_ids`
- 子任务以 `parent_job_id` 挂到父任务下,并自动进入分析主线
已验证样例:
- 父任务:`job_b02109cf9e8244fbb5b86f184a7c7574`
- 子任务:`job_7f169db61af441f8a7f186d03db2d91c`
- 子任务:`job_28c47774028441378a3974860c375ab7`
## 7. `cutvideo` 实拍剪辑链路验证
调用 `POST /v2/pipelines/real-cut`
@@ -119,7 +160,7 @@ docker compose up -d --build
- Windows 返回 `task_id=8d8f4a0cd5d9`
- 运行目录 `20260318-093520-Windows cutvideo 联调样例`
## 7. `huobao-drama` AI 视频链路验证
## 8. `huobao-drama` AI 视频链路验证
调用 `POST /v2/pipelines/ai-video`
@@ -142,8 +183,8 @@ docker compose up -d --build
- `video.task_id=qvideo-1380265978-1773799215825814468`
- 最终视频已回写到 `job.result.rendered_scenes[0].video.video_url`
## 8. 当前已知卡点
## 9. 当前已知卡点
- `cutvideo` 端到端“上传素材后自动送到 Windows 机器”还未彻底闭环
- ASR 如果不是命令行模式,而是你现有常驻服务模式,需要再做一次入口绑定
- Windows 机器上的 `cutvideo` 还需要部署带 `POST /api/uploads` 的新分支版本
- 抖音 / 小红书账号级内容源还未做真实平台验证
- `huobao-drama` 目前跑通依赖本地旧改版中的 qnaigc 兼容补丁,下一步要迁到 upstream 仓库

View File

@@ -8,9 +8,11 @@
- 审批机制
- `user -> project -> assistant / knowledge base / job / content source` 数据模型
- 文本 / 视频链接 / 上传视频 三类分析任务创建
- 内容源账号同步任务创建与子任务派发
- `n8n` 工作流导入、激活与触发接口
- 本地下载器调用
- 本地 `ffmpeg` / `whisper` 风格入口封装
- HTTP ASR 常驻服务入口绑定
- 本地大模型内容分析、二创文案、分镜生成
- Windows `cutvideo` API 调度与结果回写接口
- `upload_video -> source_job_id -> cutvideo` 自动 staging 闭环
@@ -20,22 +22,21 @@
## 已验证的真实任务
- 分析链路:`job_203bc8e9b20f4b1cbbc6cf7da79e46f4`
- HTTP ASR 分析链路:`job_e95f9b5579fd4c5aa40f04de611e9fd0`
- 账号级内容源同步链路:`job_b02109cf9e8244fbb5b86f184a7c7574`
- 账号级同步派生分析任务:`job_7f169db61af441f8a7f186d03db2d91c`、`job_28c47774028441378a3974860c375ab7`
- 长视频 HTTP ASR 超时修复后链路:`job_bb405e2e878849e38c4bb31f7781e1e3`
- 实拍剪辑链路:`job_5ebd829c3f2144bca5c941183e75bdcd`
- 实拍剪辑自动 staging 联调:`job_01a6f283cbda42e4ae692b268b811a50`
- AI 视频链路:`job_01828c40377747cf914b51be360cc333`
## 已实现但仍待环境验证
- 现有 ASR 部署入口与 `collector-service` 的最终绑定
## 尚未完全跑通
- 对“抖音 / bilibili / 小红书账号级内容源”的批量抓取与分析调度
- 抖音 / 小红书账号级内容源还未做真实平台验证;`bilibili` 账号级 URL 已跑通
- Windows 机器上的 `cutvideo` 需要同步部署带 `POST /api/uploads` 的版本,当前自动 staging 已在本机联调通过
## 下一步优先级
1. 把 `cutvideo` 新上传接口部署到 Windows 机器
2. 绑定你的真实 ASR 入口
3. 补账号级内容源抓取调度
4. 把改动整理成提交并推送
2. 补抖音 / 小红书账号级真实验证与必要的 URL 归一化
3. 把改动整理成提交并推送

View File

@@ -7,6 +7,7 @@
- `workflows/storyforge-analysis.json`:内容分析主线
- `workflows/storyforge-real-cut.json`:Windows `cutvideo` 调度主线
- `workflows/storyforge-ai-video.json`:`huobao-drama` AI 生成视频主线
- `workflows/storyforge-content-source-sync.json`:内容源账号同步与批量分析派发主线
## 约定
@@ -18,7 +19,7 @@
1. 先执行 `docker compose up -d n8n collector`
2. 打开 `http://127.0.0.1:5670`
3. 从 UI 导入本目录下的 3 个 JSON
3. 从 UI 导入本目录下的 4 个 JSON
4. 激活工作流
## Webhook 路径
@@ -26,3 +27,4 @@
- `/webhook/storyforge-analysis`
- `/webhook/storyforge-real-cut`
- `/webhook/storyforge-ai-video`
- `/webhook/storyforge-content-source-sync`

View File

@@ -0,0 +1,70 @@
{
"name": "StoryForge Content Source Sync",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "storyforge-content-source-sync",
"responseMode": "onReceived",
"options": {}
},
"id": "content-source-sync-webhook",
"name": "Content Source Sync Webhook",
"type": "n8n-nodes-base.webhook",
"typeVersion": 2,
"position": [
220,
300
],
"webhookId": "storyforge-content-source-sync"
},
{
"parameters": {
"method": "POST",
"url": "={{'http://collector:8081/internal/jobs/steps/content-source-sync?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"sendHeaders": true,
"headerParameters": {
"parameters": [
{
"name": "X-Orchestrator-Secret",
"value": "storyforge-local-secret"
}
]
},
"options": {
"timeout": 600000
}
},
"id": "content-source-sync-runner",
"name": "Run Content Source Sync Step",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
540,
300
]
}
],
"connections": {
"Content Source Sync Webhook": {
"main": [
[
{
"node": "Run Content Source Sync Step",
"type": "main",
"index": 0
}
]
]
},
"Run Content Source Sync Step": {
"main": [
[]
]
}
},
"active": false,
"settings": {},
"pinData": {},
"versionId": "storyforge-content-source-sync-v1"
}