From 5a739a414d80e98e19a4d627a4100ffac6610c68 Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 07:39:39 +0800 Subject: [PATCH] feat: add live reviews and douyin tracking routes --- collector-service/app/core_main.py | 173 +++++++++++ collector-service/app/database.py | 24 ++ collector-service/app/douyin_features.py | 351 ++++++++++++++++++++++- 3 files changed, 547 insertions(+), 1 deletion(-) diff --git a/collector-service/app/core_main.py b/collector-service/app/core_main.py index 7c4329b..16b359e 100644 --- a/collector-service/app/core_main.py +++ b/collector-service/app/core_main.py @@ -252,6 +252,36 @@ class AiVideoJobRequest(BaseModel): duration: int = 5 +class ReviewCreateRequest(BaseModel): + project_id: str = "" + source_job_id: str = "" + assistant_id: str = "" + title: str = "" + platform: str = "douyin" + content_type: str = "video" + publish_url: str = "" + published_at: str = "" + metrics: dict[str, Any] = Field(default_factory=dict) + verdict: str = "" + highlights: str = "" + next_actions: str = "" + notes: str = "" + + +class ReviewUpdateRequest(BaseModel): + title: str | None = None + platform: str | None = None + content_type: str | None = None + publish_url: str | None = None + published_at: str | None = None + metrics: dict[str, Any] | None = None + verdict: str | None = None + highlights: str | None = None + next_actions: str | None = None + notes: str | None = None + assistant_id: str | None = None + + class InternalStepRequest(BaseModel): job_id: str = "" jobId: str = "" @@ -522,6 +552,41 @@ def assistant_payload(row: dict[str, Any]) -> dict[str, Any]: } +def review_payload(row: dict[str, Any]) -> dict[str, Any]: + metrics = parse_json_object(row.get("metrics_json") or "{}") + source_job = None + assistant = None + if row.get("source_job_id"): + source_job_row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (row["source_job_id"],)) + if source_job_row: + source_job = job_payload(source_job_row) + if row.get("assistant_id"): + assistant_row = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],)) + if assistant_row: + assistant = assistant_payload(assistant_row) + return { + "id": row["id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", ""), + "source_job_id": row.get("source_job_id", ""), + "assistant_id": row.get("assistant_id", ""), + "title": row.get("title", ""), + "platform": row.get("platform", "douyin"), + "content_type": row.get("content_type", "video"), + "publish_url": row.get("publish_url", ""), + "published_at": row.get("published_at", ""), + "metrics": metrics, + "verdict": row.get("verdict", ""), + "highlights": row.get("highlights", ""), + "next_actions": row.get("next_actions", ""), + "notes": row.get("notes", ""), + "source_job": source_job, + "assistant": assistant, + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + def document_payload(row: dict[str, Any]) -> dict[str, Any]: analysis_map = parse_json_object(row.get("analysis_json") or "{}") source_artifacts = parse_json_object(row.get("source_artifact_json") or "{}") @@ -2027,6 +2092,107 @@ def list_knowledge_documents(knowledge_base_id: str, account: dict[str, Any] = D return [document_payload(row) for row in rows] +@app.get("/v2/reviews") +def list_reviews( + project_id: str | None = Query(default=None), + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(require_approved), +) -> list[dict[str, Any]]: + clauses = ["user_id = ?"] + params: list[Any] = [account["id"]] + if project_id is not None: + normalized_project = project_id.strip() + if normalized_project: + clauses.append("project_id = ?") + params.append(normalized_project) + else: + clauses.append("(project_id IS NULL OR project_id = '')") + sql = f"SELECT * FROM publish_reviews WHERE {' AND '.join(clauses)} ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC LIMIT ?" + params.append(limit) + return [review_payload(row) for row in db.fetch_all(sql, tuple(params))] + + +@app.post("/v2/reviews") +def create_review(request: ReviewCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + source_job = None + if request.source_job_id.strip(): + source_job = load_owned_job(request.source_job_id.strip(), account["id"]) + requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "") + project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + review_id = make_id("review") + title = request.title.strip() or (source_job.get("title", "") if source_job else "") + if not title: + title = f"{project['name']} 复盘" + timestamp = utc_now() + db.execute( + """ + INSERT INTO publish_reviews ( + id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type, + publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + review_id, + account["id"], + project["id"], + source_job["id"] if source_job else None, + (assistant or {}).get("id") or None, + title, + request.platform or "douyin", + request.content_type or "video", + request.publish_url.strip(), + request.published_at.strip(), + json.dumps(request.metrics, ensure_ascii=False), + request.verdict.strip(), + request.highlights.strip(), + request.next_actions.strip(), + request.notes.strip(), + timestamp, + timestamp, + ), + ) + row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return review_payload(row) + + +@app.patch("/v2/reviews/{review_id}") +def update_review(review_id: str, request: ReviewUpdateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + current = load_owned_review(review_id, account["id"]) + assistant_id = current.get("assistant_id") or None + if request.assistant_id is not None: + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, current.get("project_id", "")) + assistant_id = (assistant or {}).get("id") or None + db.execute( + """ + UPDATE publish_reviews + SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?, + metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?, + assistant_id = ?, updated_at = ? + WHERE id = ? AND user_id = ? + """, + ( + request.title if request.title is not None else current.get("title", ""), + request.platform if request.platform is not None else current.get("platform", "douyin"), + request.content_type if request.content_type is not None else current.get("content_type", "video"), + request.publish_url if request.publish_url is not None else current.get("publish_url", ""), + request.published_at if request.published_at is not None else current.get("published_at", ""), + json.dumps(request.metrics if request.metrics is not None else parse_json_object(current.get("metrics_json") or "{}"), ensure_ascii=False), + request.verdict if request.verdict is not None else current.get("verdict", ""), + request.highlights if request.highlights is not None else current.get("highlights", ""), + request.next_actions if request.next_actions is not None else current.get("next_actions", ""), + request.notes if request.notes is not None else current.get("notes", ""), + assistant_id, + utc_now(), + review_id, + account["id"], + ), + ) + row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return review_payload(row) + + @app.get("/v2/explore/jobs") def list_jobs( parent_job_id: str | None = Query(default=None), @@ -2452,6 +2618,13 @@ def load_owned_job(job_id: str, account_id: str) -> dict[str, Any]: return row +def load_owned_review(review_id: str, account_id: str) -> dict[str, Any]: + row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ? AND user_id = ?", (review_id, account_id)) + if not row: + raise HTTPException(status_code=404, detail="Review not found") + return row + + def load_owned_content_source(source_id: str, account_id: str) -> dict[str, Any]: row = db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (source_id, account_id)) if not row: diff --git a/collector-service/app/database.py b/collector-service/app/database.py index c3bb5a2..f83b801 100644 --- a/collector-service/app/database.py +++ b/collector-service/app/database.py @@ -211,6 +211,30 @@ class Database: FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL ); + CREATE TABLE IF NOT EXISTS publish_reviews ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT, + source_job_id TEXT, + assistant_id TEXT, + title TEXT NOT NULL, + platform TEXT NOT NULL DEFAULT 'douyin', + content_type TEXT NOT NULL DEFAULT 'video', + publish_url TEXT NOT NULL DEFAULT '', + published_at TEXT NOT NULL DEFAULT '', + metrics_json TEXT NOT NULL DEFAULT '{}', + verdict TEXT NOT NULL DEFAULT '', + highlights TEXT NOT NULL DEFAULT '', + next_actions TEXT NOT NULL DEFAULT '', + notes TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(source_job_id) REFERENCES jobs(id) ON DELETE SET NULL, + FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL + ); + CREATE TABLE IF NOT EXISTS job_events ( id TEXT PRIMARY KEY, job_id TEXT NOT NULL, diff --git a/collector-service/app/douyin_features.py b/collector-service/app/douyin_features.py index ccc248e..90f8b94 100644 --- a/collector-service/app/douyin_features.py +++ b/collector-service/app/douyin_features.py @@ -4,7 +4,7 @@ import asyncio import json import re from collections import Counter -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from html import unescape from typing import Any, Iterable from urllib.parse import quote, unquote @@ -71,6 +71,16 @@ class DouyinBenchmarkLinkRequest(BaseModel): search_id: str = "" +class DouyinTrackedAccountRequest(BaseModel): + tracked_account_id: str = "" + assistant_id: str = "" + note: str = "" + + +class DouyinTrackingCursorRequest(BaseModel): + last_seen_at: str = "" + + def _safe_json_dumps(value: Any) -> str: return json.dumps(value, ensure_ascii=False, separators=(",", ":")) @@ -743,6 +753,30 @@ def register_douyin_routes(app: Any, legacy: Any) -> None: CREATE INDEX IF NOT EXISTS idx_douyin_account_relations_source ON douyin_account_relations(source_account_id, created_at DESC); + + CREATE TABLE IF NOT EXISTS douyin_tracked_accounts ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + tracked_account_id TEXT NOT NULL, + assistant_id TEXT, + note TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, tracked_account_id), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(tracked_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE, + FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS idx_douyin_tracked_accounts_user_updated + ON douyin_tracked_accounts(user_id, updated_at DESC); + + CREATE TABLE IF NOT EXISTS douyin_tracking_cursors ( + user_id TEXT PRIMARY KEY, + last_seen_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE + ); """ with legacy.db.session() as conn: conn.executescript(schema) @@ -1179,6 +1213,190 @@ def register_douyin_routes(app: Any, legacy: Any) -> None: }) return payloads + def _load_owned_assistant(assistant_id: str, user_id: str) -> dict[str, Any] | None: + if not str(assistant_id or "").strip(): + return None + row = legacy.db.fetch_one( + "SELECT * FROM assistants WHERE id = ? AND user_id = ?", + (assistant_id, user_id) + ) + if not row: + raise HTTPException(status_code=404, detail="Assistant not found") + return row + + def _parse_iso_datetime(value: str | None) -> datetime | None: + text = str(value or "").strip() + if not text: + return None + try: + normalized = text.replace("Z", "+00:00") + parsed = datetime.fromisoformat(normalized) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + except Exception: + return None + + def _get_tracking_cursor(user_id: str) -> dict[str, Any] | None: + return legacy.db.fetch_one( + "SELECT * FROM douyin_tracking_cursors WHERE user_id = ?", + (user_id,) + ) + + def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]: + existing = _get_tracking_cursor(user_id) + timestamp = _first_non_empty(last_seen_at, now()) + updated_at = now() + if existing: + legacy.db.execute( + "UPDATE douyin_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?", + (timestamp, updated_at, user_id) + ) + else: + legacy.db.execute( + "INSERT INTO douyin_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)", + (user_id, timestamp, updated_at) + ) + return legacy.db.fetch_one("SELECT * FROM douyin_tracking_cursors WHERE user_id = ?", (user_id,)) + + def _list_tracked_accounts(user_id: str) -> list[dict[str, Any]]: + rows = legacy.db.fetch_all( + """ + SELECT track.*, + assistant.name AS assistant_name + FROM douyin_tracked_accounts track + LEFT JOIN assistants assistant ON assistant.id = track.assistant_id + WHERE track.user_id = ? + ORDER BY track.updated_at DESC + """, + (user_id,) + ) + payloads: list[dict[str, Any]] = [] + for row in rows: + account_row = _require_owned_account(row["tracked_account_id"], user_id) + account_payload = _build_account_payload(account_row, include_recent_videos=6) + payloads.append({ + "id": row["id"], + "tracked_account_id": row["tracked_account_id"], + "assistant_id": row.get("assistant_id", "") or "", + "assistant_name": row.get("assistant_name", "") or "", + "note": row.get("note", "") or "", + "created_at": row["created_at"], + "updated_at": row["updated_at"], + "account": account_payload + }) + return payloads + + def _extract_tracking_borrowing_points(video: dict[str, Any]) -> list[str]: + stats = video.get("stats", {}) or {} + tags = video.get("tags", []) or [] + candidates: list[str] = [] + play_count = int(stats.get("play") or 0) + like_count = int(stats.get("like") or 0) + comment_count = int(stats.get("comment") or 0) + share_count = int(stats.get("share") or 0) + if like_count >= 100: + candidates.append("点赞明显更高,适合借标题切口和开头表达。") + if comment_count >= 20: + candidates.append("评论互动活跃,可借提问句和争议点设计。") + if share_count >= 10: + candidates.append("分享意愿较强,可借观点浓度和传播句式。") + if play_count >= 5000: + candidates.append("播放信号较强,值得拆成同题材复用模板。") + if tags: + candidates.append(f"标签集中在 {', '.join(tags[:3])},适合做系列化选题。") + deduped: list[str] = [] + seen: set[str] = set() + for item in candidates: + normalized = _compact_text(item, 80) + if not normalized or normalized in seen: + continue + seen.add(normalized) + deduped.append(normalized) + return deduped[:4] + + def _build_tracking_digest_item(tracked_item: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]: + stats = video.get("stats", {}) or {} + summary = video.get("description") or video.get("title") or "暂无摘要" + borrowing_points = _extract_tracking_borrowing_points(video) + high_value = int(stats.get("like") or 0) >= 100 or int(stats.get("play") or 0) >= 5000 or bool(borrowing_points) + return { + "tracking_id": tracked_item["id"], + "tracked_account_id": tracked_item["tracked_account_id"], + "assistant_id": tracked_item["assistant_id"], + "assistant_name": tracked_item["assistant_name"], + "account": tracked_item["account"], + "video": video, + "summary": _compact_text(summary, 160), + "borrowing_points": borrowing_points, + "is_high_value": high_value, + } + + def _build_tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]: + tracked_accounts = _list_tracked_accounts(user_id) + cursor = _get_tracking_cursor(user_id) + since_dt = _parse_iso_datetime(since_value) if since_value else None + if since_dt is None and cursor: + since_dt = _parse_iso_datetime(cursor.get("last_seen_at")) + if since_dt is None: + since_dt = (datetime.now(timezone.utc) - timedelta(days=3)).replace(microsecond=0) + items: list[dict[str, Any]] = [] + for tracked in tracked_accounts: + account_payload = tracked.get("account", {}) or {} + for video in account_payload.get("video_summary", {}).get("videos", []): + published_at = _parse_iso_datetime(video.get("published_at")) + if published_at is None or published_at <= since_dt: + continue + items.append(_build_tracking_digest_item(tracked, video)) + items.sort( + key=lambda item: _parse_iso_datetime(item["video"].get("published_at")) or datetime.fromtimestamp(0, tz=timezone.utc), + reverse=True + ) + return { + "generated_at": now(), + "since": since_dt.isoformat(), + "cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""), + "tracked_accounts": tracked_accounts, + "items": items[: max(1, min(limit, 100))] + } + + async def _refresh_tracked_account_workspace( + owner: dict[str, Any], + tracked_account_id: str, + discovery_note: str = "tracking_refresh" + ) -> dict[str, Any]: + account_row = _require_owned_account(tracked_account_id, owner["id"]) + profile_url = _first_non_empty( + account_row.get("canonical_profile_url"), + account_row.get("profile_url") + ) + if not profile_url: + raise HTTPException(status_code=400, detail="Tracked account has no profile_url to refresh") + request = DouyinAccountSyncRequest( + profile_url=profile_url, + discovery_note=discovery_note + ) + public_data = await _collect_public_profile(profile_url, None) + creator_data = await _collect_creator_center_pages([], "", []) + if not public_data.get("profile", {}).get("canonical_profile_url"): + public_data["profile"]["canonical_profile_url"] = profile_url + if public_data["errors"]: + raise HTTPException( + status_code=502, + detail={ + "message": "刷新对标账号失败", + "public_errors": public_data["errors"], + "creator_errors": creator_data["errors"], + }, + ) + refreshed_account = _upsert_account(owner, public_data["profile"], request, public_data, creator_data) + return { + "account": _build_account_payload(refreshed_account, include_recent_videos=6), + "sync_errors": public_data["errors"] + creator_data["errors"], + "public_video_count": len(public_data.get("videos", [])), + "creator_page_count": len(creator_data.get("pages", [])), + } + def _build_workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]: account_payload = _build_account_payload(account_row) latest_public_snapshot = legacy.db.fetch_one( @@ -1978,3 +2196,134 @@ def register_douyin_routes(app: Any, legacy: Any) -> None: "relation_ids": linked_ids, "links": _list_linked_accounts(account_row) } + + @app.get("/v2/douyin/tracking/accounts") + def list_douyin_tracked_accounts( + account: dict[str, Any] = Depends(legacy.require_approved) + ) -> dict[str, Any]: + cursor = _get_tracking_cursor(account["id"]) + return { + "cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""), + "items": _list_tracked_accounts(account["id"]) + } + + @app.post("/v2/douyin/tracking/accounts") + def create_douyin_tracked_account( + request: DouyinTrackedAccountRequest, + account: dict[str, Any] = Depends(legacy.require_approved) + ) -> dict[str, Any]: + tracked_account = _require_owned_account(request.tracked_account_id, account["id"]) + assistant = _load_owned_assistant(request.assistant_id, account["id"]) + existing = legacy.db.fetch_one( + "SELECT * FROM douyin_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?", + (account["id"], tracked_account["id"]) + ) + updated_at = now() + if existing: + legacy.db.execute( + """ + UPDATE douyin_tracked_accounts + SET assistant_id = ?, note = ?, updated_at = ? + WHERE id = ? + """, + ((assistant or {}).get("id"), request.note.strip(), updated_at, existing["id"]) + ) + else: + legacy.db.execute( + """ + INSERT INTO douyin_tracked_accounts ( + id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + (make_id("dytrack"), account["id"], tracked_account["id"], (assistant or {}).get("id"), request.note.strip(), updated_at, updated_at) + ) + return { + "tracked_account_id": tracked_account["id"], + "assistant_id": (assistant or {}).get("id", ""), + "items": _list_tracked_accounts(account["id"]) + } + + @app.post("/v2/douyin/tracking/accounts/{tracked_account_id}/refresh") + async def refresh_douyin_tracked_account( + tracked_account_id: str, + account: dict[str, Any] = Depends(legacy.require_approved) + ) -> dict[str, Any]: + account_row = _require_owned_account(tracked_account_id, account["id"]) + account_payload = _build_account_payload(account_row, include_recent_videos=6) + try: + refreshed = await _refresh_tracked_account_workspace(account, tracked_account_id) + return { + "success": True, + "tracked_account_id": tracked_account_id, + "account": refreshed.get("account", {}), + "sync_errors": refreshed.get("sync_errors", []), + "public_video_count": refreshed.get("public_video_count", 0), + "creator_page_count": refreshed.get("creator_page_count", 0) + } + except HTTPException as exc: + detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)} + return { + "success": False, + "tracked_account_id": tracked_account_id, + "account": account_payload, + "message": detail.get("message") or str(exc.detail), + "detail": detail, + "sync_errors": detail.get("public_errors", []) + detail.get("creator_errors", []) + } + + @app.post("/v2/douyin/tracking/refresh") + async def refresh_all_douyin_tracked_accounts( + account: dict[str, Any] = Depends(legacy.require_approved) + ) -> dict[str, Any]: + tracked_accounts = _list_tracked_accounts(account["id"]) + items: list[dict[str, Any]] = [] + errors: list[dict[str, Any]] = [] + for tracked in tracked_accounts: + try: + refreshed = await _refresh_tracked_account_workspace(account, tracked["tracked_account_id"]) + items.append({ + "tracking_id": tracked["id"], + "tracked_account_id": tracked["tracked_account_id"], + "nickname": (refreshed.get("account") or {}).get("nickname", ""), + "sync_errors": refreshed.get("sync_errors", []), + "public_video_count": refreshed.get("public_video_count", 0) + }) + except HTTPException as exc: + errors.append({ + "tracking_id": tracked["id"], + "tracked_account_id": tracked["tracked_account_id"], + "message": str(exc.detail) + }) + except Exception as exc: + errors.append({ + "tracking_id": tracked["id"], + "tracked_account_id": tracked["tracked_account_id"], + "message": str(exc) + }) + return { + "tracked_count": len(tracked_accounts), + "refreshed": len(items), + "failed": len(errors), + "items": items, + "errors": errors + } + + @app.post("/v2/douyin/tracking/cursor") + def update_douyin_tracking_cursor( + request: DouyinTrackingCursorRequest, + account: dict[str, Any] = Depends(legacy.require_approved) + ) -> dict[str, Any]: + cursor = _set_tracking_cursor(account["id"], request.last_seen_at) + return { + "user_id": account["id"], + "last_seen_at": cursor["last_seen_at"], + "updated_at": cursor["updated_at"] + } + + @app.get("/v2/douyin/tracking/digest") + def get_douyin_tracking_digest( + since: str | None = None, + limit: int = 24, + account: dict[str, Any] = Depends(legacy.require_approved) + ) -> dict[str, Any]: + return _build_tracking_digest(account["id"], since_value=(since or "").strip(), limit=limit)