From 98b3efa10b7bbab89aafaaca38d50def99b612e7 Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 16:01:24 +0800 Subject: [PATCH] feat: extend oneliner execution workflows --- collector-service/app/oneliner_features.py | 603 +++++++++++++++++++++ 1 file changed, 603 insertions(+) diff --git a/collector-service/app/oneliner_features.py b/collector-service/app/oneliner_features.py index 077281b..08fc699 100644 --- a/collector-service/app/oneliner_features.py +++ b/collector-service/app/oneliner_features.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import re from typing import Any from fastapi import Depends, HTTPException, Query @@ -700,6 +701,262 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ) return str((row or {}).get("content") or "").strip() + def _extract_first_url(text: str) -> str: + cleaned = str(text or "").strip() + if not cleaned: + return "" + match = re.search(r"https?://[^\s<>'\"]+", cleaned) + if not match: + return "" + return match.group(0).rstrip(",。;;,.)]》】!?!?") + + def _find_creator_source_by_url( + account: dict[str, Any], + *, + project_id: str, + platform: str, + source_url: str, + ) -> dict[str, Any] | None: + return legacy.db.fetch_one( + """ + SELECT * FROM content_sources + WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account' AND source_url = ? + ORDER BY updated_at DESC, created_at DESC + LIMIT 1 + """, + (account["id"], project_id, platform, source_url), + ) + + def _latest_platform_account( + account: dict[str, Any], + *, + project_id: str, + platform: str, + ) -> dict[str, Any] | None: + return legacy.db.fetch_one( + """ + SELECT * FROM content_sources + WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account' + ORDER BY updated_at DESC, created_at DESC + LIMIT 1 + """, + (account["id"], project_id, platform), + ) + + def _load_owned_job(account: dict[str, Any], job_id: str) -> dict[str, Any] | None: + normalized_job_id = str(job_id or "").strip() + if not normalized_job_id: + return None + return legacy.db.fetch_one( + "SELECT * FROM jobs WHERE id = ? AND user_id = ?", + (normalized_job_id, account["id"]), + ) + + def _latest_derivable_job( + account: dict[str, Any], + *, + project_id: str, + exclude_line_types: set[str] | None = None, + ) -> dict[str, Any] | None: + excluded = {item.strip() for item in (exclude_line_types or set()) if str(item or "").strip()} + rows = legacy.db.fetch_all( + """ + SELECT * FROM jobs + WHERE user_id = ? AND project_id = ? AND status IN ('completed', 'done', 'succeeded') + ORDER BY updated_at DESC, created_at DESC + LIMIT 24 + """, + (account["id"], project_id), + ) + for row in rows: + if str(row.get("line_type") or "").strip() in excluded: + continue + return row + return rows[0] if rows else None + + def _job_performance_score(job_row: dict[str, Any] | None) -> float: + if not job_row: + return 0.0 + result_map = _parse_json(job_row.get("result_json") or "{}", {}) + artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {}) + candidates = [ + result_map.get("performance_score"), + (result_map.get("analysis") or {}).get("performance_score"), + (result_map.get("scores") or {}).get("performance_score"), + artifacts_map.get("performance_score"), + (artifacts_map.get("scores") or {}).get("performance_score"), + ] + for value in candidates: + try: + return float(value) + except (TypeError, ValueError): + continue + return 0.0 + + def _linked_platform_videos( + account: dict[str, Any], + *, + project_id: str, + platform: str, + account_row: dict[str, Any], + limit: int = 8, + ) -> list[dict[str, Any]]: + rows = legacy.db.fetch_all( + """ + SELECT * FROM content_sources + WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'video_link' + ORDER BY updated_at DESC, created_at DESC + """, + (account["id"], project_id, platform), + ) + account_source_url = str(account_row.get("source_url") or "").strip() + items: list[dict[str, Any]] = [] + for row in rows: + payload = legacy.content_source_payload(row) + metadata = payload.get("metadata") or {} + if metadata.get("origin_content_source_id") != account_row["id"] and metadata.get("source_account_url") != account_source_url: + continue + latest_job = legacy.db.fetch_one( + "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY updated_at DESC, created_at DESC LIMIT 1", + (row["id"],), + ) + result_map = _parse_json((latest_job or {}).get("result_json") or "{}", {}) + published_at = ( + metadata.get("published_at") + or metadata.get("publish_time") + or metadata.get("created_at") + or payload.get("updated_at") + or payload.get("created_at") + or "" + ) + items.append( + { + "id": payload["id"], + "title": payload.get("title") or payload.get("handle") or payload.get("source_url") or payload["id"], + "source_url": payload.get("source_url", ""), + "published_at": published_at, + "score": { + "performance_score": _job_performance_score(latest_job), + }, + "latest_job_id": (latest_job or {}).get("id", ""), + "latest_job_status": (latest_job or {}).get("status", ""), + "summary": str(result_map.get("summary") or result_map.get("headline_summary") or "")[:240], + } + ) + items.sort(key=lambda item: (float((item.get("score") or {}).get("performance_score") or 0), item.get("published_at") or ""), reverse=True) + return items[: max(1, min(int(limit or 8), 16))] + + def _fallback_platform_videos( + account: dict[str, Any], + *, + project_id: str, + platform: str, + requested_account_id: str = "", + limit: int = 8, + ) -> tuple[dict[str, Any] | None, list[dict[str, Any]]]: + safe_limit = max(1, min(int(limit or 8), 16)) + if platform == "douyin": + target_account = None + if requested_account_id: + target_account = legacy.db.fetch_one( + "SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ?", + (requested_account_id, account["id"]), + ) + if not target_account: + target_account = legacy.db.fetch_one( + "SELECT * FROM douyin_accounts WHERE user_id = ? ORDER BY updated_at DESC, created_at DESC LIMIT 1", + (account["id"],), + ) + if not target_account: + return None, [] + rows = legacy.db.fetch_all( + """ + SELECT * FROM douyin_videos + WHERE account_id = ? + ORDER BY COALESCE(published_at, updated_at) DESC, updated_at DESC + LIMIT ? + """, + (target_account["id"], safe_limit), + ) + items: list[dict[str, Any]] = [] + for row in rows: + stats = _parse_json(row.get("stats_json") or "{}", {}) + play_count = float(stats.get("play_count") or stats.get("play") or 0) + like_count = float(stats.get("digg_count") or stats.get("like_count") or 0) + comment_count = float(stats.get("comment_count") or 0) + share_count = float(stats.get("share_count") or 0) + score = min( + 100.0, + play_count / 10000 * 55 + + like_count / 1000 * 25 + + comment_count / 100 * 10 + + share_count / 100 * 10, + ) + items.append( + { + "id": row["id"], + "title": row.get("title") or row.get("description") or row.get("share_url") or row["id"], + "source_url": row.get("share_url", ""), + "published_at": row.get("published_at") or "", + "score": {"performance_score": round(score, 2)}, + "latest_job_id": "", + "latest_job_status": "", + "summary": "", + } + ) + account_payload = { + "id": target_account["id"], + "title": target_account.get("nickname") or target_account.get("douyin_id") or "抖音账号", + "handle": target_account.get("douyin_id") or "", + "source_url": target_account.get("canonical_profile_url") or target_account.get("profile_url") or "", + "platform": "douyin", + } + items.sort(key=lambda item: (float((item.get("score") or {}).get("performance_score") or 0), item.get("published_at") or ""), reverse=True) + return account_payload, items[:safe_limit] + + source_account = _latest_platform_account(account, project_id=project_id, platform=platform) + if not source_account: + return None, [] + source_payload = legacy.content_source_payload(source_account) + metadata = source_payload.get("metadata") or {} + summary_videos = ((metadata.get("video_summary") or {}).get("videos") or [])[:safe_limit] + items = [] + for item in summary_videos: + score = float(item.get("performance_score") or item.get("score") or 0) + items.append( + { + "id": str(item.get("id") or item.get("aweme_id") or item.get("video_id") or make_id(f"{platform}_video")), + "title": str(item.get("title") or item.get("description") or item.get("share_url") or "平台作品"), + "source_url": str(item.get("share_url") or item.get("url") or ""), + "published_at": str(item.get("published_at") or item.get("publish_time") or ""), + "score": {"performance_score": score}, + "latest_job_id": "", + "latest_job_status": "", + "summary": "", + } + ) + items.sort(key=lambda item: (float((item.get("score") or {}).get("performance_score") or 0), item.get("published_at") or ""), reverse=True) + return source_payload, items[:safe_limit] + + def _assistant_brief_from_job(job_row: dict[str, Any] | None) -> str: + if not job_row: + return "" + result_map = _parse_json(job_row.get("result_json") or "{}", {}) + artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {}) + candidates = [ + result_map.get("summary"), + result_map.get("headline_summary"), + artifacts_map.get("summary"), + artifacts_map.get("objective"), + artifacts_map.get("brief"), + job_row.get("title"), + ] + for value in candidates: + cleaned = str(value or "").strip() + if cleaned: + return cleaned[:480] + return "" + def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]: row = legacy.db.fetch_one( "SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?", @@ -1090,6 +1347,20 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "platform": plan.get("platform", ""), } ) + first_url = _extract_first_url(message) + if plan.get("intent_key") in {"import_homepage", "custom"} and first_url: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接导入主页", + "kind": "api_action", + "executor_key": "import-homepage", + "platform": plan.get("platform", ""), + "payload": { + "source_url": first_url, + }, + } + ) if plan.get("intent_key") in {"storage_status", "custom"}: secondary_actions.append( { @@ -1110,6 +1381,40 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "platform": plan.get("platform", ""), } ) + if first_url: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接保存录制源", + "kind": "api_action", + "executor_key": "save-live-recorder-source", + "platform": plan.get("platform", ""), + "payload": { + "source_url": first_url, + "auto_start": True, + }, + } + ) + latest_platform_account = None + if plan.get("platform"): + latest_platform_account = _latest_platform_account( + account, + project_id=project_id or "", + platform=plan.get("platform", ""), + ) + if plan.get("platform") and latest_platform_account and plan.get("intent_key") in {"analyze_top_videos", "analyze_account", "custom"}: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接分析高分作品", + "kind": "api_action", + "executor_key": "analyze-top-videos", + "platform": plan.get("platform", ""), + "payload": { + "target_account_id": latest_platform_account["id"], + }, + } + ) if context.get("assistant"): secondary_actions.append( { @@ -1131,6 +1436,32 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "platform": plan.get("platform", ""), } ) + derivable_job = _latest_derivable_job(account, project_id=project_id or "", exclude_line_types={"ai_video", "real_cut"}) + if derivable_job and plan.get("intent_key") in {"ai_video", "real_cut", "review", "custom"}: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接创建 AI 视频", + "kind": "api_action", + "executor_key": "create-ai-video", + "platform": plan.get("platform", ""), + "payload": { + "source_job_id": derivable_job["id"], + }, + } + ) + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接创建实拍剪辑", + "kind": "api_action", + "executor_key": "create-real-cut", + "platform": plan.get("platform", ""), + "payload": { + "source_job_id": derivable_job["id"], + }, + } + ) if account.get("role") == "super_admin": secondary_actions.append( { @@ -1708,6 +2039,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: if not action_key: raise HTTPException(status_code=400, detail="Action key is required") latest_user_message = _last_user_message_text(request.session_id, account["id"]) if request.session_id else "" + requested_payload = request.payload or {} async def _run_platform_self_check() -> dict[str, Any]: if not normalized_platform: @@ -1820,6 +2152,272 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "payload": payload, } + async def _run_import_homepage() -> dict[str, Any]: + source_url = str( + requested_payload.get("source_url") + or requested_payload.get("sourceUrl") + or _extract_first_url(latest_user_message) + or "" + ).strip() + if not source_url: + raise HTTPException(status_code=400, detail="No homepage URL available for import") + inferred_platform = normalize_platform_from_text( + requested_payload.get("platform") + or requested_payload.get("platform_label") + or normalized_platform + or legacy.infer_platform_from_url(source_url) + ) or _safe_platform(legacy.infer_platform_from_url(source_url), fallback="douyin") + assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=inferred_platform) + existing_source = _find_creator_source_by_url( + account, + project_id=project["id"], + platform=inferred_platform, + source_url=source_url, + ) + sync_job = await legacy.create_content_source_sync_job( + legacy.ContentSourceSyncRequest( + project_id=project["id"], + knowledge_base_id=str(requested_payload.get("knowledge_base_id") or requested_payload.get("knowledgeBaseId") or ""), + assistant_id=(assistant or {}).get("id", ""), + content_source_id=(existing_source or {}).get("id", ""), + platform=inferred_platform, + handle=str(requested_payload.get("handle") or ""), + source_url=source_url, + title=str(requested_payload.get("title") or requested_payload.get("name") or ""), + analysis_model_profile_id=str(requested_payload.get("analysis_model_profile_id") or requested_payload.get("analysisModelProfileId") or ""), + language=str(requested_payload.get("language") or "auto"), + max_items=max(1, min(int(requested_payload.get("max_items") or requested_payload.get("maxItems") or 5), 20)), + skip_existing=bool(requested_payload.get("skip_existing", requested_payload.get("skipExisting", True))), + auto_trigger_analysis=bool(requested_payload.get("auto_trigger_analysis", requested_payload.get("autoTriggerAnalysis", True))), + ), + account, + ) + return { + "title": "OneLiner 已导入主页", + "summary": f"已把主页接入当前项目,并触发 {legacy.platform_label(inferred_platform)} 内容源同步。", + "payload": { + "job": sync_job, + "platform": inferred_platform, + "source_url": source_url, + "existing_source_id": (existing_source or {}).get("id", ""), + }, + } + + async def _run_analyze_top_videos() -> dict[str, Any]: + if not normalized_platform: + raise HTTPException(status_code=400, detail="Platform is required for top video analysis") + target_account = None + requested_account_id = str(requested_payload.get("target_account_id") or requested_payload.get("targetAccountId") or "").strip() + if requested_account_id and normalized_platform != "douyin": + target_account = legacy.db.fetch_one( + """ + SELECT * FROM content_sources + WHERE id = ? AND user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account' + """, + (requested_account_id, account["id"], project["id"], normalized_platform), + ) + if not target_account: + target_account = _latest_platform_account(account, project_id=project["id"], platform=normalized_platform) + videos = _linked_platform_videos( + account, + project_id=project["id"], + platform=normalized_platform, + account_row=target_account or {}, + limit=max(2, min(int(requested_payload.get("top_video_count") or requested_payload.get("topVideoCount") or 6), 12)), + ) if target_account else [] + account_payload = legacy.content_source_payload(target_account) if target_account else None + if not videos: + account_payload, videos = _fallback_platform_videos( + account, + project_id=project["id"], + platform=normalized_platform, + requested_account_id=requested_account_id, + limit=max(2, min(int(requested_payload.get("top_video_count") or requested_payload.get("topVideoCount") or 6), 12)), + ) + if not account_payload: + raise HTTPException(status_code=404, detail="No platform account available for top video analysis") + min_score = float(requested_payload.get("min_score") or requested_payload.get("minScore") or 0) + ranked = [item for item in videos if float((item.get("score") or {}).get("performance_score") or 0) >= min_score] + if not ranked: + raise HTTPException(status_code=404, detail="No candidate videos available for analysis") + profile = legacy.model_profile_for_account( + account["id"], + str(requested_payload.get("model_profile_id") or requested_payload.get("modelProfileId") or ""), + ) + items: list[dict[str, Any]] = [] + parse_json_object = getattr(legacy, "parse_json_object", None) + for video in ranked: + prompt = ( + f"请拆解这条{legacy.platform_label(normalized_platform)}作品为什么值得关注," + "输出 JSON,字段包括 summary、borrow_points、risks、next_actions。" + f"\n\n输入:\n{json.dumps(video, ensure_ascii=False, indent=2)}" + ) + output = await legacy.call_model( + profile, + "你是平台内容拆解助手。尽量输出 JSON,字段包括 summary、borrow_points、risks、next_actions。", + prompt, + temperature=float(requested_payload.get("temperature") or 0.25), + ) + parsed = parse_json_object(output) if callable(parse_json_object) else _parse_json(output, {}) + summary_text = str(parsed.get("summary") or parsed.get("headline_summary") or output).strip()[:240] + items.append( + { + "id": make_id(f"{normalized_platform}_va"), + "video_id": video["id"], + "video_title": video["title"], + "status": "ok", + "summary_text": summary_text, + "parsed_json": parsed, + "performance_score": (video.get("score") or {}).get("performance_score", 0), + "latest_job_id": video.get("latest_job_id", ""), + "created_at": now(), + } + ) + memory = _remember_platform_observation( + account, + project_id=project["id"], + platform=normalized_platform, + memory_key=f"top_videos::{target_account['id']}", + title=f"{legacy.platform_label(normalized_platform)} 高分作品拆解", + summary=f"已拆解 {len(items)} 条高分作品,继续固化有效结构与风险判断。", + details={ + "account_id": account_payload["id"], + "items": items, + }, + confidence=0.84, + ) + return { + "title": "OneLiner 已分析高分作品", + "summary": f"已为 {legacy.platform_label(normalized_platform)} 账号拆解 {len(items)} 条高分作品。", + "payload": { + "platform": normalized_platform, + "account": account_payload, + "analyzed_count": len(items), + "items": items, + "memory": memory, + }, + } + + async def _run_create_ai_video() -> dict[str, Any]: + source_job = _load_owned_job(account, str(requested_payload.get("source_job_id") or requested_payload.get("sourceJobId") or "")) or _latest_derivable_job( + account, + project_id=project["id"], + exclude_line_types={"ai_video"}, + ) + if not source_job: + raise HTTPException(status_code=404, detail="No completed source job available for AI video") + assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=normalized_platform) + brief = str( + requested_payload.get("brief") + or requested_payload.get("video_brief") + or requested_payload.get("videoBrief") + or latest_user_message + or _assistant_brief_from_job(source_job) + or "" + ).strip() + if not brief: + brief = f"请基于任务「{source_job.get('title') or source_job['id']}」输出一版适合短视频平台的 AI 视频。" + job = await legacy.create_ai_video_job( + legacy.AiVideoJobRequest( + project_id=project["id"], + assistant_id=(assistant or {}).get("id", ""), + knowledge_base_id=str(requested_payload.get("knowledge_base_id") or requested_payload.get("knowledgeBaseId") or source_job.get("knowledge_base_id") or ""), + source_job_id=source_job["id"], + title=str(requested_payload.get("title") or f"{source_job.get('title') or '任务'} · AI 视频"), + brief=brief, + style=str(requested_payload.get("style") or "realistic"), + shots=max(1, min(int(requested_payload.get("shots") or 4), 12)), + duration=max(3, min(int(requested_payload.get("duration") or 5), 12)), + ), + account, + ) + return { + "title": "OneLiner 已创建 AI 视频任务", + "summary": f"已基于「{source_job.get('title') or source_job['id']}」创建 AI 视频任务。", + "payload": { + "job": job, + "source_job": legacy.job_payload(source_job), + "brief": brief, + }, + } + + async def _run_create_real_cut() -> dict[str, Any]: + source_job = _load_owned_job(account, str(requested_payload.get("source_job_id") or requested_payload.get("sourceJobId") or "")) or _latest_derivable_job( + account, + project_id=project["id"], + exclude_line_types={"ai_video", "real_cut"}, + ) + if not source_job: + raise HTTPException(status_code=404, detail="No completed source job available for real cut") + job = await legacy.create_real_cut_job( + legacy.RealCutJobRequest( + project_id=project["id"], + title=str(requested_payload.get("title") or f"{source_job.get('title') or '任务'} · 实拍剪辑"), + source_job_id=source_job["id"], + objective=str( + requested_payload.get("objective") + or "保留高信息密度片段,输出适合短视频平台的粗剪结果" + ), + target_duration_sec=max(10, min(int(requested_payload.get("target_duration_sec") or requested_payload.get("targetDurationSec") or 60), 300)), + target_aspect_ratio=str(requested_payload.get("target_aspect_ratio") or requested_payload.get("targetAspectRatio") or "9:16"), + review_enabled=bool(requested_payload.get("review_enabled", requested_payload.get("reviewEnabled", False))), + ), + account, + ) + return { + "title": "OneLiner 已创建实拍剪辑任务", + "summary": f"已基于「{source_job.get('title') or source_job['id']}」创建实拍剪辑任务。", + "payload": { + "job": job, + "source_job": legacy.job_payload(source_job), + }, + } + + async def _run_save_live_recorder_source() -> dict[str, Any]: + source_url = str( + requested_payload.get("source_url") + or requested_payload.get("sourceUrl") + or _extract_first_url(latest_user_message) + or "" + ).strip() + if not source_url: + raise HTTPException(status_code=400, detail="No live recorder source URL available") + recorder_platform = normalize_platform_from_text( + requested_payload.get("platform") + or requested_payload.get("platform_label") + or normalized_platform + or legacy.infer_platform_from_url(source_url) + ) or _safe_platform(legacy.infer_platform_from_url(source_url), fallback="kuaishou") + assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=recorder_platform) + saved = legacy.create_live_recorder_source( + legacy.LiveRecorderSourceCreateRequest( + project_id=project["id"], + assistant_id=(assistant or {}).get("id", ""), + platform=recorder_platform, + source_url=source_url, + title=str(requested_payload.get("title") or ""), + quality=str(requested_payload.get("quality") or "原画"), + enabled=bool(requested_payload.get("enabled", True)), + ), + account, + ) + started = None + if bool(requested_payload.get("auto_start", requested_payload.get("autoStart", True))): + try: + started = legacy.live_recorder_start(account) + except Exception as exc: + started = {"ok": False, "message": str(exc)} + return { + "title": "OneLiner 已保存录制源", + "summary": f"已把直播源保存到当前租户的 NAS 录制配置里。", + "payload": { + "saved": saved, + "started": started, + "platform": recorder_platform, + "source_url": source_url, + }, + } + executors = { "platform-self-check": _run_platform_self_check, "storage-status": _run_storage_status, @@ -1827,6 +2425,11 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "scan-admin-ops": _run_ops_scan, "generate-copy": _run_generate_copy, "review-draft": _run_review_draft, + "import-homepage": _run_import_homepage, + "analyze-top-videos": _run_analyze_top_videos, + "create-ai-video": _run_create_ai_video, + "create-real-cut": _run_create_real_cut, + "save-live-recorder-source": _run_save_live_recorder_source, } executor = executors.get(action_key) if not executor: