From d427b89409e61d9a0ac7b0643266b59c8bb1b329 Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 11:53:21 +0800 Subject: [PATCH] feat: secure nas storage artifacts and archive links --- collector-service/app/core_main.py | 534 ++++++++++++++++++++++++++++- 1 file changed, 525 insertions(+), 9 deletions(-) diff --git a/collector-service/app/core_main.py b/collector-service/app/core_main.py index 17de693..fb9159e 100644 --- a/collector-service/app/core_main.py +++ b/collector-service/app/core_main.py @@ -19,8 +19,7 @@ from urllib.parse import quote, urljoin, urlparse from fastapi import Body, Depends, FastAPI, File, Form, Header, HTTPException, Query, UploadFile from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import StreamingResponse -from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse, StreamingResponse from pydantic import BaseModel, Field from .database import Database, utc_now @@ -98,9 +97,6 @@ app.add_middleware( allow_methods=["*"], allow_headers=["*"], ) -app.mount("/downloads", StaticFiles(directory=str(DOWNLOADS_DIR)), name="downloads") - - class RegisterAccountRequest(BaseModel): username: str password: str @@ -490,6 +486,310 @@ def tenant_download_root(account_id: str, project_id: str | None) -> Path: return DOWNLOADS_DIR / storage_token(account_id, "anonymous") / storage_token(project_id, "default-project") +def download_account_root(account_id: str) -> Path: + return DOWNLOADS_DIR / storage_token(account_id, "anonymous") + + +def download_job_root(account_id: str, project_id: str | None, job_id: str) -> Path: + return tenant_download_root(account_id, project_id) / storage_token(job_id, "job") + + +def download_relative_path(path: Path) -> str: + try: + return path.resolve().relative_to(DOWNLOADS_DIR.resolve()).as_posix() + except Exception: + return "" + + +def download_content_url(relative_path: str) -> str: + normalized = relative_path.strip().lstrip("/") + if not normalized: + return "" + return f"/v2/storage/artifacts/{encode_storage_artifact_id('downloads', normalized)}/content" + + +def download_content_url_for_path(path: Path) -> str: + return download_content_url(download_relative_path(path)) + + +def write_json_snapshot(target_path: Path, payload: Any) -> None: + target_path.parent.mkdir(parents=True, exist_ok=True) + target_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + +def copy_file_if_needed(source_path: Path, target_path: Path) -> bool: + source_resolved = source_path.resolve() + try: + if target_path.exists(): + target_resolved = target_path.resolve() + if target_resolved == source_resolved: + return False + source_stat = source_path.stat() + target_stat = target_path.stat() + if ( + source_stat.st_size == target_stat.st_size + and int(source_stat.st_mtime_ns) == int(target_stat.st_mtime_ns) + ): + return False + except OSError: + pass + target_path.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(source_path, target_path) + return True + + +def make_archive_item(label: str, source_path: Path, archived_path: Path, *, copied: bool) -> dict[str, Any]: + try: + size_bytes = archived_path.stat().st_size + except OSError: + size_bytes = 0 + relative_path = download_relative_path(archived_path) + return { + "label": label, + "source_path": str(source_path), + "archived_path": str(archived_path), + "relative_path": relative_path, + "content_url": download_content_url(relative_path), + "size_bytes": size_bytes, + "copied": copied, + } + + +def sync_directory_to_archive( + source_dir: Path, + target_dir: Path, + *, + label_prefix: str, + seen_targets: set[str], +) -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + if not source_dir.exists() or not source_dir.is_dir(): + return items + for source_path in sorted(source_dir.rglob("*")): + if not source_path.is_file(): + continue + relative_name = source_path.relative_to(source_dir) + target_path = target_dir / relative_name + target_key = str(target_path.resolve()) + if target_key in seen_targets: + continue + copied = copy_file_if_needed(source_path, target_path) + seen_targets.add(target_key) + items.append(make_archive_item(f"{label_prefix}/{relative_name.as_posix()}", source_path, target_path, copied=copied)) + return items + + +def looks_like_external_url(value: str) -> bool: + text = str(value or "").strip().lower() + return text.startswith("http://") or text.startswith("https://") + + +def existing_local_path(value: Any) -> Path | None: + text = str(value or "").strip() + if not text or looks_like_external_url(text): + return None + if "/" not in text and "\\" not in text: + return None + candidate = Path(os.path.expandvars(os.path.expanduser(text))) + if not candidate.is_absolute(): + candidate = (BASE_DIR / candidate).resolve() + else: + candidate = candidate.resolve() + return candidate if candidate.exists() else None + + +def iter_payload_paths(value: Any, prefix: str = "") -> list[dict[str, Any]]: + items: list[dict[str, Any]] = [] + if isinstance(value, dict): + for key, child in value.items(): + if key == "download_archive": + continue + child_prefix = f"{prefix}.{key}" if prefix else str(key) + items.extend(iter_payload_paths(child, child_prefix)) + elif isinstance(value, list): + for index, child in enumerate(value): + child_prefix = f"{prefix}[{index}]" if prefix else f"[{index}]" + items.extend(iter_payload_paths(child, child_prefix)) + else: + candidate = existing_local_path(value) + if candidate is not None: + items.append({"label": prefix or "value", "path": candidate}) + return items + + +def iter_external_links(value: Any, prefix: str = "") -> list[dict[str, str]]: + items: list[dict[str, str]] = [] + if isinstance(value, dict): + for key, child in value.items(): + if key == "download_archive": + continue + child_prefix = f"{prefix}.{key}" if prefix else str(key) + items.extend(iter_external_links(child, child_prefix)) + elif isinstance(value, list): + for index, child in enumerate(value): + child_prefix = f"{prefix}[{index}]" if prefix else f"[{index}]" + items.extend(iter_external_links(child, child_prefix)) + else: + text = str(value or "").strip() + if looks_like_external_url(text): + items.append({"label": prefix or "value", "url": text}) + return items + + +def archive_target_for_label(files_root: Path, label: str, source_path: Path) -> Path: + suffix = source_path.suffix or "" + stem = storage_token(label, source_path.stem or "file") + return files_root / "linked" / f"{stem}{suffix}" + + +def materialize_job_download_archive( + row: dict[str, Any], + *, + artifacts: dict[str, Any], + result: dict[str, Any], +) -> dict[str, Any]: + archive_root = download_job_root(row.get("user_id", ""), row.get("project_id", ""), row["id"]) + files_root = archive_root / "files" + files_root.mkdir(parents=True, exist_ok=True) + + seen_targets: set[str] = set() + items: list[dict[str, Any]] = [] + job_dir = job_storage_dir( + account_id=row.get("user_id", ""), + project_id=row.get("project_id", ""), + job_id=row["id"], + ) + if job_dir.exists() and job_dir.is_dir(): + items.extend(sync_directory_to_archive(job_dir, files_root / "job", label_prefix="job", seen_targets=seen_targets)) + + job_dir_resolved = job_dir.resolve() if job_dir.exists() else None + downloads_root_resolved = DOWNLOADS_DIR.resolve() + for source in iter_payload_paths({"artifacts": artifacts, "result": result}): + source_path = source["path"] + label = source["label"] + try: + source_resolved = source_path.resolve() + except OSError: + continue + if source_path.is_file(): + if str(source_resolved).startswith(str(downloads_root_resolved)): + target_path = source_path + target_key = str(target_path.resolve()) + if target_key in seen_targets: + continue + seen_targets.add(target_key) + items.append(make_archive_item(label, source_path, target_path, copied=False)) + continue + if job_dir_resolved is not None and source_resolved.is_relative_to(job_dir_resolved): + target_path = files_root / "job" / source_resolved.relative_to(job_dir_resolved) + target_key = str(target_path.resolve()) + if target_key in seen_targets: + continue + copied = copy_file_if_needed(source_path, target_path) + seen_targets.add(target_key) + items.append(make_archive_item(label, source_path, target_path, copied=copied)) + continue + target_path = archive_target_for_label(files_root, label, source_path) + target_key = str(target_path.resolve()) + if target_key in seen_targets: + continue + copied = copy_file_if_needed(source_path, target_path) + seen_targets.add(target_key) + items.append(make_archive_item(label, source_path, target_path, copied=copied)) + elif source_path.is_dir(): + if job_dir_resolved is not None and source_resolved.is_relative_to(job_dir_resolved): + continue + target_dir = files_root / "dirs" / storage_token(label, source_path.name or "dir") + items.extend(sync_directory_to_archive(source_path, target_dir, label_prefix=label, seen_targets=seen_targets)) + + unique_links: list[dict[str, str]] = [] + seen_urls: set[str] = set() + for item in iter_external_links({"artifacts": artifacts, "result": result}): + url = item["url"] + if url in seen_urls: + continue + seen_urls.add(url) + unique_links.append(item) + + artifacts_snapshot_path = archive_root / "artifacts.json" + result_snapshot_path = archive_root / "result.json" + links_snapshot_path = archive_root / "external-links.json" + job_snapshot_path = archive_root / "job.json" + manifest_path = archive_root / "archive-manifest.json" + write_json_snapshot( + job_snapshot_path, + { + "id": row["id"], + "user_id": row.get("user_id", ""), + "project_id": row.get("project_id", ""), + "source_type": row.get("source_type", ""), + "line_type": row.get("line_type", ""), + "workflow_key": row.get("workflow_key", ""), + "title": row.get("title", ""), + "status": row.get("status", ""), + "updated_at": row.get("updated_at", ""), + }, + ) + write_json_snapshot(artifacts_snapshot_path, artifacts) + write_json_snapshot(result_snapshot_path, result) + write_json_snapshot(links_snapshot_path, unique_links) + + generated_at = utc_now() + manifest_payload = { + "job_id": row["id"], + "user_id": row.get("user_id", ""), + "project_id": row.get("project_id", ""), + "status": row.get("status", ""), + "line_type": row.get("line_type", ""), + "workflow_key": row.get("workflow_key", ""), + "title": row.get("title", ""), + "download_root": str(archive_root), + "items": items, + "external_links": unique_links, + "generated_at": generated_at, + } + write_json_snapshot(manifest_path, manifest_payload) + + manifest_relative_path = download_relative_path(manifest_path) + return { + "mode": storage_mode(archive_root), + "download_dir": str(archive_root), + "download_root_relative_path": download_relative_path(archive_root), + "job_snapshot_path": str(job_snapshot_path), + "job_snapshot_url": download_content_url_for_path(job_snapshot_path), + "manifest_path": str(manifest_path), + "manifest_relative_path": manifest_relative_path, + "manifest_url": download_content_url(manifest_relative_path), + "artifacts_snapshot_path": str(artifacts_snapshot_path), + "artifacts_snapshot_url": download_content_url_for_path(artifacts_snapshot_path), + "result_snapshot_path": str(result_snapshot_path), + "result_snapshot_url": download_content_url_for_path(result_snapshot_path), + "external_links_path": str(links_snapshot_path), + "external_links_url": download_content_url_for_path(links_snapshot_path), + "item_count": len(items), + "items": items[:40], + "external_link_count": len(unique_links), + "external_links_preview": unique_links[:40], + "generated_at": generated_at, + } + + +def best_effort_job_download_archive( + row: dict[str, Any], + *, + artifacts: dict[str, Any], + result: dict[str, Any], +) -> dict[str, Any]: + try: + return materialize_job_download_archive(row, artifacts=artifacts, result=result) + except Exception as exc: + return { + "mode": storage_mode(download_job_root(row.get("user_id", ""), row.get("project_id", ""), row["id"])), + "error": str(exc)[:500], + "generated_at": utc_now(), + } + + def storage_mode(path: Path) -> str: text = str(path) if text.startswith("/Users/kris/mnt/fnos-share"): @@ -568,10 +868,29 @@ def recent_job_storage_examples(account_id: str, project_id: str | None, limit: for row in rows: artifacts = parse_json_object(row.get("artifacts_json") or "{}") paths = [] - for key in ("uploaded_path", "source_path", "audio_path", "transcript_path", "project_job_dir"): + for key in ( + "uploaded_path", + "source_path", + "audio_path", + "transcript_path", + "project_job_dir", + "download_bundle_dir", + ): value = str(artifacts.get(key) or "").strip() if value: paths.append({"key": key, "path": value}) + archive_payload = artifacts.get("download_archive") if isinstance(artifacts.get("download_archive"), dict) else {} + for key in ( + "download_dir", + "job_snapshot_path", + "manifest_path", + "artifacts_snapshot_path", + "result_snapshot_path", + "external_links_path", + ): + value = str(archive_payload.get(key) or "").strip() + if value: + paths.append({"key": f"archive.{key}", "path": value}) items.append( { "job_id": row["id"], @@ -584,6 +903,141 @@ def recent_job_storage_examples(account_id: str, project_id: str | None, limit: return items +def encode_storage_artifact_id(kind: str, relative_path: str) -> str: + raw = f"{kind}:{relative_path}" + return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii").rstrip("=") + + +def decode_storage_artifact_id(file_id: str) -> tuple[str, str]: + padding = "=" * (-len(file_id) % 4) + try: + raw = base64.urlsafe_b64decode((file_id + padding).encode("ascii")).decode("utf-8") + except Exception as exc: + raise HTTPException(status_code=400, detail=f"Invalid storage artifact id: {exc}") + kind, _, relative_path = raw.partition(":") + normalized_kind = kind.strip() + normalized_relative = relative_path.strip().lstrip("/") + if normalized_kind not in {"downloads", "jobs"} or not normalized_relative: + raise HTTPException(status_code=400, detail="Invalid storage artifact token") + return normalized_kind, normalized_relative + + +def _storage_base_dir(kind: str) -> Path: + return DOWNLOADS_DIR if kind == "downloads" else JOBS_DIR + + +def _storage_scope_root(kind: str, account_id: str, project_id: str | None) -> Path: + if kind == "downloads": + return tenant_download_root(account_id, project_id) if project_id else download_account_root(account_id) + return job_project_root(account_id, project_id) if project_id else job_account_root(account_id) + + +def build_storage_artifact_payload(path: Path, *, base_dir: Path, kind: str) -> dict[str, Any]: + stat = path.stat() + relative_path = str(path.relative_to(base_dir)) + return { + "id": encode_storage_artifact_id(kind, relative_path), + "kind": kind, + "name": path.name, + "relative_path": relative_path, + "path": str(path), + "size_bytes": stat.st_size, + "updated_at": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), + "mime_type": mimetypes.guess_type(path.name)[0] or "application/octet-stream", + "content_url": f"/v2/storage/artifacts/{encode_storage_artifact_id(kind, relative_path)}/content", + } + + +def recent_storage_artifacts(kind: str, account_id: str, project_id: str | None, limit: int = 8) -> list[dict[str, Any]]: + scope_root = _storage_scope_root(kind, account_id, project_id) + base_dir = _storage_base_dir(kind) + if not scope_root.exists(): + return [] + items: list[dict[str, Any]] = [] + for file_path in scope_root.rglob("*"): + if not file_path.is_file(): + continue + try: + items.append(build_storage_artifact_payload(file_path, base_dir=base_dir, kind=kind)) + except OSError: + continue + items.sort(key=lambda item: str(item.get("updated_at") or ""), reverse=True) + return items[:limit] + + +def resolve_owned_storage_artifact(file_id: str, account_id: str, project_id: str | None) -> dict[str, Any]: + kind, relative_path = decode_storage_artifact_id(file_id) + base_dir = _storage_base_dir(kind) + allowed_root = _storage_scope_root(kind, account_id, project_id).resolve() + target_path = (base_dir / relative_path).resolve() + try: + target_path.relative_to(allowed_root) + except ValueError: + raise HTTPException(status_code=404, detail="Storage artifact not found") + if not target_path.exists() or not target_path.is_file(): + raise HTTPException(status_code=404, detail="Storage artifact not found") + return build_storage_artifact_payload(target_path, base_dir=base_dir, kind=kind) + + +def _write_download_text(path: Path, content: str) -> bool: + if not str(content or "").strip(): + return False + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(str(content), encoding="utf-8") + return True + + +def _write_download_json(path: Path, payload: Any) -> bool: + if payload in (None, "", [], {}): + return False + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + return True + + +def persist_download_bundle( + *, + account_id: str, + project_id: str | None, + job_id: str, + title: str, + source_type: str, + source_url: str, + transcript_text: str, + style_summary: str, + combined_text: str, + content_blueprint: dict[str, Any], +) -> dict[str, Any]: + bundle_root = download_job_root(account_id, project_id, job_id) + bundle_root.mkdir(parents=True, exist_ok=True) + + manifest = { + "job_id": job_id, + "title": title, + "source_type": source_type, + "source_url": source_url, + "generated_at": utc_now(), + } + files: list[dict[str, Any]] = [] + writers = [ + (bundle_root / "manifest.json", lambda: _write_download_json(bundle_root / "manifest.json", manifest)), + (bundle_root / "transcript.txt", lambda: _write_download_text(bundle_root / "transcript.txt", transcript_text)), + (bundle_root / "style_summary.md", lambda: _write_download_text(bundle_root / "style_summary.md", style_summary)), + (bundle_root / "combined.md", lambda: _write_download_text(bundle_root / "combined.md", combined_text)), + (bundle_root / "rewrite.md", lambda: _write_download_text(bundle_root / "rewrite.md", str((content_blueprint.get("rewrite") or {}).get("script") or ""))), + (bundle_root / "analysis.json", lambda: _write_download_json(bundle_root / "analysis.json", content_blueprint.get("analysis") or {})), + (bundle_root / "storyboards.json", lambda: _write_download_json(bundle_root / "storyboards.json", content_blueprint.get("storyboards") or [])), + ] + for path, writer in writers: + if writer(): + files.append(build_storage_artifact_payload(path, base_dir=DOWNLOADS_DIR, kind="downloads")) + + return { + "download_bundle_dir": str(bundle_root), + "download_artifacts": files, + } + + def live_recorder_remote_name(source_id: str) -> str: return f"{LIVE_RECORDER_MANAGED_PREFIX}{source_id.replace('-', '')[:12]}" @@ -1254,6 +1708,7 @@ def job_payload(row: dict[str, Any]) -> dict[str, Any]: "error": row.get("error", ""), "artifacts": artifacts_map, "result": result_map, + "archive": artifacts_map.get("download_archive") if isinstance(artifacts_map.get("download_archive"), dict) else {}, "analysis_model_profile_id": row.get("analysis_model_profile_id", ""), "created_at": row["created_at"], "updated_at": row["updated_at"], @@ -1360,8 +1815,17 @@ def update_job_state( row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,)) if not row: raise HTTPException(status_code=404, detail="Job not found") - merged_artifacts = merge_json_field(row.get("artifacts_json") or "{}", artifacts or {}) - merged_result = merge_json_field(row.get("result_json") or "{}", result or {}) + merged_artifacts_map = parse_json_object(row.get("artifacts_json") or "{}") + merged_artifacts_map.update(artifacts or {}) + merged_result_map = parse_json_object(row.get("result_json") or "{}") + merged_result_map.update(result or {}) + merged_artifacts_map["download_archive"] = best_effort_job_download_archive( + {**row, "status": status}, + artifacts=merged_artifacts_map, + result=merged_result_map, + ) + merged_artifacts = json.dumps(merged_artifacts_map, ensure_ascii=False) + merged_result = json.dumps(merged_result_map, ensure_ascii=False) db.execute( """ UPDATE jobs @@ -1943,6 +2407,21 @@ def create_job_record( ) -> dict[str, Any]: job_id = make_id("job") now = utc_now() + artifacts_map = dict(artifacts or {}) + artifacts_map["download_archive"] = best_effort_job_download_archive( + { + "id": job_id, + "user_id": account_id, + "project_id": project_id, + "source_type": source_type, + "line_type": line_type, + "workflow_key": workflow_key, + "title": title, + "status": "pending", + }, + artifacts=artifacts_map, + result={}, + ) db.execute( """ INSERT INTO jobs ( @@ -1966,7 +2445,7 @@ def create_job_record( source_url or None, title, language, - json.dumps(artifacts or {}, ensure_ascii=False), + json.dumps(artifacts_map, ensure_ascii=False), analysis_model_profile_id, now, now, @@ -2171,6 +2650,18 @@ async def process_job(job_id: str) -> None: "------\n" f"分镜:\n{json.dumps(content_blueprint.get('storyboards') or [], ensure_ascii=False, indent=2)}" ) + download_bundle = persist_download_bundle( + account_id=row.get("user_id", ""), + project_id=row.get("project_id", ""), + job_id=job_id, + title=row["title"], + source_type=row["source_type"], + source_url=row.get("source_url") or "", + transcript_text=transcript_text, + style_summary=style_summary, + combined_text=combined_text, + content_blueprint=content_blueprint, + ) kb_row = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (row["knowledge_base_id"],)) if not kb_row: raise RuntimeError("Knowledge base not found") @@ -2207,6 +2698,7 @@ async def process_job(job_id: str) -> None: artifacts={ "document_id": document_id, "project_job_dir": str(job_dir), + **download_bundle, **artifacts, }, result={ @@ -2428,6 +2920,7 @@ def storage_status( if normalized_project_id: resolve_target_project(account["id"], normalized_project_id, username=account["username"]) jobs_account_root = job_account_root(account["id"]) + downloads_account_root = download_account_root(account["id"]) jobs_project_root = job_project_root(account["id"], normalized_project_id or None) downloads_project_root = tenant_download_root(account["id"], normalized_project_id or None) return { @@ -2446,14 +2939,37 @@ def storage_status( }, "tenant_usage": { "account_jobs": directory_usage_payload(jobs_account_root), + "account_downloads": directory_usage_payload(downloads_account_root), "project_jobs": directory_usage_payload(jobs_project_root), "project_downloads": directory_usage_payload(downloads_project_root), "project_id": normalized_project_id, "recent_jobs": recent_job_storage_examples(account["id"], normalized_project_id or None, limit=6), + "recent_download_artifacts": recent_storage_artifacts("downloads", account["id"], normalized_project_id or None, limit=8), + "recent_job_artifacts": recent_storage_artifacts("jobs", account["id"], normalized_project_id or None, limit=8), }, } +@app.get("/v2/storage/artifacts/{file_id}/content") +def stream_storage_artifact_content( + file_id: str, + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(require_approved), +) -> FileResponse: + normalized_project_id = (project_id or "").strip() + if normalized_project_id: + resolve_target_project(account["id"], normalized_project_id, username=account["username"]) + matched = resolve_owned_storage_artifact(file_id, account["id"], normalized_project_id or None) + filename = matched.get("name") or "artifact.bin" + media_type = matched.get("mime_type") or "application/octet-stream" + return FileResponse( + matched["path"], + media_type=media_type, + filename=filename, + headers={"Content-Disposition": f'inline; filename="{filename}"'}, + ) + + @app.get("/v2/integrations/local-models") def integrations_local_models(account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: _ = account