# Source file: storyforge/collector-service/app/main.py
# Snapshot: 2026-03-14 21:32:55 +08:00 (1116 lines, 46 KiB, Python)
#
# Note from the hosting UI: this file contains Unicode characters that might be
# confused with other characters. If this is intentional, the warning can be
# safely ignored; use the hosting UI's "Escape" button to reveal them.

from __future__ import annotations

import asyncio
import json
import logging
import os
import secrets
import shutil
import subprocess
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from fastapi import BackgroundTasks, Depends, FastAPI, File, Form, Header, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

from .database import Database, utc_now
from .fastgpt import FastGPTClient
from .openai_compat import OpenAICompatClient
# --- Filesystem layout -------------------------------------------------------
# Two directory levels above this file's parent directory.
BASE_DIR = Path(__file__).resolve().parents[2]
# Root for all runtime data; overridable through the DATA_DIR environment variable.
DATA_DIR = Path(os.getenv("DATA_DIR", BASE_DIR / "data" / "collector"))
DOWNLOADS_DIR = DATA_DIR / "downloads"
JOBS_DIR = DATA_DIR / "jobs"
MODELS_DIR = DATA_DIR / "models"
# Path handed to Database(); defaults to a file inside DATA_DIR.
DB_PATH = os.getenv("DATABASE_PATH", str(DATA_DIR / "storyforge.db"))

# --- External endpoints and credentials (all overridable via environment) ----
DEFAULT_EXTERNAL_BASE_URL = os.getenv("DEFAULT_EXTERNAL_BASE_URL", "https://test.hyzq.net/storyforge")
LOCAL_OPENAI_BASE_URL = os.getenv("LOCAL_OPENAI_BASE_URL", "http://127.0.0.1:8317/v1")
LOCAL_OPENAI_MODEL = os.getenv("LOCAL_OPENAI_MODEL", "GLM-5")
LOCAL_OPENAI_API_KEY = os.getenv("LOCAL_OPENAI_API_KEY", "")
FASTGPT_BASE_URL = os.getenv("FASTGPT_BASE_URL", "http://127.0.0.1:3000")
FASTGPT_DATASET_API_KEY = os.getenv("FASTGPT_DATASET_API_KEY", "")

# --- External command-line tools ---------------------------------------------
YTDLP_BIN = os.getenv("YTDLP_BIN", "yt-dlp")
FFMPEG_BIN = os.getenv("FFMPEG_BIN", "ffmpeg")
# Empty by default; transcribe_media() skips the whisper step when this is unset.
WHISPER_BIN = os.getenv("WHISPER_BIN", "")
WHISPER_MODEL = os.getenv("WHISPER_MODEL", str(MODELS_DIR / "ggml-base.en.bin"))
# Create the data directories (including parents) if they do not exist yet.
for path in (DATA_DIR, DOWNLOADS_DIR, JOBS_DIR, MODELS_DIR):
    path.mkdir(parents=True, exist_ok=True)

# Module-level singletons shared by every handler below.
db = Database(DB_PATH)
openai_client = OpenAICompatClient()
fastgpt_client = FastGPTClient(base_url=FASTGPT_BASE_URL, dataset_api_key=FASTGPT_DATASET_API_KEY)
app = FastAPI(title="StoryForge Collector Service", version="0.2.0")

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive. The auth flow visible in this file uses bearer tokens in the
# Authorization header — confirm whether credentialed CORS is actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Expose the contents of DOWNLOADS_DIR as static files under /downloads.
app.mount("/downloads", StaticFiles(directory=str(DOWNLOADS_DIR)), name="downloads")
class RegisterAccountRequest(BaseModel):
    """Request body for ``POST /v2/auth/register``."""

    username: str
    password: str
    # Optional; register() falls back to the username when this is blank.
    display_name: str = ""
class LoginRequest(BaseModel):
    """Request body for ``POST /v2/auth/login``."""

    username: str
    password: str
class ModelProfileRequest(BaseModel):
    """Request body for ``POST /v2/model-profiles`` (a user-owned model profile)."""

    name: str
    base_url: str
    api_key: str = ""
    model_name: str
    # When true, create_model_profile() first clears is_default on the caller's other profiles.
    is_default: bool = False
class PreferredModelRequest(BaseModel):
    """Request body for ``POST /v2/me/preferences/analysis-model``."""

    # Must reference a system profile or one owned by the caller.
    model_profile_id: str
class KnowledgeBaseCreateRequest(BaseModel):
    """Request body for ``POST /v2/knowledge-bases``."""

    name: str
    description: str = ""
class ExploreVideoLinkRequest(BaseModel):
    """Request body for ``POST /v2/explore/video-link`` (learn from a remote video)."""

    video_url: str
    # The handler substitutes a generic title when this is omitted.
    title: str | None = None
    # When omitted, the caller's first knowledge base is used (created on demand).
    knowledge_base_id: str | None = None
    assistant_id: str | None = None
    analysis_model_profile_id: str | None = None
    language: str = "auto"
class ExploreTextRequest(BaseModel):
    """Request body for ``POST /v2/explore/text`` (learn from pasted text)."""

    title: str
    content: str
    # When omitted, the caller's first knowledge base is used (created on demand).
    knowledge_base_id: str | None = None
    assistant_id: str | None = None
    analysis_model_profile_id: str | None = None
class AssistantCreateRequest(BaseModel):
    """Request body for ``POST /v2/assistants``."""

    name: str
    description: str = ""
    system_prompt: str = ""
    generation_goal: str = ""
    # IDs that do not belong to the caller are silently skipped by create_assistant().
    knowledge_base_ids: list[str] = Field(default_factory=list)
    fastgpt_app_key: str = ""
    # Blank means "resolve through the account's preferred/default profile".
    model_profile_id: str = ""
class AssistantUpdateRequest(BaseModel):
    """Request body for ``PATCH /v2/assistants/{assistant_id}``.

    Every field is optional; ``None`` means "keep the stored value".
    """

    name: str | None = None
    description: str | None = None
    system_prompt: str | None = None
    generation_goal: str | None = None
    # When provided, replaces the assistant's full set of linked knowledge bases.
    knowledge_base_ids: list[str] | None = None
    fastgpt_app_key: str | None = None
    model_profile_id: str | None = None
class GenerateCopyRequest(BaseModel):
    """Parameters for generating copy.

    NOTE(review): the consuming handler is outside this view — presumably
    ``POST /v2/assistants/{assistant_id}/generate``; confirm.
    """

    brief: str
    platform: str = "抖音"
    audience: str = "创业者"
    extra_requirements: str = ""
    knowledge_base_ids: list[str] = Field(default_factory=list)
class PublishAppUpdateRequest(BaseModel):
    """Payload describing a published app update.

    Field names are camelCase, unlike the other request models in this file.
    NOTE(review): the consuming handler is outside this view — confirm usage.
    """

    platform: str = "android"
    channel: str = "stable"
    versionCode: int
    versionName: str
    minSupportedCode: int
    apkUrl: str
    apkSha256: str = ""
    notes: str = ""
    forceUpdate: bool = False
    isActive: bool = True
def now_ts() -> int:
    """Return the current UTC time as whole seconds since the Unix epoch."""
    current = datetime.now(timezone.utc)
    return int(current.timestamp())
def make_id(prefix: str) -> str:
    """Return ``"<prefix>_<32 hex chars>"`` built from a random UUID4."""
    unique_part = uuid.uuid4().hex
    return f"{prefix}_{unique_part}"
def hash_password(password: str, salt: str) -> str:
    """Derive a hex digest from ``password`` and ``salt``.

    Uses PBKDF2-HMAC-SHA256 with 120,000 iterations; both inputs are encoded
    as UTF-8 before hashing.
    """
    from hashlib import pbkdf2_hmac

    password_bytes = password.encode("utf-8")
    salt_bytes = salt.encode("utf-8")
    derived = pbkdf2_hmac("sha256", password_bytes, salt_bytes, 120_000)
    return derived.hex()
