feat: extend oneliner execution workflows

This commit is contained in:
kris
2026-03-23 16:01:24 +08:00
parent 3b7d4f0d5b
commit 98b3efa10b

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import json
import re
from typing import Any
from fastapi import Depends, HTTPException, Query
@@ -700,6 +701,262 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
)
return str((row or {}).get("content") or "").strip()
def _extract_first_url(text: str) -> str:
cleaned = str(text or "").strip()
if not cleaned:
return ""
match = re.search(r"https?://[^\s<>'\"]+", cleaned)
if not match:
return ""
return match.group(0).rstrip(",。;;,.)]》】!?")
def _find_creator_source_by_url(
account: dict[str, Any],
*,
project_id: str,
platform: str,
source_url: str,
) -> dict[str, Any] | None:
return legacy.db.fetch_one(
"""
SELECT * FROM content_sources
WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account' AND source_url = ?
ORDER BY updated_at DESC, created_at DESC
LIMIT 1
""",
(account["id"], project_id, platform, source_url),
)
def _latest_platform_account(
account: dict[str, Any],
*,
project_id: str,
platform: str,
) -> dict[str, Any] | None:
return legacy.db.fetch_one(
"""
SELECT * FROM content_sources
WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account'
ORDER BY updated_at DESC, created_at DESC
LIMIT 1
""",
(account["id"], project_id, platform),
)
def _load_owned_job(account: dict[str, Any], job_id: str) -> dict[str, Any] | None:
normalized_job_id = str(job_id or "").strip()
if not normalized_job_id:
return None
return legacy.db.fetch_one(
"SELECT * FROM jobs WHERE id = ? AND user_id = ?",
(normalized_job_id, account["id"]),
)
def _latest_derivable_job(
account: dict[str, Any],
*,
project_id: str,
exclude_line_types: set[str] | None = None,
) -> dict[str, Any] | None:
excluded = {item.strip() for item in (exclude_line_types or set()) if str(item or "").strip()}
rows = legacy.db.fetch_all(
"""
SELECT * FROM jobs
WHERE user_id = ? AND project_id = ? AND status IN ('completed', 'done', 'succeeded')
ORDER BY updated_at DESC, created_at DESC
LIMIT 24
""",
(account["id"], project_id),
)
for row in rows:
if str(row.get("line_type") or "").strip() in excluded:
continue
return row
return rows[0] if rows else None
def _job_performance_score(job_row: dict[str, Any] | None) -> float:
if not job_row:
return 0.0
result_map = _parse_json(job_row.get("result_json") or "{}", {})
artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
candidates = [
result_map.get("performance_score"),
(result_map.get("analysis") or {}).get("performance_score"),
(result_map.get("scores") or {}).get("performance_score"),
artifacts_map.get("performance_score"),
(artifacts_map.get("scores") or {}).get("performance_score"),
]
for value in candidates:
try:
return float(value)
except (TypeError, ValueError):
continue
return 0.0
def _linked_platform_videos(
account: dict[str, Any],
*,
project_id: str,
platform: str,
account_row: dict[str, Any],
limit: int = 8,
) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT * FROM content_sources
WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'video_link'
ORDER BY updated_at DESC, created_at DESC
""",
(account["id"], project_id, platform),
)
account_source_url = str(account_row.get("source_url") or "").strip()
items: list[dict[str, Any]] = []
for row in rows:
payload = legacy.content_source_payload(row)
metadata = payload.get("metadata") or {}
if metadata.get("origin_content_source_id") != account_row["id"] and metadata.get("source_account_url") != account_source_url:
continue
latest_job = legacy.db.fetch_one(
"SELECT * FROM jobs WHERE content_source_id = ? ORDER BY updated_at DESC, created_at DESC LIMIT 1",
(row["id"],),
)
result_map = _parse_json((latest_job or {}).get("result_json") or "{}", {})
published_at = (
metadata.get("published_at")
or metadata.get("publish_time")
or metadata.get("created_at")
or payload.get("updated_at")
or payload.get("created_at")
or ""
)
items.append(
{
"id": payload["id"],
"title": payload.get("title") or payload.get("handle") or payload.get("source_url") or payload["id"],
"source_url": payload.get("source_url", ""),
"published_at": published_at,
"score": {
"performance_score": _job_performance_score(latest_job),
},
"latest_job_id": (latest_job or {}).get("id", ""),
"latest_job_status": (latest_job or {}).get("status", ""),
"summary": str(result_map.get("summary") or result_map.get("headline_summary") or "")[:240],
}
)
items.sort(key=lambda item: (float((item.get("score") or {}).get("performance_score") or 0), item.get("published_at") or ""), reverse=True)
return items[: max(1, min(int(limit or 8), 16))]
def _fallback_platform_videos(
account: dict[str, Any],
*,
project_id: str,
platform: str,
requested_account_id: str = "",
limit: int = 8,
) -> tuple[dict[str, Any] | None, list[dict[str, Any]]]:
safe_limit = max(1, min(int(limit or 8), 16))
if platform == "douyin":
target_account = None
if requested_account_id:
target_account = legacy.db.fetch_one(
"SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ?",
(requested_account_id, account["id"]),
)
if not target_account:
target_account = legacy.db.fetch_one(
"SELECT * FROM douyin_accounts WHERE user_id = ? ORDER BY updated_at DESC, created_at DESC LIMIT 1",
(account["id"],),
)
if not target_account:
return None, []
rows = legacy.db.fetch_all(
"""
SELECT * FROM douyin_videos
WHERE account_id = ?
ORDER BY COALESCE(published_at, updated_at) DESC, updated_at DESC
LIMIT ?
""",
(target_account["id"], safe_limit),
)
items: list[dict[str, Any]] = []
for row in rows:
stats = _parse_json(row.get("stats_json") or "{}", {})
play_count = float(stats.get("play_count") or stats.get("play") or 0)
like_count = float(stats.get("digg_count") or stats.get("like_count") or 0)
comment_count = float(stats.get("comment_count") or 0)
share_count = float(stats.get("share_count") or 0)
score = min(
100.0,
play_count / 10000 * 55
+ like_count / 1000 * 25
+ comment_count / 100 * 10
+ share_count / 100 * 10,
)
items.append(
{
"id": row["id"],
"title": row.get("title") or row.get("description") or row.get("share_url") or row["id"],
"source_url": row.get("share_url", ""),
"published_at": row.get("published_at") or "",
"score": {"performance_score": round(score, 2)},
"latest_job_id": "",
"latest_job_status": "",
"summary": "",
}
)
account_payload = {
"id": target_account["id"],
"title": target_account.get("nickname") or target_account.get("douyin_id") or "抖音账号",
"handle": target_account.get("douyin_id") or "",
"source_url": target_account.get("canonical_profile_url") or target_account.get("profile_url") or "",
"platform": "douyin",
}
items.sort(key=lambda item: (float((item.get("score") or {}).get("performance_score") or 0), item.get("published_at") or ""), reverse=True)
return account_payload, items[:safe_limit]
source_account = _latest_platform_account(account, project_id=project_id, platform=platform)
if not source_account:
return None, []
source_payload = legacy.content_source_payload(source_account)
metadata = source_payload.get("metadata") or {}
summary_videos = ((metadata.get("video_summary") or {}).get("videos") or [])[:safe_limit]
items = []
for item in summary_videos:
score = float(item.get("performance_score") or item.get("score") or 0)
items.append(
{
"id": str(item.get("id") or item.get("aweme_id") or item.get("video_id") or make_id(f"{platform}_video")),
"title": str(item.get("title") or item.get("description") or item.get("share_url") or "平台作品"),
"source_url": str(item.get("share_url") or item.get("url") or ""),
"published_at": str(item.get("published_at") or item.get("publish_time") or ""),
"score": {"performance_score": score},
"latest_job_id": "",
"latest_job_status": "",
"summary": "",
}
)
items.sort(key=lambda item: (float((item.get("score") or {}).get("performance_score") or 0), item.get("published_at") or ""), reverse=True)
return source_payload, items[:safe_limit]
def _assistant_brief_from_job(job_row: dict[str, Any] | None) -> str:
if not job_row:
return ""
result_map = _parse_json(job_row.get("result_json") or "{}", {})
artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
candidates = [
result_map.get("summary"),
result_map.get("headline_summary"),
artifacts_map.get("summary"),
artifacts_map.get("objective"),
artifacts_map.get("brief"),
job_row.get("title"),
]
for value in candidates:
cleaned = str(value or "").strip()
if cleaned:
return cleaned[:480]
return ""
def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]:
row = legacy.db.fetch_one(
"SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?",
@@ -1090,6 +1347,20 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"platform": plan.get("platform", ""),
}
)
first_url = _extract_first_url(message)
if plan.get("intent_key") in {"import_homepage", "custom"} and first_url:
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "直接导入主页",
"kind": "api_action",
"executor_key": "import-homepage",
"platform": plan.get("platform", ""),
"payload": {
"source_url": first_url,
},
}
)
if plan.get("intent_key") in {"storage_status", "custom"}:
secondary_actions.append(
{
@@ -1110,6 +1381,40 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"platform": plan.get("platform", ""),
}
)
if first_url:
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "直接保存录制源",
"kind": "api_action",
"executor_key": "save-live-recorder-source",
"platform": plan.get("platform", ""),
"payload": {
"source_url": first_url,
"auto_start": True,
},
}
)
latest_platform_account = None
if plan.get("platform"):
latest_platform_account = _latest_platform_account(
account,
project_id=project_id or "",
platform=plan.get("platform", ""),
)
if plan.get("platform") and latest_platform_account and plan.get("intent_key") in {"analyze_top_videos", "analyze_account", "custom"}:
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "直接分析高分作品",
"kind": "api_action",
"executor_key": "analyze-top-videos",
"platform": plan.get("platform", ""),
"payload": {
"target_account_id": latest_platform_account["id"],
},
}
)
if context.get("assistant"):
secondary_actions.append(
{
@@ -1131,6 +1436,32 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"platform": plan.get("platform", ""),
}
)
derivable_job = _latest_derivable_job(account, project_id=project_id or "", exclude_line_types={"ai_video", "real_cut"})
if derivable_job and plan.get("intent_key") in {"ai_video", "real_cut", "review", "custom"}:
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "直接创建 AI 视频",
"kind": "api_action",
"executor_key": "create-ai-video",
"platform": plan.get("platform", ""),
"payload": {
"source_job_id": derivable_job["id"],
},
}
)
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "直接创建实拍剪辑",
"kind": "api_action",
"executor_key": "create-real-cut",
"platform": plan.get("platform", ""),
"payload": {
"source_job_id": derivable_job["id"],
},
}
)
if account.get("role") == "super_admin":
secondary_actions.append(
{
@@ -1708,6 +2039,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
if not action_key:
raise HTTPException(status_code=400, detail="Action key is required")
latest_user_message = _last_user_message_text(request.session_id, account["id"]) if request.session_id else ""
requested_payload = request.payload or {}
async def _run_platform_self_check() -> dict[str, Any]:
if not normalized_platform:
@@ -1820,6 +2152,272 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"payload": payload,
}
async def _run_import_homepage() -> dict[str, Any]:
source_url = str(
requested_payload.get("source_url")
or requested_payload.get("sourceUrl")
or _extract_first_url(latest_user_message)
or ""
).strip()
if not source_url:
raise HTTPException(status_code=400, detail="No homepage URL available for import")
inferred_platform = normalize_platform_from_text(
requested_payload.get("platform")
or requested_payload.get("platform_label")
or normalized_platform
or legacy.infer_platform_from_url(source_url)
) or _safe_platform(legacy.infer_platform_from_url(source_url), fallback="douyin")
assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=inferred_platform)
existing_source = _find_creator_source_by_url(
account,
project_id=project["id"],
platform=inferred_platform,
source_url=source_url,
)
sync_job = await legacy.create_content_source_sync_job(
legacy.ContentSourceSyncRequest(
project_id=project["id"],
knowledge_base_id=str(requested_payload.get("knowledge_base_id") or requested_payload.get("knowledgeBaseId") or ""),
assistant_id=(assistant or {}).get("id", ""),
content_source_id=(existing_source or {}).get("id", ""),
platform=inferred_platform,
handle=str(requested_payload.get("handle") or ""),
source_url=source_url,
title=str(requested_payload.get("title") or requested_payload.get("name") or ""),
analysis_model_profile_id=str(requested_payload.get("analysis_model_profile_id") or requested_payload.get("analysisModelProfileId") or ""),
language=str(requested_payload.get("language") or "auto"),
max_items=max(1, min(int(requested_payload.get("max_items") or requested_payload.get("maxItems") or 5), 20)),
skip_existing=bool(requested_payload.get("skip_existing", requested_payload.get("skipExisting", True))),
auto_trigger_analysis=bool(requested_payload.get("auto_trigger_analysis", requested_payload.get("autoTriggerAnalysis", True))),
),
account,
)
return {
"title": "OneLiner 已导入主页",
"summary": f"已把主页接入当前项目,并触发 {legacy.platform_label(inferred_platform)} 内容源同步。",
"payload": {
"job": sync_job,
"platform": inferred_platform,
"source_url": source_url,
"existing_source_id": (existing_source or {}).get("id", ""),
},
}
async def _run_analyze_top_videos() -> dict[str, Any]:
if not normalized_platform:
raise HTTPException(status_code=400, detail="Platform is required for top video analysis")
target_account = None
requested_account_id = str(requested_payload.get("target_account_id") or requested_payload.get("targetAccountId") or "").strip()
if requested_account_id and normalized_platform != "douyin":
target_account = legacy.db.fetch_one(
"""
SELECT * FROM content_sources
WHERE id = ? AND user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account'
""",
(requested_account_id, account["id"], project["id"], normalized_platform),
)
if not target_account:
target_account = _latest_platform_account(account, project_id=project["id"], platform=normalized_platform)
videos = _linked_platform_videos(
account,
project_id=project["id"],
platform=normalized_platform,
account_row=target_account or {},
limit=max(2, min(int(requested_payload.get("top_video_count") or requested_payload.get("topVideoCount") or 6), 12)),
) if target_account else []
account_payload = legacy.content_source_payload(target_account) if target_account else None
if not videos:
account_payload, videos = _fallback_platform_videos(
account,
project_id=project["id"],
platform=normalized_platform,
requested_account_id=requested_account_id,
limit=max(2, min(int(requested_payload.get("top_video_count") or requested_payload.get("topVideoCount") or 6), 12)),
)
if not account_payload:
raise HTTPException(status_code=404, detail="No platform account available for top video analysis")
min_score = float(requested_payload.get("min_score") or requested_payload.get("minScore") or 0)
ranked = [item for item in videos if float((item.get("score") or {}).get("performance_score") or 0) >= min_score]
if not ranked:
raise HTTPException(status_code=404, detail="No candidate videos available for analysis")
profile = legacy.model_profile_for_account(
account["id"],
str(requested_payload.get("model_profile_id") or requested_payload.get("modelProfileId") or ""),
)
items: list[dict[str, Any]] = []
parse_json_object = getattr(legacy, "parse_json_object", None)
for video in ranked:
prompt = (
f"请拆解这条{legacy.platform_label(normalized_platform)}作品为什么值得关注,"
"输出 JSON字段包括 summary、borrow_points、risks、next_actions。"
f"\n\n输入:\n{json.dumps(video, ensure_ascii=False, indent=2)}"
)
output = await legacy.call_model(
profile,
"你是平台内容拆解助手。尽量输出 JSON字段包括 summary、borrow_points、risks、next_actions。",
prompt,
temperature=float(requested_payload.get("temperature") or 0.25),
)
parsed = parse_json_object(output) if callable(parse_json_object) else _parse_json(output, {})
summary_text = str(parsed.get("summary") or parsed.get("headline_summary") or output).strip()[:240]
items.append(
{
"id": make_id(f"{normalized_platform}_va"),
"video_id": video["id"],
"video_title": video["title"],
"status": "ok",
"summary_text": summary_text,
"parsed_json": parsed,
"performance_score": (video.get("score") or {}).get("performance_score", 0),
"latest_job_id": video.get("latest_job_id", ""),
"created_at": now(),
}
)
memory = _remember_platform_observation(
account,
project_id=project["id"],
platform=normalized_platform,
memory_key=f"top_videos::{target_account['id']}",
title=f"{legacy.platform_label(normalized_platform)} 高分作品拆解",
summary=f"已拆解 {len(items)} 条高分作品,继续固化有效结构与风险判断。",
details={
"account_id": account_payload["id"],
"items": items,
},
confidence=0.84,
)
return {
"title": "OneLiner 已分析高分作品",
"summary": f"已为 {legacy.platform_label(normalized_platform)} 账号拆解 {len(items)} 条高分作品。",
"payload": {
"platform": normalized_platform,
"account": account_payload,
"analyzed_count": len(items),
"items": items,
"memory": memory,
},
}
async def _run_create_ai_video() -> dict[str, Any]:
source_job = _load_owned_job(account, str(requested_payload.get("source_job_id") or requested_payload.get("sourceJobId") or "")) or _latest_derivable_job(
account,
project_id=project["id"],
exclude_line_types={"ai_video"},
)
if not source_job:
raise HTTPException(status_code=404, detail="No completed source job available for AI video")
assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=normalized_platform)
brief = str(
requested_payload.get("brief")
or requested_payload.get("video_brief")
or requested_payload.get("videoBrief")
or latest_user_message
or _assistant_brief_from_job(source_job)
or ""
).strip()
if not brief:
brief = f"请基于任务「{source_job.get('title') or source_job['id']}」输出一版适合短视频平台的 AI 视频。"
job = await legacy.create_ai_video_job(
legacy.AiVideoJobRequest(
project_id=project["id"],
assistant_id=(assistant or {}).get("id", ""),
knowledge_base_id=str(requested_payload.get("knowledge_base_id") or requested_payload.get("knowledgeBaseId") or source_job.get("knowledge_base_id") or ""),
source_job_id=source_job["id"],
title=str(requested_payload.get("title") or f"{source_job.get('title') or '任务'} · AI 视频"),
brief=brief,
style=str(requested_payload.get("style") or "realistic"),
shots=max(1, min(int(requested_payload.get("shots") or 4), 12)),
duration=max(3, min(int(requested_payload.get("duration") or 5), 12)),
),
account,
)
return {
"title": "OneLiner 已创建 AI 视频任务",
"summary": f"已基于「{source_job.get('title') or source_job['id']}」创建 AI 视频任务。",
"payload": {
"job": job,
"source_job": legacy.job_payload(source_job),
"brief": brief,
},
}
async def _run_create_real_cut() -> dict[str, Any]:
source_job = _load_owned_job(account, str(requested_payload.get("source_job_id") or requested_payload.get("sourceJobId") or "")) or _latest_derivable_job(
account,
project_id=project["id"],
exclude_line_types={"ai_video", "real_cut"},
)
if not source_job:
raise HTTPException(status_code=404, detail="No completed source job available for real cut")
job = await legacy.create_real_cut_job(
legacy.RealCutJobRequest(
project_id=project["id"],
title=str(requested_payload.get("title") or f"{source_job.get('title') or '任务'} · 实拍剪辑"),
source_job_id=source_job["id"],
objective=str(
requested_payload.get("objective")
or "保留高信息密度片段,输出适合短视频平台的粗剪结果"
),
target_duration_sec=max(10, min(int(requested_payload.get("target_duration_sec") or requested_payload.get("targetDurationSec") or 60), 300)),
target_aspect_ratio=str(requested_payload.get("target_aspect_ratio") or requested_payload.get("targetAspectRatio") or "9:16"),
review_enabled=bool(requested_payload.get("review_enabled", requested_payload.get("reviewEnabled", False))),
),
account,
)
return {
"title": "OneLiner 已创建实拍剪辑任务",
"summary": f"已基于「{source_job.get('title') or source_job['id']}」创建实拍剪辑任务。",
"payload": {
"job": job,
"source_job": legacy.job_payload(source_job),
},
}
async def _run_save_live_recorder_source() -> dict[str, Any]:
source_url = str(
requested_payload.get("source_url")
or requested_payload.get("sourceUrl")
or _extract_first_url(latest_user_message)
or ""
).strip()
if not source_url:
raise HTTPException(status_code=400, detail="No live recorder source URL available")
recorder_platform = normalize_platform_from_text(
requested_payload.get("platform")
or requested_payload.get("platform_label")
or normalized_platform
or legacy.infer_platform_from_url(source_url)
) or _safe_platform(legacy.infer_platform_from_url(source_url), fallback="kuaishou")
assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=recorder_platform)
saved = legacy.create_live_recorder_source(
legacy.LiveRecorderSourceCreateRequest(
project_id=project["id"],
assistant_id=(assistant or {}).get("id", ""),
platform=recorder_platform,
source_url=source_url,
title=str(requested_payload.get("title") or ""),
quality=str(requested_payload.get("quality") or "原画"),
enabled=bool(requested_payload.get("enabled", True)),
),
account,
)
started = None
if bool(requested_payload.get("auto_start", requested_payload.get("autoStart", True))):
try:
started = legacy.live_recorder_start(account)
except Exception as exc:
started = {"ok": False, "message": str(exc)}
return {
"title": "OneLiner 已保存录制源",
"summary": f"已把直播源保存到当前租户的 NAS 录制配置里。",
"payload": {
"saved": saved,
"started": started,
"platform": recorder_platform,
"source_url": source_url,
},
}
executors = {
"platform-self-check": _run_platform_self_check,
"storage-status": _run_storage_status,
@@ -1827,6 +2425,11 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"scan-admin-ops": _run_ops_scan,
"generate-copy": _run_generate_copy,
"review-draft": _run_review_draft,
"import-homepage": _run_import_homepage,
"analyze-top-videos": _run_analyze_top_videos,
"create-ai-video": _run_create_ai_video,
"create-real-cut": _run_create_real_cut,
"save-live-recorder-source": _run_save_live_recorder_source,
}
executor = executors.get(action_key)
if not executor: