test: harden remote control stress flow

This commit is contained in:
AI Bot
2026-05-11 23:12:47 +08:00
parent a311280238
commit 9c8ffebb92
7 changed files with 884 additions and 40 deletions

487
scripts/stress-remote-control.mjs Executable file
View File

@@ -0,0 +1,487 @@
#!/usr/bin/env node
import { spawn } from "node:child_process";
import { createServer } from "node:http";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";
import { executeBrowserControlTask } from "../local-agent/browser-control-task-runner.mjs";
import { executeComputerUseTask } from "../local-agent/computer-use-task-runner.mjs";
// Absolute path of the parent of this script's directory; used below as the
// working directory for spawned processes (treated as the repository root).
const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "..");
/**
 * Build the stress-run options from command-line arguments.
 *
 * Unknown arguments are ignored. Numeric flags use the `--name=N` form and
 * keep their current value when N is not a positive integer.
 *
 * @param {string[]} argv - Arguments after the script name.
 * @returns {object} Options with defaults applied; `help` is only present
 *   when `--help` or `-h` was passed.
 */
function parseArgs(argv) {
  const options = {
    chainTasks: 80,
    runtimeTasks: 240,
    runtimeConcurrency: 24,
    pollMs: 5,
    timeoutMs: 45_000,
    skipChain: false,
    skipRuntime: false,
  };

  // Exact-match switches and the option key each one turns on.
  const switches = new Map([
    ["--skip-chain", "skipChain"],
    ["--skip-runtime", "skipRuntime"],
    ["--help", "help"],
    ["-h", "help"],
  ]);

  // `--name=` prefixes and the numeric option key each one assigns.
  const numericFlags = [
    ["--chain-tasks=", "chainTasks"],
    ["--runtime-tasks=", "runtimeTasks"],
    ["--runtime-concurrency=", "runtimeConcurrency"],
    ["--poll-ms=", "pollMs"],
    ["--timeout-ms=", "timeoutMs"],
  ];

  for (const arg of argv) {
    const switchKey = switches.get(arg);
    if (switchKey !== undefined) {
      options[switchKey] = true;
      continue;
    }
    const numeric = numericFlags.find(([prefix]) => arg.startsWith(prefix));
    if (numeric) {
      const key = numeric[1];
      options[key] = positiveInt(arg.split("=")[1], options[key]);
    }
  }
  return options;
}
/**
 * Parse a base-10 integer and accept it only when it is greater than zero.
 *
 * @param {*} value - Raw value; falsy values are treated as an empty string.
 * @param {number} fallback - Returned when parsing fails or the result is not positive.
 * @returns {number} The parsed positive integer, or `fallback`.
 */
function positiveInt(value, fallback) {
  const text = String(value || "");
  const parsed = Number.parseInt(text, 10);
  if (!Number.isFinite(parsed)) {
    return fallback;
  }
  return parsed > 0 ? parsed : fallback;
}
/**
 * Pick the element at fraction `p` of an already-sorted list.
 *
 * The index is `floor(length * p)`, capped at the last element. A missing or
 * falsy element (including an empty list) yields 0.
 *
 * @param {number[]} values - Values sorted ascending by the caller.
 * @param {number} p - Fraction, e.g. 0.5 or 0.95.
 * @returns {number} The selected value, or 0.
 */
function percentile(values, p) {
  const lastIndex = values.length - 1;
  const rank = Math.floor(values.length * p);
  const picked = values[Math.min(lastIndex, rank)];
  return picked || 0;
}
/**
 * Start `server` on a port chosen by the operating system (port 0).
 *
 * @param {object} server - Server exposing `listen`, `address`, `once` and `off`.
 * @param {string} [host="127.0.0.1"] - Interface to bind.
 * @returns {Promise<number>} The bound port; rejects if an "error" event is
 *   emitted before the listening callback fires.
 */
function listen(server, host = "127.0.0.1") {
  return new Promise((resolve, reject) => {
    const onListening = () => {
      // The startup error handler is only relevant until the server is bound.
      server.off("error", reject);
      const { port } = server.address();
      resolve(port);
    };
    server.once("error", reject);
    server.listen(0, host, onListening);
  });
}
/**
 * Promise wrapper around `server.close`.
 *
 * The returned promise never rejects: whatever `close` passes to its callback
 * (nothing, or an error) becomes the resolved value.
 *
 * @param {object} server - Object exposing a callback-style `close`.
 * @returns {Promise<*>} Settles once the close callback has run.
 */
