async def upload_source_file(self, source_path: Path, *, folder_name: str = "") -> dict[str, Any]:
    """Upload one local media file to CutVideo via ``POST /api/uploads``.

    The file is sent as a multipart part named ``files``; its MIME type is
    guessed from the file name and falls back to ``application/octet-stream``.
    ``folder_name`` is sent as a form field only when it is non-empty.

    Args:
        source_path: Local file to upload.
        folder_name: Optional value for the ``folder_name`` form field.

    Returns:
        The decoded JSON response body after ``_unwrap_response``.

    Raises:
        OSError: If ``source_path`` cannot be opened for reading.
        httpx.HTTPStatusError: If CutVideo answers with a 4xx/5xx status.
        httpx.RequestError: On transport failures, including timeouts.
    """
    content_type = mimetypes.guess_type(source_path.name)[0] or "application/octet-stream"
    data = {"folder_name": folder_name} if folder_name else {}

    # A bare float would apply the (long) upload budget to every phase,
    # including connect, so an unreachable CutVideo host could hold the request
    # for the full upload timeout. Keep the long budget for read/write/pool
    # and cap connect at the client's regular timeout.
    timeout = httpx.Timeout(
        self.upload_timeout,
        connect=min(self.timeout, self.upload_timeout),
    )

    # Open the file first so a missing or unreadable path fails immediately,
    # before any HTTP client is created.
    with source_path.open("rb") as handle:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                _join_url(self.base_url, "/api/uploads"),
                data=data,
                files={"files": (source_path.name, handle, content_type)},
                headers=self._headers(),
            )
    response.raise_for_status()
    return _unwrap_response(response.json())
def validate_real_cut_source_job(source_job: dict[str, Any]) -> None:
    """Reject source jobs that cannot be used as real-cut input.

    Args:
        source_job: Job row; ``source_type`` and ``status`` are read.

    Raises:
        HTTPException: 400 when ``source_type`` is neither ``upload_video`` nor
            ``video_link``; 409 when a ``video_link`` job's ``status`` is not
            ``completed``.
    """
    source_type = source_job.get("source_type", "")
    if source_type not in {"upload_video", "video_link"}:
        raise HTTPException(status_code=400, detail="Real-cut source job must come from upload_video or video_link")
    # Only video_link jobs are gated on status; upload_video jobs pass as-is.
    if source_type == "video_link" and source_job.get("status") != "completed":
        raise HTTPException(status_code=409, detail="Video link source job must be completed before real-cut staging")


def resolve_real_cut_source_file(source_job: dict[str, Any]) -> tuple[Path, dict[str, Any] | None]:
    """Locate the local media file that belongs to a source job.

    Candidates are probed in this order and the first existing regular file
    wins: ``artifacts["uploaded_path"]``, ``artifacts["source_path"]``, the
    ``local_path`` of the linked ``content_sources`` row, and (``video_link``
    jobs only) ``JOBS_DIR / <job id> / "source.mp4"``.

    Args:
        source_job: Job row; validated with ``validate_real_cut_source_job``.

    Returns:
        ``(path, artifacts)`` where ``artifacts`` is the parsed artifact mapping
        of the job (an empty dict when none could be parsed).

    Raises:
        HTTPException: 400/409 from validation, or 409 when no candidate path
            points at an existing file.
    """
    validate_real_cut_source_job(source_job)
    # The return annotation allows None artifacts; normalise so the lookups
    # below cannot fail with AttributeError on None.
    artifacts = parse_job_artifacts(source_job) or {}

    candidates: list[Path] = [
        Path(str(artifacts[key]))
        for key in ("uploaded_path", "source_path")
        if artifacts.get(key)
    ]
    if source_job.get("content_source_id"):
        source_row = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_job["content_source_id"],))
        if source_row and source_row.get("local_path"):
            candidates.append(Path(str(source_row["local_path"])))
    if source_job.get("source_type") == "video_link":
        candidates.append(JOBS_DIR / source_job["id"] / "source.mp4")

    # dict.fromkeys de-duplicates while keeping the priority order.
    for candidate in dict.fromkeys(candidates):
        try:
            if candidate.is_file():
                return candidate, artifacts
        except OSError:
            # e.g. PermissionError on a stale path: keep probing the remaining
            # candidates instead of aborting the whole lookup.
            continue

    raise HTTPException(status_code=409, detail="Source job media file is not available for real-cut staging")


