Files
storyforge/scripts/smoke_business.sh

109 lines
4.9 KiB
Bash
Executable File

#!/bin/sh
set -eu

# Connection settings for the smoke test. Each one can be overridden through
# the matching STORYFORGE_* environment variable; the password has no default.
BASE_URL="${STORYFORGE_BASE_URL:-http://127.0.0.1:8081}"
USERNAME="${STORYFORGE_USERNAME:-storyforge-admin}"
PASSWORD="${STORYFORGE_PASSWORD:-}"

# Stop early with a readable message when no password was supplied.
if [ -z "$PASSWORD" ]; then
  echo "STORYFORGE_PASSWORD is required. Export the bootstrap super-admin password before running smoke_business.sh." >&2
  exit 1
fi

# The embedded Python program reads BASE_URL, USERNAME and PASSWORD from
# os.environ. Plain shell assignments are not inherited by child processes,
# so the variables must be exported for python3 to see them.
export BASE_URL USERNAME PASSWORD
python3 - <<'PY'
import json
import os
import urllib.request
# Runtime configuration, read from the process environment. The fallbacks
# mirror the defaults used by the surrounding shell script.
base = os.getenv("BASE_URL", "http://127.0.0.1:8081").rstrip("/")
username = os.getenv("USERNAME", "storyforge-admin")
password = os.getenv("PASSWORD", "")

# Platforms whose /v2/<platform>/... endpoints are exercised further down.
platforms = [
    "douyin",
    "xiaohongshu",
    "bilibili",
    "kuaishou",
    "wechat_video",
]

# Without a password the login call cannot succeed, so abort before any
# request is sent.
if password == "":
    raise SystemExit("STORYFORGE_PASSWORD is required")
