From 4ab0b2682101d4271f3350f8f58a21003dbbaf95 Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 06:54:01 +0800 Subject: [PATCH] feat: sync live orchestrator and n8n routing --- collector-service/app/integrations.py | 9 + collector-service/app/main.py | 213 ++++++++++++++++-- n8n/README.md | 3 +- n8n/workflows/storyforge-ai-video.json | 2 +- n8n/workflows/storyforge-analysis.json | 2 +- .../storyforge-content-source-sync.json | 2 +- n8n/workflows/storyforge-real-cut.json | 2 +- 7 files changed, 213 insertions(+), 20 deletions(-) diff --git a/collector-service/app/integrations.py b/collector-service/app/integrations.py index 1878bb6..7cde031 100644 --- a/collector-service/app/integrations.py +++ b/collector-service/app/integrations.py @@ -133,6 +133,15 @@ class CutVideoClient: response.raise_for_status() return _unwrap_response(response.json()) + async def list_runs(self) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + _join_url(self.base_url, "/api/runs"), + headers=self._headers(), + ) + response.raise_for_status() + return _unwrap_response(response.json()) + class AsrHttpClient: def __init__( diff --git a/collector-service/app/main.py b/collector-service/app/main.py index 6664b32..f1272cf 100644 --- a/collector-service/app/main.py +++ b/collector-service/app/main.py @@ -837,10 +837,49 @@ def job_context_payload(row: dict[str, Any]) -> dict[str, Any]: return payload +async def run_local_orchestrated_job(job_id: str, workflow_key: str) -> None: + try: + if workflow_key == "analysis_pipeline": + await internal_run_analysis(None, job_id, True) + return + if workflow_key == "content_source_sync_pipeline": + await internal_content_source_sync(None, job_id, True) + return + if workflow_key == "real_cut_pipeline": + await internal_real_cut_run(None, job_id, True) + return + if workflow_key == "ai_video_pipeline": + await internal_ai_video_render(None, job_id, True) + return + raise HTTPException(status_code=400, detail=f"Unsupported local workflow fallback: {workflow_key}") + except HTTPException as exc: + row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) + if row: + update_job_state( + job_id, + status="failed", + provider_name="collector-local", + provider_task_id="", + error=str(exc.detail), + result=merge_json_field(row.get("result_json") or "{}", {"local_orchestrator": {"error": str(exc.detail)}}), + ) + append_job_event(job_id, "workflow.local.failed", {"workflow_key": workflow_key, "error": str(exc.detail)}) + except Exception as exc: + row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) + if row: + update_job_state( + job_id, + status="failed", + provider_name="collector-local", + provider_task_id="", + error=f"Local orchestrator failed: {exc}", + result=merge_json_field(row.get("result_json") or "{}", {"local_orchestrator": {"error": str(exc)}}), + ) + append_job_event(job_id, "workflow.local.failed", {"workflow_key": workflow_key, "error": str(exc)}) + + async def trigger_orchestrated_job(job_row: dict[str, Any]) -> dict[str, Any]: workflow_key = job_row.get("workflow_key") or "analysis_pipeline" - if not n8n_client.enabled: - raise HTTPException(status_code=503, detail="n8n is not configured") append_job_event(job_row["id"], "workflow.trigger.requested", {"workflow_key": workflow_key}) update_job_state( job_row["id"], @@ -849,17 +888,59 @@ async def trigger_orchestrated_job(job_row: dict[str, Any]) -> dict[str, Any]: provider_task_id="", result={"n8n_trigger": {"requested": True}}, ) - trigger_result = await n8n_client.trigger( - workflow_key, - { - "jobId": job_row["id"], - "job_id": job_row["id"], - "workflowKey": workflow_key, - "workflow_key": workflow_key, - "lineType": job_row.get("line_type", "analysis"), - "line_type": job_row.get("line_type", "analysis"), - }, - ) + payload = { + "jobId": job_row["id"], + "job_id": job_row["id"], + "workflowKey": workflow_key, + "workflow_key": workflow_key, + "lineType": job_row.get("line_type", "analysis"), + "line_type": job_row.get("line_type", "analysis"), + } + if not n8n_client.enabled: + append_job_event(job_row["id"], "workflow.trigger.fallback", {"workflow_key": workflow_key, "reason": "n8n is not configured"}) + asyncio.create_task(run_local_orchestrated_job(job_row["id"], workflow_key)) + db.execute( + """ + UPDATE jobs + SET provider_name = ?, provider_task_id = ?, result_json = ?, updated_at = ? + WHERE id = ? + """, + ( + "collector-local", + "", + merge_json_field( + db.fetch_one("SELECT result_json FROM jobs WHERE id = ?", (job_row["id"],)).get("result_json") or "{}", + {"n8n_trigger": {"requested": True, "fallback": "local", "reason": "n8n is not configured"}}, + ), + utc_now(), + job_row["id"], + ), + ) + return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],)) + + try: + trigger_result = await n8n_client.trigger(workflow_key, payload) + except Exception as exc: + append_job_event(job_row["id"], "workflow.trigger.fallback", {"workflow_key": workflow_key, "reason": str(exc)}) + asyncio.create_task(run_local_orchestrated_job(job_row["id"], workflow_key)) + db.execute( + """ + UPDATE jobs + SET provider_name = ?, provider_task_id = ?, result_json = ?, updated_at = ? + WHERE id = ? + """, + ( + "collector-local", + "", + merge_json_field( + db.fetch_one("SELECT result_json FROM jobs WHERE id = ?", (job_row["id"],)).get("result_json") or "{}", + {"n8n_trigger": {"requested": True, "fallback": "local", "reason": str(exc)}}, + ), + utc_now(), + job_row["id"], + ), + ) + return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],)) provider_task_id = str(trigger_result.get("executionId") or "") db.execute( """ @@ -1117,6 +1198,68 @@ async def stage_real_cut_source_to_cutvideo(source_job: dict[str, Any]) -> dict[ } +def cutvideo_run_has_materialized_outputs(run_payload: dict[str, Any]) -> bool: + for key in ( + "manifest", + "assets", + "segments", + "top_segments", + "tool_report", + "llm_review_summary", + "exports", + "timeline", + "summary_markdown", + "clips", + "downloads", + "transcripts", + "files", + ): + value = run_payload.get(key) + if value not in (None, "", [], {}, 0): + return True + return bool(str(run_payload.get("generated_at") or "").strip()) + + +async def find_cutvideo_run_for_job(row: dict[str, Any]) -> dict[str, Any] | None: + result_payload = parse_job_result(row) + submit_payload = result_payload.get("cutvideo_submit") or {} + if not isinstance(submit_payload, dict): + submit_payload = {} + request_payload = submit_payload.get("request") or {} + if not isinstance(request_payload, dict): + request_payload = {} + expected_name = str(request_payload.get("name") or row.get("title") or "").strip() + if not expected_name: + return None + + runs_payload = await cutvideo_client.list_runs() + items = runs_payload.get("items") + if not isinstance(items, list): + return None + + normalized_expected = expected_name.casefold() + for item in items: + if not isinstance(item, dict): + continue + run_id = str(item.get("run_id") or item.get("id") or "").strip() + job_name = str(item.get("job_name") or item.get("name") or "").strip() + normalized_job_name = job_name.casefold() + normalized_run_id = run_id.casefold() + if ( + normalized_job_name == normalized_expected + or normalized_run_id == normalized_expected + or normalized_job_name.endswith(normalized_expected) + or normalized_run_id.endswith(normalized_expected) + ): + detail = await cutvideo_client.get_run(run_id or job_name) + return { + "run_id": run_id, + "summary": item, + "detail": detail, + } + return None + + def create_job_record( *, account_id: str, @@ -2805,9 +2948,49 @@ async def internal_real_cut_run( result={"cutvideo_submit": submit_result}, ) - deadline = now_ts() + HUOBAO_MAX_WAIT_SEC + deadline = now_ts() + CUTVIDEO_MAX_WAIT_SEC while True: - task_payload = await cutvideo_client.get_task(row["provider_task_id"]) + run_fallback: dict[str, Any] | None = None + try: + task_payload = await cutvideo_client.get_task(row["provider_task_id"]) + except httpx.HTTPStatusError as exc: + if exc.response is None or exc.response.status_code != 404: + raise + run_fallback = await find_cutvideo_run_for_job(row) + if run_fallback and cutvideo_run_has_materialized_outputs(run_fallback["detail"]): + updated = update_job_state( + row["id"], + status="completed", + provider_name="cutvideo", + provider_task_id=row["provider_task_id"], + artifacts={ + "cutvideo_task": { + "task_id": row["provider_task_id"], + "status": "missing", + "compatibility_mode": "run-fallback", + "error": "Task not found after submit; using run fallback", + }, + "cutvideo_run_lookup": run_fallback["summary"], + "cutvideo_run": run_fallback["detail"], + }, + result={ + **parse_job_result(row), + "cutvideo_task": { + "task_id": row["provider_task_id"], + "status": "missing", + "compatibility_mode": "run-fallback", + "error": "Task not found after submit; using run fallback", + }, + "cutvideo_run_lookup": run_fallback["summary"], + "cutvideo_run": run_fallback["detail"], + }, + ) + return job_context_payload(updated) + task_payload = { + "task_id": row["provider_task_id"], + "status": "missing", + "error": "Task not found", + } status = str(task_payload.get("status") or "").lower() if status == "completed": run_payload: dict[str, Any] = {} diff --git a/n8n/README.md b/n8n/README.md index 6fefeff..67a72e0 100644 --- a/n8n/README.md +++ b/n8n/README.md @@ -11,7 +11,8 @@ ## 约定 -- 工作流内部默认通过 `http://collector:8081` 调用 `collector-service` +- 当前这套 live 链路默认通过 `http://host.docker.internal:8081` 调用宿主机 collector-service +- 如果整套流程完全运行在 Docker 内部,再把工作流里的内部调用地址切回 `http://collector:8081` - 内部调用头部使用 `X-Orchestrator-Secret: storyforge-local-secret` - 如果你修改了 `.env` 里的 `ORCHESTRATOR_SHARED_SECRET`,导入工作流后需要同步更新对应 HTTP Request 节点 diff --git a/n8n/workflows/storyforge-ai-video.json b/n8n/workflows/storyforge-ai-video.json index 6e35cd3..606b953 100644 --- a/n8n/workflows/storyforge-ai-video.json +++ b/n8n/workflows/storyforge-ai-video.json @@ -21,7 +21,7 @@ { "parameters": { "method": "POST", - "url": "={{'http://collector:8081/internal/jobs/steps/ai-video/render?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "url": "={{'http://host.docker.internal:8081/internal/jobs/steps/ai-video/render?job_id=' + ($json.body.job_id || $json.body.jobId)}}", "sendHeaders": true, "headerParameters": { "parameters": [ diff --git a/n8n/workflows/storyforge-analysis.json b/n8n/workflows/storyforge-analysis.json index 77b910b..75ce3ae 100644 --- a/n8n/workflows/storyforge-analysis.json +++ b/n8n/workflows/storyforge-analysis.json @@ -21,7 +21,7 @@ { "parameters": { "method": "POST", - "url": "={{'http://collector:8081/internal/jobs/steps/analyze?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "url": "={{'http://host.docker.internal:8081/internal/jobs/steps/analyze?job_id=' + ($json.body.job_id || $json.body.jobId)}}", "sendHeaders": true, "headerParameters": { "parameters": [ diff --git a/n8n/workflows/storyforge-content-source-sync.json b/n8n/workflows/storyforge-content-source-sync.json index b7f8a1c..fc14f9d 100644 --- a/n8n/workflows/storyforge-content-source-sync.json +++ b/n8n/workflows/storyforge-content-source-sync.json @@ -21,7 +21,7 @@ { "parameters": { "method": "POST", - "url": "={{'http://collector:8081/internal/jobs/steps/content-source-sync?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "url": "={{'http://host.docker.internal:8081/internal/jobs/steps/content-source-sync?job_id=' + ($json.body.job_id || $json.body.jobId)}}", "sendHeaders": true, "headerParameters": { "parameters": [ diff --git a/n8n/workflows/storyforge-real-cut.json b/n8n/workflows/storyforge-real-cut.json index 5fd2892..3b45dda 100644 --- a/n8n/workflows/storyforge-real-cut.json +++ b/n8n/workflows/storyforge-real-cut.json @@ -21,7 +21,7 @@ { "parameters": { "method": "POST", - "url": "={{'http://collector:8081/internal/jobs/steps/real-cut/run?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "url": "={{'http://host.docker.internal:8081/internal/jobs/steps/real-cut/run?job_id=' + ($json.body.job_id || $json.body.jobId)}}", "sendHeaders": true, "headerParameters": { "parameters": [