mirror of
https://github.com/AstrBotDevs/AstrBot
synced 2026-07-01 01:10:21 +08:00
79 lines
2.4 KiB
Python
79 lines
2.4 KiB
Python
import asyncio
|
|
import threading
|
|
|
|
import pytest
|
|
|
|
from astrbot.core.platform.sources.dingtalk import dingtalk_adapter
|
|
from astrbot.core.platform.sources.dingtalk.dingtalk_adapter import (
|
|
DINGTALK_RECONNECT_INITIAL_DELAY,
|
|
DINGTALK_RECONNECT_MAX_DELAY,
|
|
DingtalkPlatformAdapter,
|
|
_dingtalk_reconnect_delay,
|
|
)
|
|
|
|
|
|
def test_dingtalk_reconnect_delay_uses_exponential_backoff():
    """Attempts 1 through 4 yield delays that double each time, starting at 10."""
    expected_delays = [10, 20, 40, 80]
    observed_delays = [
        _dingtalk_reconnect_delay(attempt) for attempt in range(1, 5)
    ]
    assert observed_delays == expected_delays
|
|
|
|
|
|
def test_dingtalk_reconnect_delay_has_minimum_delay():
    """Zero and negative attempt counts both yield the initial delay."""
    # Same inputs, same order as two separate asserts; the first mismatch
    # still aborts the test immediately.
    for attempt in (0, -1):
        assert _dingtalk_reconnect_delay(attempt) == DINGTALK_RECONNECT_INITIAL_DELAY
|
|
|
|
|
|
def test_dingtalk_reconnect_delay_is_capped():
    """A large attempt count (20) yields exactly the configured maximum delay."""
    delay_after_many_attempts = _dingtalk_reconnect_delay(20)
    assert delay_after_many_attempts == DINGTALK_RECONNECT_MAX_DELAY
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dingtalk_reconnect_delay_wakes_on_terminate(monkeypatch):
    """Check that ``terminate()`` cuts short the wait between reconnects.

    The adapter is given a client whose ``start()`` always raises, and the
    reconnect delay is patched to 60. The test then verifies that ``run()``
    waits on ``_terminated_event`` with that delay as the timeout, and that
    calling ``terminate()`` lets ``run()`` finish within one second instead
    of sitting out the full delay.
    """

    class ObservedEvent:
        """Event stand-in that records when and how ``wait`` is called."""

        def __init__(self) -> None:
            # Underlying flag that carries the real set/wait semantics.
            self._flag = threading.Event()
            # Signalled as soon as ``wait`` has been entered.
            self.wait_started = threading.Event()
            # Timeout passed to the most recent ``wait`` call.
            self.wait_timeout: float | None = None

        def is_set(self) -> bool:
            return self._flag.is_set()

        def set(self) -> None:
            self._flag.set()

        def wait(self, timeout: float | None = None) -> bool:
            # Record the timeout *before* signalling, so that anyone woken by
            # ``wait_started`` is guaranteed to observe it.
            self.wait_timeout = timeout
            self.wait_started.set()
            return self._flag.wait(timeout)

    class FailingClient:
        """Client stub whose connection attempt always fails."""

        websocket = None

        async def start(self) -> None:
            raise RuntimeError("connect failed")

    patched_delay = 60
    observed = ObservedEvent()

    # Bypass __init__ and set only the attributes this test provides.
    adapter = DingtalkPlatformAdapter.__new__(DingtalkPlatformAdapter)
    adapter.client_ = FailingClient()
    adapter._shutdown_event = threading.Event()
    adapter._terminated_event = observed

    monkeypatch.setattr(
        dingtalk_adapter,
        "_dingtalk_reconnect_delay",
        lambda _: patched_delay,
    )

    runner = asyncio.create_task(adapter.run())
    try:
        # Block on the threading.Event in a worker thread so the event loop
        # stays free to drive ``adapter.run()``.
        entered_wait = await asyncio.to_thread(observed.wait_started.wait, 1)
        assert entered_wait
        assert observed.wait_timeout == patched_delay

        await adapter.terminate()
        await asyncio.wait_for(runner, timeout=1)
    finally:
        # Cleanup for the failure path only: stop the adapter and reap the task.
        if not runner.done():
            await adapter.terminate()
            runner.cancel()
            await asyncio.gather(runner, return_exceptions=True)
|