async def stage_real_cut_source_to_cutvideo(source_job: dict[str, Any]) -> dict[str, Any]:
    """Upload a source job's media file to CutVideo and report where it landed.

    Args:
        source_job: Job row of the upload/video-link job to stage.

    Returns:
        Dict with ``input_dir`` (as returned by CutVideo), ``source_path`` (the
        local file that was uploaded), ``upload`` (the raw upload payload) and
        ``source_artifacts`` (the source job's parsed artifacts).

    Raises:
        HTTPException: 503 when the CutVideo client is not enabled; 400/409 when
            the source job or its media file is unusable; 504 when the upload
            times out; 502 when the upload fails or returns no ``input_dir``.
    """
    # Function-scope import: only needed to classify upload failures below.
    import httpx

    if not cutvideo_client.enabled:
        raise HTTPException(status_code=503, detail="CutVideo is not configured")

    source_path, source_artifacts = resolve_real_cut_source_file(source_job)
    folder_name = f"storyforge-{source_job['id']}"
    try:
        upload_payload = await cutvideo_client.upload_source_file(source_path, folder_name=folder_name)
    except httpx.TimeoutException as exc:
        raise HTTPException(status_code=504, detail="CutVideo upload timed out") from exc
    except httpx.HTTPError as exc:
        # Covers transport errors and non-2xx responses; report them as an
        # upstream failure rather than an unhandled 500.
        raise HTTPException(
            status_code=502,
            detail=f"CutVideo upload failed ({type(exc).__name__})",
        ) from exc

    # A payload that is not a mapping is treated like a missing input_dir.
    raw_input_dir = upload_payload.get("input_dir") if isinstance(upload_payload, dict) else None
    input_dir = str(raw_input_dir or "").strip()
    if not input_dir:
        raise HTTPException(status_code=502, detail="CutVideo upload did not return input_dir")
    return {
        "input_dir": input_dir,
        "source_path": str(source_path),
        "upload": upload_payload,
        "source_artifacts": source_artifacts,
    }
