diff --git a/.env.example b/.env.example index 193ac1b..f16a6d9 100644 --- a/.env.example +++ b/.env.example @@ -2,15 +2,25 @@ DEFAULT_EXTERNAL_BASE_URL=http://test.hyzq.net:8081 LOCAL_OPENAI_BASE_URL=http://127.0.0.1:8317/v1 LOCAL_OPENAI_MODEL=GLM-5 LOCAL_OPENAI_API_KEY= -FASTGPT_BASE_URL=http://127.0.0.1:3000 -FASTGPT_DATASET_API_KEY= +N8N_BASE_URL=http://127.0.0.1:5670 +N8N_ANALYSIS_WEBHOOK_PATH=/webhook/storyforge-analysis +N8N_REAL_CUT_WEBHOOK_PATH=/webhook/storyforge-real-cut +N8N_AI_VIDEO_WEBHOOK_PATH=/webhook/storyforge-ai-video +ORCHESTRATOR_SHARED_SECRET=storyforge-local-secret +CUTVIDEO_BASE_URL= +CUTVIDEO_API_KEY= +CUTVIDEO_BASE_CONFIG=example.job.yaml +CUTVIDEO_POLL_INTERVAL_SEC=10 +CUTVIDEO_MAX_WAIT_SEC=1800 +HUOBAO_BASE_URL=http://127.0.0.1:5678 +HUOBAO_POLL_INTERVAL_SEC=10 +HUOBAO_MAX_WAIT_SEC=900 YTDLP_BIN=yt-dlp FFMPEG_BIN=ffmpeg WHISPER_BIN= WHISPER_MODEL=./data/collector/models/ggml-base.en.bin -POSTGRES_DB=fastgpt -POSTGRES_USER=postgres -POSTGRES_PASSWORD=postgres -MINIO_ROOT_USER=minioadmin -MINIO_ROOT_PASSWORD=minioadmin +N8N_IMAGE=docker.n8n.io/n8nio/n8n:latest +WEBHOOK_URL=http://127.0.0.1:5670/ +GENERIC_TIMEZONE=Asia/Shanghai +TZ=Asia/Shanghai CLIPROXY_IMAGE=storyforge/cli-proxy-api:patched diff --git a/README.md b/README.md index 3eabcd8..c2a3f0c 100644 --- a/README.md +++ b/README.md @@ -5,36 +5,68 @@ StoryForge 现在拆成独立项目目录,和 `AI-glasses` 分开维护。 ## 目录 - `android-app/`:StoryForge Android 客户端 -- `collector-service/`:FastAPI 后端,提供登录、审批、素材导入、知识库、智能体和 OTA -- `docker-compose.yml`:本地 FastGPT / collector / 基础依赖编排 +- `collector-service/`:FastAPI 后端,负责用户体系、项目、Agent、任务、内容分析和对外能力接入 +- `n8n/`:工作流导出文件,作为流程编排中枢 +- `docker-compose.yml`:本地 `collector + n8n + cli-proxy-api` 编排 - `Common/`:项目约束和架构说明 - `data/collector/`:SQLite、任务文件、下载产物 +- `docs/`:审计、实施计划、联调说明、当前 MVP 状态 ## Android ```bash -cd /Users/kris/code/StoryForge/android-app +cd /Users/kris/code/StoryForge-gitea/android-app ./gradlew assembleDebug ``` ## Collector Service ```bash -cd /Users/kris/code/StoryForge/collector-service +cd /Users/kris/code/StoryForge-gitea/collector-service python3 -m venv .venv source .venv/bin/activate pip install -r requirements.txt uvicorn app.main:app --host 0.0.0.0 --port 8081 --reload ``` +## Docker Compose + +```bash +cd /Users/kris/code/StoryForge-gitea +cp .env.example .env +docker compose up -d --build +``` + +默认会启动: + +- `collector-service`:`http://127.0.0.1:8081` +- `n8n`:`http://127.0.0.1:5670` +- `cli-proxy-api`:`http://127.0.0.1:8317` + 默认会创建最高权限账号: - `kris` - `Asd123456.` +## 当前架构 + +- `collector-service` 负责: + - 用户账号、多项目、多 Agent、多任务、多内容源数据边界 + - 调用下载器、本地 ASR、本机 OpenAI 兼容模型 + - 调用 Windows `cutvideo` 和 `huobao-drama` + - 持久化任务、分镜、分析结果、事件日志 +- `n8n` 负责: + - 触发 `analysis_pipeline` + - 触发 `real_cut_pipeline` + - 触发 `ai_video_pipeline` +- FastGPT 已从主流程设计中移除,不再作为运行时依赖 + ## 说明 - 新注册账号默认 `pending` - 主管理员审批后才可使用核心业务接口 -- 素材入口支持文字、视频链接、视频上传 -- 可选对接本机 OpenAI 兼容模型服务和 FastGPT 数据集 API +- 支持 `user -> project -> knowledge base / assistant(agent) / job / content source` 的多租户边界 +- 素材入口支持文字、视频链接、视频上传;内容源账号通过 `content_sources` 建模持久化 +- `cutvideo` 继续运行在 Windows 机器,本系统通过 API 调度 +- `huobao-drama` 继续作为 AI 生成视频主链的核心引擎 +- 详细审计、阶段计划和联调步骤见 `docs/` diff --git a/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt b/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt index 663c0cf..b0fba52 100644 --- a/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt +++ b/android-app/app/src/main/java/com/aiglasses/app/storyforge/StoryForgeModels.kt @@ -1,6 +1,10 @@ package com.aiglasses.app.storyforge import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.buildJsonArray +import kotlinx.serialization.json.buildJsonObject @Serializable data class RegisterAccountRequest( @@ -66,12 +70,22 @@ data class PreferredModelRequest( ) @Serializable -data class KnowledgeBaseDto( +data class ProjectDto( val id: String, val user_id: String, val name: String, val description: String = "", - val fastgpt_dataset_id: String? = null, + val created_at: String = "", + val updated_at: String = "" +) + +@Serializable +data class KnowledgeBaseDto( + val id: String, + val user_id: String, + val project_id: String = "", + val name: String, + val description: String = "", val sync_status: String = "pending", val document_count: Int = 0, val linked_assistant_count: Int = 0, @@ -82,6 +96,7 @@ data class KnowledgeBaseDto( @Serializable data class KnowledgeBaseCreateRequest( val name: String, + val project_id: String = "", val description: String = "" ) @@ -89,12 +104,13 @@ data class KnowledgeBaseCreateRequest( data class AssistantDto( val id: String, val user_id: String, + val project_id: String = "", val name: String, val description: String = "", val system_prompt: String = "", val generation_goal: String = "", val knowledge_base_ids: List = emptyList(), - val fastgpt_app_key: String = "", + val config: JsonObject = buildJsonObject { }, val model_profile_id: String = "", val created_at: String = "", val updated_at: String = "" @@ -107,7 +123,7 @@ data class AssistantCreateRequest( val system_prompt: String = "", val generation_goal: String = "", val knowledge_base_ids: List = emptyList(), - val fastgpt_app_key: String = "", + val project_id: String = "", val model_profile_id: String = "" ) @@ -118,7 +134,7 @@ data class AssistantUpdateRequest( val system_prompt: String? = null, val generation_goal: String? = null, val knowledge_base_ids: List? = null, - val fastgpt_app_key: String? = null, + val project_id: String? = null, val model_profile_id: String? = null ) @@ -126,6 +142,7 @@ data class AssistantUpdateRequest( data class ExploreVideoLinkRequest( val video_url: String, val title: String? = null, + val project_id: String? = null, val knowledge_base_id: String? = null, val assistant_id: String? = null, val analysis_model_profile_id: String? = null, @@ -136,6 +153,7 @@ data class ExploreVideoLinkRequest( data class ExploreTextRequest( val title: String, val content: String, + val project_id: String? = null, val knowledge_base_id: String? = null, val assistant_id: String? = null, val analysis_model_profile_id: String? = null @@ -145,19 +163,26 @@ data class ExploreTextRequest( data class JobDto( val id: String, val user_id: String, + val project_id: String = "", val assistant_id: String? = null, val knowledge_base_id: String, + val content_source_id: String = "", val source_type: String, + val line_type: String = "analysis", + val workflow_key: String = "", + val orchestrator: String = "n8n", + val provider_name: String = "", + val provider_task_id: String = "", val source_url: String? = null, val title: String, val language: String, val status: String, val transcript_text: String = "", val style_summary: String = "", - val fastgpt_collection_id: String = "", val upload_status: String = "pending", val error: String = "", - val artifacts: Map = emptyMap(), + val artifacts: JsonObject = buildJsonObject { }, + val result: JsonObject = buildJsonObject { }, val analysis_model_profile_id: String = "", val created_at: String = "", val updated_at: String = "" @@ -173,7 +198,9 @@ data class KnowledgeDocumentDto( val transcript_text: String = "", val style_summary: String = "", val combined_text: String = "", - val fastgpt_collection_id: String = "", + val analysis: JsonObject = buildJsonObject { }, + val storyboards: JsonArray = buildJsonArray { }, + val source_artifacts: JsonObject = buildJsonObject { }, val analysis_model_profile_id: String = "", val created_at: String = "", val updated_at: String = "" @@ -200,6 +227,7 @@ data class GenerateCopyResponseDto( @Serializable data class DashboardDto( val account: AccountDto, + val projects: List = emptyList(), val knowledge_bases: List = emptyList(), val assistants: List = emptyList(), val recent_jobs: List = emptyList(), diff --git a/collector-service/app/database.py b/collector-service/app/database.py index 90d6180..382dc56 100644 --- a/collector-service/app/database.py +++ b/collector-service/app/database.py @@ -48,6 +48,18 @@ class Database: with self.session() as conn: conn.execute(sql, params) + def table_exists(self, name: str) -> bool: + row = self.fetch_one( + "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", + (name,), + ) + return bool(row) + + def column_exists(self, table: str, column: str) -> bool: + with self.session() as conn: + rows = conn.execute(f"PRAGMA table_info({table})").fetchall() + return any(row["name"] == column for row in rows) + def init_schema(self) -> None: schema = """ CREATE TABLE IF NOT EXISTS accounts ( @@ -90,10 +102,10 @@ class Database: CREATE TABLE IF NOT EXISTS knowledge_bases ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, + project_id TEXT, name TEXT NOT NULL, description TEXT NOT NULL DEFAULT '', - fastgpt_dataset_id TEXT, - sync_status TEXT NOT NULL DEFAULT 'pending', + sync_status TEXT NOT NULL DEFAULT 'ready', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE @@ -108,7 +120,9 @@ class Database: transcript_text TEXT NOT NULL DEFAULT '', style_summary TEXT NOT NULL DEFAULT '', combined_text TEXT NOT NULL DEFAULT '', - fastgpt_collection_id TEXT NOT NULL DEFAULT '', + analysis_json TEXT NOT NULL DEFAULT '{}', + storyboard_json TEXT NOT NULL DEFAULT '[]', + source_artifact_json TEXT NOT NULL DEFAULT '{}', analysis_model_profile_id TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, @@ -118,11 +132,12 @@ class Database: CREATE TABLE IF NOT EXISTS assistants ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, + project_id TEXT, name TEXT NOT NULL, description TEXT NOT NULL DEFAULT '', system_prompt TEXT NOT NULL DEFAULT '', generation_goal TEXT NOT NULL DEFAULT '', - fastgpt_app_key TEXT NOT NULL DEFAULT '', + config_json TEXT NOT NULL DEFAULT '{}', model_profile_id TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, @@ -140,19 +155,26 @@ class Database: CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, + project_id TEXT, assistant_id TEXT, knowledge_base_id TEXT NOT NULL, + content_source_id TEXT, source_type TEXT NOT NULL, + line_type TEXT NOT NULL DEFAULT 'analysis', + workflow_key TEXT NOT NULL DEFAULT '', + orchestrator TEXT NOT NULL DEFAULT 'n8n', + provider_name TEXT NOT NULL DEFAULT '', + provider_task_id TEXT NOT NULL DEFAULT '', source_url TEXT, title TEXT NOT NULL, language TEXT NOT NULL DEFAULT 'auto', status TEXT NOT NULL, transcript_text TEXT NOT NULL DEFAULT '', style_summary TEXT NOT NULL DEFAULT '', - fastgpt_collection_id TEXT NOT NULL DEFAULT '', upload_status TEXT NOT NULL DEFAULT 'pending', error TEXT NOT NULL DEFAULT '', artifacts_json TEXT NOT NULL DEFAULT '{}', + result_json TEXT NOT NULL DEFAULT '{}', analysis_model_profile_id TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL, updated_at TEXT NOT NULL, @@ -161,6 +183,42 @@ class Database: FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE ); + CREATE TABLE IF NOT EXISTS projects ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + name TEXT NOT NULL, + description TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS content_sources ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT, + source_kind TEXT NOT NULL, + platform TEXT NOT NULL DEFAULT '', + handle TEXT NOT NULL DEFAULT '', + source_url TEXT NOT NULL DEFAULT '', + title TEXT NOT NULL DEFAULT '', + local_path TEXT NOT NULL DEFAULT '', + metadata_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS job_events ( + id TEXT PRIMARY KEY, + job_id TEXT NOT NULL, + event_type TEXT NOT NULL, + payload_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE + ); + CREATE TABLE IF NOT EXISTS app_updates ( id INTEGER PRIMARY KEY AUTOINCREMENT, platform TEXT NOT NULL, @@ -179,3 +237,102 @@ class Database: """ with self.session() as conn: conn.executescript(schema) + self.migrate_schema() + + def migrate_schema(self) -> None: + table_columns: dict[str, dict[str, str]] = { + "knowledge_bases": { + "project_id": "TEXT", + }, + "knowledge_documents": { + "analysis_json": "TEXT NOT NULL DEFAULT '{}'", + "storyboard_json": "TEXT NOT NULL DEFAULT '[]'", + "source_artifact_json": "TEXT NOT NULL DEFAULT '{}'", + }, + "assistants": { + "project_id": "TEXT", + "config_json": "TEXT NOT NULL DEFAULT '{}'", + }, + "jobs": { + "project_id": "TEXT", + "content_source_id": "TEXT", + "line_type": "TEXT NOT NULL DEFAULT 'analysis'", + "workflow_key": "TEXT NOT NULL DEFAULT ''", + "orchestrator": "TEXT NOT NULL DEFAULT 'n8n'", + "provider_name": "TEXT NOT NULL DEFAULT ''", + "provider_task_id": "TEXT NOT NULL DEFAULT ''", + "result_json": "TEXT NOT NULL DEFAULT '{}'", + }, + } + + for table, columns in table_columns.items(): + if not self.table_exists(table): + continue + for column, definition in columns.items(): + if self.column_exists(table, column): + continue + self.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}") + + self.ensure_default_projects() + + def ensure_default_projects(self) -> None: + if not self.table_exists("projects"): + return + + accounts = self.fetch_all("SELECT id, username FROM accounts ORDER BY created_at ASC") + for account in accounts: + project = self.fetch_one( + "SELECT * FROM projects WHERE user_id = ? ORDER BY created_at ASC LIMIT 1", + (account["id"],), + ) + if not project: + project_id = f"proj_{account['id']}" + now = utc_now() + self.execute( + """ + INSERT INTO projects (id, user_id, name, description, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + project_id, + account["id"], + f"{account['username']} 默认项目", + "系统自动创建的默认项目", + now, + now, + ), + ) + project = self.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,)) + + if not project: + continue + + if self.column_exists("knowledge_bases", "project_id"): + self.execute( + """ + UPDATE knowledge_bases + SET project_id = ? + WHERE user_id = ? AND (project_id IS NULL OR project_id = '') + """, + (project["id"], account["id"]), + ) + + if self.column_exists("assistants", "project_id"): + self.execute( + """ + UPDATE assistants + SET project_id = ? + WHERE user_id = ? AND (project_id IS NULL OR project_id = '') + """, + (project["id"], account["id"]), + ) + + if self.column_exists("jobs", "project_id"): + self.execute( + """ + UPDATE jobs + SET project_id = ? + WHERE user_id = ? AND (project_id IS NULL OR project_id = '') + """, + (project["id"], account["id"]), + ) diff --git a/collector-service/app/fastgpt.py b/collector-service/app/fastgpt.py deleted file mode 100644 index a03848b..0000000 --- a/collector-service/app/fastgpt.py +++ /dev/null @@ -1,48 +0,0 @@ -from __future__ import annotations - -from typing import Any - -import httpx - - -class FastGPTClient: - def __init__(self, *, base_url: str, dataset_api_key: str, timeout: float = 60.0) -> None: - self.base_url = base_url.rstrip("/") - self.dataset_api_key = dataset_api_key.strip() - self.timeout = timeout - - @property - def enabled(self) -> bool: - return bool(self.base_url and self.dataset_api_key) - - async def ensure_dataset(self, name: str, intro: str = "") -> dict[str, Any] | None: - if not self.enabled: - return None - payload = {"name": name, "intro": intro} - async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.post( - f"{self.base_url}/api/core/dataset/create", - headers={"Authorization": f"Bearer {self.dataset_api_key}"}, - json=payload, - ) - response.raise_for_status() - return response.json().get("data") or response.json() - - async def add_text_document(self, dataset_id: str, name: str, text: str) -> dict[str, Any] | None: - if not self.enabled or not dataset_id.strip(): - return None - payload = { - "datasetId": dataset_id, - "type": "text", - "name": name, - "trainingType": "chunk", - "text": text, - } - async with httpx.AsyncClient(timeout=self.timeout) as client: - response = await client.post( - f"{self.base_url}/api/core/dataset/collection/create/text", - headers={"Authorization": f"Bearer {self.dataset_api_key}"}, - json=payload, - ) - response.raise_for_status() - return response.json().get("data") or response.json() diff --git a/collector-service/app/integrations.py b/collector-service/app/integrations.py new file mode 100644 index 0000000..dd48d08 --- /dev/null +++ b/collector-service/app/integrations.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +from typing import Any + +import httpx + + +def _join_url(base_url: str, path: str) -> str: + base = base_url.rstrip("/") + if path.startswith("http://") or path.startswith("https://"): + return path + return f"{base}/{path.lstrip('/')}" + + +def _unwrap_response(payload: Any) -> dict[str, Any]: + if not isinstance(payload, dict): + return {"value": payload} + if payload.get("success") is True and "data" in payload: + data = payload.get("data") + if isinstance(data, dict): + return data + return {"value": data} + return payload + + +class N8NClient: + def __init__( + self, + *, + base_url: str, + workflow_paths: dict[str, str], + shared_secret: str = "", + timeout: float = 60.0, + ) -> None: + self.base_url = base_url.rstrip("/") + self.workflow_paths = workflow_paths + self.shared_secret = shared_secret.strip() + self.timeout = timeout + + @property + def enabled(self) -> bool: + return bool(self.base_url) + + async def trigger(self, workflow_key: str, payload: dict[str, Any]) -> dict[str, Any]: + workflow_path = self.workflow_paths.get(workflow_key, "").strip() + if not workflow_path: + raise ValueError(f"workflow path not configured for {workflow_key}") + try: + workflow_path = workflow_path.format(**payload) + except KeyError: + pass + headers: dict[str, str] = {} + if self.shared_secret: + headers["X-Orchestrator-Secret"] = self.shared_secret + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + _join_url(self.base_url, workflow_path), + json=payload, + headers=headers, + ) + response.raise_for_status() + if not response.content: + return {"triggered": True} + return _unwrap_response(response.json()) + + +class CutVideoClient: + def __init__(self, *, base_url: str, api_key: str = "", timeout: float = 120.0) -> None: + self.base_url = base_url.rstrip("/") + self.api_key = api_key.strip() + self.timeout = timeout + + @property + def enabled(self) -> bool: + return bool(self.base_url) + + def _headers(self) -> dict[str, str]: + headers: dict[str, str] = {} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + return headers + + async def submit_job(self, payload: dict[str, Any]) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + _join_url(self.base_url, "/api/jobs"), + json=payload, + headers=self._headers(), + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + async def get_task(self, task_id: str) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + _join_url(self.base_url, f"/api/tasks/{task_id}"), + headers=self._headers(), + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + async def get_run(self, run_id: str) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + _join_url(self.base_url, f"/api/runs/{run_id}"), + headers=self._headers(), + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + +class HuobaoDramaClient: + def __init__(self, *, base_url: str, timeout: float = 180.0) -> None: + self.base_url = base_url.rstrip("/") + self.timeout = timeout + + @property + def enabled(self) -> bool: + return bool(self.base_url) + + async def create_drama(self, payload: dict[str, Any]) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + _join_url(self.base_url, "/api/v1/dramas"), + json=payload, + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + async def generate_image(self, payload: dict[str, Any]) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + _join_url(self.base_url, "/api/v1/images"), + json=payload, + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + async def get_image(self, image_id: str) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + _join_url(self.base_url, f"/api/v1/images/{image_id}"), + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + async def generate_video(self, payload: dict[str, Any]) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + _join_url(self.base_url, "/api/v1/videos"), + json=payload, + ) + response.raise_for_status() + return _unwrap_response(response.json()) + + async def get_video(self, video_id: str) -> dict[str, Any]: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + _join_url(self.base_url, f"/api/v1/videos/{video_id}"), + ) + response.raise_for_status() + return _unwrap_response(response.json()) diff --git a/collector-service/app/main.py b/collector-service/app/main.py index 0ee5460..68cd77a 100644 --- a/collector-service/app/main.py +++ b/collector-service/app/main.py @@ -1,7 +1,9 @@ from __future__ import annotations +import asyncio import json import os +import re import secrets import shutil import subprocess @@ -10,13 +12,13 @@ from datetime import datetime, timezone from pathlib import Path from typing import Any -from fastapi import BackgroundTasks, Depends, FastAPI, File, Form, Header, HTTPException, Query, UploadFile +from fastapi import Body, Depends, FastAPI, File, Form, Header, HTTPException, Query, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from .database import Database, utc_now -from .fastgpt import FastGPTClient +from .integrations import CutVideoClient, HuobaoDramaClient, N8NClient from .openai_compat import OpenAICompatClient BASE_DIR = Path(__file__).resolve().parents[2] @@ -29,19 +31,40 @@ DEFAULT_EXTERNAL_BASE_URL = os.getenv("DEFAULT_EXTERNAL_BASE_URL", "https://test LOCAL_OPENAI_BASE_URL = os.getenv("LOCAL_OPENAI_BASE_URL", "http://127.0.0.1:8317/v1") LOCAL_OPENAI_MODEL = os.getenv("LOCAL_OPENAI_MODEL", "GLM-5") LOCAL_OPENAI_API_KEY = os.getenv("LOCAL_OPENAI_API_KEY", "") -FASTGPT_BASE_URL = os.getenv("FASTGPT_BASE_URL", "http://127.0.0.1:3000") -FASTGPT_DATASET_API_KEY = os.getenv("FASTGPT_DATASET_API_KEY", "") YTDLP_BIN = os.getenv("YTDLP_BIN", "yt-dlp") FFMPEG_BIN = os.getenv("FFMPEG_BIN", "ffmpeg") WHISPER_BIN = os.getenv("WHISPER_BIN", "") WHISPER_MODEL = os.getenv("WHISPER_MODEL", str(MODELS_DIR / "ggml-base.en.bin")) +N8N_BASE_URL = os.getenv("N8N_BASE_URL", "http://127.0.0.1:5670") +N8N_ANALYSIS_WEBHOOK_PATH = os.getenv("N8N_ANALYSIS_WEBHOOK_PATH", "/webhook/storyforge-analysis") +N8N_REAL_CUT_WEBHOOK_PATH = os.getenv("N8N_REAL_CUT_WEBHOOK_PATH", "/webhook/storyforge-real-cut") +N8N_AI_VIDEO_WEBHOOK_PATH = os.getenv("N8N_AI_VIDEO_WEBHOOK_PATH", "/webhook/storyforge-ai-video") +ORCHESTRATOR_SHARED_SECRET = os.getenv("ORCHESTRATOR_SHARED_SECRET", "") +CUTVIDEO_BASE_URL = os.getenv("CUTVIDEO_BASE_URL", "") +CUTVIDEO_API_KEY = os.getenv("CUTVIDEO_API_KEY", "") +HUOBAO_BASE_URL = os.getenv("HUOBAO_BASE_URL", "http://127.0.0.1:5678") +CUTVIDEO_BASE_CONFIG = os.getenv("CUTVIDEO_BASE_CONFIG", "example.job.yaml") +CUTVIDEO_POLL_INTERVAL_SEC = int(os.getenv("CUTVIDEO_POLL_INTERVAL_SEC", "10")) +CUTVIDEO_MAX_WAIT_SEC = int(os.getenv("CUTVIDEO_MAX_WAIT_SEC", "1800")) +HUOBAO_POLL_INTERVAL_SEC = int(os.getenv("HUOBAO_POLL_INTERVAL_SEC", "10")) +HUOBAO_MAX_WAIT_SEC = int(os.getenv("HUOBAO_MAX_WAIT_SEC", "900")) for path in (DATA_DIR, DOWNLOADS_DIR, JOBS_DIR, MODELS_DIR): path.mkdir(parents=True, exist_ok=True) db = Database(DB_PATH) openai_client = OpenAICompatClient() -fastgpt_client = FastGPTClient(base_url=FASTGPT_BASE_URL, dataset_api_key=FASTGPT_DATASET_API_KEY) +n8n_client = N8NClient( + base_url=N8N_BASE_URL, + workflow_paths={ + "analysis_pipeline": N8N_ANALYSIS_WEBHOOK_PATH, + "real_cut_pipeline": N8N_REAL_CUT_WEBHOOK_PATH, + "ai_video_pipeline": N8N_AI_VIDEO_WEBHOOK_PATH, + }, + shared_secret=ORCHESTRATOR_SHARED_SECRET, +) +cutvideo_client = CutVideoClient(base_url=CUTVIDEO_BASE_URL, api_key=CUTVIDEO_API_KEY) +huobao_client = HuobaoDramaClient(base_url=HUOBAO_BASE_URL) app = FastAPI(title="StoryForge Collector Service", version="0.2.0") app.add_middleware( @@ -80,11 +103,13 @@ class PreferredModelRequest(BaseModel): class KnowledgeBaseCreateRequest(BaseModel): name: str description: str = "" + project_id: str = "" class ExploreVideoLinkRequest(BaseModel): video_url: str title: str | None = None + project_id: str | None = None knowledge_base_id: str | None = None assistant_id: str | None = None analysis_model_profile_id: str | None = None @@ -94,6 +119,7 @@ class ExploreVideoLinkRequest(BaseModel): class ExploreTextRequest(BaseModel): title: str content: str + project_id: str | None = None knowledge_base_id: str | None = None assistant_id: str | None = None analysis_model_profile_id: str | None = None @@ -105,7 +131,7 @@ class AssistantCreateRequest(BaseModel): system_prompt: str = "" generation_goal: str = "" knowledge_base_ids: list[str] = Field(default_factory=list) - fastgpt_app_key: str = "" + project_id: str = "" model_profile_id: str = "" @@ -115,7 +141,7 @@ class AssistantUpdateRequest(BaseModel): system_prompt: str | None = None generation_goal: str | None = None knowledge_base_ids: list[str] | None = None - fastgpt_app_key: str | None = None + project_id: str | None = None model_profile_id: str | None = None @@ -140,6 +166,70 @@ class PublishAppUpdateRequest(BaseModel): isActive: bool = True +class ProjectCreateRequest(BaseModel): + name: str + description: str = "" + + +class ContentSourceCreateRequest(BaseModel): + project_id: str = "" + source_kind: str + platform: str = "" + handle: str = "" + source_url: str = "" + title: str = "" + local_path: str = "" + metadata: dict[str, Any] = Field(default_factory=dict) + + +class RealCutJobRequest(BaseModel): + project_id: str = "" + title: str + input_dir: str + base_config: str = "" + objective: str = "保留高信息密度片段,输出适合短视频平台的粗剪结果" + target_duration_sec: int = 60 + target_aspect_ratio: str = "9:16" + ideal_segment_duration_sec: int = 8 + max_segment_duration_sec: int = 18 + transcript_backend: str = "auto" + transcript_device: str = "cuda" + review_enabled: bool = False + dry_run: bool = False + + +class AiVideoJobRequest(BaseModel): + project_id: str = "" + assistant_id: str = "" + knowledge_base_id: str = "" + source_job_id: str = "" + title: str + brief: str + style: str = "realistic" + shots: int = 4 + image_provider: str = "openai" + image_model: str = "" + video_provider: str = "doubao" + video_model: str = "" + aspect_ratio: str = "9:16" + duration: int = 5 + + +class InternalStepRequest(BaseModel): + job_id: str = "" + jobId: str = "" + payload: dict[str, Any] = Field(default_factory=dict) + + +class JobStatusUpdateRequest(BaseModel): + status: str + error: str = "" + provider_name: str = "" + provider_task_id: str = "" + artifacts: dict[str, Any] = Field(default_factory=dict) + result: dict[str, Any] = Field(default_factory=dict) + + def now_ts() -> int: return int(datetime.now(timezone.utc).timestamp()) @@ -225,6 +315,132 @@ def model_profile_for_account(account_id: str, requested_id: str | None) -> dict return row +def project_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "user_id": row["user_id"], + "name": row["name"], + "description": row.get("description", ""), + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + +def content_source_payload(row: dict[str, Any]) -> dict[str, Any]: + metadata = row.get("metadata_json") or "{}" + try: + metadata_map = json.loads(metadata) + except json.JSONDecodeError: + metadata_map = {} + return { + "id": row["id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", ""), + "source_kind": row["source_kind"], + "platform": row.get("platform", ""), + "handle": row.get("handle", ""), + "source_url": row.get("source_url", ""), + "title": row.get("title", ""), + "local_path": row.get("local_path", ""), + "metadata": metadata_map, + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + +def job_event_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "job_id": row["job_id"], + "event_type": row["event_type"], + "payload": parse_json_object(row.get("payload_json") or "{}"), + "created_at": row["created_at"], + } + + +def ensure_default_project(account_id: str, username: str = "默认用户") -> dict[str, Any]: + project = db.fetch_one( + "SELECT * FROM projects WHERE user_id = ? ORDER BY created_at ASC LIMIT 1", + (account_id,), + ) + if project: + return project + now = utc_now() + project_id = make_id("project") + db.execute( + """ + INSERT INTO projects (id, user_id, name, description, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + project_id, + account_id, + f"{username} 默认项目", + "系统自动创建", + now, + now, + ), + ) + return db.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,)) + + +def resolve_target_project(account_id: str, requested_project_id: str | None, username: str = "默认用户") -> dict[str, Any]: + if requested_project_id: + project = db.fetch_one( + "SELECT * FROM projects WHERE id = ? AND user_id = ?", + (requested_project_id, account_id), + ) + if project: + return project + raise HTTPException(status_code=404, detail="Project not found") + return ensure_default_project(account_id, username=username) + + +def resolve_target_assistant(account_id: str, requested_assistant_id: str | None, project_id: str = "") -> dict[str, Any] | None: + if not requested_assistant_id: + return None + assistant = db.fetch_one("SELECT * FROM assistants WHERE id = ? AND user_id = ?", (requested_assistant_id, account_id)) + if not assistant: + raise HTTPException(status_code=404, detail="Assistant not found") + if project_id and assistant.get("project_id") and assistant.get("project_id") != project_id: + raise HTTPException(status_code=400, detail="Assistant does not belong to target project") + return assistant + + +def append_job_event(job_id: str, event_type: str, payload: dict[str, Any] | None = None) -> None: + db.execute( + """ + INSERT INTO job_events (id, job_id, event_type, payload_json, created_at) + VALUES (?, ?, ?, ?, ?) + """, + ( + make_id("evt"), + job_id, + event_type, + json.dumps(payload or {}, ensure_ascii=False), + utc_now(), + ), + ) + + +def parse_json_object(raw_text: str) -> dict[str, Any]: + cleaned = raw_text.strip() + if not cleaned: + return {} + try: + data = json.loads(cleaned) + return data if isinstance(data, dict) else {} + except json.JSONDecodeError: + match = re.search(r"\{.*\}", cleaned, re.S) + if not match: + return {} + try: + data = json.loads(match.group(0)) + return data if isinstance(data, dict) else {} + except json.JSONDecodeError: + return {} + + def knowledge_base_payload(row: dict[str, Any]) -> dict[str, Any]: document_count = db.fetch_one( "SELECT COUNT(*) AS count FROM knowledge_documents WHERE knowledge_base_id = ?", @@ -237,10 +453,10 @@ def knowledge_base_payload(row: dict[str, Any]) -> dict[str, Any]: return { "id": row["id"], "user_id": row["user_id"], + "project_id": row.get("project_id", ""), "name": row["name"], "description": row.get("description", ""), - "fastgpt_dataset_id": row.get("fastgpt_dataset_id"), - "sync_status": row.get("sync_status", "pending"), + "sync_status": row.get("sync_status", "ready"), "document_count": document_count, "linked_assistant_count": linked_count, "created_at": row["created_at"], @@ -256,12 +472,13 @@ def assistant_payload(row: dict[str, Any]) -> dict[str, Any]: return { "id": row["id"], "user_id": row["user_id"], + "project_id": row.get("project_id", ""), "name": row["name"], "description": row.get("description", ""), "system_prompt": row.get("system_prompt", ""), "generation_goal": row.get("generation_goal", ""), "knowledge_base_ids": [item["knowledge_base_id"] for item in kb_rows], - "fastgpt_app_key": row.get("fastgpt_app_key", ""), + "config": parse_json_object(row.get("config_json") or "{}"), "model_profile_id": row.get("model_profile_id", ""), "created_at": row["created_at"], "updated_at": row["updated_at"], @@ -269,6 +486,13 @@ def assistant_payload(row: dict[str, Any]) -> dict[str, Any]: def document_payload(row: dict[str, Any]) -> dict[str, Any]: + analysis_map = parse_json_object(row.get("analysis_json") or "{}") + source_artifacts = parse_json_object(row.get("source_artifact_json") or "{}") + storyboard_raw = row.get("storyboard_json") or "[]" + try: + storyboard_items = json.loads(storyboard_raw) + except json.JSONDecodeError: + storyboard_items = [] return { "id": row["id"], "knowledge_base_id": row["knowledge_base_id"], @@ -278,7 +502,9 @@ def document_payload(row: dict[str, Any]) -> dict[str, Any]: "transcript_text": row.get("transcript_text", ""), "style_summary": row.get("style_summary", ""), "combined_text": row.get("combined_text", ""), - "fastgpt_collection_id": row.get("fastgpt_collection_id", ""), + "analysis": analysis_map, + "storyboards": storyboard_items, + "source_artifacts": source_artifacts, "analysis_model_profile_id": row.get("analysis_model_profile_id", ""), "created_at": row["created_at"], "updated_at": row["updated_at"], @@ -287,26 +513,38 @@ def document_payload(row: dict[str, Any]) -> dict[str, Any]: def job_payload(row: dict[str, Any]) -> dict[str, Any]: artifacts = row.get("artifacts_json") or "{}" + result = row.get("result_json") or "{}" try: artifacts_map = json.loads(artifacts) except json.JSONDecodeError: artifacts_map = {} + try: + result_map = json.loads(result) + except json.JSONDecodeError: + result_map = {} return { "id": row["id"], "user_id": row["user_id"], + "project_id": row.get("project_id", ""), "assistant_id": row.get("assistant_id"), "knowledge_base_id": row["knowledge_base_id"], + "content_source_id": row.get("content_source_id", ""), "source_type": row["source_type"], + "line_type": row.get("line_type", "analysis"), + "workflow_key": row.get("workflow_key", ""), + "orchestrator": row.get("orchestrator", "n8n"), + "provider_name": row.get("provider_name", ""), + "provider_task_id": row.get("provider_task_id", ""), "source_url": row.get("source_url"), "title": row["title"], "language": row.get("language", "auto"), "status": row["status"], "transcript_text": row.get("transcript_text", ""), "style_summary": row.get("style_summary", ""), - "fastgpt_collection_id": row.get("fastgpt_collection_id", ""), "upload_status": row.get("upload_status", "pending"), "error": row.get("error", ""), "artifacts": artifacts_map, + "result": result_map, "analysis_model_profile_id": row.get("analysis_model_profile_id", ""), "created_at": row["created_at"], "updated_at": row["updated_at"], @@ -338,6 +576,188 @@ def require_super_admin(account: dict[str, Any] = Depends(require_auth)) -> dict return account +def require_orchestrator(x_orchestrator_secret: str | None = Header(default=None)) -> bool: + if ORCHESTRATOR_SHARED_SECRET and x_orchestrator_secret != ORCHESTRATOR_SHARED_SECRET: + raise HTTPException(status_code=401, detail="Invalid orchestrator secret") + return True + + +def create_content_source( + *, + account_id: str, + project_id: str, + source_kind: str, + platform: str = "", + handle: str = "", + source_url: str = "", + title: str = "", + local_path: str = "", + metadata: dict[str, Any] | None = None, +) -> dict[str, Any]: + source_id = make_id("source") + now = utc_now() + db.execute( + """ + INSERT INTO content_sources ( + id, user_id, project_id, source_kind, platform, handle, + source_url, title, local_path, metadata_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + source_id, + account_id, + project_id, + source_kind, + platform, + handle, + source_url, + title, + local_path, + json.dumps(metadata or {}, ensure_ascii=False), + now, + now, + ), + ) + return db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,)) + + +def merge_json_field(current_raw: str | None, updates: dict[str, Any]) -> str: + current = parse_json_object(current_raw or "{}") + current.update(updates) + return json.dumps(current, ensure_ascii=False) + + +def update_job_state( + job_id: str, + *, + status: str, + error: str = "", + provider_name: str | None = None, + provider_task_id: str | None = None, + artifacts: dict[str, Any] | None = None, + result: dict[str, Any] | None = None, +) -> dict[str, Any]: + row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) + if not row: + raise HTTPException(status_code=404, detail="Job not found") + merged_artifacts = merge_json_field(row.get("artifacts_json") or "{}", artifacts or {}) + merged_result = merge_json_field(row.get("result_json") or "{}", result or {}) + db.execute( + """ + UPDATE jobs + SET status = ?, error = ?, provider_name = ?, provider_task_id = ?, + artifacts_json = ?, result_json = ?, updated_at = ? + WHERE id = ? + """, + ( + status, + error, + provider_name if provider_name is not None else row.get("provider_name", ""), + provider_task_id if provider_task_id is not None else row.get("provider_task_id", ""), + merged_artifacts, + merged_result, + utc_now(), + job_id, + ), + ) + append_job_event( + job_id, + f"job.{status}", + { + "provider_name": provider_name if provider_name is not None else row.get("provider_name", ""), + "provider_task_id": provider_task_id if provider_task_id is not None else row.get("provider_task_id", ""), + "error": error, + "artifacts": artifacts or {}, + "result": result or {}, + }, + ) + return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) + + +def job_context_payload(row: dict[str, Any]) -> dict[str, Any]: + payload = job_payload(row) + payload["project"] = None + payload["assistant"] = None + payload["knowledge_base"] = None + payload["content_source"] = None + payload["events"] = [] + + if row.get("project_id"): + project = db.fetch_one("SELECT * FROM projects WHERE id = ?", (row["project_id"],)) + if project: + payload["project"] = project_payload(project) + + if row.get("assistant_id"): + assistant = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],)) + if assistant: + payload["assistant"] = assistant_payload(assistant) + + kb = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (row["knowledge_base_id"],)) + if kb: + payload["knowledge_base"] = knowledge_base_payload(kb) + + if row.get("content_source_id"): + source = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],)) + if source: + payload["content_source"] = content_source_payload(source) + + payload["events"] = [ + job_event_payload(item) + for item in db.fetch_all("SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (row["id"],)) + ] + + return payload + + +async def trigger_orchestrated_job(job_row: dict[str, Any]) -> dict[str, Any]: + workflow_key = job_row.get("workflow_key") or "analysis_pipeline" + if not n8n_client.enabled: + raise HTTPException(status_code=503, detail="n8n is not configured") + append_job_event(job_row["id"], "workflow.trigger.requested", {"workflow_key": workflow_key}) + update_job_state( + job_row["id"], + status="queued", + provider_name="n8n", + provider_task_id="", + result={"n8n_trigger": {"requested": True}}, + ) + trigger_result = await n8n_client.trigger( + workflow_key, + { + "jobId": job_row["id"], + "job_id": job_row["id"], + "workflowKey": workflow_key, + "workflow_key": workflow_key, + "lineType": job_row.get("line_type", "analysis"), + "line_type": job_row.get("line_type", "analysis"), + }, + ) + provider_task_id = str(trigger_result.get("executionId") or "") + db.execute( + """ + UPDATE jobs + SET provider_name = ?, provider_task_id = ?, result_json = ?, updated_at = ? + WHERE id = ? + """, + ( + "n8n", + provider_task_id, + merge_json_field( + db.fetch_one("SELECT result_json FROM jobs WHERE id = ?", (job_row["id"],)).get("result_json") or "{}", + {"n8n_trigger": trigger_result}, + ), + utc_now(), + job_row["id"], + ), + ) + append_job_event( + job_row["id"], + "workflow.trigger.accepted", + {"provider_task_id": provider_task_id, "trigger_result": trigger_result}, + ) + return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],)) + + async def call_model(profile: dict[str, Any], system_prompt: str, user_prompt: str, temperature: float = 0.4) -> str: try: content = await openai_client.chat_completion( @@ -366,6 +786,73 @@ async def summarize_style(profile: dict[str, Any], transcript_text: str, title: return await call_model(profile, system_prompt, prompt, temperature=0.3) +async def generate_content_blueprint( + profile: dict[str, Any], + *, + title: str, + transcript_text: str, + style_summary: str, + agent_prompt: str = "", + generation_goal: str = "", +) -> dict[str, Any]: + system_prompt = ( + "你是短视频内容策略师。" + "必须输出 JSON 对象,不要输出 Markdown,不要输出多余解释。" + ) + user_prompt = ( + f"标题:{title}\n\n" + f"素材转写:\n{transcript_text}\n\n" + f"风格拆解:\n{style_summary}\n\n" + f"智能体补充约束:\n{agent_prompt or '无'}\n\n" + f"生成目标:\n{generation_goal or '围绕原素材做二创短视频'}\n\n" + "请输出如下 JSON 结构:" + "{" + '"analysis":{"hook":"","structure":[],"style_tags":[],"cta":""},' + '"rewrite":{"title":"","script":"","summary":""},' + '"storyboards":[' + '{"shot_index":1,"title":"","narration":"","visual":"","first_frame_prompt":"","last_frame_prompt":"","video_prompt":"","duration_sec":5}' + "]" + "}" + ) + raw = await call_model(profile, system_prompt, user_prompt, temperature=0.5) + parsed = parse_json_object(raw) + if parsed.get("storyboards"): + return parsed + + fallback_storyboards: list[dict[str, Any]] = [] + paragraphs = [part.strip() for part in transcript_text.split("\n") if part.strip()] + seed_segments = paragraphs[:4] or [transcript_text[:1200]] + for idx, segment in enumerate(seed_segments, start=1): + snippet = segment[:180] + fallback_storyboards.append( + { + "shot_index": idx, + "title": f"镜头{idx}", + "narration": snippet, + "visual": f"围绕这段内容构建具象画面:{snippet}", + "first_frame_prompt": f"短视频首帧,突出主题:{snippet}", + "last_frame_prompt": f"短视频尾帧,强化结论和行动指令:{snippet}", + "video_prompt": f"基于首尾帧生成连贯镜头,内容是:{snippet}", + "duration_sec": 5, + } + ) + + return { + "analysis": { + "hook": title, + "structure": ["结论开场", "核心论点", "例证推进", "收尾行动"], + "style_tags": ["短句", "结论先行", "强 CTA"], + "cta": "引导用户采取下一步行动", + }, + "rewrite": { + "title": title, + "script": transcript_text[:3000], + "summary": style_summary[:500], + }, + "storyboards": fallback_storyboards, + } + + def fallback_transcript_from_text(title: str, content: str) -> str: return f"标题:{title}\n\n正文:\n{content.strip()}" @@ -379,6 +866,98 @@ def run_command(command: list[str], cwd: Path | None = None) -> tuple[int, str, return proc.returncode, proc.stdout, proc.stderr +def create_job_record( + *, + account_id: str, + project_id: str, + knowledge_base_id: str, + source_type: str, + line_type: str, + workflow_key: str, + title: str, + language: str = "auto", + source_url: str = "", + assistant_id: str | None = None, + content_source_id: str | None = None, + artifacts: dict[str, Any] | None = None, + analysis_model_profile_id: str = "", +) -> dict[str, Any]: + job_id = make_id("job") + now = utc_now() + db.execute( + """ + INSERT INTO jobs ( + id, user_id, project_id, assistant_id, knowledge_base_id, content_source_id, + source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id, + source_url, title, language, status, transcript_text, style_summary, upload_status, + error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'n8n', '', '', ?, ?, ?, 'pending', '', '', 'pending', '', ?, '{}', ?, ?, ?) + """, + ( + job_id, + account_id, + project_id, + assistant_id, + knowledge_base_id, + content_source_id, + source_type, + line_type, + workflow_key, + source_url or None, + title, + language, + json.dumps(artifacts or {}, ensure_ascii=False), + analysis_model_profile_id, + now, + now, + ), + ) + return db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) + + +async def wait_for_huobao_image(image_id: str | int) -> dict[str, Any]: + deadline = now_ts() + HUOBAO_MAX_WAIT_SEC + last_payload: dict[str, Any] = {} + while True: + last_payload = await huobao_client.get_image(str(image_id)) + status = str(last_payload.get("status") or "").lower() + if status in {"completed", "failed"}: + return last_payload + if now_ts() >= deadline: + raise RuntimeError(f"Huobao image task timed out: {image_id}") + await asyncio.sleep(HUOBAO_POLL_INTERVAL_SEC) + + +async def wait_for_huobao_video(video_id: str | int) -> dict[str, Any]: + deadline = now_ts() + HUOBAO_MAX_WAIT_SEC + last_payload: dict[str, Any] = {} + while True: + last_payload = await huobao_client.get_video(str(video_id)) + status = str(last_payload.get("status") or "").lower() + if status in {"completed", "failed"}: + return last_payload + if now_ts() >= deadline: + raise RuntimeError(f"Huobao video task timed out: {video_id}") + await asyncio.sleep(HUOBAO_POLL_INTERVAL_SEC) + + +def coerce_storyboards(items: Any) -> list[dict[str, Any]]: + if not isinstance(items, list): + return [] + return [item for item in items if isinstance(item, dict)] + + +def huobao_image_size_for_aspect_ratio(aspect_ratio: str) -> str: + normalized = str(aspect_ratio or "").strip() + if normalized == "9:16": + return "1024x1536" + if normalized == "16:9": + return "1536x1024" + if normalized == "1:1": + return "1024x1024" + return "1024x1536" + + def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: str = "") -> tuple[str, dict[str, str]]: artifacts: dict[str, str] = {} transcript = "" @@ -431,49 +1010,33 @@ def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: s return transcript, artifacts -def ensure_user_kb(account_id: str) -> dict[str, Any]: - row = db.fetch_one("SELECT * FROM knowledge_bases WHERE user_id = ? ORDER BY created_at ASC LIMIT 1", (account_id,)) +def ensure_user_kb(account_id: str, project_id: str = "", username: str = "默认用户") -> dict[str, Any]: + project = resolve_target_project(account_id, project_id or None, username=username) + row = db.fetch_one( + "SELECT * FROM knowledge_bases WHERE user_id = ? AND project_id = ? ORDER BY created_at ASC LIMIT 1", + (account_id, project["id"]), + ) if row: return row kb_id = make_id("kb") now = utc_now() db.execute( - "INSERT INTO knowledge_bases (id, user_id, name, description, sync_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)", - (kb_id, account_id, "默认知识库", "系统为新用户自动创建", "pending", now, now), + """ + INSERT INTO knowledge_bases (id, user_id, project_id, name, description, sync_status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + (kb_id, account_id, project["id"], "默认知识库", "系统为新用户自动创建", "ready", now, now), ) return db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (kb_id,)) -async def sync_document_to_fastgpt(kb_row: dict[str, Any], title: str, combined_text: str) -> tuple[str, str]: - dataset_id = kb_row.get("fastgpt_dataset_id") or "" - collection_id = "" - sync_status = kb_row.get("sync_status", "pending") - if not fastgpt_client.enabled: - return collection_id, "local_only" - try: - if not dataset_id: - dataset = await fastgpt_client.ensure_dataset(kb_row["name"], kb_row.get("description", "")) - dataset_id = str((dataset or {}).get("_id") or (dataset or {}).get("id") or "") - if dataset_id: - db.execute( - "UPDATE knowledge_bases SET fastgpt_dataset_id = ?, sync_status = ?, updated_at = ? WHERE id = ?", - (dataset_id, "synced", utc_now(), kb_row["id"]), - ) - if dataset_id: - data = await fastgpt_client.add_text_document(dataset_id, title, combined_text) - collection_id = str((data or {}).get("_id") or (data or {}).get("id") or "") - sync_status = "synced" - except Exception: - sync_status = "pending" - return collection_id, sync_status - - async def process_job(job_id: str) -> None: row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) if not row: return now = utc_now() db.execute("UPDATE jobs SET status = ?, updated_at = ? WHERE id = ?", ("processing", now, job_id)) + append_job_event(job_id, "job.processing", {}) try: artifacts = json.loads(row.get("artifacts_json") or "{}") @@ -506,20 +1069,38 @@ async def process_job(job_id: str) -> None: profile = model_profile_for_account(row["user_id"], row.get("analysis_model_profile_id") or None) style_summary = await summarize_style(profile, transcript_text, row["title"]) - combined_text = f"{transcript_text}\n\n------\n风格学习结论:\n{style_summary}" + assistant = None + if row.get("assistant_id"): + assistant = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],)) + content_blueprint = await generate_content_blueprint( + profile, + title=row["title"], + transcript_text=transcript_text, + style_summary=style_summary, + agent_prompt=(assistant or {}).get("system_prompt", ""), + generation_goal=(assistant or {}).get("generation_goal", ""), + ) + combined_text = ( + f"{transcript_text}\n\n" + "------\n" + f"风格学习结论:\n{style_summary}\n\n" + "------\n" + f"二创文案:\n{(content_blueprint.get('rewrite') or {}).get('script', '')}\n\n" + "------\n" + f"分镜:\n{json.dumps(content_blueprint.get('storyboards') or [], ensure_ascii=False, indent=2)}" + ) kb_row = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (row["knowledge_base_id"],)) if not kb_row: raise RuntimeError("Knowledge base not found") - collection_id, sync_status = await sync_document_to_fastgpt(kb_row, row["title"], combined_text) document_id = make_id("doc") timestamp = utc_now() db.execute( """ INSERT INTO knowledge_documents ( id, knowledge_base_id, title, source_type, source_url, transcript_text, - style_summary, combined_text, fastgpt_collection_id, analysis_model_profile_id, - created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + style_summary, combined_text, analysis_json, storyboard_json, source_artifact_json, + analysis_model_profile_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( document_id, @@ -530,35 +1111,43 @@ async def process_job(job_id: str) -> None: transcript_text, style_summary, combined_text, - collection_id, + json.dumps(content_blueprint.get("analysis") or {}, ensure_ascii=False), + json.dumps(content_blueprint.get("storyboards") or [], ensure_ascii=False), + json.dumps(artifacts, ensure_ascii=False), profile["id"], timestamp, timestamp, ), ) + update_job_state( + job_id, + status="completed", + artifacts={ + "document_id": document_id, + "project_job_dir": str(job_dir), + **artifacts, + }, + result={ + "analysis": content_blueprint.get("analysis") or {}, + "rewrite": content_blueprint.get("rewrite") or {}, + "storyboards": content_blueprint.get("storyboards") or [], + "document_id": document_id, + }, + ) db.execute( """ UPDATE jobs - SET status = ?, transcript_text = ?, style_summary = ?, fastgpt_collection_id = ?, - upload_status = ?, artifacts_json = ?, updated_at = ? + SET transcript_text = ?, style_summary = ?, upload_status = ?, updated_at = ? WHERE id = ? """, - ( - "completed", - transcript_text, - style_summary, - collection_id, - sync_status, - json.dumps(artifacts, ensure_ascii=False), - timestamp, - job_id, - ), + (transcript_text, style_summary, "ready", timestamp, job_id), + ) + db.execute( + "UPDATE knowledge_bases SET sync_status = ?, updated_at = ? WHERE id = ?", + ("ready", timestamp, kb_row["id"]), ) except Exception as exc: - db.execute( - "UPDATE jobs SET status = ?, error = ?, updated_at = ? WHERE id = ?", - ("failed", str(exc), utc_now(), job_id), - ) + update_job_state(job_id, status="failed", error=str(exc)) @app.on_event("startup") @@ -574,6 +1163,9 @@ def healthz() -> dict[str, Any]: "dbPath": DB_PATH, "defaultExternalBaseUrl": DEFAULT_EXTERNAL_BASE_URL, "localModelBaseUrl": LOCAL_OPENAI_BASE_URL, + "n8nBaseUrl": N8N_BASE_URL, + "cutvideoBaseUrl": CUTVIDEO_BASE_URL, + "huobaoBaseUrl": HUOBAO_BASE_URL, } @@ -625,16 +1217,18 @@ def seed_defaults() -> None: now, ), ) - kb = ensure_user_kb(account_id) + project = ensure_default_project(account_id, username="kris") + kb = ensure_user_kb(account_id, project["id"], username="kris") assistant_id = make_id("assistant") db.execute( """ - INSERT INTO assistants (id, user_id, name, description, system_prompt, generation_goal, fastgpt_app_key, model_profile_id, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, '', ?, ?, ?) + INSERT INTO assistants (id, user_id, project_id, name, description, system_prompt, generation_goal, config_json, model_profile_id, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, '{}', ?, ?, ?) """, ( assistant_id, account_id, + project["id"], "默认文案助手", "系统为超级管理员预置", "你是一个擅长学习短视频文案风格的 AI 助手。", @@ -682,6 +1276,7 @@ def register(request: RegisterAccountRequest) -> dict[str, Any]: now, ), ) + ensure_default_project(account_id, username=username) account = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,)) return normalize_account(account) @@ -717,12 +1312,14 @@ def me(account: dict[str, Any] = Depends(require_auth)) -> dict[str, Any]: @app.get("/v2/me/dashboard") def dashboard(account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + projects = [project_payload(row) for row in db.fetch_all("SELECT * FROM projects WHERE user_id = ? ORDER BY created_at ASC", (account["id"],))] knowledge_bases = [knowledge_base_payload(row) for row in db.fetch_all("SELECT * FROM knowledge_bases WHERE user_id = ? ORDER BY created_at DESC", (account["id"],))] assistants = [assistant_payload(row) for row in db.fetch_all("SELECT * FROM assistants WHERE user_id = ? ORDER BY created_at DESC", (account["id"],))] jobs = [job_payload(row) for row in db.fetch_all("SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT 20", (account["id"],))] model_profiles = [normalize_model_profile(row) for row in db.fetch_all("SELECT * FROM model_profiles WHERE owner_account_id IS NULL OR owner_account_id = ? ORDER BY is_default DESC, created_at ASC", (account["id"],))] return { "account": normalize_account(account), + "projects": projects, "knowledge_bases": knowledge_bases, "assistants": assistants, "recent_jobs": jobs, @@ -730,6 +1327,66 @@ def dashboard(account: dict[str, Any] = Depends(require_approved)) -> dict[str, } +@app.get("/v2/projects") +def list_projects(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]: + return [project_payload(row) for row in db.fetch_all("SELECT * FROM projects WHERE user_id = ? ORDER BY created_at ASC", (account["id"],))] + + +@app.post("/v2/projects") +def create_project(request: ProjectCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + project_id = make_id("project") + now = utc_now() + db.execute( + """ + INSERT INTO projects (id, user_id, name, description, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + project_id, + account["id"], + request.name.strip(), + request.description.strip(), + now, + now, + ), + ) + ensure_user_kb(account["id"], project_id, username=account["username"]) + return project_payload(db.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,))) + + +@app.get("/v2/content-sources") +def list_content_sources( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(require_approved), +) -> list[dict[str, Any]]: + if project_id: + resolve_target_project(account["id"], project_id, username=account["username"]) + rows = db.fetch_all( + "SELECT * FROM content_sources WHERE user_id = ? AND project_id = ? ORDER BY created_at DESC", + (account["id"], project_id), + ) + else: + rows = db.fetch_all("SELECT * FROM content_sources WHERE user_id = ? ORDER BY created_at DESC", (account["id"],)) + return [content_source_payload(row) for row in rows] + + +@app.post("/v2/content-sources") +def create_content_source_api(request: ContentSourceCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) + row = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind=request.source_kind.strip(), + platform=request.platform.strip(), + handle=request.handle.strip(), + source_url=request.source_url.strip(), + title=request.title.strip(), + local_path=request.local_path.strip(), + metadata=request.metadata, + ) + return content_source_payload(row) + + @app.get("/v2/model-profiles") def list_model_profiles(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]: rows = db.fetch_all( @@ -779,11 +1436,15 @@ def list_knowledge_bases(account: dict[str, Any] = Depends(require_approved)) -> @app.post("/v2/knowledge-bases") def create_knowledge_base(request: KnowledgeBaseCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) kb_id = make_id("kb") now = utc_now() db.execute( - "INSERT INTO knowledge_bases (id, user_id, name, description, sync_status, created_at, updated_at) VALUES (?, ?, ?, ?, 'pending', ?, ?)", - (kb_id, account["id"], request.name.strip(), request.description.strip(), now, now), + """ + INSERT INTO knowledge_bases (id, user_id, project_id, name, description, sync_status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, 'ready', ?, ?) + """, + (kb_id, account["id"], project["id"], request.name.strip(), request.description.strip(), now, now), ) row = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (kb_id,)) return knowledge_base_payload(row) @@ -811,80 +1472,228 @@ def get_job(job_id: str, account: dict[str, Any] = Depends(require_approved)) -> return job_payload(row) -def resolve_target_kb(account_id: str, requested_kb_id: str | None) -> dict[str, Any]: +@app.get("/v2/explore/jobs/{job_id}/events") +def get_job_events(job_id: str, account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]: + row = db.fetch_one("SELECT id FROM jobs WHERE id = ? AND user_id = ?", (job_id, account["id"])) + if not row: + raise HTTPException(status_code=404, detail="Job not found") + return [ + job_event_payload(item) + for item in db.fetch_all("SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (job_id,)) + ] + + +def resolve_target_kb(account_id: str, requested_kb_id: str | None, project_id: str = "", username: str = "默认用户") -> dict[str, Any]: if requested_kb_id: kb = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ? AND user_id = ?", (requested_kb_id, account_id)) if kb: + if project_id and kb.get("project_id") and kb.get("project_id") != project_id: + raise HTTPException(status_code=400, detail="Knowledge base does not belong to target project") return kb raise HTTPException(status_code=404, detail="Knowledge base not found") - return ensure_user_kb(account_id) + return ensure_user_kb(account_id, project_id, username=username) @app.post("/v2/explore/text") -async def create_text_job(request: ExploreTextRequest, background_tasks: BackgroundTasks, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: - kb = resolve_target_kb(account["id"], request.knowledge_base_id) +async def create_text_job(request: ExploreTextRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) + kb = resolve_target_kb(account["id"], request.knowledge_base_id, project["id"], username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id, project["id"]) profile = model_profile_for_account(account["id"], request.analysis_model_profile_id) - job_id = make_id("job") - now = utc_now() - artifacts = json.dumps({"input_text": request.content}, ensure_ascii=False) - db.execute( - """ - INSERT INTO jobs (id, user_id, assistant_id, knowledge_base_id, source_type, source_url, title, language, status, artifacts_json, analysis_model_profile_id, created_at, updated_at) - VALUES (?, ?, ?, ?, 'text', NULL, ?, 'zh-CN', 'queued', ?, ?, ?, ?) - """, - (job_id, account["id"], request.assistant_id, kb["id"], request.title.strip(), artifacts, profile["id"], now, now), + source = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind="inline_text", + title=request.title.strip(), + metadata={"content_preview": request.content[:280]}, ) - background_tasks.add_task(process_job, job_id) - return job_payload(db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))) + job_row = create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="text", + line_type="analysis", + workflow_key="analysis_pipeline", + title=request.title.strip(), + language="zh-CN", + assistant_id=(assistant or {}).get("id"), + content_source_id=source["id"], + artifacts={"input_text": request.content}, + analysis_model_profile_id=profile["id"], + ) + return job_payload(await trigger_orchestrated_job(job_row)) @app.post("/v2/explore/video-link") -async def create_video_link_job(request: ExploreVideoLinkRequest, background_tasks: BackgroundTasks, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: - kb = resolve_target_kb(account["id"], request.knowledge_base_id) +async def create_video_link_job(request: ExploreVideoLinkRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) + kb = resolve_target_kb(account["id"], request.knowledge_base_id, project["id"], username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id, project["id"]) profile = model_profile_for_account(account["id"], request.analysis_model_profile_id) - job_id = make_id("job") - now = utc_now() - db.execute( - """ - INSERT INTO jobs (id, user_id, assistant_id, knowledge_base_id, source_type, source_url, title, language, status, artifacts_json, analysis_model_profile_id, created_at, updated_at) - VALUES (?, ?, ?, ?, 'video_link', ?, ?, ?, 'queued', '{}', ?, ?, ?) - """, - (job_id, account["id"], request.assistant_id, kb["id"], request.video_url.strip(), (request.title or "短视频素材").strip(), request.language, profile["id"], now, now), + source = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind="video_link", + source_url=request.video_url.strip(), + title=(request.title or "短视频素材").strip(), + metadata={"platform": "video_link"}, ) - background_tasks.add_task(process_job, job_id) - return job_payload(db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))) + job_row = create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="video_link", + line_type="analysis", + workflow_key="analysis_pipeline", + title=(request.title or "短视频素材").strip(), + language=request.language, + source_url=request.video_url.strip(), + assistant_id=(assistant or {}).get("id"), + content_source_id=source["id"], + artifacts={}, + analysis_model_profile_id=profile["id"], + ) + return job_payload(await trigger_orchestrated_job(job_row)) @app.post("/v2/explore/upload-video") async def upload_video( - background_tasks: BackgroundTasks, file: UploadFile = File(...), title: str = Form(""), + project_id: str = Form(""), knowledge_base_id: str = Form(""), assistant_id: str = Form(""), analysis_model_profile_id: str = Form(""), account: dict[str, Any] = Depends(require_approved), ) -> dict[str, Any]: - kb = resolve_target_kb(account["id"], knowledge_base_id or None) + project = resolve_target_project(account["id"], project_id or None, username=account["username"]) + kb = resolve_target_kb(account["id"], knowledge_base_id or None, project["id"], username=account["username"]) + assistant = resolve_target_assistant(account["id"], assistant_id or None, project["id"]) profile = model_profile_for_account(account["id"], analysis_model_profile_id or None) - job_id = make_id("job") + job_id = make_id("job_upload") job_dir = JOBS_DIR / job_id job_dir.mkdir(parents=True, exist_ok=True) suffix = Path(file.filename or "upload.mp4").suffix or ".mp4" target_path = job_dir / f"source{suffix}" with target_path.open("wb") as handle: shutil.copyfileobj(file.file, handle) - now = utc_now() - artifacts = json.dumps({"uploaded_path": str(target_path)}, ensure_ascii=False) - db.execute( - """ - INSERT INTO jobs (id, user_id, assistant_id, knowledge_base_id, source_type, source_url, title, language, status, artifacts_json, analysis_model_profile_id, created_at, updated_at) - VALUES (?, ?, ?, ?, 'upload_video', ?, ?, 'auto', 'queued', ?, ?, ?, ?) - """, - (job_id, account["id"], assistant_id or None, kb["id"], file.filename or "", (title or file.filename or "上传视频素材").strip(), artifacts, profile["id"], now, now), + source = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind="upload_video", + source_url=file.filename or "", + title=(title or file.filename or "上传视频素材").strip(), + local_path=str(target_path), + metadata={"filename": file.filename or "", "size_bytes": target_path.stat().st_size}, ) - background_tasks.add_task(process_job, job_id) - return job_payload(db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))) + job_row = create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="upload_video", + line_type="analysis", + workflow_key="analysis_pipeline", + title=(title or file.filename or "上传视频素材").strip(), + source_url=file.filename or "", + assistant_id=(assistant or {}).get("id"), + content_source_id=source["id"], + artifacts={"uploaded_path": str(target_path)}, + analysis_model_profile_id=profile["id"], + ) + return job_payload(await trigger_orchestrated_job(job_row)) + + +@app.post("/v2/pipelines/real-cut") +async def create_real_cut_job(request: RealCutJobRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) + kb = ensure_user_kb(account["id"], project["id"], username=account["username"]) + source = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind="real_cut_input", + title=request.title.strip(), + local_path=request.input_dir.strip(), + metadata={"line_type": "real_cut"}, + ) + job_row = create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="real_cut", + line_type="real_cut", + workflow_key="real_cut_pipeline", + title=request.title.strip(), + source_url=request.input_dir.strip(), + content_source_id=source["id"], + artifacts={ + "cutvideo_request": { + "base_config": request.base_config.strip() or CUTVIDEO_BASE_CONFIG, + "name": request.title.strip(), + "input_dir": request.input_dir.strip(), + "objective": request.objective, + "target_duration_sec": request.target_duration_sec, + "target_aspect_ratio": request.target_aspect_ratio, + "ideal_segment_duration_sec": request.ideal_segment_duration_sec, + "max_segment_duration_sec": request.max_segment_duration_sec, + "transcript_backend": request.transcript_backend, + "transcript_device": request.transcript_device, + "review_enabled": request.review_enabled, + "dry_run": request.dry_run, + } + }, + ) + return job_payload(await trigger_orchestrated_job(job_row)) + + +@app.post("/v2/pipelines/ai-video") +async def create_ai_video_job(request: AiVideoJobRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + source_job = None + source_project_id = "" + source_kb_id = "" + if request.source_job_id.strip(): + source_job = db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (request.source_job_id.strip(), account["id"])) + if not source_job: + raise HTTPException(status_code=404, detail="Source job not found") + if source_job["status"] != "completed": + raise HTTPException(status_code=409, detail="Source job must be completed before AI video generation") + source_project_id = source_job.get("project_id", "") + source_kb_id = source_job.get("knowledge_base_id", "") + + requested_project_id = request.project_id or source_project_id + project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + kb = resolve_target_kb(account["id"], request.knowledge_base_id or source_kb_id or None, project["id"], username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + source = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind="ai_video_brief", + title=request.title.strip(), + metadata={"source_job_id": request.source_job_id.strip()}, + ) + job_row = create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="ai_video", + line_type="ai_video", + workflow_key="ai_video_pipeline", + title=request.title.strip(), + assistant_id=(assistant or {}).get("id"), + content_source_id=source["id"], + artifacts={ + "brief": request.brief, + "style": request.style, + "shots": request.shots, + "image_provider": request.image_provider, + "image_model": request.image_model, + "video_provider": request.video_provider, + "video_model": request.video_model, + "aspect_ratio": request.aspect_ratio, + "duration": request.duration, + "source_job_id": request.source_job_id.strip(), + }, + ) + return job_payload(await trigger_orchestrated_job(job_row)) @app.get("/v2/assistants") @@ -896,20 +1705,21 @@ def list_assistants(account: dict[str, Any] = Depends(require_approved)) -> list def create_assistant(request: AssistantCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: assistant_id = make_id("assistant") now = utc_now() + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) model_profile = model_profile_for_account(account["id"], request.model_profile_id or None) db.execute( """ - INSERT INTO assistants (id, user_id, name, description, system_prompt, generation_goal, fastgpt_app_key, model_profile_id, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO assistants (id, user_id, project_id, name, description, system_prompt, generation_goal, config_json, model_profile_id, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, '{}', ?, ?, ?) """, ( assistant_id, account["id"], + project["id"], request.name.strip(), request.description.strip(), request.system_prompt.strip(), request.generation_goal.strip(), - request.fastgpt_app_key.strip(), model_profile["id"], now, now, @@ -927,12 +1737,15 @@ def update_assistant(assistant_id: str, request: AssistantUpdateRequest, account current = db.fetch_one("SELECT * FROM assistants WHERE id = ? AND user_id = ?", (assistant_id, account["id"])) if not current: raise HTTPException(status_code=404, detail="Assistant not found") + project_id = current.get("project_id", "") + if request.project_id is not None: + project_id = resolve_target_project(account["id"], request.project_id, username=account["username"])["id"] payload = { "name": request.name if request.name is not None else current["name"], "description": request.description if request.description is not None else current.get("description", ""), "system_prompt": request.system_prompt if request.system_prompt is not None else current.get("system_prompt", ""), "generation_goal": request.generation_goal if request.generation_goal is not None else current.get("generation_goal", ""), - "fastgpt_app_key": request.fastgpt_app_key if request.fastgpt_app_key is not None else current.get("fastgpt_app_key", ""), + "project_id": project_id, "model_profile_id": current.get("model_profile_id", ""), } if request.model_profile_id is not None: @@ -940,15 +1753,15 @@ def update_assistant(assistant_id: str, request: AssistantUpdateRequest, account db.execute( """ UPDATE assistants - SET name = ?, description = ?, system_prompt = ?, generation_goal = ?, fastgpt_app_key = ?, model_profile_id = ?, updated_at = ? + SET project_id = ?, name = ?, description = ?, system_prompt = ?, generation_goal = ?, model_profile_id = ?, updated_at = ? WHERE id = ? """, ( + payload["project_id"], payload["name"], payload["description"], payload["system_prompt"], payload["generation_goal"], - payload["fastgpt_app_key"], payload["model_profile_id"], utc_now(), assistant_id, @@ -963,6 +1776,21 @@ def update_assistant(assistant_id: str, request: AssistantUpdateRequest, account return assistant_payload(db.fetch_one("SELECT * FROM assistants WHERE id = ?", (assistant_id,))) +@app.get("/v2/agents") +def list_agents(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]: + return list_assistants(account) + + +@app.post("/v2/agents") +def create_agent(request: AssistantCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + return create_assistant(request, account) + + +@app.patch("/v2/agents/{assistant_id}") +def update_agent(assistant_id: str, request: AssistantUpdateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + return update_assistant(assistant_id, request, account) + + @app.post("/v2/assistants/{assistant_id}/generate") async def generate_copy(assistant_id: str, request: GenerateCopyRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: assistant = db.fetch_one("SELECT * FROM assistants WHERE id = ? AND user_id = ?", (assistant_id, account["id"])) @@ -1001,6 +1829,392 @@ async def generate_copy(assistant_id: str, request: GenerateCopyRequest, account } +def load_owned_job(job_id: str, account_id: str) -> dict[str, Any]: + row = db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (job_id, account_id)) + if not row: + raise HTTPException(status_code=404, detail="Job not found") + return row + + +def load_internal_job(job_id: str) -> dict[str, Any]: + row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) + if not row: + raise HTTPException(status_code=404, detail="Job not found") + return row + + +def parse_job_artifacts(row: dict[str, Any]) -> dict[str, Any]: + raw = row.get("artifacts_json") or "{}" + try: + return json.loads(raw) + except json.JSONDecodeError: + return {} + + +def parse_job_result(row: dict[str, Any]) -> dict[str, Any]: + raw = row.get("result_json") or "{}" + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except json.JSONDecodeError: + return {} + + +def extract_source_storyboards(source_job: dict[str, Any] | None) -> list[dict[str, Any]]: + if not source_job: + return [] + return coerce_storyboards(parse_job_result(source_job).get("storyboards")) + + +def resolve_internal_job_id(request: InternalStepRequest | None, query_job_id: str = "") -> str: + resolved = (query_job_id or "").strip() + if not resolved and request is not None: + resolved = ( + request.job_id + or request.jobId + or str(request.payload.get("job_id") or request.payload.get("jobId") or "") + ).strip() + return resolved + + +def load_step_job(request: InternalStepRequest | None, query_job_id: str, workflow_key: str) -> dict[str, Any]: + resolved_job_id = resolve_internal_job_id(request, query_job_id) + if resolved_job_id: + return load_internal_job(resolved_job_id) + row = db.fetch_one( + """ + SELECT * FROM jobs + WHERE workflow_key = ? AND status IN ('pending', 'queued') + ORDER BY created_at ASC + LIMIT 1 + """, + (workflow_key,), + ) + if not row: + raise HTTPException(status_code=400, detail="job_id is required") + return row + + +@app.get("/internal/jobs/{job_id}/context") +def internal_job_context(job_id: str, _: bool = Depends(require_orchestrator)) -> dict[str, Any]: + return job_context_payload(load_internal_job(job_id)) + + +@app.post("/internal/jobs/steps/analyze") +async def internal_run_analysis( + request: InternalStepRequest | None = Body(default=None), + job_id: str = Query(default=""), + _: bool = Depends(require_orchestrator), +) -> dict[str, Any]: + row = load_step_job(request, job_id, "analysis_pipeline") + await process_job(row["id"]) + return job_context_payload(load_internal_job(row["id"])) + + +@app.post("/internal/jobs/steps/real-cut/submit") +async def internal_real_cut_submit( + request: InternalStepRequest | None = Body(default=None), + job_id: str = Query(default=""), + _: bool = Depends(require_orchestrator), +) -> dict[str, Any]: + if not cutvideo_client.enabled: + raise HTTPException(status_code=503, detail="CutVideo is not configured") + row = load_step_job(request, job_id, "real_cut_pipeline") + artifacts = parse_job_artifacts(row) + cutvideo_request = artifacts.get("cutvideo_request") or {} + if not isinstance(cutvideo_request, dict): + raise HTTPException(status_code=400, detail="Invalid cutvideo request payload") + append_job_event(row["id"], "cutvideo.submit.requested", cutvideo_request) + submit_result = await cutvideo_client.submit_job(cutvideo_request) + task_id = str(submit_result.get("task_id") or "") + updated = update_job_state( + row["id"], + status="processing", + provider_name="cutvideo", + provider_task_id=task_id, + result={"cutvideo_submit": submit_result}, + ) + return job_context_payload(updated) + + +@app.post("/internal/jobs/steps/real-cut/poll") +async def internal_real_cut_poll( + request: InternalStepRequest | None = Body(default=None), + job_id: str = Query(default=""), + _: bool = Depends(require_orchestrator), +) -> dict[str, Any]: + row = load_step_job(request, job_id, "real_cut_pipeline") + if not row.get("provider_task_id"): + raise HTTPException(status_code=409, detail="CutVideo task has not been submitted") + task_payload = await cutvideo_client.get_task(row["provider_task_id"]) + status = str(task_payload.get("status") or "").lower() + run_payload: dict[str, Any] = {} + artifacts: dict[str, Any] = {"cutvideo_task": task_payload} + next_status = row["status"] + error = row.get("error", "") + if status == "completed": + next_status = "completed" + run_id = str(task_payload.get("run_id") or "") + if run_id: + run_payload = await cutvideo_client.get_run(run_id) + artifacts["cutvideo_run"] = run_payload + elif status == "failed": + next_status = "failed" + error = str(task_payload.get("error") or "CutVideo task failed") + else: + next_status = "processing" + + updated = update_job_state( + row["id"], + status=next_status, + error=error, + provider_name="cutvideo", + provider_task_id=row["provider_task_id"], + artifacts=artifacts, + result={"cutvideo_run": run_payload} if run_payload else {"cutvideo_task": task_payload}, + ) + return job_context_payload(updated) + + +@app.post("/internal/jobs/steps/real-cut/run") +async def internal_real_cut_run( + request: InternalStepRequest | None = Body(default=None), + job_id: str = Query(default=""), + _: bool = Depends(require_orchestrator), +) -> dict[str, Any]: + if not cutvideo_client.enabled: + raise HTTPException(status_code=503, detail="CutVideo is not configured") + + row = load_step_job(request, job_id, "real_cut_pipeline") + if not row.get("provider_task_id"): + artifacts = parse_job_artifacts(row) + cutvideo_request = artifacts.get("cutvideo_request") or {} + if not isinstance(cutvideo_request, dict): + raise HTTPException(status_code=400, detail="Invalid cutvideo request payload") + submit_result = await cutvideo_client.submit_job(cutvideo_request) + row = update_job_state( + row["id"], + status="processing", + provider_name="cutvideo", + provider_task_id=str(submit_result.get("task_id") or ""), + result={"cutvideo_submit": submit_result}, + ) + + deadline = now_ts() + HUOBAO_MAX_WAIT_SEC + while True: + task_payload = await cutvideo_client.get_task(row["provider_task_id"]) + status = str(task_payload.get("status") or "").lower() + if status == "completed": + run_payload: dict[str, Any] = {} + run_id = str(task_payload.get("run_id") or "") + if run_id: + run_payload = await cutvideo_client.get_run(run_id) + updated = update_job_state( + row["id"], + status="completed", + provider_name="cutvideo", + provider_task_id=row["provider_task_id"], + artifacts={"cutvideo_task": task_payload, "cutvideo_run": run_payload}, + result={"cutvideo_task": task_payload, "cutvideo_run": run_payload}, + ) + return job_context_payload(updated) + if status == "failed": + updated = update_job_state( + row["id"], + status="failed", + error=str(task_payload.get("error") or "CutVideo task failed"), + provider_name="cutvideo", + provider_task_id=row["provider_task_id"], + artifacts={"cutvideo_task": task_payload}, + result={"cutvideo_task": task_payload}, + ) + return job_context_payload(updated) + if now_ts() >= deadline: + updated = update_job_state( + row["id"], + status="failed", + error="CutVideo task timed out", + provider_name="cutvideo", + provider_task_id=row["provider_task_id"], + artifacts={"cutvideo_task": task_payload}, + result={"cutvideo_task": task_payload}, + ) + return job_context_payload(updated) + await asyncio.sleep(CUTVIDEO_POLL_INTERVAL_SEC) + row = load_internal_job(row["id"]) + + +@app.post("/internal/jobs/steps/ai-video/render") +async def internal_ai_video_render( + request: InternalStepRequest | None = Body(default=None), + job_id: str = Query(default=""), + _: bool = Depends(require_orchestrator), +) -> dict[str, Any]: + if not huobao_client.enabled: + raise HTTPException(status_code=503, detail="Huobao is not configured") + + row = load_step_job(request, job_id, "ai_video_pipeline") + artifacts = parse_job_artifacts(row) + assistant = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],)) if row.get("assistant_id") else None + source_job = None + source_storyboards: list[dict[str, Any]] = [] + source_job_id = str(artifacts.get("source_job_id") or "").strip() + if source_job_id: + source_job = db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (source_job_id, row["user_id"])) + if source_job: + source_storyboards = extract_source_storyboards(source_job) + + if source_storyboards: + storyboard_items = source_storyboards[: max(int(artifacts.get("shots") or 4), 1)] + else: + profile = model_profile_for_account(row["user_id"], row.get("analysis_model_profile_id") or None) + blueprint = await generate_content_blueprint( + profile, + title=row["title"], + transcript_text=str(artifacts.get("brief") or row["title"]), + style_summary=str(artifacts.get("style") or ""), + agent_prompt=(assistant or {}).get("system_prompt", ""), + generation_goal=(assistant or {}).get("generation_goal", "") or "生成适合视频模型的分镜与提示词", + ) + storyboard_items = coerce_storyboards(blueprint.get("storyboards"))[: max(int(artifacts.get("shots") or 4), 1)] + + if not storyboard_items: + raise HTTPException(status_code=400, detail="No storyboards available for AI video rendering") + + drama_payload = await huobao_client.create_drama( + { + "title": row["title"], + "description": str(artifacts.get("brief") or row["title"]), + "style": str(artifacts.get("style") or "realistic"), + "genre": "short_video", + "tags": "storyforge", + } + ) + drama_id = str(drama_payload.get("id") or "") + if not drama_id: + raise RuntimeError("Huobao did not return drama id") + + update_job_state( + row["id"], + status="processing", + provider_name="huobao-drama", + provider_task_id=drama_id, + result={"huobao_drama": drama_payload}, + ) + + rendered_scenes: list[dict[str, Any]] = [] + image_provider = str(artifacts.get("image_provider") or "openai") + image_model = str(artifacts.get("image_model") or "") + video_provider = str(artifacts.get("video_provider") or "doubao") + video_model = str(artifacts.get("video_model") or "") + aspect_ratio = str(artifacts.get("aspect_ratio") or "9:16") + image_size = huobao_image_size_for_aspect_ratio(aspect_ratio) + duration = int(artifacts.get("duration") or 5) + style = str(artifacts.get("style") or "realistic") + + for idx, storyboard in enumerate(storyboard_items, start=1): + first_prompt = str(storyboard.get("first_frame_prompt") or storyboard.get("visual") or storyboard.get("title") or row["title"]) + last_prompt = str(storyboard.get("last_frame_prompt") or storyboard.get("visual") or storyboard.get("title") or row["title"]) + video_prompt = str(storyboard.get("video_prompt") or storyboard.get("narration") or storyboard.get("title") or row["title"]) + + first_image = await huobao_client.generate_image( + { + "drama_id": drama_id, + "image_type": "storyboard", + "frame_type": "first", + "prompt": first_prompt, + "provider": image_provider, + "model": image_model, + "size": image_size, + "style": style, + } + ) + last_image = await huobao_client.generate_image( + { + "drama_id": drama_id, + "image_type": "storyboard", + "frame_type": "last", + "prompt": last_prompt, + "provider": image_provider, + "model": image_model, + "size": image_size, + "style": style, + } + ) + + first_ready = await wait_for_huobao_image(str(first_image.get("id") or "")) + last_ready = await wait_for_huobao_image(str(last_image.get("id") or "")) + if str(first_ready.get("status") or "").lower() != "completed": + raise RuntimeError(f"First frame generation failed for scene {idx}") + if str(last_ready.get("status") or "").lower() != "completed": + raise RuntimeError(f"Last frame generation failed for scene {idx}") + + first_frame_url = first_ready.get("image_url") or first_ready.get("local_path") + last_frame_url = last_ready.get("image_url") or last_ready.get("local_path") + if not first_frame_url or not last_frame_url: + raise RuntimeError(f"Huobao image output missing for scene {idx}") + + video_payload = await huobao_client.generate_video( + { + "drama_id": drama_id, + "prompt": video_prompt, + "provider": video_provider, + "model": video_model, + "reference_mode": "first_last", + "first_frame_url": first_frame_url, + "last_frame_url": last_frame_url, + "aspect_ratio": aspect_ratio, + "duration": duration, + "style": style, + } + ) + video_ready = await wait_for_huobao_video(str(video_payload.get("id") or "")) + if str(video_ready.get("status") or "").lower() != "completed": + raise RuntimeError(f"Video generation failed for scene {idx}") + + rendered_scenes.append( + { + "shot_index": storyboard.get("shot_index", idx), + "title": storyboard.get("title", f"镜头{idx}"), + "narration": storyboard.get("narration", ""), + "first_frame": first_ready, + "last_frame": last_ready, + "video": video_ready, + } + ) + + updated = update_job_state( + row["id"], + status="completed", + provider_name="huobao-drama", + provider_task_id=drama_id, + artifacts={ + "huobao_drama_id": drama_id, + "source_job_id": source_job_id, + }, + result={ + "huobao_drama": drama_payload, + "rendered_scenes": rendered_scenes, + "storyboards": storyboard_items, + }, + ) + return job_context_payload(updated) + + +@app.post("/internal/jobs/{job_id}/status") +def internal_update_job_status(job_id: str, request: JobStatusUpdateRequest, _: bool = Depends(require_orchestrator)) -> dict[str, Any]: + updated = update_job_state( + job_id, + status=request.status, + error=request.error, + provider_name=request.provider_name or None, + provider_task_id=request.provider_task_id or None, + artifacts=request.artifacts, + result=request.result, + ) + return job_context_payload(updated) + + @app.get("/v2/admin/accounts/pending") def pending_accounts(admin: dict[str, Any] = Depends(require_super_admin)) -> list[dict[str, Any]]: rows = db.fetch_all("SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC") @@ -1017,7 +2231,8 @@ def approve_account(account_id: str, admin: dict[str, Any] = Depends(require_sup (admin["id"], utc_now(), utc_now(), account_id), ) approved = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,)) - ensure_user_kb(account_id) + project = ensure_default_project(account_id, username=approved["username"]) + ensure_user_kb(account_id, project["id"], username=approved["username"]) return {"saved": True, "account": normalize_account(approved)} diff --git a/docker-compose.yml b/docker-compose.yml index d37b6ee..0f78270 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,56 +1,30 @@ -version: "3.9" - services: - mongo: - image: mongo:6 - container_name: storyforge-mongo - restart: unless-stopped - ports: - - "27017:27017" - volumes: - - ./data/mongo:/data/db - - vectorDB: - image: pgvector/pgvector:pg16 - container_name: storyforge-pgvector + n8n: + image: ${N8N_IMAGE:-docker.n8n.io/n8nio/n8n:latest} + container_name: storyforge-n8n restart: unless-stopped environment: - POSTGRES_DB: ${POSTGRES_DB:-fastgpt} - POSTGRES_USER: ${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} + N8N_HOST: ${N8N_HOST:-0.0.0.0} + N8N_PORT: 5678 + N8N_PROTOCOL: ${N8N_PROTOCOL:-http} + WEBHOOK_URL: ${WEBHOOK_URL:-http://127.0.0.1:5670/} + GENERIC_TIMEZONE: ${GENERIC_TIMEZONE:-Asia/Shanghai} + TZ: ${TZ:-Asia/Shanghai} + N8N_SECURE_COOKIE: ${N8N_SECURE_COOKIE:-false} + N8N_ENFORCE_SETTINGS_FILE_PERMISSIONS: ${N8N_ENFORCE_SETTINGS_FILE_PERMISSIONS:-false} ports: - - "5432:5432" + - "5670:5678" volumes: - - ./data/pg:/var/lib/postgresql/data - - redis: - image: redis:7-alpine - container_name: storyforge-redis - restart: unless-stopped - ports: - - "6379:6379" - volumes: - - ./data/redis:/data - - minio: - image: minio/minio:RELEASE.2025-02-07T23-21-09Z - container_name: storyforge-minio - restart: unless-stopped - command: server /data --console-address ":9001" - environment: - MINIO_ROOT_USER: ${MINIO_ROOT_USER:-minioadmin} - MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD:-minioadmin} - ports: - - "9000:9000" - - "9001:9001" - volumes: - - ./data/minio:/data + - ./data/n8n:/home/node/.n8n + - ./n8n:/workspace/n8n:ro collector: build: context: ./collector-service container_name: storyforge-collector restart: unless-stopped + depends_on: + - n8n environment: DATA_DIR: /data/collector DATABASE_PATH: /data/collector/storyforge.db @@ -58,40 +32,29 @@ services: LOCAL_OPENAI_BASE_URL: ${LOCAL_OPENAI_BASE_URL:-http://host.docker.internal:8317/v1} LOCAL_OPENAI_MODEL: ${LOCAL_OPENAI_MODEL:-GLM-5} LOCAL_OPENAI_API_KEY: ${LOCAL_OPENAI_API_KEY:-} - FASTGPT_BASE_URL: ${FASTGPT_BASE_URL:-http://host.docker.internal:3000} - FASTGPT_DATASET_API_KEY: ${FASTGPT_DATASET_API_KEY:-} + N8N_BASE_URL: ${N8N_BASE_URL:-http://n8n:5678} + N8N_ANALYSIS_WEBHOOK_PATH: ${N8N_ANALYSIS_WEBHOOK_PATH:-/webhook/storyforge-analysis} + N8N_REAL_CUT_WEBHOOK_PATH: ${N8N_REAL_CUT_WEBHOOK_PATH:-/webhook/storyforge-real-cut} + N8N_AI_VIDEO_WEBHOOK_PATH: ${N8N_AI_VIDEO_WEBHOOK_PATH:-/webhook/storyforge-ai-video} + ORCHESTRATOR_SHARED_SECRET: ${ORCHESTRATOR_SHARED_SECRET:-storyforge-local-secret} + CUTVIDEO_BASE_URL: ${CUTVIDEO_BASE_URL:-} + CUTVIDEO_API_KEY: ${CUTVIDEO_API_KEY:-} + CUTVIDEO_BASE_CONFIG: ${CUTVIDEO_BASE_CONFIG:-example.job.yaml} + CUTVIDEO_POLL_INTERVAL_SEC: ${CUTVIDEO_POLL_INTERVAL_SEC:-10} + CUTVIDEO_MAX_WAIT_SEC: ${CUTVIDEO_MAX_WAIT_SEC:-1800} + HUOBAO_BASE_URL: ${HUOBAO_BASE_URL:-http://host.docker.internal:5678} YTDLP_BIN: ${YTDLP_BIN:-yt-dlp} FFMPEG_BIN: ${FFMPEG_BIN:-ffmpeg} WHISPER_BIN: ${WHISPER_BIN:-} WHISPER_MODEL: ${WHISPER_MODEL:-/data/collector/models/ggml-base.en.bin} + HUOBAO_POLL_INTERVAL_SEC: ${HUOBAO_POLL_INTERVAL_SEC:-10} + HUOBAO_MAX_WAIT_SEC: ${HUOBAO_MAX_WAIT_SEC:-900} ports: - "8081:8081" volumes: - ./data/collector:/data/collector command: uvicorn app.main:app --host 0.0.0.0 --port 8081 - fastgpt: - image: ghcr.io/labring/fastgpt:latest - container_name: storyforge-fastgpt - restart: unless-stopped - depends_on: - - mongo - - vectorDB - - redis - - minio - ports: - - "3000:3000" - - sandbox: - image: ghcr.io/labring/fastgpt-sandbox:latest - container_name: storyforge-sandbox - restart: unless-stopped - - fastgpt-plugin: - image: ghcr.io/labring/fastgpt-plugin:latest - container_name: storyforge-fastgpt-plugin - restart: unless-stopped - cli-proxy-api: image: ${CLIPROXY_IMAGE:-storyforge/cli-proxy-api:patched} container_name: storyforge-cliproxyapi diff --git a/docs/AUDIT_2026-03-18.md b/docs/AUDIT_2026-03-18.md new file mode 100644 index 0000000..82177b1 --- /dev/null +++ b/docs/AUDIT_2026-03-18.md @@ -0,0 +1,134 @@ +# StoryForge 现状审计 + +日期:2026-03-18 + +## 结论 + +当前应以 `/Users/kris/code/StoryForge-gitea` 作为主工作区继续推进,而不是 `/Users/kris/code/Fastgpt`。后者更像一次不完整的导入快照,前者才是可持续开发的真实仓库。 + +## 现有功能归位 + +### 1. `collector-service` 之前承担的功能 + +- 账号注册、登录、审批 +- 本地模型配置 +- 知识库、智能体、任务管理 +- 视频链接/上传视频/文本三类入口 +- 下载器、ffmpeg、whisper.cpp 风格的本地处理调用 +- Android OTA 查询/发布 + +### 2. FastGPT 实际承担的功能 + +- 仅承担“数据集/文档同步”的外部依赖角色 +- 代码痕迹集中在: + - `collector-service/app/fastgpt.py` + - `docker-compose.yml` + - 若干 `fastgpt_*` 字段 + +结论:FastGPT 并不是业务内核,适合迁移后整体删除。 + +### 3. n8n 适合接管的功能 + +- 任务触发 +- 工作流分流 +- 外部能力编排入口 +- 任务执行顺序控制 + +不适合承载: + +- 用户、项目、Agent、知识库、任务、历史记录的主数据 +- 业务状态唯一真相源 + +结论:应采用“业务状态在 `collector-service`,流程编排在 `n8n`”的分层。 + +## 多用户与数据边界 + +当前已明确采用: + +- `accounts` +- `projects` +- `knowledge_bases` +- `assistants` +- `content_sources` +- `jobs` +- `job_events` + +推荐模型:`user + project`。 + +理由: + +- 只做 `user` 级隔离,会导致一个用户内部不同内容工作流难以再分边界 +- `project` 可以自然承接“一个创作者方向 / 一个客户 / 一个账号矩阵 / 一个内容实验” +- `assistant`、`knowledge_base`、`job`、`content_source` 都能挂到 `project`,便于后续扩展协作空间 + +## 外部链路审计 + +### 1. 下载器 + +- 已存在,不需要重写 +- 现阶段通过 `yt-dlp` 命令集成 + +### 2. ASR + +- 现有实现已部署,但入口未完全标准化 +- 当前后端支持 `ffmpeg + whisper.cpp` 风格接入 +- 若需要接现有常驻 ASR 服务,后续只需把 `transcribe_media()` 改成 HTTP/RPC 适配即可 + +### 3. Windows `cutvideo` + +- 仓库:`/Users/kris/code/cutvideo` +- 具备清晰 API: + - `POST /api/jobs` + - `GET /api/tasks/{task_id}` + - `GET /api/runs/{run_id}` +- 适合集成为“由 StoryForge 后端授权调用的局域网剪辑能力” + +当前限制: + +- 现有 `cutvideo` API 主要接受 `input_dir` +- 对“用户上传实拍素材后直接推送到 Windows 机器”这一步,还缺一层文件转运方案 + +### 4. `huobao-drama` + +- 旧改版位置:`/Users/kris/code/huobaoduanju/huobao-drama-master` +- 最新 upstream:`/Users/kris/code/huobao-drama-upstream` +- 旧改版主要多了一套 `ad_workflow` 方向,和当前 StoryForge MVP 不完全对齐 +- 最新版已具备: + - `POST /api/v1/dramas` + - `POST /api/v1/images` + - `GET /api/v1/images/{id}` + - `POST /api/v1/videos` + - `GET /api/v1/videos/{id}` + - `reference_mode=first_last` + +本次真实联调里,旧改版为了兼容 `qnaigc` 需要补 4 个点: + +- `pkg/image/openai_image_client.go` +- `application/services/image_generation_service.go` +- `pkg/video/openai_sora_client.go` +- `application/services/video_generation_service.go` + +核对结果: + +- 以上 4 个文件与本机 upstream 同名文件在补丁前没有明显结构分叉 +- 当前差异基本就是 `qnaigc` 图片异步查询、Kling 视频 JSON 协议、结果 URL 解析、远程首尾帧 URL 保留这几处兼容逻辑 + +结论:这批补丁是可移植补丁,MVP 已在旧改版实例上验证通过;下一步应把同样补丁迁到最新版 `huobao-drama-upstream`,而不是继续在旧目录长期演进。 + +## 当前已完成迁移面 + +- FastGPT 运行时依赖已从 `collector-service` 主代码中剥离 +- 数据库已支持 `project/content_source/job_events` +- `collector-service` 已增加: + - `n8n` 触发 + - `cutvideo` 集成 client + - `huobao-drama` 集成 client + - 内部编排接口 +- `docker-compose.yml` 已改为 `collector + n8n + cli-proxy-api` +- `n8n` 工作流导出文件已纳入仓库 + +## 当前主要风险 + +1. `cutvideo` 的素材传输还未完整闭环 +2. 本地 ASR 的“最终生产入口”仍需按你现有部署方式再绑一次 +3. `huobao-drama` 已在本机旧改版实例上跑通,但兼容补丁尚未迁到 upstream 仓库并形成正式提交 diff --git a/docs/IMPLEMENTATION_PLAN_2026-03-18.md b/docs/IMPLEMENTATION_PLAN_2026-03-18.md new file mode 100644 index 0000000..d3bdb19 --- /dev/null +++ b/docs/IMPLEMENTATION_PLAN_2026-03-18.md @@ -0,0 +1,97 @@ +# StoryForge 分阶段实施计划 + +日期:2026-03-18 + +## Phase 0: 审计与基线收拢 + +- 确认主工作区 +- 识别 FastGPT 真实职责 +- 识别多用户、多项目需要的主数据模型 +- 对比 `huobao-drama` 旧改版与 upstream +- 审计 `cutvideo` 接口能力 + +状态:已完成 + +## Phase 1: 业务后端改造成主状态中心 + +- 引入 `projects` +- 引入 `content_sources` +- 引入 `job_events` +- 让 `knowledge_bases / assistants / jobs` 全部 project 化 +- 去掉 `collector-service` 中的 FastGPT 运行时逻辑 +- 增加 `agents` 别名接口,统一 Agent 语义 + +状态:已完成首版 + +## Phase 2: n8n 接管流程编排 + +- 公共任务创建接口只负责建任务并触发工作流 +- `n8n` 负责分发: + - `analysis_pipeline` + - `real_cut_pipeline` + - `ai_video_pipeline` +- 业务步骤落在 `collector-service` 内部接口,保证状态统一入库 + +状态:已完成首版 + +## Phase 3: 内容分析主线 MVP + +- 支持文本 +- 支持视频链接 +- 支持上传视频 +- 接下载器 +- 接本地 ASR +- 接本地 LLM +- 产出: + - transcript + - style_summary + - analysis + - rewrite + - storyboards + +状态:已完成首版 + +## Phase 4: 实拍自动剪辑主线 MVP + +- 建立 `real_cut` 任务类型 +- 通过 `n8n -> collector -> cutvideo` 调度 Windows 机器 +- 记录 `task_id / run_id / 结果产物` + +状态:已完成 API 级集成 + +待补: + +- 用户上传素材到 Windows 侧的文件转运闭环 + +## Phase 5: AI 自动生成视频主线 MVP + +- 建立 `ai_video` 任务类型 +- 从分析结果或直接 brief 生成分镜 +- 调 `huobao-drama`: + - 创建 drama + - 生成首帧 + - 生成尾帧 + - 基于首尾帧生成视频 +- 结果回写任务 + +状态:已完成 API 级集成 + +## Phase 6: 删除 FastGPT 运行依赖 + +- 删除代码依赖 +- 删除 compose 服务 +- 删除环境变量 +- 删除 README 说明 + +状态:已完成主仓库首版 + +## Phase 7: 联调与验证 + +- Python 语法检查 +- Compose 配置检查 +- `collector-service` 本地启动 +- `n8n` workflow 导入 +- Windows `cutvideo` 局域网调度 +- `huobao-drama` 本机调用 + +状态:进行中 diff --git a/docs/LAN_E2E_GUIDE_2026-03-18.md b/docs/LAN_E2E_GUIDE_2026-03-18.md new file mode 100644 index 0000000..25b5842 --- /dev/null +++ b/docs/LAN_E2E_GUIDE_2026-03-18.md @@ -0,0 +1,146 @@ +# StoryForge 本地 / 局域网联调说明 + +日期:2026-03-18 + +## 1. 准备 `.env` + +复制: + +```bash +cd /Users/kris/code/StoryForge-gitea +cp .env.example .env +``` + +至少确认这些变量: + +- `N8N_BASE_URL=http://127.0.0.1:5670` +- `ORCHESTRATOR_SHARED_SECRET=storyforge-local-secret` +- `CUTVIDEO_BASE_URL=http://:7860` +- `CUTVIDEO_API_KEY=` 如果 Windows 服务启用了鉴权 +- `HUOBAO_BASE_URL=http://127.0.0.1:5678` +- `WHISPER_BIN=` 指向你现有本地 ASR 可执行文件时填写 + +说明: + +- 如果你单独重建 `collector`,要确保运行时仍带上 `CUTVIDEO_BASE_URL`,否则容器会退回空值 +- 当前已验证可用的 Windows `cutvideo` 地址是 `http://192.168.31.18:7860` + +## 2. 启动基础服务 + +```bash +cd /Users/kris/code/StoryForge-gitea +docker compose up -d --build +``` + +检查: + +- `collector-service`:`http://127.0.0.1:8081/healthz` +- `n8n`:`http://127.0.0.1:5670` +- `cli-proxy-api`:`http://127.0.0.1:8317` +- 本机 `huobao-drama`:`http://127.0.0.1:5678/health` + +## 3. 导入 n8n workflows + +从 `n8n/workflows/` 导入: + +- `storyforge-analysis.json` +- `storyforge-real-cut.json` +- `storyforge-ai-video.json` + +导入后: + +- 检查每个 HTTP Request 节点的 `X-Orchestrator-Secret` +- 如果你改了 `.env` 的 secret,这里必须同步 + +## 4. 登录与审批 + +默认超级管理员: + +- 用户名:`kris` +- 密码:`Asd123456.` + +新用户注册后,需要用超级管理员审批。 + +## 5. 内容分析链路验证 + +### 文本 + +调用 `POST /v2/explore/text` + +预期: + +- 任务创建成功 +- `n8n` webhook 被触发 +- 任务最终进入 `completed` +- 知识库文档里出现 transcript / style_summary / analysis / storyboards + +已验证样例: + +- `job_203bc8e9b20f4b1cbbc6cf7da79e46f4` + +### 视频链接 + +调用 `POST /v2/explore/video-link` + +前提: + +- `yt-dlp` 可用 +- `ffmpeg` 可用 +- ASR 可调用 + +### 上传视频 + +调用 `POST /v2/explore/upload-video` + +预期与视频链接类似,但素材来源为本地上传 + +## 6. `cutvideo` 实拍剪辑链路验证 + +调用 `POST /v2/pipelines/real-cut` + +当前 MVP 前提: + +- `input_dir` 必须是 Windows `cutvideo` 机器可访问的目录 +- 该目录中的素材已准备好 + +预期: + +- 任务创建成功 +- `n8n` 调用 `collector-service` 内部 real-cut step +- 后端记录 `provider_task_id` +- 最终任务写回 `cutvideo_run` + +已验证样例: + +- `job_5ebd829c3f2144bca5c941183e75bdcd` +- Windows 返回 `task_id=8d8f4a0cd5d9` +- 运行目录 `20260318-093520-Windows cutvideo 联调样例` + +## 7. `huobao-drama` AI 视频链路验证 + +调用 `POST /v2/pipelines/ai-video` + +推荐方式: + +- 先完成一个分析任务 +- 再把该分析任务的 `source_job_id` 传给 AI 视频任务 + +预期: + +- 创建 drama +- 每个分镜生成首帧、尾帧 +- 每个分镜生成视频 +- 最终 `job.result.rendered_scenes` 有完整结果 + +已验证样例: + +- `job_01828c40377747cf914b51be360cc333` +- `provider_task_id=10` +- `video.task_id=qvideo-1380265978-1773799215825814468` +- 最终视频已回写到 `job.result.rendered_scenes[0].video.video_url` + +## 8. 当前已知卡点 + +- `cutvideo` 端到端“上传素材后自动送到 Windows 机器”还未彻底闭环 +- ASR 如果不是命令行模式,而是你现有常驻服务模式,需要再做一次入口绑定 +- `huobao-drama` 目前跑通依赖本地旧改版中的 qnaigc 兼容补丁,下一步要迁到 upstream 仓库 diff --git a/docs/MVP_STATUS_2026-03-18.md b/docs/MVP_STATUS_2026-03-18.md new file mode 100644 index 0000000..6cb1220 --- /dev/null +++ b/docs/MVP_STATUS_2026-03-18.md @@ -0,0 +1,41 @@ +# StoryForge MVP 状态 + +日期:2026-03-18 + +## 已跑通或已完成代码接通 + +- 多用户账号体系 +- 审批机制 +- `user -> project -> assistant / knowledge base / job / content source` 数据模型 +- 文本 / 视频链接 / 上传视频 三类分析任务创建 +- `n8n` 工作流导入、激活与触发接口 +- 本地下载器调用 +- 本地 `ffmpeg` / `whisper` 风格入口封装 +- 本地大模型内容分析、二创文案、分镜生成 +- Windows `cutvideo` API 调度与结果回写接口 +- 本机 `huobao-drama` API 调度、首尾帧生成、视频生成与结果回写接口 +- FastGPT 运行时依赖删除 + +## 已验证的真实任务 + +- 分析链路:`job_203bc8e9b20f4b1cbbc6cf7da79e46f4` +- 实拍剪辑链路:`job_5ebd829c3f2144bca5c941183e75bdcd` +- AI 视频链路:`job_01828c40377747cf914b51be360cc333` + +## 已实现但仍待环境验证 + +- 现有 ASR 部署入口与 `collector-service` 的最终绑定 + +## 尚未完全跑通 + +- 用户上传实拍素材后,自动把素材转运到 Windows `cutvideo` 机器的闭环 +- 对“抖音 / bilibili / 小红书账号级内容源”的批量抓取与分析调度 +- `huobao-drama` 本地兼容补丁向 upstream 仓库的迁移、分支化和提交 + +## 下一步优先级 + +1. 把 `huobao-drama` 本地兼容补丁迁到 `/Users/kris/code/huobao-drama-upstream` +2. 绑定你的真实 ASR 入口 +3. 决定实拍素材转运方案:共享目录优先,上传 API 作为备选 +4. 补账号级内容源抓取调度 +5. 把改动整理成提交并推送 diff --git a/n8n/README.md b/n8n/README.md new file mode 100644 index 0000000..ed78a36 --- /dev/null +++ b/n8n/README.md @@ -0,0 +1,28 @@ +# n8n Workflows + +本目录保存 StoryForge 的工作流导出文件,避免流程只存在于 n8n UI。 + +## 工作流 + +- `workflows/storyforge-analysis.json`:内容分析主线 +- `workflows/storyforge-real-cut.json`:Windows `cutvideo` 调度主线 +- `workflows/storyforge-ai-video.json`:`huobao-drama` AI 生成视频主线 + +## 约定 + +- 工作流内部默认通过 `http://collector:8081` 调用 `collector-service` +- 内部调用头部使用 `X-Orchestrator-Secret: storyforge-local-secret` +- 如果你修改了 `.env` 里的 `ORCHESTRATOR_SHARED_SECRET`,导入工作流后需要同步更新对应 HTTP Request 节点 + +## 导入 + +1. 先执行 `docker compose up -d n8n collector` +2. 打开 `http://127.0.0.1:5670` +3. 从 UI 导入本目录下的 3 个 JSON +4. 激活工作流 + +## Webhook 路径 + +- `/webhook/storyforge-analysis` +- `/webhook/storyforge-real-cut` +- `/webhook/storyforge-ai-video` diff --git a/n8n/workflows/storyforge-ai-video.json b/n8n/workflows/storyforge-ai-video.json new file mode 100644 index 0000000..6e35cd3 --- /dev/null +++ b/n8n/workflows/storyforge-ai-video.json @@ -0,0 +1,70 @@ +{ + "name": "StoryForge AI Video Pipeline", + "nodes": [ + { + "parameters": { + "httpMethod": "POST", + "path": "storyforge-ai-video", + "responseMode": "onReceived", + "options": {} + }, + "id": "aivideo-webhook", + "name": "AI Video Webhook", + "type": "n8n-nodes-base.webhook", + "typeVersion": 2, + "position": [ + 220, + 300 + ], + "webhookId": "storyforge-ai-video" + }, + { + "parameters": { + "method": "POST", + "url": "={{'http://collector:8081/internal/jobs/steps/ai-video/render?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "sendHeaders": true, + "headerParameters": { + "parameters": [ + { + "name": "X-Orchestrator-Secret", + "value": "storyforge-local-secret" + } + ] + }, + "options": { + "timeout": 3600000 + } + }, + "id": "aivideo-runner", + "name": "Run AI Video Step", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [ + 520, + 300 + ] + } + ], + "connections": { + "AI Video Webhook": { + "main": [ + [ + { + "node": "Run AI Video Step", + "type": "main", + "index": 0 + } + ] + ] + }, + "Run AI Video Step": { + "main": [ + [] + ] + } + }, + "active": false, + "settings": {}, + "pinData": {}, + "versionId": "storyforge-ai-video-v1" +} diff --git a/n8n/workflows/storyforge-analysis.json b/n8n/workflows/storyforge-analysis.json new file mode 100644 index 0000000..77b910b --- /dev/null +++ b/n8n/workflows/storyforge-analysis.json @@ -0,0 +1,70 @@ +{ + "name": "StoryForge Analysis Pipeline", + "nodes": [ + { + "parameters": { + "httpMethod": "POST", + "path": "storyforge-analysis", + "responseMode": "onReceived", + "options": {} + }, + "id": "analysis-webhook", + "name": "Analysis Webhook", + "type": "n8n-nodes-base.webhook", + "typeVersion": 2, + "position": [ + 220, + 300 + ], + "webhookId": "storyforge-analysis" + }, + { + "parameters": { + "method": "POST", + "url": "={{'http://collector:8081/internal/jobs/steps/analyze?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "sendHeaders": true, + "headerParameters": { + "parameters": [ + { + "name": "X-Orchestrator-Secret", + "value": "storyforge-local-secret" + } + ] + }, + "options": { + "timeout": 600000 + } + }, + "id": "analysis-runner", + "name": "Run Analysis Step", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [ + 520, + 300 + ] + } + ], + "connections": { + "Analysis Webhook": { + "main": [ + [ + { + "node": "Run Analysis Step", + "type": "main", + "index": 0 + } + ] + ] + }, + "Run Analysis Step": { + "main": [ + [] + ] + } + }, + "active": false, + "settings": {}, + "pinData": {}, + "versionId": "storyforge-analysis-v1" +} diff --git a/n8n/workflows/storyforge-real-cut.json b/n8n/workflows/storyforge-real-cut.json new file mode 100644 index 0000000..5fd2892 --- /dev/null +++ b/n8n/workflows/storyforge-real-cut.json @@ -0,0 +1,70 @@ +{ + "name": "StoryForge Real Cut Pipeline", + "nodes": [ + { + "parameters": { + "httpMethod": "POST", + "path": "storyforge-real-cut", + "responseMode": "onReceived", + "options": {} + }, + "id": "realcut-webhook", + "name": "Real Cut Webhook", + "type": "n8n-nodes-base.webhook", + "typeVersion": 2, + "position": [ + 220, + 300 + ], + "webhookId": "storyforge-real-cut" + }, + { + "parameters": { + "method": "POST", + "url": "={{'http://collector:8081/internal/jobs/steps/real-cut/run?job_id=' + ($json.body.job_id || $json.body.jobId)}}", + "sendHeaders": true, + "headerParameters": { + "parameters": [ + { + "name": "X-Orchestrator-Secret", + "value": "storyforge-local-secret" + } + ] + }, + "options": { + "timeout": 3600000 + } + }, + "id": "realcut-runner", + "name": "Run Real Cut Step", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4.2, + "position": [ + 520, + 300 + ] + } + ], + "connections": { + "Real Cut Webhook": { + "main": [ + [ + { + "node": "Run Real Cut Step", + "type": "main", + "index": 0 + } + ] + ] + }, + "Run Real Cut Step": { + "main": [ + [] + ] + } + }, + "active": false, + "settings": {}, + "pinData": {}, + "versionId": "storyforge-real-cut-v1" +}