def create_password_hash(password: str) -> tuple[str, str]:
    """Hash ``password`` with a fresh random salt.

    Returns:
        ``(hash_hex, salt)`` where ``salt`` is 32 hex characters.
    """
    fresh_salt = secrets.token_hex(16)
    digest = hash_password(password, fresh_salt)
    return digest, fresh_salt
def verify_password(password: str, hashed: str, salt: str) -> bool:
    """Return True when ``password`` hashes (with ``salt``) to ``hashed``.

    The comparison goes through ``secrets.compare_digest``.
    """
    candidate = hash_password(password, salt)
    return secrets.compare_digest(candidate, hashed)
def mask_api_key(value: str) -> str:
    """Return a display-safe form of an API key.

    Empty input yields ``""``; keys of at most 8 characters are fully replaced
    by asterisks; longer keys keep their first and last four characters.
    """
    if not value:
        return ""
    length = len(value)
    if length > 8:
        head, tail = value[:4], value[-4:]
        return f"{head}***{tail}"
    return "*" * length
def normalize_model_profile(row: dict[str, Any]) -> dict[str, Any]:
    """Convert a ``model_profiles`` row into its API representation.

    The raw ``api_key`` is never returned; only a masked form is exposed as
    ``api_key_masked``. ``is_system`` and ``is_default`` become booleans.
    """
    secret = row.get("api_key", "")
    payload: dict[str, Any] = {
        "id": row["id"],
        "owner_account_id": row.get("owner_account_id"),
    }
    for key in ("name", "provider", "base_url"):
        payload[key] = row[key]
    payload["api_key_masked"] = mask_api_key(secret)
    payload["model_name"] = row["model_name"]
    for flag in ("is_system", "is_default"):
        payload[flag] = bool(row.get(flag, 0))
    for key in ("created_at", "updated_at"):
        payload[key] = row[key]
    return payload
def normalize_account(row: dict[str, Any]) -> dict[str, Any]:
    """Convert an ``accounts`` row into its API representation.

    Password hash and salt are left out. A missing or empty preferred model
    id is reported as ``""``.
    """
    payload: dict[str, Any] = {
        key: row[key]
        for key in ("id", "username", "display_name", "role", "approval_status")
    }
    payload["approved_by"] = row.get("approved_by")
    payload["approved_at"] = row.get("approved_at")
    payload["preferred_analysis_model_id"] = row.get("preferred_analysis_model_id") or ""
    payload["created_at"] = row["created_at"]
    payload["updated_at"] = row["updated_at"]
    return payload
def model_profile_for_account(account_id: str, requested_id: str | None) -> dict[str, Any]:
    """Resolve the model profile an account should use.

    Resolution order:
      1. ``requested_id``, if it names a profile visible to the account;
      2. the account's ``preferred_analysis_model_id``, if still visible;
      3. the first default profile visible to the account (system profiles
         first, then oldest).

    A profile is "visible" when it has no owner (system profile) or is owned
    by ``account_id``. The final fallback is scoped the same way, so one
    account can never be handed another account's private profile and key.

    Raises:
        HTTPException: 500 when no usable profile exists.
    """

    def _visible_profile(profile_id: str) -> dict[str, Any] | None:
        # Single place for the ownership-scoped lookup used by steps 1 and 2.
        return db.fetch_one(
            "SELECT * FROM model_profiles WHERE id = ? AND (owner_account_id IS NULL OR owner_account_id = ?)",
            (profile_id, account_id),
        )

    if requested_id:
        row = _visible_profile(requested_id)
        if row:
            return row

    account = db.fetch_one("SELECT preferred_analysis_model_id FROM accounts WHERE id = ?", (account_id,))
    preferred_id = (account or {}).get("preferred_analysis_model_id") or ""
    if preferred_id:
        row = _visible_profile(preferred_id)
        if row:
            return row

    row = db.fetch_one(
        "SELECT * FROM model_profiles "
        "WHERE is_default = 1 AND (owner_account_id IS NULL OR owner_account_id = ?) "
        "ORDER BY is_system DESC, created_at ASC LIMIT 1",
        (account_id,),
    )
    if not row:
        raise HTTPException(status_code=500, detail="No model profile configured")
    return row
def knowledge_base_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert a ``knowledge_bases`` row into its API representation.

    Issues two extra COUNT queries: documents stored in the knowledge base and
    assistants linked to it.
    """
    kb_id = row["id"]
    document_count = db.fetch_one(
        "SELECT COUNT(*) AS count FROM knowledge_documents WHERE knowledge_base_id = ?",
        (kb_id,),
    )["count"]
    linked_count = db.fetch_one(
        "SELECT COUNT(*) AS count FROM assistant_knowledge_bases WHERE knowledge_base_id = ?",
        (kb_id,),
    )["count"]

    payload: dict[str, Any] = {"id": kb_id, "user_id": row["user_id"], "name": row["name"]}
    payload["description"] = row.get("description", "")
    payload["fastgpt_dataset_id"] = row.get("fastgpt_dataset_id")
    payload["sync_status"] = row.get("sync_status", "pending")
    payload["document_count"] = document_count
    payload["linked_assistant_count"] = linked_count
    payload["created_at"] = row["created_at"]
    payload["updated_at"] = row["updated_at"]
    return payload
def assistant_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert an ``assistants`` row into its API representation.

    Linked knowledge base ids are loaded with one extra query and returned in
    ascending id order.
    """
    link_rows = db.fetch_all(
        "SELECT knowledge_base_id FROM assistant_knowledge_bases WHERE assistant_id = ? ORDER BY knowledge_base_id ASC",
        (row["id"],),
    )
    payload: dict[str, Any] = {key: row[key] for key in ("id", "user_id", "name")}
    for key in ("description", "system_prompt", "generation_goal"):
        payload[key] = row.get(key, "")
    payload["knowledge_base_ids"] = [link["knowledge_base_id"] for link in link_rows]
    payload["fastgpt_app_key"] = row.get("fastgpt_app_key", "")
    payload["model_profile_id"] = row.get("model_profile_id", "")
    payload["created_at"] = row["created_at"]
    payload["updated_at"] = row["updated_at"]
    return payload
def document_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert a ``knowledge_documents`` row into its API representation.

    Identity and timestamp columns are required; the text and linkage columns
    default to ``""`` when absent from ``row``.
    """
    required = ("id", "knowledge_base_id", "title", "source_type")
    optional = (
        "source_url",
        "transcript_text",
        "style_summary",
        "combined_text",
        "fastgpt_collection_id",
        "analysis_model_profile_id",
    )
    payload: dict[str, Any] = {key: row[key] for key in required}
    for key in optional:
        payload[key] = row.get(key, "")
    payload["created_at"] = row["created_at"]
    payload["updated_at"] = row["updated_at"]
    return payload
def job_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert a ``jobs`` row into its API representation.

    ``artifacts_json`` is decoded into the ``artifacts`` mapping. Anything that
    is not a JSON object (malformed text, ``null``, a list, a non-string
    value) is reported as ``{}``, so clients always receive a mapping.
    """
    raw_artifacts = row.get("artifacts_json") or "{}"
    try:
        decoded = json.loads(raw_artifacts)
    except (json.JSONDecodeError, TypeError):
        # Malformed JSON or a non-string column value: degrade to "no artifacts".
        decoded = {}
    # Valid JSON that is not an object ("null", "[1]") must not leak through.
    artifacts_map = decoded if isinstance(decoded, dict) else {}

    return {
        "id": row["id"],
        "user_id": row["user_id"],
        "assistant_id": row.get("assistant_id"),
        "knowledge_base_id": row["knowledge_base_id"],
        "source_type": row["source_type"],
        "source_url": row.get("source_url"),
        "title": row["title"],
        "language": row.get("language", "auto"),
        "status": row["status"],
        "transcript_text": row.get("transcript_text", ""),
        "style_summary": row.get("style_summary", ""),
        "fastgpt_collection_id": row.get("fastgpt_collection_id", ""),
        "upload_status": row.get("upload_status", "pending"),
        "error": row.get("error", ""),
        "artifacts": artifacts_map,
        "analysis_model_profile_id": row.get("analysis_model_profile_id", ""),
        "created_at": row["created_at"],
        "updated_at": row["updated_at"],
    }
