From 4ee3af45d712f0bf5e292252b4f9d9ffecbd0f3b Mon Sep 17 00:00:00 2001
From: kris
Date: Mon, 23 Mar 2026 10:29:49 +0800
Subject: [PATCH] feat: support nas-backed tenant media storage

---
 collector-service/app/core_main.py | 42 +++++++++++++++++++++++++-----
 1 file changed, 35 insertions(+), 7 deletions(-)

diff --git a/collector-service/app/core_main.py b/collector-service/app/core_main.py
index 92aa7b3..57c089c 100644
--- a/collector-service/app/core_main.py
+++ b/collector-service/app/core_main.py
@@ -29,9 +29,9 @@ from .openai_compat import OpenAICompatClient
 
 BASE_DIR = Path(__file__).resolve().parents[2]
 DATA_DIR = Path(os.getenv("DATA_DIR", BASE_DIR / "data" / "collector"))
-DOWNLOADS_DIR = DATA_DIR / "downloads"
-JOBS_DIR = DATA_DIR / "jobs"
-MODELS_DIR = DATA_DIR / "models"
+DOWNLOADS_DIR = Path(os.getenv("DOWNLOADS_DIR", str(DATA_DIR / "downloads")))
+JOBS_DIR = Path(os.getenv("JOBS_DIR", str(DATA_DIR / "jobs")))
+MODELS_DIR = Path(os.getenv("MODELS_DIR", str(DATA_DIR / "models")))
 DB_PATH = os.getenv("DATABASE_PATH", str(DATA_DIR / "storyforge.db"))
 DEFAULT_EXTERNAL_BASE_URL = os.getenv("DEFAULT_EXTERNAL_BASE_URL", "https://test.hyzq.net/storyforge")
 LOCAL_OPENAI_BASE_URL = os.getenv("LOCAL_OPENAI_BASE_URL", "http://127.0.0.1:8317/v1")
@@ -464,6 +464,20 @@ def normalize_live_recorder_quality(value: str | None) -> str:
     return "原画"
 
 
+def storage_token(value: str | None, fallback: str) -> str:
+    normalized = re.sub(r"[^a-zA-Z0-9_-]+", "-", str(value or "").strip()).strip("-")
+    return normalized or fallback
+
+
+def job_storage_dir(*, account_id: str, project_id: str | None, job_id: str) -> Path:
+    project_token = storage_token(project_id, "default-project")
+    return JOBS_DIR / storage_token(account_id, "anonymous") / project_token / storage_token(job_id, "job")
+
+
+def legacy_job_storage_dir(job_id: str) -> Path:
+    return JOBS_DIR / storage_token(job_id, "job")
+
+
 def live_recorder_remote_name(source_id: str) -> str:
     return f"{LIVE_RECORDER_MANAGED_PREFIX}{source_id.replace('-', '')[:12]}"
 
@@ -1703,7 +1717,14 @@ def resolve_real_cut_source_file(source_job: dict[str, Any]) -> tuple[Path, dict
     if source_row and source_row.get("local_path"):
         candidates.append(Path(str(source_row["local_path"])))
     if source_job.get("source_type") == "video_link":
-        candidates.append(JOBS_DIR / source_job["id"] / "source.mp4")
+        candidates.append(
+            job_storage_dir(
+                account_id=source_job.get("user_id", ""),
+                project_id=source_job.get("project_id", ""),
+                job_id=source_job["id"],
+            ) / "source.mp4"
+        )
+        candidates.append(legacy_job_storage_dir(source_job["id"]) / "source.mp4")
 
     seen: set[str] = set()
     for candidate in candidates:
@@ -1722,7 +1743,7 @@ async def stage_real_cut_source_to_cutvideo(source_job: dict[str, Any]) -> dict[
         raise HTTPException(status_code=503, detail="CutVideo is not configured")
 
     source_path, source_artifacts = resolve_real_cut_source_file(source_job)
-    folder_name = f"storyforge-{source_job['id']}"
+    folder_name = f"storyforge-{storage_token(source_job.get('user_id', ''), 'acct')[:16]}-{source_job['id']}"
     upload_payload = await cutvideo_client.upload_source_file(source_path, folder_name=folder_name)
     input_dir = str(upload_payload.get("input_dir") or "").strip()
     if not input_dir:
@@ -1992,7 +2013,11 @@ async def process_job(job_id: str) -> None:
     try:
         artifacts = json.loads(row.get("artifacts_json") or "{}")
         transcript_text = row.get("transcript_text", "")
-        job_dir = JOBS_DIR / job_id
+        job_dir = job_storage_dir(
+            account_id=row.get("user_id", ""),
+            project_id=row.get("project_id", ""),
+            job_id=job_id,
+        )
         job_dir.mkdir(parents=True, exist_ok=True)
 
         if row["source_type"] == "text":
@@ -2232,6 +2257,9 @@ def healthz() -> dict[str, Any]:
     return {
         "status": "ok",
         "dbPath": DB_PATH,
+        "downloadsDir": str(DOWNLOADS_DIR),
+        "jobsDir": str(JOBS_DIR),
+        "modelsDir": str(MODELS_DIR),
         "defaultExternalBaseUrl": DEFAULT_EXTERNAL_BASE_URL,
         "localModelBaseUrl": LOCAL_OPENAI_BASE_URL,
         "asrHttpBaseUrl": ASR_HTTP_BASE_URL,
@@ -3127,7 +3155,7 @@ async def upload_video(
     assistant = resolve_target_assistant(account["id"], assistant_id or None, project["id"])
     profile = model_profile_for_account(account["id"], analysis_model_profile_id or None)
     job_id = make_id("job_upload")
-    job_dir = JOBS_DIR / job_id
+    job_dir = job_storage_dir(account_id=account["id"], project_id=project["id"], job_id=job_id)
     job_dir.mkdir(parents=True, exist_ok=True)
     suffix = Path(file.filename or "upload.mp4").suffix or ".mp4"
     target_path = job_dir / f"source{suffix}"