Compare commits

...

2 Commits

Author SHA1 Message Date
Soulter
fc8767bbc8 fix 2026-06-06 00:45:55 +08:00
Soulter
a308ce9496 fix: keep DingTalk stream reconnecting 2026-06-06 00:19:52 +08:00
2 changed files with 129 additions and 23 deletions

View File

@@ -3,6 +3,7 @@ import json
import threading
import time
import uuid
from concurrent.futures import CancelledError as FutureCancelledError
from pathlib import Path
from typing import Literal, NoReturn, cast
@@ -34,6 +35,18 @@ from astrbot.core.utils.media_utils import (
from ...register import register_platform_adapter
from .dingtalk_event import DingtalkMessageEvent
DINGTALK_RECONNECT_INITIAL_DELAY = 10
DINGTALK_RECONNECT_MAX_DELAY = 300
DINGTALK_RECONNECT_STABLE_SECONDS = 300
def _dingtalk_reconnect_delay(retry_count: int) -> int:
safe_retry_count = max(retry_count, 1)
return min(
DINGTALK_RECONNECT_INITIAL_DELAY * 2 ** (safe_retry_count - 1),
DINGTALK_RECONNECT_MAX_DELAY,
)
class MyEventHandler(dingtalk_stream.EventHandler):
async def process(self, event: dingtalk_stream.EventMessage):
@@ -83,7 +96,8 @@ class DingtalkPlatformAdapter(Platform):
self.client,
)
self.client_ = client # 用于 websockets 的 client
self._shutdown_event: threading.Event | None = None
self._shutdown_event = threading.Event()
self._terminated_event = threading.Event()
def _id_to_sid(self, dingtalk_id: str | None) -> str:
if not dingtalk_id:
@@ -750,56 +764,70 @@ class DingtalkPlatformAdapter(Platform):
# 钉钉的 SDK 并没有实现真正的异步start() 里面有堵塞方法。
# SDK 内部已有 while True 重连循环,但需要监控 task 状态,
# 如果 task 意外退出则重新启动。
MAX_RETRIES = 5
RETRY_INTERVAL = 10
def start_client(loop: asyncio.AbstractEventLoop) -> None:
retry_count = 0
def handle_retry(error_msg: str) -> bool:
"""处理重试逻辑,返回 True 表示需要继续重试False 表示放弃。"""
def handle_retry(error_msg: str, run_seconds: float) -> None:
nonlocal retry_count
logger.error(error_msg)
if run_seconds >= DINGTALK_RECONNECT_STABLE_SECONDS:
retry_count = 0
retry_count += 1
if retry_count < MAX_RETRIES:
logger.info(f"钉钉适配器尝试重连 ({retry_count}/{MAX_RETRIES})...")
time.sleep(RETRY_INTERVAL)
return True
logger.error("钉钉适配器重连失败,已达最大重试次数")
return False
delay = _dingtalk_reconnect_delay(retry_count)
logger.info(
f"钉钉适配器将在 {delay} 秒后重连 (第 {retry_count} 次)...",
)
self._terminated_event.wait(delay)
while retry_count < MAX_RETRIES:
while not self._terminated_event.is_set():
task = None
should_cancel_task = False
start_time = time.monotonic()
try:
self._shutdown_event = threading.Event()
task = loop.create_task(self.client_.start())
self._shutdown_event.clear()
if self._terminated_event.is_set():
return
task = asyncio.run_coroutine_threadsafe(self.client_.start(), loop)
# 当 task 完成时唤醒线程(无论是正常退出还是异常退出)
task.add_done_callback(lambda _: self._shutdown_event.set())
if self._terminated_event.is_set():
should_cancel_task = True
self._shutdown_event.set()
self._shutdown_event.wait()
if self._terminated_event.is_set():
return
if task.done():
try:
exc = task.exception()
except asyncio.CancelledError:
except (asyncio.CancelledError, FutureCancelledError):
logger.info("钉钉适配器 task 已取消")
return
if exc:
if "Graceful shutdown" in str(exc):
logger.info("钉钉适配器已被关闭")
return
if handle_retry(f"钉钉 SDK task 异常退出: {exc}"):
continue
return
should_cancel_task = True
handle_retry(
f"钉钉 SDK task 异常退出: {exc}",
time.monotonic() - start_time,
)
continue
# task 仍在运行shutdown_event 被设置(正常关闭)
return
except Exception as e:
if "Graceful shutdown" in str(e):
logger.info("钉钉适配器已被关闭")
return
if not handle_retry(f"钉钉机器人启动失败: {e}"):
return
should_cancel_task = True
handle_retry(
f"钉钉机器人启动失败: {e}",
time.monotonic() - start_time,
)
continue
finally:
# 仅在重试/失败路径取消 task正常关闭不取消
if task is not None and not task.done() and retry_count > 0:
if task is not None and not task.done() and should_cancel_task:
task.cancel()
loop = asyncio.get_running_loop()
@@ -809,11 +837,11 @@ class DingtalkPlatformAdapter(Platform):
def monkey_patch_close() -> NoReturn:
raise KeyboardInterrupt("Graceful shutdown")
self._terminated_event.set()
self._shutdown_event.set()
if self.client_.websocket is not None:
self.client_.open_connection = monkey_patch_close
await self.client_.websocket.close(code=1000, reason="Graceful shutdown")
if self._shutdown_event is not None:
self._shutdown_event.set()
def get_client(self):
return self.client

View File

@@ -0,0 +1,78 @@
import asyncio
import threading
import pytest
from astrbot.core.platform.sources.dingtalk import dingtalk_adapter
from astrbot.core.platform.sources.dingtalk.dingtalk_adapter import (
DINGTALK_RECONNECT_INITIAL_DELAY,
DINGTALK_RECONNECT_MAX_DELAY,
DingtalkPlatformAdapter,
_dingtalk_reconnect_delay,
)
def test_dingtalk_reconnect_delay_uses_exponential_backoff():
assert [_dingtalk_reconnect_delay(i) for i in range(1, 5)] == [
10,
20,
40,
80,
]
def test_dingtalk_reconnect_delay_has_minimum_delay():
assert _dingtalk_reconnect_delay(0) == DINGTALK_RECONNECT_INITIAL_DELAY
assert _dingtalk_reconnect_delay(-1) == DINGTALK_RECONNECT_INITIAL_DELAY
def test_dingtalk_reconnect_delay_is_capped():
assert _dingtalk_reconnect_delay(20) == DINGTALK_RECONNECT_MAX_DELAY
@pytest.mark.asyncio
async def test_dingtalk_reconnect_delay_wakes_on_terminate(monkeypatch):
class ObservedEvent:
def __init__(self) -> None:
self._event = threading.Event()
self.wait_started = threading.Event()
self.wait_timeout: float | None = None
def is_set(self) -> bool:
return self._event.is_set()
def set(self) -> None:
self._event.set()
def wait(self, timeout: float | None = None) -> bool:
self.wait_timeout = timeout
self.wait_started.set()
return self._event.wait(timeout)
class FailingClient:
websocket = None
async def start(self) -> None:
raise RuntimeError("connect failed")
terminated_event = ObservedEvent()
adapter = DingtalkPlatformAdapter.__new__(DingtalkPlatformAdapter)
adapter.client_ = FailingClient()
adapter._shutdown_event = threading.Event()
adapter._terminated_event = terminated_event
monkeypatch.setattr(dingtalk_adapter, "_dingtalk_reconnect_delay", lambda _: 60)
run_task = asyncio.create_task(adapter.run())
try:
wait_started = await asyncio.to_thread(terminated_event.wait_started.wait, 1)
assert wait_started
assert terminated_event.wait_timeout == 60
await adapter.terminate()
await asyncio.wait_for(run_task, timeout=1)
finally:
if not run_task.done():
await adapter.terminate()
run_task.cancel()
await asyncio.gather(run_task, return_exceptions=True)