import { spawn } from "node:child_process"; import path from "node:path"; function parseBoolean(value) { return String(value || "").trim().toLowerCase() === "true"; } function parseArgs(value) { return String(value || "") .trim() .split(/\s+/) .filter(Boolean); } function parseTimeoutMs(value) { const parsed = Number.parseInt(String(value || ""), 10); return Number.isFinite(parsed) && parsed > 0 ? parsed : 45000; } function resolveCommandArgs(command, args, cwd) { const runtimeName = path.basename(command || "").toLowerCase(); const scriptRuntimes = new Set([ "node", "node.exe", "tsx", "tsx.cmd", "bun", "bun.exe", "deno", "deno.exe", ]); if (!scriptRuntimes.has(runtimeName) || args.length === 0) { return args; } const [first, ...rest] = args; if (!first || first.startsWith("-")) { return args; } const resolvedFirst = path.isAbsolute(first) ? first : path.resolve(cwd || process.cwd(), first); return [resolvedFirst, ...rest]; } function pickConfigValue(config, key, fallback) { if (config && config[key] !== undefined && config[key] !== null && `${config[key]}`.trim() !== "") { return config[key]; } return fallback; } export function getOmxTeamTaskRunnerConfig(env = process.env, config = {}) { const enabled = parseBoolean(pickConfigValue(config, "omxEnabled", env.BOSS_OMX_ENABLED)); const command = String(pickConfigValue(config, "omxCommand", env.BOSS_OMX_COMMAND) || "").trim() || undefined; const args = Array.isArray(config?.omxArgs) ? config.omxArgs.map((item) => String(item)).filter(Boolean) : parseArgs(pickConfigValue(config, "omxArgs", env.BOSS_OMX_ARGS)); const cwd = String(pickConfigValue(config, "omxWorkdir", env.BOSS_OMX_WORKDIR) || "").trim() || undefined; const timeoutMs = parseTimeoutMs(pickConfigValue(config, "omxTimeoutMs", env.BOSS_OMX_TIMEOUT_MS)); return { enabled, command, args, cwd, timeoutMs, }; } export function shouldUseOmxTeamTaskRunner(task) { return task?.taskType === "dispatch_execution" && String(task?.orchestrationBackendId || "").trim() === "omx-team"; } export function buildOmxTeamTaskExecution(config, task) { if (!config?.enabled) { throw new Error("OMX_TEAM_RUNTIME_DISABLED"); } if (!config?.command) { throw new Error("OMX_TEAM_COMMAND_REQUIRED"); } const cwd = config.cwd || process.cwd(); return { command: config.command, args: resolveCommandArgs(config.command, config.args || [], cwd), cwd, timeoutMs: config.timeoutMs || 45000, stdinPayload: { requestKind: "dispatch_execution", requestId: String(task?.taskId || "").trim(), dispatchExecutionId: String(task?.dispatchExecutionId || "").trim(), groupProjectId: String(task?.projectId || "").trim(), targetProjectId: String(task?.targetProjectId || "").trim(), targetThreadId: String(task?.targetThreadId || "").trim(), targetThreadDisplayName: String(task?.targetThreadDisplayName || "").trim() || undefined, objective: String(task?.executionPrompt || task?.requestText || "").trim(), orchestrationBackendId: "omx-team", workersRequested: 1, context: { requestedBy: String(task?.requestedByAccount || task?.requestedBy || "").trim() || undefined, requestedAt: String(task?.requestedAt || "").trim() || undefined, }, }, }; } function parseJsonLine(rawOutput) { const lines = String(rawOutput || "") .trim() .split(/\r?\n/) .map((line) => line.trim()) .filter(Boolean); const candidate = lines.at(-1) || ""; return JSON.parse(candidate); } export function parseOmxTeamTaskResult(rawOutput) { const parsed = parseJsonLine(rawOutput); if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { throw new Error("INVALID_OMX_RUNTIME_PAYLOAD"); } if (parsed.status === "failed") { return { status: "failed", requestId: typeof parsed.requestId === "string" ? parsed.requestId.trim() || undefined : undefined, dispatchExecutionId: typeof parsed.dispatchExecutionId === "string" ? parsed.dispatchExecutionId.trim() || undefined : undefined, errorMessage: typeof parsed.error === "string" && parsed.error.trim() ? parsed.error.trim() : "OMX_EXECUTION_FAILED", }; } const rawThreadReply = typeof parsed.rawThreadReply === "string" && parsed.rawThreadReply.trim() ? parsed.rawThreadReply.trim() : typeof parsed.summary === "string" && parsed.summary.trim() ? parsed.summary.trim() : typeof parsed.replyBody === "string" && parsed.replyBody.trim() ? parsed.replyBody.trim() : ""; if (!rawThreadReply) { throw new Error("INVALID_OMX_RUNTIME_PAYLOAD"); } return { status: "completed", requestId: typeof parsed.requestId === "string" ? parsed.requestId.trim() || undefined : undefined, dispatchExecutionId: typeof parsed.dispatchExecutionId === "string" ? parsed.dispatchExecutionId.trim() || undefined : undefined, rawThreadReply, replyBody: typeof parsed.replyBody === "string" && parsed.replyBody.trim() ? parsed.replyBody.trim() : undefined, }; } export async function executeOmxTeamTask(config, task) { const execution = buildOmxTeamTaskExecution(config, task); return new Promise((resolve, reject) => { const child = spawn(execution.command, execution.args, { cwd: execution.cwd, env: process.env, stdio: ["pipe", "pipe", "pipe"], }); let stdout = ""; let stderr = ""; let timedOut = false; const timer = setTimeout(() => { timedOut = true; child.kill("SIGKILL"); }, execution.timeoutMs); child.stdout.setEncoding("utf8"); child.stderr.setEncoding("utf8"); child.stdout.on("data", (chunk) => { stdout += chunk; }); child.stderr.on("data", (chunk) => { stderr += chunk; }); child.on("error", (error) => { clearTimeout(timer); reject(error); }); child.on("close", (code) => { clearTimeout(timer); if (timedOut) { reject(new Error("OMX_EXECUTION_TIMEOUT")); return; } if (code !== 0) { reject(new Error(stderr.trim() || `omx exit code ${code}`)); return; } try { resolve(parseOmxTeamTaskResult(stdout)); } catch (error) { reject(error); } }); child.stdin.write(JSON.stringify(execution.stdinPayload)); child.stdin.end(); }); }