function closeServer(server) {
  return new Promise((done) => {
    server.close(done);
  });
}
/**
 * Collect a request body as UTF-8 text and parse it as JSON.
 *
 * @param {object} request - Readable request stream.
 * @returns {Promise<*>} The parsed body, or `{}` when the body is empty.
 *   Rejects on a stream error or when the text is not valid JSON.
 */
function readJsonBody(request) {
  return new Promise((resolve, reject) => {
    const pieces = [];
    request.setEncoding("utf8");
    request.on("data", (chunk) => {
      pieces.push(chunk);
    });
    request.on("end", () => {
      const text = pieces.join("");
      if (!text) {
        resolve({});
        return;
      }
      try {
        resolve(JSON.parse(text));
      } catch (error) {
        reject(error);
      }
    });
    request.on("error", reject);
  });
}
/**
 * Poll `predicate` every 25 ms until it returns a truthy value.
 *
 * @param {Function} predicate - Sync or async check; its result is awaited.
 * @param {number} timeoutMs - Maximum time to keep polling.
 * @returns {Promise<void>} Resolves once the predicate is satisfied.
 * @throws {Error} "stress timeout after <timeoutMs>ms" when time runs out;
 *   anything the predicate throws propagates unchanged.
 */
async function waitFor(predicate, timeoutMs) {
  const begunAt = Date.now();
  const elapsed = () => Date.now() - begunAt;
  const pause = () =>
    new Promise((wake) => {
      setTimeout(wake, 25);
    });

  while (elapsed() < timeoutMs) {
    const satisfied = await predicate();
    if (satisfied) {
      return;
    }
    await pause();
  }
  throw new Error(`stress timeout after ${timeoutMs}ms`);
}
/**
 * Write the two stand-in runtime scripts used by the chain stress run.
 *
 * Both scripts read a JSON payload from stdin and print one JSON line to
 * stdout. The browser script always reports "completed". The computer script
 * reports "needs_user_action" when the objective contains "dialog", otherwise
 * "completed".
 *
 * @param {string} root - Directory that receives the script files.
 * @returns {Promise<{browserRuntime: string, computerRuntime: string}>}
 *   Paths of the written files.
 */
async function writeChainRuntimeFixtures(root) {
  // The script text is kept flush-left so the emitted files are byte-stable.
  const browserScript = `
let input = "";
process.stdin.setEncoding("utf8");
process.stdin.on("data", chunk => input += chunk);
process.stdin.on("end", () => {
const payload = JSON.parse(input || "{}");
const url = (payload.objective.match(/https?:\\/\\/\\S+/) || [])[0] || "https://example.com";
process.stdout.write(JSON.stringify({
status: "completed",
requestId: payload.requestId,
replyBody: "browser ok " + payload.requestId,
targetUrl: url,
executionSummary: "stress-browser-ok"
}) + "\\n");
});
`;
  const computerScript = `
let input = "";
process.stdin.setEncoding("utf8");
process.stdin.on("data", chunk => input += chunk);
process.stdin.on("end", () => {
const payload = JSON.parse(input || "{}");
if (String(payload.objective || "").includes("dialog")) {
process.stdout.write(JSON.stringify({
status: "needs_user_action",
requestId: payload.requestId,
kind: "dialog_intervention_required",
dialogId: "stress-dialog-" + payload.requestId,
appName: "System Settings",
platform: "darwin",
risk: "high",
summary: "stress dialog requires user action " + payload.requestId,
recommendedAction: "handled_on_device",
availableActions: ["handled_on_device", "cancel_task"]
}) + "\\n");
return;
}
process.stdout.write(JSON.stringify({
status: "completed",
requestId: payload.requestId,
replyBody: "desktop ok " + payload.requestId,
targetApp: "Finder",
executionSummary: "stress-desktop-ok"
}) + "\\n");
});
`;

  const browserRuntime = path.join(root, "browser-runtime.mjs");
  await writeFile(browserRuntime, browserScript, "utf8");

  const computerRuntime = path.join(root, "computer-runtime.mjs");
  await writeFile(computerRuntime, computerScript, "utf8");

  return { browserRuntime, computerRuntime };
}
/**
 * Generate the synthetic task list served by the fake control plane.
 *
 * Tasks are numbered from 1. Every 10th task is a high-risk "dialog" desktop
 * task; of the rest, even numbers are browser tasks and odd numbers are
 * desktop tasks.
 *
 * @param {number} totalTasks - Number of tasks to generate.
 * @returns {object[]} Task records in claim order.
 */
