feat: harden live real-cut orchestration
This commit is contained in:
@@ -770,10 +770,49 @@ def job_context_payload(row: dict[str, Any]) -> dict[str, Any]:
|
||||
return payload
|
||||
|
||||
|
||||
async def run_local_orchestrated_job(job_id: str, workflow_key: str) -> None:
|
||||
try:
|
||||
if workflow_key == "analysis_pipeline":
|
||||
await internal_run_analysis(None, job_id, True)
|
||||
return
|
||||
if workflow_key == "content_source_sync_pipeline":
|
||||
await internal_content_source_sync(None, job_id, True)
|
||||
return
|
||||
if workflow_key == "real_cut_pipeline":
|
||||
await internal_real_cut_run(None, job_id, True)
|
||||
return
|
||||
if workflow_key == "ai_video_pipeline":
|
||||
await internal_ai_video_render(None, job_id, True)
|
||||
return
|
||||
raise HTTPException(status_code=400, detail=f"Unsupported local workflow fallback: {workflow_key}")
|
||||
except HTTPException as exc:
|
||||
row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
|
||||
if row:
|
||||
update_job_state(
|
||||
job_id,
|
||||
status="failed",
|
||||
provider_name="collector-local",
|
||||
provider_task_id="",
|
||||
error=str(exc.detail),
|
||||
result=merge_json_field(row.get("result_json") or "{}", {"local_orchestrator": {"error": str(exc.detail)}}),
|
||||
)
|
||||
append_job_event(job_id, "workflow.local.failed", {"workflow_key": workflow_key, "error": str(exc.detail)})
|
||||
except Exception as exc:
|
||||
row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
|
||||
if row:
|
||||
update_job_state(
|
||||
job_id,
|
||||
status="failed",
|
||||
provider_name="collector-local",
|
||||
provider_task_id="",
|
||||
error=f"Local orchestrator failed: {exc}",
|
||||
result=merge_json_field(row.get("result_json") or "{}", {"local_orchestrator": {"error": str(exc)}}),
|
||||
)
|
||||
append_job_event(job_id, "workflow.local.failed", {"workflow_key": workflow_key, "error": str(exc)})
|
||||
|
||||
|
||||
async def trigger_orchestrated_job(job_row: dict[str, Any]) -> dict[str, Any]:
|
||||
workflow_key = job_row.get("workflow_key") or "analysis_pipeline"
|
||||
if not n8n_client.enabled:
|
||||
raise HTTPException(status_code=503, detail="n8n is not configured")
|
||||
append_job_event(job_row["id"], "workflow.trigger.requested", {"workflow_key": workflow_key})
|
||||
update_job_state(
|
||||
job_row["id"],
|
||||
@@ -782,17 +821,59 @@ async def trigger_orchestrated_job(job_row: dict[str, Any]) -> dict[str, Any]:
|
||||
provider_task_id="",
|
||||
result={"n8n_trigger": {"requested": True}},
|
||||
)
|
||||
trigger_result = await n8n_client.trigger(
|
||||
workflow_key,
|
||||
{
|
||||
"jobId": job_row["id"],
|
||||
"job_id": job_row["id"],
|
||||
"workflowKey": workflow_key,
|
||||
"workflow_key": workflow_key,
|
||||
"lineType": job_row.get("line_type", "analysis"),
|
||||
"line_type": job_row.get("line_type", "analysis"),
|
||||
},
|
||||
)
|
||||
payload = {
|
||||
"jobId": job_row["id"],
|
||||
"job_id": job_row["id"],
|
||||
"workflowKey": workflow_key,
|
||||
"workflow_key": workflow_key,
|
||||
"lineType": job_row.get("line_type", "analysis"),
|
||||
"line_type": job_row.get("line_type", "analysis"),
|
||||
}
|
||||
if not n8n_client.enabled:
|
||||
append_job_event(job_row["id"], "workflow.trigger.fallback", {"workflow_key": workflow_key, "reason": "n8n is not configured"})
|
||||
asyncio.create_task(run_local_orchestrated_job(job_row["id"], workflow_key))
|
||||
db.execute(
|
||||
"""
|
||||
UPDATE jobs
|
||||
SET provider_name = ?, provider_task_id = ?, result_json = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(
|
||||
"collector-local",
|
||||
"",
|
||||
merge_json_field(
|
||||
db.fetch_one("SELECT result_json FROM jobs WHERE id = ?", (job_row["id"],)).get("result_json") or "{}",
|
||||
{"n8n_trigger": {"requested": True, "fallback": "local", "reason": "n8n is not configured"}},
|
||||
),
|
||||
utc_now(),
|
||||
job_row["id"],
|
||||
),
|
||||
)
|
||||
return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],))
|
||||
|
||||
try:
|
||||
trigger_result = await n8n_client.trigger(workflow_key, payload)
|
||||
except Exception as exc:
|
||||
append_job_event(job_row["id"], "workflow.trigger.fallback", {"workflow_key": workflow_key, "reason": str(exc)})
|
||||
asyncio.create_task(run_local_orchestrated_job(job_row["id"], workflow_key))
|
||||
db.execute(
|
||||
"""
|
||||
UPDATE jobs
|
||||
SET provider_name = ?, provider_task_id = ?, result_json = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(
|
||||
"collector-local",
|
||||
"",
|
||||
merge_json_field(
|
||||
db.fetch_one("SELECT result_json FROM jobs WHERE id = ?", (job_row["id"],)).get("result_json") or "{}",
|
||||
{"n8n_trigger": {"requested": True, "fallback": "local", "reason": str(exc)}},
|
||||
),
|
||||
utc_now(),
|
||||
job_row["id"],
|
||||
),
|
||||
)
|
||||
return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],))
|
||||
provider_task_id = str(trigger_result.get("executionId") or "")
|
||||
db.execute(
|
||||
"""
|
||||
@@ -1050,6 +1131,68 @@ async def stage_real_cut_source_to_cutvideo(source_job: dict[str, Any]) -> dict[
|
||||
}
|
||||
|
||||
|
||||
def cutvideo_run_has_materialized_outputs(run_payload: dict[str, Any]) -> bool:
|
||||
for key in (
|
||||
"manifest",
|
||||
"assets",
|
||||
"segments",
|
||||
"top_segments",
|
||||
"tool_report",
|
||||
"llm_review_summary",
|
||||
"exports",
|
||||
"timeline",
|
||||
"summary_markdown",
|
||||
"clips",
|
||||
"downloads",
|
||||
"transcripts",
|
||||
"files",
|
||||
):
|
||||
value = run_payload.get(key)
|
||||
if value not in (None, "", [], {}, 0):
|
||||
return True
|
||||
return bool(str(run_payload.get("generated_at") or "").strip())
|
||||
|
||||
|
||||
async def find_cutvideo_run_for_job(row: dict[str, Any]) -> dict[str, Any] | None:
|
||||
result_payload = parse_job_result(row)
|
||||
submit_payload = result_payload.get("cutvideo_submit") or {}
|
||||
if not isinstance(submit_payload, dict):
|
||||
submit_payload = {}
|
||||
request_payload = submit_payload.get("request") or {}
|
||||
if not isinstance(request_payload, dict):
|
||||
request_payload = {}
|
||||
expected_name = str(request_payload.get("name") or row.get("title") or "").strip()
|
||||
if not expected_name:
|
||||
return None
|
||||
|
||||
runs_payload = await cutvideo_client.list_runs()
|
||||
items = runs_payload.get("items")
|
||||
if not isinstance(items, list):
|
||||
return None
|
||||
|
||||
normalized_expected = expected_name.casefold()
|
||||
for item in items:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
run_id = str(item.get("run_id") or item.get("id") or "").strip()
|
||||
job_name = str(item.get("job_name") or item.get("name") or "").strip()
|
||||
normalized_job_name = job_name.casefold()
|
||||
normalized_run_id = run_id.casefold()
|
||||
if (
|
||||
normalized_job_name == normalized_expected
|
||||
or normalized_run_id == normalized_expected
|
||||
or normalized_job_name.endswith(normalized_expected)
|
||||
or normalized_run_id.endswith(normalized_expected)
|
||||
):
|
||||
detail = await cutvideo_client.get_run(run_id or job_name)
|
||||
return {
|
||||
"run_id": run_id,
|
||||
"summary": item,
|
||||
"detail": detail,
|
||||
}
|
||||
return None
|
||||
|
||||
|
||||
def create_job_record(
|
||||
*,
|
||||
account_id: str,
|
||||
@@ -1102,7 +1245,7 @@ def create_job_record(
|
||||
|
||||
|
||||
async def wait_for_huobao_image(image_id: str | int) -> dict[str, Any]:
|
||||
deadline = now_ts() + HUOBAO_MAX_WAIT_SEC
|
||||
deadline = now_ts() + CUTVIDEO_MAX_WAIT_SEC
|
||||
last_payload: dict[str, Any] = {}
|
||||
while True:
|
||||
last_payload = await huobao_client.get_image(str(image_id))
|
||||
@@ -2635,7 +2778,47 @@ async def internal_real_cut_run(
|
||||
|
||||
deadline = now_ts() + HUOBAO_MAX_WAIT_SEC
|
||||
while True:
|
||||
task_payload = await cutvideo_client.get_task(row["provider_task_id"])
|
||||
run_fallback: dict[str, Any] | None = None
|
||||
try:
|
||||
task_payload = await cutvideo_client.get_task(row["provider_task_id"])
|
||||
except httpx.HTTPStatusError as exc:
|
||||
if exc.response is None or exc.response.status_code != 404:
|
||||
raise
|
||||
run_fallback = await find_cutvideo_run_for_job(row)
|
||||
if run_fallback and cutvideo_run_has_materialized_outputs(run_fallback["detail"]):
|
||||
updated = update_job_state(
|
||||
row["id"],
|
||||
status="completed",
|
||||
provider_name="cutvideo",
|
||||
provider_task_id=row["provider_task_id"],
|
||||
artifacts={
|
||||
"cutvideo_task": {
|
||||
"task_id": row["provider_task_id"],
|
||||
"status": "missing",
|
||||
"compatibility_mode": "run-fallback",
|
||||
"error": "Task not found after submit; using run fallback",
|
||||
},
|
||||
"cutvideo_run_lookup": run_fallback["summary"],
|
||||
"cutvideo_run": run_fallback["detail"],
|
||||
},
|
||||
result={
|
||||
**parse_job_result(row),
|
||||
"cutvideo_task": {
|
||||
"task_id": row["provider_task_id"],
|
||||
"status": "missing",
|
||||
"compatibility_mode": "run-fallback",
|
||||
"error": "Task not found after submit; using run fallback",
|
||||
},
|
||||
"cutvideo_run_lookup": run_fallback["summary"],
|
||||
"cutvideo_run": run_fallback["detail"],
|
||||
},
|
||||
)
|
||||
return job_context_payload(updated)
|
||||
task_payload = {
|
||||
"task_id": row["provider_task_id"],
|
||||
"status": "missing",
|
||||
"error": "Task not found",
|
||||
}
|
||||
status = str(task_payload.get("status") or "").lower()
|
||||
if status == "completed":
|
||||
run_payload: dict[str, Any] = {}
|
||||
|
||||
@@ -133,6 +133,15 @@ class CutVideoClient:
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def list_runs(self) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
_join_url(self.base_url, "/api/runs"),
|
||||
headers=self._headers(),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
|
||||
class AsrHttpClient:
|
||||
def __init__(
|
||||
|
||||
Reference in New Issue
Block a user