From 9afac1eff7475649d41ee41fb197048972e2a4ab Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 09:05:58 +0800 Subject: [PATCH] Add Xiaohongshu route module --- collector-service/app/xiaohongshu_features.py | 765 ++++++++++++++++++ 1 file changed, 765 insertions(+) create mode 100644 collector-service/app/xiaohongshu_features.py diff --git a/collector-service/app/xiaohongshu_features.py b/collector-service/app/xiaohongshu_features.py new file mode 100644 index 0000000..5502653 --- /dev/null +++ b/collector-service/app/xiaohongshu_features.py @@ -0,0 +1,765 @@ +from __future__ import annotations + +import json +import re +from datetime import datetime, timezone +from html import unescape +from typing import Any, Iterable +from urllib.parse import unquote + +import httpx +from fastapi import Depends, HTTPException, Query +from pydantic import BaseModel, Field + +DEFAULT_TIMEOUT = 20.0 +MAX_HTML_SEARCH_BYTES = 2_000_000 +DEFAULT_USER_AGENT = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36" +) +XHS_PLATFORM = "xiaohongshu" + + +class XHSManualPageCapture(BaseModel): + url: str = "" + title: str = "" + payload: dict[str, Any] = Field(default_factory=dict) + + +class XiaohongshuContentSourceCreateRequest(BaseModel): + project_id: str = "" + source_kind: str + handle: str = "" + source_url: str = "" + title: str = "" + local_path: str = "" + metadata: dict[str, Any] = Field(default_factory=dict) + + +class XiaohongshuContentSourceSyncRequest(BaseModel): + project_id: str = "" + knowledge_base_id: str = "" + assistant_id: str = "" + content_source_id: str = "" + source_url: str = "" + handle: str = "" + title: str = "" + language: str = "auto" + max_items: int = Field(default=5, ge=1, le=20) + skip_existing: bool = True + auto_trigger_analysis: bool = True + manual_source_payload: dict[str, Any] | None = None + manual_pages: list[XHSManualPageCapture] = Field(default_factory=list) + discovery_note: str = "" + + +class XiaohongshuReviewCreateRequest(BaseModel): + project_id: str = "" + source_job_id: str = "" + assistant_id: str = "" + title: str = "" + platform: str = XHS_PLATFORM + content_type: str = "note" + publish_url: str = "" + published_at: str = "" + metrics: dict[str, Any] = Field(default_factory=dict) + verdict: str = "" + highlights: str = "" + next_actions: str = "" + notes: str = "" + + +class XiaohongshuReviewUpdateRequest(BaseModel): + title: str | None = None + platform: str | None = None + content_type: str | None = None + publish_url: str | None = None + published_at: str | None = None + metrics: dict[str, Any] | None = None + verdict: str | None = None + highlights: str | None = None + next_actions: str | None = None + notes: str | None = None + assistant_id: str | None = None + + +def _safe_json_dumps(value: Any) -> str: + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) + + +def _first_non_empty(*values: Any) -> str: + for value in values: + if value is None: + continue + if isinstance(value, str): + stripped = value.strip() + if stripped: + return stripped + elif value not in ("", [], {}, ()): + return str(value) + return "" + + +def _dedupe_strings(values: Iterable[str]) -> list[str]: + result: list[str] = [] + seen: set[str] = set() + for value in values: + item = str(value).strip() + if not item: + continue + key = item.lower() + if key in seen: + continue + seen.add(key) + result.append(item) + return result + + +def _compact_text(value: Any, limit: int = 500) -> str: + text = str(value or "").strip() + if len(text) <= limit: + return text + return f"{text[: limit - 1]}…" + + +def _parse_count(value: Any) -> float: + if value is None: + return 0.0 + if isinstance(value, (int, float)): + return float(value) + text = str(value).strip().lower().replace(",", "") + if not text: + return 0.0 + multiplier = 1.0 + if text.endswith("w") or text.endswith("万"): + multiplier = 10_000.0 + text = text[:-1] + elif text.endswith("亿"): + multiplier = 100_000_000.0 + text = text[:-1] + text = text.replace("+", "") + match = re.search(r"-?\d+(?:\.\d+)?", text) + if not match: + return 0.0 + try: + return float(match.group()) * multiplier + except ValueError: + return 0.0 + + +def _normalize_timestamp(value: Any) -> str | None: + if value in (None, "", 0, "0"): + return None + if isinstance(value, str): + stripped = value.strip() + if not stripped: + return None + if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped): + return stripped + if stripped.isdigit(): + value = int(stripped) + else: + return stripped + if isinstance(value, (int, float)): + ts = float(value) + if ts > 10_000_000_000: + ts /= 1000.0 + try: + return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat() + except Exception: + return None + return None + + +def _extract_hashtags(*texts: str) -> list[str]: + tags: list[str] = [] + for text in texts: + if not text: + continue + tags.extend(match.group(1) for match in re.finditer(r"#([\w\u4e00-\u9fff]+)", text)) + return _dedupe_strings(tags) + + +def _extract_keywords(*texts: str) -> list[str]: + candidates: list[str] = [] + for text in texts: + if not text: + continue + candidates.extend(_extract_hashtags(text)) + candidates.extend(re.findall(r"[\u4e00-\u9fff]{2,8}", text)) + candidates.extend(re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", text)) + stop_words = { + "小红书", + "笔记", + "内容", + "账号", + "发布", + "更多", + "关注", + "用户", + "xhs", + "xiaohongshu", + } + return _dedupe_strings(item for item in candidates if item.lower() not in stop_words) + + +def _walk_json(value: Any) -> Iterable[dict[str, Any]]: + if isinstance(value, dict): + yield value + for child in value.values(): + yield from _walk_json(child) + elif isinstance(value, list): + for child in value: + yield from _walk_json(child) + + +def _extract_json_objects_from_text(text: str) -> list[Any]: + decoder = json.JSONDecoder() + objects: list[Any] = [] + seen: set[str] = set() + if not text: + return objects + + candidates = [text, unquote(text), unescape(text), unescape(unquote(text))] + for candidate in candidates: + snippet = candidate[:MAX_HTML_SEARCH_BYTES] + for match in re.finditer(r"[\{\[]", snippet): + try: + obj, _ = decoder.raw_decode(snippet[match.start() :]) + except Exception: + continue + marker = _safe_json_dumps(obj) + if marker in seen: + continue + seen.add(marker) + objects.append(obj) + if len(objects) >= 50: + return objects + return objects + + +def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]: + blobs: list[dict[str, Any]] = [] + seen: set[str] = set() + for attrs, content in re.findall(r"]*)>(.*?)", html, re.IGNORECASE | re.DOTALL): + script_id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE) + script_id = script_id_match.group(1) if script_id_match else "" + for obj in _extract_json_objects_from_text(content.strip()): + marker = _safe_json_dumps(obj) + if marker in seen: + continue + seen.add(marker) + blobs.append({"script_id": script_id, "payload": obj}) + return blobs + + +async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]: + headers = { + "User-Agent": DEFAULT_USER_AGENT, + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + } + if cookie.strip(): + headers["Cookie"] = cookie.strip() + async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client: + response = await client.get(url, headers=headers) + response.raise_for_status() + return str(response.url), response.text + + +def _note_candidate_score(value: dict[str, Any]) -> int: + score = 0 + if any(key in value for key in ("note_id", "noteId", "id", "post_id")): + score += 2 + if any(key in value for key in ("title", "desc", "content", "text", "note")): + score += 2 + if any(key in value for key in ("author", "user", "owner")): + score += 2 + if "stats" in value and isinstance(value["stats"], dict): + score += 2 + return score + + +def _extract_note_candidates(payload: Any) -> list[dict[str, Any]]: + candidates: list[dict[str, Any]] = [] + for item in _walk_json(payload): + if _note_candidate_score(item) >= 4: + candidates.append(item) + for key in ("author", "user", "owner"): + child = item.get(key) + if isinstance(child, dict) and _note_candidate_score(child) >= 3: + candidates.append(child) + return candidates + + +def _normalize_note_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]: + stats_source = candidate.get("stats") if isinstance(candidate.get("stats"), dict) else {} + author = candidate.get("author") if isinstance(candidate.get("author"), dict) else {} + if not author and isinstance(candidate.get("user"), dict): + author = candidate["user"] + cover = candidate.get("cover") or candidate.get("image") or candidate.get("images") + if isinstance(cover, list) and cover: + cover = cover[0] + if isinstance(cover, dict): + cover = _first_non_empty( + cover.get("url_list", [""])[0] if isinstance(cover.get("url_list"), list) else "", + cover.get("url"), + ) + return { + "note_id": _first_non_empty(candidate.get("note_id"), candidate.get("noteId"), candidate.get("id"), candidate.get("post_id")), + "title": _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("content"), candidate.get("text")), + "content": _first_non_empty(candidate.get("content"), candidate.get("desc"), candidate.get("text"), candidate.get("note")), + "author_name": _first_non_empty(author.get("nickname"), author.get("name"), candidate.get("nickname")), + "author_url": _first_non_empty(author.get("profile_url"), candidate.get("profile_url")), + "share_url": _first_non_empty(candidate.get("share_url"), candidate.get("url"), fallback_url), + "cover_url": _first_non_empty(cover), + "published_at": _normalize_timestamp(candidate.get("publish_time") or candidate.get("created_at") or candidate.get("create_time")), + "tags": _extract_hashtags( + _first_non_empty(candidate.get("title")), + _first_non_empty(candidate.get("desc"), candidate.get("content")), + ), + "stats": { + "like": _parse_count(stats_source.get("like_count") or stats_source.get("liked_count") or candidate.get("like_count")), + "comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")), + "collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count")), + "share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")), + }, + "raw": candidate, + } + + +def _extract_notes(payloads: Iterable[Any]) -> list[dict[str, Any]]: + notes: list[dict[str, Any]] = [] + seen: set[str] = set() + for payload in payloads: + for candidate in _extract_note_candidates(payload): + normalized = _normalize_note_candidate(candidate) + dedupe_key = normalized["note_id"] or normalized["share_url"] or normalized["title"] + if not dedupe_key or dedupe_key in seen: + continue + seen.add(dedupe_key) + notes.append(normalized) + notes.sort( + key=lambda item: ( + item["stats"]["like"] + item["stats"]["comment"] * 3 + item["stats"]["collect"] * 2 + item["stats"]["share"] * 4 + ), + reverse=True, + ) + return notes + + +def _is_xhs_source_row(row: dict[str, Any]) -> bool: + platform = str(row.get("platform", "") or "").strip().lower() + if platform == XHS_PLATFORM: + return True + source_url = str(row.get("source_url", "") or "") + normalized = source_url.strip().lower() + return "xiaohongshu.com" in normalized or "xhslink.com" in normalized + + +def _job_matches_platform(row: dict[str, Any], legacy: Any) -> bool: + if row.get("content_source_id"): + source = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],)) + if source: + return _is_xhs_source_row(source) + source_url = str(row.get("source_url") or "") + return "xiaohongshu.com" in source_url.lower() or "xhslink.com" in source_url.lower() + + +def _review_matches_platform(row: dict[str, Any], legacy: Any) -> bool: + return str(row.get("platform", "") or "").strip().lower() == XHS_PLATFORM + + +def _normalize_platform(value: str | None) -> str: + return str(value or "").strip().lower() + + +def _require_xhs_platform(value: str | None) -> str: + normalized = _normalize_platform(value or XHS_PLATFORM) + if normalized != XHS_PLATFORM: + raise HTTPException(status_code=400, detail="Xiaohongshu routes only support the xiaohongshu platform") + return normalized + + +def register_xiaohongshu_routes(app: Any, legacy: Any) -> None: + def now() -> str: + return legacy.utc_now() + + def make_id(prefix: str) -> str: + return legacy.make_id(prefix) + + def _content_source_row_or_404(source_id: str, account_id: str) -> dict[str, Any]: + row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (source_id, account_id)) + if not row: + raise HTTPException(status_code=404, detail="Content source not found") + if not _is_xhs_source_row(row): + raise HTTPException(status_code=404, detail="Content source not found") + return row + + def _xhs_job_payload(row: dict[str, Any]) -> dict[str, Any]: + payload = legacy.job_payload(row) + if row.get("content_source_id"): + source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],)) + if source_row and _is_xhs_source_row(source_row): + payload["content_source"] = legacy.content_source_payload(source_row) + return payload + + def _xhs_review_payload(row: dict[str, Any]) -> dict[str, Any]: + payload = legacy.review_payload(row) + if payload.get("platform", "") != XHS_PLATFORM: + payload["platform"] = XHS_PLATFORM + return payload + + async def _collect_public_source( + source_url: str, + manual_payload: dict[str, Any] | None, + manual_pages: list[XHSManualPageCapture], + ) -> dict[str, Any]: + source_url = source_url.strip() + blobs: list[dict[str, Any]] = [] + errors: list[str] = [] + + if manual_payload: + blobs.append({"script_id": "manual_source_payload", "payload": manual_payload}) + + for page in manual_pages: + blobs.append({ + "script_id": "manual_page_payload", + "url": page.url, + "title": page.title, + "payload": page.payload, + }) + + if source_url: + try: + final_url, html = await _fetch_html(source_url) + source_url = final_url + blobs.extend(_extract_json_blobs_from_html(html)) + except Exception as exc: + errors.append(f"source_fetch_failed: {exc}") + + payloads = [item["payload"] for item in blobs] + notes = _extract_notes(payloads) + source_title = _first_non_empty( + manual_payload.get("title", "") if manual_payload else "", + *(item.get("title", "") for item in notes[:3]), + source_url, + ) + return { + "source_url": source_url, + "title": source_title, + "notes": notes, + "raw_pages": blobs, + "errors": errors, + } + + @app.get("/v2/xiaohongshu/content-sources") + def list_content_sources( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + clauses = ["user_id = ?", "platform = ?"] + params: list[Any] = [account["id"], XHS_PLATFORM] + if project_id is not None: + normalized_project = project_id.strip() + if normalized_project: + clauses.append("project_id = ?") + params.append(normalized_project) + else: + clauses.append("(project_id IS NULL OR project_id = '')") + rows = legacy.db.fetch_all( + f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC", + tuple(params), + ) + return [legacy.content_source_payload(row) for row in rows] + + @app.post("/v2/xiaohongshu/content-sources") + def create_content_source_api( + request: XiaohongshuContentSourceCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = legacy.resolve_target_project(account["id"], request.project_id or None, username=account["username"]) + row = legacy.create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind=request.source_kind.strip(), + platform=XHS_PLATFORM, + handle=request.handle.strip(), + source_url=request.source_url.strip(), + title=request.title.strip(), + local_path=request.local_path.strip(), + metadata={ + **request.metadata, + "platform_label": "小红书", + "platform": XHS_PLATFORM, + }, + ) + return legacy.content_source_payload(row) + + @app.get("/v2/xiaohongshu/content-sources/{source_id}") + def get_content_source(source_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + row = _content_source_row_or_404(source_id, account["id"]) + return legacy.content_source_payload(row) + + @app.post("/v2/xiaohongshu/content-sources/sync") + async def sync_content_source( + request: XiaohongshuContentSourceSyncRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_row = None + if request.content_source_id.strip(): + source_row = _content_source_row_or_404(request.content_source_id.strip(), account["id"]) + + requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "") + project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"]) + assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + source_url = (request.source_url or (source_row or {}).get("source_url") or "").strip() + if not source_url and not source_row: + raise HTTPException(status_code=400, detail="source_url or content_source_id is required") + + if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]: + raise HTTPException(status_code=400, detail="Content source does not belong to target project") + + if source_row and not _is_xhs_source_row(source_row): + raise HTTPException(status_code=400, detail="Content source is not scoped to Xiaohongshu") + + source_kind = (source_row or {}).get("source_kind", "creator_account") + handle = (request.handle or (source_row or {}).get("handle", "")).strip() + source_title = ( + request.title.strip() + or (source_row or {}).get("title", "").strip() + or handle + or source_url + ) + + if not source_row: + source_row = legacy.create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind=source_kind or "creator_account", + platform=XHS_PLATFORM, + handle=handle, + source_url=source_url, + title=source_title, + metadata={ + "platform": XHS_PLATFORM, + "platform_label": "小红书", + "sync_mode": "recent_notes", + "max_items": request.max_items, + }, + ) + + public_data = await _collect_public_source(source_url, request.manual_source_payload, request.manual_pages) + note_count = len(public_data["notes"]) + top_notes = [ + { + "note_id": item["note_id"], + "title": _compact_text(item["title"], 120), + "content": _compact_text(item["content"], 180), + "author_name": item["author_name"], + "published_at": item["published_at"], + "stats": item["stats"], + "tags": item["tags"][:6], + } + for item in public_data["notes"][: request.max_items] + ] + + job_row = legacy.create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="content_source_sync", + line_type="content_source_sync", + workflow_key="content_source_sync_pipeline", + title=f"{source_title} 内容源同步", + language=request.language, + source_url=source_url, + assistant_id=(assistant or {}).get("id"), + content_source_id=source_row["id"], + artifacts={ + "platform": XHS_PLATFORM, + "handle": handle, + "source_account_url": source_url, + "source_title": source_title, + "skip_existing": request.skip_existing, + "auto_trigger_analysis": request.auto_trigger_analysis, + "max_items": request.max_items, + "note_count": note_count, + "top_notes": top_notes, + "raw_pages": public_data["raw_pages"], + "errors": public_data["errors"], + "discovery_note": request.discovery_note.strip(), + }, + analysis_model_profile_id="", + ) + + legacy.update_content_source_metadata( + source_row["id"], + { + "platform": XHS_PLATFORM, + "platform_label": "小红书", + "sync_mode": "recent_notes", + "max_items": request.max_items, + "note_count": note_count, + "last_sync_job_id": job_row["id"], + "last_sync_requested_at": now(), + }, + ) + return legacy.job_payload(await legacy.trigger_orchestrated_job(job_row)) + + @app.get("/v2/xiaohongshu/jobs") + def list_jobs( + parent_job_id: str | None = Query(default=None), + line_type: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + clauses = ["user_id = ?"] + params: list[Any] = [account["id"]] + if parent_job_id is not None: + normalized_parent = parent_job_id.strip() + if normalized_parent: + clauses.append("parent_job_id = ?") + params.append(normalized_parent) + else: + clauses.append("(parent_job_id IS NULL OR parent_job_id = '')") + if line_type: + clauses.append("line_type = ?") + params.append(line_type.strip()) + rows = legacy.db.fetch_all( + f"SELECT * FROM jobs WHERE {' AND '.join(clauses)} ORDER BY created_at DESC", + tuple(params), + ) + return [_xhs_job_payload(row) for row in rows if _job_matches_platform(row, legacy)] + + @app.get("/v2/xiaohongshu/jobs/{job_id}") + def get_job(job_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (job_id, account["id"])) + if not row or not _job_matches_platform(row, legacy): + raise HTTPException(status_code=404, detail="Job not found") + return _xhs_job_payload(row) + + @app.get("/v2/xiaohongshu/jobs/{job_id}/events") + def get_job_events(job_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]: + row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (job_id, account["id"])) + if not row or not _job_matches_platform(row, legacy): + raise HTTPException(status_code=404, detail="Job not found") + return [ + legacy.job_event_payload(item) + for item in legacy.db.fetch_all("SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (job_id,)) + ] + + @app.get("/v2/xiaohongshu/reviews") + def list_reviews( + project_id: str | None = Query(default=None), + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + clauses = ["user_id = ?", "platform = ?"] + params: list[Any] = [account["id"], XHS_PLATFORM] + if project_id is not None: + normalized_project = project_id.strip() + if normalized_project: + clauses.append("project_id = ?") + params.append(normalized_project) + else: + clauses.append("(project_id IS NULL OR project_id = '')") + sql = ( + f"SELECT * FROM publish_reviews WHERE {' AND '.join(clauses)} " + "ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC LIMIT ?" + ) + params.append(limit) + return [_xhs_review_payload(row) for row in legacy.db.fetch_all(sql, tuple(params))] + + @app.post("/v2/xiaohongshu/reviews") + def create_review( + request: XiaohongshuReviewCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_job = None + if request.source_job_id.strip(): + source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"]) + if not _job_matches_platform(source_job, legacy): + raise HTTPException(status_code=404, detail="Job not found") + requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "") + project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + review_id = make_id("review") + title = request.title.strip() or (source_job.get("title", "") if source_job else "") + if not title: + title = f"{project['name']} 复盘" + timestamp = now() + normalized_platform = _require_xhs_platform(request.platform) + legacy.db.execute( + """ + INSERT INTO publish_reviews ( + id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type, + publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + review_id, + account["id"], + project["id"], + source_job["id"] if source_job else None, + (assistant or {}).get("id") or None, + title, + normalized_platform, + request.content_type.strip() or "note", + request.publish_url.strip(), + request.published_at.strip(), + _safe_json_dumps(request.metrics), + request.verdict.strip(), + request.highlights.strip(), + request.next_actions.strip(), + request.notes.strip(), + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return _xhs_review_payload(row) + + @app.patch("/v2/xiaohongshu/reviews/{review_id}") + def update_review( + review_id: str, + request: XiaohongshuReviewUpdateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + current = legacy.load_owned_review(review_id, account["id"]) + if not _review_matches_platform(current, legacy): + raise HTTPException(status_code=404, detail="Review not found") + assistant_id = current.get("assistant_id") or None + if request.assistant_id is not None: + assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, current.get("project_id", "")) + assistant_id = (assistant or {}).get("id") or None + normalized_platform = current.get("platform", XHS_PLATFORM) + if request.platform is not None: + normalized_platform = _require_xhs_platform(request.platform) + legacy.db.execute( + """ + UPDATE publish_reviews + SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?, + metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?, + assistant_id = ?, updated_at = ? + WHERE id = ? AND user_id = ? + """, + ( + request.title if request.title is not None else current.get("title", ""), + normalized_platform, + request.content_type if request.content_type is not None else current.get("content_type", "note"), + request.publish_url if request.publish_url is not None else current.get("publish_url", ""), + request.published_at if request.published_at is not None else current.get("published_at", ""), + _safe_json_dumps(request.metrics if request.metrics is not None else legacy.parse_json_object(current.get("metrics_json") or "{}")), + request.verdict if request.verdict is not None else current.get("verdict", ""), + request.highlights if request.highlights is not None else current.get("highlights", ""), + request.next_actions if request.next_actions is not None else current.get("next_actions", ""), + request.notes if request.notes is not None else current.get("notes", ""), + assistant_id, + now(), + review_id, + account["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return _xhs_review_payload(row)