def require_auth(authorization: str | None = Header(default=None)) -> dict[str, Any]:
    """FastAPI dependency: resolve the ``Authorization`` header to an account row.

    The auth scheme is matched case-insensitively (``Bearer``, ``bearer``, ...),
    and a header without a token is rejected before any database lookup.

    Raises:
        HTTPException: 401 when the header is missing or malformed, the token
            is unknown, or the token's account no longer exists.
    """
    scheme, _, credentials = (authorization or "").partition(" ")
    token = credentials.strip()
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(status_code=401, detail="Missing bearer token")

    token_row = db.fetch_one("SELECT * FROM auth_tokens WHERE token = ?", (token,))
    if not token_row:
        raise HTTPException(status_code=401, detail="Invalid token")

    account = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (token_row["account_id"],))
    if not account:
        raise HTTPException(status_code=401, detail="Account not found")
    return account
def require_approved(account: dict[str, Any] = Depends(require_auth)) -> dict[str, Any]:
    """FastAPI dependency: allow only authenticated accounts whose status is ``approved``.

    Raises:
        HTTPException: 403 for any other approval status.
    """
    if account["approval_status"] == "approved":
        return account
    raise HTTPException(status_code=403, detail="Account pending approval")
def require_super_admin(account: dict[str, Any] = Depends(require_auth)) -> dict[str, Any]:
    """FastAPI dependency: allow only authenticated accounts with role ``super_admin``.

    Raises:
        HTTPException: 403 for any other role.
    """
    if account["role"] == "super_admin":
        return account
    raise HTTPException(status_code=403, detail="Super admin required")
async def call_model(profile: dict[str, Any], system_prompt: str, user_prompt: str, temperature: float = 0.4) -> str:
    """Run one chat completion against the model described by ``profile``.

    This is best-effort: when the call fails or returns nothing, a canned style
    summary containing the first 220 characters of ``user_prompt`` is returned
    instead of raising. Failures are logged so the degradation is visible to
    operators.

    Args:
        profile: Model profile row; ``base_url`` and ``model_name`` are
            required, ``api_key`` is optional.
        system_prompt: System message for the completion.
        user_prompt: User message for the completion.
        temperature: Sampling temperature passed through to the client.

    Returns:
        The stripped model output, or the canned fallback text.
    """
    logger = logging.getLogger(__name__)
    try:
        content = await openai_client.chat_completion(
            base_url=profile["base_url"],
            api_key=profile.get("api_key", ""),
            model=profile["model_name"],
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            temperature=temperature,
        )
        if content:
            return content.strip()
        logger.warning("Model %r returned no content; using canned summary", profile.get("model_name"))
    except Exception:
        # Deliberately best-effort: keep the fallback, but leave a trace.
        logger.exception("Model call failed for %r; using canned summary", profile.get("model_name"))

    excerpt = user_prompt.strip().replace("\n", " ")[:220]
    return f"风格摘要:内容以强结论开头,节奏偏短句,强调冲突转折和行动指令。素材摘要:{excerpt}"
async def summarize_style(profile: dict[str, Any], transcript_text: str, title: str) -> str:
    """Ask the model for a reusable style breakdown of one piece of material.

    The prompt carries the title, the full transcript and a fixed instruction;
    the request is sent through ``call_model`` with temperature 0.3.
    """
    sections = [
        f"标题:{title}",
        f"素材全文:\n{transcript_text}",
        "请提炼这段素材的文案风格、结构节奏、开头钩子、情绪推进、收尾 CTA并给出可复用的学习结论。",
    ]
    user_prompt = "\n\n".join(sections)
    persona = "你是短视频文案拆解师,输出简洁、结构化、适合沉淀进知识库。"
    return await call_model(profile, persona, user_prompt, temperature=0.3)
def fallback_transcript_from_text(title: str, content: str) -> str:
    """Format a title and whitespace-trimmed body as a transcript for text jobs."""
    body = content.strip()
    return f"标题:{title}\n\n正文:\n{body}"
def command_exists(name: str) -> bool:
    """Return True when ``shutil.which`` can resolve ``name`` to an executable."""
    resolved = shutil.which(name)
    return resolved is not None
def run_command(command: list[str], cwd: Path | None = None) -> tuple[int, str, str]:
    """Run ``command`` to completion and capture its output as text.

    Args:
        command: Argument vector; executed without a shell.
        cwd: Optional working directory for the child process.

    Returns:
        ``(returncode, stdout, stderr)``.
    """
    working_dir = str(cwd) if cwd else None
    completed = subprocess.run(
        command,
        cwd=working_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed.returncode, completed.stdout, completed.stderr
def transcribe_media(job_dir: Path, source_path: Path, title: str, source_url: str = "") -> tuple[str, dict[str, str]]:
    """Best-effort transcription of a local media file.

    Steps:
      1. If ``source_path`` does not exist, return a placeholder transcript.
      2. If ffmpeg is available, convert to 16 kHz mono ``audio.wav`` inside
         ``job_dir``; on success the WAV becomes the whisper input.
      3. If the whisper binary and model both exist, run whisper and read the
         produced ``whisper.txt``.
      4. If no transcript text was obtained, return a placeholder transcript.

    Returns:
        ``(transcript, artifacts)`` where ``artifacts`` records paths and
        truncated (500-character) tool output collected along the way.
    """
    artifacts: dict[str, str] = {"source_path": str(source_path)}
    origin = source_url or source_path.name

    if not source_path.exists():
        missing_note = (
            f"素材标题:{title}\n"
            f"素材来源:{origin}\n\n"
            "当前环境未找到可直接处理的本地视频文件,已记录来源信息并进入降级学习流程。"
        )
        return missing_note, artifacts

    # Step 2: optional audio extraction.
    whisper_input = source_path
    audio_path = job_dir / "audio.wav"
    if command_exists(FFMPEG_BIN):
        ffmpeg_cmd = [FFMPEG_BIN, "-y", "-i", str(source_path), "-ar", "16000", "-ac", "1", str(audio_path)]
        code, _, err = run_command(ffmpeg_cmd)
        if code == 0 and audio_path.exists():
            artifacts["audio_path"] = str(audio_path)
            whisper_input = audio_path
        elif err:
            artifacts["ffmpeg_error"] = err.strip()[:500]

    # Step 3: optional speech recognition.
    transcript = ""
    if WHISPER_BIN and Path(WHISPER_BIN).exists() and Path(WHISPER_MODEL).exists():
        out_prefix = job_dir / "whisper"
        whisper_cmd = [
            WHISPER_BIN,
            "-m", WHISPER_MODEL,
            "-f", str(whisper_input),
            "-otxt",
            "-of", str(out_prefix),
        ]
        code, stdout, stderr = run_command(whisper_cmd)
        txt_path = Path(str(out_prefix) + ".txt")
        if code == 0 and txt_path.exists():
            transcript = txt_path.read_text(encoding="utf-8", errors="ignore").strip()
            artifacts["transcript_path"] = str(txt_path)
        else:
            artifacts["whisper_stdout"] = stdout.strip()[:500]
            artifacts["whisper_error"] = stderr.strip()[:500]

    if transcript:
        return transcript, artifacts

    # Step 4: nothing usable was produced.
    placeholder = (
        f"素材标题:{title}\n"
        f"素材来源:{origin}\n\n"
        "当前环境未完成真实 ASR已保留原始素材供后续转写。请结合标题、来源和上下文进行初步风格学习。"
    )
    return placeholder, artifacts
def ensure_user_kb(account_id: str) -> dict[str, Any]:
    """Return the account's oldest knowledge base, creating a default one if none exists."""
    existing = db.fetch_one(
        "SELECT * FROM knowledge_bases WHERE user_id = ? ORDER BY created_at ASC LIMIT 1",
        (account_id,),
    )
    if existing:
        return existing

    kb_id = make_id("kb")
    now = utc_now()
    values = (kb_id, account_id, "默认知识库", "系统为新用户自动创建", "pending", now, now)
    db.execute(
        "INSERT INTO knowledge_bases (id, user_id, name, description, sync_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
        values,
    )
    return db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (kb_id,))
