Compare commits

...

5 Commits

Author SHA1 Message Date
Soulter
9ca82629d2 feat: enhance run_live_check function to support optional attachment_id and file upload, improve argument handling 2026-03-24 15:39:13 +08:00
Soulter
035f6ba217 feat: enhance Open API with file upload and retrieval endpoints, and update LiveChat message handling 2026-03-24 11:12:30 +08:00
Soulter
631d363dd0 feat: enhance Live Mode with tool call message handling and UI updates 2026-03-17 22:52:27 +08:00
Soulter
565c371e5c feat: enhance Live Mode with text input functionality and UI improvements
- Added a text input panel to allow users to send plain text messages while in Live Mode.
- Updated the LiveMode.vue component to handle text input and integrate it with WebSocket communication.
- Improved the layout and styling of the Live Mode interface for better user experience.
- Documented the new `text_input` message type in the Live API README.
2026-03-16 22:36:29 +08:00
Soulter
a1c9dc5d01 feat: add live API WebSocket endpoint with authentication and session management 2026-03-16 22:11:15 +08:00
13 changed files with 2121 additions and 573 deletions

View File

@@ -326,6 +326,7 @@ async def run_live_agent(
# 创建队列
text_queue: asyncio.Queue[str | None] = asyncio.Queue()
delta_queue: asyncio.Queue[str | None] = asyncio.Queue()
# audio_queue stored bytes or (text, bytes)
audio_queue: asyncio.Queue[bytes | tuple[str, bytes] | None] = asyncio.Queue()
@@ -334,6 +335,7 @@ async def run_live_agent(
_run_agent_feeder(
agent_runner,
text_queue,
delta_queue,
max_step,
show_tool_use,
show_tool_call_result,
@@ -353,32 +355,63 @@ async def run_live_agent(
# 3. 主循环:从 audio_queue 读取音频并 yield
try:
while True:
queue_item = await audio_queue.get()
delta_done = False
audio_done = False
while not (delta_done and audio_done):
task_sources: dict[asyncio.Task, str] = {}
if not delta_done:
task = asyncio.create_task(delta_queue.get())
task_sources[task] = "delta"
if not audio_done:
task = asyncio.create_task(audio_queue.get())
task_sources[task] = "audio"
if queue_item is None:
break
done, pending = await asyncio.wait(
list(task_sources),
return_when=asyncio.FIRST_COMPLETED,
)
text = None
if isinstance(queue_item, tuple):
text, audio_data = queue_item
else:
audio_data = queue_item
for task in pending:
task.cancel()
if pending:
await asyncio.gather(*pending, return_exceptions=True)
if not first_chunk_received:
# 记录首帧延迟(从开始处理到收到第一个音频块)
tts_first_frame_time = time.time() - tts_start_time
first_chunk_received = True
for task in done:
source = task_sources[task]
queue_item = task.result()
if source == "delta":
if queue_item is None:
delta_done = True
continue
yield MessageChain(
chain=[Plain(queue_item)], type="live_text_delta"
)
continue
# 将音频数据封装为 MessageChain
import base64
if queue_item is None:
audio_done = True
continue
audio_b64 = base64.b64encode(audio_data).decode("utf-8")
comps: list[BaseMessageComponent] = [Plain(audio_b64)]
if text:
comps.append(Json(data={"text": text}))
chain = MessageChain(chain=comps, type="audio_chunk")
yield chain
text = None
if isinstance(queue_item, tuple):
text, audio_data = queue_item
else:
audio_data = queue_item
if not first_chunk_received:
# 记录首帧延迟(从开始处理到收到第一个音频块)
tts_first_frame_time = time.time() - tts_start_time
first_chunk_received = True
# 将音频数据封装为 MessageChain
import base64
audio_b64 = base64.b64encode(audio_data).decode("utf-8")
comps: list[BaseMessageComponent] = [Plain(audio_b64)]
if text:
comps.append(Json(data={"text": text}))
chain = MessageChain(chain=comps, type="audio_chunk")
yield chain
except Exception as e:
logger.error(f"[Live Agent] 运行时发生错误: {e}", exc_info=True)
@@ -421,6 +454,7 @@ async def run_live_agent(
async def _run_agent_feeder(
agent_runner: AgentRunner,
text_queue: asyncio.Queue,
delta_queue: asyncio.Queue,
max_step: int,
show_tool_use: bool,
show_tool_call_result: bool,
@@ -440,9 +474,13 @@ async def _run_agent_feeder(
if chain is None:
continue
if chain.type == "reasoning":
continue
# 提取文本
text = chain.get_plain_text()
if text:
await delta_queue.put(text)
buffer += text
# 分句逻辑:匹配标点符号
@@ -477,6 +515,7 @@ async def _run_agent_feeder(
finally:
# 发送结束信号
await text_queue.put(None)
await delta_queue.put(None)
async def _safe_tts_stream_wrapper(

View File

@@ -130,16 +130,6 @@ class LiveChatRoute(Route):
async def live_chat_ws(self) -> None:
"""Legacy Live Chat WebSocket 处理器(默认 ct=live"""
await self._unified_ws_loop(force_ct="live")
async def unified_chat_ws(self) -> None:
"""Unified Chat WebSocket 处理器(支持 ct=live/chat"""
await self._unified_ws_loop(force_ct=None)
async def _unified_ws_loop(self, force_ct: str | None = None) -> None:
"""统一 WebSocket 循环"""
# WebSocket 不能通过 header 传递 token需要从 query 参数获取
# 注意WebSocket 上下文使用 websocket.args 而不是 request.args
token = websocket.args.get("token")
if not token:
await websocket.close(1008, "Missing authentication token")
@@ -156,6 +146,49 @@ class LiveChatRoute(Route):
await websocket.close(1008, "Invalid token")
return
await self.run_ws_session(username=username, force_ct="live")
async def unified_chat_ws(self) -> None:
"""Unified Chat WebSocket 处理器(支持 ct=live/chat"""
token = websocket.args.get("token")
if not token:
await websocket.close(1008, "Missing authentication token")
return
try:
jwt_secret = self.config["dashboard"].get("jwt_secret")
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
username = payload["username"]
except jwt.ExpiredSignatureError:
await websocket.close(1008, "Token expired")
return
except jwt.InvalidTokenError:
await websocket.close(1008, "Invalid token")
return
await self.run_ws_session(username=username, force_ct=None)
async def _unified_ws_loop(self, force_ct: str | None = None) -> None:
"""统一 WebSocket 循环"""
# Keep the legacy entry point for internal call sites.
token = websocket.args.get("token")
if not token:
await websocket.close(1008, "Missing authentication token")
return
try:
jwt_secret = self.config["dashboard"].get("jwt_secret")
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"])
username = payload["username"]
except jwt.ExpiredSignatureError:
await websocket.close(1008, "Token expired")
return
except jwt.InvalidTokenError:
await websocket.close(1008, "Invalid token")
return
await self.run_ws_session(username=username, force_ct=force_ct)
async def run_ws_session(self, username: str, force_ct: str | None = None) -> None:
"""Run a live/unified websocket session for an authenticated username."""
session_id = f"webchat_live!{username}!{uuid.uuid4()}"
live_session = LiveChatSession(session_id, username)
self.sessions[session_id] = live_session
@@ -661,6 +694,34 @@ class LiveChatRoute(Route):
strict=False,
)
@staticmethod
def _format_live_input_preview(message_parts: list[dict]) -> str:
plain_texts = [
str(part.get("text", "")).strip()
for part in message_parts
if part.get("type") == "plain"
and isinstance(part.get("text"), str)
and str(part.get("text", "")).strip()
]
if plain_texts:
return "\n".join(plain_texts)
media_counts: dict[str, int] = {}
for part in message_parts:
part_type = part.get("type")
if part_type in {"image", "record", "file", "video"}:
media_type = str(part_type)
media_counts[media_type] = media_counts.get(media_type, 0) + 1
if not media_counts:
return ""
snippets = [
f"{count} {media_type}"
for media_type, count in sorted(media_counts.items())
]
return "[attachment] " + ", ".join(snippets)
async def _handle_message(self, session: LiveChatSession, message: dict) -> None:
"""处理 WebSocket 消息"""
msg_type = message.get("t") # 使用 t 代替 type
@@ -690,6 +751,16 @@ class LiveChatRoute(Route):
elif msg_type == "end_speaking":
# 结束说话
if session.is_processing:
await websocket.send_json(
{
"t": "error",
"data": "Session is busy",
"code": "PROCESSING_ERROR",
}
)
return
stamp = message.get("stamp")
if not stamp:
logger.warning("[Live Chat] end_speaking 缺少 stamp")
@@ -703,49 +774,130 @@ class LiveChatRoute(Route):
# 处理音频STT -> LLM -> TTS
await self._process_audio(session, audio_path, assemble_duration)
elif msg_type == "text_input":
if session.is_processing:
await websocket.send_json(
{
"t": "error",
"data": "Session is busy",
"code": "PROCESSING_ERROR",
}
)
return
user_text = message.get("text")
user_message = message.get("message")
message_parts = None
if isinstance(user_message, list):
message_parts = await self._build_chat_message_parts(user_message)
if not message_parts or not webchat_message_parts_have_content(
message_parts
):
await websocket.send_json(
{
"t": "error",
"data": "message must include plain text or attachment_id media",
"code": "INVALID_MESSAGE_FORMAT",
}
)
return
if isinstance(user_text, str):
user_text = user_text.strip()
else:
user_text = ""
elif isinstance(user_text, str):
user_text = user_text.strip()
if not isinstance(user_text, str):
user_text = message.get("text")
if (not user_text or not str(user_text).strip()) and message_parts is None:
await websocket.send_json(
{
"t": "error",
"data": "message must be non-empty text",
"code": "INVALID_MESSAGE_FORMAT",
}
)
return
if message_parts is None and (
not isinstance(user_text, str) or not user_text.strip()
):
await websocket.send_json(
{
"t": "error",
"data": "message must be non-empty text",
"code": "INVALID_MESSAGE_FORMAT",
}
)
return
if message_parts is not None and not user_text:
user_text = self._format_live_input_preview(message_parts)
if not user_text:
await websocket.send_json(
{
"t": "error",
"data": "message must include plain text or attachment_id media",
"code": "INVALID_MESSAGE_FORMAT",
}
)
return
if user_text:
await self._process_live_user_text(
session,
user_text=user_text.strip(),
user_message_parts=message_parts,
initial_metrics={"input_type": "text"},
processing_start_time=time.time(),
)
elif msg_type == "interrupt":
# 用户打断
session.should_interrupt = True
logger.info(f"[Live Chat] 用户打断: {session.username}")
async def _process_audio(
self, session: LiveChatSession, audio_path: str, assemble_duration: float
async def _process_live_user_text(
self,
session: LiveChatSession,
user_text: str,
user_message_parts: list[dict] | None = None,
initial_metrics: dict[str, Any] | None = None,
processing_start_time: float | None = None,
) -> None:
"""处理音频STT -> LLM -> 流式 TTS"""
try:
# 发送 WAV 组装耗时
"""处理 Live 用户文本:走 run_live_agent pipeline 并回传流式 TTS."""
payload_message = (
user_message_parts
if user_message_parts is not None
else [{"type": "plain", "text": user_text}]
)
if not payload_message:
await websocket.send_json(
{"t": "metrics", "data": {"wav_assemble_time": assemble_duration}}
{"t": "error", "data": "Message content is empty"}
)
wav_assembly_finish_time = time.time()
return
display_user_text = (
user_text
if user_message_parts is None or user_text
else self._format_live_input_preview(user_message_parts)
)
try:
if initial_metrics:
await websocket.send_json({"t": "metrics", "data": initial_metrics})
processing_start = processing_start_time or time.time()
session.is_processing = True
session.should_interrupt = False
# 1. STT - 语音转文字
ctx = self.plugin_manager.context
stt_provider = ctx.provider_manager.stt_provider_insts[0]
if not stt_provider:
logger.error("[Live Chat] STT Provider 未配置")
await websocket.send_json({"t": "error", "data": "语音识别服务未配置"})
return
await websocket.send_json(
{"t": "metrics", "data": {"stt": stt_provider.meta().type}}
)
user_text = await stt_provider.get_text(audio_path)
if not user_text:
logger.warning("[Live Chat] STT 识别结果为空")
return
logger.info(f"[Live Chat] STT 结果: {user_text}")
await websocket.send_json(
{
"t": "user_msg",
"data": {"text": user_text, "ts": int(time.time() * 1000)},
"data": {
"text": display_user_text,
"ts": int(time.time() * 1000),
},
}
)
@@ -757,11 +909,10 @@ class LiveChatRoute(Route):
message_id = str(uuid.uuid4())
payload = {
"message_id": message_id,
"message": [{"type": "plain", "text": user_text}], # 直接发送文本
"message": payload_message,
"action_type": "live", # 标记为 live mode
}
# 将消息放入队列
await queue.put((session.username, cid, payload))
# 3. 等待响应并流式发送 TTS 音频
@@ -776,11 +927,9 @@ class LiveChatRoute(Route):
# 用户打断,停止处理
logger.info("[Live Chat] 检测到用户打断")
await websocket.send_json({"t": "stop_play"})
# 保存消息并标记为被打断
await self._save_interrupted_message(
session, user_text, bot_text
session, display_user_text, bot_text
)
# 清空队列中未处理的消息
while not back_queue.empty():
try:
back_queue.get_nowait()
@@ -805,6 +954,7 @@ class LiveChatRoute(Route):
result_type = result.get("type")
result_chain_type = result.get("chain_type")
result_streaming = bool(result.get("streaming", False))
data = result.get("data", "")
if result_chain_type == "agent_stats":
@@ -827,29 +977,94 @@ class LiveChatRoute(Route):
if result_chain_type == "tts_stats":
try:
stats = json.loads(data)
await websocket.send_json(
{
"t": "metrics",
"data": stats,
}
)
await websocket.send_json({"t": "metrics", "data": stats})
except Exception as e:
logger.error(f"[Live Chat] 解析 TTSStats 失败: {e}")
continue
if result_chain_type == "live_text_delta":
if data:
await websocket.send_json(
{
"t": "bot_delta_chunk",
"data": {"text": data},
}
)
continue
if result_chain_type in ["tool_call", "tool_call_result"]:
await websocket.send_json(
{
"t": result_chain_type,
"data": data,
"chain_type": result_chain_type,
"streaming": result_streaming,
"message_id": message_id,
}
)
continue
if result_type == "plain":
# 普通文本消息
if (
result_streaming
and data
and result_chain_type != "reasoning"
):
await websocket.send_json(
{
"t": "bot_delta_chunk",
"data": {"text": data},
}
)
bot_text += data
elif result_type in ["image", "record", "file", "video"]:
filename = str(data)
part = None
if result_type == "image":
filename = filename.replace("[IMAGE]", "")
elif result_type == "record":
filename = filename.replace("[RECORD]", "")
elif result_type == "file":
filename = filename.replace("[FILE]", "")
else:
filename = filename.replace("[VIDEO]", "")
if filename:
part = await self._create_attachment_from_file(
filename=filename,
attach_type=result_type,
)
if part is not None:
await websocket.send_json(
{
"t": result_type,
"data": {
"attachment_id": part.get("attachment_id"),
"filename": part.get("filename"),
"type": part.get("type", result_type),
},
"message_id": message_id,
"chain_type": result_type,
"streaming": result_streaming,
}
)
elif str(data):
await websocket.send_json(
{
"t": result_type,
"data": {"raw": str(data)},
"message_id": message_id,
}
)
elif result_type == "audio_chunk":
# 流式音频数据
if not audio_playing:
audio_playing = True
logger.debug("[Live Chat] 开始播放音频流")
# Calculate latency from wav assembly finish to first audio chunk
speak_to_first_frame_latency = (
time.time() - wav_assembly_finish_time
time.time() - processing_start
)
await websocket.send_json(
{
@@ -869,19 +1084,15 @@ class LiveChatRoute(Route):
}
)
# 发送音频数据给前端
await websocket.send_json(
{
"t": "response",
"data": data, # base64 编码的音频数据
"data": data,
}
)
elif result_type in ["complete", "end"]:
# 处理完成
logger.info(f"[Live Chat] Bot 回复完成: {bot_text}")
# 如果没有音频流,发送 bot 消息文本
if not audio_playing:
await websocket.send_json(
{
@@ -893,11 +1104,8 @@ class LiveChatRoute(Route):
}
)
# 发送结束标记
await websocket.send_json({"t": "end"})
# 发送总耗时
wav_to_tts_duration = time.time() - wav_assembly_finish_time
wav_to_tts_duration = time.time() - processing_start
await websocket.send_json(
{
"t": "metrics",
@@ -909,13 +1117,65 @@ class LiveChatRoute(Route):
webchat_queue_mgr.remove_back_queue(message_id)
except Exception as e:
logger.error(f"[Live Chat] 处理音频失败: {e}", exc_info=True)
logger.error(f"[Live Chat] 处理文本失败: {e}", exc_info=True)
await websocket.send_json({"t": "error", "data": f"处理失败: {str(e)}"})
finally:
session.is_processing = False
session.should_interrupt = False
async def _process_audio(
self, session: LiveChatSession, audio_path: str, assemble_duration: float
) -> None:
"""处理音频STT -> LLM -> 流式 TTS"""
try:
await websocket.send_json(
{
"t": "metrics",
"data": {
"wav_assemble_time": assemble_duration,
"input_type": "audio",
},
}
)
wav_assembly_finish_time = time.time()
# 1. STT - 语音转文字
ctx = self.plugin_manager.context
stt_provider = ctx.provider_manager.stt_provider_insts[0]
if not stt_provider:
logger.error("[Live Chat] STT Provider 未配置")
await websocket.send_json({"t": "error", "data": "语音识别服务未配置"})
return
await websocket.send_json(
{
"t": "metrics",
"data": {
"stt": stt_provider.meta().type,
},
}
)
user_text = await stt_provider.get_text(audio_path)
if not user_text:
logger.warning("[Live Chat] STT 识别结果为空")
return
logger.info(f"[Live Chat] STT 结果: {user_text}")
await self._process_live_user_text(
session,
user_text=user_text,
initial_metrics=None,
processing_start_time=wav_assembly_finish_time,
)
except Exception as e:
logger.error(f"[Live Chat] 处理音频失败: {e}", exc_info=True)
await websocket.send_json({"t": "error", "data": f"处理失败: {str(e)}"})
async def _save_interrupted_message(
self, session: LiveChatSession, user_text: str, bot_text: str
) -> None:

View File

@@ -19,6 +19,7 @@ from astrbot.core.utils.datetime_utils import to_utc_isoformat
from .api_key import ALL_OPEN_API_SCOPES
from .chat import ChatRoute
from .live_chat import LiveChatRoute
from .route import Response, Route, RouteContext
@@ -29,23 +30,29 @@ class OpenApiRoute(Route):
db: BaseDatabase,
core_lifecycle: AstrBotCoreLifecycle,
chat_route: ChatRoute,
live_chat_route: LiveChatRoute,
) -> None:
super().__init__(context)
self.db = db
self.core_lifecycle = core_lifecycle
self.platform_manager = core_lifecycle.platform_manager
self.chat_route = chat_route
self.live_chat_route = live_chat_route
self.routes = {
"/v1/chat": ("POST", self.chat_send),
"/v1/chat/sessions": ("GET", self.get_chat_sessions),
"/v1/configs": ("GET", self.get_chat_configs),
"/v1/file": ("POST", self.upload_file),
"/v1/file": [
("POST", self.openapi_upload_file),
("GET", self.openapi_get_file),
],
"/v1/im/message": ("POST", self.send_message),
"/v1/im/bots": ("GET", self.get_bots),
}
self.register_routes()
self.app.websocket("/api/v1/chat/ws")(self.chat_ws)
self.app.websocket("/api/v1/live/ws")(self.live_ws)
@staticmethod
def _resolve_open_username(
@@ -534,9 +541,45 @@ class OpenApiRoute(Route):
except Exception as e:
logger.debug("Open API WS connection closed: %s", e)
async def upload_file(self):
async def live_ws(self) -> None:
authed, auth_err = await self._authenticate_chat_ws_api_key()
if not authed:
await self._send_chat_ws_error(auth_err or "Unauthorized", "UNAUTHORIZED")
await websocket.close(1008, auth_err or "Unauthorized")
return
username, username_err = self._resolve_open_username(
websocket.args.get("username")
)
if username_err or not username:
await self._send_chat_ws_error(
username_err or "Invalid username",
"BAD_USER",
)
await websocket.close(1008, username_err or "Invalid username")
return
ct = websocket.args.get("ct")
force_ct = ct.strip() if isinstance(ct, str) and ct.strip() else "live"
if force_ct not in {"live", "chat"}:
await self._send_chat_ws_error(
"ct must be 'live' or 'chat'",
"INVALID_MESSAGE",
)
await websocket.close(1008, "Invalid ct")
return
await self.live_chat_route.run_ws_session(
username=username,
force_ct=force_ct,
)
async def openapi_upload_file(self):
return await self.chat_route.post_file()
async def openapi_get_file(self):
return await self.chat_route.get_attachment()
async def get_chat_sessions(self):
username, username_err = self._resolve_open_username(
request.args.get("username")

View File

@@ -115,11 +115,13 @@ class AstrBotDashboard:
self.ar = AuthRoute(self.context)
self.api_key_route = ApiKeyRoute(self.context, db)
self.chat_route = ChatRoute(self.context, db, core_lifecycle)
self.live_chat_route = LiveChatRoute(self.context, db, core_lifecycle)
self.open_api_route = OpenApiRoute(
self.context,
db,
core_lifecycle,
self.chat_route,
self.live_chat_route,
)
self.chatui_project_route = ChatUIProjectRoute(self.context, db)
self.tools_root = ToolsRoute(self.context, core_lifecycle)
@@ -138,7 +140,6 @@ class AstrBotDashboard:
self.kb_route = KnowledgeBaseRoute(self.context, core_lifecycle)
self.platform_route = PlatformRoute(self.context, core_lifecycle)
self.backup_route = BackupRoute(self.context, db, core_lifecycle)
self.live_chat_route = LiveChatRoute(self.context, db, core_lifecycle)
self.app.add_url_rule(
"/api/plug/<path:subpath>",
@@ -244,6 +245,7 @@ class AstrBotDashboard:
scope_map = {
"/api/v1/chat": "chat",
"/api/v1/chat/ws": "chat",
"/api/v1/live/ws": "chat",
"/api/v1/chat/sessions": "chat",
"/api/v1/configs": "config",
"/api/v1/file": "file",

File diff suppressed because it is too large Load Diff

View File

@@ -98,14 +98,28 @@ axios.interceptors.request.use((config) => {
// Some parts of the UI use fetch directly; without this, those requests will 401.
const _origFetch = window.fetch.bind(window);
window.fetch = (input: RequestInfo | URL, init?: RequestInit) => {
const requestUrl = (() => {
if (typeof input === 'string') return input;
if (input instanceof URL) return input.toString();
return input.url;
})();
let shouldAttachAuth = false;
try {
const resolvedUrl = new URL(requestUrl, window.location.origin);
shouldAttachAuth = resolvedUrl.origin === window.location.origin;
} catch (_) {
shouldAttachAuth = requestUrl.startsWith('/');
}
const token = localStorage.getItem('token');
if (!token) return _origFetch(input, init);
const locale = localStorage.getItem('astrbot-locale');
if (!token && !locale) return _origFetch(input, init);
const headers = new Headers(init?.headers || (typeof input !== 'string' && 'headers' in input ? (input as Request).headers : undefined));
if (!headers.has('Authorization')) {
if (shouldAttachAuth && token && !headers.has('Authorization')) {
headers.set('Authorization', `Bearer ${token}`);
}
const locale = localStorage.getItem('astrbot-locale');
if (locale && !headers.has('Accept-Language')) {
headers.set('Accept-Language', locale);
}

View File

@@ -29,6 +29,7 @@ X-API-Key: abk_xxx
## Common Endpoints
- `POST /api/v1/chat`: send chat message (SSE stream, server generates UUID when `session_id` is omitted)
- `GET /api/v1/live/ws`: Live API WebSocket (API Key auth, requires `username` query parameter, optional `ct=live|chat`)
- `GET /api/v1/chat/sessions`: list sessions for a specific `username` with pagination
- `GET /api/v1/configs`: list available config files
- `POST /api/v1/file`: upload attachment
@@ -49,3 +50,7 @@ curl -N 'http://localhost:6185/api/v1/chat' \
Use the interactive docs:
- https://docs.astrbot.app/scalar.html
For the full Live API wire protocol, see:
- `docs/live-api/README.md`

461
docs/live-api/README.md Normal file
View File

@@ -0,0 +1,461 @@
# AstrBot Live API Protocol
This document describes the current WebSocket protocol for AstrBot Live API.
## Endpoint
- Legacy JWT endpoint: `/api/live_chat/ws`
- Legacy unified JWT endpoint: `/api/unified_chat/ws`
- Open API endpoint: `/api/v1/live/ws`
## Authentication
### Legacy dashboard endpoints
Pass a dashboard JWT in the `token` query parameter.
Example:
```text
ws://localhost:6185/api/live_chat/ws?token=<dashboard_jwt>
```
### Open API endpoint
Use an API key and provide `username` in the query string.
Examples:
```text
ws://localhost:6185/api/v1/live/ws?api_key=<api_key>&username=alice
ws://localhost:6185/api/v1/live/ws?api_key=<api_key>&username=alice&ct=chat
```
`ct` values:
- `live`: voice conversation mode
- `chat`: unified chat mode over the same WebSocket transport
The Open API endpoint reuses the `chat` API key scope.
## Transport
- Protocol: WebSocket
- Payload format: UTF-8 JSON text frames
- Audio upload format in `live` mode:
- client sends raw PCM frames encoded as Base64
- sample rate: `16000`
- channels: `1`
- sample width: `16-bit`
## Top-Level Envelope
### Client to server
```json
{
"t": "message_type",
"...": "message specific fields"
}
```
When using the unified socket, the client can also include:
```json
{
"ct": "live|chat",
"t": "message_type"
}
```
### Server to client
Legacy `live` mode uses:
```json
{
"t": "message_type",
"data": {}
}
```
Unified `chat` mode uses:
```json
{
"ct": "chat",
"type": "message_type",
"data": {}
}
```
Some forwarded `chat` frames may also contain `t`, `streaming`, `chain_type`, `message_id`, or `session_id`.
## Live Mode
### Client messages
#### `start_speaking`
Start a voice capture segment.
```json
{
"t": "start_speaking",
"stamp": "seg_001"
}
```
#### `speaking_part`
Send one audio frame.
```json
{
"t": "speaking_part",
"data": "<base64_pcm_bytes>"
}
```
#### `end_speaking`
Finish the current voice capture segment.
```json
{
"t": "end_speaking",
"stamp": "seg_001"
}
```
#### `text_input`
Send a plain text input directly while using `ct=live`. The server will still route through Live mode with TTS and interrupt handling.
```json
{
"t": "text_input",
"text": "Hello, what is the weather today?"
}
```
You can also send message parts and use attachment IDs (same segment format as other APIs), e.g. image/file references:
```json
{
"t": "text_input",
"message": [
{ "type": "plain", "text": "参考这张图" },
{ "type": "image", "attachment_id": "att_1234567890" }
]
}
```
Attachment-based inputs are accepted only when `ct=live`; this is converted to the same internal message format as chat mode and then processed by the live pipeline.
#### `interrupt`
Interrupt the current model or TTS response.
```json
{
"t": "interrupt"
}
```
### Server messages
#### `metrics`
Performance and provider metadata.
Example:
```json
{
"t": "metrics",
"data": {
"wav_assemble_time": 0.12,
"stt": "whisper_api",
"llm_ttft": 0.84,
"tts_total_time": 1.72
}
}
```
#### `user_msg`
STT result from the uploaded audio.
```json
{
"t": "user_msg",
"data": {
"text": "Hello there",
"ts": 1710000000000
}
}
```
#### `bot_delta_chunk`
Raw model text delta. This is the token or chunk level stream and is not sentence segmented.
```json
{
"t": "bot_delta_chunk",
"data": {
"text": "Hel"
}
}
```
Notes:
- This event is generated directly from the model streaming path.
- It is independent from TTS chunking.
- Consumers should append `data.text` to a local buffer.
#### `bot_text_chunk`
Text associated with the current TTS chunk. This is usually sentence or phrase segmented.
```json
{
"t": "bot_text_chunk",
"data": {
"text": "Hello there."
}
}
```
Notes:
- This event is aligned to TTS output, not raw token streaming.
- It may be coarser than `bot_delta_chunk`.
#### `response`
One TTS audio chunk, Base64 encoded.
```json
{
"t": "response",
"data": "<base64_audio_bytes>"
}
```
Attachment results can also be returned as attachment events when produced by the model:
```json
{
"t": "image",
"data": {
"attachment_id": "att_1234567890",
"filename": "abc.jpg",
"type": "image"
}
}
```
#### `bot_msg`
Final bot text when the response completed without audio streaming.
```json
{
"t": "bot_msg",
"data": {
"text": "Final reply text",
"ts": 1710000001234
}
}
```
#### `stop_play`
Stop client-side audio playback because the response was interrupted.
```json
{
"t": "stop_play"
}
```
#### `end`
Marks the end of the current response turn.
```json
{
"t": "end"
}
```
#### `error`
Recoverable or terminal processing error.
```json
{
"t": "error",
"data": "error message"
}
```
## Unified Chat Mode
Set `ct=chat` on the Open API endpoint or include `"ct": "chat"` in each client frame when using `/api/unified_chat/ws`.
### Client messages
#### `bind`
Subscribe to an existing webchat session.
```json
{
"ct": "chat",
"t": "bind",
"session_id": "session_001"
}
```
#### `send`
Send a chat request.
```json
{
"ct": "chat",
"t": "send",
"username": "alice",
"session_id": "session_001",
"message_id": "msg_001",
"message": [
{
"type": "plain",
"text": "Please summarize this"
}
],
"selected_provider": "openai_chat_completion",
"selected_model": "gpt-4.1-mini",
"enable_streaming": true
}
```
`message` uses the same message-part schema as `POST /api/v1/chat`.
#### `interrupt`
Interrupt the current chat response.
```json
{
"ct": "chat",
"t": "interrupt"
}
```
### Server messages
#### `session_bound`
Acknowledges a successful `bind`.
```json
{
"ct": "chat",
"type": "session_bound",
"session_id": "session_001",
"message_id": "ws_sub_xxx"
}
```
#### Forwarded streaming events
The server forwards the normal webchat queue payloads. Common examples:
```json
{
"ct": "chat",
"type": "plain",
"data": "Hello",
"streaming": true,
"chain_type": null,
"message_id": "msg_001"
}
```
```json
{
"ct": "chat",
"type": "image",
"data": "[IMAGE]file.jpg",
"streaming": false,
"message_id": "msg_001"
}
```
```json
{
"ct": "chat",
"type": "agent_stats",
"data": {
"time_to_first_token": 0.8
}
}
```
```json
{
"ct": "chat",
"type": "message_saved",
"data": {
"id": 123,
"created_at": "2026-03-16T10:00:00Z"
}
}
```
```json
{
"ct": "chat",
"type": "end",
"data": "",
"streaming": false,
"message_id": "msg_001"
}
```
#### Chat errors
```json
{
"ct": "chat",
"t": "error",
"code": "INVALID_MESSAGE_FORMAT",
"data": "message must be list"
}
```
## Recommended Client Strategy
For `live` mode:
1. Append every `bot_delta_chunk.data.text` into a raw transcript buffer.
2. Use `bot_text_chunk` only when you need text aligned with audio playback.
3. Decode and play each `response` audio chunk in arrival order.
4. Reset per-turn buffers after `end`.
For `chat` mode:
1. Treat `plain + streaming=true` as incremental text.
2. Treat `complete` or `end` as the end of a response turn.
3. Persist `message_saved` metadata if you need server-side history IDs.
## Compatibility Notes
- `bot_text_chunk` remains sentence or phrase segmented for TTS compatibility.
- `bot_delta_chunk` is the new delta-level text event for real-time rendering.
- The legacy JWT endpoints and the new Open API endpoint share the same runtime behavior after authentication.

View File

@@ -50,6 +50,51 @@
}
},
"/api/v1/file": {
"get": {
"tags": [
"Open API"
],
"summary": "Get attachment file",
"description": "Download an attachment by attachment_id.",
"security": [
{
"ApiKeyHeader": []
}
],
"parameters": [
{
"name": "attachment_id",
"in": "query",
"required": true,
"schema": {
"type": "string"
},
"description": "Attachment ID returned by upload API."
}
],
"responses": {
"200": {
"description": "Attachment binary content.",
"content": {
"application/octet-stream": {
"schema": {
"type": "string",
"format": "binary"
}
}
}
},
"401": {
"$ref": "#/components/responses/Unauthorized"
},
"403": {
"$ref": "#/components/responses/Forbidden"
},
"404": {
"description": "Attachment not found"
}
}
},
"post": {
"tags": [
"Open API"
@@ -257,6 +302,56 @@
}
}
},
"/api/v1/live/ws": {
"get": {
"tags": [
"Open API"
],
"summary": "Live API WebSocket",
"description": "WebSocket endpoint for Live API. Authenticate with API Key using query parameter `api_key` or header `Authorization: Bearer <api_key>`, and pass `username` as a query parameter. Use `ct=live` for voice mode or `ct=chat` for unified chat mode. See docs/live-api/README.md for the full frame-level protocol.",
"security": [
{
"ApiKeyHeader": []
}
],
"parameters": [
{
"name": "username",
"in": "query",
"required": true,
"schema": {
"type": "string"
},
"description": "Target username for the live session."
},
{
"name": "ct",
"in": "query",
"schema": {
"type": "string",
"enum": [
"live",
"chat"
],
"default": "live"
},
"description": "Session mode. `live` for voice conversation, `ct=chat` for the unified chat WebSocket."
}
],
"responses": {
"101": {
"description": "WebSocket protocol switch"
},
"401": {
"$ref": "#/components/responses/Unauthorized"
},
"403": {
"$ref": "#/components/responses/Forbidden"
}
},
"x-websocket": true
}
},
"/api/v1/im/message": {
"post": {
"tags": [

View File

@@ -46,6 +46,7 @@ X-API-Key: abk_xxx
调用 AstrBot 内建的 Agent 进行对话交互。支持插件调用、工具调用等能力,与 IM 端对话能力一致。
- `POST /api/v1/chat`发送对话消息SSE 流式返回,不传 `session_id` 会自动创建 UUID
- `GET /api/v1/live/ws`Live API WebSocketAPI Key 鉴权,查询参数必须包含 `username`,可选 `ct=live|chat`
- `GET /api/v1/chat/sessions`:分页获取指定 `username` 的会话
- `GET /api/v1/configs`:获取可用配置文件列表
@@ -148,3 +149,7 @@ curl -N 'http://localhost:6185/api/v1/chat' \
交互式 API 文档请查看:
- https://docs.astrbot.app/scalar.html
Live API 协议说明请查看:
- `docs/live-api/README.md`

View File

@@ -257,6 +257,56 @@
}
}
},
"/api/v1/live/ws": {
"get": {
"tags": [
"Open API"
],
"summary": "Live API WebSocket",
"description": "WebSocket endpoint for Live API. Authenticate with API Key using query parameter `api_key` or header `Authorization: Bearer <api_key>`, and pass `username` as a query parameter. Use `ct=live` for voice mode or `ct=chat` for unified chat mode. See docs/live-api/README.md for the full frame-level protocol.",
"security": [
{
"ApiKeyHeader": []
}
],
"parameters": [
{
"name": "username",
"in": "query",
"required": true,
"schema": {
"type": "string"
},
"description": "Target username for the live session."
},
{
"name": "ct",
"in": "query",
"schema": {
"type": "string",
"enum": [
"live",
"chat"
],
"default": "live"
},
"description": "Session mode. `live` for voice conversation, `chat` for the unified chat WebSocket."
}
],
"responses": {
"101": {
"description": "WebSocket protocol switch"
},
"401": {
"$ref": "#/components/responses/Unauthorized"
},
"403": {
"$ref": "#/components/responses/Forbidden"
}
},
"x-websocket": true
}
},
"/api/v1/im/message": {
"post": {
"tags": [

View File

@@ -0,0 +1,220 @@
import argparse
import asyncio
import logging
import json
import sys
from pathlib import Path
from urllib.parse import urlencode
import aiohttp
import websockets
def build_ws_url(base_url: str, api_key: str, username: str) -> str:
normalized = base_url.rstrip("/")
if normalized.startswith("https://"):
ws_base = "wss://" + normalized.removeprefix("https://").removeprefix("wss://")
elif normalized.startswith("http://"):
ws_base = "ws://" + normalized.removeprefix("http://").removeprefix("wss://")
else:
ws_base = f"ws://{normalized}"
query = urlencode(
{
"api_key": api_key,
"username": username,
"ct": "live",
}
)
return f"{ws_base}/api/v1/live/ws?{query}"
def build_headers(api_key: str) -> dict[str, str]:
return {"X-API-Key": api_key}
def create_logger(log_file: str | None) -> logging.Logger:
logger = logging.getLogger("live_upload_flow")
if logger.handlers:
return logger
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
if log_file:
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.propagate = False
return logger
async def upload_file(session: aiohttp.ClientSession, base_url: str, api_key: str, file_path: Path) -> str:
logger = logging.getLogger("live_upload_flow")
form = aiohttp.FormData()
with file_path.open("rb") as file_handle:
form.add_field(
"file",
file_handle,
filename=file_path.name,
content_type="application/octet-stream",
)
async with session.post(
f"{base_url.rstrip('/')}/api/v1/file",
data=form,
headers=build_headers(api_key),
) as resp:
payload = await resp.json()
logger.info(
"[UPLOAD] status=%s, attachment_id=%s",
payload.get("status"),
payload.get("data", {}).get("attachment_id"),
)
if payload.get("status") != "ok":
raise RuntimeError(f"Upload failed: {payload}")
attachment_id = payload["data"]["attachment_id"]
logger.info("[UPLOAD] attachment_id=%s", attachment_id)
return attachment_id
async def get_file(session: aiohttp.ClientSession, base_url: str, api_key: str, attachment_id: str) -> bytes:
logger = logging.getLogger("live_upload_flow")
url = f"{base_url.rstrip('/')}/api/v1/file?{urlencode({'attachment_id': attachment_id})}"
async with session.get(url, headers=build_headers(api_key)) as resp:
logger.info("[GET] status=%s, content_type=%s", resp.status, resp.headers.get("Content-Type"))
if resp.status != 200:
text = await resp.text()
raise RuntimeError(f"Failed to fetch attachment: {resp.status} {text}")
return await resp.read()
async def run_live_check(
base_url: str,
api_key: str,
username: str,
attachment_id: str | None,
text: str,
) -> None:
logger = logging.getLogger("live_upload_flow")
ws_url = build_ws_url(base_url, api_key, username)
message_parts = []
if attachment_id:
message_parts.append({"type": "file", "attachment_id": attachment_id})
if text:
message_parts.append({"type": "plain", "text": text})
if not message_parts:
raise ValueError("text is empty and no attachment_id provided")
message = {
"t": "text_input",
"message": message_parts,
}
async with websockets.connect(ws_url) as websocket:
logger.info("[WS] connected: %s", ws_url)
logger.info("[WS] send: %s", json.dumps(message, ensure_ascii=False))
await websocket.send(json.dumps(message))
try:
while True:
raw = await asyncio.wait_for(websocket.recv(), timeout=90)
data = json.loads(raw)
logger.info("[WS] recv: %s", json.dumps(data, ensure_ascii=False))
if data.get("t") == "end":
break
except asyncio.TimeoutError:
logger.warning("[WS] timeout reached, stop collecting messages")
async def main() -> None:
parser = argparse.ArgumentParser(description="Upload file and test live mode input path.")
parser.add_argument("--base-url", default="http://localhost:6185", help="Server base URL")
parser.add_argument("--api-key", required=True, help="OpenAPI key")
parser.add_argument("--username", default="alice", help="OpenAPI username")
parser.add_argument(
"--file",
type=Path,
required=False,
default=None,
help="Local file to upload",
)
parser.add_argument(
"--attachment-id",
help="Existing attachment_id to use directly",
default=None,
)
parser.add_argument(
"--text",
default="Please analyze the uploaded file.",
help="Additional text for live message",
)
parser.add_argument(
"--log-file",
default=str(Path.cwd() / "test.log"),
help="Write logs to this file in addition to terminal output",
)
parser.add_argument(
"--skip-download-check",
action="store_true",
help="Skip GET attachment content verification",
)
args = parser.parse_args()
create_logger(args.log_file)
logger = logging.getLogger("live_upload_flow")
if args.file is not None:
if not args.file.exists():
raise FileNotFoundError(f"file not found: {args.file}")
if not args.file.is_file():
raise ValueError(f"not a regular file: {args.file}")
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
attachment_id = args.attachment_id
if args.file is not None:
attachment_id = await upload_file(
session,
args.base_url,
args.api_key,
args.file,
)
if not args.skip_download_check:
content = await get_file(
session, args.base_url, args.api_key, attachment_id
)
logger.info("[GET] attachment size=%s bytes", len(content))
elif attachment_id:
logger.info("[INPUT] using attachment_id=%s", attachment_id)
if args.file is None and not attachment_id:
logger.info("[INPUT] no attachment_id provided, using text message only")
await run_live_check(
args.base_url,
args.api_key,
args.username,
attachment_id,
args.text,
)
if __name__ == "__main__":
create_logger(None)
try:
asyncio.run(main())
except Exception as e:
logging.getLogger("live_upload_flow").error("error: %s", e, exc_info=True)
raise SystemExit(1)

View File

@@ -767,3 +767,61 @@ async def test_open_file_upload_requires_file_and_can_upload(
assert isinstance(upload_data["data"]["attachment_id"], str)
assert upload_data["data"]["filename"] == "openapi_test.txt"
assert upload_data["data"]["type"] == "file"
@pytest.mark.asyncio
async def test_open_file_get_by_attachment_id_returns_content(
app: Quart,
authenticated_header: dict,
):
test_client = app.test_client()
raw_key, _ = await _create_api_key(
app,
authenticated_header,
scopes=["file"],
name_prefix="file-get-scope-key",
)
upload_res = await test_client.post(
"/api/v1/file",
files={
"file": FileStorage(
stream=BytesIO(b"openapi-get-content"),
filename="openapi_get.txt",
content_type="text/plain",
)
},
headers={"X-API-Key": raw_key},
)
assert upload_res.status_code == 200
upload_data = await upload_res.get_json()
attachment_id = upload_data["data"]["attachment_id"]
get_res = await test_client.get(
f"/api/v1/file?attachment_id={attachment_id}",
headers={"X-API-Key": raw_key},
)
assert get_res.status_code == 200
assert await get_res.get_data() == b"openapi-get-content"
@pytest.mark.asyncio
async def test_open_file_get_attachment_id_missing_returns_error(
app: Quart,
authenticated_header: dict,
):
test_client = app.test_client()
raw_key, _ = await _create_api_key(
app,
authenticated_header,
scopes=["file"],
name_prefix="file-get-missing-key",
)
missing_res = await test_client.get(
"/api/v1/file",
headers={"X-API-Key": raw_key},
)
missing_data = await missing_res.get_json()
assert missing_data["status"] == "error"
assert missing_data["message"] == "Missing key: attachment_id"