Compare commits

...

1 Commits

4 changed files with 67 additions and 20 deletions

View File

@@ -1,7 +1,6 @@
import asyncio
import os
import time
import uuid
from collections.abc import Callable, Coroutine
from pathlib import Path
from typing import Any
@@ -92,32 +91,37 @@ class WebChatAdapter(Platform):
active_request_ids = self._webchat_queue_mgr.list_back_request_ids(
conversation_id
)
subscription_request_ids = [
req_id for req_id in active_request_ids if req_id.startswith("ws_sub_")
stream_request_ids = [
req_id for req_id in active_request_ids if not req_id.startswith("ws_sub_")
]
target_request_ids = subscription_request_ids or active_request_ids
target_request_ids = stream_request_ids or active_request_ids
if target_request_ids:
for request_id in target_request_ids:
await WebChatMessageEvent._send(
request_id,
message_chain,
session.session_id,
if not target_request_ids:
# No active streams to consume this proactive message.
# Persist directly and return to avoid creating an unused queue.
try:
await self._save_proactive_message(conversation_id, message_chain)
except Exception as e:
logger.error(
f"[WebChatAdapter] Failed to save proactive message: {e}",
exc_info=True,
)
else:
message_id = f"active_{uuid.uuid4()!s}"
await super().send_by_session(session, message_chain)
return
for request_id in target_request_ids:
await WebChatMessageEvent._send(
message_id,
request_id,
message_chain,
session.session_id,
streaming=True,
emit_complete=True,
)
should_persist = (
bool(subscription_request_ids)
or not active_request_ids
or all(req_id.startswith("active_") for req_id in active_request_ids)
)
if should_persist:
# If only passive subscription queues exist for this conversation,
# keep a proactive save as a fallback since they are not tied to
# the normal streaming persistence path.
if not stream_request_ids:
try:
await self._save_proactive_message(conversation_id, message_chain)
except Exception as e:

View File

@@ -34,6 +34,7 @@ class WebChatMessageEvent(AstrMessageEvent):
message: MessageChain | None,
session_id: str,
streaming: bool = False,
emit_complete: bool = False,
) -> str | None:
request_id = str(message_id)
conversation_id = _extract_conversation_id(session_id)
@@ -127,6 +128,17 @@ class WebChatMessageEvent(AstrMessageEvent):
else:
logger.debug(f"webchat 忽略: {comp.type}")
if emit_complete:
await web_chat_back_queue.put(
{
"type": "complete",
"data": data,
"streaming": streaming,
"chain_type": message.type,
"message_id": message_id,
},
)
return data
async def send(self, message: MessageChain | None) -> None:

View File

@@ -455,7 +455,10 @@ class OpenApiRoute(Route):
if msg_type == "end":
break
if (streaming and msg_type == "complete") or not streaming:
if chain_type in ("tool_call", "tool_call_result"):
if chain_type in (
"tool_call",
"tool_call_result",
):
continue
try:
refs = self.chat_route._extract_web_search_refs(

View File

@@ -91,6 +91,15 @@ type WsStreamContext = {
const STREAMING_STORAGE_KEY = 'enableStreaming';
const TRANSPORT_MODE_STORAGE_KEY = 'chatTransportMode';
const HIDDEN_TOOL_CALL_NAMES = new Set(['send_message_to_user']);
function isHiddenToolCall(toolCall: ToolCall | { name?: unknown } | null | undefined): boolean {
if (!toolCall || typeof toolCall !== 'object') {
return false;
}
const name = toolCall.name;
return typeof name === 'string' && HIDDEN_TOOL_CALL_NAMES.has(name);
}
export function useMessages(
currSessionId: Ref<string>,
@@ -489,6 +498,9 @@ export function useMessages(
} catch {
return;
}
if (isHiddenToolCall(toolCallData)) {
return;
}
const toolCall: ToolCall = {
id: toolCallData.id,
@@ -528,6 +540,9 @@ export function useMessages(
} catch {
return;
}
if (isHiddenToolCall(resultData)) {
return;
}
if (messageObj) {
for (const part of messageObj.message) {
@@ -658,7 +673,18 @@ export function useMessages(
// 如果 message 是数组 (新格式),遍历并填充 embedded 字段
if (Array.isArray(message)) {
const filteredMessage: MessagePart[] = [];
for (const part of message as MessagePart[]) {
if (part.type === 'tool_call' && Array.isArray(part.tool_calls)) {
const visibleToolCalls = part.tool_calls.filter(
(toolCall) => !isHiddenToolCall(toolCall),
);
if (!visibleToolCalls.length) {
continue;
}
part.tool_calls = visibleToolCalls;
}
if (part.type === 'image' && part.attachment_id) {
part.embedded_url = await getAttachment(part.attachment_id);
} else if (part.type === 'record' && part.attachment_id) {
@@ -671,7 +697,9 @@ export function useMessages(
};
}
// plain, reply, tool_call, video 保持原样
filteredMessage.push(part);
}
content.message = filteredMessage;
}
// 处理 agent_stats (snake_case -> camelCase)