#!/bin/sh
# smoke_business.sh - read-only smoke test of the business API.
#
# Environment:
#   STORYFORGE_BASE_URL  base URL of the service (default http://127.0.0.1:8081)
#   STORYFORGE_USERNAME  login user              (default storyforge-admin)
#   STORYFORGE_PASSWORD  login password          (required)
#
# Exits non-zero as soon as any check fails; otherwise prints one summary
# line per check to stdout.
set -eu

BASE_URL="${STORYFORGE_BASE_URL:-http://127.0.0.1:8081}"
USERNAME="${STORYFORGE_USERNAME:-storyforge-admin}"
PASSWORD="${STORYFORGE_PASSWORD:-}"

if [ -z "$PASSWORD" ]; then
  echo "STORYFORGE_PASSWORD is required. Export the bootstrap super-admin password before running smoke_business.sh." >&2
  exit 1
fi

# The embedded Python program reads these names from os.environ. Plain shell
# assignments are not inherited by child processes, so they must be exported;
# otherwise Python falls back to its defaults and sees an empty password.
export BASE_URL USERNAME PASSWORD

python3 - <<'PY'
# Keeps the "dict | None" annotations below from being evaluated at
# definition time, which would fail on interpreters older than Python 3.10.
from __future__ import annotations

import json
import os
import urllib.request

base = os.environ.get("BASE_URL", "http://127.0.0.1:8081").rstrip("/")
username = os.environ.get("USERNAME", "storyforge-admin")
password = os.environ.get("PASSWORD", "")
platforms = ["douyin", "xiaohongshu", "bilibili", "kuaishou", "wechat_video"]
# Keys every tracking digest response is required to contain.
digest_keys = {"generated_at", "since", "items", "tracked_accounts", "cursor_last_seen_at"}

if not password:
    raise SystemExit("STORYFORGE_PASSWORD is required")


def request_json(path: str, *, method: str = "GET", payload: dict | None = None,
                 headers: dict | None = None, timeout: int = 30):
    """Send one HTTP request to ``base + path`` and return the decoded JSON body.

    path:    URL path appended to the base URL.
    method:  HTTP method to use.
    payload: when not None, serialized as JSON and sent as the request body.
    headers: extra headers merged over the default JSON content-type header.
    timeout: socket timeout in seconds passed to urlopen.

    Errors raised by urllib (for example HTTPError on a non-2xx status) and
    JSON decoding errors are not caught here and abort the script.
    """
    body = None
    req_headers = {"content-type": "application/json"}
    if headers:
        req_headers.update(headers)
    if payload is not None:
        body = json.dumps(payload).encode()
    req = urllib.request.Request(base + path, data=body, headers=req_headers, method=method)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)


# 1. Readiness probe.
ready = request_json("/readyz", timeout=20)
if not ready.get("ready"):
    raise SystemExit("collector readyz is not healthy")

# 2. Log in and build the bearer header used by every later request.
login = request_json(
    "/v2/auth/login",
    method="POST",
    payload={"username": username, "password": password},
    timeout=20,
)
token = login["token"]
headers = {"authorization": "Bearer " + token}
print("smoke login: ok")

# 3. Tenant-level endpoints.
platform_agents = request_json("/v2/platform-agents", headers=headers)
tenant_quota = request_json("/v2/tenant/quota", headers=headers)
if not isinstance(platform_agents, dict):
    raise SystemExit("/v2/platform-agents did not return an object")
if not isinstance(tenant_quota, dict):
    raise SystemExit("/v2/tenant/quota did not return an object")
print("platform-agents: " + json.dumps({"items": len(platform_agents.get("items") or [])}, ensure_ascii=False))
print("tenant-quota: " + json.dumps({"keys": sorted(tenant_quota.keys())[:6]}, ensure_ascii=False))

# 4. Per-platform endpoints.
for platform in platforms:
    accounts = request_json(f"/v2/{platform}/accounts", headers=headers)
    if not isinstance(accounts, list):
        raise SystemExit(f"/v2/{platform}/accounts did not return a list")

    digest = request_json(f"/v2/{platform}/tracking/digest", headers=headers)
    if not isinstance(digest, dict):
        raise SystemExit(f"/v2/{platform}/tracking/digest did not return an object")
    if not digest_keys.issubset(digest.keys()):
        raise SystemExit(f"/v2/{platform}/tracking/digest missing keys: {sorted(digest_keys - set(digest.keys()))}")

    summary = {
        "accounts": len(accounts),
        "tracked_accounts": len(digest.get("tracked_accounts") or []),
        "digest_items": len(digest.get("items") or []),
    }

    # Account-level endpoints are probed only for the first listed account.
    if accounts:
        account_id = accounts[0]["id"]
        workspace = request_json(f"/v2/{platform}/accounts/{account_id}/workspace", headers=headers)
        analysis_reports = request_json(f"/v2/{platform}/accounts/{account_id}/analysis-reports", headers=headers)
        if not isinstance(workspace, dict):
            raise SystemExit(f"/v2/{platform}/accounts/{{id}}/workspace did not return an object")
        if not isinstance(analysis_reports, list):
            raise SystemExit(f"/v2/{platform}/accounts/{{id}}/analysis-reports did not return a list")
        if (workspace.get("account") or {}).get("platform") != platform:
            raise SystemExit(f"/v2/{platform}/accounts/{{id}}/workspace returned wrong platform")
        summary.update({
            "workspace_reports": len(workspace.get("recent_reports") or []),
            "analysis_reports": len(analysis_reports),
        })

        # Snapshot and creator-field endpoints are only exercised for douyin.
        if platform == "douyin":
            snapshots = request_json(f"/v2/{platform}/accounts/{account_id}/snapshots", headers=headers)
            if not isinstance(snapshots, list):
                raise SystemExit("/v2/douyin/accounts/{id}/snapshots did not return a list")
            summary["snapshots"] = len(snapshots)
            creator_snapshots = [item for item in snapshots if item.get("snapshot_type") == "creator_center"]
            # creator-fields is queried only when a creator_center snapshot exists.
            if creator_snapshots:
                creator_fields = request_json(f"/v2/{platform}/accounts/{account_id}/creator-fields", headers=headers)
                if not isinstance(creator_fields, dict):
                    raise SystemExit("/v2/douyin/accounts/{id}/creator-fields did not return an object")
                if creator_fields.get("snapshot_type") != "creator_center":
                    raise SystemExit("/v2/douyin/accounts/{id}/creator-fields returned an unexpected snapshot type")
                summary["creator_fields"] = creator_fields.get("field_count", 0)

    print(f"{platform}: " + json.dumps(summary, ensure_ascii=False))
PY