feat: auto stage real-cut inputs to cutvideo

This commit is contained in:
kris
2026-03-20 06:57:53 +08:00
parent b145363111
commit 63af810236
7 changed files with 143 additions and 19 deletions

View File

@@ -12,6 +12,7 @@ CUTVIDEO_API_KEY=
CUTVIDEO_BASE_CONFIG=example.job.yaml
CUTVIDEO_POLL_INTERVAL_SEC=10
CUTVIDEO_MAX_WAIT_SEC=1800
CUTVIDEO_UPLOAD_TIMEOUT_SEC=1800
HUOBAO_BASE_URL=http://127.0.0.1:5678
HUOBAO_POLL_INTERVAL_SEC=10
HUOBAO_MAX_WAIT_SEC=900

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import mimetypes
from pathlib import Path
from typing import Any
import httpx
@@ -65,10 +67,18 @@ class N8NClient:
class CutVideoClient:
def __init__(self, *, base_url: str, api_key: str = "", timeout: float = 120.0) -> None:
def __init__(
self,
*,
base_url: str,
api_key: str = "",
timeout: float = 120.0,
upload_timeout: float = 1800.0,
) -> None:
self.base_url = base_url.rstrip("/")
self.api_key = api_key.strip()
self.timeout = timeout
self.upload_timeout = upload_timeout
@property
def enabled(self) -> bool:
@@ -90,6 +100,21 @@ class CutVideoClient:
response.raise_for_status()
return _unwrap_response(response.json())
async def upload_source_file(self, source_path: Path, *, folder_name: str = "") -> dict[str, Any]:
content_type = mimetypes.guess_type(source_path.name)[0] or "application/octet-stream"
headers = self._headers()
data = {"folder_name": folder_name} if folder_name else {}
async with httpx.AsyncClient(timeout=self.upload_timeout) as client:
with source_path.open("rb") as handle:
response = await client.post(
_join_url(self.base_url, "/api/uploads"),
data=data,
files={"files": (source_path.name, handle, content_type)},
headers=headers,
)
response.raise_for_status()
return _unwrap_response(response.json())
async def get_task(self, task_id: str) -> dict[str, Any]:
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(

View File

@@ -46,6 +46,7 @@ HUOBAO_BASE_URL = os.getenv("HUOBAO_BASE_URL", "http://127.0.0.1:5678")
CUTVIDEO_BASE_CONFIG = os.getenv("CUTVIDEO_BASE_CONFIG", "example.job.yaml")
CUTVIDEO_POLL_INTERVAL_SEC = int(os.getenv("CUTVIDEO_POLL_INTERVAL_SEC", "10"))
CUTVIDEO_MAX_WAIT_SEC = int(os.getenv("CUTVIDEO_MAX_WAIT_SEC", "1800"))
CUTVIDEO_UPLOAD_TIMEOUT_SEC = int(os.getenv("CUTVIDEO_UPLOAD_TIMEOUT_SEC", "1800"))
HUOBAO_POLL_INTERVAL_SEC = int(os.getenv("HUOBAO_POLL_INTERVAL_SEC", "10"))
HUOBAO_MAX_WAIT_SEC = int(os.getenv("HUOBAO_MAX_WAIT_SEC", "900"))
@@ -63,7 +64,11 @@ n8n_client = N8NClient(
},
shared_secret=ORCHESTRATOR_SHARED_SECRET,
)
cutvideo_client = CutVideoClient(base_url=CUTVIDEO_BASE_URL, api_key=CUTVIDEO_API_KEY)
cutvideo_client = CutVideoClient(
base_url=CUTVIDEO_BASE_URL,
api_key=CUTVIDEO_API_KEY,
upload_timeout=CUTVIDEO_UPLOAD_TIMEOUT_SEC,
)
huobao_client = HuobaoDramaClient(base_url=HUOBAO_BASE_URL)
app = FastAPI(title="StoryForge Collector Service", version="0.2.0")
@@ -185,7 +190,8 @@ class ContentSourceCreateRequest(BaseModel):
class RealCutJobRequest(BaseModel):
project_id: str = ""
title: str
input_dir: str
input_dir: str = ""
source_job_id: str = ""
base_config: str = ""
objective: str = "保留高信息密度片段,输出适合短视频平台的粗剪结果"
target_duration_sec: int = 60
@@ -866,6 +872,60 @@ def run_command(command: list[str], cwd: Path | None = None) -> tuple[int, str,
return proc.returncode, proc.stdout, proc.stderr
def validate_real_cut_source_job(source_job: dict[str, Any]) -> None:
source_type = source_job.get("source_type", "")
if source_type not in {"upload_video", "video_link"}:
raise HTTPException(status_code=400, detail="Real-cut source job must come from upload_video or video_link")
if source_type == "video_link" and source_job.get("status") != "completed":
raise HTTPException(status_code=409, detail="Video link source job must be completed before real-cut staging")
def resolve_real_cut_source_file(source_job: dict[str, Any]) -> tuple[Path, dict[str, Any] | None]:
validate_real_cut_source_job(source_job)
artifacts = parse_job_artifacts(source_job)
candidates: list[Path] = []
if artifacts.get("uploaded_path"):
candidates.append(Path(str(artifacts["uploaded_path"])))
if artifacts.get("source_path"):
candidates.append(Path(str(artifacts["source_path"])))
if source_job.get("content_source_id"):
source_row = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_job["content_source_id"],))
if source_row and source_row.get("local_path"):
candidates.append(Path(str(source_row["local_path"])))
if source_job.get("source_type") == "video_link":
candidates.append(JOBS_DIR / source_job["id"] / "source.mp4")
seen: set[str] = set()
for candidate in candidates:
candidate_str = str(candidate)
if not candidate_str or candidate_str in seen:
continue
seen.add(candidate_str)
if candidate.exists() and candidate.is_file():
return candidate, artifacts
raise HTTPException(status_code=409, detail="Source job media file is not available for real-cut staging")
async def stage_real_cut_source_to_cutvideo(source_job: dict[str, Any]) -> dict[str, Any]:
if not cutvideo_client.enabled:
raise HTTPException(status_code=503, detail="CutVideo is not configured")
source_path, source_artifacts = resolve_real_cut_source_file(source_job)
folder_name = f"storyforge-{source_job['id']}"
upload_payload = await cutvideo_client.upload_source_file(source_path, folder_name=folder_name)
input_dir = str(upload_payload.get("input_dir") or "").strip()
if not input_dir:
raise HTTPException(status_code=502, detail="CutVideo upload did not return input_dir")
return {
"input_dir": input_dir,
"source_path": str(source_path),
"upload": upload_payload,
"source_artifacts": source_artifacts,
}
def create_job_record(
*,
account_id: str,
@@ -1165,6 +1225,7 @@ def healthz() -> dict[str, Any]:
"localModelBaseUrl": LOCAL_OPENAI_BASE_URL,
"n8nBaseUrl": N8N_BASE_URL,
"cutvideoBaseUrl": CUTVIDEO_BASE_URL,
"cutvideoUploadTimeoutSec": CUTVIDEO_UPLOAD_TIMEOUT_SEC,
"huobaoBaseUrl": HUOBAO_BASE_URL,
}
@@ -1605,15 +1666,43 @@ async def upload_video(
@app.post("/v2/pipelines/real-cut")
async def create_real_cut_job(request: RealCutJobRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
project = resolve_target_project(account["id"], request.project_id or None, username=account["username"])
source_job = None
source_job_id = request.source_job_id.strip()
if source_job_id:
source_job = load_owned_job(source_job_id, account["id"])
requested_project_id = request.project_id or (source_job.get("project_id", "") if source_job else "")
project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
if source_job and source_job.get("project_id") and source_job.get("project_id") != project["id"]:
raise HTTPException(status_code=400, detail="Source job does not belong to target project")
kb = ensure_user_kb(account["id"], project["id"], username=account["username"])
resolved_input_dir = request.input_dir.strip()
staged_payload: dict[str, Any] = {}
if not resolved_input_dir:
if not source_job:
raise HTTPException(status_code=400, detail="input_dir or source_job_id is required")
staged_payload = await stage_real_cut_source_to_cutvideo(source_job)
resolved_input_dir = staged_payload["input_dir"]
source_url = resolved_input_dir
source_metadata: dict[str, Any] = {"line_type": "real_cut"}
if source_job:
source_url = source_job.get("source_url") or resolved_input_dir
source_metadata["source_job_id"] = source_job["id"]
source_metadata["source_job_type"] = source_job.get("source_type", "")
if staged_payload:
source_metadata["cutvideo_upload"] = staged_payload.get("upload", {})
source_metadata["source_media_path"] = staged_payload.get("source_path", "")
source = create_content_source(
account_id=account["id"],
project_id=project["id"],
source_kind="real_cut_input",
title=request.title.strip(),
local_path=request.input_dir.strip(),
metadata={"line_type": "real_cut"},
source_url=source_url,
local_path=resolved_input_dir,
metadata=source_metadata,
)
job_row = create_job_record(
account_id=account["id"],
@@ -1623,13 +1712,16 @@ async def create_real_cut_job(request: RealCutJobRequest, account: dict[str, Any
line_type="real_cut",
workflow_key="real_cut_pipeline",
title=request.title.strip(),
source_url=request.input_dir.strip(),
source_url=resolved_input_dir,
content_source_id=source["id"],
artifacts={
"source_job_id": source_job["id"] if source_job else "",
"source_media_path": staged_payload.get("source_path", ""),
"cutvideo_upload": staged_payload.get("upload", {}),
"cutvideo_request": {
"base_config": request.base_config.strip() or CUTVIDEO_BASE_CONFIG,
"name": request.title.strip(),
"input_dir": request.input_dir.strip(),
"input_dir": resolved_input_dir,
"objective": request.objective,
"target_duration_sec": request.target_duration_sec,
"target_aspect_ratio": request.target_aspect_ratio,

View File

@@ -42,6 +42,7 @@ services:
CUTVIDEO_BASE_CONFIG: ${CUTVIDEO_BASE_CONFIG:-example.job.yaml}
CUTVIDEO_POLL_INTERVAL_SEC: ${CUTVIDEO_POLL_INTERVAL_SEC:-10}
CUTVIDEO_MAX_WAIT_SEC: ${CUTVIDEO_MAX_WAIT_SEC:-1800}
CUTVIDEO_UPLOAD_TIMEOUT_SEC: ${CUTVIDEO_UPLOAD_TIMEOUT_SEC:-1800}
HUOBAO_BASE_URL: ${HUOBAO_BASE_URL:-http://host.docker.internal:5678}
YTDLP_BIN: ${YTDLP_BIN:-yt-dlp}
FFMPEG_BIN: ${FFMPEG_BIN:-ffmpeg}

View File

@@ -79,14 +79,16 @@
- 仓库:`/Users/kris/code/cutvideo`
- 具备清晰 API
- `POST /api/jobs`
- `POST /api/uploads`
- `GET /api/tasks/{task_id}`
- `GET /api/runs/{run_id}`
- 适合集成为“由 StoryForge 后端授权调用的局域网剪辑能力”
当前限制
当前状态
- 现有 `cutvideo` API 主要接受 `input_dir`
- 对“用户上传实拍素材后直接推送到 Windows 机器”这一步,还缺一层文件转运方案
- StoryForge 已支持把 `upload_video` 或已完成的 `video_link` 源素材自动上传到 `cutvideo`
- `real-cut` 任务可直接传 `source_job_id`,由后端完成 staging 后再提交到剪辑服务
- Windows 机器还需要部署带 `POST /api/uploads``cutvideo` 分支版本
### 4. `huobao-drama`

View File

@@ -100,12 +100,14 @@ docker compose up -d --build
当前 MVP 前提:
- `input_dir` 必须是 Windows `cutvideo` 机器可访问的目录
- 该目录中的素材已准备好
- 方式 A直接传 `input_dir`,它必须是 Windows `cutvideo` 机器可访问的目录
- 方式 B`source_job_id``collector-service` 会把 `upload_video` 或已完成的 `video_link` 源素材自动上传到 Windows `cutvideo`,再继续发起任务
- 如果走方式 B大文件上传超时由 `CUTVIDEO_UPLOAD_TIMEOUT_SEC` 控制
预期:
- 任务创建成功
- 如果用了 `source_job_id`,任务 `artifacts.cutvideo_upload` 会记录 Windows staging 结果
- `n8n` 调用 `collector-service` 内部 real-cut step
- 后端记录 `provider_task_id`
- 最终任务写回 `cutvideo_run`
@@ -113,6 +115,7 @@ docker compose up -d --build
已验证样例:
- `job_5ebd829c3f2144bca5c941183e75bdcd`
- `job_01a6f283cbda42e4ae692b268b811a50` (`source_job_id` 自动 staging本机 `cutvideo` 联调)
- Windows 返回 `task_id=8d8f4a0cd5d9`
- 运行目录 `20260318-093520-Windows cutvideo 联调样例`

View File

@@ -13,6 +13,7 @@
- 本地 `ffmpeg` / `whisper` 风格入口封装
- 本地大模型内容分析、二创文案、分镜生成
- Windows `cutvideo` API 调度与结果回写接口
- `upload_video -> source_job_id -> cutvideo` 自动 staging 闭环
- 本机 `huobao-drama` API 调度、首尾帧生成、视频生成与结果回写接口
- FastGPT 运行时依赖删除
@@ -20,6 +21,7 @@
- 分析链路:`job_203bc8e9b20f4b1cbbc6cf7da79e46f4`
- 实拍剪辑链路:`job_5ebd829c3f2144bca5c941183e75bdcd`
- 实拍剪辑自动 staging 联调:`job_01a6f283cbda42e4ae692b268b811a50`
- AI 视频链路:`job_01828c40377747cf914b51be360cc333`
## 已实现但仍待环境验证
@@ -28,14 +30,12 @@
## 尚未完全跑通
- 用户上传实拍素材后,自动把素材转运到 Windows `cutvideo` 机器的闭环
- 对“抖音 / bilibili / 小红书账号级内容源”的批量抓取与分析调度
- `huobao-drama` 本地兼容补丁向 upstream 仓库的迁移、分支化和提交
- Windows 机器上的 `cutvideo` 需要同步部署带 `POST /api/uploads` 的版本,当前自动 staging 已在本机联调通过
## 下一步优先级
1.`huobao-drama` 本地兼容补丁迁到 `/Users/kris/code/huobao-drama-upstream`
1.`cutvideo` 新上传接口部署到 Windows 机器
2. 绑定你的真实 ASR 入口
3. 决定实拍素材转运方案:共享目录优先,上传 API 作为备选
4. 补账号级内容源抓取调度
5. 把改动整理成提交并推送
3. 补账号级内容源抓取调度
4. 把改动整理成提交并推送