diff --git a/collector-service/app/database.py b/collector-service/app/database.py index c3bb5a2..f83b801 100644 --- a/collector-service/app/database.py +++ b/collector-service/app/database.py @@ -211,6 +211,30 @@ class Database: FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL ); + CREATE TABLE IF NOT EXISTS publish_reviews ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT, + source_job_id TEXT, + assistant_id TEXT, + title TEXT NOT NULL, + platform TEXT NOT NULL DEFAULT 'douyin', + content_type TEXT NOT NULL DEFAULT 'video', + publish_url TEXT NOT NULL DEFAULT '', + published_at TEXT NOT NULL DEFAULT '', + metrics_json TEXT NOT NULL DEFAULT '{}', + verdict TEXT NOT NULL DEFAULT '', + highlights TEXT NOT NULL DEFAULT '', + next_actions TEXT NOT NULL DEFAULT '', + notes TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(source_job_id) REFERENCES jobs(id) ON DELETE SET NULL, + FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL + ); + CREATE TABLE IF NOT EXISTS job_events ( id TEXT PRIMARY KEY, job_id TEXT NOT NULL, diff --git a/collector-service/app/main.py b/collector-service/app/main.py index 5557060..dea0062 100644 --- a/collector-service/app/main.py +++ b/collector-service/app/main.py @@ -1,17 +1,20 @@ from __future__ import annotations import asyncio +import httpx import json import os import re import secrets import shutil +import socket import subprocess import sys import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any +from urllib.parse import urljoin, urlparse from fastapi import Body, Depends, FastAPI, File, Form, Header, HTTPException, Query, UploadFile from fastapi.middleware.cors import CORSMiddleware @@ -251,6 +254,36 @@ class AiVideoJobRequest(BaseModel): duration: int = 5 +class ReviewCreateRequest(BaseModel): + project_id: str = "" + source_job_id: str = "" + assistant_id: str = "" + title: str = "" + platform: str = "douyin" + content_type: str = "video" + publish_url: str = "" + published_at: str = "" + metrics: dict[str, Any] = Field(default_factory=dict) + verdict: str = "" + highlights: str = "" + next_actions: str = "" + notes: str = "" + + +class ReviewUpdateRequest(BaseModel): + title: str | None = None + platform: str | None = None + content_type: str | None = None + publish_url: str | None = None + published_at: str | None = None + metrics: dict[str, Any] | None = None + verdict: str | None = None + highlights: str | None = None + next_actions: str | None = None + notes: str | None = None + assistant_id: str | None = None + + class InternalStepRequest(BaseModel): job_id: str = "" jobId: str = "" @@ -521,6 +554,41 @@ def assistant_payload(row: dict[str, Any]) -> dict[str, Any]: } +def review_payload(row: dict[str, Any]) -> dict[str, Any]: + metrics = parse_json_object(row.get("metrics_json") or "{}") + source_job = None + assistant = None + if row.get("source_job_id"): + source_job_row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (row["source_job_id"],)) + if source_job_row: + source_job = job_payload(source_job_row) + if row.get("assistant_id"): + assistant_row = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],)) + if assistant_row: + assistant = assistant_payload(assistant_row) + return { + "id": row["id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", ""), + "source_job_id": row.get("source_job_id", ""), + "assistant_id": row.get("assistant_id", ""), + "title": row.get("title", ""), + "platform": row.get("platform", "douyin"), + "content_type": row.get("content_type", "video"), + "publish_url": row.get("publish_url", ""), + "published_at": row.get("published_at", ""), + "metrics": metrics, + "verdict": row.get("verdict", ""), + "highlights": row.get("highlights", ""), + "next_actions": row.get("next_actions", ""), + "notes": row.get("notes", ""), + "source_job": source_job, + "assistant": assistant, + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + def document_payload(row: dict[str, Any]) -> dict[str, Any]: analysis_map = parse_json_object(row.get("analysis_json") or "{}") source_artifacts = parse_json_object(row.get("source_artifact_json") or "{}") @@ -1353,6 +1421,44 @@ async def process_job(job_id: str) -> None: update_job_state(job_id, status="failed", error=str(exc)) +def probe_tcp(url: str, timeout: float = 3.0) -> dict[str, Any]: + if not url: + return {"configured": False, "reachable": False, "status_code": 0, "error": "not_configured", "url": ""} + parsed = urlparse(url) + host = parsed.hostname + port = parsed.port or (443 if parsed.scheme == "https" else 80) + if not host: + return {"configured": True, "reachable": False, "status_code": 0, "error": "invalid_url", "url": url} + sock = socket.socket() + sock.settimeout(timeout) + try: + sock.connect((host, port)) + return {"configured": True, "reachable": True, "status_code": 0, "error": "", "url": url} + except Exception as exc: # pragma: no cover - operational probe + return {"configured": True, "reachable": False, "status_code": 0, "error": str(exc), "url": url} + finally: + sock.close() + + +def probe_http(url: str, path: str = "", timeout: float = 3.0) -> dict[str, Any]: + tcp = probe_tcp(url, timeout=timeout) + target_url = urljoin(url if url.endswith("/") else f"{url}/", path.lstrip("/")) if url else "" + if not tcp["configured"] or not tcp["reachable"]: + if target_url: + tcp["url"] = target_url + return tcp + try: + response = httpx.get(target_url or url, timeout=timeout, follow_redirects=True) + tcp["status_code"] = response.status_code + tcp["reachable"] = response.status_code < 500 + tcp["error"] = "" if response.status_code < 500 else f"http_{response.status_code}" + except Exception as exc: # pragma: no cover - operational probe + tcp["reachable"] = False + tcp["error"] = str(exc) + tcp["url"] = target_url or url + return tcp + + @app.on_event("startup") def on_startup() -> None: db.init_schema() @@ -1374,6 +1480,29 @@ def healthz() -> dict[str, Any]: } +@app.get("/v2/integrations/health") +def integrations_health(account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + _ = account + return { + "cutvideo": { + "base_url": CUTVIDEO_BASE_URL, + **probe_http(CUTVIDEO_BASE_URL, "/api/bootstrap"), + }, + "huobao": { + "base_url": HUOBAO_BASE_URL, + **probe_http(HUOBAO_BASE_URL, "/health"), + }, + "n8n": { + "base_url": N8N_BASE_URL, + **probe_http(N8N_BASE_URL, "/healthz"), + }, + "asr": { + "base_url": ASR_HTTP_BASE_URL, + **probe_tcp(ASR_HTTP_BASE_URL), + }, + } + + def seed_defaults() -> None: if not db.fetch_one("SELECT id FROM model_profiles WHERE is_default = 1 LIMIT 1"): profile_id = make_id("model") @@ -1746,6 +1875,107 @@ def list_knowledge_documents(knowledge_base_id: str, account: dict[str, Any] = D return [document_payload(row) for row in rows] +@app.get("/v2/reviews") +def list_reviews( + project_id: str | None = Query(default=None), + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(require_approved), +) -> list[dict[str, Any]]: + clauses = ["user_id = ?"] + params: list[Any] = [account["id"]] + if project_id is not None: + normalized_project = project_id.strip() + if normalized_project: + clauses.append("project_id = ?") + params.append(normalized_project) + else: + clauses.append("(project_id IS NULL OR project_id = '')") + sql = f"SELECT * FROM publish_reviews WHERE {' AND '.join(clauses)} ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC LIMIT ?" + params.append(limit) + return [review_payload(row) for row in db.fetch_all(sql, tuple(params))] + + +@app.post("/v2/reviews") +def create_review(request: ReviewCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + source_job = None + if request.source_job_id.strip(): + source_job = load_owned_job(request.source_job_id.strip(), account["id"]) + requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "") + project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + review_id = make_id("review") + title = request.title.strip() or (source_job.get("title", "") if source_job else "") + if not title: + title = f"{project['name']} 复盘" + timestamp = utc_now() + db.execute( + """ + INSERT INTO publish_reviews ( + id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type, + publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + review_id, + account["id"], + project["id"], + source_job["id"] if source_job else None, + (assistant or {}).get("id") or None, + title, + request.platform or "douyin", + request.content_type or "video", + request.publish_url.strip(), + request.published_at.strip(), + json.dumps(request.metrics, ensure_ascii=False), + request.verdict.strip(), + request.highlights.strip(), + request.next_actions.strip(), + request.notes.strip(), + timestamp, + timestamp, + ), + ) + row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return review_payload(row) + + +@app.patch("/v2/reviews/{review_id}") +def update_review(review_id: str, request: ReviewUpdateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + current = load_owned_review(review_id, account["id"]) + assistant_id = current.get("assistant_id") or None + if request.assistant_id is not None: + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, current.get("project_id", "")) + assistant_id = (assistant or {}).get("id") or None + db.execute( + """ + UPDATE publish_reviews + SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?, + metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?, + assistant_id = ?, updated_at = ? + WHERE id = ? AND user_id = ? + """, + ( + request.title if request.title is not None else current.get("title", ""), + request.platform if request.platform is not None else current.get("platform", "douyin"), + request.content_type if request.content_type is not None else current.get("content_type", "video"), + request.publish_url if request.publish_url is not None else current.get("publish_url", ""), + request.published_at if request.published_at is not None else current.get("published_at", ""), + json.dumps(request.metrics if request.metrics is not None else parse_json_object(current.get("metrics_json") or "{}"), ensure_ascii=False), + request.verdict if request.verdict is not None else current.get("verdict", ""), + request.highlights if request.highlights is not None else current.get("highlights", ""), + request.next_actions if request.next_actions is not None else current.get("next_actions", ""), + request.notes if request.notes is not None else current.get("notes", ""), + assistant_id, + utc_now(), + review_id, + account["id"], + ), + ) + row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return review_payload(row) + + @app.get("/v2/explore/jobs") def list_jobs( parent_job_id: str | None = Query(default=None), @@ -2178,6 +2408,13 @@ def load_owned_content_source(source_id: str, account_id: str) -> dict[str, Any] return row +def load_owned_review(review_id: str, account_id: str) -> dict[str, Any]: + row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ? AND user_id = ?", (review_id, account_id)) + if not row: + raise HTTPException(status_code=404, detail="Review not found") + return row + + def load_internal_job(job_id: str) -> dict[str, Any]: row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) if not row: diff --git a/web/storyforge-web-v4/assets/app.js b/web/storyforge-web-v4/assets/app.js index 69f7bd3..fdb57a0 100644 --- a/web/storyforge-web-v4/assets/app.js +++ b/web/storyforge-web-v4/assets/app.js @@ -22,6 +22,8 @@ const appState = { lastSeenAt: Number(localStorage.getItem(STORAGE_KEY + ":lastSeenAt") || Date.now()), trackingAccounts: [], trackingDigest: null, + reviews: [], + integrationHealth: null, busy: false, message: "", lastAction: null, @@ -98,6 +100,9 @@ function statusTone(status) { const normalized = String(status || "").toLowerCase(); if (["completed", "ready", "approved", "ok"].includes(normalized)) return "green"; if (["failed", "error", "rejected"].includes(normalized)) return "red"; + if (["worth_scaling", "good_reference"].includes(normalized)) return "green"; + if (["needs_rework"].includes(normalized)) return "red"; + if (["hold"].includes(normalized)) return "orange"; if (["running", "processing", "pending", "queued"].includes(normalized)) return "orange"; return "blue"; } @@ -489,6 +494,8 @@ async function logoutSession() { appState.documents = []; appState.trackingAccounts = []; appState.trackingDigest = null; + appState.reviews = []; + appState.integrationHealth = null; appState.lastAction = null; appState.lastGeneratedCopy = null; appState.lastSimilaritySearch = null; @@ -547,11 +554,13 @@ async function bootstrap() { renderAll(); return; } - const [dashboard, contentSources, accounts, trackingAccountsPayload] = await Promise.all([ + const [dashboard, contentSources, accounts, trackingAccountsPayload, reviews, integrationHealth] = await Promise.all([ storyforgeFetch("/v2/me/dashboard"), storyforgeFetch("/v2/content-sources").catch(() => []), storyforgeFetch("/v2/douyin/accounts").catch(() => []), - storyforgeFetch("/v2/douyin/tracking/accounts").catch(() => ({ items: [], cursor_last_seen_at: "" })) + storyforgeFetch("/v2/douyin/tracking/accounts").catch(() => ({ items: [], cursor_last_seen_at: "" })), + storyforgeFetch("/v2/reviews").catch(() => []), + storyforgeFetch("/v2/integrations/health").catch(() => null) ]); const trackingCursorLastSeenAt = trackingAccountsPayload?.cursor_last_seen_at || ""; if (trackingCursorLastSeenAt) { @@ -568,6 +577,8 @@ async function bootstrap() { appState.accounts = safeArray(accounts); appState.trackingAccounts = safeArray(trackingAccountsPayload.items || trackingAccountsPayload); appState.trackingDigest = trackingDigest; + appState.reviews = safeArray(reviews); + appState.integrationHealth = integrationHealth; appState.documents = await loadKnowledgeDocuments(dashboard.knowledge_bases); appState.selectedProjectId = appState.selectedProjectId || dashboard.projects?.[0]?.id || ""; const selectedAssistantExists = safeArray(dashboard.assistants).some((item) => item.id === appState.selectedAssistantId); @@ -650,6 +661,14 @@ function getProjectStats(projectId) { return { knowledgeBases, assistants, jobs, sources }; } +function getProjectReviews(projectId) { + return safeArray(appState.reviews).filter((item) => item.project_id === projectId); +} + +function getReviewById(reviewId) { + return safeArray(appState.reviews).find((item) => item.id === reviewId) || null; +} + function getContentSourcesForAccount(account) { if (!account) return []; const profileUrl = String(account.profile_url || "").trim(); @@ -1383,6 +1402,13 @@ function renderAutomationScreen() { const analysisJobs = jobs.filter((item) => item.line_type === "analysis").length; const aiVideoJobs = jobs.filter((item) => item.line_type === "ai_video").length; const realCutJobs = jobs.filter((item) => item.line_type === "real_cut").length; + const integrations = appState.integrationHealth || {}; + const integrationCards = [ + { key: "cutvideo", label: "自动剪辑", hint: "Windows cutvideo" }, + { key: "huobao", label: "AI 视频", hint: "huobao-drama" }, + { key: "n8n", label: "编排", hint: "n8n workflow" }, + { key: "asr", label: "ASR", hint: "转写服务" } + ]; return screenShell( "自动流程", "自动同步、日报生成和失败补跑先统一看这里。", @@ -1398,6 +1424,26 @@ function renderAutomationScreen() {
${escapeHtml(item.hint)}
+ +${escapeHtml(brief(job.style_summary || job.transcript_text || "已完成,待补复盘。", 84))}
- +${escapeHtml(brief(review.highlights || review.next_actions || review.notes || "已保存复盘,待继续补充表现数据。", 92))}
+ +可以把最近完成任务直接写成一条复盘。
先去生产中心跑一条链路。
${escapeHtml(brief(job.style_summary || job.transcript_text || "已完成,待补复盘。", 84))}
+ +先去生产中心跑一条链路。
${escapeHtml(brief(detail.job.style_summary || detail.job.transcript_text || detail.job.error || "暂无摘要", 120))}