Files
boss/src/lib/telegram-gateway.ts

628 lines
21 KiB
TypeScript

import { NextRequest } from "next/server";
import { jsonNoStore } from "@/lib/api-response";
import {
appendProjectMessages,
getAuthSession,
type AuthRole,
type AuthSession,
type BossState,
type ExternalReplyTarget,
type TelegramGroupProjectRoute,
type TelegramIntegrationState,
readState,
writeState,
} from "@/lib/boss-data";
import { replyToMasterAgentUserMessage, tryBuildLocalMasterAgentFastReply } from "@/lib/boss-master-agent";
const TELEGRAM_API_BASE = "https://api.telegram.org";
const TELEGRAM_TEXT_LIMIT = 4000;
const TELEGRAM_BRIDGE_ACCOUNT = "telegram-bridge";
const TELEGRAM_BRIDGE_LABEL = "Telegram";
type TelegramChatType = "private" | "group" | "supergroup" | "channel";
interface TelegramUser {
id: number;
is_bot?: boolean;
first_name?: string;
last_name?: string;
username?: string;
}
interface TelegramChat {
id: number;
type: TelegramChatType;
title?: string;
}
interface TelegramMessage {
message_id: number;
message_thread_id?: number;
date: number;
chat: TelegramChat;
from?: TelegramUser;
text?: string;
reply_to_message?: {
from?: TelegramUser;
};
}
interface TelegramUpdate {
update_id: number;
message?: TelegramMessage;
}
interface NormalizedTelegramMessage {
updateId: number;
messageId: number;
chatId: string;
chatType: TelegramChatType;
chatTitle?: string;
threadId?: number;
senderId?: string;
senderName: string;
text: string;
repliedToBot: boolean;
repliedToBotUsername?: string;
}
interface TelegramGatewayView {
enabled: boolean;
mode: "webhook" | "polling";
botTokenConfigured: boolean;
botUsername?: string;
dmPolicy: "allowlist" | "open" | "disabled";
allowFrom: string[];
groupPolicy: "allowlist" | "open" | "disabled";
groups: string[];
requireMentionInGroups: boolean;
defaultProjectId: string;
groupProjectRoutes: TelegramGroupProjectRoute[];
webhookSecretConfigured: boolean;
webhookUrl?: string;
lastConfiguredAt?: string;
lastConfiguredBy?: string;
lastError?: string;
processedUpdateCount: number;
}
function trimToDefined(value?: string | null) {
const trimmed = value?.trim();
return trimmed ? trimmed : undefined;
}
function toSenderName(message: TelegramMessage) {
const firstName = trimToDefined(message.from?.first_name);
const lastName = trimToDefined(message.from?.last_name);
const username = trimToDefined(message.from?.username);
return [firstName, lastName].filter(Boolean).join(" ") || username || `用户${message.from?.id ?? "未知"}`;
}
function normalizeTelegramUpdate(update: TelegramUpdate): NormalizedTelegramMessage | null {
const message = update.message;
const text = trimToDefined(message?.text);
if (!message || !text) {
return null;
}
return {
updateId: update.update_id,
messageId: message.message_id,
chatId: String(message.chat.id),
chatType: message.chat.type,
chatTitle: trimToDefined(message.chat.title),
threadId: typeof message.message_thread_id === "number" ? message.message_thread_id : undefined,
senderId: message.from?.id != null ? String(message.from.id) : undefined,
senderName: toSenderName(message),
text,
repliedToBot: message.reply_to_message?.from?.is_bot === true,
repliedToBotUsername: trimToDefined(message.reply_to_message?.from?.username),
};
}
function buildTelegramSessionKey(message: NormalizedTelegramMessage) {
if (message.chatType === "private") {
return `telegram:dm:${message.chatId}`;
}
if (message.threadId != null) {
return `telegram:group:${message.chatId}:topic:${message.threadId}`;
}
return `telegram:group:${message.chatId}`;
}
function chunkTelegramText(text: string) {
if (text.length <= TELEGRAM_TEXT_LIMIT) {
return [text];
}
const chunks: string[] = [];
let remaining = text.trim();
while (remaining.length > TELEGRAM_TEXT_LIMIT) {
let index = remaining.lastIndexOf("\n", TELEGRAM_TEXT_LIMIT);
if (index < Math.floor(TELEGRAM_TEXT_LIMIT * 0.6)) {
index = TELEGRAM_TEXT_LIMIT;
}
chunks.push(remaining.slice(0, index).trim());
remaining = remaining.slice(index).trim();
}
if (remaining) {
chunks.push(remaining);
}
return chunks;
}
function escapeRegExp(value: string) {
return value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
function resolveTelegramMentionRegex(botUsername?: string) {
const username = trimToDefined(botUsername);
if (!username) {
return null;
}
return new RegExp(`(^|\\s)@${escapeRegExp(username)}\\b`, "ig");
}
function hasTelegramBotMention(text: string, botUsername?: string) {
const mentionRegex = resolveTelegramMentionRegex(botUsername);
if (!mentionRegex) {
return /@\S+/.test(text);
}
return mentionRegex.test(text);
}
function isReplyToConfiguredTelegramBot(message: NormalizedTelegramMessage, botUsername?: string) {
if (!message.repliedToBot) {
return false;
}
const expectedUsername = trimToDefined(botUsername)?.toLowerCase();
if (!expectedUsername) {
return true;
}
return message.repliedToBotUsername?.toLowerCase() === expectedUsername;
}
function stripTelegramBotMention(text: string, botUsername?: string) {
const mentionRegex = resolveTelegramMentionRegex(botUsername);
if (!mentionRegex) {
return text.trim();
}
return text.replace(mentionRegex, " ").replace(/\s+/g, " ").trim();
}
function ensureTelegramIntegration(state: BossState): TelegramIntegrationState {
const integration = state.telegramIntegration;
if (integration) {
integration.groupProjectRoutes ??= [];
return integration;
}
const next: TelegramIntegrationState = {
enabled: false,
mode: "webhook",
dmPolicy: "allowlist",
allowFrom: [],
groupPolicy: "allowlist",
groups: [],
requireMentionInGroups: true,
defaultProjectId: "master-agent",
groupProjectRoutes: [],
processedUpdateIds: [],
};
state.telegramIntegration = next;
return next;
}
function getTelegramConfigView(integration: TelegramIntegrationState | undefined): TelegramGatewayView {
const config = integration ?? {
enabled: false,
mode: "webhook",
dmPolicy: "allowlist",
allowFrom: [],
groupPolicy: "allowlist",
groups: [],
requireMentionInGroups: true,
defaultProjectId: "master-agent",
groupProjectRoutes: [],
processedUpdateIds: [],
};
return {
enabled: config.enabled,
mode: config.mode,
botTokenConfigured: Boolean(trimToDefined(config.botToken)),
botUsername: trimToDefined(config.botUsername),
dmPolicy: config.dmPolicy,
allowFrom: config.allowFrom,
groupPolicy: config.groupPolicy,
groups: config.groups,
requireMentionInGroups: config.requireMentionInGroups,
defaultProjectId: config.defaultProjectId,
groupProjectRoutes: config.groupProjectRoutes ?? [],
webhookSecretConfigured: Boolean(trimToDefined(config.webhookSecret)),
webhookUrl: trimToDefined(config.webhookUrl),
lastConfiguredAt: trimToDefined(config.lastConfiguredAt),
lastConfiguredBy: trimToDefined(config.lastConfiguredBy),
lastError: trimToDefined(config.lastError),
processedUpdateCount: config.processedUpdateIds.length,
};
}
function checkTelegramAccess(
config: TelegramIntegrationState,
message: NormalizedTelegramMessage,
): { ok: true } | { ok: false; message: string; status: number } {
if (!config.enabled) {
return { ok: false, message: "TELEGRAM_DISABLED", status: 403 };
}
if (message.chatType === "private") {
if (config.dmPolicy === "disabled") {
return { ok: false, message: "TELEGRAM_DM_DISABLED", status: 403 };
}
if (config.dmPolicy === "allowlist") {
if (!message.senderId || !config.allowFrom.includes(message.senderId)) {
return { ok: false, message: "TELEGRAM_SENDER_FORBIDDEN", status: 403 };
}
}
return { ok: true };
}
if (config.groupPolicy === "disabled") {
return { ok: false, message: "TELEGRAM_GROUP_DISABLED", status: 403 };
}
if (config.groupPolicy === "allowlist" && !config.groups.includes(message.chatId)) {
return { ok: false, message: "TELEGRAM_GROUP_FORBIDDEN", status: 403 };
}
if (
config.requireMentionInGroups &&
!hasTelegramBotMention(message.text, config.botUsername) &&
!isReplyToConfiguredTelegramBot(message, config.botUsername)
) {
return { ok: false, message: "TELEGRAM_GROUP_MENTION_REQUIRED", status: 400 };
}
return { ok: true };
}
function buildTelegramSenderLabel(message: NormalizedTelegramMessage) {
if (message.chatType === "private") {
return `Telegram · ${message.senderName}`;
}
const groupLabel = message.chatTitle || message.chatId;
return `Telegram · ${groupLabel} · ${message.senderName}`;
}
function resolveTelegramTargetProjectId(
state: BossState,
config: TelegramIntegrationState,
message: NormalizedTelegramMessage,
) {
const routes = config.groupProjectRoutes ?? [];
const exactRoute = routes.find(
(route) => route.chatId === message.chatId && route.threadId != null && route.threadId === message.threadId,
);
const chatRoute =
exactRoute ??
routes.find((route) => route.chatId === message.chatId && route.threadId == null);
const requestedProjectId = chatRoute?.projectId || config.defaultProjectId || "master-agent";
const projectExists = state.projects.some((project) => project.id === requestedProjectId);
if (projectExists) {
return requestedProjectId;
}
if (state.projects.some((project) => project.id === config.defaultProjectId)) {
return config.defaultProjectId;
}
return "master-agent";
}
function markTelegramUpdateProcessed(state: BossState, updateId: number) {
const integration = ensureTelegramIntegration(state);
integration.processedUpdateIds = Array.from(new Set([...integration.processedUpdateIds, updateId]))
.sort((left, right) => left - right)
.slice(-256);
}
async function persistTelegramUpdateProcessed(updateId: number) {
const state = await readState();
markTelegramUpdateProcessed(state, updateId);
await writeState(state);
}
function hasProcessedTelegramUpdate(state: BossState, updateId: number) {
return ensureTelegramIntegration(state).processedUpdateIds.includes(updateId);
}
async function sendTelegramMessage(config: TelegramIntegrationState, target: ExternalReplyTarget, text: string) {
const botToken = trimToDefined(config.botToken);
if (!botToken) {
throw new Error("TELEGRAM_BOT_TOKEN_REQUIRED");
}
const chunks = chunkTelegramText(text);
for (const chunk of chunks) {
const body: Record<string, unknown> = {
chat_id: Number(target.chatId),
text: chunk,
};
if (typeof target.messageId === "number") {
body.reply_to_message_id = target.messageId;
}
if (typeof target.threadId === "number") {
body.message_thread_id = target.threadId;
}
const response = await fetch(`${TELEGRAM_API_BASE}/bot${botToken}/sendMessage`, {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify(body),
});
if (!response.ok) {
throw new Error(`TELEGRAM_SEND_FAILED_${response.status}`);
}
}
}
function buildTelegramBridgeSession(): AuthSession {
const now = new Date().toISOString();
return {
sessionId: "telegram-bridge-session",
sessionToken: "telegram-bridge-session",
restoreToken: "telegram-bridge-session",
account: TELEGRAM_BRIDGE_ACCOUNT,
role: "highest_admin" satisfies AuthRole,
displayName: TELEGRAM_BRIDGE_LABEL,
loginMethod: "password",
createdAt: now,
expiresAt: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000).toISOString(),
lastSeenAt: now,
};
}
export async function handleTelegramWebhookRequest(params: { request: NextRequest }) {
const state = await readState();
const integration = ensureTelegramIntegration(state);
const secret = trimToDefined(integration.webhookSecret);
if (secret) {
const received = trimToDefined(params.request.headers.get("x-telegram-bot-api-secret-token"));
if (received !== secret) {
return jsonNoStore({ ok: false, message: "TELEGRAM_WEBHOOK_SECRET_INVALID" }, { status: 401 });
}
}
const update = (await params.request.json().catch(() => null)) as TelegramUpdate | null;
const normalized = update ? normalizeTelegramUpdate(update) : null;
if (!update || !normalized) {
return jsonNoStore({ ok: true, delivery: "ignored" });
}
if (hasProcessedTelegramUpdate(state, normalized.updateId)) {
return jsonNoStore({ ok: true, delivery: "duplicate" });
}
const access = checkTelegramAccess(integration, normalized);
if (!access.ok) {
return jsonNoStore({ ok: false, message: access.message }, { status: access.status });
}
const replyTarget: ExternalReplyTarget = {
provider: "telegram",
chatId: normalized.chatId,
messageId: normalized.messageId,
threadId: normalized.threadId,
sessionKey: buildTelegramSessionKey(normalized),
};
const bridgeSession = buildTelegramBridgeSession();
const bridgeAccount = state.user.account || bridgeSession.account;
const projectId = resolveTelegramTargetProjectId(state, integration, normalized);
const requestText =
normalized.chatType === "private"
? normalized.text
: stripTelegramBotMention(normalized.text, integration.botUsername);
const localFastReply = await tryBuildLocalMasterAgentFastReply({
requestText,
requestedByAccount: bridgeSession.account,
projectId,
state,
});
if (localFastReply) {
await appendProjectMessages({
projectId,
messages: [
{
senderLabel: buildTelegramSenderLabel(normalized),
body: requestText,
kind: "text",
},
{
sender: "master",
senderLabel: localFastReply.senderLabel,
body: localFastReply.replyBody,
kind: "text",
},
],
});
await persistTelegramUpdateProcessed(normalized.updateId);
await sendTelegramMessage(integration, replyTarget, localFastReply.replyBody);
return jsonNoStore({ ok: true, delivery: "sent" });
}
const [message] = await appendProjectMessages({
projectId,
messages: [
{
senderLabel: buildTelegramSenderLabel(normalized),
body: requestText,
kind: "text",
},
],
});
const reply = await replyToMasterAgentUserMessage({
requestMessageId: message.id,
requestText,
requestedBy: buildTelegramSenderLabel(normalized),
requestedByAccount: bridgeAccount,
currentSessionExpiresAt: bridgeSession.expiresAt,
projectId,
mode: "smart",
externalReplyTarget: replyTarget,
});
await persistTelegramUpdateProcessed(normalized.updateId);
const replyState = "masterReplyState" in reply ? reply.masterReplyState : undefined;
if (reply.ok && replyState === "completed" && "replyMessage" in reply && reply.replyMessage?.body) {
await sendTelegramMessage(integration, replyTarget, reply.replyMessage.body);
return jsonNoStore({ ok: true, delivery: "sent", taskId: "taskId" in reply ? reply.taskId : undefined });
}
await sendTelegramMessage(integration, replyTarget, "已收到,我先继续协调,整理好结果后会自动回到这里。");
return jsonNoStore({ ok: true, delivery: "queued", taskId: "taskId" in reply ? reply.taskId : undefined });
}
export async function deliverTelegramReplyForCompletedTask(taskId: string) {
const state = await readState();
const integration = ensureTelegramIntegration(state);
const task = state.masterAgentTasks.find((item) => item.taskId === taskId);
if (!task?.externalReplyTarget || task.externalReplyTarget.provider !== "telegram") {
return { delivered: false, reason: "NO_EXTERNAL_TARGET" as const };
}
if (!task.replyBody?.trim()) {
return { delivered: false, reason: "NO_REPLY_BODY" as const };
}
if (task.externalReplyTarget.deliveredAt) {
return { delivered: false, reason: "ALREADY_DELIVERED" as const };
}
try {
await sendTelegramMessage(integration, task.externalReplyTarget, task.replyBody);
} catch (error) {
task.externalReplyTarget.deliveryError = error instanceof Error ? error.message : "TELEGRAM_DELIVERY_FAILED";
await writeState(state);
return { delivered: false, reason: "SEND_FAILED" as const };
}
task.externalReplyTarget.deliveredAt = new Date().toISOString();
task.externalReplyTarget.deliveryError = undefined;
await writeState(state);
return { delivered: true as const };
}
export async function getTelegramIntegrationView() {
const state = await readState();
return getTelegramConfigView(state.telegramIntegration);
}
export async function saveTelegramIntegrationConfig(input: {
enabled: boolean;
mode?: "webhook" | "polling";
botToken?: string;
botUsername?: string;
dmPolicy?: "allowlist" | "open" | "disabled";
allowFrom?: string[];
groupPolicy?: "allowlist" | "open" | "disabled";
groups?: string[];
requireMentionInGroups?: boolean;
defaultProjectId?: string;
groupProjectRoutes?: TelegramGroupProjectRoute[];
webhookSecret?: string;
webhookUrl?: string;
configuredBy: string;
}) {
const state = await readState();
const integration = ensureTelegramIntegration(state);
integration.enabled = input.enabled;
integration.mode = input.mode ?? integration.mode;
integration.botToken = trimToDefined(input.botToken) ?? integration.botToken;
integration.botUsername = trimToDefined(input.botUsername);
integration.dmPolicy = input.dmPolicy ?? integration.dmPolicy;
integration.allowFrom = (input.allowFrom ?? integration.allowFrom).map((item) => item.trim()).filter(Boolean);
integration.groupPolicy = input.groupPolicy ?? integration.groupPolicy;
integration.groups = (input.groups ?? integration.groups).map((item) => item.trim()).filter(Boolean);
integration.requireMentionInGroups =
input.requireMentionInGroups ?? integration.requireMentionInGroups;
integration.defaultProjectId = trimToDefined(input.defaultProjectId) ?? integration.defaultProjectId;
integration.groupProjectRoutes = (input.groupProjectRoutes ?? integration.groupProjectRoutes)
.map((route) => ({
chatId: String(route.chatId ?? "").trim(),
threadId: typeof route.threadId === "number" ? route.threadId : undefined,
projectId: String(route.projectId ?? "").trim(),
label: trimToDefined(route.label),
}))
.filter((route) => route.chatId && route.projectId);
integration.webhookSecret = trimToDefined(input.webhookSecret) ?? integration.webhookSecret;
integration.webhookUrl = trimToDefined(input.webhookUrl);
integration.lastConfiguredAt = new Date().toISOString();
integration.lastConfiguredBy = input.configuredBy;
integration.lastError = undefined;
await writeState(state);
return getTelegramConfigView(integration);
}
export async function probeTelegramBot(config?: TelegramIntegrationState | null) {
const integration = config ?? (await readState()).telegramIntegration;
const botToken = trimToDefined(integration?.botToken);
if (!botToken) {
throw new Error("TELEGRAM_BOT_TOKEN_REQUIRED");
}
const response = await fetch(`${TELEGRAM_API_BASE}/bot${botToken}/getMe`);
if (!response.ok) {
throw new Error(`TELEGRAM_GET_ME_FAILED_${response.status}`);
}
const payload = (await response.json()) as { ok?: boolean; result?: { username?: string } };
return {
ok: payload.ok === true,
username: trimToDefined(payload.result?.username),
};
}
export async function syncTelegramWebhookRegistration(config?: TelegramIntegrationState | null) {
const integration = config ?? (await readState()).telegramIntegration;
const botToken = trimToDefined(integration?.botToken);
if (!integration?.enabled || integration.mode === "polling") {
if (!botToken) {
return { ok: true as const, action: "skipped" as const, reason: "BOT_TOKEN_NOT_CONFIGURED" };
}
const response = await fetch(`${TELEGRAM_API_BASE}/bot${botToken}/deleteWebhook`, {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify({ drop_pending_updates: false }),
});
if (!response.ok) {
throw new Error(`TELEGRAM_DELETE_WEBHOOK_FAILED_${response.status}`);
}
return { ok: true as const, action: "delete_webhook" as const };
}
if (!botToken) {
return { ok: true as const, action: "skipped" as const, reason: "BOT_TOKEN_NOT_CONFIGURED" };
}
const webhookUrl = trimToDefined(integration.webhookUrl);
if (!webhookUrl) {
return { ok: true as const, action: "skipped" as const, reason: "WEBHOOK_URL_NOT_CONFIGURED" };
}
const body: Record<string, unknown> = {
url: webhookUrl,
drop_pending_updates: false,
};
const secret = trimToDefined(integration.webhookSecret);
if (secret) {
body.secret_token = secret;
}
const response = await fetch(`${TELEGRAM_API_BASE}/bot${botToken}/setWebhook`, {
method: "POST",
headers: {
"content-type": "application/json",
},
body: JSON.stringify(body),
});
if (!response.ok) {
throw new Error(`TELEGRAM_SET_WEBHOOK_FAILED_${response.status}`);
}
return { ok: true as const, action: "set_webhook" as const };
}
export async function getAuthorizedTelegramConfigSession(request: NextRequest) {
const session = await getAuthSession(request.cookies.get("boss_session")?.value);
if (!session || session.role !== "highest_admin") {
return null;
}
return session;
}