import test from "node:test"; import assert from "node:assert/strict"; import os from "node:os"; import path from "node:path"; import { mkdtemp, rm } from "node:fs/promises"; import { NextRequest } from "next/server"; let runtimeRoot = ""; let readState: (typeof import("../src/lib/boss-data"))["readState"]; let writeState: (typeof import("../src/lib/boss-data"))["writeState"]; let handleTelegramWebhookRequest: (typeof import("../src/lib/telegram-gateway"))["handleTelegramWebhookRequest"]; let completeTaskRoute: (typeof import("../src/app/api/v1/master-agent/tasks/[taskId]/complete/route"))["POST"]; let baseState: Awaited>; async function waitForCondition(predicate: () => boolean | Promise, timeoutMs = 1000) { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { if (await predicate()) return; await new Promise((resolve) => setTimeout(resolve, 20)); } } async function setup() { if (runtimeRoot) return; runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-telegram-gateway-")); process.env.BOSS_RUNTIME_ROOT = runtimeRoot; process.env.BOSS_STATE_FILE = path.join(runtimeRoot, "boss-state.json"); const [data, telegramGateway, taskCompleteRoute] = await Promise.all([ import("../src/lib/boss-data.ts"), import("../src/lib/telegram-gateway.ts"), import("../src/app/api/v1/master-agent/tasks/[taskId]/complete/route.ts"), ]); readState = data.readState; writeState = data.writeState; handleTelegramWebhookRequest = telegramGateway.handleTelegramWebhookRequest; completeTaskRoute = taskCompleteRoute.POST; baseState = structuredClone(await data.readState()); } test.after(async () => { if (runtimeRoot) { await rm(runtimeRoot, { recursive: true, force: true }); } }); test.beforeEach(async () => { await setup(); await writeState(structuredClone(baseState)); const state = await readState(); state.telegramIntegration = { enabled: true, mode: "webhook", botToken: "bot-token-demo", botUsername: "boss_demo_bot", dmPolicy: "allowlist", allowFrom: ["123456"], groupPolicy: "allowlist", groups: [], requireMentionInGroups: true, defaultProjectId: "master-agent", webhookSecret: "boss-telegram-secret", lastConfiguredAt: "2026-04-19T10:00:00+08:00", lastConfiguredBy: "krisolo", processedUpdateIds: [], }; await writeState(state); }); test("Telegram webhook 会拒绝未通过 allowlist 的私聊消息", async () => { await setup(); const response = await handleTelegramWebhookRequest({ request: new NextRequest("http://127.0.0.1:3000/api/v1/integrations/telegram/webhook", { method: "POST", headers: { "content-type": "application/json", "x-telegram-bot-api-secret-token": "boss-telegram-secret", }, body: JSON.stringify({ update_id: 1001, message: { message_id: 301, date: 1_761_000_000, chat: { id: 987654, type: "private" }, from: { id: 999999, is_bot: false, first_name: "Guest" }, text: "你好", }, }), }), }); assert.equal(response.status, 403); const payload = (await response.json()) as { ok: boolean; message: string }; assert.equal(payload.ok, false); assert.equal(payload.message, "TELEGRAM_SENDER_FORBIDDEN"); }); test("Telegram webhook 对 allowlist 私聊会走主 Agent 快速回复并调用 sendMessage", async () => { await setup(); const originalFetch = globalThis.fetch; const outboundCalls: Array<{ url: string; body: unknown }> = []; globalThis.fetch = (async (input, init) => { const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; if (url === "https://api.telegram.org/botbot-token-demo/sendMessage") { outboundCalls.push({ url, body: JSON.parse(String(init?.body ?? "{}")), }); return new Response(JSON.stringify({ ok: true, result: { message_id: 9001 } }), { status: 200, headers: { "content-type": "application/json" }, }); } throw new Error(`unexpected fetch: ${url}`); }) as typeof fetch; try { const response = await handleTelegramWebhookRequest({ request: new NextRequest("http://127.0.0.1:3000/api/v1/integrations/telegram/webhook", { method: "POST", headers: { "content-type": "application/json", "x-telegram-bot-api-secret-token": "boss-telegram-secret", }, body: JSON.stringify({ update_id: 1002, message: { message_id: 302, date: 1_761_000_001, chat: { id: 123456, type: "private" }, from: { id: 123456, is_bot: false, first_name: "Kris" }, text: "hello", }, }), }), }); assert.equal(response.status, 200); const payload = (await response.json()) as { ok: boolean; delivery: string }; assert.equal(payload.ok, true); assert.equal(payload.delivery, "sent"); assert.equal(outboundCalls.length, 1); assert.equal((outboundCalls[0]?.body as { chat_id: number }).chat_id, 123456); assert.match(String((outboundCalls[0]?.body as { text: string }).text), /主 Agent 可以开始协调/); const state = await readState(); const project = state.projects.find((item) => item.id === "master-agent"); assert.ok(project?.messages.some((message) => message.senderLabel === "Telegram · Kris")); } finally { globalThis.fetch = originalFetch; } }); test("Telegram webhook 对需要排队的消息会记录 externalReplyTarget,任务完成后自动回推 Telegram", async () => { await setup(); const originalFetch = globalThis.fetch; const outboundCalls: Array<{ url: string; body: unknown }> = []; globalThis.fetch = (async (input, init) => { const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; if (url === "https://api.telegram.org/botbot-token-demo/sendMessage") { outboundCalls.push({ url, body: JSON.parse(String(init?.body ?? "{}")), }); return new Response(JSON.stringify({ ok: true, result: { message_id: 9002 } }), { status: 200, headers: { "content-type": "application/json" }, }); } throw new Error(`unexpected fetch: ${url}`); }) as typeof fetch; try { const webhookResponse = await handleTelegramWebhookRequest({ request: new NextRequest("http://127.0.0.1:3000/api/v1/integrations/telegram/webhook", { method: "POST", headers: { "content-type": "application/json", "x-telegram-bot-api-secret-token": "boss-telegram-secret", }, body: JSON.stringify({ update_id: 1003, message: { message_id: 303, date: 1_761_000_002, chat: { id: 123456, type: "private" }, from: { id: 123456, is_bot: false, first_name: "Kris" }, text: "请深入分析当前项目并给出迁移方案", }, }), }), }); assert.equal(webhookResponse.status, 200); const webhookPayload = (await webhookResponse.json()) as { ok: boolean; delivery: string; taskId?: string }; assert.equal(webhookPayload.ok, true); assert.equal(webhookPayload.delivery, "queued"); assert.ok(webhookPayload.taskId); const queuedState = await readState(); const task = queuedState.masterAgentTasks.find((item) => item.taskId === webhookPayload.taskId); assert.ok(task, "expected queued task"); assert.equal(task?.externalReplyTarget?.provider, "telegram"); assert.equal(task?.externalReplyTarget?.chatId, "123456"); const completeResponse = await completeTaskRoute( new NextRequest(`http://127.0.0.1:3000/api/v1/master-agent/tasks/${task?.taskId}/complete`, { method: "POST", headers: { "content-type": "application/json", "x-boss-device-token": "boss-mac-studio-token", }, body: JSON.stringify({ deviceId: "mac-studio", status: "completed", replyBody: "已经整理好迁移方案,稍后我会按模块推进。", }), }), { params: Promise.resolve({ taskId: task!.taskId }) }, ); assert.equal(completeResponse.status, 200); await waitForCondition(() => outboundCalls.length >= 2); assert.equal(outboundCalls.length, 2); assert.match(String((outboundCalls[1]?.body as { text: string }).text), /已经整理好迁移方案/); let completedState = await readState(); let completedTask = completedState.masterAgentTasks.find((item) => item.taskId === task?.taskId); await waitForCondition(async () => { completedState = await readState(); completedTask = completedState.masterAgentTasks.find((item) => item.taskId === task?.taskId); return completedTask?.externalReplyTarget?.deliveredAt?.includes("T") === true; }); assert.equal(completedTask?.externalReplyTarget?.deliveredAt?.includes("T"), true); } finally { globalThis.fetch = originalFetch; } }); test("Telegram 群聊在 requireMentionInGroups 开启时,没有 @Bot 不允许进入主 Agent", async () => { await setup(); const state = await readState(); state.telegramIntegration = { ...state.telegramIntegration!, botUsername: "boss_demo_bot", groupPolicy: "allowlist", groups: ["-100200300"], requireMentionInGroups: true, }; await writeState(state); const response = await handleTelegramWebhookRequest({ request: new NextRequest("http://127.0.0.1:3000/api/v1/integrations/telegram/webhook", { method: "POST", headers: { "content-type": "application/json", "x-telegram-bot-api-secret-token": "boss-telegram-secret", }, body: JSON.stringify({ update_id: 1004, message: { message_id: 304, date: 1_761_000_003, chat: { id: -100200300, type: "supergroup", title: "Boss 协作群" }, from: { id: 123456, is_bot: false, first_name: "Kris" }, text: "请总结一下今天进展", }, }), }), }); assert.equal(response.status, 400); const payload = (await response.json()) as { ok: boolean; message: string }; assert.equal(payload.ok, false); assert.equal(payload.message, "TELEGRAM_GROUP_MENTION_REQUIRED"); }); test("Telegram 群聊命中 @Bot 时会清洗 mention 后再进入主 Agent", async () => { await setup(); const state = await readState(); state.telegramIntegration = { ...state.telegramIntegration!, botUsername: "boss_demo_bot", groupPolicy: "allowlist", groups: ["-100200300"], requireMentionInGroups: true, }; await writeState(state); const originalFetch = globalThis.fetch; const outboundCalls: Array<{ url: string; body: unknown }> = []; globalThis.fetch = (async (input, init) => { const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; if (url === "https://api.telegram.org/botbot-token-demo/sendMessage") { outboundCalls.push({ url, body: JSON.parse(String(init?.body ?? "{}")), }); return new Response(JSON.stringify({ ok: true, result: { message_id: 9003 } }), { status: 200, headers: { "content-type": "application/json" }, }); } throw new Error(`unexpected fetch: ${url}`); }) as typeof fetch; try { const response = await handleTelegramWebhookRequest({ request: new NextRequest("http://127.0.0.1:3000/api/v1/integrations/telegram/webhook", { method: "POST", headers: { "content-type": "application/json", "x-telegram-bot-api-secret-token": "boss-telegram-secret", }, body: JSON.stringify({ update_id: 1005, message: { message_id: 305, date: 1_761_000_004, chat: { id: -100200300, type: "supergroup", title: "Boss 协作群" }, from: { id: 123456, is_bot: false, first_name: "Kris" }, text: "@boss_demo_bot hello", }, }), }), }); assert.equal(response.status, 200); assert.equal(outboundCalls.length, 1); const nextState = await readState(); const project = nextState.projects.find((item) => item.id === "master-agent"); const telegramMessage = project?.messages.find((item) => item.senderLabel === "Telegram · Boss 协作群 · Kris"); assert.equal(telegramMessage?.body, "hello"); } finally { globalThis.fetch = originalFetch; } }); test("Telegram 群聊回复 Bot 上一条消息时,即使没有 @Bot 也允许进入主 Agent", async () => { await setup(); const state = await readState(); state.telegramIntegration = { ...state.telegramIntegration!, botUsername: "boss_demo_bot", groupPolicy: "allowlist", groups: ["-100200300"], requireMentionInGroups: true, }; await writeState(state); const originalFetch = globalThis.fetch; const outboundCalls: Array<{ url: string; body: unknown }> = []; globalThis.fetch = (async (input, init) => { const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; if (url === "https://api.telegram.org/botbot-token-demo/sendMessage") { outboundCalls.push({ url, body: JSON.parse(String(init?.body ?? "{}")), }); return new Response(JSON.stringify({ ok: true, result: { message_id: 9004 } }), { status: 200, headers: { "content-type": "application/json" }, }); } throw new Error(`unexpected fetch: ${url}`); }) as typeof fetch; try { const response = await handleTelegramWebhookRequest({ request: new NextRequest("http://127.0.0.1:3000/api/v1/integrations/telegram/webhook", { method: "POST", headers: { "content-type": "application/json", "x-telegram-bot-api-secret-token": "boss-telegram-secret", }, body: JSON.stringify({ update_id: 1006, message: { message_id: 306, date: 1_761_000_005, chat: { id: -100200300, type: "supergroup", title: "Boss 协作群" }, from: { id: 123456, is_bot: false, first_name: "Kris" }, reply_to_message: { message_id: 299, date: 1_761_000_000, chat: { id: -100200300, type: "supergroup", title: "Boss 协作群" }, from: { id: 777888, is_bot: true, username: "boss_demo_bot", first_name: "Boss" }, text: "上一条 bot 回复", }, text: "继续展开这个方案", }, }), }), }); assert.equal(response.status, 200); assert.equal(outboundCalls.length, 1); const nextState = await readState(); const project = nextState.projects.find((item) => item.id === "master-agent"); const telegramMessage = project?.messages.find((item) => item.body === "继续展开这个方案"); assert.equal(telegramMessage?.senderLabel, "Telegram · Boss 协作群 · Kris"); } finally { globalThis.fetch = originalFetch; } }); test("Telegram 群聊可按 chat id 路由到指定 Boss 项目", async () => { await setup(); const state = await readState(); state.telegramIntegration = { ...state.telegramIntegration!, botUsername: "boss_demo_bot", groupPolicy: "allowlist", groups: ["-100200300"], requireMentionInGroups: true, defaultProjectId: "master-agent", groupProjectRoutes: [ { chatId: "-100200300", projectId: "audit-collab", label: "审计 Telegram 群", }, ], }; await writeState(state); const originalFetch = globalThis.fetch; globalThis.fetch = (async (input) => { const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; if (url === "https://api.telegram.org/botbot-token-demo/sendMessage") { return new Response(JSON.stringify({ ok: true, result: { message_id: 9005 } }), { status: 200, headers: { "content-type": "application/json" }, }); } throw new Error(`unexpected fetch: ${url}`); }) as typeof fetch; try { const response = await handleTelegramWebhookRequest({ request: new NextRequest("http://127.0.0.1:3000/api/v1/integrations/telegram/webhook", { method: "POST", headers: { "content-type": "application/json", "x-telegram-bot-api-secret-token": "boss-telegram-secret", }, body: JSON.stringify({ update_id: 1007, message: { message_id: 307, date: 1_761_000_006, chat: { id: -100200300, type: "supergroup", title: "Boss 协作群" }, from: { id: 123456, is_bot: false, first_name: "Kris" }, text: "@boss_demo_bot 汇总审计群今天的风险", }, }), }), }); assert.equal(response.status, 200); const nextState = await readState(); const masterProject = nextState.projects.find((item) => item.id === "master-agent"); const auditProject = nextState.projects.find((item) => item.id === "audit-collab"); assert.equal( masterProject?.messages.some((message) => message.body === "汇总审计群今天的风险"), false, ); assert.equal( auditProject?.messages.some((message) => message.body === "汇总审计群今天的风险"), true, ); } finally { globalThis.fetch = originalFetch; } });