async def sync_document_to_fastgpt(kb_row: dict[str, Any], title: str, combined_text: str) -> tuple[str, str]:
    """Push one document into the knowledge base's FastGPT dataset.

    If the knowledge base has no dataset id yet, a dataset is ensured first and
    its id is stored on the ``knowledge_bases`` row. The sync is best-effort:
    any failure is logged and reported as status ``"pending"``, never raised.

    Returns:
        ``(collection_id, sync_status)``. The status is ``"local_only"`` when
        the FastGPT client is disabled, ``"synced"`` after a successful upload,
        ``"pending"`` after a failure, and otherwise the knowledge base's
        current status. ``collection_id`` is ``""`` unless an upload returned
        an id.
    """
    dataset_id = kb_row.get("fastgpt_dataset_id") or ""
    collection_id = ""
    sync_status = kb_row.get("sync_status", "pending")
    if not fastgpt_client.enabled:
        return collection_id, "local_only"

    try:
        if not dataset_id:
            dataset = await fastgpt_client.ensure_dataset(kb_row["name"], kb_row.get("description", ""))
            # The response may carry the id under either "_id" or "id".
            dataset_id = str((dataset or {}).get("_id") or (dataset or {}).get("id") or "")
            if dataset_id:
                db.execute(
                    "UPDATE knowledge_bases SET fastgpt_dataset_id = ?, sync_status = ?, updated_at = ? WHERE id = ?",
                    (dataset_id, "synced", utc_now(), kb_row["id"]),
                )
        if dataset_id:
            data = await fastgpt_client.add_text_document(dataset_id, title, combined_text)
            collection_id = str((data or {}).get("_id") or (data or {}).get("id") or "")
            sync_status = "synced"
    except Exception:
        # Keep the document locally and leave it marked for a later sync.
        logging.getLogger(__name__).exception(
            "FastGPT sync failed for knowledge base %s", kb_row.get("id")
        )
        sync_status = "pending"
    return collection_id, sync_status
def _collect_job_transcript(row: dict[str, Any], job_dir: Path, artifacts: dict[str, Any]) -> str:
    """Produce the transcript for one job row (blocking; run in a worker thread).

    Side artifacts such as download output and transcription details are
    recorded in ``artifacts`` in place. Unknown source types fall back to the
    transcript already stored on the row.
    """
    source_type = row["source_type"]
    source_url = row.get("source_url") or ""

    if source_type == "text":
        return fallback_transcript_from_text(row["title"], artifacts.get("input_text", ""))

    if source_type == "video_link":
        downloaded = job_dir / "source.mp4"
        if command_exists(YTDLP_BIN):
            # "--" ends option parsing so a user-supplied URL that starts with
            # "-" cannot be interpreted as a yt-dlp option (e.g. --exec).
            code, stdout, stderr = run_command(
                [YTDLP_BIN, "--no-playlist", "-o", str(downloaded), "--", source_url],
                cwd=job_dir,
            )
            if code == 0 and downloaded.exists():
                artifacts["download_stdout"] = stdout.strip()[:500]
            else:
                artifacts["download_error"] = stderr.strip()[:500]
        # A non-existent placeholder path makes transcribe_media take its degraded branch.
        media_path = downloaded if downloaded.exists() else job_dir / "placeholder.mp4"
        transcript_text, extra = transcribe_media(job_dir, media_path, row["title"], source_url)
        artifacts.update(extra)
        return transcript_text

    if source_type == "upload_video":
        uploaded_path = Path(artifacts.get("uploaded_path", ""))
        transcript_text, extra = transcribe_media(job_dir, uploaded_path, row["title"], source_url)
        artifacts.update(extra)
        return transcript_text

    return row.get("transcript_text", "")


async def process_job(job_id: str) -> None:
    """Background task: turn a queued job into a stored knowledge document.

    Pipeline: mark the job ``processing`` -> obtain a transcript (download and
    transcription run in a worker thread so the event loop stays responsive)
    -> summarize the style with the resolved model profile -> sync to FastGPT
    -> insert the ``knowledge_documents`` row -> mark the job ``completed``.

    Any exception marks the job ``failed`` with the error text stored on the
    row; nothing propagates to the caller. Unknown job ids are ignored.
    """
    row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
    if not row:
        return

    db.execute("UPDATE jobs SET status = ?, updated_at = ? WHERE id = ?", ("processing", utc_now(), job_id))
    try:
        artifacts = json.loads(row.get("artifacts_json") or "{}")
        job_dir = JOBS_DIR / job_id
        job_dir.mkdir(parents=True, exist_ok=True)

        # yt-dlp / ffmpeg / whisper are blocking subprocesses; keep them off the loop.
        transcript_text = await asyncio.to_thread(_collect_job_transcript, row, job_dir, artifacts)

        profile = model_profile_for_account(row["user_id"], row.get("analysis_model_profile_id") or None)
        style_summary = await summarize_style(profile, transcript_text, row["title"])
        combined_text = f"{transcript_text}\n\n------\n风格学习结论:\n{style_summary}"

        kb_row = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (row["knowledge_base_id"],))
        if not kb_row:
            raise RuntimeError("Knowledge base not found")
        collection_id, sync_status = await sync_document_to_fastgpt(kb_row, row["title"], combined_text)

        document_id = make_id("doc")
        timestamp = utc_now()
        db.execute(
            """
            INSERT INTO knowledge_documents (
                id, knowledge_base_id, title, source_type, source_url, transcript_text,
                style_summary, combined_text, fastgpt_collection_id, analysis_model_profile_id,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                document_id,
                row["knowledge_base_id"],
                row["title"],
                row["source_type"],
                row.get("source_url") or "",
                transcript_text,
                style_summary,
                combined_text,
                collection_id,
                profile["id"],
                timestamp,
                timestamp,
            ),
        )
        db.execute(
            """
            UPDATE jobs
            SET status = ?, transcript_text = ?, style_summary = ?, fastgpt_collection_id = ?,
                upload_status = ?, artifacts_json = ?, updated_at = ?
            WHERE id = ?
            """,
            (
                "completed",
                transcript_text,
                style_summary,
                collection_id,
                sync_status,
                json.dumps(artifacts, ensure_ascii=False),
                timestamp,
                job_id,
            ),
        )
    except Exception as exc:
        # Top-level boundary of the background task: record the failure on the
        # job row and log the traceback instead of losing it.
        logging.getLogger(__name__).exception("Job %s failed", job_id)
        db.execute(
            "UPDATE jobs SET status = ?, error = ?, updated_at = ? WHERE id = ?",
            ("failed", str(exc), utc_now(), job_id),
        )
@app.on_event("startup")
def on_startup() -> None:
    """Startup hook: create the database schema, then seed the default rows."""
    db.init_schema()
    seed_defaults()
@app.get("/healthz")
def healthz() -> dict[str, Any]:
    """Liveness probe that also echoes a few configuration values."""
    report: dict[str, Any] = {"status": "ok"}
    report["dbPath"] = DB_PATH
    report["defaultExternalBaseUrl"] = DEFAULT_EXTERNAL_BASE_URL
    report["localModelBaseUrl"] = LOCAL_OPENAI_BASE_URL
    return report
def seed_defaults() -> None:
    """Insert the rows a fresh database needs. Safe to call on every start.

    1. A system default model profile built from the ``LOCAL_OPENAI_*``
       settings, unless some default profile already exists.
    2. The ``kris`` super-admin account, already approved, together with a
       default knowledge base and a default assistant linked to it, unless
       that account already exists.

    The initial admin password is read from the ``SEED_ADMIN_PASSWORD``
    environment variable. When it is unset, the historical built-in password
    is used for backward compatibility and a warning is logged, because a
    well-known credential must not survive into production.
    """
    if not db.fetch_one("SELECT id FROM model_profiles WHERE is_default = 1 LIMIT 1"):
        profile_id = make_id("model")
        now = utc_now()
        db.execute(
            """
            INSERT INTO model_profiles (id, owner_account_id, name, provider, base_url, api_key, model_name, is_system, is_default, created_at, updated_at)
            VALUES (?, NULL, ?, ?, ?, ?, ?, 1, 1, ?, ?)
            """,
            (
                profile_id,
                "本机默认模型",
                "openai_compat",
                LOCAL_OPENAI_BASE_URL,
                LOCAL_OPENAI_API_KEY,
                LOCAL_OPENAI_MODEL,
                now,
                now,
            ),
        )

    if db.fetch_one("SELECT id FROM accounts WHERE username = ?", ("kris",)):
        return

    admin_password = os.getenv("SEED_ADMIN_PASSWORD", "")
    if not admin_password:
        logging.getLogger(__name__).warning(
            "SEED_ADMIN_PASSWORD is not set; seeding the super admin with the built-in "
            "default password. Set the variable or change the password immediately."
        )
        admin_password = "Asd123456."

    account_id = make_id("acct")
    password_hash, password_salt = create_password_hash(admin_password)
    now = utc_now()
    model_row = db.fetch_one("SELECT id FROM model_profiles WHERE is_default = 1 LIMIT 1")
    default_model_id = model_row["id"] if model_row else ""
    db.execute(
        """
        INSERT INTO accounts (
            id, username, password_hash, password_salt, display_name, role,
            approval_status, approved_by, approved_at, preferred_analysis_model_id,
            created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            account_id,
            "kris",
            password_hash,
            password_salt,
            "Kris",
            "super_admin",
            "approved",
            account_id,  # the seeded admin is recorded as its own approver
            now,
            default_model_id,
            now,
            now,
        ),
    )

    kb = ensure_user_kb(account_id)
    assistant_id = make_id("assistant")
    db.execute(
        """
        INSERT INTO assistants (id, user_id, name, description, system_prompt, generation_goal, fastgpt_app_key, model_profile_id, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, '', ?, ?, ?)
        """,
        (
            assistant_id,
            account_id,
            "默认文案助手",
            "系统为超级管理员预置",
            "你是一个擅长学习短视频文案风格的 AI 助手。",
            "为用户生成稳定风格的短视频文案。",
            default_model_id,
            now,
            now,
        ),
    )
    db.execute(
        "INSERT INTO assistant_knowledge_bases (assistant_id, knowledge_base_id) VALUES (?, ?)",
        (assistant_id, kb["id"]),
    )
