163 lines
5.7 KiB
Python
163 lines
5.7 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
|
|
def _join_url(base_url: str, path: str) -> str:
|
|
base = base_url.rstrip("/")
|
|
if path.startswith("http://") or path.startswith("https://"):
|
|
return path
|
|
return f"{base}/{path.lstrip('/')}"
|
|
|
|
|
|
def _unwrap_response(payload: Any) -> dict[str, Any]:
|
|
if not isinstance(payload, dict):
|
|
return {"value": payload}
|
|
if payload.get("success") is True and "data" in payload:
|
|
data = payload.get("data")
|
|
if isinstance(data, dict):
|
|
return data
|
|
return {"value": data}
|
|
return payload
|
|
|
|
|
|
class N8NClient:
|
|
def __init__(
|
|
self,
|
|
*,
|
|
base_url: str,
|
|
workflow_paths: dict[str, str],
|
|
shared_secret: str = "",
|
|
timeout: float = 60.0,
|
|
) -> None:
|
|
self.base_url = base_url.rstrip("/")
|
|
self.workflow_paths = workflow_paths
|
|
self.shared_secret = shared_secret.strip()
|
|
self.timeout = timeout
|
|
|
|
@property
|
|
def enabled(self) -> bool:
|
|
return bool(self.base_url)
|
|
|
|
async def trigger(self, workflow_key: str, payload: dict[str, Any]) -> dict[str, Any]:
|
|
workflow_path = self.workflow_paths.get(workflow_key, "").strip()
|
|
if not workflow_path:
|
|
raise ValueError(f"workflow path not configured for {workflow_key}")
|
|
try:
|
|
workflow_path = workflow_path.format(**payload)
|
|
except KeyError:
|
|
pass
|
|
headers: dict[str, str] = {}
|
|
if self.shared_secret:
|
|
headers["X-Orchestrator-Secret"] = self.shared_secret
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.post(
|
|
_join_url(self.base_url, workflow_path),
|
|
json=payload,
|
|
headers=headers,
|
|
)
|
|
response.raise_for_status()
|
|
if not response.content:
|
|
return {"triggered": True}
|
|
return _unwrap_response(response.json())
|
|
|
|
|
|
class CutVideoClient:
|
|
def __init__(self, *, base_url: str, api_key: str = "", timeout: float = 120.0) -> None:
|
|
self.base_url = base_url.rstrip("/")
|
|
self.api_key = api_key.strip()
|
|
self.timeout = timeout
|
|
|
|
@property
|
|
def enabled(self) -> bool:
|
|
return bool(self.base_url)
|
|
|
|
def _headers(self) -> dict[str, str]:
|
|
headers: dict[str, str] = {}
|
|
if self.api_key:
|
|
headers["Authorization"] = f"Bearer {self.api_key}"
|
|
return headers
|
|
|
|
async def submit_job(self, payload: dict[str, Any]) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.post(
|
|
_join_url(self.base_url, "/api/jobs"),
|
|
json=payload,
|
|
headers=self._headers(),
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|
|
|
|
async def get_task(self, task_id: str) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.get(
|
|
_join_url(self.base_url, f"/api/tasks/{task_id}"),
|
|
headers=self._headers(),
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|
|
|
|
async def get_run(self, run_id: str) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.get(
|
|
_join_url(self.base_url, f"/api/runs/{run_id}"),
|
|
headers=self._headers(),
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|
|
|
|
|
|
class HuobaoDramaClient:
|
|
def __init__(self, *, base_url: str, timeout: float = 180.0) -> None:
|
|
self.base_url = base_url.rstrip("/")
|
|
self.timeout = timeout
|
|
|
|
@property
|
|
def enabled(self) -> bool:
|
|
return bool(self.base_url)
|
|
|
|
async def create_drama(self, payload: dict[str, Any]) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.post(
|
|
_join_url(self.base_url, "/api/v1/dramas"),
|
|
json=payload,
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|
|
|
|
async def generate_image(self, payload: dict[str, Any]) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.post(
|
|
_join_url(self.base_url, "/api/v1/images"),
|
|
json=payload,
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|
|
|
|
async def get_image(self, image_id: str) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.get(
|
|
_join_url(self.base_url, f"/api/v1/images/{image_id}"),
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|
|
|
|
async def generate_video(self, payload: dict[str, Any]) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.post(
|
|
_join_url(self.base_url, "/api/v1/videos"),
|
|
json=payload,
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|
|
|
|
async def get_video(self, video_id: str) -> dict[str, Any]:
|
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
|
response = await client.get(
|
|
_join_url(self.base_url, f"/api/v1/videos/{video_id}"),
|
|
)
|
|
response.raise_for_status()
|
|
return _unwrap_response(response.json())
|