Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions astrbot/core/platform/sources/telegram/tg_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,18 @@ async def convert_message(
logger.warning("Received an update without a message.")
return None

def _apply_caption() -> None:
if update.message.caption:
message.message_str = update.message.caption
message.message.append(Comp.Plain(message.message_str))
if update.message.caption and update.message.caption_entities:
for entity in update.message.caption_entities:
if entity.type == "mention":
name = update.message.caption[
entity.offset + 1 : entity.offset + entity.length
]
message.message.append(Comp.At(qq=name, name=name))

message = AstrBotMessage()
message.session_id = str(update.message.chat.id)

Expand Down Expand Up @@ -454,16 +466,7 @@ async def convert_message(
photo = update.message.photo[-1] # get the largest photo
file = await photo.get_file()
message.message.append(Comp.Image(file=file.file_path, url=file.file_path))
if update.message.caption:
message.message_str = update.message.caption
message.message.append(Comp.Plain(message.message_str))
if update.message.caption_entities:
for entity in update.message.caption_entities:
if entity.type == "mention":
name = message.message_str[
entity.offset + 1 : entity.offset + entity.length
]
message.message.append(Comp.At(qq=name, name=name))
_apply_caption()

elif update.message.sticker:
# 将sticker当作图片处理
Expand All @@ -486,6 +489,7 @@ async def convert_message(
message.message.append(
Comp.File(file=file_path, name=file_name, url=file_path)
)
_apply_caption()

elif update.message.video:
file = await update.message.video.get_file()
Expand All @@ -497,6 +501,7 @@ async def convert_message(
)
else:
message.message.append(Comp.Video(file=file_path, path=file.file_path))
_apply_caption()

return message

Expand Down
3 changes: 2 additions & 1 deletion tests/fixtures/mocks/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def create_mock_telegram_modules():

mock_telegram_ext = MagicMock()
mock_telegram_ext.ApplicationBuilder = MagicMock
mock_telegram_ext.ContextTypes = MagicMock
mock_telegram_ext.ContextTypes = MagicMock()
mock_telegram_ext.ContextTypes.DEFAULT_TYPE = MagicMock
mock_telegram_ext.ExtBot = MagicMock
mock_telegram_ext.filters = MagicMock()
mock_telegram_ext.filters.ALL = MagicMock()
Expand Down
108 changes: 108 additions & 0 deletions tests/test_telegram_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import asyncio
import importlib
import sys
from unittest.mock import MagicMock, patch

import pytest

import astrbot.api.message_components as Comp
from tests.fixtures.helpers import (
create_mock_file,
create_mock_update,
make_platform_config,
)
from tests.fixtures.mocks.telegram import create_mock_telegram_modules

_TELEGRAM_PLATFORM_ADAPTER = None


def _load_telegram_adapter():
global _TELEGRAM_PLATFORM_ADAPTER
if _TELEGRAM_PLATFORM_ADAPTER is not None:
return _TELEGRAM_PLATFORM_ADAPTER

mocks = create_mock_telegram_modules()
patched_modules = {
"telegram": mocks["telegram"],
"telegram.constants": mocks["telegram"].constants,
"telegram.error": mocks["telegram"].error,
"telegram.ext": mocks["telegram.ext"],
"telegramify_markdown": mocks["telegramify_markdown"],
"apscheduler": mocks["apscheduler"],
"apscheduler.schedulers": mocks["apscheduler"].schedulers,
"apscheduler.schedulers.asyncio": mocks["apscheduler"].schedulers.asyncio,
"apscheduler.schedulers.background": mocks["apscheduler"].schedulers.background,
}
with patch.dict(sys.modules, patched_modules):
sys.modules.pop("astrbot.core.platform.sources.telegram.tg_adapter", None)
module = importlib.import_module("astrbot.core.platform.sources.telegram.tg_adapter")
_TELEGRAM_PLATFORM_ADAPTER = module.TelegramPlatformAdapter
return _TELEGRAM_PLATFORM_ADAPTER


def _build_context() -> MagicMock:
context = MagicMock()
context.bot.username = "test_bot"
context.bot.id = 12345678
return context


@pytest.mark.asyncio
async def test_telegram_document_caption_populates_message_text_and_plain():
TelegramPlatformAdapter = _load_telegram_adapter()
adapter = TelegramPlatformAdapter(
make_platform_config("telegram"),
{},
asyncio.Queue(),
)
document = create_mock_file("https://api.telegram.org/file/test/report.md")
document.file_name = "report.md"
mention = MagicMock(type="mention", offset=0, length=6)
update = create_mock_update(
message_text=None,
document=document,
caption="@alice 请总结这份文档",
caption_entities=[mention],
)

result = await adapter.convert_message(update, _build_context())

assert result is not None
assert result.message_str == "@alice 请总结这份文档"
assert any(isinstance(component, Comp.File) for component in result.message)
assert any(
isinstance(component, Comp.Plain)
and component.text == "@alice 请总结这份文档"
for component in result.message
)
assert any(
isinstance(component, Comp.At) and component.qq == "alice"
for component in result.message
)


@pytest.mark.asyncio
async def test_telegram_video_caption_populates_message_text_and_plain():
TelegramPlatformAdapter = _load_telegram_adapter()
adapter = TelegramPlatformAdapter(
make_platform_config("telegram"),
{},
asyncio.Queue(),
)
video = create_mock_file("https://api.telegram.org/file/test/lesson.mp4")
video.file_name = "lesson.mp4"
update = create_mock_update(
message_text=None,
video=video,
caption="这段视频讲了什么",
)

result = await adapter.convert_message(update, _build_context())

assert result is not None
assert result.message_str == "这段视频讲了什么"
assert any(isinstance(component, Comp.Video) for component in result.message)
assert any(
isinstance(component, Comp.Plain) and component.text == "这段视频讲了什么"
for component in result.message
)
Loading