feat: upgrade douyin work list filters and ranking

This commit is contained in:
kris
2026-03-21 02:36:18 +08:00
parent 1fb39e040f
commit c09a976628
2 changed files with 273 additions and 30 deletions

View File

@@ -245,11 +245,22 @@ def _video_score_breakdown(video: dict[str, Any]) -> dict[str, Any]:
else:
age_days = 999.0
engagement_rate = (like + comment * 2.2 + share * 4.2 + collect * 3.0) / max(play, 1.0)
share_rate = share / max(play, 1.0)
collect_rate = collect / max(play, 1.0)
comment_rate = comment / max(play, 1.0)
like_rate = like / max(play, 1.0)
if play > 0:
rate_denominator = play
else:
rate_denominator = max(
like * 18.0,
comment * 70.0,
share * 95.0,
collect * 55.0,
1000.0
)
engagement_rate = (like + comment * 2.2 + share * 4.2 + collect * 3.0) / max(rate_denominator, 1.0)
share_rate = share / max(rate_denominator, 1.0)
collect_rate = collect / max(rate_denominator, 1.0)
comment_rate = comment / max(rate_denominator, 1.0)
like_rate = like / max(rate_denominator, 1.0)
volume_component = min(36.0, math.log10(play + 1.0) * 9.0)
interaction_component = min(28.0, engagement_rate * 100.0)
@@ -261,6 +272,17 @@ def _video_score_breakdown(video: dict[str, Any]) -> dict[str, Any]:
min(100.0, volume_component + interaction_component + spread_component + freshness_component + baseline_component),
2
)
popularity_score = round(
min(
100.0,
math.log10(play + 1.0) * 24.0
+ math.log10(like + 1.0) * 22.0
+ math.log10(comment + 1.0) * 20.0
+ math.log10(share + 1.0) * 18.0
+ math.log10(collect + 1.0) * 16.0
),
2
)
commercial_score = round(
min(
100.0,
@@ -288,6 +310,7 @@ def _video_score_breakdown(video: dict[str, Any]) -> dict[str, Any]:
return {
"performance_score": performance_score,
"popularity_score": popularity_score,
"commercial_score": commercial_score,
"engagement_rate": round(engagement_rate, 4),
"share_rate": round(share_rate, 4),
@@ -486,27 +509,79 @@ def _pick_best_profile(candidates: list[dict[str, Any]], fallback_url: str = "")
def _normalize_video_candidate(candidate: dict[str, Any]) -> dict[str, Any]:
def _collect_image_urls(node: Any) -> list[str]:
    """Collect the http-prefixed URL strings reachable from *node*.

    The walk accepts strings, lists and dicts; any other value contributes
    nothing. URLs are gathered in depth-first, document order and the
    resulting list is handed to ``_dedupe_strings`` (defined elsewhere in
    this module; presumably it drops repeated entries -- confirm there).

    NOTE(review): every entry of a ``url_list`` is collected, so if those
    entries are alternate addresses for one image, ``len()`` of the result
    may exceed the number of distinct images -- confirm against callers
    that use the length as an image count.
    """
    # Dict keys whose value is expected to be a URL string itself.
    direct_url_keys = ("url", "download_url", "origin_url", "display_url", "cover_url")
    # Dict keys whose value is walked recursively for further URLs.
    nested_image_keys = ("image", "images", "cover", "display_image", "origin_image")

    def _walk(value: Any):
        """Yield URL strings found in *value*, preserving encounter order."""
        if isinstance(value, str):
            stripped = value.strip()
            if stripped.startswith("http"):
                yield stripped
        elif isinstance(value, list):
            # Only the first 20 elements of any list are inspected.
            for element in value[:20]:
                yield from _walk(element)
        elif isinstance(value, dict):
            for field in direct_url_keys:
                raw = value.get(field)
                if isinstance(raw, str):
                    cleaned = raw.strip()
                    if cleaned.startswith("http"):
                        yield cleaned
            listed = value.get("url_list")
            if isinstance(listed, list):
                # Only the first 5 entries of a url_list are inspected.
                for element in listed[:5]:
                    yield from _walk(element)
            for field in nested_image_keys:
                nested = value.get(field)
                # Skip missing/empty children before recursing.
                if nested not in (None, "", [], {}):
                    yield from _walk(nested)

    return _dedupe_strings(list(_walk(node)))
stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
video_source = candidate.get("video") if isinstance(candidate.get("video"), dict) else {}
title = _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("share_title"))
description = _first_non_empty(candidate.get("desc"), candidate.get("title"), candidate.get("text"))
cover = candidate.get("cover") or video_source.get("cover")
image_urls = _collect_image_urls(
[
candidate.get("images"),
candidate.get("image_infos"),
candidate.get("image_list"),
candidate.get("slides"),
candidate.get("photos"),
candidate.get("photo"),
candidate.get("image_post_info"),
]
)
if isinstance(cover, dict):
cover = _first_non_empty(
cover.get("url_list", [""])[0] if isinstance(cover.get("url_list"), list) else "",
cover.get("url")
)
duration_raw = float(candidate.get("duration") or video_source.get("duration") or 0)
duration_sec = duration_raw / 1000.0 if duration_raw > 1000 else duration_raw
has_video_media = bool(video_source) or duration_sec > 0.3
aweme_type = str(candidate.get("aweme_type") or "")
looks_like_image_text = bool(image_urls) and (not has_video_media or aweme_type in {"51", "55", "61", "68", "122", "150"})
content_type = "image_text" if looks_like_image_text else "video"
return {
"aweme_id": _first_non_empty(candidate.get("aweme_id"), candidate.get("item_id"), candidate.get("group_id")),
"title": title,
"description": description,
"share_url": _first_non_empty(candidate.get("share_url")),
"cover_url": _first_non_empty(cover),
"duration_sec": float(candidate.get("duration") or video_source.get("duration") or 0) / 1000.0
if float(candidate.get("duration") or video_source.get("duration") or 0) > 1000
else float(candidate.get("duration") or video_source.get("duration") or 0),
"cover_url": _first_non_empty(cover, image_urls[0] if image_urls else ""),
"duration_sec": duration_sec,
"published_at": _normalize_timestamp(candidate.get("create_time") or candidate.get("publish_time")),
"tags": _extract_hashtags(title, description),
"content_type": content_type,
"content_type_label": "图文" if content_type == "image_text" else "视频",
"image_count": len(image_urls),
"stats": {
"play": _parse_count(stats_source.get("play_count") or candidate.get("play_count")),
"like": _parse_count(stats_source.get("digg_count") or candidate.get("digg_count")),
@@ -1341,6 +1416,8 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
)
payloads: list[dict[str, Any]] = []
for row in rows:
raw_payload = _safe_json_loads(row["raw_json"], {})
normalized = _normalize_video_candidate(raw_payload) if isinstance(raw_payload, dict) and raw_payload else {}
payloads.append({
"id": row["id"],
"aweme_id": row["aweme_id"],
@@ -1352,7 +1429,10 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
"published_at": row["published_at"],
"tags": _safe_json_loads(row["tags_json"], []),
"stats": _safe_json_loads(row["stats_json"], {}),
"raw": _safe_json_loads(row["raw_json"], {})
"content_type": normalized.get("content_type", "video"),
"content_type_label": normalized.get("content_type_label", "视频"),
"image_count": int(normalized.get("image_count") or 0),
"raw": raw_payload
})
return payloads
@@ -1409,6 +1489,9 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
"duration_sec": video["duration_sec"],
"published_at": video["published_at"],
"tags": video["tags"],
"content_type": video.get("content_type", "video"),
"content_type_label": video.get("content_type_label", "视频"),
"image_count": int(video.get("image_count") or 0),
"stats": video["stats"],
"score": score
}
@@ -1417,6 +1500,12 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
return payload
def _video_sort_key(video: dict[str, Any], sort_by: str) -> tuple[Any, ...]:
if sort_by in {"popular", "popularity"}:
return (
float(video.get("score", {}).get("popularity_score") or 0),
float(video.get("score", {}).get("performance_score") or 0),
float(video.get("score", {}).get("commercial_score") or 0)
)
if sort_by == "latest":
return (
_parse_iso_datetime(video.get("published_at")) or datetime.fromtimestamp(0, tz=timezone.utc),
@@ -1467,6 +1556,8 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
high_score_threshold = 60.0
high_score_videos = [video for video in videos_by_score if float(video["score"]["performance_score"]) >= high_score_threshold]
analyzed_count = sum(1 for video in videos if video.get("latest_analysis"))
video_only_count = sum(1 for video in videos if video.get("content_type") == "video")
image_text_count = sum(1 for video in videos if video.get("content_type") == "image_text")
return {
"items": videos,
"top_scored_video_ids": [video["id"] for video in videos_by_score[: min(12, len(videos_by_score))]],
@@ -1475,7 +1566,9 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
"meta": {
"total_count": len(videos),
"analyzed_count": analyzed_count,
"high_score_count": len(high_score_videos)
"high_score_count": len(high_score_videos),
"video_count": video_only_count,
"image_text_count": image_text_count
}
}
@@ -2818,9 +2911,10 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
@app.get("/v2/douyin/accounts/{account_id}/videos")
def list_douyin_account_videos(
account_id: str,
limit: int = 60,
limit: int = 200,
sort_by: str = "score",
scope: str = "all",
content_type: str = "all",
q: str = "",
tag: str = "",
account: dict[str, Any] = Depends(legacy.require_approved)
@@ -2836,6 +2930,13 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
elif normalized_scope == "latest":
items = [item_map[video_id] for video_id in workspace["latest_video_ids"] if video_id in item_map]
normalized_content_type = (content_type or "all").strip().lower()
if normalized_content_type in {"video", "image_text"}:
items = [
item for item in items
if str(item.get("content_type") or "video").strip().lower() == normalized_content_type
]
query_text = (q or "").strip().lower()
if query_text:
items = [
@@ -2863,13 +2964,14 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
"account_id": account_row["id"],
"sort_by": normalized_sort,
"scope": normalized_scope,
"content_type": normalized_content_type,
"query": q,
"tag": tag,
"high_score_threshold": workspace["high_score_threshold"],
"meta": workspace["meta"],
"top_scored_video_ids": workspace["top_scored_video_ids"],
"latest_video_ids": workspace["latest_video_ids"],
"items": items[: max(1, min(limit, 120))]
"items": items[: max(1, min(limit, 1000))]
}
@app.get("/v2/douyin/accounts/{account_id}/analysis-reports")