From 1c539abc6ed7a6b9f4765b30940c4437555ab914 Mon Sep 17 00:00:00 2001 From: kris Date: Fri, 20 Mar 2026 10:11:04 +0800 Subject: [PATCH] feat: add content source sync pipeline and harden asr timeouts --- .env.example | 5 + README.md | 3 +- .../app/storyforge/StoryForgeApiService.kt | 3 + .../app/storyforge/StoryForgeModels.kt | 18 + collector-service/Dockerfile | 4 + collector-service/app/database.py | 2 + collector-service/app/integrations.py | 30 ++ collector-service/app/main.py | 429 +++++++++++++++++- collector-service/requirements.txt | 1 + docker-compose.yml | 5 + docs/AUDIT_2026-03-18.md | 28 +- docs/LAN_E2E_GUIDE_2026-03-18.md | 51 ++- docs/MVP_STATUS_2026-03-18.md | 17 +- n8n/README.md | 4 +- .../storyforge-content-source-sync.json | 70 +++ 15 files changed, 636 insertions(+), 34 deletions(-) create mode 100644 n8n/workflows/storyforge-content-source-sync.json diff --git a/.env.example b/.env.example index 37cefb6..b5c1100 100644 --- a/.env.example +++ b/.env.example @@ -6,6 +6,7 @@ N8N_BASE_URL=http://127.0.0.1:5670 N8N_ANALYSIS_WEBHOOK_PATH=/webhook/storyforge-analysis N8N_REAL_CUT_WEBHOOK_PATH=/webhook/storyforge-real-cut N8N_AI_VIDEO_WEBHOOK_PATH=/webhook/storyforge-ai-video +N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH=/webhook/storyforge-content-source-sync ORCHESTRATOR_SHARED_SECRET=storyforge-local-secret CUTVIDEO_BASE_URL= CUTVIDEO_API_KEY= @@ -20,6 +21,10 @@ YTDLP_BIN=yt-dlp FFMPEG_BIN=ffmpeg WHISPER_BIN= WHISPER_MODEL=./data/collector/models/ggml-base.en.bin +ASR_HTTP_BASE_URL= +ASR_HTTP_TRANSCRIBE_PATH=/transcribe +ASR_HTTP_FIELD_NAME=wav +ASR_HTTP_TIMEOUT_SEC=120 N8N_IMAGE=docker.n8n.io/n8nio/n8n:latest WEBHOOK_URL=http://127.0.0.1:5670/ GENERIC_TIMEZONE=Asia/Shanghai diff --git a/README.md b/README.md index c2a3f0c..5346ef0 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,7 @@ docker compose up -d --build - 持久化任务、分镜、分析结果、事件日志 - `n8n` 负责: - 触发 `analysis_pipeline` + - 触发 `content_source_sync_pipeline` - 触发 `real_cut_pipeline` - 触发 `ai_video_pipeline` - FastGPT 已从主流程设计中移除,不再作为运行时依赖 @@ -66,7 +67,7 @@ docker compose up -d --build - 新注册账号默认 `pending` - 主管理员审批后才可使用核心业务接口 - 支持 `user -> project -> knowledge base / assistant(agent) / job / content source` 的多租户边界 -- 素材入口支持文字、视频链接、视频上传;内容源账号通过 `content_sources` 建模持久化 +- 素材入口支持文字、视频链接、视频上传;内容源账号通过 `content_sources` 建模持久化,并可派生父子分析任务 - `cutvideo` 继续运行在 Windows 机器,本系统通过 API 调度 - `huobao-drama` 继续作为 AI 生成视频主链的核心引擎 - 详细审计、阶段计划和联调步骤见 `docs/` diff --git a/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeApiService.kt b/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeApiService.kt index a57b060..cc7615c 100644 --- a/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeApiService.kt +++ b/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeApiService.kt @@ -57,6 +57,9 @@ interface StoryForgeApiService { @POST("v2/explore/text") suspend fun createTextJob(@Body request: ExploreTextRequest): JobDto + @POST("v2/pipelines/content-source-sync") + suspend fun createContentSourceSyncJob(@Body request: ContentSourceSyncRequest): JobDto + @Multipart @POST("v2/explore/upload-video") suspend fun uploadVideo( diff --git a/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt b/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt index b0fba52..9c23777 100644 --- a/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt +++ b/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt @@ -159,11 +159,29 @@ data class ExploreTextRequest( val analysis_model_profile_id: String? = null ) +@Serializable +data class ContentSourceSyncRequest( + val project_id: String = "", + val knowledge_base_id: String = "", + val assistant_id: String = "", + val content_source_id: String = "", + val platform: String = "", + val handle: String = "", + val source_url: String = "", + val title: String = "", + val analysis_model_profile_id: String = "", + val language: String = "auto", + val max_items: Int = 5, + val skip_existing: Boolean = true, + val auto_trigger_analysis: Boolean = true +) + @Serializable data class JobDto( val id: String, val user_id: String, val project_id: String = "", + val parent_job_id: String = "", val assistant_id: String? = null, val knowledge_base_id: String, val content_source_id: String = "", diff --git a/collector-service/Dockerfile b/collector-service/Dockerfile index 8d88c18..55a0d14 100644 --- a/collector-service/Dockerfile +++ b/collector-service/Dockerfile @@ -1,5 +1,9 @@ FROM python:3.11-slim +RUN apt-get update \ + && apt-get install -y --no-install-recommends ffmpeg \ + && rm -rf /var/lib/apt/lists/* + WORKDIR /app COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt diff --git a/collector-service/app/database.py b/collector-service/app/database.py index 382dc56..c3bb5a2 100644 --- a/collector-service/app/database.py +++ b/collector-service/app/database.py @@ -156,6 +156,7 @@ class Database: id TEXT PRIMARY KEY, user_id TEXT NOT NULL, project_id TEXT, + parent_job_id TEXT, assistant_id TEXT, knowledge_base_id TEXT NOT NULL, content_source_id TEXT, @@ -255,6 +256,7 @@ class Database: }, "jobs": { "project_id": "TEXT", + "parent_job_id": "TEXT", "content_source_id": "TEXT", "line_type": "TEXT NOT NULL DEFAULT 'analysis'", "workflow_key": "TEXT NOT NULL DEFAULT ''", diff --git a/collector-service/app/integrations.py b/collector-service/app/integrations.py index 70c07a7..1878bb6 100644 --- a/collector-service/app/integrations.py +++ b/collector-service/app/integrations.py @@ -134,6 +134,36 @@ class CutVideoClient: return _unwrap_response(response.json()) +class AsrHttpClient: + def __init__( + self, + *, + base_url: str, + transcribe_path: str = "/transcribe", + field_name: str = "wav", + timeout: float = 120.0, + ) -> None: + self.base_url = base_url.rstrip("/") + self.transcribe_path = transcribe_path + self.field_name = field_name.strip() or "wav" + self.timeout = timeout + + @property + def enabled(self) -> bool: + return bool(self.base_url) + + async def transcribe_audio(self, audio_path: Path) -> dict[str, Any]: + content_type = mimetypes.guess_type(audio_path.name)[0] or "application/octet-stream" + async with httpx.AsyncClient(timeout=self.timeout) as client: + with audio_path.open("rb") as handle: + response = await client.post( + _join_url(self.base_url, self.transcribe_path), + files={self.field_name: (audio_path.name, handle, content_type)}, + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + class HuobaoDramaClient: def __init__(self, *, base_url: str, timeout: float = 180.0) -> None: self.base_url = base_url.rstrip("/") diff --git a/collector-service/app/main.py b/collector-service/app/main.py index a5c45be..12a779a 100644 --- a/collector-service/app/main.py +++ b/collector-service/app/main.py @@ -18,7 +18,7 @@ from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from .database import Database, utc_now -from .integrations import CutVideoClient, HuobaoDramaClient, N8NClient +from .integrations import AsrHttpClient, CutVideoClient, HuobaoDramaClient, N8NClient from .openai_compat import OpenAICompatClient BASE_DIR = Path(__file__).resolve().parents[2] @@ -35,10 +35,15 @@ YTDLP_BIN = os.getenv("YTDLP_BIN", "yt-dlp") FFMPEG_BIN = os.getenv("FFMPEG_BIN", "ffmpeg") WHISPER_BIN = os.getenv("WHISPER_BIN", "") WHISPER_MODEL = os.getenv("WHISPER_MODEL", str(MODELS_DIR / "ggml-base.en.bin")) +ASR_HTTP_BASE_URL = os.getenv("ASR_HTTP_BASE_URL", "") +ASR_HTTP_TRANSCRIBE_PATH = os.getenv("ASR_HTTP_TRANSCRIBE_PATH", "/transcribe") +ASR_HTTP_FIELD_NAME = os.getenv("ASR_HTTP_FIELD_NAME", "wav") +ASR_HTTP_TIMEOUT_SEC = float(os.getenv("ASR_HTTP_TIMEOUT_SEC", "120")) N8N_BASE_URL = os.getenv("N8N_BASE_URL", "http://127.0.0.1:5670") N8N_ANALYSIS_WEBHOOK_PATH = os.getenv("N8N_ANALYSIS_WEBHOOK_PATH", "/webhook/storyforge-analysis") N8N_REAL_CUT_WEBHOOK_PATH = os.getenv("N8N_REAL_CUT_WEBHOOK_PATH", "/webhook/storyforge-real-cut") N8N_AI_VIDEO_WEBHOOK_PATH = os.getenv("N8N_AI_VIDEO_WEBHOOK_PATH", "/webhook/storyforge-ai-video") +N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH = os.getenv("N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH", "/webhook/storyforge-content-source-sync") ORCHESTRATOR_SHARED_SECRET = os.getenv("ORCHESTRATOR_SHARED_SECRET", "") CUTVIDEO_BASE_URL = os.getenv("CUTVIDEO_BASE_URL", "") CUTVIDEO_API_KEY = os.getenv("CUTVIDEO_API_KEY", "") @@ -55,12 +60,19 @@ for path in (DATA_DIR, DOWNLOADS_DIR, JOBS_DIR, MODELS_DIR): db = Database(DB_PATH) openai_client = OpenAICompatClient() +asr_http_client = AsrHttpClient( + base_url=ASR_HTTP_BASE_URL, + transcribe_path=ASR_HTTP_TRANSCRIBE_PATH, + field_name=ASR_HTTP_FIELD_NAME, + timeout=ASR_HTTP_TIMEOUT_SEC, +) n8n_client = N8NClient( base_url=N8N_BASE_URL, workflow_paths={ "analysis_pipeline": N8N_ANALYSIS_WEBHOOK_PATH, "real_cut_pipeline": N8N_REAL_CUT_WEBHOOK_PATH, "ai_video_pipeline": N8N_AI_VIDEO_WEBHOOK_PATH, + "content_source_sync_pipeline": N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH, }, shared_secret=ORCHESTRATOR_SHARED_SECRET, ) @@ -187,6 +199,22 @@ class ContentSourceCreateRequest(BaseModel): metadata: dict[str, Any] = Field(default_factory=dict) +class ContentSourceSyncRequest(BaseModel): + project_id: str = "" + knowledge_base_id: str = "" + assistant_id: str = "" + content_source_id: str = "" + platform: str = "" + handle: str = "" + source_url: str = "" + title: str = "" + analysis_model_profile_id: str = "" + language: str = "auto" + max_items: int = Field(default=5, ge=1, le=20) + skip_existing: bool = True + auto_trigger_analysis: bool = True + + class RealCutJobRequest(BaseModel): project_id: str = "" title: str @@ -532,6 +560,7 @@ def job_payload(row: dict[str, Any]) -> dict[str, Any]: "id": row["id"], "user_id": row["user_id"], "project_id": row.get("project_id", ""), + "parent_job_id": row.get("parent_job_id", ""), "assistant_id": row.get("assistant_id"), "knowledge_base_id": row["knowledge_base_id"], "content_source_id": row.get("content_source_id", ""), @@ -633,6 +662,17 @@ def merge_json_field(current_raw: str | None, updates: dict[str, Any]) -> str: return json.dumps(current, ensure_ascii=False) +def update_content_source_metadata(source_id: str, updates: dict[str, Any]) -> dict[str, Any]: + row = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,)) + if not row: + raise HTTPException(status_code=404, detail="Content source not found") + db.execute( + "UPDATE content_sources SET metadata_json = ?, updated_at = ? WHERE id = ?", + (merge_json_field(row.get("metadata_json") or "{}", updates), utc_now(), source_id), + ) + return db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,)) + + def update_job_state( job_id: str, *, @@ -682,6 +722,8 @@ def update_job_state( def job_context_payload(row: dict[str, Any]) -> dict[str, Any]: payload = job_payload(row) + payload["parent_job"] = None + payload["child_jobs"] = [] payload["project"] = None payload["assistant"] = None payload["knowledge_base"] = None @@ -707,6 +749,16 @@ def job_context_payload(row: dict[str, Any]) -> dict[str, Any]: if source: payload["content_source"] = content_source_payload(source) + if row.get("parent_job_id"): + parent = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (row["parent_job_id"],)) + if parent: + payload["parent_job"] = job_payload(parent) + + payload["child_jobs"] = [ + job_payload(item) + for item in db.fetch_all("SELECT * FROM jobs WHERE parent_job_id = ? ORDER BY created_at ASC", (row["id"],)) + ] + payload["events"] = [ job_event_payload(item) for item in db.fetch_all("SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (row["id"],)) @@ -863,13 +915,82 @@ def fallback_transcript_from_text(title: str, content: str) -> str: return f"标题:{title}\n\n正文:\n{content.strip()}" +def infer_platform_from_url(source_url: str) -> str: + normalized = source_url.strip().lower() + if "bilibili.com" in normalized or "b23.tv" in normalized: + return "bilibili" + if "douyin.com" in normalized or "iesdouyin.com" in normalized: + return "douyin" + if "xiaohongshu.com" in normalized or "xhslink.com" in normalized: + return "xiaohongshu" + if "youtube.com" in normalized or "youtu.be" in normalized: + return "youtube" + return "" + + def command_exists(name: str) -> bool: return shutil.which(name) is not None -def run_command(command: list[str], cwd: Path | None = None) -> tuple[int, str, str]: - proc = subprocess.run(command, cwd=str(cwd) if cwd else None, capture_output=True, text=True) - return proc.returncode, proc.stdout, proc.stderr +def run_command(command: list[str], cwd: Path | None = None, timeout: float | None = None) -> tuple[int, str, str]: + try: + proc = subprocess.run( + command, + cwd=str(cwd) if cwd else None, + capture_output=True, + text=True, + timeout=timeout, + ) + return proc.returncode, proc.stdout, proc.stderr + except subprocess.TimeoutExpired as exc: + stdout = exc.stdout if isinstance(exc.stdout, str) else (exc.stdout or b"").decode("utf-8", errors="ignore") + stderr = exc.stderr if isinstance(exc.stderr, str) else (exc.stderr or b"").decode("utf-8", errors="ignore") + detail = stderr or f"Command timed out after {timeout} seconds" + return 124, stdout, detail + + +def discover_account_video_links(source_url: str, max_items: int) -> tuple[list[dict[str, Any]], dict[str, Any]]: + if not command_exists(YTDLP_BIN): + raise HTTPException(status_code=503, detail="yt-dlp is not configured") + + discovery_cmd = [ + YTDLP_BIN, + "--flat-playlist", + "--playlist-end", + str(max_items), + "--print", + "%(webpage_url)s\t%(title)s\t%(id)s", + source_url, + ] + code, stdout, stderr = run_command(discovery_cmd, timeout=180) + raw_lines = [line.strip() for line in stdout.splitlines() if line.strip()] + items: list[dict[str, Any]] = [] + seen_urls: set[str] = set() + for line in raw_lines: + parts = line.split("\t") + video_url = parts[0].strip() if parts else "" + raw_title = parts[1].strip() if len(parts) > 1 else "" + raw_external_id = parts[2].strip() if len(parts) > 2 else "" + if not video_url or video_url == "NA" or video_url in seen_urls: + continue + seen_urls.add(video_url) + items.append( + { + "video_url": video_url, + "title": raw_title if raw_title and raw_title != "NA" else "短视频素材", + "external_id": raw_external_id if raw_external_id != "NA" else "", + } + ) + + debug_payload = { + "discovery_command": discovery_cmd, + "discovery_stdout_preview": raw_lines[: min(len(raw_lines), max_items)], + "discovery_stderr": stderr.strip()[:1000], + "discovery_exit_code": code, + } + if code != 0: + raise HTTPException(status_code=502, detail=f"Failed to inspect content source: {stderr.strip()[:200] or 'yt-dlp error'}") + return items, debug_payload def validate_real_cut_source_job(source_job: dict[str, Any]) -> None: @@ -931,6 +1052,7 @@ def create_job_record( account_id: str, project_id: str, knowledge_base_id: str, + parent_job_id: str | None = None, source_type: str, line_type: str, workflow_key: str, @@ -947,16 +1069,17 @@ def create_job_record( db.execute( """ INSERT INTO jobs ( - id, user_id, project_id, assistant_id, knowledge_base_id, content_source_id, + id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id, source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id, source_url, title, language, status, transcript_text, style_summary, upload_status, error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'n8n', '', '', ?, ?, ?, 'pending', '', '', 'pending', '', ?, '{}', ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'n8n', '', '', ?, ?, ?, 'pending', '', '', 'pending', '', ?, '{}', ?, ?, ?) """, ( job_id, account_id, project_id, + parent_job_id, assistant_id, knowledge_base_id, content_source_id, @@ -1018,8 +1141,8 @@ def huobao_image_size_for_aspect_ratio(aspect_ratio: str) -> str: return "1024x1536" -def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: str = "") -> tuple[str, dict[str, str]]: - artifacts: dict[str, str] = {} +async def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: str = "") -> tuple[str, dict[str, Any]]: + artifacts: dict[str, Any] = {} transcript = "" media_path = source_path artifacts["source_path"] = str(media_path) @@ -1041,6 +1164,21 @@ def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: s elif err: artifacts["ffmpeg_error"] = err.strip()[:500] + if asr_http_client.enabled and media_path.exists(): + try: + asr_payload = await asr_http_client.transcribe_audio(media_path) + artifacts["asr_http_payload"] = { + "success": bool(asr_payload.get("success", True)), + "duration_ms": asr_payload.get("duration_ms"), + "error_message": str(asr_payload.get("error_message") or "")[:500], + } + transcript = str(asr_payload.get("text") or "").strip() + if transcript: + artifacts["asr_backend"] = "http" + except Exception as exc: + error_detail = str(exc).strip() or exc.__class__.__name__ + artifacts["asr_http_error"] = error_detail[:500] + if WHISPER_BIN and Path(WHISPER_BIN).exists() and Path(WHISPER_MODEL).exists(): out_prefix = job_dir / "whisper" code, stdout, stderr = run_command([ @@ -1055,8 +1193,11 @@ def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: s ]) txt_path = Path(str(out_prefix) + ".txt") if code == 0 and txt_path.exists(): - transcript = txt_path.read_text(encoding="utf-8", errors="ignore").strip() - artifacts["transcript_path"] = str(txt_path) + cli_transcript = txt_path.read_text(encoding="utf-8", errors="ignore").strip() + if cli_transcript: + transcript = cli_transcript + artifacts["transcript_path"] = str(txt_path) + artifacts["asr_backend"] = artifacts.get("asr_backend") or "whisper_cli" else: artifacts["whisper_stdout"] = stdout.strip()[:500] artifacts["whisper_error"] = stderr.strip()[:500] @@ -1120,11 +1261,11 @@ async def process_job(job_id: str) -> None: artifacts["download_stdout"] = stdout.strip()[:500] else: artifacts["download_error"] = stderr.strip()[:500] - transcript_text, extra = transcribe_media(job_dir, downloaded if downloaded.exists() else job_dir / "placeholder.mp4", row["title"], row.get("source_url") or "") + transcript_text, extra = await transcribe_media(job_dir, downloaded if downloaded.exists() else job_dir / "placeholder.mp4", row["title"], row.get("source_url") or "") artifacts.update(extra) elif row["source_type"] == "upload_video": source_path = Path(artifacts.get("uploaded_path", "")) - transcript_text, extra = transcribe_media(job_dir, source_path, row["title"], row.get("source_url") or "") + transcript_text, extra = await transcribe_media(job_dir, source_path, row["title"], row.get("source_url") or "") artifacts.update(extra) profile = model_profile_for_account(row["user_id"], row.get("analysis_model_profile_id") or None) @@ -1223,6 +1364,7 @@ def healthz() -> dict[str, Any]: "dbPath": DB_PATH, "defaultExternalBaseUrl": DEFAULT_EXTERNAL_BASE_URL, "localModelBaseUrl": LOCAL_OPENAI_BASE_URL, + "asrHttpBaseUrl": ASR_HTTP_BASE_URL, "n8nBaseUrl": N8N_BASE_URL, "cutvideoBaseUrl": CUTVIDEO_BASE_URL, "cutvideoUploadTimeoutSec": CUTVIDEO_UPLOAD_TIMEOUT_SEC, @@ -1448,6 +1590,88 @@ def create_content_source_api(request: ContentSourceCreateRequest, account: dict return content_source_payload(row) +@app.post("/v2/pipelines/content-source-sync") +async def create_content_source_sync_job( + request: ContentSourceSyncRequest, + account: dict[str, Any] = Depends(require_approved), +) -> dict[str, Any]: + source_row = None + if request.content_source_id.strip(): + source_row = load_owned_content_source(request.content_source_id.strip(), account["id"]) + + requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "") + project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + kb = resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + profile = model_profile_for_account(account["id"], request.analysis_model_profile_id or None) + + source_url = (request.source_url or (source_row or {}).get("source_url") or "").strip() + if not source_url: + raise HTTPException(status_code=400, detail="source_url or content_source_id is required") + platform = (request.platform or (source_row or {}).get("platform") or infer_platform_from_url(source_url)).strip() + handle = (request.handle or (source_row or {}).get("handle") or "").strip() + source_title = ( + request.title.strip() + or (source_row or {}).get("title", "").strip() + or handle + or source_url + ) + + if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]: + raise HTTPException(status_code=400, detail="Content source does not belong to target project") + + if not source_row: + source_row = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind="creator_account", + platform=platform, + handle=handle, + source_url=source_url, + title=source_title, + metadata={ + "sync_mode": "recent_uploads", + "max_items": request.max_items, + "analysis_model_profile_id": profile["id"], + }, + ) + + job_row = create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="content_source_sync", + line_type="content_source_sync", + workflow_key="content_source_sync_pipeline", + title=f"{source_title} 内容源同步", + language=request.language, + source_url=source_url, + assistant_id=(assistant or {}).get("id"), + content_source_id=source_row["id"], + artifacts={ + "platform": platform, + "handle": handle, + "source_account_url": source_url, + "source_title": source_title, + "max_items": request.max_items, + "skip_existing": request.skip_existing, + "auto_trigger_analysis": request.auto_trigger_analysis, + }, + analysis_model_profile_id=profile["id"], + ) + update_content_source_metadata( + source_row["id"], + { + "sync_mode": "recent_uploads", + "max_items": request.max_items, + "analysis_model_profile_id": profile["id"], + "last_sync_job_id": job_row["id"], + "last_sync_requested_at": utc_now(), + }, + ) + return job_payload(await trigger_orchestrated_job(job_row)) + + @app.get("/v2/model-profiles") def list_model_profiles(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]: rows = db.fetch_all( @@ -1521,8 +1745,25 @@ def list_knowledge_documents(knowledge_base_id: str, account: dict[str, Any] = D @app.get("/v2/explore/jobs") -def list_jobs(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]: - return [job_payload(row) for row in db.fetch_all("SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC", (account["id"],))] +def list_jobs( + parent_job_id: str | None = Query(default=None), + line_type: str | None = Query(default=None), + account: dict[str, Any] = Depends(require_approved), +) -> list[dict[str, Any]]: + clauses = ["user_id = ?"] + params: list[Any] = [account["id"]] + if parent_job_id is not None: + normalized_parent = parent_job_id.strip() + if normalized_parent: + clauses.append("parent_job_id = ?") + params.append(normalized_parent) + else: + clauses.append("(parent_job_id IS NULL OR parent_job_id = '')") + if line_type: + clauses.append("line_type = ?") + params.append(line_type.strip()) + sql = f"SELECT * FROM jobs WHERE {' AND '.join(clauses)} ORDER BY created_at DESC" + return [job_payload(row) for row in db.fetch_all(sql, tuple(params))] @app.get("/v2/explore/jobs/{job_id}") @@ -1928,6 +2169,13 @@ def load_owned_job(job_id: str, account_id: str) -> dict[str, Any]: return row +def load_owned_content_source(source_id: str, account_id: str) -> dict[str, Any]: + row = db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (source_id, account_id)) + if not row: + raise HTTPException(status_code=404, detail="Content source not found") + return row + + def load_internal_job(job_id: str) -> dict[str, Any]: row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) if not row: @@ -2003,6 +2251,159 @@ async def internal_run_analysis( return job_context_payload(load_internal_job(row["id"])) +@app.post("/internal/jobs/steps/content-source-sync") +async def internal_content_source_sync( + request: InternalStepRequest | None = Body(default=None), + job_id: str = Query(default=""), + _: bool = Depends(require_orchestrator), +) -> dict[str, Any]: + row = load_step_job(request, job_id, "content_source_sync_pipeline") + artifacts = parse_job_artifacts(row) + source_url = str(artifacts.get("source_account_url") or row.get("source_url") or "").strip() + if not source_url: + raise HTTPException(status_code=400, detail="Content source sync job is missing source URL") + max_items = max(1, min(int(artifacts.get("max_items") or 5), 20)) + skip_existing = bool(artifacts.get("skip_existing", True)) + auto_trigger_analysis = bool(artifacts.get("auto_trigger_analysis", True)) + + update_job_state( + row["id"], + status="processing", + provider_name="collector", + provider_task_id=row["id"], + result={"sync_started": True}, + ) + + try: + discovered_items, debug_payload = discover_account_video_links(source_url, max_items) + child_jobs: list[dict[str, Any]] = [] + queued_jobs: list[dict[str, Any]] = [] + skipped_items: list[dict[str, Any]] = [] + + for index, item in enumerate(discovered_items, start=1): + video_url = str(item.get("video_url") or "").strip() + if not video_url: + continue + existing_row = db.fetch_one( + """ + SELECT * FROM jobs + WHERE user_id = ? AND project_id = ? AND source_type = 'video_link' AND source_url = ? + ORDER BY created_at DESC + LIMIT 1 + """, + (row["user_id"], row.get("project_id", ""), video_url), + ) + if existing_row and skip_existing: + skipped_items.append( + { + "video_url": video_url, + "title": item.get("title") or existing_row.get("title") or "短视频素材", + "existing_job_id": existing_row["id"], + "existing_status": existing_row.get("status", ""), + } + ) + continue + + content_source = create_content_source( + account_id=row["user_id"], + project_id=row.get("project_id", ""), + source_kind="video_link", + platform=str(artifacts.get("platform") or infer_platform_from_url(video_url)), + handle=str(artifacts.get("handle") or ""), + source_url=video_url, + title=str(item.get("title") or f"内容源视频 {index}"), + metadata={ + "origin_content_source_id": row.get("content_source_id", ""), + "origin_sync_job_id": row["id"], + "external_id": str(item.get("external_id") or ""), + "source_account_url": source_url, + }, + ) + child_row = create_job_record( + account_id=row["user_id"], + project_id=row.get("project_id", ""), + parent_job_id=row["id"], + knowledge_base_id=row["knowledge_base_id"], + source_type="video_link", + line_type="analysis", + workflow_key="analysis_pipeline", + title=str(item.get("title") or f"内容源视频 {index}"), + language=row.get("language", "auto"), + source_url=video_url, + assistant_id=row.get("assistant_id"), + content_source_id=content_source["id"], + artifacts={ + "origin_content_source_id": row.get("content_source_id", ""), + "origin_sync_job_id": row["id"], + "source_account_url": source_url, + }, + analysis_model_profile_id=row.get("analysis_model_profile_id", ""), + ) + child_jobs.append(job_payload(child_row)) + if auto_trigger_analysis: + queued_child = await trigger_orchestrated_job(child_row) + queued_jobs.append(job_payload(queued_child)) + + if row.get("content_source_id"): + update_content_source_metadata( + row["content_source_id"], + { + "last_sync_job_id": row["id"], + "last_sync_completed_at": utc_now(), + "last_discovered_count": len(discovered_items), + "last_enqueued_job_ids": [item["id"] for item in queued_jobs] or [item["id"] for item in child_jobs], + "last_skipped_existing_count": len(skipped_items), + "last_source_account_url": source_url, + "last_sync_error": "", + }, + ) + + updated = update_job_state( + row["id"], + status="completed", + provider_name="collector", + provider_task_id=row["id"], + artifacts={ + **debug_payload, + "discovered_videos": discovered_items, + "skipped_existing": skipped_items, + "child_job_ids": [item["id"] for item in child_jobs], + "queued_job_ids": [item["id"] for item in queued_jobs], + }, + result={ + "discovered_count": len(discovered_items), + "queued_count": len(queued_jobs) if auto_trigger_analysis else len(child_jobs), + "skipped_count": len(skipped_items), + "child_jobs": queued_jobs or child_jobs, + "skipped_existing": skipped_items, + }, + ) + return job_context_payload(updated) + except HTTPException as exc: + error = str(exc.detail) + except Exception as exc: + error = str(exc) + + if row.get("content_source_id"): + update_content_source_metadata( + row["content_source_id"], + { + "last_sync_job_id": row["id"], + "last_sync_completed_at": utc_now(), + "last_sync_error": error[:500], + "last_source_account_url": source_url, + }, + ) + updated = update_job_state( + row["id"], + status="failed", + error=error[:500], + provider_name="collector", + provider_task_id=row["id"], + ) + return job_context_payload(updated) + + @app.post("/internal/jobs/steps/real-cut/submit") async def internal_real_cut_submit( request: InternalStepRequest | None = Body(default=None), diff --git a/collector-service/requirements.txt b/collector-service/requirements.txt index 71fe205..ccf4ffb 100644 --- a/collector-service/requirements.txt +++ b/collector-service/requirements.txt @@ -3,3 +3,4 @@ uvicorn[standard]==0.34.0 httpx==0.28.1 python-multipart==0.0.20 pydantic==2.11.1 +yt-dlp diff --git a/docker-compose.yml b/docker-compose.yml index fe68c06..6218333 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,7 @@ services: N8N_ANALYSIS_WEBHOOK_PATH: ${N8N_ANALYSIS_WEBHOOK_PATH:-/webhook/storyforge-analysis} N8N_REAL_CUT_WEBHOOK_PATH: ${N8N_REAL_CUT_WEBHOOK_PATH:-/webhook/storyforge-real-cut} N8N_AI_VIDEO_WEBHOOK_PATH: ${N8N_AI_VIDEO_WEBHOOK_PATH:-/webhook/storyforge-ai-video} + N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH: ${N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH:-/webhook/storyforge-content-source-sync} ORCHESTRATOR_SHARED_SECRET: ${ORCHESTRATOR_SHARED_SECRET:-storyforge-local-secret} CUTVIDEO_BASE_URL: ${CUTVIDEO_BASE_URL:-} CUTVIDEO_API_KEY: ${CUTVIDEO_API_KEY:-} @@ -48,6 +49,10 @@ services: FFMPEG_BIN: ${FFMPEG_BIN:-ffmpeg} WHISPER_BIN: ${WHISPER_BIN:-} WHISPER_MODEL: ${WHISPER_MODEL:-/data/collector/models/ggml-base.en.bin} + ASR_HTTP_BASE_URL: ${ASR_HTTP_BASE_URL:-} + ASR_HTTP_TRANSCRIBE_PATH: ${ASR_HTTP_TRANSCRIBE_PATH:-/transcribe} + ASR_HTTP_FIELD_NAME: ${ASR_HTTP_FIELD_NAME:-wav} + ASR_HTTP_TIMEOUT_SEC: ${ASR_HTTP_TIMEOUT_SEC:-120} HUOBAO_POLL_INTERVAL_SEC: ${HUOBAO_POLL_INTERVAL_SEC:-10} HUOBAO_MAX_WAIT_SEC: ${HUOBAO_MAX_WAIT_SEC:-900} ports: diff --git a/docs/AUDIT_2026-03-18.md b/docs/AUDIT_2026-03-18.md index 53c45b7..e4c0376 100644 --- a/docs/AUDIT_2026-03-18.md +++ b/docs/AUDIT_2026-03-18.md @@ -67,12 +67,29 @@ - 已存在,不需要重写 - 现阶段通过 `yt-dlp` 命令集成 +- 账号级内容源同步同样复用 `yt-dlp --flat-playlist`,不额外维护抓取器 ### 2. ASR -- 现有实现已部署,但入口未完全标准化 -- 当前后端支持 `ffmpeg + whisper.cpp` 风格接入 -- 若需要接现有常驻 ASR 服务,后续只需把 `transcribe_media()` 改成 HTTP/RPC 适配即可 +- 现有实现已部署,入口现已标准化为“两级优先级”: + - 优先调用 HTTP ASR 服务 + - HTTP 不可用或返回空结果时,回退到 `whisper.cpp` 命令行 +- 本次已按 `mac-whisper-service` 的 `/transcribe` 协议完成接入,并用任务 `job_e95f9b5579fd4c5aa40f04de611e9fd0` 验证 `artifacts.asr_backend=http` +- 进一步联调发现真实长视频转写耗时约 44 秒,因此 `collector` 侧 `ASR_HTTP_TIMEOUT_SEC` 默认值已提升到 120 秒;本机 `mac-whisper-service` 运行时也需要把 `WHISPER_TIMEOUT_MS` 提升到 `120000` +- 修复后再次验证成功,任务 `job_bb405e2e878849e38c4bb31f7781e1e3` 已写入真实 HTTP ASR 文本并记录 `artifacts.asr_http_payload` +- `collector` 运行镜像已补上 `ffmpeg` 和 `yt-dlp`,避免容器内缺依赖导致音频抽取或下载失效 + +### 2.1 内容源账号同步 + +- 已新增 `content_source_sync_pipeline` +- 用户可通过 `POST /v2/pipelines/content-source-sync` 提交创作者账号 URL +- 后端会创建父任务,使用 `yt-dlp --flat-playlist` 抓取最近 N 条视频 URL,再自动派生用户自己的 `video_link` 子分析任务 +- `jobs.parent_job_id` 已加入数据模型,父子任务关系可持久化查询 +- 已用 bilibili 账号 URL 联调验证: + - 父任务:`job_b02109cf9e8244fbb5b86f184a7c7574` + - 子任务:`job_7f169db61af441f8a7f186d03db2d91c`、`job_28c47774028441378a3974860c375ab7` + +结论:账号级调度不再是空白能力,但目前只验证了 `bilibili` URL 形态,抖音 / 小红书仍需真链路核实。 ### 3. Windows `cutvideo` @@ -132,5 +149,6 @@ ## 当前主要风险 1. `cutvideo` 的素材传输还未完整闭环 -2. 本地 ASR 的“最终生产入口”仍需按你现有部署方式再绑一次 -3. `huobao-drama` 已在本机旧改版实例上跑通,但兼容补丁尚未迁到 upstream 仓库并形成正式提交 +2. Windows 机器还未部署支持 `POST /api/uploads` 的 `cutvideo` 新版本 +3. 抖音 / 小红书账号级内容源还未做真实平台验证 +4. `huobao-drama` 已在本机旧改版实例上跑通,但兼容补丁尚未迁到 upstream 仓库并形成正式提交 diff --git a/docs/LAN_E2E_GUIDE_2026-03-18.md b/docs/LAN_E2E_GUIDE_2026-03-18.md index b9826db..4bd962a 100644 --- a/docs/LAN_E2E_GUIDE_2026-03-18.md +++ b/docs/LAN_E2E_GUIDE_2026-03-18.md @@ -19,11 +19,17 @@ cp .env.example .env - `CUTVIDEO_API_KEY=` 如果 Windows 服务启用了鉴权 - `HUOBAO_BASE_URL=http://127.0.0.1:5678` - `WHISPER_BIN=` 指向你现有本地 ASR 可执行文件时填写 +- `ASR_HTTP_BASE_URL=` 如果你已有常驻 ASR 服务,填写它的基地址 +- `ASR_HTTP_TRANSCRIBE_PATH=/transcribe` +- `ASR_HTTP_FIELD_NAME=wav` +- `ASR_HTTP_TIMEOUT_SEC=120` 说明: - 如果你单独重建 `collector`,要确保运行时仍带上 `CUTVIDEO_BASE_URL`,否则容器会退回空值 - 当前已验证可用的 Windows `cutvideo` 地址是 `http://192.168.31.18:7860` +- 当前已验证可用的本机 HTTP ASR 入口是 `http://host.docker.internal:8088/transcribe` +- 如果你用的是本机 `mac-whisper-service`,建议同时以 `WHISPER_TIMEOUT_MS=120000` 启动,否则长视频会直接 504 ## 2. 启动基础服务 @@ -46,6 +52,7 @@ docker compose up -d --build - `storyforge-analysis.json` - `storyforge-real-cut.json` - `storyforge-ai-video.json` +- `storyforge-content-source-sync.json` 导入后: @@ -88,13 +95,47 @@ docker compose up -d --build - `ffmpeg` 可用 - ASR 可调用 +已验证样例: + +- `job_bb405e2e878849e38c4bb31f7781e1e3` (`artifacts.asr_backend=http`) + ### 上传视频 调用 `POST /v2/explore/upload-video` 预期与视频链接类似,但素材来源为本地上传 -## 6. `cutvideo` 实拍剪辑链路验证 +## 6. 内容源账号同步验证 + +调用 `POST /v2/pipelines/content-source-sync` + +推荐最小请求体: + +```json +{ + "source_url": "https://space.bilibili.com/546195/video", + "platform": "bilibili", + "title": "Bilibili Creator Sync Smoke", + "max_items": 2, + "skip_existing": true, + "auto_trigger_analysis": true +} +``` + +预期: + +- 创建一个 `content_source_sync` 父任务 +- `n8n` 触发 `content_source_sync_pipeline` +- 父任务写回 `discovered_videos / child_job_ids / queued_job_ids` +- 子任务以 `parent_job_id` 挂到父任务下,并自动进入分析主线 + +已验证样例: + +- 父任务:`job_b02109cf9e8244fbb5b86f184a7c7574` +- 子任务:`job_7f169db61af441f8a7f186d03db2d91c` +- 子任务:`job_28c47774028441378a3974860c375ab7` + +## 7. `cutvideo` 实拍剪辑链路验证 调用 `POST /v2/pipelines/real-cut` @@ -119,7 +160,7 @@ docker compose up -d --build - Windows 返回 `task_id=8d8f4a0cd5d9` - 运行目录 `20260318-093520-Windows cutvideo 联调样例` -## 7. `huobao-drama` AI 视频链路验证 +## 8. `huobao-drama` AI 视频链路验证 调用 `POST /v2/pipelines/ai-video` @@ -142,8 +183,8 @@ docker compose up -d --build - `video.task_id=qvideo-1380265978-1773799215825814468` - 最终视频已回写到 `job.result.rendered_scenes[0].video.video_url` -## 8. 当前已知卡点 +## 9. 当前已知卡点 -- `cutvideo` 端到端“上传素材后自动送到 Windows 机器”还未彻底闭环 -- ASR 如果不是命令行模式,而是你现有常驻服务模式,需要再做一次入口绑定 +- Windows 机器上的 `cutvideo` 还需要部署带 `POST /api/uploads` 的新分支版本 +- 抖音 / 小红书账号级内容源还未做真实平台验证 - `huobao-drama` 目前跑通依赖本地旧改版中的 qnaigc 兼容补丁,下一步要迁到 upstream 仓库 diff --git a/docs/MVP_STATUS_2026-03-18.md b/docs/MVP_STATUS_2026-03-18.md index e0d6fde..77bb7ac 100644 --- a/docs/MVP_STATUS_2026-03-18.md +++ b/docs/MVP_STATUS_2026-03-18.md @@ -8,9 +8,11 @@ - 审批机制 - `user -> project -> assistant / knowledge base / job / content source` 数据模型 - 文本 / 视频链接 / 上传视频 三类分析任务创建 +- 内容源账号同步任务创建与子任务派发 - `n8n` 工作流导入、激活与触发接口 - 本地下载器调用 - 本地 `ffmpeg` / `whisper` 风格入口封装 +- HTTP ASR 常驻服务入口绑定 - 本地大模型内容分析、二创文案、分镜生成 - Windows `cutvideo` API 调度与结果回写接口 - `upload_video -> source_job_id -> cutvideo` 自动 staging 闭环 @@ -20,22 +22,21 @@ ## 已验证的真实任务 - 分析链路:`job_203bc8e9b20f4b1cbbc6cf7da79e46f4` +- HTTP ASR 分析链路:`job_e95f9b5579fd4c5aa40f04de611e9fd0` +- 账号级内容源同步链路:`job_b02109cf9e8244fbb5b86f184a7c7574` +- 账号级同步派生分析任务:`job_7f169db61af441f8a7f186d03db2d91c`、`job_28c47774028441378a3974860c375ab7` +- 长视频 HTTP ASR 超时修复后链路:`job_bb405e2e878849e38c4bb31f7781e1e3` - 实拍剪辑链路:`job_5ebd829c3f2144bca5c941183e75bdcd` - 实拍剪辑自动 staging 联调:`job_01a6f283cbda42e4ae692b268b811a50` - AI 视频链路:`job_01828c40377747cf914b51be360cc333` -## 已实现但仍待环境验证 - -- 现有 ASR 部署入口与 `collector-service` 的最终绑定 - ## 尚未完全跑通 -- 对“抖音 / bilibili / 小红书账号级内容源”的批量抓取与分析调度 +- 抖音 / 小红书账号级内容源还未做真实平台验证;`bilibili` 账号级 URL 已跑通 - Windows 机器上的 `cutvideo` 需要同步部署带 `POST /api/uploads` 的版本,当前自动 staging 已在本机联调通过 ## 下一步优先级 1. 把 `cutvideo` 新上传接口部署到 Windows 机器 -2. 绑定你的真实 ASR 入口 -3. 补账号级内容源抓取调度 -4. 把改动整理成提交并推送 +2. 补抖音 / 小红书账号级真实验证与必要的 URL 归一化 +3. 把改动整理成提交并推送 diff --git a/n8n/README.md b/n8n/README.md index ed78a36..6fefeff 100644 --- a/n8n/README.md +++ b/n8n/README.md @@ -7,6 +7,7 @@ - `workflows/storyforge-analysis.json`:内容分析主线 - `workflows/storyforge-real-cut.json`:Windows `cutvideo` 调度主线 - `workflows/storyforge-ai-video.json`:`huobao-drama` AI 生成视频主线 +- `workflows/storyforge-content-source-sync.json`:内容源账号同步与批量分析派发主线 ## 约定 @@ -18,7 +19,7 @@ 1. 先执行 `docker compose up -d n8n collector` 2. 打开 `http://127.0.0.1:5670` -3. 从 UI 导入本目录下的 3 个 JSON +3. 从 UI 导入本目录下的 4 个 JSON 4. 激活工作流 ## Webhook 路径 @@ -26,3 +27,4 @@ - `/webhook/storyforge-analysis` - `/webhook/storyforge-real-cut` - `/webhook/storyforge-ai-video` +- `/webhook/storyforge-content-source-sync` diff --git a/n8n/workflows/storyforge-content-source-sync.json b/n8n/workflows/storyforge-content-source-sync.json new file mode 100644 index 0000000..b7f8a1c --- /dev/null +++ b/n8n/workflows/storyforge-content-source-sync.json @@ -0,0 +1,70 @@ +{ + "name": "StoryForge Content Source Sync", + "nodes": [ + { + "parameters": { + "httpMethod": "POST", + "path": "storyforge-content-source-sync", + "responseMode": "onReceived", + "options": {} + }, + "id": "content-source-sync-webhook", + "name": "Content Source Sync Webhook", + "type": "n8n-nodes-base.webhook", + "typeVersion": 2, + "position": [ + 220, + 300 + ], + "webhookId": "storyforge-content-source-sync" + }, + { + "parameters": { + "method": "POST", + "url": "={{'http://collector:8081/internal/jobs/steps/content-source-sync?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "sendHeaders": true, + "headerParameters": { + "parameters": [ + { + "name": "X-Orchestrator-Secret", + "value": "storyforge-local-secret" + } + ] + }, + "options": { + "timeout": 600000 + } + }, + "id": "content-source-sync-runner", + "name": "Run Content Source Sync Step", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [ + 540, + 300 + ] + } + ], + "connections": { + "Content Source Sync Webhook": { + "main": [ + [ + { + "node": "Run Content Source Sync Step", + "type": "main", + "index": 0 + } + ] + ] + }, + "Run Content Source Sync Step": { + "main": [ + [] + ] + } + }, + "active": false, + "settings": {}, + "pinData": {}, + "versionId": "storyforge-content-source-sync-v1" +}