Add Xiaohongshu route module

This commit is contained in:
kris
2026-03-23 09:05:58 +08:00
parent 70e4652996
commit 9afac1eff7

View File

@@ -0,0 +1,765 @@
from __future__ import annotations
import json
import re
from datetime import datetime, timezone
from html import unescape
from typing import Any, Iterable
from urllib.parse import unquote
import httpx
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
DEFAULT_TIMEOUT = 20.0
MAX_HTML_SEARCH_BYTES = 2_000_000
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
)
XHS_PLATFORM = "xiaohongshu"
class XHSManualPageCapture(BaseModel):
url: str = ""
title: str = ""
payload: dict[str, Any] = Field(default_factory=dict)
class XiaohongshuContentSourceCreateRequest(BaseModel):
project_id: str = ""
source_kind: str
handle: str = ""
source_url: str = ""
title: str = ""
local_path: str = ""
metadata: dict[str, Any] = Field(default_factory=dict)
class XiaohongshuContentSourceSyncRequest(BaseModel):
project_id: str = ""
knowledge_base_id: str = ""
assistant_id: str = ""
content_source_id: str = ""
source_url: str = ""
handle: str = ""
title: str = ""
language: str = "auto"
max_items: int = Field(default=5, ge=1, le=20)
skip_existing: bool = True
auto_trigger_analysis: bool = True
manual_source_payload: dict[str, Any] | None = None
manual_pages: list[XHSManualPageCapture] = Field(default_factory=list)
discovery_note: str = ""
class XiaohongshuReviewCreateRequest(BaseModel):
project_id: str = ""
source_job_id: str = ""
assistant_id: str = ""
title: str = ""
platform: str = XHS_PLATFORM
content_type: str = "note"
publish_url: str = ""
published_at: str = ""
metrics: dict[str, Any] = Field(default_factory=dict)
verdict: str = ""
highlights: str = ""
next_actions: str = ""
notes: str = ""
class XiaohongshuReviewUpdateRequest(BaseModel):
title: str | None = None
platform: str | None = None
content_type: str | None = None
publish_url: str | None = None
published_at: str | None = None
metrics: dict[str, Any] | None = None
verdict: str | None = None
highlights: str | None = None
next_actions: str | None = None
notes: str | None = None
assistant_id: str | None = None
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def _first_non_empty(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
elif value not in ("", [], {}, ()):
return str(value)
return ""
def _dedupe_strings(values: Iterable[str]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
item = str(value).strip()
if not item:
continue
key = item.lower()
if key in seen:
continue
seen.add(key)
result.append(item)
return result
def _compact_text(value: Any, limit: int = 500) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return f"{text[: limit - 1]}"
def _parse_count(value: Any) -> float:
if value is None:
return 0.0
if isinstance(value, (int, float)):
return float(value)
text = str(value).strip().lower().replace(",", "")
if not text:
return 0.0
multiplier = 1.0
if text.endswith("w") or text.endswith(""):
multiplier = 10_000.0
text = text[:-1]
elif text.endswith("亿"):
multiplier = 100_000_000.0
text = text[:-1]
text = text.replace("+", "")
match = re.search(r"-?\d+(?:\.\d+)?", text)
if not match:
return 0.0
try:
return float(match.group()) * multiplier
except ValueError:
return 0.0
def _normalize_timestamp(value: Any) -> str | None:
if value in (None, "", 0, "0"):
return None
if isinstance(value, str):
stripped = value.strip()
if not stripped:
return None
if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
return stripped
if stripped.isdigit():
value = int(stripped)
else:
return stripped
if isinstance(value, (int, float)):
ts = float(value)
if ts > 10_000_000_000:
ts /= 1000.0
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
except Exception:
return None
return None
def _extract_hashtags(*texts: str) -> list[str]:
tags: list[str] = []
for text in texts:
if not text:
continue
tags.extend(match.group(1) for match in re.finditer(r"#([\w\u4e00-\u9fff]+)", text))
return _dedupe_strings(tags)
def _extract_keywords(*texts: str) -> list[str]:
candidates: list[str] = []
for text in texts:
if not text:
continue
candidates.extend(_extract_hashtags(text))
candidates.extend(re.findall(r"[\u4e00-\u9fff]{2,8}", text))
candidates.extend(re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", text))
stop_words = {
"小红书",
"笔记",
"内容",
"账号",
"发布",
"更多",
"关注",
"用户",
"xhs",
"xiaohongshu",
}
return _dedupe_strings(item for item in candidates if item.lower() not in stop_words)
def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
if isinstance(value, dict):
yield value
for child in value.values():
yield from _walk_json(child)
elif isinstance(value, list):
for child in value:
yield from _walk_json(child)
def _extract_json_objects_from_text(text: str) -> list[Any]:
decoder = json.JSONDecoder()
objects: list[Any] = []
seen: set[str] = set()
if not text:
return objects
candidates = [text, unquote(text), unescape(text), unescape(unquote(text))]
for candidate in candidates:
snippet = candidate[:MAX_HTML_SEARCH_BYTES]
for match in re.finditer(r"[\{\[]", snippet):
try:
obj, _ = decoder.raw_decode(snippet[match.start() :])
except Exception:
continue
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
objects.append(obj)
if len(objects) >= 50:
return objects
return objects
def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
blobs: list[dict[str, Any]] = []
seen: set[str] = set()
for attrs, content in re.findall(r"<script([^>]*)>(.*?)</script>", html, re.IGNORECASE | re.DOTALL):
script_id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE)
script_id = script_id_match.group(1) if script_id_match else ""
for obj in _extract_json_objects_from_text(content.strip()):
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
blobs.append({"script_id": script_id, "payload": obj})
return blobs
async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
headers = {
"User-Agent": DEFAULT_USER_AGENT,
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
if cookie.strip():
headers["Cookie"] = cookie.strip()
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
return str(response.url), response.text
def _note_candidate_score(value: dict[str, Any]) -> int:
score = 0
if any(key in value for key in ("note_id", "noteId", "id", "post_id")):
score += 2
if any(key in value for key in ("title", "desc", "content", "text", "note")):
score += 2
if any(key in value for key in ("author", "user", "owner")):
score += 2
if "stats" in value and isinstance(value["stats"], dict):
score += 2
return score
def _extract_note_candidates(payload: Any) -> list[dict[str, Any]]:
candidates: list[dict[str, Any]] = []
for item in _walk_json(payload):
if _note_candidate_score(item) >= 4:
candidates.append(item)
for key in ("author", "user", "owner"):
child = item.get(key)
if isinstance(child, dict) and _note_candidate_score(child) >= 3:
candidates.append(child)
return candidates
def _normalize_note_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
stats_source = candidate.get("stats") if isinstance(candidate.get("stats"), dict) else {}
author = candidate.get("author") if isinstance(candidate.get("author"), dict) else {}
if not author and isinstance(candidate.get("user"), dict):
author = candidate["user"]
cover = candidate.get("cover") or candidate.get("image") or candidate.get("images")
if isinstance(cover, list) and cover:
cover = cover[0]
if isinstance(cover, dict):
cover = _first_non_empty(
cover.get("url_list", [""])[0] if isinstance(cover.get("url_list"), list) else "",
cover.get("url"),
)
return {
"note_id": _first_non_empty(candidate.get("note_id"), candidate.get("noteId"), candidate.get("id"), candidate.get("post_id")),
"title": _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("content"), candidate.get("text")),
"content": _first_non_empty(candidate.get("content"), candidate.get("desc"), candidate.get("text"), candidate.get("note")),
"author_name": _first_non_empty(author.get("nickname"), author.get("name"), candidate.get("nickname")),
"author_url": _first_non_empty(author.get("profile_url"), candidate.get("profile_url")),
"share_url": _first_non_empty(candidate.get("share_url"), candidate.get("url"), fallback_url),
"cover_url": _first_non_empty(cover),
"published_at": _normalize_timestamp(candidate.get("publish_time") or candidate.get("created_at") or candidate.get("create_time")),
"tags": _extract_hashtags(
_first_non_empty(candidate.get("title")),
_first_non_empty(candidate.get("desc"), candidate.get("content")),
),
"stats": {
"like": _parse_count(stats_source.get("like_count") or stats_source.get("liked_count") or candidate.get("like_count")),
"comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")),
"collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count")),
"share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")),
},
"raw": candidate,
}
def _extract_notes(payloads: Iterable[Any]) -> list[dict[str, Any]]:
notes: list[dict[str, Any]] = []
seen: set[str] = set()
for payload in payloads:
for candidate in _extract_note_candidates(payload):
normalized = _normalize_note_candidate(candidate)
dedupe_key = normalized["note_id"] or normalized["share_url"] or normalized["title"]
if not dedupe_key or dedupe_key in seen:
continue
seen.add(dedupe_key)
notes.append(normalized)
notes.sort(
key=lambda item: (
item["stats"]["like"] + item["stats"]["comment"] * 3 + item["stats"]["collect"] * 2 + item["stats"]["share"] * 4
),
reverse=True,
)
return notes
def _is_xhs_source_row(row: dict[str, Any]) -> bool:
platform = str(row.get("platform", "") or "").strip().lower()
if platform == XHS_PLATFORM:
return True
source_url = str(row.get("source_url", "") or "")
normalized = source_url.strip().lower()
return "xiaohongshu.com" in normalized or "xhslink.com" in normalized
def _job_matches_platform(row: dict[str, Any], legacy: Any) -> bool:
if row.get("content_source_id"):
source = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],))
if source:
return _is_xhs_source_row(source)
source_url = str(row.get("source_url") or "")
return "xiaohongshu.com" in source_url.lower() or "xhslink.com" in source_url.lower()
def _review_matches_platform(row: dict[str, Any], legacy: Any) -> bool:
return str(row.get("platform", "") or "").strip().lower() == XHS_PLATFORM
def _normalize_platform(value: str | None) -> str:
return str(value or "").strip().lower()
def _require_xhs_platform(value: str | None) -> str:
normalized = _normalize_platform(value or XHS_PLATFORM)
if normalized != XHS_PLATFORM:
raise HTTPException(status_code=400, detail="Xiaohongshu routes only support the xiaohongshu platform")
return normalized
def register_xiaohongshu_routes(app: Any, legacy: Any) -> None:
def now() -> str:
return legacy.utc_now()
def make_id(prefix: str) -> str:
return legacy.make_id(prefix)
def _content_source_row_or_404(source_id: str, account_id: str) -> dict[str, Any]:
row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (source_id, account_id))
if not row:
raise HTTPException(status_code=404, detail="Content source not found")
if not _is_xhs_source_row(row):
raise HTTPException(status_code=404, detail="Content source not found")
return row
def _xhs_job_payload(row: dict[str, Any]) -> dict[str, Any]:
payload = legacy.job_payload(row)
if row.get("content_source_id"):
source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],))
if source_row and _is_xhs_source_row(source_row):
payload["content_source"] = legacy.content_source_payload(source_row)
return payload
def _xhs_review_payload(row: dict[str, Any]) -> dict[str, Any]:
payload = legacy.review_payload(row)
if payload.get("platform", "") != XHS_PLATFORM:
payload["platform"] = XHS_PLATFORM
return payload
async def _collect_public_source(
source_url: str,
manual_payload: dict[str, Any] | None,
manual_pages: list[XHSManualPageCapture],
) -> dict[str, Any]:
source_url = source_url.strip()
blobs: list[dict[str, Any]] = []
errors: list[str] = []
if manual_payload:
blobs.append({"script_id": "manual_source_payload", "payload": manual_payload})
for page in manual_pages:
blobs.append({
"script_id": "manual_page_payload",
"url": page.url,
"title": page.title,
"payload": page.payload,
})
if source_url:
try:
final_url, html = await _fetch_html(source_url)
source_url = final_url
blobs.extend(_extract_json_blobs_from_html(html))
except Exception as exc:
errors.append(f"source_fetch_failed: {exc}")
payloads = [item["payload"] for item in blobs]
notes = _extract_notes(payloads)
source_title = _first_non_empty(
manual_payload.get("title", "") if manual_payload else "",
*(item.get("title", "") for item in notes[:3]),
source_url,
)
return {
"source_url": source_url,
"title": source_title,
"notes": notes,
"raw_pages": blobs,
"errors": errors,
}
@app.get("/v2/xiaohongshu/content-sources")
def list_content_sources(
project_id: str | None = Query(default=None),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
clauses = ["user_id = ?", "platform = ?"]
params: list[Any] = [account["id"], XHS_PLATFORM]
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("project_id = ?")
params.append(normalized_project)
else:
clauses.append("(project_id IS NULL OR project_id = '')")
rows = legacy.db.fetch_all(
f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
tuple(params),
)
return [legacy.content_source_payload(row) for row in rows]
@app.post("/v2/xiaohongshu/content-sources")
def create_content_source_api(
request: XiaohongshuContentSourceCreateRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = legacy.resolve_target_project(account["id"], request.project_id or None, username=account["username"])
row = legacy.create_content_source(
account_id=account["id"],
project_id=project["id"],
source_kind=request.source_kind.strip(),
platform=XHS_PLATFORM,
handle=request.handle.strip(),
source_url=request.source_url.strip(),
title=request.title.strip(),
local_path=request.local_path.strip(),
metadata={
**request.metadata,
"platform_label": "小红书",
"platform": XHS_PLATFORM,
},
)
return legacy.content_source_payload(row)
@app.get("/v2/xiaohongshu/content-sources/{source_id}")
def get_content_source(source_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
row = _content_source_row_or_404(source_id, account["id"])
return legacy.content_source_payload(row)
@app.post("/v2/xiaohongshu/content-sources/sync")
async def sync_content_source(
request: XiaohongshuContentSourceSyncRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
source_row = None
if request.content_source_id.strip():
source_row = _content_source_row_or_404(request.content_source_id.strip(), account["id"])
requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
source_url = (request.source_url or (source_row or {}).get("source_url") or "").strip()
if not source_url and not source_row:
raise HTTPException(status_code=400, detail="source_url or content_source_id is required")
if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
raise HTTPException(status_code=400, detail="Content source does not belong to target project")
if source_row and not _is_xhs_source_row(source_row):
raise HTTPException(status_code=400, detail="Content source is not scoped to Xiaohongshu")
source_kind = (source_row or {}).get("source_kind", "creator_account")
handle = (request.handle or (source_row or {}).get("handle", "")).strip()
source_title = (
request.title.strip()
or (source_row or {}).get("title", "").strip()
or handle
or source_url
)
if not source_row:
source_row = legacy.create_content_source(
account_id=account["id"],
project_id=project["id"],
source_kind=source_kind or "creator_account",
platform=XHS_PLATFORM,
handle=handle,
source_url=source_url,
title=source_title,
metadata={
"platform": XHS_PLATFORM,
"platform_label": "小红书",
"sync_mode": "recent_notes",
"max_items": request.max_items,
},
)
public_data = await _collect_public_source(source_url, request.manual_source_payload, request.manual_pages)
note_count = len(public_data["notes"])
top_notes = [
{
"note_id": item["note_id"],
"title": _compact_text(item["title"], 120),
"content": _compact_text(item["content"], 180),
"author_name": item["author_name"],
"published_at": item["published_at"],
"stats": item["stats"],
"tags": item["tags"][:6],
}
for item in public_data["notes"][: request.max_items]
]
job_row = legacy.create_job_record(
account_id=account["id"],
project_id=project["id"],
knowledge_base_id=kb["id"],
source_type="content_source_sync",
line_type="content_source_sync",
workflow_key="content_source_sync_pipeline",
title=f"{source_title} 内容源同步",
language=request.language,
source_url=source_url,
assistant_id=(assistant or {}).get("id"),
content_source_id=source_row["id"],
artifacts={
"platform": XHS_PLATFORM,
"handle": handle,
"source_account_url": source_url,
"source_title": source_title,
"skip_existing": request.skip_existing,
"auto_trigger_analysis": request.auto_trigger_analysis,
"max_items": request.max_items,
"note_count": note_count,
"top_notes": top_notes,
"raw_pages": public_data["raw_pages"],
"errors": public_data["errors"],
"discovery_note": request.discovery_note.strip(),
},
analysis_model_profile_id="",
)
legacy.update_content_source_metadata(
source_row["id"],
{
"platform": XHS_PLATFORM,
"platform_label": "小红书",
"sync_mode": "recent_notes",
"max_items": request.max_items,
"note_count": note_count,
"last_sync_job_id": job_row["id"],
"last_sync_requested_at": now(),
},
)
return legacy.job_payload(await legacy.trigger_orchestrated_job(job_row))
@app.get("/v2/xiaohongshu/jobs")
def list_jobs(
parent_job_id: str | None = Query(default=None),
line_type: str | None = Query(default=None),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
clauses = ["user_id = ?"]
params: list[Any] = [account["id"]]
if parent_job_id is not None:
normalized_parent = parent_job_id.strip()
if normalized_parent:
clauses.append("parent_job_id = ?")
params.append(normalized_parent)
else:
clauses.append("(parent_job_id IS NULL OR parent_job_id = '')")
if line_type:
clauses.append("line_type = ?")
params.append(line_type.strip())
rows = legacy.db.fetch_all(
f"SELECT * FROM jobs WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
tuple(params),
)
return [_xhs_job_payload(row) for row in rows if _job_matches_platform(row, legacy)]
@app.get("/v2/xiaohongshu/jobs/{job_id}")
def get_job(job_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (job_id, account["id"]))
if not row or not _job_matches_platform(row, legacy):
raise HTTPException(status_code=404, detail="Job not found")
return _xhs_job_payload(row)
@app.get("/v2/xiaohongshu/jobs/{job_id}/events")
def get_job_events(job_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (job_id, account["id"]))
if not row or not _job_matches_platform(row, legacy):
raise HTTPException(status_code=404, detail="Job not found")
return [
legacy.job_event_payload(item)
for item in legacy.db.fetch_all("SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (job_id,))
]
@app.get("/v2/xiaohongshu/reviews")
def list_reviews(
project_id: str | None = Query(default=None),
limit: int = Query(default=50, ge=1, le=200),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
clauses = ["user_id = ?", "platform = ?"]
params: list[Any] = [account["id"], XHS_PLATFORM]
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("project_id = ?")
params.append(normalized_project)
else:
clauses.append("(project_id IS NULL OR project_id = '')")
sql = (
f"SELECT * FROM publish_reviews WHERE {' AND '.join(clauses)} "
"ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC LIMIT ?"
)
params.append(limit)
return [_xhs_review_payload(row) for row in legacy.db.fetch_all(sql, tuple(params))]
@app.post("/v2/xiaohongshu/reviews")
def create_review(
request: XiaohongshuReviewCreateRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
source_job = None
if request.source_job_id.strip():
source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
if not _job_matches_platform(source_job, legacy):
raise HTTPException(status_code=404, detail="Job not found")
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
review_id = make_id("review")
title = request.title.strip() or (source_job.get("title", "") if source_job else "")
if not title:
title = f"{project['name']} 复盘"
timestamp = now()
normalized_platform = _require_xhs_platform(request.platform)
legacy.db.execute(
"""
INSERT INTO publish_reviews (
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
review_id,
account["id"],
project["id"],
source_job["id"] if source_job else None,
(assistant or {}).get("id") or None,
title,
normalized_platform,
request.content_type.strip() or "note",
request.publish_url.strip(),
request.published_at.strip(),
_safe_json_dumps(request.metrics),
request.verdict.strip(),
request.highlights.strip(),
request.next_actions.strip(),
request.notes.strip(),
timestamp,
timestamp,
),
)
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
return _xhs_review_payload(row)
@app.patch("/v2/xiaohongshu/reviews/{review_id}")
def update_review(
review_id: str,
request: XiaohongshuReviewUpdateRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
current = legacy.load_owned_review(review_id, account["id"])
if not _review_matches_platform(current, legacy):
raise HTTPException(status_code=404, detail="Review not found")
assistant_id = current.get("assistant_id") or None
if request.assistant_id is not None:
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, current.get("project_id", ""))
assistant_id = (assistant or {}).get("id") or None
normalized_platform = current.get("platform", XHS_PLATFORM)
if request.platform is not None:
normalized_platform = _require_xhs_platform(request.platform)
legacy.db.execute(
"""
UPDATE publish_reviews
SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?,
metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?,
assistant_id = ?, updated_at = ?
WHERE id = ? AND user_id = ?
""",
(
request.title if request.title is not None else current.get("title", ""),
normalized_platform,
request.content_type if request.content_type is not None else current.get("content_type", "note"),
request.publish_url if request.publish_url is not None else current.get("publish_url", ""),
request.published_at if request.published_at is not None else current.get("published_at", ""),
_safe_json_dumps(request.metrics if request.metrics is not None else legacy.parse_json_object(current.get("metrics_json") or "{}")),
request.verdict if request.verdict is not None else current.get("verdict", ""),
request.highlights if request.highlights is not None else current.get("highlights", ""),
request.next_actions if request.next_actions is not None else current.get("next_actions", ""),
request.notes if request.notes is not None else current.get("notes", ""),
assistant_id,
now(),
review_id,
account["id"],
),
)
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
return _xhs_review_payload(row)