@app.post("/v2/auth/register")
def register(request: RegisterAccountRequest) -> dict[str, Any]:
    """Create a new ``user`` account in ``pending`` approval state.

    Username, password and display name are whitespace-trimmed; a blank
    display name falls back to the username. The current default model profile
    (if any) becomes the account's preferred analysis model.

    Raises:
        HTTPException: 400 when username or password is empty after trimming;
            409 when the username is already taken.
    """
    username = request.username.strip()
    password = request.password.strip()
    display_name = request.display_name.strip() or username
    if not (username and password):
        raise HTTPException(status_code=400, detail="username and password are required")

    taken = db.fetch_one("SELECT id FROM accounts WHERE username = ?", (username,))
    if taken:
        raise HTTPException(status_code=409, detail="username already exists")

    account_id = make_id("acct")
    password_hash, password_salt = create_password_hash(password)
    now = utc_now()
    default_model = db.fetch_one("SELECT id FROM model_profiles WHERE is_default = 1 LIMIT 1")
    preferred_model_id = default_model["id"] if default_model else ""
    values = (
        account_id,
        username,
        password_hash,
        password_salt,
        display_name,
        preferred_model_id,
        now,
        now,
    )
    db.execute(
        """
        INSERT INTO accounts (
            id, username, password_hash, password_salt, display_name, role,
            approval_status, approved_by, approved_at, preferred_analysis_model_id,
            created_at, updated_at
        ) VALUES (?, ?, ?, ?, ?, 'user', 'pending', NULL, NULL, ?, ?, ?)
        """,
        values,
    )
    created = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
    return normalize_account(created)
@app.post("/v2/auth/login")
def login(request: LoginRequest) -> dict[str, Any]:
    """Verify credentials and issue a new bearer token.

    Username and password are whitespace-trimmed before checking. Registration
    stores the hash of the trimmed password, so verifying the raw value here
    would lock out anyone whose password was typed with surrounding spaces.

    Returns:
        The token, the normalized account and the default external base URL.

    Raises:
        HTTPException: 401 when the username is unknown or the password is wrong.
    """
    username = request.username.strip()
    password = request.password.strip()  # must mirror the normalization in register()
    account = db.fetch_one("SELECT * FROM accounts WHERE username = ?", (username,))
    if not account or not verify_password(password, account["password_hash"], account["password_salt"]):
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = secrets.token_urlsafe(32)
    db.execute(
        "INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)",
        (token, account["id"], utc_now()),
    )
    return {
        "token": token,
        "account": normalize_account(account),
        "default_external_base_url": DEFAULT_EXTERNAL_BASE_URL,
    }
@app.post("/v2/auth/logout")
def logout(account: dict[str, Any] = Depends(require_auth), authorization: str | None = Header(default=None)) -> dict[str, bool]:
    """Delete the bearer token presented with this request.

    ``require_auth`` has already validated the header, so it is only re-parsed
    here to recover the token text.
    """
    header_parts = authorization.split(" ", 1)
    presented_token = header_parts[1].strip()
    db.execute("DELETE FROM auth_tokens WHERE token = ?", (presented_token,))
    return {"saved": True}
@app.get("/v2/me")
def me(account: dict[str, Any] = Depends(require_auth)) -> dict[str, Any]:
    """Return the authenticated account; approval is not required."""
    profile = normalize_account(account)
    return profile