function buildChainTasks(totalTasks) {
  const tasks = [];
  for (let n = 1; n <= totalTasks; n += 1) {
    const isDialog = n % 10 === 0;
    const isBrowser = !isDialog && n % 2 === 0;

    let requestText;
    if (isDialog) {
      requestText = `open system settings dialog ${n}`;
    } else if (isBrowser) {
      requestText = `open https://example.com/stress/${n}`;
    } else {
      requestText = `open Finder action ${n}`;
    }

    tasks.push({
      taskId: `stress-task-${String(n).padStart(3, "0")}`,
      taskType: isBrowser ? "browser_control" : "desktop_control",
      projectId: "master-agent",
      requestText,
      executionPrompt: "",
      requestedByAccount: "krisolo",
      deviceId: "mac-studio",
      dispatchExecutionId: `stress-dispatch-${n}`,
      targetThreadId: `stress-thread-${n}`,
      requestedAt: new Date().toISOString(),
      riskLevel: isDialog ? "high" : "medium",
    });
  }
  return tasks;
}
/**
 * Run the end-to-end "chain" stress scenario.
 *
 * Starts an in-process fake control plane that hands out synthetic tasks,
 * spawns `local-agent/server.mjs` configured to poll it, waits until every
 * task has been reported complete, and returns the summary built by
 * `summarizeChainStress`.
 *
 * Everything created here (temp directory, HTTP server, child process) is
 * torn down in `finally`, including when setup itself fails.
 *
 * @param {object} options - Parsed CLI options; reads `chainTasks`, `pollMs`
 *   and `timeoutMs`.
 * @returns {Promise<object>} Chain stress summary.
 * @throws {Error} When the wait times out, or when the agent process fails to
 *   start or stops before all tasks have completed.
 */
