diff --git a/collector-service/app/core_main.py b/collector-service/app/core_main.py index 341eab6..92aa7b3 100644 --- a/collector-service/app/core_main.py +++ b/collector-service/app/core_main.py @@ -1,8 +1,10 @@ from __future__ import annotations import asyncio +import base64 import httpx import json +import mimetypes import os import re import secrets @@ -13,10 +15,11 @@ import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any -from urllib.parse import urljoin, urlparse +from urllib.parse import quote, urljoin, urlparse from fastapi import Body, Depends, FastAPI, File, Form, Header, HTTPException, Query, UploadFile from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import StreamingResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field @@ -292,6 +295,24 @@ class LiveRecorderEnabledRequest(BaseModel): assistant_id: str | None = None +class LiveRecorderSourceCreateRequest(BaseModel): + project_id: str = "" + assistant_id: str = "" + platform: str = "" + source_url: str + title: str = "" + quality: str = "原画" + enabled: bool = True + + +class LiveRecorderSourceUpdateRequest(BaseModel): + project_id: str | None = None + assistant_id: str | None = None + title: str | None = None + quality: str | None = None + enabled: bool | None = None + + class InternalStepRequest(BaseModel): job_id: str = "" jobId: str = "" @@ -425,6 +446,461 @@ def content_source_payload(row: dict[str, Any]) -> dict[str, Any]: } +LIVE_RECORDER_MANAGED_PREFIX = "sfsrc_" +LIVE_RECORDER_QUALITY_RANKS = { + "流畅": 1, + "标清": 2, + "高清": 3, + "超清": 4, + "蓝光": 5, + "原画": 6, +} + + +def normalize_live_recorder_quality(value: str | None) -> str: + normalized = str(value or "").strip() + if normalized in LIVE_RECORDER_QUALITY_RANKS: + return normalized + return "原画" + + +def live_recorder_remote_name(source_id: str) -> str: + return f"{LIVE_RECORDER_MANAGED_PREFIX}{source_id.replace('-', '')[:12]}" + + +def parse_live_recorder_import_text(raw_text: str) -> list[dict[str, Any]]: + entries: list[dict[str, Any]] = [] + for raw_line in str(raw_text or "").replace("\r\n", "\n").replace("\r", "\n").split("\n"): + line = raw_line.strip() + if not line: + continue + enabled = not line.startswith("#") + if not enabled: + line = line.lstrip("#").strip() + if not line: + continue + parts = [part.strip() for part in re.split(r"[,,]", line) if part.strip()] + quality = "原画" + source_url = "" + title = "" + if len(parts) == 1: + source_url = parts[0] + elif len(parts) == 2: + if parts[0] in LIVE_RECORDER_QUALITY_RANKS: + quality = parts[0] + source_url = parts[1] + else: + source_url = parts[0] + title = parts[1] + else: + if parts[0] in LIVE_RECORDER_QUALITY_RANKS: + quality = parts[0] + source_url = parts[1] + title = " ".join(parts[2:]) + else: + source_url = parts[0] + title = " ".join(parts[1:]) + source_url = source_url.strip() + if not source_url: + continue + entries.append( + { + "source_url": source_url, + "quality": normalize_live_recorder_quality(quality), + "title": title.strip(), + "enabled": enabled, + } + ) + deduped: list[dict[str, Any]] = [] + seen: set[str] = set() + for entry in entries: + key = entry["source_url"] + if key in seen: + continue + seen.add(key) + deduped.append(entry) + return deduped + + +def live_recorder_binding_payload(row: dict[str, Any], *, active_recordings: list[dict[str, Any]] | None = None) -> dict[str, Any]: + metadata = parse_json_object(row.get("source_metadata_json") or "{}") + items = active_recordings or [] + return { + "id": row["binding_id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", "") or "", + "assistant_id": row.get("assistant_id", "") or "", + "source_id": row["source_id"], + "platform": row.get("platform", "") or "", + "source_url": row.get("source_url", "") or "", + "remote_name": row.get("remote_name", "") or "", + "title": row.get("binding_title") or row.get("source_title") or "", + "quality": normalize_live_recorder_quality(row.get("quality")), + "enabled": bool(row.get("enabled", 1)), + "metadata": metadata, + "active_recordings": items, + "recording_count": len(items), + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + +def live_recorder_binding_rows(user_id: str, project_id: str = "") -> list[dict[str, Any]]: + clauses = ["b.user_id = ?"] + params: list[Any] = [user_id] + if project_id: + clauses.append("b.project_id = ?") + params.append(project_id) + sql = f""" + SELECT + b.id AS binding_id, + b.user_id, + b.project_id, + b.assistant_id, + b.source_id, + b.title AS binding_title, + b.quality, + b.enabled, + b.created_at, + b.updated_at, + s.platform, + s.source_url, + s.remote_name, + s.title AS source_title, + s.metadata_json AS source_metadata_json + FROM live_recorder_bindings b + JOIN live_recorder_sources s ON s.id = b.source_id + WHERE {" AND ".join(clauses)} + ORDER BY b.updated_at DESC, b.created_at DESC + """ + return db.fetch_all(sql, tuple(params)) + + +def load_owned_live_recorder_binding(binding_id: str, user_id: str) -> dict[str, Any]: + row = db.fetch_one( + """ + SELECT + b.id AS binding_id, + b.user_id, + b.project_id, + b.assistant_id, + b.source_id, + b.title AS binding_title, + b.quality, + b.enabled, + b.created_at, + b.updated_at, + s.platform, + s.source_url, + s.remote_name, + s.title AS source_title, + s.metadata_json AS source_metadata_json + FROM live_recorder_bindings b + JOIN live_recorder_sources s ON s.id = b.source_id + WHERE b.id = ? AND b.user_id = ? + """, + (binding_id, user_id), + ) + if not row: + raise HTTPException(status_code=404, detail="Live recorder source not found") + return row + + +def get_or_create_live_recorder_source(*, platform: str, source_url: str, title: str = "") -> dict[str, Any]: + normalized_platform = ensure_domestic_platform(platform or infer_platform_from_url(source_url), allow_blank=False) + normalized_url = source_url.strip() + existing = db.fetch_one( + "SELECT * FROM live_recorder_sources WHERE platform = ? AND source_url = ?", + (normalized_platform, normalized_url), + ) + timestamp = utc_now() + if existing: + if title.strip() and title.strip() != (existing.get("title") or ""): + db.execute( + "UPDATE live_recorder_sources SET title = ?, updated_at = ? WHERE id = ?", + (title.strip(), timestamp, existing["id"]), + ) + existing = db.fetch_one("SELECT * FROM live_recorder_sources WHERE id = ?", (existing["id"],)) + return existing + source_id = make_id("lrsrc") + remote_name = live_recorder_remote_name(source_id) + db.execute( + """ + INSERT INTO live_recorder_sources (id, platform, source_url, remote_name, title, metadata_json, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + source_id, + normalized_platform, + normalized_url, + remote_name, + title.strip(), + json.dumps({}, ensure_ascii=False), + timestamp, + timestamp, + ), + ) + return db.fetch_one("SELECT * FROM live_recorder_sources WHERE id = ?", (source_id,)) + + +def upsert_live_recorder_binding( + *, + user_id: str, + project_id: str, + assistant_id: str = "", + platform: str, + source_url: str, + title: str = "", + quality: str = "原画", + enabled: bool = True, +) -> dict[str, Any]: + source_row = get_or_create_live_recorder_source(platform=platform, source_url=source_url, title=title) + normalized_project_id = project_id.strip() or None + normalized_assistant_id = assistant_id.strip() or None + existing = db.fetch_one( + "SELECT * FROM live_recorder_bindings WHERE user_id = ? AND source_id = ?", + (user_id, source_row["id"]), + ) + timestamp = utc_now() + normalized_quality = normalize_live_recorder_quality(quality) + if existing: + db.execute( + """ + UPDATE live_recorder_bindings + SET project_id = ?, assistant_id = ?, title = ?, quality = ?, enabled = ?, updated_at = ? + WHERE id = ? + """, + ( + normalized_project_id if normalized_project_id is not None else existing.get("project_id"), + normalized_assistant_id, + title.strip(), + normalized_quality, + 1 if enabled else 0, + timestamp, + existing["id"], + ), + ) + return load_owned_live_recorder_binding(existing["id"], user_id) + binding_id = make_id("lrbind") + db.execute( + """ + INSERT INTO live_recorder_bindings ( + id, user_id, project_id, assistant_id, source_id, title, quality, enabled, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + binding_id, + user_id, + normalized_project_id, + normalized_assistant_id, + source_row["id"], + title.strip(), + normalized_quality, + 1 if enabled else 0, + timestamp, + timestamp, + ), + ) + return load_owned_live_recorder_binding(binding_id, user_id) + + +def live_recorder_source_groups() -> dict[str, dict[str, Any]]: + rows = db.fetch_all( + """ + SELECT + s.id AS source_id, + s.platform, + s.source_url, + s.remote_name, + s.title AS source_title, + b.id AS binding_id, + b.user_id, + b.project_id, + b.assistant_id, + b.title AS binding_title, + b.quality, + b.enabled + FROM live_recorder_sources s + LEFT JOIN live_recorder_bindings b ON b.source_id = s.id + ORDER BY s.created_at ASC, b.created_at ASC + """ + ) + grouped: dict[str, dict[str, Any]] = {} + for row in rows: + source_id = row["source_id"] + target = grouped.setdefault( + source_id, + { + "source_id": source_id, + "platform": row.get("platform", ""), + "source_url": row.get("source_url", ""), + "remote_name": row.get("remote_name", ""), + "source_title": row.get("source_title", ""), + "bindings": [], + }, + ) + if row.get("binding_id"): + target["bindings"].append( + { + "binding_id": row["binding_id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", "") or "", + "assistant_id": row.get("assistant_id", "") or "", + "binding_title": row.get("binding_title", "") or "", + "quality": normalize_live_recorder_quality(row.get("quality")), + "enabled": bool(row.get("enabled", 1)), + } + ) + return grouped + + +def build_live_recorder_managed_raw() -> tuple[str, dict[str, Any]]: + existing = live_recorder_request("GET", "/api/url-config", timeout=15.0) + preserved_lines: list[str] = [] + for line in existing.get("lines") or []: + raw = str(line.get("raw") or "") + name = str(line.get("name") or "").strip() + if line.get("type") == "url" and name.startswith(LIVE_RECORDER_MANAGED_PREFIX): + continue + preserved_lines.append(raw) + + generated_lines: list[str] = [] + active_source_count = 0 + active_binding_count = 0 + for source in live_recorder_source_groups().values(): + active_bindings = [binding for binding in source["bindings"] if binding.get("enabled")] + if not active_bindings: + continue + quality = max( + (binding.get("quality") or "原画" for binding in active_bindings), + key=lambda item: LIVE_RECORDER_QUALITY_RANKS.get(item, 0), + ) + generated_lines.append(f"{quality},{source['source_url']},{source['remote_name']}") + active_source_count += 1 + active_binding_count += len(active_bindings) + + final_lines = [line for line in preserved_lines if str(line).strip()] + final_lines.extend(generated_lines) + raw = "\n".join(final_lines).strip() + if raw: + raw += "\n" + stats = { + "preserved_count": len([line for line in preserved_lines if str(line).strip()]), + "managed_source_count": active_source_count, + "managed_binding_count": active_binding_count, + "generated_count": len(generated_lines), + } + return raw, stats + + +def sync_live_recorder_remote_config() -> dict[str, Any]: + raw, stats = build_live_recorder_managed_raw() + payload = live_recorder_request("POST", "/api/url-config", {"raw": raw}, timeout=20.0) + return { + "ok": True, + "stats": stats, + "remote": payload, + } + + +def live_recorder_runtime_payload() -> dict[str, Any]: + payload = live_recorder_request("GET", "/api/status-lite", timeout=8.0) + return payload if isinstance(payload, dict) else {"recordings": []} + + +def live_recorder_recordings_payload() -> dict[str, Any]: + payload = live_recorder_request("GET", "/api/recordings", timeout=12.0) + return payload if isinstance(payload, dict) else {"recordings": []} + + +def live_recorder_downloads_payload() -> dict[str, Any]: + payload = live_recorder_request("GET", "/api/downloads", timeout=20.0) + return payload if isinstance(payload, dict) else {"files": []} + + +def owned_live_recorder_sources(user_id: str, project_id: str = "") -> tuple[list[dict[str, Any]], dict[str, dict[str, Any]]]: + rows = live_recorder_binding_rows(user_id, project_id) + mapping = {row["remote_name"]: row for row in rows} + return rows, mapping + + +def filter_owned_live_recorder_recordings(user_id: str, project_id: str = "") -> list[dict[str, Any]]: + rows, mapping = owned_live_recorder_sources(user_id, project_id) + if not rows: + return [] + status_payload = live_recorder_recordings_payload() + items = [] + for recording in status_payload.get("recordings") or []: + haystack = " ".join( + [ + str(recording.get("anchor_name") or ""), + str(recording.get("record_name") or ""), + str(recording.get("save_dir") or ""), + str(recording.get("save_file") or ""), + ] + ) + matched = None + for remote_name, row in mapping.items(): + if remote_name and remote_name in haystack: + matched = row + break + if str(recording.get("record_url") or "").strip() == str(row.get("source_url") or "").strip(): + matched = row + break + if not matched: + continue + payload = dict(recording) + payload["source_id"] = matched["source_id"] + payload["binding_id"] = matched["binding_id"] + payload["project_id"] = matched.get("project_id", "") or "" + payload["title"] = matched.get("binding_title") or matched.get("source_title") or matched.get("remote_name") or "" + items.append(payload) + return items + + +def encode_live_recorder_file_id(relative_path: str) -> str: + return base64.urlsafe_b64encode(relative_path.encode("utf-8")).decode("ascii").rstrip("=") + + +def decode_live_recorder_file_id(file_id: str) -> str: + padding = "=" * (-len(file_id) % 4) + try: + return base64.urlsafe_b64decode((file_id + padding).encode("ascii")).decode("utf-8") + except Exception as exc: + raise HTTPException(status_code=400, detail=f"Invalid live recorder file id: {exc}") + + +def filter_owned_live_recorder_files(user_id: str, project_id: str = "", limit: int = 200) -> list[dict[str, Any]]: + rows, mapping = owned_live_recorder_sources(user_id, project_id) + if not rows: + return [] + downloads_payload = live_recorder_downloads_payload() + files = [] + for item in downloads_payload.get("files") or []: + relative_path = str(item.get("relative_path") or "") + name = str(item.get("name") or "") + haystack = f"{relative_path} {name}" + matched = None + for remote_name, row in mapping.items(): + if remote_name and remote_name in haystack: + matched = row + break + if not matched: + continue + payload = dict(item) + payload["id"] = encode_live_recorder_file_id(relative_path) + payload["source_id"] = matched["source_id"] + payload["binding_id"] = matched["binding_id"] + payload["project_id"] = matched.get("project_id", "") or "" + payload["platform"] = matched.get("platform", "") or "" + payload["title"] = matched.get("binding_title") or matched.get("source_title") or matched.get("remote_name") or "" + payload["content_url"] = f"/v2/live-recorder/files/{payload['id']}/content" + files.append(payload) + if len(files) >= limit: + break + return files + + def job_event_payload(row: dict[str, Any]) -> dict[str, Any]: return { "id": row["id"], @@ -1821,10 +2297,120 @@ def live_recorder_health(account: dict[str, Any] = Depends(require_approved)) -> return live_recorder_request("GET", "/api/healthz", timeout=8.0) +@app.get("/v2/live-recorder/sources") +def list_live_recorder_sources( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(require_approved), +) -> dict[str, Any]: + normalized_project_id = (project_id or "").strip() + if normalized_project_id: + resolve_target_project(account["id"], normalized_project_id, username=account["username"]) + active_by_binding: dict[str, list[dict[str, Any]]] = {} + for item in filter_owned_live_recorder_recordings(account["id"], normalized_project_id): + active_by_binding.setdefault(item["binding_id"], []).append(item) + items = [ + live_recorder_binding_payload(row, active_recordings=active_by_binding.get(row["binding_id"], [])) + for row in live_recorder_binding_rows(account["id"], normalized_project_id) + ] + return {"items": items, "count": len(items)} + + +@app.post("/v2/live-recorder/sources") +def create_live_recorder_source( + request: LiveRecorderSourceCreateRequest, + account: dict[str, Any] = Depends(require_approved), +) -> dict[str, Any]: + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + binding = upsert_live_recorder_binding( + user_id=account["id"], + project_id=project["id"], + assistant_id=(assistant or {}).get("id", ""), + platform=request.platform or infer_platform_from_url(request.source_url), + source_url=request.source_url, + title=request.title, + quality=request.quality, + enabled=request.enabled, + ) + sync_result = sync_live_recorder_remote_config() + return {"item": live_recorder_binding_payload(binding), "sync": sync_result} + + +@app.patch("/v2/live-recorder/sources/{binding_id}") +def update_live_recorder_source( + binding_id: str, + request: LiveRecorderSourceUpdateRequest, + account: dict[str, Any] = Depends(require_approved), +) -> dict[str, Any]: + current = load_owned_live_recorder_binding(binding_id, account["id"]) + project_id = current.get("project_id", "") or "" + if request.project_id is not None: + if request.project_id.strip(): + project = resolve_target_project(account["id"], request.project_id.strip(), username=account["username"]) + project_id = project["id"] + else: + project_id = "" + assistant_id = current.get("assistant_id", "") or "" + if request.assistant_id is not None: + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project_id) + assistant_id = (assistant or {}).get("id", "") + db.execute( + """ + UPDATE live_recorder_bindings + SET project_id = ?, assistant_id = ?, title = ?, quality = ?, enabled = ?, updated_at = ? + WHERE id = ? AND user_id = ? + """, + ( + project_id or None, + assistant_id or None, + request.title.strip() if request.title is not None else current.get("binding_title", ""), + normalize_live_recorder_quality(request.quality if request.quality is not None else current.get("quality")), + 1 if (request.enabled if request.enabled is not None else bool(current.get("enabled", 1))) else 0, + utc_now(), + binding_id, + account["id"], + ), + ) + updated = load_owned_live_recorder_binding(binding_id, account["id"]) + sync_result = sync_live_recorder_remote_config() + return {"item": live_recorder_binding_payload(updated), "sync": sync_result} + + +@app.delete("/v2/live-recorder/sources/{binding_id}") +def delete_live_recorder_source(binding_id: str, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + current = load_owned_live_recorder_binding(binding_id, account["id"]) + db.execute("DELETE FROM live_recorder_bindings WHERE id = ? AND user_id = ?", (binding_id, account["id"])) + remaining = db.fetch_one("SELECT COUNT(*) AS count FROM live_recorder_bindings WHERE source_id = ?", (current["source_id"],)) + if not remaining or int(remaining.get("count") or 0) <= 0: + db.execute("DELETE FROM live_recorder_sources WHERE id = ?", (current["source_id"],)) + sync_result = sync_live_recorder_remote_config() + return {"ok": True, "deleted_id": binding_id, "sync": sync_result} + + @app.get("/v2/live-recorder/status") -def live_recorder_status(account: dict[str, Any] = Depends(require_approved)) -> Any: - _ = account - return live_recorder_request("GET", "/api/status-lite", timeout=8.0) +def live_recorder_status( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(require_approved), +) -> Any: + normalized_project_id = (project_id or "").strip() + if normalized_project_id: + resolve_target_project(account["id"], normalized_project_id, username=account["username"]) + runtime = live_recorder_runtime_payload() + active_items = filter_owned_live_recorder_recordings(account["id"], normalized_project_id) + files = filter_owned_live_recorder_files(account["id"], normalized_project_id, limit=12) + return { + "backend": runtime.get("backend") or {}, + "running": bool(runtime.get("running")), + "pid": runtime.get("pid"), + "uptime_seconds": runtime.get("uptime_seconds", 0), + "started_at": runtime.get("started_at"), + "last_exit_code": runtime.get("last_exit_code"), + "managed": bool(runtime.get("managed")), + "url_info": runtime.get("url_info") or {}, + "active_recordings": active_items, + "recording_count": len(active_items), + "files_preview": files, + } @app.post("/v2/live-recorder/url-config/import") @@ -1832,8 +2418,23 @@ def live_recorder_import_urls( request: LiveRecorderImportRequest, account: dict[str, Any] = Depends(require_approved), ) -> Any: - _ = account - return live_recorder_request("POST", "/api/url-config/import", {"raw": request.raw}, timeout=20.0) + project = ensure_default_project(account["id"], username=account["username"]) + entries = parse_live_recorder_import_text(request.raw) + items = [] + for entry in entries: + platform = infer_platform_from_url(entry["source_url"]) or "kuaishou" + binding = upsert_live_recorder_binding( + user_id=account["id"], + project_id=project["id"], + platform=platform, + source_url=entry["source_url"], + title=entry.get("title") or "", + quality=entry.get("quality") or "原画", + enabled=bool(entry.get("enabled", True)), + ) + items.append(live_recorder_binding_payload(binding)) + sync_result = sync_live_recorder_remote_config() + return {"ok": True, "items": items, "count": len(items), "sync": sync_result} @app.post("/v2/live-recorder/url-config/set-enabled") @@ -1841,14 +2442,57 @@ def live_recorder_set_enabled( request: LiveRecorderEnabledRequest, account: dict[str, Any] = Depends(require_approved), ) -> Any: + _ = request _ = account - return live_recorder_request( - "POST", - "/api/url-config/set-enabled", - {"line_no": request.line_no, "enabled": request.enabled}, - timeout=20.0, + raise HTTPException(status_code=410, detail="Use /v2/live-recorder/sources/{binding_id} to update tenant-scoped recorder sources") + + +@app.get("/v2/live-recorder/files") +def list_live_recorder_files( + project_id: str | None = Query(default=None), + limit: int = Query(default=100, ge=1, le=500), + account: dict[str, Any] = Depends(require_approved), +) -> dict[str, Any]: + normalized_project_id = (project_id or "").strip() + if normalized_project_id: + resolve_target_project(account["id"], normalized_project_id, username=account["username"]) + items = filter_owned_live_recorder_files(account["id"], normalized_project_id, limit=limit) + return {"items": items, "count": len(items)} + + +@app.get("/v2/live-recorder/files/{file_id}/content") +def stream_live_recorder_file( + file_id: str, + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(require_approved), +) -> StreamingResponse: + normalized_project_id = (project_id or "").strip() + relative_path = decode_live_recorder_file_id(file_id) + owned_items = filter_owned_live_recorder_files(account["id"], normalized_project_id, limit=500) + matched = next((item for item in owned_items if item.get("relative_path") == relative_path), None) + if not matched: + raise HTTPException(status_code=404, detail="Live recorder file not found") + target_url = urljoin( + LIVE_RECORDER_BASE_URL if LIVE_RECORDER_BASE_URL.endswith("/") else f"{LIVE_RECORDER_BASE_URL}/", + f"downloads/{quote(relative_path.lstrip('/'), safe='/')}", ) + def iterator(): + try: + with httpx.stream("GET", target_url, timeout=120.0, follow_redirects=True) as response: + response.raise_for_status() + for chunk in response.iter_bytes(): + if chunk: + yield chunk + except httpx.HTTPStatusError as exc: + raise HTTPException(status_code=exc.response.status_code, detail="Failed to stream live recorder file") + except Exception as exc: + raise HTTPException(status_code=502, detail=f"Failed to proxy live recorder file: {exc}") + + media_type = mimetypes.guess_type(relative_path)[0] or "application/octet-stream" + headers = {"Content-Disposition": f'inline; filename="{matched.get("name") or "recording.bin"}"'} + return StreamingResponse(iterator(), media_type=media_type, headers=headers) + @app.post("/v2/live-recorder/recorder/start") def live_recorder_start(account: dict[str, Any] = Depends(require_approved)) -> Any: @@ -1857,7 +2501,7 @@ def live_recorder_start(account: dict[str, Any] = Depends(require_approved)) -> @app.post("/v2/live-recorder/recorder/stop") -def live_recorder_stop(account: dict[str, Any] = Depends(require_approved)) -> Any: +def live_recorder_stop(account: dict[str, Any] = Depends(require_super_admin)) -> Any: _ = account return live_recorder_request("POST", "/api/recorder/stop", timeout=30.0) diff --git a/collector-service/app/database.py b/collector-service/app/database.py index f83b801..755cdf7 100644 --- a/collector-service/app/database.py +++ b/collector-service/app/database.py @@ -235,6 +235,36 @@ class Database: FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL ); + CREATE TABLE IF NOT EXISTS live_recorder_sources ( + id TEXT PRIMARY KEY, + platform TEXT NOT NULL DEFAULT '', + source_url TEXT NOT NULL, + remote_name TEXT NOT NULL UNIQUE, + title TEXT NOT NULL DEFAULT '', + metadata_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(platform, source_url) + ); + + CREATE TABLE IF NOT EXISTS live_recorder_bindings ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT, + assistant_id TEXT, + source_id TEXT NOT NULL, + title TEXT NOT NULL DEFAULT '', + quality TEXT NOT NULL DEFAULT '原画', + enabled INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, source_id), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL, + FOREIGN KEY(source_id) REFERENCES live_recorder_sources(id) ON DELETE CASCADE + ); + CREATE TABLE IF NOT EXISTS job_events ( id TEXT PRIMARY KEY, job_id TEXT NOT NULL, diff --git a/collector-service/app/main.py b/collector-service/app/main.py index 2dbbec1..689c2fa 100644 --- a/collector-service/app/main.py +++ b/collector-service/app/main.py @@ -19,3 +19,4 @@ register_domestic_platform_routes(app, core, platform="xiaohongshu", label="小 register_domestic_platform_routes(app, core, platform="bilibili", label="哔哩哔哩") register_domestic_platform_routes(app, core, platform="kuaishou", label="快手") register_domestic_platform_routes(app, core, platform="wechat_video", label="微信视频号") +app.openapi_schema = None