feat: secure nas storage artifacts and archive links

This commit is contained in:
kris
2026-03-23 11:53:21 +08:00
parent a8d503159f
commit d427b89409

View File

@@ -19,8 +19,7 @@ from urllib.parse import quote, urljoin, urlparse
from fastapi import Body, Depends, FastAPI, File, Form, Header, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, Field
from .database import Database, utc_now
@@ -98,9 +97,6 @@ app.add_middleware(
allow_methods=["*"],
allow_headers=["*"],
)
app.mount("/downloads", StaticFiles(directory=str(DOWNLOADS_DIR)), name="downloads")
class RegisterAccountRequest(BaseModel):
username: str
password: str
@@ -490,6 +486,310 @@ def tenant_download_root(account_id: str, project_id: str | None) -> Path:
return DOWNLOADS_DIR / storage_token(account_id, "anonymous") / storage_token(project_id, "default-project")
def download_account_root(account_id: str) -> Path:
return DOWNLOADS_DIR / storage_token(account_id, "anonymous")
def download_job_root(account_id: str, project_id: str | None, job_id: str) -> Path:
return tenant_download_root(account_id, project_id) / storage_token(job_id, "job")
def download_relative_path(path: Path) -> str:
try:
return path.resolve().relative_to(DOWNLOADS_DIR.resolve()).as_posix()
except Exception:
return ""
def download_content_url(relative_path: str) -> str:
normalized = relative_path.strip().lstrip("/")
if not normalized:
return ""
return f"/v2/storage/artifacts/{encode_storage_artifact_id('downloads', normalized)}/content"
def download_content_url_for_path(path: Path) -> str:
return download_content_url(download_relative_path(path))
def write_json_snapshot(target_path: Path, payload: Any) -> None:
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def copy_file_if_needed(source_path: Path, target_path: Path) -> bool:
source_resolved = source_path.resolve()
try:
if target_path.exists():
target_resolved = target_path.resolve()
if target_resolved == source_resolved:
return False
source_stat = source_path.stat()
target_stat = target_path.stat()
if (
source_stat.st_size == target_stat.st_size
and int(source_stat.st_mtime_ns) == int(target_stat.st_mtime_ns)
):
return False
except OSError:
pass
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source_path, target_path)
return True
def make_archive_item(label: str, source_path: Path, archived_path: Path, *, copied: bool) -> dict[str, Any]:
try:
size_bytes = archived_path.stat().st_size
except OSError:
size_bytes = 0
relative_path = download_relative_path(archived_path)
return {
"label": label,
"source_path": str(source_path),
"archived_path": str(archived_path),
"relative_path": relative_path,
"content_url": download_content_url(relative_path),
"size_bytes": size_bytes,
"copied": copied,
}
def sync_directory_to_archive(
source_dir: Path,
target_dir: Path,
*,
label_prefix: str,
seen_targets: set[str],
) -> list[dict[str, Any]]:
items: list[dict[str, Any]] = []
if not source_dir.exists() or not source_dir.is_dir():
return items
for source_path in sorted(source_dir.rglob("*")):
if not source_path.is_file():
continue
relative_name = source_path.relative_to(source_dir)
target_path = target_dir / relative_name
target_key = str(target_path.resolve())
if target_key in seen_targets:
continue
copied = copy_file_if_needed(source_path, target_path)
seen_targets.add(target_key)
items.append(make_archive_item(f"{label_prefix}/{relative_name.as_posix()}", source_path, target_path, copied=copied))
return items
def looks_like_external_url(value: str) -> bool:
text = str(value or "").strip().lower()
return text.startswith("http://") or text.startswith("https://")
def existing_local_path(value: Any) -> Path | None:
text = str(value or "").strip()
if not text or looks_like_external_url(text):
return None
if "/" not in text and "\\" not in text:
return None
candidate = Path(os.path.expandvars(os.path.expanduser(text)))
if not candidate.is_absolute():
candidate = (BASE_DIR / candidate).resolve()
else:
candidate = candidate.resolve()
return candidate if candidate.exists() else None
def iter_payload_paths(value: Any, prefix: str = "") -> list[dict[str, Any]]:
items: list[dict[str, Any]] = []
if isinstance(value, dict):
for key, child in value.items():
if key == "download_archive":
continue
child_prefix = f"{prefix}.{key}" if prefix else str(key)
items.extend(iter_payload_paths(child, child_prefix))
elif isinstance(value, list):
for index, child in enumerate(value):
child_prefix = f"{prefix}[{index}]" if prefix else f"[{index}]"
items.extend(iter_payload_paths(child, child_prefix))
else:
candidate = existing_local_path(value)
if candidate is not None:
items.append({"label": prefix or "value", "path": candidate})
return items
def iter_external_links(value: Any, prefix: str = "") -> list[dict[str, str]]:
items: list[dict[str, str]] = []
if isinstance(value, dict):
for key, child in value.items():
if key == "download_archive":
continue
child_prefix = f"{prefix}.{key}" if prefix else str(key)
items.extend(iter_external_links(child, child_prefix))
elif isinstance(value, list):
for index, child in enumerate(value):
child_prefix = f"{prefix}[{index}]" if prefix else f"[{index}]"
items.extend(iter_external_links(child, child_prefix))
else:
text = str(value or "").strip()
if looks_like_external_url(text):
items.append({"label": prefix or "value", "url": text})
return items
def archive_target_for_label(files_root: Path, label: str, source_path: Path) -> Path:
suffix = source_path.suffix or ""
stem = storage_token(label, source_path.stem or "file")
return files_root / "linked" / f"{stem}{suffix}"
def materialize_job_download_archive(
row: dict[str, Any],
*,
artifacts: dict[str, Any],
result: dict[str, Any],
) -> dict[str, Any]:
archive_root = download_job_root(row.get("user_id", ""), row.get("project_id", ""), row["id"])
files_root = archive_root / "files"
files_root.mkdir(parents=True, exist_ok=True)
seen_targets: set[str] = set()
items: list[dict[str, Any]] = []
job_dir = job_storage_dir(
account_id=row.get("user_id", ""),
project_id=row.get("project_id", ""),
job_id=row["id"],
)
if job_dir.exists() and job_dir.is_dir():
items.extend(sync_directory_to_archive(job_dir, files_root / "job", label_prefix="job", seen_targets=seen_targets))
job_dir_resolved = job_dir.resolve() if job_dir.exists() else None
downloads_root_resolved = DOWNLOADS_DIR.resolve()
for source in iter_payload_paths({"artifacts": artifacts, "result": result}):
source_path = source["path"]
label = source["label"]
try:
source_resolved = source_path.resolve()
except OSError:
continue
if source_path.is_file():
if str(source_resolved).startswith(str(downloads_root_resolved)):
target_path = source_path
target_key = str(target_path.resolve())
if target_key in seen_targets:
continue
seen_targets.add(target_key)
items.append(make_archive_item(label, source_path, target_path, copied=False))
continue
if job_dir_resolved is not None and source_resolved.is_relative_to(job_dir_resolved):
target_path = files_root / "job" / source_resolved.relative_to(job_dir_resolved)
target_key = str(target_path.resolve())
if target_key in seen_targets:
continue
copied = copy_file_if_needed(source_path, target_path)
seen_targets.add(target_key)
items.append(make_archive_item(label, source_path, target_path, copied=copied))
continue
target_path = archive_target_for_label(files_root, label, source_path)
target_key = str(target_path.resolve())
if target_key in seen_targets:
continue
copied = copy_file_if_needed(source_path, target_path)
seen_targets.add(target_key)
items.append(make_archive_item(label, source_path, target_path, copied=copied))
elif source_path.is_dir():
if job_dir_resolved is not None and source_resolved.is_relative_to(job_dir_resolved):
continue
target_dir = files_root / "dirs" / storage_token(label, source_path.name or "dir")
items.extend(sync_directory_to_archive(source_path, target_dir, label_prefix=label, seen_targets=seen_targets))
unique_links: list[dict[str, str]] = []
seen_urls: set[str] = set()
for item in iter_external_links({"artifacts": artifacts, "result": result}):
url = item["url"]
if url in seen_urls:
continue
seen_urls.add(url)
unique_links.append(item)
artifacts_snapshot_path = archive_root / "artifacts.json"
result_snapshot_path = archive_root / "result.json"
links_snapshot_path = archive_root / "external-links.json"
job_snapshot_path = archive_root / "job.json"
manifest_path = archive_root / "archive-manifest.json"
write_json_snapshot(
job_snapshot_path,
{
"id": row["id"],
"user_id": row.get("user_id", ""),
"project_id": row.get("project_id", ""),
"source_type": row.get("source_type", ""),
"line_type": row.get("line_type", ""),
"workflow_key": row.get("workflow_key", ""),
"title": row.get("title", ""),
"status": row.get("status", ""),
"updated_at": row.get("updated_at", ""),
},
)
write_json_snapshot(artifacts_snapshot_path, artifacts)
write_json_snapshot(result_snapshot_path, result)
write_json_snapshot(links_snapshot_path, unique_links)
generated_at = utc_now()
manifest_payload = {
"job_id": row["id"],
"user_id": row.get("user_id", ""),
"project_id": row.get("project_id", ""),
"status": row.get("status", ""),
"line_type": row.get("line_type", ""),
"workflow_key": row.get("workflow_key", ""),
"title": row.get("title", ""),
"download_root": str(archive_root),
"items": items,
"external_links": unique_links,
"generated_at": generated_at,
}
write_json_snapshot(manifest_path, manifest_payload)
manifest_relative_path = download_relative_path(manifest_path)
return {
"mode": storage_mode(archive_root),
"download_dir": str(archive_root),
"download_root_relative_path": download_relative_path(archive_root),
"job_snapshot_path": str(job_snapshot_path),
"job_snapshot_url": download_content_url_for_path(job_snapshot_path),
"manifest_path": str(manifest_path),
"manifest_relative_path": manifest_relative_path,
"manifest_url": download_content_url(manifest_relative_path),
"artifacts_snapshot_path": str(artifacts_snapshot_path),
"artifacts_snapshot_url": download_content_url_for_path(artifacts_snapshot_path),
"result_snapshot_path": str(result_snapshot_path),
"result_snapshot_url": download_content_url_for_path(result_snapshot_path),
"external_links_path": str(links_snapshot_path),
"external_links_url": download_content_url_for_path(links_snapshot_path),
"item_count": len(items),
"items": items[:40],
"external_link_count": len(unique_links),
"external_links_preview": unique_links[:40],
"generated_at": generated_at,
}
def best_effort_job_download_archive(
row: dict[str, Any],
*,
artifacts: dict[str, Any],
result: dict[str, Any],
) -> dict[str, Any]:
try:
return materialize_job_download_archive(row, artifacts=artifacts, result=result)
except Exception as exc:
return {
"mode": storage_mode(download_job_root(row.get("user_id", ""), row.get("project_id", ""), row["id"])),
"error": str(exc)[:500],
"generated_at": utc_now(),
}
def storage_mode(path: Path) -> str:
text = str(path)
if text.startswith("/Users/kris/mnt/fnos-share"):
@@ -568,10 +868,29 @@ def recent_job_storage_examples(account_id: str, project_id: str | None, limit:
for row in rows:
artifacts = parse_json_object(row.get("artifacts_json") or "{}")
paths = []
for key in ("uploaded_path", "source_path", "audio_path", "transcript_path", "project_job_dir"):
for key in (
"uploaded_path",
"source_path",
"audio_path",
"transcript_path",
"project_job_dir",
"download_bundle_dir",
):
value = str(artifacts.get(key) or "").strip()
if value:
paths.append({"key": key, "path": value})
archive_payload = artifacts.get("download_archive") if isinstance(artifacts.get("download_archive"), dict) else {}
for key in (
"download_dir",
"job_snapshot_path",
"manifest_path",
"artifacts_snapshot_path",
"result_snapshot_path",
"external_links_path",
):
value = str(archive_payload.get(key) or "").strip()
if value:
paths.append({"key": f"archive.{key}", "path": value})
items.append(
{
"job_id": row["id"],
@@ -584,6 +903,141 @@ def recent_job_storage_examples(account_id: str, project_id: str | None, limit:
return items
def encode_storage_artifact_id(kind: str, relative_path: str) -> str:
raw = f"{kind}:{relative_path}"
return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii").rstrip("=")
def decode_storage_artifact_id(file_id: str) -> tuple[str, str]:
padding = "=" * (-len(file_id) % 4)
try:
raw = base64.urlsafe_b64decode((file_id + padding).encode("ascii")).decode("utf-8")
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Invalid storage artifact id: {exc}")
kind, _, relative_path = raw.partition(":")
normalized_kind = kind.strip()
normalized_relative = relative_path.strip().lstrip("/")
if normalized_kind not in {"downloads", "jobs"} or not normalized_relative:
raise HTTPException(status_code=400, detail="Invalid storage artifact token")
return normalized_kind, normalized_relative
def _storage_base_dir(kind: str) -> Path:
return DOWNLOADS_DIR if kind == "downloads" else JOBS_DIR
def _storage_scope_root(kind: str, account_id: str, project_id: str | None) -> Path:
if kind == "downloads":
return tenant_download_root(account_id, project_id) if project_id else download_account_root(account_id)
return job_project_root(account_id, project_id) if project_id else job_account_root(account_id)
def build_storage_artifact_payload(path: Path, *, base_dir: Path, kind: str) -> dict[str, Any]:
stat = path.stat()
relative_path = str(path.relative_to(base_dir))
return {
"id": encode_storage_artifact_id(kind, relative_path),
"kind": kind,
"name": path.name,
"relative_path": relative_path,
"path": str(path),
"size_bytes": stat.st_size,
"updated_at": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(),
"mime_type": mimetypes.guess_type(path.name)[0] or "application/octet-stream",
"content_url": f"/v2/storage/artifacts/{encode_storage_artifact_id(kind, relative_path)}/content",
}
def recent_storage_artifacts(kind: str, account_id: str, project_id: str | None, limit: int = 8) -> list[dict[str, Any]]:
scope_root = _storage_scope_root(kind, account_id, project_id)
base_dir = _storage_base_dir(kind)
if not scope_root.exists():
return []
items: list[dict[str, Any]] = []
for file_path in scope_root.rglob("*"):
if not file_path.is_file():
continue
try:
items.append(build_storage_artifact_payload(file_path, base_dir=base_dir, kind=kind))
except OSError:
continue
items.sort(key=lambda item: str(item.get("updated_at") or ""), reverse=True)
return items[:limit]
def resolve_owned_storage_artifact(file_id: str, account_id: str, project_id: str | None) -> dict[str, Any]:
kind, relative_path = decode_storage_artifact_id(file_id)
base_dir = _storage_base_dir(kind)
allowed_root = _storage_scope_root(kind, account_id, project_id).resolve()
target_path = (base_dir / relative_path).resolve()
try:
target_path.relative_to(allowed_root)
except ValueError:
raise HTTPException(status_code=404, detail="Storage artifact not found")
if not target_path.exists() or not target_path.is_file():
raise HTTPException(status_code=404, detail="Storage artifact not found")
return build_storage_artifact_payload(target_path, base_dir=base_dir, kind=kind)
def _write_download_text(path: Path, content: str) -> bool:
if not str(content or "").strip():
return False
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(str(content), encoding="utf-8")
return True
def _write_download_json(path: Path, payload: Any) -> bool:
if payload in (None, "", [], {}):
return False
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
return True
def persist_download_bundle(
*,
account_id: str,
project_id: str | None,
job_id: str,
title: str,
source_type: str,
source_url: str,
transcript_text: str,
style_summary: str,
combined_text: str,
content_blueprint: dict[str, Any],
) -> dict[str, Any]:
bundle_root = download_job_root(account_id, project_id, job_id)
bundle_root.mkdir(parents=True, exist_ok=True)
manifest = {
"job_id": job_id,
"title": title,
"source_type": source_type,
"source_url": source_url,
"generated_at": utc_now(),
}
files: list[dict[str, Any]] = []
writers = [
(bundle_root / "manifest.json", lambda: _write_download_json(bundle_root / "manifest.json", manifest)),
(bundle_root / "transcript.txt", lambda: _write_download_text(bundle_root / "transcript.txt", transcript_text)),
(bundle_root / "style_summary.md", lambda: _write_download_text(bundle_root / "style_summary.md", style_summary)),
(bundle_root / "combined.md", lambda: _write_download_text(bundle_root / "combined.md", combined_text)),
(bundle_root / "rewrite.md", lambda: _write_download_text(bundle_root / "rewrite.md", str((content_blueprint.get("rewrite") or {}).get("script") or ""))),
(bundle_root / "analysis.json", lambda: _write_download_json(bundle_root / "analysis.json", content_blueprint.get("analysis") or {})),
(bundle_root / "storyboards.json", lambda: _write_download_json(bundle_root / "storyboards.json", content_blueprint.get("storyboards") or [])),
]
for path, writer in writers:
if writer():
files.append(build_storage_artifact_payload(path, base_dir=DOWNLOADS_DIR, kind="downloads"))
return {
"download_bundle_dir": str(bundle_root),
"download_artifacts": files,
}
def live_recorder_remote_name(source_id: str) -> str:
return f"{LIVE_RECORDER_MANAGED_PREFIX}{source_id.replace('-', '')[:12]}"
@@ -1254,6 +1708,7 @@ def job_payload(row: dict[str, Any]) -> dict[str, Any]:
"error": row.get("error", ""),
"artifacts": artifacts_map,
"result": result_map,
"archive": artifacts_map.get("download_archive") if isinstance(artifacts_map.get("download_archive"), dict) else {},
"analysis_model_profile_id": row.get("analysis_model_profile_id", ""),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
@@ -1360,8 +1815,17 @@ def update_job_state(
row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_id,))
if not row:
raise HTTPException(status_code=404, detail="Job not found")
merged_artifacts = merge_json_field(row.get("artifacts_json") or "{}", artifacts or {})
merged_result = merge_json_field(row.get("result_json") or "{}", result or {})
merged_artifacts_map = parse_json_object(row.get("artifacts_json") or "{}")
merged_artifacts_map.update(artifacts or {})
merged_result_map = parse_json_object(row.get("result_json") or "{}")
merged_result_map.update(result or {})
merged_artifacts_map["download_archive"] = best_effort_job_download_archive(
{**row, "status": status},
artifacts=merged_artifacts_map,
result=merged_result_map,
)
merged_artifacts = json.dumps(merged_artifacts_map, ensure_ascii=False)
merged_result = json.dumps(merged_result_map, ensure_ascii=False)
db.execute(
"""
UPDATE jobs
@@ -1943,6 +2407,21 @@ def create_job_record(
) -> dict[str, Any]:
job_id = make_id("job")
now = utc_now()
artifacts_map = dict(artifacts or {})
artifacts_map["download_archive"] = best_effort_job_download_archive(
{
"id": job_id,
"user_id": account_id,
"project_id": project_id,
"source_type": source_type,
"line_type": line_type,
"workflow_key": workflow_key,
"title": title,
"status": "pending",
},
artifacts=artifacts_map,
result={},
)
db.execute(
"""
INSERT INTO jobs (
@@ -1966,7 +2445,7 @@ def create_job_record(
source_url or None,
title,
language,
json.dumps(artifacts or {}, ensure_ascii=False),
json.dumps(artifacts_map, ensure_ascii=False),
analysis_model_profile_id,
now,
now,
@@ -2171,6 +2650,18 @@ async def process_job(job_id: str) -> None:
"------\n"
f"分镜:\n{json.dumps(content_blueprint.get('storyboards') or [], ensure_ascii=False, indent=2)}"
)
download_bundle = persist_download_bundle(
account_id=row.get("user_id", ""),
project_id=row.get("project_id", ""),
job_id=job_id,
title=row["title"],
source_type=row["source_type"],
source_url=row.get("source_url") or "",
transcript_text=transcript_text,
style_summary=style_summary,
combined_text=combined_text,
content_blueprint=content_blueprint,
)
kb_row = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ?", (row["knowledge_base_id"],))
if not kb_row:
raise RuntimeError("Knowledge base not found")
@@ -2207,6 +2698,7 @@ async def process_job(job_id: str) -> None:
artifacts={
"document_id": document_id,
"project_job_dir": str(job_dir),
**download_bundle,
**artifacts,
},
result={
@@ -2428,6 +2920,7 @@ def storage_status(
if normalized_project_id:
resolve_target_project(account["id"], normalized_project_id, username=account["username"])
jobs_account_root = job_account_root(account["id"])
downloads_account_root = download_account_root(account["id"])
jobs_project_root = job_project_root(account["id"], normalized_project_id or None)
downloads_project_root = tenant_download_root(account["id"], normalized_project_id or None)
return {
@@ -2446,14 +2939,37 @@ def storage_status(
},
"tenant_usage": {
"account_jobs": directory_usage_payload(jobs_account_root),
"account_downloads": directory_usage_payload(downloads_account_root),
"project_jobs": directory_usage_payload(jobs_project_root),
"project_downloads": directory_usage_payload(downloads_project_root),
"project_id": normalized_project_id,
"recent_jobs": recent_job_storage_examples(account["id"], normalized_project_id or None, limit=6),
"recent_download_artifacts": recent_storage_artifacts("downloads", account["id"], normalized_project_id or None, limit=8),
"recent_job_artifacts": recent_storage_artifacts("jobs", account["id"], normalized_project_id or None, limit=8),
},
}
@app.get("/v2/storage/artifacts/{file_id}/content")
def stream_storage_artifact_content(
file_id: str,
project_id: str | None = Query(default=None),
account: dict[str, Any] = Depends(require_approved),
) -> FileResponse:
normalized_project_id = (project_id or "").strip()
if normalized_project_id:
resolve_target_project(account["id"], normalized_project_id, username=account["username"])
matched = resolve_owned_storage_artifact(file_id, account["id"], normalized_project_id or None)
filename = matched.get("name") or "artifact.bin"
media_type = matched.get("mime_type") or "application/octet-stream"
return FileResponse(
matched["path"],
media_type=media_type,
filename=filename,
headers={"Content-Disposition": f'inline; filename="{filename}"'},
)
@app.get("/v2/integrations/local-models")
def integrations_local_models(account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
_ = account