async function runChainStress(options) {
  // Time allowed for already-sent completions to be processed after the agent
  // process is seen to stop, before the run is declared dead.
  const exitGraceMs = 250;
  const runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-remote-control-stress-"));
  let controlPlane = null;
  let child = null;
  try {
    const skillsDir = path.join(runtimeRoot, "skills");
    await mkdir(skillsDir, { recursive: true });
    const { browserRuntime, computerRuntime } = await writeChainRuntimeFixtures(runtimeRoot);
    const tasks = buildChainTasks(options.chainTasks);
    const pending = [...tasks];
    const claimedAt = new Map();
    const completions = [];
    const appLogs = [];
    let claimRequests = 0;
    let heartbeatRequests = 0;
    let skillRequests = 0;

    const sendJson = (response, statusCode, payload) => {
      response.writeHead(statusCode, { "content-type": "application/json" });
      response.end(JSON.stringify(payload));
    };

    controlPlane = createServer(async (request, response) => {
      const url = request.url || "";
      try {
        if (request.method === "POST" && url === "/api/v1/master-agent/tasks/claim") {
          claimRequests += 1;
          const task = pending.shift() || null;
          if (task) claimedAt.set(task.taskId, Date.now());
          sendJson(response, 200, { ok: true, task });
          return;
        }
        const completeMatch = url.match(/^\/api\/v1\/master-agent\/tasks\/([^/]+)\/complete$/);
        if (request.method === "POST" && completeMatch) {
          const body = await readJsonBody(request);
          completions.push({ taskId: completeMatch[1], body, receivedAt: Date.now() });
          sendJson(response, 200, { ok: true });
          return;
        }
        if (request.method === "POST" && url === "/api/device-heartbeat") {
          heartbeatRequests += 1;
          sendJson(response, 200, { ok: true, token: "stress-server-token" });
          return;
        }
        if (request.method === "POST" && url === "/api/v1/devices/mac-studio/skills") {
          skillRequests += 1;
          sendJson(response, 200, { ok: true });
          return;
        }
        if (request.method === "POST" && url === "/api/v1/app-logs") {
          appLogs.push(await readJsonBody(request));
          sendJson(response, 200, { ok: true });
          return;
        }
        sendJson(response, 404, { ok: false, url });
      } catch (error) {
        sendJson(response, 500, { ok: false, error: error.message });
      }
    });
    const controlPort = await listen(controlPlane);

    // Borrow a free port for the agent by binding a throwaway server and
    // releasing it again before the agent starts.
    const probe = createServer();
    const agentPort = await listen(probe);
    await closeServer(probe);

    const configPath = path.join(runtimeRoot, "local-agent-config.json");
    await writeFile(
      configPath,
      JSON.stringify({
        port: agentPort,
        bindHost: "127.0.0.1",
        controlPlaneUrl: `http://127.0.0.1:${controlPort}`,
        deviceId: "mac-studio",
        token: "stress-local-token",
        name: "Mac Studio Stress",
        account: "krisolo",
        status: "online",
        codexSessionDiscoveryEnabled: false,
        projects: ["master-agent"],
        skillsDir,
        masterAgentEnabled: true,
        masterAgentPollIntervalMs: options.pollMs,
        heartbeatIntervalMs: 60_000,
        skillLifecycleEnabled: false,
        browserControlEnabled: true,
        browserControlCommand: process.execPath,
        browserControlArgs: [browserRuntime],
        browserControlWorkdir: repoRoot,
        browserControlTimeoutMs: 5_000,
        computerUseEnabled: true,
        computerUseCommand: process.execPath,
        computerUseArgs: [computerRuntime],
        computerUseWorkdir: repoRoot,
        computerUseTimeoutMs: 5_000,
      }),
    );

    child = spawn(process.execPath, ["local-agent/server.mjs", configPath], {
      cwd: repoRoot,
      env: process.env,
      stdio: ["ignore", "pipe", "pipe"],
    });
    // stdout is piped but not inspected: keep it flowing so the agent can
    // never block on a full pipe buffer while logging.
    child.stdout.resume();
    let stderr = "";
    child.stderr.setEncoding("utf8");
    child.stderr.on("data", (chunk) => {
      stderr += chunk;
    });

    // Record the first sign that the agent is gone. Without an "error"
    // listener a failed spawn would surface as an uncaught exception and skip
    // the cleanup below.
    let childStopped = null;
    child.on("error", (error) => {
      childStopped ??= { detail: `process error: ${error.message}`, cause: error, at: Date.now() };
    });
    child.once("exit", (code, signal) => {
      childStopped ??= { detail: `exit code ${code}, signal ${signal}`, at: Date.now() };
    });

    const started = Date.now();
    await waitFor(() => {
      // ">=" rather than "===": duplicate completions could push the count
      // past the target between two polls, which would otherwise never match.
      if (completions.length >= options.chainTasks) return true;
      if (childStopped && Date.now() - childStopped.at >= exitGraceMs) {
        throw new Error(
          `local-agent stopped before all tasks completed (${childStopped.detail}); ` +
            `stderr tail: ${stderr.trim().slice(-500)}`,
          { cause: childStopped.cause },
        );
      }
      return false;
    }, options.timeoutMs);
    const durationMs = Date.now() - started;
    return summarizeChainStress({
      totalTasks: options.chainTasks,
      durationMs,
      completions,
      tasks,
      claimedAt,
      claimRequests,
      heartbeatRequests,
      skillRequests,
      appLogs,
      stderr,
    });
  } finally {
    child?.kill("SIGTERM");
    if (controlPlane) {
      const closed = closeServer(controlPlane);
      // Drop lingering keep-alive sockets so close() cannot hang if the agent
      // is slow to exit (method exists on newer Node versions only).
      controlPlane.closeAllConnections?.();
      await closed;
    }
    // Best-effort cleanup: a leftover temp directory must not mask the result.
    await rm(runtimeRoot, { recursive: true, force: true }).catch(() => null);
  }
}
/**
 * Reduce the raw observations of a chain stress run to a report object.
 *
 * Completions are grouped by `body.status`; tasks with no completion are
 * counted as missing; repeated task ids count as duplicates. Latency is the
 * time from claim to completion receipt, and is 0 for a completion whose
 * claim time is unknown.
 *
 * @param {object} input - Run observations: `totalTasks`, `durationMs`,
 *   `completions`, `tasks`, `claimedAt`, request counters, `appLogs`, `stderr`.
 * @returns {object} Summary with `name: "chain"`.
 */
