diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index 642a7bed50..ac364f47b8 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -447,9 +447,13 @@ "start_message": "Hello, I'm AstrBot!", "telegram_api_base_url": "https://api.telegram.org/bot", "telegram_file_base_url": "https://api.telegram.org/file/bot", + "telegram_proxy": "", + "telegram_get_updates_proxy": "", "telegram_command_register": True, "telegram_command_auto_refresh": True, "telegram_command_register_interval": 300, + "telegram_command_registered_plugins": [], + "telegram_command_scopes": [{"type": "default"}], "telegram_polling_restart_delay": 5.0, }, "Discord": { @@ -765,6 +769,28 @@ "type": "int", "hint": "Telegram 命令自动刷新间隔,单位为秒。", }, + "telegram_command_registered_plugins": { + "description": "Telegram 命令注册插件", + "type": "list", + "items": {"type": "string"}, + "hint": "只注册所选插件提供的指令,填写插件名、展示名、目录名或模块路径。留空表示注册全部已启用插件的指令。", + }, + "telegram_command_scopes": { + "description": "Telegram 命令注册范围", + "type": "list", + "items": {"type": "object"}, + "hint": '每项可填写 Telegram BotCommandScope 配置,如 {"type":"default","language_code":"zh"} 或 {"type":"chat","chat_id":12345}。', + }, + "telegram_proxy": { + "description": "Telegram API 代理", + "type": "string", + "hint": "仅用于此 Telegram 适配器的 Bot API 请求代理,例如 http://127.0.0.1:7890。留空则使用默认网络设置。", + }, + "telegram_get_updates_proxy": { + "description": "Telegram getUpdates 代理", + "type": "string", + "hint": "仅用于此 Telegram 适配器轮询 getUpdates 的代理。留空时与 Bot API 请求使用相同网络设置。", + }, "telegram_polling_restart_delay": { "description": "Telegram 轮询重启延迟", "type": "float", diff --git a/astrbot/core/platform/sources/telegram/components.py b/astrbot/core/platform/sources/telegram/components.py new file mode 100644 index 0000000000..66aa835bc3 --- /dev/null +++ b/astrbot/core/platform/sources/telegram/components.py @@ -0,0 +1,209 @@ +from typing import Any + +from telegram import ( + CallbackGame, + CopyTextButton, + InlineKeyboardButton, + InlineKeyboardMarkup, + LinkPreviewOptions, + LoginUrl, + SwitchInlineQueryChosenChat, + WebAppInfo, +) + +from astrbot.api.message_components import BaseMessageComponent + +TELEGRAM_CALLBACK_DATA_MAX_BYTES = 64 + + +class TelegramInlineButton(BaseMessageComponent): + """Telegram inline keyboard button component.""" + + type: str = "telegram_inline_button" + text: str + url: str | None = None + callback_data: str | None = None + login_url: Any = None + web_app: Any = None + switch_inline_query: str | None = None + switch_inline_query_current_chat: str | None = None + switch_inline_query_chosen_chat: Any = None + copy_text: Any = None + callback_game: Any = None + pay: bool | None = None + style: str | None = None + icon_custom_emoji_id: str | None = None + + def __init__( + self, + text: str, + *, + url: str | None = None, + callback_data: str | None = None, + login_url: LoginUrl | str | None = None, + web_app: WebAppInfo | str | None = None, + switch_inline_query: str | None = None, + switch_inline_query_current_chat: str | None = None, + switch_inline_query_chosen_chat: SwitchInlineQueryChosenChat + | dict + | None = None, + copy_text: CopyTextButton | str | None = None, + callback_game: CallbackGame | None = None, + pay: bool | None = None, + style: str | None = None, + icon_custom_emoji_id: str | None = None, + ) -> None: + super().__init__( + text=text, + url=url, + callback_data=callback_data, + login_url=login_url, + web_app=web_app, + switch_inline_query=switch_inline_query, + switch_inline_query_current_chat=switch_inline_query_current_chat, + switch_inline_query_chosen_chat=switch_inline_query_chosen_chat, + copy_text=copy_text, + callback_game=callback_game, + pay=pay, + style=style, + icon_custom_emoji_id=icon_custom_emoji_id, + ) + self._validate_action() + + def _action_values(self) -> dict[str, Any]: + return { + "url": self.url, + "callback_data": self.callback_data, + "login_url": self.login_url, + "web_app": self.web_app, + "switch_inline_query": self.switch_inline_query, + "switch_inline_query_current_chat": self.switch_inline_query_current_chat, + "switch_inline_query_chosen_chat": self.switch_inline_query_chosen_chat, + "copy_text": self.copy_text, + "callback_game": self.callback_game, + "pay": self.pay, + } + + def _validate_action(self) -> None: + actions = { + key: value + for key, value in self._action_values().items() + if value is not None and value is not False + } + if len(actions) != 1: + raise ValueError( + "Telegram inline button requires exactly one optional action.", + ) + + if self.callback_data is None: + return + + callback_data_length = len(self.callback_data.encode("utf-8")) + if not 1 <= callback_data_length <= TELEGRAM_CALLBACK_DATA_MAX_BYTES: + raise ValueError( + "Telegram inline button callback_data must be 1-64 UTF-8 bytes.", + ) + + def to_telegram_button(self) -> InlineKeyboardButton: + payload = self._action_values() + if isinstance(self.login_url, str): + payload["login_url"] = LoginUrl(self.login_url) + if isinstance(self.web_app, str): + payload["web_app"] = WebAppInfo(self.web_app) + if isinstance(self.switch_inline_query_chosen_chat, dict): + payload["switch_inline_query_chosen_chat"] = SwitchInlineQueryChosenChat( + **self.switch_inline_query_chosen_chat, + ) + if isinstance(self.copy_text, str): + payload["copy_text"] = CopyTextButton(self.copy_text) + + button_payload = { + key: value for key, value in payload.items() if value is not None + } + if self.style is not None: + button_payload["style"] = self.style + if self.icon_custom_emoji_id is not None: + button_payload["icon_custom_emoji_id"] = self.icon_custom_emoji_id + + return InlineKeyboardButton( + text=self.text, + **button_payload, + ) + + +class TelegramInlineKeyboard(BaseMessageComponent): + """Telegram inline keyboard component.""" + + type: str = "telegram_inline_keyboard" + rows: list[list[Any]] + + def __init__( + self, + rows: list[list[TelegramInlineButton | InlineKeyboardButton]], + ) -> None: + super().__init__(rows=rows) + + def to_telegram_markup(self) -> InlineKeyboardMarkup: + keyboard: list[list[InlineKeyboardButton]] = [] + for row in self.rows: + keyboard.append( + [ + button.to_telegram_button() + if isinstance(button, TelegramInlineButton) + else button + for button in row + ], + ) + return InlineKeyboardMarkup(keyboard) + + +class TelegramMessageOptions(BaseMessageComponent): + """Telegram send options for text rendering and link previews.""" + + type: str = "telegram_message_options" + parse_mode: str | None = None + link_preview_is_disabled: bool | None = None + link_preview_url: str | None = None + link_preview_prefer_small_media: bool | None = None + link_preview_prefer_large_media: bool | None = None + link_preview_show_above_text: bool | None = None + + def __init__( + self, + *, + parse_mode: str | None = None, + link_preview_is_disabled: bool | None = None, + link_preview_url: str | None = None, + link_preview_prefer_small_media: bool | None = None, + link_preview_prefer_large_media: bool | None = None, + link_preview_show_above_text: bool | None = None, + ) -> None: + super().__init__( + parse_mode=parse_mode, + link_preview_is_disabled=link_preview_is_disabled, + link_preview_url=link_preview_url, + link_preview_prefer_small_media=link_preview_prefer_small_media, + link_preview_prefer_large_media=link_preview_prefer_large_media, + link_preview_show_above_text=link_preview_show_above_text, + ) + + def to_link_preview_options(self) -> LinkPreviewOptions | None: + if not any( + value is not None + for value in ( + self.link_preview_is_disabled, + self.link_preview_url, + self.link_preview_prefer_small_media, + self.link_preview_prefer_large_media, + self.link_preview_show_above_text, + ) + ): + return None + + return LinkPreviewOptions( + is_disabled=self.link_preview_is_disabled, + url=self.link_preview_url, + prefer_small_media=self.link_preview_prefer_small_media, + prefer_large_media=self.link_preview_prefer_large_media, + show_above_text=self.link_preview_show_above_text, + ) diff --git a/astrbot/core/platform/sources/telegram/tg_adapter.py b/astrbot/core/platform/sources/telegram/tg_adapter.py index 76863a1949..b39e354819 100644 --- a/astrbot/core/platform/sources/telegram/tg_adapter.py +++ b/astrbot/core/platform/sources/telegram/tg_adapter.py @@ -4,14 +4,25 @@ import sys import uuid from contextlib import suppress -from typing import cast +from typing import Any, cast from apscheduler.events import EVENT_JOB_ERROR from apscheduler.schedulers.asyncio import AsyncIOScheduler -from telegram import BotCommand, Update +from telegram import ( + BotCommand, + BotCommandScopeAllChatAdministrators, + BotCommandScopeAllGroupChats, + BotCommandScopeAllPrivateChats, + BotCommandScopeChat, + BotCommandScopeChatAdministrators, + BotCommandScopeChatMember, + BotCommandScopeDefault, + Update, +) from telegram.constants import ChatType from telegram.error import Forbidden, InvalidToken, NetworkError from telegram.ext import ApplicationBuilder, ContextTypes, ExtBot, filters +from telegram.ext import CallbackQueryHandler as TelegramCallbackQueryHandler from telegram.ext import MessageHandler as TelegramMessageHandler import astrbot.api.message_components as Comp @@ -78,7 +89,13 @@ def __init__( "telegram_command_auto_refresh", True, ) - self.last_command_hash = None + self.command_registered_plugins = self._normalize_command_plugin_allowlist( + self.config.get("telegram_command_registered_plugins"), + ) + self.command_scopes = self._normalize_command_scope_configs( + self.config.get("telegram_command_scopes"), + ) + self.last_command_hashes: dict[tuple[str, str | None], int] = {} self.scheduler = AsyncIOScheduler() self.scheduler.add_listener( @@ -130,18 +147,28 @@ def __init__( ) # max seconds - hard cap to prevent indefinite delay def _build_application(self) -> None: - self.application = ( + builder = ( ApplicationBuilder() .token(self.config["telegram_token"]) .base_url(self.base_url) .base_file_url(self.file_base_url) - .build() ) + telegram_proxy = self.config.get("telegram_proxy") + if telegram_proxy: + builder = builder.proxy(telegram_proxy) + telegram_get_updates_proxy = self.config.get("telegram_get_updates_proxy") + if telegram_get_updates_proxy: + builder = builder.get_updates_proxy(telegram_get_updates_proxy) + + self.application = builder.build() message_handler = TelegramMessageHandler( filters=filters.ALL, callback=self.message_handler, ) self.application.add_handler(message_handler) + self.application.add_handler( + TelegramCallbackQueryHandler(callback=self.callback_query_handler), + ) self.client = self.application.bot logger.debug(f"Telegram base url: {self.client.base_url}") @@ -168,7 +195,7 @@ async def _shutdown_application( if delete_commands and self.enable_command_register: with suppress(Exception): - await self.client.delete_my_commands() + await self.delete_registered_commands() with suppress(Exception): await self.application.stop() @@ -322,20 +349,156 @@ async def register_commands(self) -> None: """收集所有注册的指令并注册到 Telegram""" try: commands = self.collect_commands() + if not commands: + return + if len(commands) > 100: + raise ValueError( + "Telegram supports at most 100 bot commands per scope. " + "Use telegram_command_registered_plugins to narrow registered plugins.", + ) - if commands: - current_hash = hash( - tuple((cmd.command, cmd.description) for cmd in commands), + current_hash = hash( + tuple((cmd.command, cmd.description) for cmd in commands), + ) + for scope_config in self.command_scopes: + scope = self._build_bot_command_scope(scope_config) + language_code = self._command_language_code(scope_config) + scope_key = self._command_scope_key(scope_config) + scoped_hash = hash((scope_key, language_code, current_hash)) + if scoped_hash == self.last_command_hashes.get( + (scope_key, language_code), + ): + continue + await self.client.delete_my_commands( + scope=scope, + language_code=language_code, + ) + await self.client.set_my_commands( + commands, + scope=scope, + language_code=language_code, ) - if current_hash == self.last_command_hash: - return - self.last_command_hash = current_hash - await self.client.delete_my_commands() - await self.client.set_my_commands(commands) + self.last_command_hashes[(scope_key, language_code)] = scoped_hash except Exception as e: logger.error(f"向 Telegram 注册指令时发生错误: {e!s}") + async def delete_registered_commands(self) -> None: + for scope_config in self.command_scopes: + await self.client.delete_my_commands( + scope=self._build_bot_command_scope(scope_config), + language_code=self._command_language_code(scope_config), + ) + + @staticmethod + def _normalize_command_plugin_allowlist(value: Any) -> set[str] | None: + if value in (None, "", []): + return None + if isinstance(value, str): + raw_items = [item.strip() for item in value.split(",")] + elif isinstance(value, list): + raw_items = [str(item).strip() for item in value] + else: + raise ValueError( + "telegram_command_registered_plugins must be a list or CSV string.", + ) + allowlist = {item for item in raw_items if item} + if "*" in allowlist: + return None + return allowlist or None + + @staticmethod + def _normalize_command_scope_configs(value: Any) -> list[dict[str, Any]]: + if value in (None, "", []): + return [{"type": "default"}] + if not isinstance(value, list): + raise ValueError("telegram_command_scopes must be a list of scope configs.") + + configs: list[dict[str, Any]] = [] + for item in value: + if isinstance(item, str): + configs.append({"type": item}) + elif isinstance(item, dict): + configs.append(dict(item)) + else: + raise ValueError( + "telegram_command_scopes items must be strings or dicts.", + ) + return configs or [{"type": "default"}] + + @staticmethod + def _command_scope_key(scope_config: dict[str, Any]) -> str: + parts = [ + str(scope_config.get("type") or "default"), + str(scope_config.get("chat_id") or ""), + str(scope_config.get("user_id") or ""), + ] + return ":".join(parts) + + @staticmethod + def _command_language_code(scope_config: dict[str, Any]) -> str | None: + language_code = str(scope_config.get("language_code") or "").strip() + return language_code or None + + @staticmethod + def _build_bot_command_scope(scope_config: dict[str, Any]): + scope_type = str(scope_config.get("type") or "default") + match scope_type: + case "default": + return BotCommandScopeDefault() + case "all_private_chats": + return BotCommandScopeAllPrivateChats() + case "all_group_chats": + return BotCommandScopeAllGroupChats() + case "all_chat_administrators": + return BotCommandScopeAllChatAdministrators() + case "chat": + chat_id = scope_config.get("chat_id") + if chat_id in (None, ""): + raise ValueError("telegram command scope 'chat' requires chat_id.") + return BotCommandScopeChat(chat_id=chat_id) + case "chat_administrators": + chat_id = scope_config.get("chat_id") + if chat_id in (None, ""): + raise ValueError( + "telegram command scope 'chat_administrators' requires chat_id.", + ) + return BotCommandScopeChatAdministrators(chat_id=chat_id) + case "chat_member": + chat_id = scope_config.get("chat_id") + user_id = scope_config.get("user_id") + if chat_id in (None, "") or user_id in (None, ""): + raise ValueError( + "telegram command scope 'chat_member' requires chat_id and user_id.", + ) + return BotCommandScopeChatMember(chat_id=chat_id, user_id=user_id) + case _: + raise ValueError( + f"Unsupported Telegram command scope type: {scope_type}", + ) + + def _plugin_matches_command_allowlist(self, module_path: str) -> bool: + if self.command_registered_plugins is None: + return True + + plugin = star_map.get(module_path) + candidates = { + module_path, + module_path.split(".")[-1], + } + if plugin: + candidates.update( + str(value) + for value in ( + plugin.name, + plugin.display_name, + plugin.root_dir_name, + plugin.module_path, + ) + if value + ) + return bool(candidates & self.command_registered_plugins) + def collect_commands(self) -> list[BotCommand]: """从注册的处理器中收集所有指令""" command_dict = {} @@ -343,9 +506,12 @@ def collect_commands(self) -> list[BotCommand]: for handler_md in star_handlers_registry: handler_metadata = handler_md - if ( - handler_metadata.handler_module_path not in star_map - or not star_map[handler_metadata.handler_module_path].activated + if handler_metadata.handler_module_path not in star_map: + continue + if not star_map[handler_metadata.handler_module_path].activated: + continue + if not self._plugin_matches_command_allowlist( + handler_metadata.handler_module_path, ): continue if not handler_metadata.enabled: @@ -436,6 +602,73 @@ async def message_handler( if abm: await self.handle_msg(abm) + async def callback_query_handler( + self, + update: Update, + context: ContextTypes.DEFAULT_TYPE, + ) -> None: + logger.debug(f"Telegram callback query: {update.callback_query}") + abm = await self.convert_callback_query(update, context) + if abm: + await self.handle_msg(abm) + + async def convert_callback_query( + self, + update: Update, + context: ContextTypes.DEFAULT_TYPE, + ) -> AstrBotMessage | None: + """Convert a Telegram callback query into an AstrBot message event.""" + callback_query = update.callback_query + if not callback_query: + logger.warning("Received an update without a callback query.") + return None + + from_user = getattr(callback_query, "from_user", None) + if not from_user: + logger.warning("[Telegram] Received a callback query without from_user.") + return None + + source_message = getattr(callback_query, "message", None) + source_chat = getattr(source_message, "chat", None) + data = getattr(callback_query, "data", None) + game_short_name = getattr(callback_query, "game_short_name", None) + message_text = str(data or game_short_name or "") + + message = AstrBotMessage() + message.sender = MessageMember( + str(from_user.id), + getattr(from_user, "username", None) + or getattr(from_user, "full_name", None) + or "Unknown", + ) + message.self_id = str(context.bot.username) + message.raw_message = update + message.message_str = message_text + message.message = [Comp.Plain(message_text)] if message_text else [] + message.message_id = str( + getattr(callback_query, "id", None) + or getattr(source_message, "message_id", "") + or "", + ) + + if source_chat and getattr(source_chat, "type", None) != ChatType.PRIVATE: + message.type = MessageType.GROUP_MESSAGE + message.group_id = str(source_chat.id) + message.session_id = message.group_id + message_thread_id = getattr(source_message, "message_thread_id", None) + is_topic_message = getattr(source_message, "is_topic_message", False) + if is_topic_message and message_thread_id: + message.group_id += "#" + str(message_thread_id) + message.session_id = message.group_id + elif source_chat: + message.type = MessageType.FRIEND_MESSAGE + message.session_id = str(source_chat.id) + else: + message.type = MessageType.FRIEND_MESSAGE + message.session_id = str(from_user.id) + + return message + async def convert_message( self, update: Update, diff --git a/astrbot/core/platform/sources/telegram/tg_event.py b/astrbot/core/platform/sources/telegram/tg_event.py index 8445a8ea1e..3629f05efb 100644 --- a/astrbot/core/platform/sources/telegram/tg_event.py +++ b/astrbot/core/platform/sources/telegram/tg_event.py @@ -14,6 +14,7 @@ from astrbot.api.event import AstrMessageEvent, MessageChain from astrbot.api.message_components import ( At, + BaseMessageComponent, File, Image, Plain, @@ -24,6 +25,8 @@ from astrbot.api.platform import AstrBotMessage, MessageType, PlatformMetadata from astrbot.core.utils.metrics import Metric +from .components import TelegramInlineKeyboard, TelegramMessageOptions + def _is_gif(path: str) -> bool: if path.lower().endswith(".gif"): @@ -80,6 +83,76 @@ def __init__( super().__init__(message_str, message_obj, platform_meta, session_id) self.client = client + @staticmethod + def _extract_send_options( + message: MessageChain, + ) -> tuple[ + list[BaseMessageComponent], + TelegramMessageOptions, + TelegramInlineKeyboard | None, + ]: + chain: list[BaseMessageComponent] = [] + options = TelegramMessageOptions() + keyboard = None + + for item in message.chain: + if isinstance(item, TelegramMessageOptions): + options = item + elif isinstance(item, TelegramInlineKeyboard): + keyboard = item + else: + chain.append(item) + + return chain, options, keyboard + + @staticmethod + def _normalize_parse_mode(parse_mode: str | None) -> str | None: + if parse_mode is None: + return None + normalized = parse_mode.strip() + if not normalized or normalized.lower() in {"plain", "plaintext", "none"}: + return None + if normalized not in {"MarkdownV2", "Markdown", "HTML"}: + raise ValueError( + "Telegram parse_mode must be one of MarkdownV2, Markdown, HTML, or plaintext.", + ) + return normalized + + @staticmethod + def _is_plaintext_parse_mode(parse_mode: str | None) -> bool: + if parse_mode is None: + return False + return parse_mode.strip().lower() in {"plain", "plaintext", "none"} + + @classmethod + def _build_text_payload( + cls, + payload: dict[str, Any], + options: TelegramMessageOptions, + keyboard: TelegramInlineKeyboard | None, + ) -> dict[str, Any]: + send_payload = dict(payload) + link_preview_options = options.to_link_preview_options() + if link_preview_options is not None: + send_payload["link_preview_options"] = link_preview_options + if keyboard is not None: + send_payload["reply_markup"] = keyboard.to_telegram_markup() + return send_payload + + @staticmethod + def _is_markdown_parse_error(error: BadRequest) -> bool: + message = getattr(error, "message", str(error)).lower() + return any( + fragment in message + for fragment in ( + "can't parse entities", + "can't parse entity", + "parse entities", + "entity", + "markdown", + ) + ) + @classmethod def _split_message(cls, text: str) -> list[str]: if len(text) <= cls.MAX_MESSAGE_LENGTH: @@ -111,19 +184,46 @@ async def _send_text_chunks( client: ExtBot, text: str, payload: dict[str, Any], + *, + use_markdown: bool | None = None, + options: TelegramMessageOptions | None = None, ) -> None: """按 Telegram 限制切分文本后逐段发送。""" + options = options or TelegramMessageOptions() + parse_mode = cls._normalize_parse_mode(options.parse_mode) for chunk in cls._split_message(text): + if parse_mode is not None: + await client.send_message( + text=chunk, + parse_mode=parse_mode, + **cast(Any, payload), + ) + continue + + if use_markdown is False or cls._is_plaintext_parse_mode( + options.parse_mode, + ): + await client.send_message(text=chunk, **cast(Any, payload)) + continue + try: - markdown_text = telegramify_markdown.markdownify( - chunk, + markdown_text = telegramify_markdown.markdownify(chunk) + except Exception as e: + logger.warning( + f"Failed to convert message to Markdown,using normal text: {e!s}" ) + await client.send_message(text=chunk, **cast(Any, payload)) + continue + + try: await client.send_message( text=markdown_text, parse_mode="MarkdownV2", **cast(Any, payload), ) - except (ValueError, BadRequest) as e: + except BadRequest as e: + if not cls._is_markdown_parse_error(e): + raise logger.warning( f"Failed to convert message to Markdown,using normal text: {e!s}" ) @@ -273,11 +373,12 @@ async def send_with_client( user_name: str, ) -> None: image_path = None + chain, options, keyboard = cls._extract_send_options(message) has_reply = False reply_message_id = None at_user_id = None - for i in message.chain: + for i in chain: if isinstance(i, Reply): has_reply = True reply_message_id = i.id @@ -291,10 +392,11 @@ async def send_with_client( user_name, message_thread_id = user_name.split("#") # 根据消息链确定合适的 chat action 并发送 - action = cls._get_chat_action_for_chain(message.chain) + action = cls._get_chat_action_for_chain(chain) await cls._send_chat_action(client, user_name, action, message_thread_id) - for i in message.chain: + extra_payload = cls._build_text_payload({}, options, keyboard) + for i in chain: payload = { "chat_id": user_name, } @@ -302,12 +404,23 @@ async def send_with_client( payload["reply_to_message_id"] = str(reply_message_id) if message_thread_id: payload["message_thread_id"] = message_thread_id + media_payload = payload | { + key: value + for key, value in extra_payload.items() + if key != "link_preview_options" + } if isinstance(i, Plain): if at_user_id and not at_flag: i.text = f"@{at_user_id} {i.text}" at_flag = True - await cls._send_text_chunks(client, i.text, payload) + await cls._send_text_chunks( + client, + i.text, + payload | extra_payload, + use_markdown=message.use_markdown_, + options=options, + ) elif isinstance(i, Image): image_path = await i.convert_to_file_path() if _is_gif(image_path): @@ -316,19 +429,19 @@ async def send_with_client( else: send_coro = client.send_photo media_kwarg = {"photo": image_path} - await send_coro(**media_kwarg, **cast(Any, payload)) + await send_coro(**media_kwarg, **cast(Any, media_payload)) elif isinstance(i, File): path = await i.get_file() name = i.name or os.path.basename(path) await client.send_document( - document=path, filename=name, **cast(Any, payload) + document=path, filename=name, **cast(Any, media_payload) ) elif isinstance(i, Record): path = await i.convert_to_file_path() await cls._send_voice_with_fallback( client, path, - payload, + media_payload, caption=i.text or None, use_media_action=False, ) @@ -337,7 +450,7 @@ async def send_with_client( await client.send_video( video=path, caption=getattr(i, "text", None) or None, - **cast(Any, payload), + **cast(Any, media_payload), ) async def send(self, message: MessageChain) -> None: @@ -347,6 +460,55 @@ async def send(self, message: MessageChain) -> None: await self.send_with_client(self.client, message, self.get_sender_id()) await super().send(message) + def _get_callback_query(self): + raw_message = getattr(self.message_obj, "raw_message", None) + return getattr(raw_message, "callback_query", None) + + def is_button_interaction(self) -> bool: + """Return whether this event comes from a Telegram callback query.""" + return self._get_callback_query() is not None + + def get_interaction_custom_id(self) -> str: + """Return Telegram callback_data for inline button interactions.""" + callback_query = self._get_callback_query() + if callback_query is None: + return "" + return str(getattr(callback_query, "data", "") or "") + + def get_interaction_data(self) -> str: + """Alias for callback_data to match other platform interaction helpers.""" + return self.get_interaction_custom_id() + + def get_interaction_user_id(self) -> str: + callback_query = self._get_callback_query() + if callback_query is None: + return "" + from_user = getattr(callback_query, "from_user", None) + if not from_user: + return "" + return str(getattr(from_user, "id", "") or "") + + async def answer_interaction( + self, + text: str | None = None, + *, + show_alert: bool | None = None, + url: str | None = None, + cache_time: int | None = None, + ) -> None: + callback_query = self._get_callback_query() + if callback_query is None: + raise RuntimeError("This Telegram event is not a button interaction.") + await callback_query.answer( + text=text, + show_alert=show_alert, + url=url, + cache_time=cache_time, + ) + + async def ack_interaction(self) -> None: + await self.answer_interaction() + async def react(self, emoji: str | None, big: bool = False) -> None: """给原消息添加 Telegram 反应: - 普通 emoji:传入 '👍'、'😂' 等 diff --git a/dashboard/src/i18n/locales/en-US/features/config-metadata.json b/dashboard/src/i18n/locales/en-US/features/config-metadata.json index be670a9b97..3afcf356b8 100644 --- a/dashboard/src/i18n/locales/en-US/features/config-metadata.json +++ b/dashboard/src/i18n/locales/en-US/features/config-metadata.json @@ -588,10 +588,26 @@ "description": "Telegram Command Auto Refresh Interval", "hint": "Telegram command auto-refresh interval in seconds." }, + "telegram_command_registered_plugins": { + "description": "Telegram Command Plugins", + "hint": "Only register commands from the selected plugins. Accepts plugin name, display name, directory name, or module path. Leave empty to register commands from all enabled plugins." + }, + "telegram_command_scopes": { + "description": "Telegram Command Scopes", + "hint": "Telegram BotCommandScope configs, e.g. {\"type\":\"default\",\"language_code\":\"en\"} or {\"type\":\"chat\",\"chat_id\":12345}." + }, + "telegram_get_updates_proxy": { + "description": "Telegram getUpdates Proxy", + "hint": "Proxy used only for this Telegram adapter's getUpdates polling. Leave empty to use the default network settings." + }, "telegram_polling_restart_delay": { "description": "Telegram Polling Restart Delay", "hint": "Waiting time in seconds when the polling loop needs to restart after unexpected exits. Defaults to 5s." }, + "telegram_proxy": { + "description": "Telegram API Proxy", + "hint": "Proxy used only for this Telegram adapter's Bot API requests, for example http://127.0.0.1:7890. Leave empty to use the default network settings." + }, "telegram_token": { "description": "Bot Token", "hint": "If you are in mainland China, set a proxy or change api_base in Other Settings." diff --git a/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json b/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json index cf450bfa5e..8f450d8b0c 100644 --- a/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json +++ b/dashboard/src/i18n/locales/ru-RU/features/config-metadata.json @@ -588,6 +588,26 @@ "description": "Интервал автообновления команд Telegram", "hint": "Интервал автоматического обновления команд Telegram в секундах." }, + "telegram_command_registered_plugins": { + "description": "Плагины команд Telegram", + "hint": "Регистрировать команды только выбранных плагинов. Можно указать имя плагина, отображаемое имя, имя папки или путь модуля. Пусто — команды всех включенных плагинов." + }, + "telegram_command_scopes": { + "description": "Области команд Telegram", + "hint": "Конфигурации Telegram BotCommandScope, например {\"type\":\"default\",\"language_code\":\"ru\"} или {\"type\":\"chat\",\"chat_id\":12345}." + }, + "telegram_get_updates_proxy": { + "description": "Прокси Telegram getUpdates", + "hint": "Прокси только для polling getUpdates этого Telegram-адаптера. Оставьте пустым для стандартных сетевых настроек." + }, + "telegram_polling_restart_delay": { + "description": "Задержка перезапуска polling Telegram", + "hint": "Сколько секунд ждать перед перезапуском polling после неожиданного завершения. По умолчанию 5 секунд." + }, + "telegram_proxy": { + "description": "Прокси Telegram API", + "hint": "Прокси только для Bot API запросов этого Telegram-адаптера, например http://127.0.0.1:7890. Оставьте пустым для стандартных сетевых настроек." + }, "telegram_token": { "description": "Токен бота", "hint": "Если вы находитесь в материковом Китае, установите прокси или измените api_base в разделе «Другие настройки»." diff --git a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json index 3ff951e343..9c7de00d23 100644 --- a/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json +++ b/dashboard/src/i18n/locales/zh-CN/features/config-metadata.json @@ -590,10 +590,26 @@ "description": "Telegram 命令自动刷新间隔", "hint": "Telegram 命令自动刷新间隔,单位为秒。" }, + "telegram_command_registered_plugins": { + "description": "Telegram 命令注册插件", + "hint": "只注册所选插件提供的指令。可填写插件名、展示名、目录名或模块路径。留空表示注册全部已启用插件的指令。" + }, + "telegram_command_scopes": { + "description": "Telegram 命令注册范围", + "hint": "Telegram BotCommandScope 配置,例如 {\"type\":\"default\",\"language_code\":\"zh\"} 或 {\"type\":\"chat\",\"chat_id\":12345}。" + }, + "telegram_get_updates_proxy": { + "description": "Telegram getUpdates 代理", + "hint": "仅用于此 Telegram 适配器轮询 getUpdates 的代理。留空则使用默认网络设置。" + }, "telegram_polling_restart_delay": { "description": "Telegram 轮询重启延迟", "hint": "当轮询意外结束尝试自动重启时的延迟时间,单位为秒。默认为 5s。" }, + "telegram_proxy": { + "description": "Telegram API 代理", + "hint": "仅用于此 Telegram 适配器 Bot API 请求的代理,例如 http://127.0.0.1:7890。留空则使用默认网络设置。" + }, "telegram_token": { "description": "Bot Token", "hint": "如果你的网络环境为中国大陆,请在 `其他配置` 处设置代理或更改 api_base。" diff --git a/docs/en/dev/star/guides/send-message.md b/docs/en/dev/star/guides/send-message.md index 961cc76fb5..b60c8b08e2 100644 --- a/docs/en/dev/star/guides/send-message.md +++ b/docs/en/dev/star/guides/send-message.md @@ -104,6 +104,66 @@ async def test(self, event: AstrMessageEvent): ![Sending video messages](https://files.astrbot.app/docs/source/images/plugin/db93a2bb-671c-4332-b8ba-9a91c35623c2.png) +## Telegram-Specific Send Options + +The Telegram adapter supports Telegram-specific components in `MessageChain` for Markdown/HTML parsing, link previews, and Inline Keyboard. These components also work with proactive sends through `self.context.send_message(unified_msg_origin, chains)`. + +```python +from astrbot.api.event import MessageChain, filter, AstrMessageEvent +from astrbot.core.platform.sources.telegram.components import ( + TelegramInlineButton, + TelegramInlineKeyboard, + TelegramMessageOptions, +) + +@filter.command("review") +async def review(self, event: AstrMessageEvent): + chain = MessageChain() + chain.message("**Choose an approval action**\n\n[Open details](https://example.com/item/42)") + chain.chain.append( + TelegramMessageOptions( + parse_mode="MarkdownV2", + link_preview_is_disabled=False, + link_preview_url="https://example.com/item/42", + link_preview_prefer_large_media=True, + link_preview_show_above_text=True, + ) + ) + chain.chain.append( + TelegramInlineKeyboard( + [ + [ + TelegramInlineButton("Approve", callback_data="approve:42"), + TelegramInlineButton("Reject", callback_data="reject:42"), + ], + [TelegramInlineButton("Open Web Page", url="https://example.com/item/42")], + ] + ) + ) + yield event.chain_result(chain) +``` + +`TelegramMessageOptions.parse_mode` supports `MarkdownV2`, `Markdown`, and `HTML`. You can also pass `plaintext`, `plain`, or `none` to send plain text. Link previews support every Telegram `LinkPreviewOptions` field: disabled state, preview URL, small/large media preference, and whether the preview is shown above the text. + +Each `TelegramInlineButton` must set exactly one action. Supported actions are `url`, `callback_data`, `login_url`, `web_app`, `switch_inline_query`, `switch_inline_query_current_chat`, `switch_inline_query_chosen_chat`, `copy_text`, `callback_game`, and `pay`, plus `style` and `icon_custom_emoji_id` when supported by the Bot API. `callback_data` must be 1-64 UTF-8 bytes. + +Plugins can handle callback events for approval, confirmation, pagination, and similar flows: + +```python +from astrbot.api.event import filter, AstrMessageEvent + +@filter.event_message_type(filter.EventMessageType.ALL) +async def on_telegram_button(self, event: AstrMessageEvent): + if not hasattr(event, "is_button_interaction") or not event.is_button_interaction(): + return + + action = event.get_interaction_data() + user_id = event.get_interaction_user_id() + await event.answer_interaction(f"Received {user_id}: {action}", show_alert=False) +``` + +Use `event.ack_interaction()` for a quick acknowledgment. `event.get_interaction_custom_id()` and `event.get_interaction_data()` both return Telegram `callback_data`. + ## Sending Group Forward Messages > Most platforms do not support this message type. Current support: OneBot v11 diff --git a/docs/en/platform/telegram.md b/docs/en/platform/telegram.md index 68ae143fcd..6769210791 100644 --- a/docs/en/platform/telegram.md +++ b/docs/en/platform/telegram.md @@ -12,6 +12,7 @@ | Voice | Yes | Yes | | | Video | Yes | Yes | | | File | Yes | Yes | | +| Inline Keyboard | Yes | Yes | Supports button callback events | Proactive message push: Supported. @@ -36,7 +37,33 @@ Fill in the configuration fields that appear: - Enable: Check this option. - Bot Token: Your Telegram bot's `token`. -Please ensure your network environment can access Telegram. You may need to configure a proxy using `Configuration -> Other Settings -> HTTP Proxy`. +Please ensure your network environment can access Telegram. You can configure a global proxy in `Configuration -> Other Settings -> HTTP Proxy`, or configure Telegram-specific proxies in the Telegram adapter: + +- `telegram_proxy`: Used only for this Telegram adapter's Bot API requests. +- `telegram_get_updates_proxy`: Used only for this Telegram adapter's `getUpdates` polling requests. + +## Command Registration + +The Telegram adapter can register AstrBot plugin commands as Telegram Bot Commands: + +- `telegram_command_register`: Whether command registration is enabled. +- `telegram_command_auto_refresh`: Whether commands are refreshed at runtime. +- `telegram_command_register_interval`: Auto-refresh interval in seconds. +- `telegram_command_registered_plugins`: Only register commands from selected plugins. Leave empty to register commands from all enabled plugins. +- `telegram_command_scopes`: Scope configs for command registration. The default is `[{"type": "default"}]`. + +`telegram_command_scopes` supports these Telegram Bot API scopes: `default`, `all_private_chats`, `all_group_chats`, `all_chat_administrators`, `chat`, `chat_administrators`, and `chat_member`. `chat` and `chat_administrators` require `chat_id`; `chat_member` requires both `chat_id` and `user_id`. Each scope can also set `language_code`. + +Example: + +```json +[ + {"type": "default", "language_code": "en"}, + {"type": "chat", "chat_id": 123456789} +] +``` + +Telegram supports at most 100 commands per scope. If registration fails, use `telegram_command_registered_plugins` to narrow the plugin set. ## Streaming Output diff --git a/docs/zh/dev/star/guides/send-message.md b/docs/zh/dev/star/guides/send-message.md index 0875876650..026977bd4f 100644 --- a/docs/zh/dev/star/guides/send-message.md +++ b/docs/zh/dev/star/guides/send-message.md @@ -104,6 +104,66 @@ async def test(self, event: AstrMessageEvent): ![发送视频消息](https://files.astrbot.app/docs/source/images/plugin/db93a2bb-671c-4332-b8ba-9a91c35623c2.png) +## Telegram 专属发送选项 + +Telegram 适配器支持在 `MessageChain` 中加入 Telegram 专属组件,用于控制 Markdown/HTML 解析、链接预览和 Inline Keyboard。这些组件同样适用于 `self.context.send_message(unified_msg_origin, chains)` 主动发送。 + +```python +from astrbot.api.event import MessageChain, filter, AstrMessageEvent +from astrbot.core.platform.sources.telegram.components import ( + TelegramInlineButton, + TelegramInlineKeyboard, + TelegramMessageOptions, +) + +@filter.command("review") +async def review(self, event: AstrMessageEvent): + chain = MessageChain() + chain.message("**请选择审批动作**\n\n[查看详情](https://example.com/item/42)") + chain.chain.append( + TelegramMessageOptions( + parse_mode="MarkdownV2", + link_preview_is_disabled=False, + link_preview_url="https://example.com/item/42", + link_preview_prefer_large_media=True, + link_preview_show_above_text=True, + ) + ) + chain.chain.append( + TelegramInlineKeyboard( + [ + [ + TelegramInlineButton("通过", callback_data="approve:42"), + TelegramInlineButton("拒绝", callback_data="reject:42"), + ], + [TelegramInlineButton("打开网页", url="https://example.com/item/42")], + ] + ) + ) + yield event.chain_result(chain) +``` + +`TelegramMessageOptions.parse_mode` 支持 `MarkdownV2`、`Markdown`、`HTML`,也可以传 `plaintext`、`plain` 或 `none` 发送纯文本。链接预览支持 Telegram `LinkPreviewOptions` 的所有字段:是否禁用预览、预览 URL、小/大媒体偏好、是否显示在文本上方。 + +`TelegramInlineButton` 每个按钮必须且只能设置一种动作。支持 `url`、`callback_data`、`login_url`、`web_app`、`switch_inline_query`、`switch_inline_query_current_chat`、`switch_inline_query_chosen_chat`、`copy_text`、`callback_game`、`pay`,以及 Bot API 支持时的 `style`、`icon_custom_emoji_id`。`callback_data` 必须是 1-64 UTF-8 字节。 + +插件可以通过回调事件实现审批、确认、翻页等交互: + +```python +from astrbot.api.event import filter, AstrMessageEvent + +@filter.event_message_type(filter.EventMessageType.ALL) +async def on_telegram_button(self, event: AstrMessageEvent): + if not hasattr(event, "is_button_interaction") or not event.is_button_interaction(): + return + + action = event.get_interaction_data() + user_id = event.get_interaction_user_id() + await event.answer_interaction(f"已收到 {user_id}: {action}", show_alert=False) +``` + +`event.ack_interaction()` 可以快速确认回调;`event.get_interaction_custom_id()` 与 `event.get_interaction_data()` 都会返回 Telegram 的 `callback_data`。 + ## 发送群合并转发消息 > 大多数平台都不支持此种消息类型,当前适配情况:OneBot v11 diff --git a/docs/zh/platform/telegram.md b/docs/zh/platform/telegram.md index 0156b668df..9bb59b6ea5 100644 --- a/docs/zh/platform/telegram.md +++ b/docs/zh/platform/telegram.md @@ -12,6 +12,7 @@ | 语音 | 是 | 是 | | | 视频 | 是 | 是 | | | 文件 | 是 | 是 | | +| Inline Keyboard | 是 | 是 | 支持按钮回调事件 | 主动消息推送:支持。 @@ -37,7 +38,33 @@ - 启用(enable): 勾选。 - Bot Token: 你的 Telegram 机器人的 `token`。 -请确保你的网络环境可以访问 Telegram。你可能需要使用 `配置页->其他配置->HTTP 代理` 来设置代理。 +请确保你的网络环境可以访问 Telegram。你可以使用 `配置页->其他配置->HTTP 代理` 设置全局代理,也可以在 Telegram 适配器里设置独立代理: + +- `telegram_proxy`:仅用于此 Telegram 适配器的 Bot API 请求。 +- `telegram_get_updates_proxy`:仅用于此 Telegram 适配器轮询 `getUpdates` 的请求。 + +## 命令注册 + +Telegram 适配器可以把 AstrBot 插件指令注册为 Telegram Bot Commands: + +- `telegram_command_register`:是否启用命令注册。 +- `telegram_command_auto_refresh`:是否在运行时自动刷新命令。 +- `telegram_command_register_interval`:自动刷新间隔,单位为秒。 +- `telegram_command_registered_plugins`:只注册所选插件提供的指令。留空表示注册全部已启用插件的指令。 +- `telegram_command_scopes`:注册范围配置。默认值为 `[{"type": "default"}]`。 + +`telegram_command_scopes` 支持 Telegram Bot API 的这些范围:`default`、`all_private_chats`、`all_group_chats`、`all_chat_administrators`、`chat`、`chat_administrators`、`chat_member`。其中 `chat`、`chat_administrators` 需要 `chat_id`,`chat_member` 需要 `chat_id` 和 `user_id`。每个范围都可以额外设置 `language_code`。 + +示例: + +```json +[ + {"type": "default", "language_code": "zh"}, + {"type": "chat", "chat_id": 123456789} +] +``` + +Telegram 每个 scope 最多支持 100 条命令。如果注册失败,请使用 `telegram_command_registered_plugins` 缩小要注册的插件范围。 ## 流式输出 diff --git a/tests/fixtures/helpers.py b/tests/fixtures/helpers.py index f290caff52..f4b1a29848 100644 --- a/tests/fixtures/helpers.py +++ b/tests/fixtures/helpers.py @@ -49,6 +49,10 @@ def make_platform_config(platform_type: str, **kwargs) -> dict: "telegram_command_register": True, "telegram_command_auto_refresh": True, "telegram_command_register_interval": 300, + "telegram_command_registered_plugins": [], + "telegram_command_scopes": [{"type": "default"}], + "telegram_proxy": "", + "telegram_get_updates_proxy": "", "telegram_media_group_timeout": 2.5, "telegram_media_group_max_wait": 10.0, "start_message": "Welcome to AstrBot!", diff --git a/tests/fixtures/mocks/telegram.py b/tests/fixtures/mocks/telegram.py index e5e31f178c..f74771144f 100644 --- a/tests/fixtures/mocks/telegram.py +++ b/tests/fixtures/mocks/telegram.py @@ -21,6 +21,67 @@ class MockTelegramInvalidToken(Exception): """Mock telegram.error.InvalidToken used in tests.""" +class MockTelegramObject: + """Small value object for Telegram classes used in tests.""" + + def __init__(self, *args, **kwargs) -> None: + self.args = args + for key, value in kwargs.items(): + setattr(self, key, value) + + +class MockBotCommand(MockTelegramObject): + def __init__(self, command: str, description: str) -> None: + super().__init__(command=command, description=description) + + +class MockBotCommandScopeDefault(MockTelegramObject): + pass + + +class MockBotCommandScopeAllPrivateChats(MockTelegramObject): + pass + + +class MockBotCommandScopeAllGroupChats(MockTelegramObject): + pass + + +class MockBotCommandScopeAllChatAdministrators(MockTelegramObject): + pass + + +class MockBotCommandScopeChat(MockTelegramObject): + pass + + +class MockBotCommandScopeChatAdministrators(MockTelegramObject): + pass + + +class MockBotCommandScopeChatMember(MockTelegramObject): + pass + + +class MockInlineKeyboardButton(MockTelegramObject): + def __init__(self, text: str, **kwargs) -> None: + super().__init__(text=text, **kwargs) + + +class MockInlineKeyboardMarkup(MockTelegramObject): + def __init__(self, inline_keyboard) -> None: + super().__init__(inline_keyboard=inline_keyboard) + + +class MockLinkPreviewOptions(MockTelegramObject): + pass + + +class MockCallbackQueryHandler(MockTelegramObject): + def __init__(self, callback, **kwargs) -> None: + super().__init__(callback=callback, **kwargs) + + def create_mock_telegram_modules(): """创建 Telegram 相关的 mock 模块。 @@ -28,8 +89,27 @@ def create_mock_telegram_modules(): dict: 包含 telegram 和相关模块的 mock 对象 """ mock_telegram = MagicMock() - mock_telegram.BotCommand = MagicMock + mock_telegram.BotCommand = MockBotCommand + mock_telegram.BotCommandScopeDefault = MockBotCommandScopeDefault + mock_telegram.BotCommandScopeAllPrivateChats = MockBotCommandScopeAllPrivateChats + mock_telegram.BotCommandScopeAllGroupChats = MockBotCommandScopeAllGroupChats + mock_telegram.BotCommandScopeAllChatAdministrators = ( + MockBotCommandScopeAllChatAdministrators + ) + mock_telegram.BotCommandScopeChat = MockBotCommandScopeChat + mock_telegram.BotCommandScopeChatAdministrators = ( + MockBotCommandScopeChatAdministrators + ) + mock_telegram.BotCommandScopeChatMember = MockBotCommandScopeChatMember + mock_telegram.CallbackGame = MagicMock + mock_telegram.CopyTextButton = MockTelegramObject + mock_telegram.InlineKeyboardButton = MockInlineKeyboardButton + mock_telegram.InlineKeyboardMarkup = MockInlineKeyboardMarkup + mock_telegram.LinkPreviewOptions = MockLinkPreviewOptions + mock_telegram.LoginUrl = MockTelegramObject + mock_telegram.SwitchInlineQueryChosenChat = MockTelegramObject mock_telegram.Update = MagicMock + mock_telegram.WebAppInfo = MockTelegramObject mock_telegram.constants = MagicMock() mock_telegram.constants.ChatType = MagicMock() mock_telegram.constants.ChatType.PRIVATE = "private" @@ -47,12 +127,15 @@ def create_mock_telegram_modules(): mock_telegram.ReactionTypeEmoji = MagicMock mock_telegram_ext = MagicMock() - mock_telegram_ext.ApplicationBuilder = MagicMock + mock_telegram_ext.ApplicationBuilder = ( + MockTelegramBuilder.create_application_builder + ) mock_telegram_ext.ContextTypes = MagicMock() mock_telegram_ext.ContextTypes.DEFAULT_TYPE = MagicMock mock_telegram_ext.ExtBot = MagicMock mock_telegram_ext.filters = MagicMock() mock_telegram_ext.filters.ALL = MagicMock() + mock_telegram_ext.CallbackQueryHandler = MockCallbackQueryHandler mock_telegram_ext.MessageHandler = MagicMock # Mock telegramify_markdown @@ -127,10 +210,23 @@ def create_bot(): bot.delete_my_commands = AsyncMock() bot.set_my_commands = AsyncMock() bot.set_message_reaction = AsyncMock() + bot.answer_callback_query = AsyncMock() bot.edit_message_text = AsyncMock() bot.send_message_draft = AsyncMock() return bot + @staticmethod + def create_application_builder(app=None): + """创建支持链式调用的 mock Telegram ApplicationBuilder 实例。""" + builder = MagicMock() + builder.token.return_value = builder + builder.base_url.return_value = builder + builder.base_file_url.return_value = builder + builder.proxy.return_value = builder + builder.get_updates_proxy.return_value = builder + builder.build.return_value = app or MockTelegramBuilder.create_application() + return builder + @staticmethod def create_application(): """创建 mock Telegram Application 实例。""" diff --git a/tests/test_telegram_adapter.py b/tests/test_telegram_adapter.py index 8290899d1a..fa33c5c310 100644 --- a/tests/test_telegram_adapter.py +++ b/tests/test_telegram_adapter.py @@ -1,11 +1,13 @@ import asyncio import importlib import sys +from dataclasses import dataclass from unittest.mock import AsyncMock, MagicMock, patch import pytest import astrbot.api.message_components as Comp +from astrbot.api.event import MessageChain from tests.fixtures.helpers import ( NoopAwaitable, create_mock_file, @@ -43,12 +45,17 @@ def _load_telegram_module(module_name: str): if module is not None: return module + components_module_name = "astrbot.core.platform.sources.telegram.components" with patch.dict(sys.modules, _build_telegram_patched_modules()): sys.modules.pop(module_name, None) module = importlib.import_module(module_name) + components_module = sys.modules.get(components_module_name) sys.modules[module_name] = module _TELEGRAM_MODULES[module_name] = module + if components_module is not None: + sys.modules[components_module_name] = components_module + _TELEGRAM_MODULES[components_module_name] = components_module return module @@ -72,6 +79,11 @@ def _load_telegram_platform_event(): return _TELEGRAM_PLATFORM_EVENT +def _load_telegram_components(): + _load_telegram_platform_event() + return _TELEGRAM_MODULES["astrbot.core.platform.sources.telegram.components"] + + def _build_context() -> MagicMock: context = MagicMock() context.bot.username = "test_bot" @@ -384,3 +396,328 @@ async def final_start_polling(*args, **kwargs): app_two.shutdown.assert_awaited() app_three.initialize.assert_awaited() app_three.start.assert_awaited() + + +@pytest.mark.asyncio +async def test_telegram_send_with_inline_keyboard_and_message_options(): + TelegramPlatformEvent = _load_telegram_platform_event() + components = _load_telegram_components() + client = MagicMock() + client.send_message = AsyncMock() + client.send_chat_action = AsyncMock() + message = MessageChain() + message.message("approve this") + message.chain.append( + components.TelegramMessageOptions( + parse_mode="HTML", + link_preview_is_disabled=True, + link_preview_url="https://example.com/preview", + link_preview_prefer_small_media=True, + link_preview_show_above_text=True, + ), + ) + message.chain.append( + components.TelegramInlineKeyboard( + [ + [ + components.TelegramInlineButton( + "Approve", + callback_data="approval:yes", + ), + components.TelegramInlineButton( + "Details", + url="https://example.com/details", + ), + ], + ], + ), + ) + + await TelegramPlatformEvent.send_with_client(client, message, "123456") + + call = client.send_message.await_args.kwargs + assert call["text"] == "approve this" + assert call["parse_mode"] == "HTML" + assert call["link_preview_options"].is_disabled is True + assert call["link_preview_options"].url == "https://example.com/preview" + assert call["reply_markup"].inline_keyboard[0][0].callback_data == "approval:yes" + assert ( + call["reply_markup"].inline_keyboard[0][1].url == "https://example.com/details" + ) + + +@pytest.mark.asyncio +async def test_telegram_send_respects_plaintext_markdown_toggle(): + TelegramPlatformEvent = _load_telegram_platform_event() + client = MagicMock() + client.send_message = AsyncMock() + client.send_chat_action = AsyncMock() + message = MessageChain() + message.use_markdown_ = False + message.message("**plain**") + + await TelegramPlatformEvent.send_with_client(client, message, "123456") + + call = client.send_message.await_args.kwargs + assert call["text"] == "**plain**" + assert "parse_mode" not in call + + +@pytest.mark.asyncio +async def test_telegram_send_by_session_preserves_markdown_options_and_keyboard(): + TelegramPlatformAdapter = _load_telegram_adapter() + components = _load_telegram_components() + adapter = TelegramPlatformAdapter( + make_platform_config("telegram"), + {}, + asyncio.Queue(), + ) + adapter.client.send_message = AsyncMock() + adapter.client.send_chat_action = AsyncMock() + message = MessageChain() + message.message("proactive") + message.chain.append(components.TelegramMessageOptions(parse_mode="Markdown")) + message.chain.append( + components.TelegramInlineKeyboard( + [[components.TelegramInlineButton("OK", callback_data="ok")]], + ), + ) + session = MagicMock() + session.session_id = "123456" + + await adapter.send_by_session(session, message) + + call = adapter.client.send_message.await_args.kwargs + assert call["parse_mode"] == "Markdown" + assert call["reply_markup"].inline_keyboard[0][0].callback_data == "ok" + + +def test_telegram_inline_button_validates_actions_and_callback_data(): + components = _load_telegram_components() + + with pytest.raises(ValueError, match="exactly one"): + components.TelegramInlineButton("Missing") + + with pytest.raises(ValueError, match="exactly one"): + components.TelegramInlineButton( + "Too many", + callback_data="ok", + url="https://example.com", + ) + + with pytest.raises(ValueError, match="1-64"): + components.TelegramInlineButton("Empty", callback_data="") + + with pytest.raises(ValueError, match="1-64"): + components.TelegramInlineButton("Too long", callback_data="x" * 65) + + button = components.TelegramInlineButton("中文", callback_data="确认") + assert len(button.callback_data.encode("utf-8")) <= 64 + + styled_button = components.TelegramInlineButton( + "Styled", + callback_data="styled", + style="primary", + icon_custom_emoji_id="5368324170671202286", + ).to_telegram_button() + assert styled_button.style == "primary" + assert styled_button.icon_custom_emoji_id == "5368324170671202286" + + +def test_telegram_command_config_normalization_supports_wildcard_and_empty_language(): + TelegramPlatformAdapter = _load_telegram_adapter() + + assert TelegramPlatformAdapter._normalize_command_plugin_allowlist(["*"]) is None + assert ( + TelegramPlatformAdapter._normalize_command_plugin_allowlist( + "allowed,*", + ) + is None + ) + assert TelegramPlatformAdapter._command_language_code({"language_code": ""}) is None + assert ( + TelegramPlatformAdapter._command_language_code({"language_code": " zh "}) + == "zh" + ) + + +@pytest.mark.asyncio +async def test_telegram_callback_query_is_converted_to_platform_event(): + TelegramPlatformAdapter = _load_telegram_adapter() + adapter = TelegramPlatformAdapter( + make_platform_config("telegram"), + {}, + asyncio.Queue(), + ) + callback_query = MagicMock() + callback_query.id = "callback-id" + callback_query.data = "approval:yes" + callback_query.game_short_name = None + callback_query.from_user.id = 42 + callback_query.from_user.username = "alice" + callback_query.message.message_id = 99 + callback_query.message.chat.id = -100 + callback_query.message.chat.type = "supergroup" + callback_query.message.is_topic_message = True + callback_query.message.message_thread_id = 7 + callback_query.answer = AsyncMock() + update = MagicMock() + update.callback_query = callback_query + + abm = await adapter.convert_callback_query(update, _build_context()) + + assert abm is not None + assert abm.message_str == "approval:yes" + assert abm.sender.user_id == "42" + assert abm.group_id == "-100#7" + assert abm.session_id == "-100#7" + event = adapter.handle_msg.__globals__["TelegramPlatformEvent"]( + abm.message_str, + abm, + adapter.meta(), + abm.session_id, + adapter.client, + ) + assert event.is_button_interaction() + assert event.get_interaction_custom_id() == "approval:yes" + assert event.get_interaction_data() == "approval:yes" + assert event.get_interaction_user_id() == "42" + + await event.answer_interaction("done", show_alert=True, cache_time=5) + + callback_query.answer.assert_awaited_once_with( + text="done", + show_alert=True, + url=None, + cache_time=5, + ) + + +def test_telegram_application_builder_uses_adapter_proxy_config(): + TelegramPlatformAdapter = _load_telegram_adapter() + module_globals = TelegramPlatformAdapter.__init__.__globals__ + builder = MockTelegramBuilder.create_application_builder() + + with patch.dict( + module_globals, + { + "ApplicationBuilder": MagicMock(return_value=builder), + "AsyncIOScheduler": MagicMock( + return_value=MockTelegramBuilder.create_scheduler() + ), + }, + ): + TelegramPlatformAdapter( + make_platform_config( + "telegram", + telegram_proxy="http://127.0.0.1:7890", + telegram_get_updates_proxy="http://127.0.0.1:7891", + ), + {}, + asyncio.Queue(), + ) + + builder.proxy.assert_called_once_with("http://127.0.0.1:7890") + builder.get_updates_proxy.assert_called_once_with("http://127.0.0.1:7891") + app = builder.build.return_value + assert app.add_handler.call_count == 2 + + +@pytest.mark.asyncio +async def test_telegram_command_registration_filters_plugins_and_uses_scopes(): + TelegramPlatformAdapter = _load_telegram_adapter() + module_globals = TelegramPlatformAdapter.__init__.__globals__ + + async def _handler(*args, **kwargs): + return None + + @dataclass + class _Plugin: + name: str + display_name: str + root_dir_name: str + module_path: str + activated: bool = True + + @dataclass + class _Handler: + handler_module_path: str + event_filters: list + desc: str + enabled: bool = True + + handlers = [ + _Handler( + "plugins.allowed.main", + [module_globals["CommandFilter"]("allowed", alias={"alias"})], + "Allowed command", + ), + _Handler( + "plugins.denied.main", + [module_globals["CommandFilter"]("denied")], + "Denied command", + ), + ] + star_map = { + "plugins.allowed.main": _Plugin( + "allowed_plugin", + "Allowed", + "allowed", + "plugins.allowed.main", + ), + "plugins.denied.main": _Plugin( + "denied_plugin", + "Denied", + "denied", + "plugins.denied.main", + ), + } + adapter = TelegramPlatformAdapter( + make_platform_config( + "telegram", + telegram_command_registered_plugins=["allowed_plugin"], + telegram_command_scopes=[ + {"type": "default", "language_code": "en"}, + {"type": "chat", "chat_id": 12345, "language_code": "zh"}, + ], + ), + {}, + asyncio.Queue(), + ) + + with patch.dict( + module_globals, {"star_handlers_registry": handlers, "star_map": star_map} + ): + await adapter.register_commands() + + assert adapter.client.delete_my_commands.await_count == 2 + assert adapter.client.set_my_commands.await_count == 2 + first_commands = adapter.client.set_my_commands.await_args_list[0].args[0] + assert [cmd.command for cmd in first_commands] == ["alias", "allowed"] + assert {cmd.description for cmd in first_commands} == {"Allowed command"} + first_kwargs = adapter.client.set_my_commands.await_args_list[0].kwargs + second_kwargs = adapter.client.set_my_commands.await_args_list[1].kwargs + assert first_kwargs["language_code"] == "en" + assert second_kwargs["language_code"] == "zh" + assert "BotCommandScopeDefault" in repr(type(first_kwargs["scope"])) + assert "BotCommandScopeChat" in repr(type(second_kwargs["scope"])) + + +@pytest.mark.asyncio +async def test_telegram_command_registration_skips_when_command_count_exceeds_limit(): + TelegramPlatformAdapter = _load_telegram_adapter() + adapter = TelegramPlatformAdapter( + make_platform_config("telegram"), + {}, + asyncio.Queue(), + ) + adapter.collect_commands = MagicMock( + return_value=[ + MagicMock(command=f"cmd{i}", description="desc") for i in range(101) + ], + ) + + await adapter.register_commands() + + adapter.client.delete_my_commands.assert_not_awaited() + adapter.client.set_my_commands.assert_not_awaited()