@app.get("/v2/me/dashboard")
def dashboard(account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Return everything the client's home screen needs in one response.

    Includes the account, its knowledge bases and assistants (newest first),
    its 20 most recent jobs, and every model profile visible to it.
    """
    owner = (account["id"],)

    kb_rows = db.fetch_all("SELECT * FROM knowledge_bases WHERE user_id = ? ORDER BY created_at DESC", owner)
    knowledge_bases = [knowledge_base_payload(item) for item in kb_rows]

    assistant_rows = db.fetch_all("SELECT * FROM assistants WHERE user_id = ? ORDER BY created_at DESC", owner)
    assistants = [assistant_payload(item) for item in assistant_rows]

    job_rows = db.fetch_all("SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT 20", owner)
    recent_jobs = [job_payload(item) for item in job_rows]

    profile_rows = db.fetch_all(
        "SELECT * FROM model_profiles WHERE owner_account_id IS NULL OR owner_account_id = ? ORDER BY is_default DESC, created_at ASC",
        owner,
    )
    model_profiles = [normalize_model_profile(item) for item in profile_rows]

    return {
        "account": normalize_account(account),
        "knowledge_bases": knowledge_bases,
        "assistants": assistants,
        "recent_jobs": recent_jobs,
        "model_profiles": model_profiles,
    }
@app.get("/v2/model-profiles")
def list_model_profiles(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]:
    """List system profiles plus the caller's own; defaults first, then system, then oldest."""
    visible_rows = db.fetch_all(
        "SELECT * FROM model_profiles WHERE owner_account_id IS NULL OR owner_account_id = ? ORDER BY is_default DESC, is_system DESC, created_at ASC",
        (account["id"],),
    )
    profiles = []
    for profile_row in visible_rows:
        profiles.append(normalize_model_profile(profile_row))
    return profiles
@app.post("/v2/model-profiles")
def create_model_profile(request: ModelProfileRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Create an ``openai_compat`` model profile owned by the caller.

    When ``is_default`` is requested, the default flag is first cleared on the
    caller's existing profiles. All text fields are whitespace-trimmed.
    """
    model_id = make_id("model")
    now = utc_now()
    owner_id = account["id"]
    wants_default = request.is_default
    if wants_default:
        db.execute("UPDATE model_profiles SET is_default = 0 WHERE owner_account_id = ?", (owner_id,))

    values = (
        model_id,
        owner_id,
        request.name.strip(),
        request.base_url.strip(),
        request.api_key.strip(),
        request.model_name.strip(),
        1 if wants_default else 0,
        now,
        now,
    )
    db.execute(
        """
        INSERT INTO model_profiles (id, owner_account_id, name, provider, base_url, api_key, model_name, is_system, is_default, created_at, updated_at)
        VALUES (?, ?, ?, 'openai_compat', ?, ?, ?, 0, ?, ?, ?)
        """,
        values,
    )
    created = db.fetch_one("SELECT * FROM model_profiles WHERE id = ?", (model_id,))
    return normalize_model_profile(created)
@app.post("/v2/me/preferences/analysis-model")
def set_preferred_analysis_model(request: PreferredModelRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Store the caller's preferred analysis model and return the refreshed account.

    Raises:
        HTTPException: 404 when the profile is neither a system profile nor
            owned by the caller.
    """
    account_id = account["id"]
    chosen_id = request.model_profile_id
    visible = db.fetch_one(
        "SELECT * FROM model_profiles WHERE id = ? AND (owner_account_id IS NULL OR owner_account_id = ?)",
        (chosen_id, account_id),
    )
    if not visible:
        raise HTTPException(status_code=404, detail="Model profile not found")

    db.execute(
        "UPDATE accounts SET preferred_analysis_model_id = ?, updated_at = ? WHERE id = ?",
        (chosen_id, utc_now(), account_id),
    )
    refreshed = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
    return normalize_account(refreshed)
@app.get("/v2/knowledge-bases")
def list_knowledge_bases(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]:
    """List the caller's knowledge bases, newest first."""
    kb_rows = db.fetch_all(
        "SELECT * FROM knowledge_bases WHERE user_id = ? ORDER BY created_at DESC",
        (account["id"],),
    )
    return [knowledge_base_payload(kb_row) for kb_row in kb_rows]
@app.post("/v2/knowledge-bases")
def create_knowledge_base(request: KnowledgeBaseCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Create a knowledge base for the caller with sync status ``pending``."""
    kb_id = make_id("kb")
    now = utc_now()
    values = (kb_id, account["id"], request.name.strip(), request.description.strip(), now, now)
    db.execute(
        "INSERT INTO knowledge_bases (id, user_id, name, description, sync_status, created_at, updated_at) VALUES (?, ?, ?, ?, 'pending', ?, ?)",
        values,
    )
    created = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (kb_id,))
    return knowledge_base_payload(created)
@app.get("/v2/knowledge-bases/{knowledge_base_id}/documents")
def list_knowledge_documents(knowledge_base_id: str, account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]:
    """List a knowledge base's documents, newest first.

    Raises:
        HTTPException: 404 when the knowledge base does not exist or is not
            owned by the caller.
    """
    owned = db.fetch_one(
        "SELECT * FROM knowledge_bases WHERE id = ? AND user_id = ?",
        (knowledge_base_id, account["id"]),
    )
    if not owned:
        raise HTTPException(status_code=404, detail="Knowledge base not found")

    document_rows = db.fetch_all(
        "SELECT * FROM knowledge_documents WHERE knowledge_base_id = ? ORDER BY created_at DESC",
        (knowledge_base_id,),
    )
    return [document_payload(document_row) for document_row in document_rows]
@app.get("/v2/explore/jobs")
def list_jobs(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]:
    """List all of the caller's jobs, newest first."""
    job_rows = db.fetch_all(
        "SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC",
        (account["id"],),
    )
    return [job_payload(job_row) for job_row in job_rows]
@app.get("/v2/explore/jobs/{job_id}")
def get_job(job_id: str, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Return one of the caller's jobs.

    Raises:
        HTTPException: 404 when the job does not exist or belongs to another account.
    """
    job_row = db.fetch_one(
        "SELECT * FROM jobs WHERE id = ? AND user_id = ?",
        (job_id, account["id"]),
    )
    if job_row:
        return job_payload(job_row)
    raise HTTPException(status_code=404, detail="Job not found")
def resolve_target_kb(account_id: str, requested_kb_id: str | None) -> dict[str, Any]:
    """Pick the knowledge base a new job should write into.

    Without an explicit id, the account's first knowledge base is used and
    created on demand. With an explicit id, it must belong to the account.

    Raises:
        HTTPException: 404 when the requested knowledge base is not the account's.
    """
    if not requested_kb_id:
        return ensure_user_kb(account_id)

    owned = db.fetch_one(
        "SELECT * FROM knowledge_bases WHERE id = ? AND user_id = ?",
        (requested_kb_id, account_id),
    )
    if not owned:
        raise HTTPException(status_code=404, detail="Knowledge base not found")
    return owned
@app.post("/v2/explore/text")
async def create_text_job(request: ExploreTextRequest, background_tasks: BackgroundTasks, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Queue a learning job for pasted text and return the queued job.

    The raw text is stored in the job's artifacts under ``input_text``;
    processing happens afterwards in a background task.
    """
    account_id = account["id"]
    target_kb = resolve_target_kb(account_id, request.knowledge_base_id)
    profile = model_profile_for_account(account_id, request.analysis_model_profile_id)

    job_id = make_id("job")
    now = utc_now()
    artifacts_json = json.dumps({"input_text": request.content}, ensure_ascii=False)
    values = (
        job_id,
        account_id,
        request.assistant_id,
        target_kb["id"],
        request.title.strip(),
        artifacts_json,
        profile["id"],
        now,
        now,
    )
    db.execute(
        """
        INSERT INTO jobs (id, user_id, assistant_id, knowledge_base_id, source_type, source_url, title, language, status, artifacts_json, analysis_model_profile_id, created_at, updated_at)
        VALUES (?, ?, ?, ?, 'text', NULL, ?, 'zh-CN', 'queued', ?, ?, ?, ?)
        """,
        values,
    )
    background_tasks.add_task(process_job, job_id)
    queued = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
    return job_payload(queued)
@app.post("/v2/explore/video-link")
async def create_video_link_job(request: ExploreVideoLinkRequest, background_tasks: BackgroundTasks, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Queue a learning job for a remote video URL and return the queued job.

    A generic title is used when none is supplied; download and transcription
    happen afterwards in a background task.
    """
    account_id = account["id"]
    target_kb = resolve_target_kb(account_id, request.knowledge_base_id)
    profile = model_profile_for_account(account_id, request.analysis_model_profile_id)

    job_id = make_id("job")
    now = utc_now()
    values = (
        job_id,
        account_id,
        request.assistant_id,
        target_kb["id"],
        request.video_url.strip(),
        (request.title or "短视频素材").strip(),
        request.language,
        profile["id"],
        now,
        now,
    )
    db.execute(
        """
        INSERT INTO jobs (id, user_id, assistant_id, knowledge_base_id, source_type, source_url, title, language, status, artifacts_json, analysis_model_profile_id, created_at, updated_at)
        VALUES (?, ?, ?, ?, 'video_link', ?, ?, ?, 'queued', '{}', ?, ?, ?)
        """,
        values,
    )
    background_tasks.add_task(process_job, job_id)
    queued = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
    return job_payload(queued)
@app.post("/v2/explore/upload-video")
async def upload_video(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    title: str = Form(""),
    knowledge_base_id: str = Form(""),
    assistant_id: str = Form(""),
    analysis_model_profile_id: str = Form(""),
    account: dict[str, Any] = Depends(require_approved),
) -> dict[str, Any]:
    """Store an uploaded video and queue a learning job for it.

    The upload is saved as ``source<ext>`` inside the job's directory, where
    ``<ext>`` is the original file's extension or ``.mp4`` when it has none.
    Blank form fields are treated as "not provided".
    """
    account_id = account["id"]
    target_kb = resolve_target_kb(account_id, knowledge_base_id or None)
    profile = model_profile_for_account(account_id, analysis_model_profile_id or None)

    job_id = make_id("job")
    job_dir = JOBS_DIR / job_id
    job_dir.mkdir(parents=True, exist_ok=True)

    original_name = file.filename or ""
    extension = Path(file.filename or "upload.mp4").suffix or ".mp4"
    target_path = job_dir / f"source{extension}"
    with target_path.open("wb") as destination:
        shutil.copyfileobj(file.file, destination)

    now = utc_now()
    artifacts_json = json.dumps({"uploaded_path": str(target_path)}, ensure_ascii=False)
    values = (
        job_id,
        account_id,
        assistant_id or None,
        target_kb["id"],
        original_name,
        (title or file.filename or "上传视频素材").strip(),
        artifacts_json,
        profile["id"],
        now,
        now,
    )
    db.execute(
        """
        INSERT INTO jobs (id, user_id, assistant_id, knowledge_base_id, source_type, source_url, title, language, status, artifacts_json, analysis_model_profile_id, created_at, updated_at)
        VALUES (?, ?, ?, ?, 'upload_video', ?, ?, 'auto', 'queued', ?, ?, ?, ?)
        """,
        values,
    )
    background_tasks.add_task(process_job, job_id)
    queued = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
    return job_payload(queued)
@app.get("/v2/assistants")
def list_assistants(account: dict[str, Any] = Depends(require_approved)) -> list[dict[str, Any]]:
    """List the caller's assistants, newest first."""
    assistant_rows = db.fetch_all(
        "SELECT * FROM assistants WHERE user_id = ? ORDER BY created_at DESC",
        (account["id"],),
    )
    return [assistant_payload(assistant_row) for assistant_row in assistant_rows]
@app.post("/v2/assistants")
def create_assistant(request: AssistantCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Create an assistant for the caller and link the requested knowledge bases.

    A blank ``model_profile_id`` resolves through the account's preferred or
    default profile. Knowledge base ids that do not belong to the caller are
    skipped silently.
    """
    assistant_id = make_id("assistant")
    now = utc_now()
    owner_id = account["id"]
    model_profile = model_profile_for_account(owner_id, request.model_profile_id or None)

    values = (
        assistant_id,
        owner_id,
        request.name.strip(),
        request.description.strip(),
        request.system_prompt.strip(),
        request.generation_goal.strip(),
        request.fastgpt_app_key.strip(),
        model_profile["id"],
        now,
        now,
    )
    db.execute(
        """
        INSERT INTO assistants (id, user_id, name, description, system_prompt, generation_goal, fastgpt_app_key, model_profile_id, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        values,
    )

    for kb_id in request.knowledge_base_ids:
        owned = db.fetch_one("SELECT id FROM knowledge_bases WHERE id = ? AND user_id = ?", (kb_id, owner_id))
        if not owned:
            continue
        db.execute(
            "INSERT OR IGNORE INTO assistant_knowledge_bases (assistant_id, knowledge_base_id) VALUES (?, ?)",
            (assistant_id, kb_id),
        )

    created = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (assistant_id,))
    return assistant_payload(created)
@app.patch("/v2/assistants/{assistant_id}")
def update_assistant(assistant_id: str, request: AssistantUpdateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Partially update one of the caller's assistants.

    Fields left as ``None`` keep their stored value. A provided
    ``model_profile_id`` is resolved through ``model_profile_for_account``.
    A provided ``knowledge_base_ids`` list replaces all existing links, and
    ids not owned by the caller are skipped.

    Raises:
        HTTPException: 404 when the assistant does not exist or is not the caller's.
    """
    owner_id = account["id"]
    current = db.fetch_one("SELECT * FROM assistants WHERE id = ? AND user_id = ?", (assistant_id, owner_id))
    if not current:
        raise HTTPException(status_code=404, detail="Assistant not found")

    def _merged(field: str, incoming: str | None) -> Any:
        # Keep the stored value (or "") unless the request supplies one.
        return current.get(field, "") if incoming is None else incoming

    new_name = current["name"] if request.name is None else request.name
    new_description = _merged("description", request.description)
    new_system_prompt = _merged("system_prompt", request.system_prompt)
    new_generation_goal = _merged("generation_goal", request.generation_goal)
    new_app_key = _merged("fastgpt_app_key", request.fastgpt_app_key)
    new_model_profile_id = current.get("model_profile_id", "")
    if request.model_profile_id is not None:
        new_model_profile_id = model_profile_for_account(owner_id, request.model_profile_id)["id"]

    db.execute(
        """
        UPDATE assistants
        SET name = ?, description = ?, system_prompt = ?, generation_goal = ?, fastgpt_app_key = ?, model_profile_id = ?, updated_at = ?
        WHERE id = ?
        """,
        (
            new_name,
            new_description,
            new_system_prompt,
            new_generation_goal,
            new_app_key,
            new_model_profile_id,
            utc_now(),
            assistant_id,
        ),
    )

    requested_kb_ids = request.knowledge_base_ids
    if requested_kb_ids is not None:
        db.execute("DELETE FROM assistant_knowledge_bases WHERE assistant_id = ?", (assistant_id,))
        for kb_id in requested_kb_ids:
            owned = db.fetch_one("SELECT id FROM knowledge_bases WHERE id = ? AND user_id = ?", (kb_id, owner_id))
            if not owned:
                continue
            db.execute(
                "INSERT OR IGNORE INTO assistant_knowledge_bases (assistant_id, knowledge_base_id) VALUES (?, ?)",
                (assistant_id, kb_id),
            )

    refreshed = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (assistant_id,))
    return assistant_payload(refreshed)
@app.post("/v2/assistants/{assistant_id}/generate")
async def generate_copy(assistant_id: str, request: GenerateCopyRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
    """Generate copy with one of the caller's assistants, grounded in knowledge base excerpts.

    Knowledge bases come from ``request.knowledge_base_ids`` when given,
    otherwise from the assistant's linked knowledge bases. Every id is checked
    against the caller's own ``knowledge_bases`` rows before any document is
    read; ids that are not owned by the caller are dropped, the same way
    assistant creation and update treat them. Without this check a caller
    could pass another user's knowledge base id and receive that user's
    documents in ``used_documents``.

    For each knowledge base the three most recent documents are used; each
    contributes at most 1200 characters, and the joined excerpt is capped at
    6000 characters (2000 in the response echo).

    Raises:
        HTTPException: 404 when the assistant does not exist or is not owned
            by the caller.
    """
    assistant = db.fetch_one("SELECT * FROM assistants WHERE id = ? AND user_id = ?", (assistant_id, account["id"]))
    if not assistant:
        raise HTTPException(status_code=404, detail="Assistant not found")

    candidate_ids = request.knowledge_base_ids or [
        row["knowledge_base_id"]
        for row in db.fetch_all("SELECT knowledge_base_id FROM assistant_knowledge_bases WHERE assistant_id = ?", (assistant_id,))
    ]
    # Authorise every id (and drop duplicates, keeping order) before touching documents.
    kb_ids: list[str] = []
    for kb_id in dict.fromkeys(candidate_ids):
        if db.fetch_one("SELECT id FROM knowledge_bases WHERE id = ? AND user_id = ?", (kb_id, account["id"])):
            kb_ids.append(kb_id)

    used_documents: list[dict[str, Any]] = []
    excerpts: list[str] = []
    for kb_id in kb_ids:
        docs = db.fetch_all("SELECT * FROM knowledge_documents WHERE knowledge_base_id = ? ORDER BY created_at DESC LIMIT 3", (kb_id,))
        for doc in docs:
            payload = document_payload(doc)
            used_documents.append(payload)
            # Fall back to "" so a document with no text at all cannot break the slice below.
            excerpt = payload["combined_text"] or payload["style_summary"] or payload["transcript_text"] or ""
            excerpts.append(f"[{payload['title']}]\n{excerpt[:1200]}")

    prompt_excerpt = "\n\n".join(excerpts)[:6000]
    system_prompt = assistant.get("system_prompt") or "你是文案助手。"
    generation_goal = assistant.get("generation_goal") or "生成短视频文案。"
    user_prompt = (
        f"任务目标:{generation_goal}\n"
        f"创作需求:{request.brief}\n"
        f"平台:{request.platform}\n"
        f"受众:{request.audience}\n"
        f"额外要求:{request.extra_requirements or ''}\n\n"
        f"参考知识库素材:\n{prompt_excerpt or '暂无参考素材,请按通用短视频结构输出。'}\n\n"
        "请输出完整文案,包含标题、开场钩子、正文结构和结尾行动指令。"
    )
    profile = model_profile_for_account(account["id"], assistant.get("model_profile_id") or None)
    content = await call_model(profile, system_prompt, user_prompt, temperature=0.7)
    return {
        "assistant_id": assistant_id,
        "knowledge_base_ids": kb_ids,
        "content": content,
        "prompt_excerpt": prompt_excerpt[:2000],
        "used_documents": used_documents,
    }
@app.get("/v2/admin/accounts/pending")
def pending_accounts(admin: dict[str, Any] = Depends(require_super_admin)) -> list[dict[str, Any]]:
    """List accounts whose approval is still pending, oldest first.

    ``admin`` is not read in the body; it is declared so the
    ``require_super_admin`` dependency runs for this route.
    """
    pending_query = "SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC"
    return list(map(normalize_account, db.fetch_all(pending_query)))
@app.post("/v2/admin/accounts/{account_id}/approve")
def approve_account(account_id: str, admin: dict[str, Any] = Depends(require_super_admin)) -> dict[str, Any]:
    """Mark an account as approved and make sure it has a knowledge base.

    The acting admin's id is recorded in ``approved_by``. ``approved_at`` and
    ``updated_at`` are written from a single timestamp so the two columns
    always agree for this transition.

    Raises:
        HTTPException: 404 when no account has the given id.
    """
    account = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
    if not account:
        raise HTTPException(status_code=404, detail="Account not found")
    # One clock read for both columns; two separate calls could differ slightly.
    now = utc_now()
    db.execute(
        "UPDATE accounts SET approval_status = 'approved', approved_by = ?, approved_at = ?, updated_at = ? WHERE id = ?",
        (admin["id"], now, now, account_id),
    )
    approved = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
    ensure_user_kb(account_id)
    return {"saved": True, "account": normalize_account(approved)}
@app.post("/v2/admin/accounts/{account_id}/reject")
def reject_account(account_id: str, admin: dict[str, Any] = Depends(require_super_admin)) -> dict[str, Any]:
    """Mark an account as rejected.

    The decision is recorded in the same ``approved_by`` / ``approved_at``
    columns that an approval uses, together with the acting admin's id.
    ``approved_at`` and ``updated_at`` are written from a single timestamp so
    the two columns always agree for this transition.

    Raises:
        HTTPException: 404 when no account has the given id.
    """
    account = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
    if not account:
        raise HTTPException(status_code=404, detail="Account not found")
    # One clock read for both columns; two separate calls could differ slightly.
    now = utc_now()
    db.execute(
        "UPDATE accounts SET approval_status = 'rejected', approved_by = ?, approved_at = ?, updated_at = ? WHERE id = ?",
        (admin["id"], now, now, account_id),
    )
    rejected = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
    return {"saved": True, "account": normalize_account(rejected)}
@app.get("/api/v1/app/update/latest")
def latest_update(
    platform: str = Query(default="android"),
    channel: str = Query(default="stable"),
    currentVersionCode: int | None = Query(default=None),
) -> dict[str, Any]:
    """Report the newest active app update for a platform/channel pair.

    The newest row is the active one with the highest ``version_code``
    (ties broken by ``published_at``). When none exists, a placeholder payload
    with ``hasUpdate`` false is returned. ``hasUpdate`` is true when the client
    sent no ``currentVersionCode`` or the latest code is greater than it.

    Optional columns are read NULL-safely: ``row.get(key, default)`` only
    covers a missing key, not a stored NULL, so NULLs are coerced to the same
    empty values the placeholder payload uses instead of failing in ``int()``
    or leaking ``null`` into a string field.
    """
    row = db.fetch_one(
        "SELECT * FROM app_updates WHERE platform = ? AND channel = ? AND is_active = 1 ORDER BY version_code DESC, published_at DESC LIMIT 1",
        (platform, channel),
    )
    if not row:
        return {
            "platform": platform,
            "channel": channel,
            "hasUpdate": False,
            "latestVersionCode": currentVersionCode or 0,
            "latestVersionName": "",
            "minSupportedCode": 0,
            "downloadUrl": "",
            "apkSha256": "",
            "releaseNotes": "",
            "forceUpdate": False,
            "publishedAt": 0,
        }
    latest_version_code = int(row["version_code"])
    return {
        "platform": row["platform"],
        "channel": row["channel"],
        "hasUpdate": currentVersionCode is None or latest_version_code > currentVersionCode,
        "latestVersionCode": latest_version_code,
        "latestVersionName": row["version_name"],
        "minSupportedCode": int(row["min_supported_code"] or 0),
        "downloadUrl": row["apk_url"],
        "apkSha256": row.get("apk_sha256") or "",
        "releaseNotes": row.get("notes") or "",
        "forceUpdate": bool(row.get("force_update", 0)),
        "publishedAt": int(row.get("published_at") or 0),
    }
@app.post("/v2/admin/app/update/publish")
def publish_app_update(request: PublishAppUpdateRequest, admin: dict[str, Any] = Depends(require_super_admin)) -> dict[str, Any]:
    """Record a new app update for a platform/channel pair.

    When the new update is active, every existing row for the same
    platform/channel is deactivated first so the new row becomes the one
    clients are served. When the new update is NOT active, existing rows are
    left alone: previously the current release was deactivated as well, which
    left the channel with no active update at all.

    NOTE(review): if publishing an inactive row was meant as a way to switch a
    channel off, restore the unconditional deactivation.

    Returns:
        ``saved``/``action`` markers plus ``updateId``, the id of the newest
        row matching the published platform, channel and version code
        (0 if it cannot be found).
    """
    if request.isActive:
        db.execute(
            "UPDATE app_updates SET is_active = 0 WHERE platform = ? AND channel = ?",
            (request.platform, request.channel),
        )
    db.execute(
        """
        INSERT INTO app_updates (
            platform, channel, version_code, version_name, min_supported_code,
            apk_url, apk_sha256, notes, force_update, is_active, published_at, created_by
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            request.platform,
            request.channel,
            request.versionCode,
            request.versionName,
            request.minSupportedCode,
            request.apkUrl,
            request.apkSha256,
            request.notes,
            1 if request.forceUpdate else 0,
            1 if request.isActive else 0,
            now_ts(),
            admin["id"],
        ),
    )
    # The insert does not hand back the new id here, so look up the newest matching row.
    row = db.fetch_one(
        """
        SELECT id
        FROM app_updates
        WHERE platform = ? AND channel = ? AND version_code = ?
        ORDER BY id DESC
        LIMIT 1
        """,
        (request.platform, request.channel, request.versionCode),
    )
    return {"saved": True, "action": "published", "updateId": row["id"] if row else 0}