function summarizeChainStress(input) {
  const { completions, tasks, claimedAt } = input;

  const withStatus = (status) => completions.filter((item) => item.body.status === status);
  const completed = withStatus("completed");
  const waiting = withStatus("needs_user_action");
  const failed = withStatus("failed");

  const reportedIds = new Set(completions.map((item) => item.taskId));
  const missing = tasks.filter((task) => !reportedIds.has(task.taskId));
  const duplicateCount = completions.length - reportedIds.size;

  const latencies = [];
  for (const item of completions) {
    const claimed = claimedAt.get(item.taskId) || item.receivedAt;
    latencies.push(item.receivedAt - claimed);
  }
  latencies.sort((a, b) => a - b);

  // A waiting completion is well-formed only with the expected kind and an
  // array of available actions.
  const invalidDialog = waiting.filter((item) => {
    const wellFormed =
      item.body.kind === "dialog_intervention_required" &&
      Array.isArray(item.body.availableActions);
    return !wellFormed;
  });
  const invalidCompleted = completed.filter((item) => !item.body.replyBody);

  const durationSeconds = input.durationMs / 1000;
  const latencyMs = {
    min: latencies[0] || 0,
    p50: percentile(latencies, 0.5),
    p95: percentile(latencies, 0.95),
    max: latencies.at(-1) || 0,
  };

  return {
    name: "chain",
    totalTasks: input.totalTasks,
    durationMs: input.durationMs,
    throughputPerSec: Number((input.totalTasks / durationSeconds).toFixed(2)),
    completed: completed.length,
    waitingUserAction: waiting.length,
    failed: failed.length,
    missing: missing.length,
    duplicateCount,
    claimRequests: input.claimRequests,
    heartbeatRequests: input.heartbeatRequests,
    skillRequests: input.skillRequests,
    appLogs: input.appLogs.length,
    latencyMs,
    invalidDialog: invalidDialog.length,
    invalidCompleted: invalidCompleted.length,
    stderrTail: input.stderr.trim().slice(-500),
  };
}
/**
 * Stress the browser and computer-use task runners directly, without the
 * local agent in between.
 *
 * Every third task goes through `executeComputerUseTask`; the rest go through
 * `executeBrowserControlTask`. At most `runtimeConcurrency` tasks run at once.
 *
 * @param {object} options - Parsed CLI options; reads `runtimeTasks` and
 *   `runtimeConcurrency` (expected to be positive integers, as `parseArgs`
 *   produces). A task count of 0 returns an empty summary instead of hanging.
 * @returns {Promise<object>} Runtime stress summary with `name: "runtime"`.
 */