def request_json(path: str, *, method: str = "GET", payload: dict | None = None, headers: dict | None = None, timeout: int = 30):
    """Send one HTTP request to ``base + path`` and return the parsed JSON body.

    Args:
        path: URL path appended to the module-level ``base``.
        method: HTTP method handed to ``urllib.request.Request``.
        payload: When not ``None``, serialized with ``json.dumps`` and sent
            as the request body.
        headers: Extra headers merged over the default
            ``content-type: application/json`` header.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.

    Returns:
        Whatever ``json.load`` decodes from the response.
    """
    merged_headers = {"content-type": "application/json"}
    if headers:
        merged_headers.update(headers)

    # Only attach a body when the caller actually supplied a payload.
    encoded = json.dumps(payload).encode() if payload is not None else None

    request = urllib.request.Request(
        base + path,
        data=encoded,
        headers=merged_headers,
        method=method,
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
# Readiness gate: refuse to continue unless /readyz answers with a JSON
# object whose "ready" field is truthy.
ready = request_json("/readyz", timeout=20)
if not isinstance(ready, dict) or not ready.get("ready"):
    raise SystemExit("collector readyz is not healthy")

# Log in through the shared helper so the request gets the same JSON encoding
# and content-type header as every other call in this script.
login = request_json(
    "/v2/auth/login",
    method="POST",
    payload={"username": username, "password": password},
    timeout=20,
)

# Fail with a readable message rather than a KeyError/TypeError traceback
# when the response carries no usable token.
token = login.get("token") if isinstance(login, dict) else None
if not isinstance(token, str) or not token:
    raise SystemExit("/v2/auth/login did not return a token")

# Bearer header reused by all authenticated requests below.
headers = {"authorization": "Bearer " + token}
print("smoke login: ok")
# Tenant-level endpoints: both are fetched first, then each response is
# checked to be a JSON object before a short summary is printed.
platform_agents = request_json("/v2/platform-agents", headers=headers)
tenant_quota = request_json("/v2/tenant/quota", headers=headers)

for endpoint, response in (
    ("/v2/platform-agents", platform_agents),
    ("/v2/tenant/quota", tenant_quota),
):
    if not isinstance(response, dict):
        raise SystemExit(endpoint + " did not return an object")

# A missing or null "items" entry is reported as zero items.
agent_items = platform_agents.get("items") or []
print("platform-agents: " + json.dumps({"items": len(agent_items)}, ensure_ascii=False))

# Only the first six keys (in sorted order) are shown to keep the line short.
quota_keys = sorted(tenant_quota.keys())[:6]
print("tenant-quota: " + json.dumps({"keys": quota_keys}, ensure_ascii=False))
# Keys every tracking digest response must contain. Built once, outside the
# loop, because the set does not depend on the platform.
digest_keys = {"generated_at", "since", "items", "tracked_accounts", "cursor_last_seen_at"}

for platform in platforms:
    # The account listing and the tracking digest are checked for every
    # platform, whether or not it has any accounts.
    accounts = request_json(f"/v2/{platform}/accounts", headers=headers)
    if not isinstance(accounts, list):
        raise SystemExit(f"/v2/{platform}/accounts did not return a list")

    digest = request_json(f"/v2/{platform}/tracking/digest", headers=headers)
    if not isinstance(digest, dict):
        raise SystemExit(f"/v2/{platform}/tracking/digest did not return an object")
    missing_keys = digest_keys.difference(digest)
    if missing_keys:
        raise SystemExit(f"/v2/{platform}/tracking/digest missing keys: {sorted(missing_keys)}")

    # Counts printed for this platform; null/missing collections count as 0.
    summary = {
        "accounts": len(accounts),
        "tracked_accounts": len(digest.get("tracked_accounts") or []),
        "digest_items": len(digest.get("items") or []),
    }

    if accounts:
        # Per-account endpoints are probed with the first listed account only.
        first_account = accounts[0]
        if not isinstance(first_account, dict) or "id" not in first_account:
            raise SystemExit(f"/v2/{platform}/accounts returned an entry without an id")
        account_id = first_account["id"]

        workspace = request_json(f"/v2/{platform}/accounts/{account_id}/workspace", headers=headers)
        analysis_reports = request_json(f"/v2/{platform}/accounts/{account_id}/analysis-reports", headers=headers)
        if not isinstance(workspace, dict):
            raise SystemExit(f"/v2/{platform}/accounts/{{id}}/workspace did not return an object")
        if not isinstance(analysis_reports, list):
            raise SystemExit(f"/v2/{platform}/accounts/{{id}}/analysis-reports did not return a list")
        # The workspace must belong to the platform it was requested under.
        if (workspace.get("account") or {}).get("platform") != platform:
            raise SystemExit(f"/v2/{platform}/accounts/{{id}}/workspace returned wrong platform")
        summary.update({
            "workspace_reports": len(workspace.get("recent_reports") or []),
            "analysis_reports": len(analysis_reports),
        })

        # Snapshot and creator-field endpoints are only checked for douyin.
        if platform == "douyin":
            snapshots = request_json(f"/v2/{platform}/accounts/{account_id}/snapshots", headers=headers)
            if not isinstance(snapshots, list):
                raise SystemExit("/v2/douyin/accounts/{id}/snapshots did not return a list")
            summary["snapshots"] = len(snapshots)

            # creator-fields is requested only when at least one snapshot is
            # of type "creator_center"; non-object entries are ignored
            # instead of crashing on a missing .get().
            has_creator_snapshot = any(
                isinstance(item, dict) and item.get("snapshot_type") == "creator_center"
                for item in snapshots
            )
            if has_creator_snapshot:
                creator_fields = request_json(f"/v2/{platform}/accounts/{account_id}/creator-fields", headers=headers)
                # Validate the shape like every other endpoint before reading it.
                if not isinstance(creator_fields, dict):
                    raise SystemExit("/v2/douyin/accounts/{id}/creator-fields did not return an object")
                if creator_fields.get("snapshot_type") != "creator_center":
                    raise SystemExit("/v2/douyin/accounts/{id}/creator-fields returned an unexpected snapshot type")
                summary["creator_fields"] = creator_fields.get("field_count", 0)

    print(f"{platform}: " + json.dumps(summary, ensure_ascii=False))
PY