feat: sync live orchestrator and n8n routing

This commit is contained in:
kris
2026-03-23 06:54:01 +08:00
parent ea6a855890
commit 4ab0b26821
7 changed files with 213 additions and 20 deletions

View File

@@ -133,6 +133,15 @@ class CutVideoClient:
response.raise_for_status()
return _unwrap_response(response.json())
async def list_runs(self) -> dict[str, Any]:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
_join_url(self.base_url, "/api/runs"),
headers=self._headers(),
)
response.raise_for_status()
return _unwrap_response(response.json())
class AsrHttpClient:
def __init__(

View File

@@ -837,10 +837,49 @@ def job_context_payload(row: dict[str, Any]) -> dict[str, Any]:
return payload
async def run_local_orchestrated_job(job_id: str, workflow_key: str) -> None:
try:
if workflow_key == "analysis_pipeline":
await internal_run_analysis(None, job_id, True)
return
if workflow_key == "content_source_sync_pipeline":
await internal_content_source_sync(None, job_id, True)
return
if workflow_key == "real_cut_pipeline":
await internal_real_cut_run(None, job_id, True)
return
if workflow_key == "ai_video_pipeline":
await internal_ai_video_render(None, job_id, True)
return
raise HTTPException(status_code=400, detail=f"Unsupported local workflow fallback: {workflow_key}")
except HTTPException as exc:
row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
if row:
update_job_state(
job_id,
status="failed",
provider_name="collector-local",
provider_task_id="",
error=str(exc.detail),
result=merge_json_field(row.get("result_json") or "{}", {"local_orchestrator": {"error": str(exc.detail)}}),
)
append_job_event(job_id, "workflow.local.failed", {"workflow_key": workflow_key, "error": str(exc.detail)})
except Exception as exc:
row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
if row:
update_job_state(
job_id,
status="failed",
provider_name="collector-local",
provider_task_id="",
error=f"Local orchestrator failed: {exc}",
result=merge_json_field(row.get("result_json") or "{}", {"local_orchestrator": {"error": str(exc)}}),
)
append_job_event(job_id, "workflow.local.failed", {"workflow_key": workflow_key, "error": str(exc)})
async def trigger_orchestrated_job(job_row: dict[str, Any]) -> dict[str, Any]:
workflow_key = job_row.get("workflow_key") or "analysis_pipeline"
if not n8n_client.enabled:
raise HTTPException(status_code=503, detail="n8n is not configured")
append_job_event(job_row["id"], "workflow.trigger.requested", {"workflow_key": workflow_key})
update_job_state(
job_row["id"],
@@ -849,17 +888,59 @@ async def trigger_orchestrated_job(job_row: dict[str, Any]) -> dict[str, Any]:
provider_task_id="",
result={"n8n_trigger": {"requested": True}},
)
trigger_result = await n8n_client.trigger(
workflow_key,
{
"jobId": job_row["id"],
"job_id": job_row["id"],
"workflowKey": workflow_key,
"workflow_key": workflow_key,
"lineType": job_row.get("line_type", "analysis"),
"line_type": job_row.get("line_type", "analysis"),
},
)
payload = {
"jobId": job_row["id"],
"job_id": job_row["id"],
"workflowKey": workflow_key,
"workflow_key": workflow_key,
"lineType": job_row.get("line_type", "analysis"),
"line_type": job_row.get("line_type", "analysis"),
}
if not n8n_client.enabled:
append_job_event(job_row["id"], "workflow.trigger.fallback", {"workflow_key": workflow_key, "reason": "n8n is not configured"})
asyncio.create_task(run_local_orchestrated_job(job_row["id"], workflow_key))
db.execute(
"""
UPDATE jobs
SET provider_name = ?, provider_task_id = ?, result_json = ?, updated_at = ?
WHERE id = ?
""",
(
"collector-local",
"",
merge_json_field(
db.fetch_one("SELECT result_json FROM jobs WHERE id = ?", (job_row["id"],)).get("result_json") or "{}",
{"n8n_trigger": {"requested": True, "fallback": "local", "reason": "n8n is not configured"}},
),
utc_now(),
job_row["id"],
),
)
return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],))
try:
trigger_result = await n8n_client.trigger(workflow_key, payload)
except Exception as exc:
append_job_event(job_row["id"], "workflow.trigger.fallback", {"workflow_key": workflow_key, "reason": str(exc)})
asyncio.create_task(run_local_orchestrated_job(job_row["id"], workflow_key))
db.execute(
"""
UPDATE jobs
SET provider_name = ?, provider_task_id = ?, result_json = ?, updated_at = ?
WHERE id = ?
""",
(
"collector-local",
"",
merge_json_field(
db.fetch_one("SELECT result_json FROM jobs WHERE id = ?", (job_row["id"],)).get("result_json") or "{}",
{"n8n_trigger": {"requested": True, "fallback": "local", "reason": str(exc)}},
),
utc_now(),
job_row["id"],
),
)
return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],))
provider_task_id = str(trigger_result.get("executionId") or "")
db.execute(
"""
@@ -1117,6 +1198,68 @@ async def stage_real_cut_source_to_cutvideo(source_job: dict[str, Any]) -> dict[
}
def cutvideo_run_has_materialized_outputs(run_payload: dict[str, Any]) -> bool:
for key in (
"manifest",
"assets",
"segments",
"top_segments",
"tool_report",
"llm_review_summary",
"exports",
"timeline",
"summary_markdown",
"clips",
"downloads",
"transcripts",
"files",
):
value = run_payload.get(key)
if value not in (None, "", [], {}, 0):
return True
return bool(str(run_payload.get("generated_at") or "").strip())
async def find_cutvideo_run_for_job(row: dict[str, Any]) -> dict[str, Any] | None:
result_payload = parse_job_result(row)
submit_payload = result_payload.get("cutvideo_submit") or {}
if not isinstance(submit_payload, dict):
submit_payload = {}
request_payload = submit_payload.get("request") or {}
if not isinstance(request_payload, dict):
request_payload = {}
expected_name = str(request_payload.get("name") or row.get("title") or "").strip()
if not expected_name:
return None
runs_payload = await cutvideo_client.list_runs()
items = runs_payload.get("items")
if not isinstance(items, list):
return None
normalized_expected = expected_name.casefold()
for item in items:
if not isinstance(item, dict):
continue
run_id = str(item.get("run_id") or item.get("id") or "").strip()
job_name = str(item.get("job_name") or item.get("name") or "").strip()
normalized_job_name = job_name.casefold()
normalized_run_id = run_id.casefold()
if (
normalized_job_name == normalized_expected
or normalized_run_id == normalized_expected
or normalized_job_name.endswith(normalized_expected)
or normalized_run_id.endswith(normalized_expected)
):
detail = await cutvideo_client.get_run(run_id or job_name)
return {
"run_id": run_id,
"summary": item,
"detail": detail,
}
return None
def create_job_record(
*,
account_id: str,
@@ -2805,9 +2948,49 @@ async def internal_real_cut_run(
result={"cutvideo_submit": submit_result},
)
deadline = now_ts() + HUOBAO_MAX_WAIT_SEC
deadline = now_ts() + CUTVIDEO_MAX_WAIT_SEC
while True:
task_payload = await cutvideo_client.get_task(row["provider_task_id"])
run_fallback: dict[str, Any] | None = None
try:
task_payload = await cutvideo_client.get_task(row["provider_task_id"])
except httpx.HTTPStatusError as exc:
if exc.response is None or exc.response.status_code != 404:
raise
run_fallback = await find_cutvideo_run_for_job(row)
if run_fallback and cutvideo_run_has_materialized_outputs(run_fallback["detail"]):
updated = update_job_state(
row["id"],
status="completed",
provider_name="cutvideo",
provider_task_id=row["provider_task_id"],
artifacts={
"cutvideo_task": {
"task_id": row["provider_task_id"],
"status": "missing",
"compatibility_mode": "run-fallback",
"error": "Task not found after submit; using run fallback",
},
"cutvideo_run_lookup": run_fallback["summary"],
"cutvideo_run": run_fallback["detail"],
},
result={
**parse_job_result(row),
"cutvideo_task": {
"task_id": row["provider_task_id"],
"status": "missing",
"compatibility_mode": "run-fallback",
"error": "Task not found after submit; using run fallback",
},
"cutvideo_run_lookup": run_fallback["summary"],
"cutvideo_run": run_fallback["detail"],
},
)
return job_context_payload(updated)
task_payload = {
"task_id": row["provider_task_id"],
"status": "missing",
"error": "Task not found",
}
status = str(task_payload.get("status") or "").lower()
if status == "completed":
run_payload: dict[str, Any] = {}

View File

@@ -11,7 +11,8 @@
## 约定
- 工作流内部默认通过 `http://collector:8081` 调用 `collector-service`
- 当前这套 live 链路默认通过 `http://host.docker.internal:8081` 调用宿主机 collector-service
- 如果整套流程完全运行在 Docker 内部,再把工作流里的内部调用地址切回 `http://collector:8081`
- 内部调用头部使用 `X-Orchestrator-Secret: storyforge-local-secret`
- 如果你修改了 `.env` 里的 `ORCHESTRATOR_SHARED_SECRET`,导入工作流后需要同步更新对应 HTTP Request 节点

View File

@@ -21,7 +21,7 @@
{
"parameters": {
"method": "POST",
"url": "={{'http://collector:8081/internal/jobs/steps/ai-video/render?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"url": "={{'http://host.docker.internal:8081/internal/jobs/steps/ai-video/render?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"sendHeaders": true,
"headerParameters": {
"parameters": [

View File

@@ -21,7 +21,7 @@
{
"parameters": {
"method": "POST",
"url": "={{'http://collector:8081/internal/jobs/steps/analyze?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"url": "={{'http://host.docker.internal:8081/internal/jobs/steps/analyze?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"sendHeaders": true,
"headerParameters": {
"parameters": [

View File

@@ -21,7 +21,7 @@
{
"parameters": {
"method": "POST",
"url": "={{'http://collector:8081/internal/jobs/steps/content-source-sync?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"url": "={{'http://host.docker.internal:8081/internal/jobs/steps/content-source-sync?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"sendHeaders": true,
"headerParameters": {
"parameters": [

View File

@@ -21,7 +21,7 @@
{
"parameters": {
"method": "POST",
"url": "={{'http://collector:8081/internal/jobs/steps/real-cut/run?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"url": "={{'http://host.docker.internal:8081/internal/jobs/steps/real-cut/run?job_id=' + ($json.body.job_id || $json.body.jobId)}}",
"sendHeaders": true,
"headerParameters": {
"parameters": [