async function runRuntimeStress(options) {
  const total = options.runtimeTasks;
  const concurrency = Math.min(options.runtimeConcurrency, total);
  const results = [];
  const started = Date.now();
  let next = 0;

  async function runOne(index) {
    const n = index + 1;
    if (n % 3 === 0) {
      return executeComputerUseTask(
        {
          taskId: `runtime-desktop-${n}`,
          taskType: "desktop_control",
          requestText: `open Finder ${n}`,
          projectId: "master-agent",
          targetThreadId: `thread-${n}`,
          requestedByAccount: "krisolo",
        },
        {
          computerUseEnabled: true,
          computerUseCommand: process.execPath,
          computerUseArgs: ["tests/fixtures/computer-use-runtime.mjs"],
          computerUseWorkdir: repoRoot,
          computerUseTimeoutMs: 5000,
        },
      );
    }
    return executeBrowserControlTask(
      {
        taskId: `runtime-browser-${n}`,
        taskType: "browser_control",
        requestText: `open https://example.com/runtime/${n}`,
        projectId: "master-agent",
        targetThreadId: `thread-${n}`,
        requestedByAccount: "krisolo",
      },
      {
        browserControlEnabled: true,
        browserControlCommand: process.execPath,
        browserControlArgs: ["tests/fixtures/browser-control-runtime.mjs"],
        browserControlWorkdir: repoRoot,
        browserControlTimeoutMs: 5000,
      },
    );
  }

  // Each worker keeps pulling the next unclaimed index until none are left, so
  // exactly `concurrency` tasks are in flight while work remains. Claiming an
  // index happens synchronously, so two workers can never take the same one.
  async function worker() {
    while (next < total) {
      const index = next;
      next += 1;
      const taskStarted = Date.now();
      try {
        const result = await runOne(index);
        results.push({ ok: true, result, latencyMs: Date.now() - taskStarted });
      } catch (error) {
        // Keep a readable reason even when something other than an Error is thrown.
        results.push({
          ok: false,
          error: error?.message ?? String(error),
          latencyMs: Date.now() - taskStarted,
        });
      }
    }
  }
  await Promise.all(Array.from({ length: concurrency }, () => worker()));

  const durationMs = Date.now() - started;
  const failed = results.filter((item) => !item.ok || item.result?.status === "failed");
  const completed = results.filter((item) => item.ok && item.result?.status === "completed");
  const latencies = results.map((item) => item.latencyMs).sort((a, b) => a - b);
  return {
    name: "runtime",
    total,
    concurrency,
    durationMs,
    // A zero duration would give NaN/Infinity, which JSON serialises as null.
    throughputPerSec: durationMs > 0 ? Number((total / (durationMs / 1000)).toFixed(2)) : 0,
    completed: completed.length,
    failed: failed.length,
    latencyMs: {
      min: latencies[0] || 0,
      p50: percentile(latencies, 0.5),
      p95: percentile(latencies, 0.95),
      max: latencies.at(-1) || 0,
    },
    firstFailure: failed[0] || null,
  };
}
/**
 * Decide whether a stress summary should fail the run.
 *
 * A "chain" summary fails when any of its problem counters is above zero;
 * any other summary fails only on `failed`.
 *
 * @param {object} summary - Summary produced by one of the stress stages.
 * @returns {boolean} True when the summary reports a problem.
 */
function hasFailure(summary) {
  if (summary.name !== "chain") {
    return summary.failed > 0;
  }
  const problemCounters = [
    "failed",
    "missing",
    "duplicateCount",
    "invalidDialog",
    "invalidCompleted",
  ];
  return problemCounters.some((key) => summary[key] > 0);
}
/**
 * Print the command-line usage text to stdout with a single `console.log` call.
 */
function printHelp() {
  const lines = [
    "Usage: node scripts/stress-remote-control.mjs [options]",
    "Options:",
    "--chain-tasks=N local-agent chain tasks, default 80",
    "--runtime-tasks=N direct runtime tasks, default 240",
    "--runtime-concurrency=N direct runtime concurrency, default 24",
    "--poll-ms=N local-agent task poll interval, default 5",
    "--timeout-ms=N chain stress timeout, default 45000",
    "--skip-chain skip local-agent chain stress",
    "--skip-runtime skip direct runtime stress",
    // The trailing empty entry keeps the final newline of the usage text.
    "",
  ];
  console.log(lines.join("\n"));
}
// Script entry point: parse the CLI, run the requested stress stages in order,
// print one JSON report, and set a non-zero exit code when any stage failed.
const options = parseArgs(process.argv.slice(2));
if (options.help) {
  printHelp();
  process.exit(0);
}

const summaries = [];
const stages = [
  [options.skipChain, runChainStress],
  [options.skipRuntime, runRuntimeStress],
];
for (const [skipped, runStage] of stages) {
  if (!skipped) {
    summaries.push(await runStage(options));
  }
}

const anyFailed = summaries.some((summary) => hasFailure(summary));
console.log(JSON.stringify({ ok: !anyFailed, summaries }, null, 2));
if (anyFailed) {
  process.exitCode = 1;
}