source_job.get("source_type", "") + if staged_payload: + source_metadata["cutvideo_upload"] = staged_payload.get("upload", {}) + source_metadata["source_media_path"] = staged_payload.get("source_path", "") + source = create_content_source( account_id=account["id"], project_id=project["id"], source_kind="real_cut_input", title=request.title.strip(), - local_path=request.input_dir.strip(), - metadata={"line_type": "real_cut"}, + source_url=source_url, + local_path=resolved_input_dir, + metadata=source_metadata, ) job_row = create_job_record( account_id=account["id"], @@ -1623,13 +1712,16 @@ async def create_real_cut_job(request: RealCutJobRequest, account: dict[str, Any line_type="real_cut", workflow_key="real_cut_pipeline", title=request.title.strip(), - source_url=request.input_dir.strip(), + source_url=resolved_input_dir, content_source_id=source["id"], artifacts={ + "source_job_id": source_job["id"] if source_job else "", + "source_media_path": staged_payload.get("source_path", ""), + "cutvideo_upload": staged_payload.get("upload", {}), "cutvideo_request": { "base_config": request.base_config.strip() or CUTVIDEO_BASE_CONFIG, "name": request.title.strip(), - "input_dir": request.input_dir.strip(), + "input_dir": resolved_input_dir, "objective": request.objective, "target_duration_sec": request.target_duration_sec, "target_aspect_ratio": request.target_aspect_ratio, diff --git a/docker-compose.yml b/docker-compose.yml index 0f78270..fe68c06 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,7 @@ services: CUTVIDEO_BASE_CONFIG: ${CUTVIDEO_BASE_CONFIG:-example.job.yaml} CUTVIDEO_POLL_INTERVAL_SEC: ${CUTVIDEO_POLL_INTERVAL_SEC:-10} CUTVIDEO_MAX_WAIT_SEC: ${CUTVIDEO_MAX_WAIT_SEC:-1800} + CUTVIDEO_UPLOAD_TIMEOUT_SEC: ${CUTVIDEO_UPLOAD_TIMEOUT_SEC:-1800} HUOBAO_BASE_URL: ${HUOBAO_BASE_URL:-http://host.docker.internal:5678} YTDLP_BIN: ${YTDLP_BIN:-yt-dlp} FFMPEG_BIN: ${FFMPEG_BIN:-ffmpeg} diff --git a/docs/AUDIT_2026-03-18.md b/docs/AUDIT_2026-03-18.md 
index 82177b1..53c45b7 100644 --- a/docs/AUDIT_2026-03-18.md +++ b/docs/AUDIT_2026-03-18.md @@ -79,14 +79,16 @@ - 仓库:`/Users/kris/code/cutvideo` - 具备清晰 API: - `POST /api/jobs` + - `POST /api/uploads` - `GET /api/tasks/{task_id}` - `GET /api/runs/{run_id}` - 适合集成为“由 StoryForge 后端授权调用的局域网剪辑能力” -当前限制: +当前状态: -- 现有 `cutvideo` API 主要接受 `input_dir` -- 对“用户上传实拍素材后直接推送到 Windows 机器”这一步,还缺一层文件转运方案 +- StoryForge 已支持把 `upload_video` 或已完成的 `video_link` 源素材自动上传到 `cutvideo` +- `real-cut` 任务可直接传 `source_job_id`,由后端完成 staging 后再提交到剪辑服务 +- Windows 机器还需要部署带 `POST /api/uploads` 的 `cutvideo` 分支版本 ### 4. `huobao-drama` diff --git a/docs/LAN_E2E_GUIDE_2026-03-18.md b/docs/LAN_E2E_GUIDE_2026-03-18.md index 25b5842..b9826db 100644 --- a/docs/LAN_E2E_GUIDE_2026-03-18.md +++ b/docs/LAN_E2E_GUIDE_2026-03-18.md @@ -100,12 +100,14 @@ docker compose up -d --build 当前 MVP 前提: -- `input_dir` 必须是 Windows `cutvideo` 机器可访问的目录 -- 该目录中的素材已准备好 +- 方式 A:直接传 `input_dir`,它必须是 Windows `cutvideo` 机器可访问的目录 +- 方式 B:传 `source_job_id`,`collector-service` 会把 `upload_video` 或已完成的 `video_link` 源素材自动上传到 Windows `cutvideo`,再继续发起任务 +- 如果走方式 B,大文件上传超时由 `CUTVIDEO_UPLOAD_TIMEOUT_SEC` 控制 预期: - 任务创建成功 +- 如果用了 `source_job_id`,任务 `artifacts.cutvideo_upload` 会记录 Windows staging 结果 - `n8n` 调用 `collector-service` 内部 real-cut step - 后端记录 `provider_task_id` - 最终任务写回 `cutvideo_run` @@ -113,6 +115,7 @@ docker compose up -d --build 已验证样例: - `job_5ebd829c3f2144bca5c941183e75bdcd` +- `job_01a6f283cbda42e4ae692b268b811a50` (`source_job_id` 自动 staging,本机 `cutvideo` 联调) - Windows 返回 `task_id=8d8f4a0cd5d9` - 运行目录 `20260318-093520-Windows cutvideo 联调样例` diff --git a/docs/MVP_STATUS_2026-03-18.md b/docs/MVP_STATUS_2026-03-18.md index 6cb1220..e0d6fde 100644 --- a/docs/MVP_STATUS_2026-03-18.md +++ b/docs/MVP_STATUS_2026-03-18.md @@ -13,6 +13,7 @@ - 本地 `ffmpeg` / `whisper` 风格入口封装 - 本地大模型内容分析、二创文案、分镜生成 - Windows `cutvideo` API 调度与结果回写接口 +- `upload_video -> source_job_id -> cutvideo` 自动 staging 闭环 - 本机 `huobao-drama` API 调度、首尾帧生成、视频生成与结果回写接口 - 
FastGPT 运行时依赖删除 @@ -20,6 +21,7 @@ - 分析链路:`job_203bc8e9b20f4b1cbbc6cf7da79e46f4` - 实拍剪辑链路:`job_5ebd829c3f2144bca5c941183e75bdcd` +- 实拍剪辑自动 staging 联调:`job_01a6f283cbda42e4ae692b268b811a50` - AI 视频链路:`job_01828c40377747cf914b51be360cc333` ## 已实现但仍待环境验证 @@ -28,14 +30,12 @@ ## 尚未完全跑通 -- 用户上传实拍素材后,自动把素材转运到 Windows `cutvideo` 机器的闭环 - 对“抖音 / bilibili / 小红书账号级内容源”的批量抓取与分析调度 -- `huobao-drama` 本地兼容补丁向 upstream 仓库的迁移、分支化和提交 +- Windows 机器上的 `cutvideo` 需要同步部署带 `POST /api/uploads` 的版本,当前自动 staging 已在本机联调通过 ## 下一步优先级 -1. 把 `huobao-drama` 本地兼容补丁迁到 `/Users/kris/code/huobao-drama-upstream` +1. 把 `cutvideo` 新上传接口部署到 Windows 机器 2. 绑定你的真实 ASR 入口 -3. 决定实拍素材转运方案:共享目录优先,上传 API 作为备选 -4. 补账号级内容源抓取调度 -5. 把改动整理成提交并推送 +3. 补账号级内容源抓取调度 +4. 把改动整理成提交并推送