Files
storyforge/collector-service/app/integrations.py

218 lines
7.7 KiB
Python

from __future__ import annotations
import mimetypes
from pathlib import Path
from typing import Any
import httpx
def _join_url(base_url: str, path: str) -> str:
base = base_url.rstrip("/")
if path.startswith("http://") or path.startswith("https://"):
return path
return f"{base}/{path.lstrip('/')}"
def _unwrap_response(payload: Any) -> dict[str, Any]:
if not isinstance(payload, dict):
return {"value": payload}
if payload.get("success") is True and "data" in payload:
data = payload.get("data")
if isinstance(data, dict):
return data
return {"value": data}
return payload
class N8NClient:
    """Async client that triggers configured workflows with a JSON POST.

    Workflow keys are mapped to URL paths (or absolute URLs) through
    ``workflow_paths``. Every call opens its own short-lived
    ``httpx.AsyncClient``.
    """

    def __init__(
        self,
        *,
        base_url: str,
        workflow_paths: dict[str, str],
        shared_secret: str = "",
        timeout: float = 60.0,
    ) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the workflow service. Surrounding whitespace
                and trailing slashes are removed; a blank value leaves the
                client disabled.
            workflow_paths: Mapping from workflow key to a path (or absolute
                URL) template. Templates may contain ``{name}`` placeholders
                that are filled from the trigger payload.
            shared_secret: Optional secret sent in the
                ``X-Orchestrator-Secret`` header; surrounding whitespace is
                stripped.
            timeout: Timeout handed to ``httpx.AsyncClient``.
        """
        # Strip whitespace first so a padded value such as "http://host/ "
        # still loses its trailing slash and a whitespace-only value counts
        # as "not configured", matching how shared_secret is sanitized.
        self.base_url = base_url.strip().rstrip("/")
        self.workflow_paths = workflow_paths
        self.shared_secret = shared_secret.strip()
        self.timeout = timeout

    @property
    def enabled(self) -> bool:
        """Return ``True`` when a non-empty base URL was configured."""
        return bool(self.base_url)

    def _resolve_workflow_path(self, workflow_key: str, payload: dict[str, Any]) -> str:
        """Look up the path template for ``workflow_key`` and fill it in.

        Substitution is best-effort: when the template references a key the
        payload lacks, uses positional fields, or is not a valid format
        string (for example a literal ``{``), the template is returned as
        configured instead of raising.

        Raises:
            ValueError: If no non-blank path is configured for the key.
        """
        template = self.workflow_paths.get(workflow_key, "").strip()
        if not template:
            raise ValueError(f"workflow path not configured for {workflow_key}")
        try:
            # format_map avoids **-unpacking, which would reject payloads
            # whose keys are not strings.
            return template.format_map(payload)
        except (KeyError, IndexError, ValueError):
            return template

    async def trigger(self, workflow_key: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` as JSON to the workflow registered under ``workflow_key``.

        Returns:
            ``{"triggered": True}`` when the response body is empty, otherwise
            the decoded JSON body passed through ``_unwrap_response``.

        Raises:
            ValueError: If no path is configured for ``workflow_key``.
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        target_path = self._resolve_workflow_path(workflow_key, payload)
        headers: dict[str, str] = {}
        if self.shared_secret:
            headers["X-Orchestrator-Secret"] = self.shared_secret
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                _join_url(self.base_url, target_path),
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
            if not response.content:
                return {"triggered": True}
            return _unwrap_response(response.json())
class CutVideoClient:
    """Async client for a video-cutting HTTP API (jobs, uploads, tasks, runs).

    Every call opens its own short-lived ``httpx.AsyncClient`` and returns the
    decoded JSON body passed through ``_unwrap_response``.
    """

    def __init__(
        self,
        *,
        base_url: str,
        api_key: str = "",
        timeout: float = 120.0,
        upload_timeout: float = 1800.0,
    ) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the service. Surrounding whitespace and
                trailing slashes are removed; a blank value leaves the client
                disabled.
            api_key: Optional bearer token; surrounding whitespace is stripped.
            timeout: Timeout for regular JSON requests.
            upload_timeout: Timeout used for file uploads.
        """
        # Strip whitespace first so a padded or whitespace-only value is
        # normalized the same way api_key is.
        self.base_url = base_url.strip().rstrip("/")
        self.api_key = api_key.strip()
        self.timeout = timeout
        self.upload_timeout = upload_timeout

    @property
    def enabled(self) -> bool:
        """Return ``True`` when a non-empty base URL was configured."""
        return bool(self.base_url)

    def _headers(self) -> dict[str, str]:
        """Build request headers; adds a bearer ``Authorization`` header when an API key is set."""
        headers: dict[str, str] = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        return headers

    async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return the unwrapped JSON response."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                _join_url(self.base_url, path),
                json=payload,
                headers=self._headers(),
            )
            response.raise_for_status()
            return _unwrap_response(response.json())

    async def _get_json(self, path: str) -> dict[str, Any]:
        """GET ``path`` and return the unwrapped JSON response."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                _join_url(self.base_url, path),
                headers=self._headers(),
            )
            response.raise_for_status()
            return _unwrap_response(response.json())

    async def submit_job(self, payload: dict[str, Any]) -> dict[str, Any]:
        """Submit a job by POSTing ``payload`` to ``/api/jobs``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._post_json("/api/jobs", payload)

    async def upload_source_file(self, source_path: Path, *, folder_name: str = "") -> dict[str, Any]:
        """Upload one file to ``/api/uploads`` as multipart form data.

        Args:
            source_path: File to upload; it is sent under the ``files`` form
                field with a content type guessed from its name
                (``application/octet-stream`` when unknown).
            folder_name: When non-empty, sent as the ``folder_name`` form field.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        content_type = mimetypes.guess_type(source_path.name)[0] or "application/octet-stream"
        headers = self._headers()
        data = {"folder_name": folder_name} if folder_name else {}
        # Uploads get their own, much longer timeout than JSON calls.
        async with httpx.AsyncClient(timeout=self.upload_timeout) as client:
            # NOTE(review): the file is opened with blocking I/O inside a
            # coroutine; confirm this is acceptable for large uploads.
            with source_path.open("rb") as handle:
                response = await client.post(
                    _join_url(self.base_url, "/api/uploads"),
                    data=data,
                    files={"files": (source_path.name, handle, content_type)},
                    headers=headers,
                )
            response.raise_for_status()
            return _unwrap_response(response.json())

    async def get_task(self, task_id: str) -> dict[str, Any]:
        """Fetch ``/api/tasks/{task_id}``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._get_json(f"/api/tasks/{task_id}")

    async def get_run(self, run_id: str) -> dict[str, Any]:
        """Fetch ``/api/runs/{run_id}``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._get_json(f"/api/runs/{run_id}")
class AsrHttpClient:
    """Async client that sends an audio file to an HTTP transcription endpoint."""

    def __init__(
        self,
        *,
        base_url: str,
        transcribe_path: str = "/transcribe",
        field_name: str = "wav",
        timeout: float = 120.0,
    ) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the service. Surrounding whitespace and
                trailing slashes are removed; a blank value leaves the client
                disabled.
            transcribe_path: Path (or absolute URL) of the transcription
                endpoint.
            field_name: Multipart form field that carries the audio file;
                falls back to ``"wav"`` when blank.
            timeout: Timeout handed to ``httpx.AsyncClient``.
        """
        # Strip whitespace first so a padded or whitespace-only value is
        # normalized the same way field_name is.
        self.base_url = base_url.strip().rstrip("/")
        self.transcribe_path = transcribe_path
        self.field_name = field_name.strip() or "wav"
        self.timeout = timeout

    @property
    def enabled(self) -> bool:
        """Return ``True`` when a non-empty base URL was configured."""
        return bool(self.base_url)

    async def transcribe_audio(self, audio_path: Path) -> dict[str, Any]:
        """Upload ``audio_path`` as multipart form data and return the JSON reply.

        The file is sent under ``self.field_name`` with a content type guessed
        from its name (``application/octet-stream`` when unknown). The decoded
        body is passed through ``_unwrap_response``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        content_type = mimetypes.guess_type(audio_path.name)[0] or "application/octet-stream"
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            with audio_path.open("rb") as handle:
                response = await client.post(
                    _join_url(self.base_url, self.transcribe_path),
                    files={self.field_name: (audio_path.name, handle, content_type)},
                )
            response.raise_for_status()
            return _unwrap_response(response.json())
class HuobaoDramaClient:
    """Async client for the ``/api/v1`` drama, image and video endpoints.

    Every call opens its own short-lived ``httpx.AsyncClient`` and returns the
    decoded JSON body passed through ``_unwrap_response``. No authentication
    headers are sent.
    """

    def __init__(self, *, base_url: str, timeout: float = 180.0) -> None:
        """Store the connection settings.

        Args:
            base_url: Root URL of the service. Surrounding whitespace and
                trailing slashes are removed; a blank value leaves the client
                disabled.
            timeout: Timeout handed to ``httpx.AsyncClient``.
        """
        # Strip whitespace first so a padded value still loses its trailing
        # slash and a whitespace-only value counts as "not configured".
        self.base_url = base_url.strip().rstrip("/")
        self.timeout = timeout

    @property
    def enabled(self) -> bool:
        """Return ``True`` when a non-empty base URL was configured."""
        return bool(self.base_url)

    async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return the unwrapped JSON response."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                _join_url(self.base_url, path),
                json=payload,
            )
            response.raise_for_status()
            return _unwrap_response(response.json())

    async def _get_json(self, path: str) -> dict[str, Any]:
        """GET ``path`` and return the unwrapped JSON response."""
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                _join_url(self.base_url, path),
            )
            response.raise_for_status()
            return _unwrap_response(response.json())

    async def create_drama(self, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` to ``/api/v1/dramas``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._post_json("/api/v1/dramas", payload)

    async def generate_image(self, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` to ``/api/v1/images``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._post_json("/api/v1/images", payload)

    async def get_image(self, image_id: str) -> dict[str, Any]:
        """Fetch ``/api/v1/images/{image_id}``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._get_json(f"/api/v1/images/{image_id}")

    async def generate_video(self, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` to ``/api/v1/videos``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._post_json("/api/v1/videos", payload)

    async def get_video(self, video_id: str) -> dict[str, Any]:
        """Fetch ``/api/v1/videos/{video_id}``.

        Raises:
            httpx.HTTPStatusError: If the service answers with an error status.
        """
        return await self._get_json(f"/api/v1/videos/{video_id}")