mirror of
https://github.com/AstrBotDevs/AstrBot
synced 2026-07-01 01:10:21 +08:00
* fix: unify media reference handling * fix: accept bare base64 record media refs * chore: update agents.md * fix: unify file URI handling across media components and utilities * fix: unify media reference type handling with MediaRefStr alias * Potential fix for pull request finding 'CodeQL / Incomplete URL substring sanitization' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> * Update astrbot/core/platform/sources/discord/discord_platform_adapter.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix: unify media handling and improve base64 decoding across components * fix: simplify client_kwargs type definition and enhance media message handling in platform adapter * fix: unify media utility documentation and enhance function descriptions * perf: drop "pilk" requirement, improve audio outbound for tencent-related IM apps which using silk * fix: unify Tencent Silk audio handling and enhance media resolver functionality --- - Centralize media reference materialization and base64 resolution for local paths, http(s), base64://, data URIs, and legacy bare base64 payloads. - Normalize incoming Record audio to wav and Image media to temporary jpg during preprocess, with event-scoped cleanup. - Reuse the shared media resolver across OpenAI, Gemini, Anthropic, MiMo, DeerFlow, STT, and platform media paths while sanitizing logs and cleaning temporary conversion outputs. - Ensure generated TTS audio is tracked for cleanup after the event finishes. fix #8676 fix #8543 fix #7588 fix #7580 fix #8030 fix #8034 fix #7461 fix #7565 fix #6509 fix #7144 fix #7795 --------- Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
395 lines
13 KiB
Python
395 lines
13 KiB
Python
import asyncio
|
|
import importlib
|
|
import sys
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
import astrbot.api.message_components as Comp
|
|
from astrbot.core.platform.register import unregister_platform_adapters_by_module
|
|
from tests.fixtures.helpers import (
|
|
NoopAwaitable,
|
|
create_mock_file,
|
|
create_mock_update,
|
|
make_platform_config,
|
|
)
|
|
from tests.fixtures.mocks.telegram import (
|
|
MockTelegramBuilder,
|
|
MockTelegramNetworkError,
|
|
create_mock_telegram_modules,
|
|
)
|
|
|
|
_TELEGRAM_PLATFORM_ADAPTER = None
|
|
_TELEGRAM_PLATFORM_EVENT = None
|
|
_TELEGRAM_MODULES: dict[str, object] = {}
|
|
|
|
|
|
def _build_telegram_patched_modules():
|
|
mocks = create_mock_telegram_modules()
|
|
return {
|
|
"telegram": mocks["telegram"],
|
|
"telegram.constants": mocks["telegram"].constants,
|
|
"telegram.error": mocks["telegram"].error,
|
|
"telegram.ext": mocks["telegram.ext"],
|
|
"telegramify_markdown": mocks["telegramify_markdown"],
|
|
"apscheduler": mocks["apscheduler"],
|
|
"apscheduler.schedulers": mocks["apscheduler"].schedulers,
|
|
"apscheduler.schedulers.asyncio": mocks["apscheduler"].schedulers.asyncio,
|
|
"apscheduler.schedulers.background": mocks["apscheduler"].schedulers.background,
|
|
}
|
|
|
|
|
|
def _load_telegram_module(module_name: str):
|
|
module = _TELEGRAM_MODULES.get(module_name)
|
|
if module is not None:
|
|
return module
|
|
|
|
with patch.dict(sys.modules, _build_telegram_patched_modules()):
|
|
if module_name == "astrbot.core.platform.sources.telegram.tg_adapter":
|
|
unregister_platform_adapters_by_module(module_name)
|
|
sys.modules.pop(module_name, None)
|
|
module = importlib.import_module(module_name)
|
|
|
|
sys.modules[module_name] = module
|
|
_TELEGRAM_MODULES[module_name] = module
|
|
return module
|
|
|
|
|
|
def _load_telegram_adapter():
|
|
global _TELEGRAM_PLATFORM_ADAPTER
|
|
if _TELEGRAM_PLATFORM_ADAPTER is not None:
|
|
return _TELEGRAM_PLATFORM_ADAPTER
|
|
|
|
module = _load_telegram_module("astrbot.core.platform.sources.telegram.tg_adapter")
|
|
_TELEGRAM_PLATFORM_ADAPTER = module.TelegramPlatformAdapter
|
|
return _TELEGRAM_PLATFORM_ADAPTER
|
|
|
|
|
|
def _load_telegram_platform_event():
|
|
global _TELEGRAM_PLATFORM_EVENT
|
|
if _TELEGRAM_PLATFORM_EVENT is not None:
|
|
return _TELEGRAM_PLATFORM_EVENT
|
|
|
|
module = _load_telegram_module("astrbot.core.platform.sources.telegram.tg_event")
|
|
_TELEGRAM_PLATFORM_EVENT = module.TelegramPlatformEvent
|
|
return _TELEGRAM_PLATFORM_EVENT
|
|
|
|
|
|
def _build_context() -> MagicMock:
|
|
context = MagicMock()
|
|
context.bot.username = "test_bot"
|
|
context.bot.id = 12345678
|
|
return context
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_document_caption_populates_message_text_and_plain():
|
|
TelegramPlatformAdapter = _load_telegram_adapter()
|
|
adapter = TelegramPlatformAdapter(
|
|
make_platform_config("telegram"),
|
|
{},
|
|
asyncio.Queue(),
|
|
)
|
|
document = create_mock_file("https://api.telegram.org/file/test/report.md")
|
|
document.file_name = "report.md"
|
|
mention = MagicMock(type="mention", offset=0, length=6)
|
|
update = create_mock_update(
|
|
message_text=None,
|
|
document=document,
|
|
caption="@alice 请总结这份文档",
|
|
caption_entities=[mention],
|
|
)
|
|
|
|
result = await adapter.convert_message(update, _build_context())
|
|
|
|
assert result is not None
|
|
assert result.message_str == "@alice 请总结这份文档"
|
|
assert any(isinstance(component, Comp.File) for component in result.message)
|
|
assert any(
|
|
isinstance(component, Comp.Plain) and component.text == "@alice 请总结这份文档"
|
|
for component in result.message
|
|
)
|
|
assert any(
|
|
isinstance(component, Comp.At) and component.qq == "alice"
|
|
for component in result.message
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_video_caption_populates_message_text_and_plain():
|
|
TelegramPlatformAdapter = _load_telegram_adapter()
|
|
adapter = TelegramPlatformAdapter(
|
|
make_platform_config("telegram"),
|
|
{},
|
|
asyncio.Queue(),
|
|
)
|
|
video = create_mock_file("https://api.telegram.org/file/test/lesson.mp4")
|
|
video.file_name = "lesson.mp4"
|
|
update = create_mock_update(
|
|
message_text=None,
|
|
video=video,
|
|
caption="这段视频讲了什么",
|
|
)
|
|
|
|
result = await adapter.convert_message(update, _build_context())
|
|
|
|
assert result is not None
|
|
assert result.message_str == "这段视频讲了什么"
|
|
assert any(isinstance(component, Comp.Video) for component in result.message)
|
|
assert any(
|
|
isinstance(component, Comp.Plain) and component.text == "这段视频讲了什么"
|
|
for component in result.message
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_voice_message_creates_record_component(tmp_path):
|
|
TelegramPlatformAdapter = _load_telegram_adapter()
|
|
adapter = TelegramPlatformAdapter(
|
|
make_platform_config("telegram"),
|
|
{},
|
|
asyncio.Queue(),
|
|
)
|
|
voice = create_mock_file("https://api.telegram.org/file/test/voice.oga")
|
|
update = create_mock_update(
|
|
message_text=None,
|
|
voice=voice,
|
|
)
|
|
wav_path = tmp_path / "voice.oga.wav"
|
|
convert_message_globals = adapter.convert_message.__func__.__globals__
|
|
|
|
with (
|
|
patch.dict(
|
|
convert_message_globals,
|
|
{
|
|
"get_astrbot_temp_path": MagicMock(return_value=str(tmp_path)),
|
|
"download_file": AsyncMock(),
|
|
},
|
|
),
|
|
patch(
|
|
"astrbot.core.utils.media_utils.ensure_wav",
|
|
AsyncMock(return_value=str(wav_path)),
|
|
),
|
|
):
|
|
result = await adapter.convert_message(update, _build_context())
|
|
|
|
assert result is not None
|
|
assert len(result.message) == 1
|
|
assert isinstance(result.message[0], Comp.Record)
|
|
assert result.message[0].file == str(wav_path)
|
|
assert result.message[0].path == str(wav_path)
|
|
assert result.message[0].url == str(wav_path)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_final_segment_splits_long_markdown_messages():
|
|
TelegramPlatformEvent = _load_telegram_platform_event()
|
|
client = MagicMock()
|
|
client.send_message = AsyncMock()
|
|
event = TelegramPlatformEvent("msg", MagicMock(), MagicMock(), "session", client)
|
|
|
|
delta = "A" * (TelegramPlatformEvent.MAX_MESSAGE_LENGTH + 32)
|
|
payload = {"chat_id": "123456"}
|
|
|
|
await event._send_final_segment(delta, payload)
|
|
|
|
assert client.send_message.await_count == 2
|
|
first_call = client.send_message.await_args_list[0].kwargs
|
|
second_call = client.send_message.await_args_list[1].kwargs
|
|
assert len(first_call["text"]) == TelegramPlatformEvent.MAX_MESSAGE_LENGTH
|
|
assert len(second_call["text"]) == 32
|
|
assert first_call["parse_mode"] == "MarkdownV2"
|
|
assert second_call["parse_mode"] == "MarkdownV2"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_final_segment_splits_long_plaintext_when_markdown_fails():
|
|
TelegramPlatformEvent = _load_telegram_platform_event()
|
|
client = MagicMock()
|
|
client.send_message = AsyncMock()
|
|
event = TelegramPlatformEvent("msg", MagicMock(), MagicMock(), "session", client)
|
|
|
|
delta = "B" * (TelegramPlatformEvent.MAX_MESSAGE_LENGTH + 18)
|
|
payload = {"chat_id": "123456"}
|
|
|
|
with patch(
|
|
"astrbot.core.platform.sources.telegram.tg_event.telegramify_markdown.markdownify",
|
|
side_effect=Exception("boom"),
|
|
):
|
|
await event._send_final_segment(delta, payload)
|
|
|
|
assert client.send_message.await_count == 2
|
|
first_call = client.send_message.await_args_list[0].kwargs
|
|
second_call = client.send_message.await_args_list[1].kwargs
|
|
assert len(first_call["text"]) == TelegramPlatformEvent.MAX_MESSAGE_LENGTH
|
|
assert len(second_call["text"]) == 18
|
|
assert "parse_mode" not in first_call
|
|
assert "parse_mode" not in second_call
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_polling_error_requests_rebuild_after_threshold():
|
|
TelegramPlatformAdapter = _load_telegram_adapter()
|
|
adapter = TelegramPlatformAdapter(
|
|
make_platform_config("telegram"),
|
|
{},
|
|
asyncio.Queue(),
|
|
)
|
|
adapter._loop = asyncio.get_running_loop()
|
|
|
|
assert not adapter._polling_recovery_requested.is_set()
|
|
|
|
for _ in range(adapter._polling_recovery_threshold):
|
|
adapter._on_polling_error(MockTelegramNetworkError("proxy disconnected"))
|
|
|
|
await asyncio.sleep(0)
|
|
|
|
assert adapter._polling_recovery_requested.is_set()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_run_rebuilds_application_after_repeated_polling_errors():
|
|
TelegramPlatformAdapter = _load_telegram_adapter()
|
|
module_globals = TelegramPlatformAdapter.__init__.__globals__
|
|
app_one = MockTelegramBuilder.create_application()
|
|
app_one.updater.running = True
|
|
app_two = MockTelegramBuilder.create_application()
|
|
app_two.updater.running = True
|
|
created_apps = [app_one, app_two]
|
|
|
|
builder = MagicMock()
|
|
builder.token.return_value = builder
|
|
builder.base_url.return_value = builder
|
|
builder.base_file_url.return_value = builder
|
|
builder.build.side_effect = created_apps
|
|
|
|
adapter = None
|
|
|
|
def start_polling_side_effect(*args, **kwargs):
|
|
nonlocal adapter
|
|
error_callback = kwargs["error_callback"]
|
|
assert adapter is not None
|
|
|
|
async def _emit_errors():
|
|
await asyncio.sleep(0)
|
|
for _ in range(adapter._polling_recovery_threshold):
|
|
error_callback(MockTelegramNetworkError("proxy disconnected"))
|
|
|
|
asyncio.create_task(_emit_errors())
|
|
return NoopAwaitable()
|
|
|
|
app_one.updater.start_polling.side_effect = start_polling_side_effect
|
|
|
|
async def second_start_polling(*args, **kwargs):
|
|
assert adapter is not None
|
|
adapter._terminating = True
|
|
|
|
app_two.updater.start_polling.side_effect = second_start_polling
|
|
|
|
with patch.dict(
|
|
module_globals,
|
|
{
|
|
"ApplicationBuilder": MagicMock(return_value=builder),
|
|
"AsyncIOScheduler": MagicMock(
|
|
return_value=MockTelegramBuilder.create_scheduler()
|
|
),
|
|
},
|
|
):
|
|
adapter = TelegramPlatformAdapter(
|
|
make_platform_config("telegram"),
|
|
{},
|
|
asyncio.Queue(),
|
|
)
|
|
await adapter.run()
|
|
|
|
assert builder.build.call_count == 2
|
|
app_one.updater.stop.assert_awaited()
|
|
app_one.bot.delete_my_commands.assert_not_awaited()
|
|
app_one.stop.assert_awaited()
|
|
app_one.shutdown.assert_awaited()
|
|
app_two.initialize.assert_awaited()
|
|
app_two.start.assert_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_recreate_application_is_skipped_during_termination():
|
|
TelegramPlatformAdapter = _load_telegram_adapter()
|
|
adapter = TelegramPlatformAdapter(
|
|
make_platform_config("telegram"),
|
|
{},
|
|
asyncio.Queue(),
|
|
)
|
|
adapter._terminating = True
|
|
adapter._polling_recovery_requested.set()
|
|
|
|
await adapter._recreate_application()
|
|
|
|
assert not adapter._polling_recovery_requested.is_set()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_telegram_run_rebuilds_fresh_application_after_recreate_init_failure():
|
|
TelegramPlatformAdapter = _load_telegram_adapter()
|
|
module_globals = TelegramPlatformAdapter.__init__.__globals__
|
|
app_one = MockTelegramBuilder.create_application()
|
|
app_one.updater.running = True
|
|
app_two = MockTelegramBuilder.create_application()
|
|
app_three = MockTelegramBuilder.create_application()
|
|
app_three.updater.running = True
|
|
created_apps = [app_one, app_two, app_three]
|
|
|
|
builder = MagicMock()
|
|
builder.token.return_value = builder
|
|
builder.base_url.return_value = builder
|
|
builder.base_file_url.return_value = builder
|
|
builder.build.side_effect = created_apps
|
|
|
|
adapter = None
|
|
|
|
def first_start_polling(*args, **kwargs):
|
|
nonlocal adapter
|
|
error_callback = kwargs["error_callback"]
|
|
assert adapter is not None
|
|
|
|
async def _emit_errors():
|
|
await asyncio.sleep(0)
|
|
for _ in range(adapter._polling_recovery_threshold):
|
|
error_callback(MockTelegramNetworkError("proxy disconnected"))
|
|
|
|
asyncio.create_task(_emit_errors())
|
|
return NoopAwaitable()
|
|
|
|
app_one.updater.start_polling.side_effect = first_start_polling
|
|
app_two.initialize.side_effect = TimeoutError("init timeout")
|
|
|
|
async def final_start_polling(*args, **kwargs):
|
|
assert adapter is not None
|
|
adapter._terminating = True
|
|
|
|
app_three.updater.start_polling.side_effect = final_start_polling
|
|
|
|
with patch.dict(
|
|
module_globals,
|
|
{
|
|
"ApplicationBuilder": MagicMock(return_value=builder),
|
|
"AsyncIOScheduler": MagicMock(
|
|
return_value=MockTelegramBuilder.create_scheduler()
|
|
),
|
|
},
|
|
):
|
|
adapter = TelegramPlatformAdapter(
|
|
make_platform_config(
|
|
"telegram",
|
|
telegram_polling_restart_delay=0.1,
|
|
),
|
|
{},
|
|
asyncio.Queue(),
|
|
)
|
|
await adapter.run()
|
|
|
|
assert builder.build.call_count == 3
|
|
app_two.stop.assert_awaited()
|
|
app_two.shutdown.assert_awaited()
|
|
app_three.initialize.assert_awaited()
|
|
app_three.start.assert_awaited()
|