from __future__ import annotations import mimetypes from pathlib import Path from typing import Any import httpx def _join_url(base_url: str, path: str) -> str: base = base_url.rstrip("/") if path.startswith("http://") or path.startswith("https://"): return path return f"{base}/{path.lstrip('/')}" def _unwrap_response(payload: Any) -> dict[str, Any]: if not isinstance(payload, dict): return {"value": payload} if payload.get("success") is True and "data" in payload: data = payload.get("data") if isinstance(data, dict): return data return {"value": data} return payload class N8NClient: def __init__( self, *, base_url: str, workflow_paths: dict[str, str], shared_secret: str = "", timeout: float = 60.0, ) -> None: self.base_url = base_url.rstrip("/") self.workflow_paths = workflow_paths self.shared_secret = shared_secret.strip() self.timeout = timeout @property def enabled(self) -> bool: return bool(self.base_url) async def trigger(self, workflow_key: str, payload: dict[str, Any]) -> dict[str, Any]: workflow_path = self.workflow_paths.get(workflow_key, "").strip() if not workflow_path: raise ValueError(f"workflow path not configured for {workflow_key}") try: workflow_path = workflow_path.format(**payload) except KeyError: pass headers: dict[str, str] = {} if self.shared_secret: headers["X-Orchestrator-Secret"] = self.shared_secret async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( _join_url(self.base_url, workflow_path), json=payload, headers=headers, ) response.raise_for_status() if not response.content: return {"triggered": True} return _unwrap_response(response.json()) class CutVideoClient: def __init__( self, *, base_url: str, api_key: str = "", timeout: float = 120.0, upload_timeout: float = 1800.0, ) -> None: self.base_url = base_url.rstrip("/") self.api_key = api_key.strip() self.timeout = timeout self.upload_timeout = upload_timeout @property def enabled(self) -> bool: return bool(self.base_url) def _headers(self) -> dict[str, str]: headers: dict[str, str] = {} if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" return headers async def submit_job(self, payload: dict[str, Any]) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( _join_url(self.base_url, "/api/jobs"), json=payload, headers=self._headers(), ) response.raise_for_status() return _unwrap_response(response.json()) async def upload_source_file(self, source_path: Path, *, folder_name: str = "") -> dict[str, Any]: content_type = mimetypes.guess_type(source_path.name)[0] or "application/octet-stream" headers = self._headers() data = {"folder_name": folder_name} if folder_name else {} async with httpx.AsyncClient(timeout=self.upload_timeout) as client: with source_path.open("rb") as handle: response = await client.post( _join_url(self.base_url, "/api/uploads"), data=data, files={"files": (source_path.name, handle, content_type)}, headers=headers, ) response.raise_for_status() return _unwrap_response(response.json()) async def get_task(self, task_id: str) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( _join_url(self.base_url, f"/api/tasks/{task_id}"), headers=self._headers(), ) response.raise_for_status() return _unwrap_response(response.json()) async def get_run(self, run_id: str) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( _join_url(self.base_url, f"/api/runs/{run_id}"), headers=self._headers(), ) response.raise_for_status() return _unwrap_response(response.json()) class AsrHttpClient: def __init__( self, *, base_url: str, transcribe_path: str = "/transcribe", field_name: str = "wav", timeout: float = 120.0, ) -> None: self.base_url = base_url.rstrip("/") self.transcribe_path = transcribe_path self.field_name = field_name.strip() or "wav" self.timeout = timeout @property def enabled(self) -> bool: return bool(self.base_url) async def transcribe_audio(self, audio_path: Path) -> dict[str, Any]: content_type = mimetypes.guess_type(audio_path.name)[0] or "application/octet-stream" async with httpx.AsyncClient(timeout=self.timeout) as client: with audio_path.open("rb") as handle: response = await client.post( _join_url(self.base_url, self.transcribe_path), files={self.field_name: (audio_path.name, handle, content_type)}, ) response.raise_for_status() return _unwrap_response(response.json()) class HuobaoDramaClient: def __init__(self, *, base_url: str, timeout: float = 180.0) -> None: self.base_url = base_url.rstrip("/") self.timeout = timeout @property def enabled(self) -> bool: return bool(self.base_url) async def create_drama(self, payload: dict[str, Any]) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( _join_url(self.base_url, "/api/v1/dramas"), json=payload, ) response.raise_for_status() return _unwrap_response(response.json()) async def generate_image(self, payload: dict[str, Any]) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( _join_url(self.base_url, "/api/v1/images"), json=payload, ) response.raise_for_status() return _unwrap_response(response.json()) async def get_image(self, image_id: str) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( _join_url(self.base_url, f"/api/v1/images/{image_id}"), ) response.raise_for_status() return _unwrap_response(response.json()) async def generate_video(self, payload: dict[str, Any]) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( _join_url(self.base_url, "/api/v1/videos"), json=payload, ) response.raise_for_status() return _unwrap_response(response.json()) async def get_video(self, video_id: str) -> dict[str, Any]: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( _join_url(self.base_url, f"/api/v1/videos/{video_id}"), ) response.raise_for_status() return _unwrap_response(response.json())