Files
AstrBot/tests/test_fastapi_v1_dashboard.py
2026-06-28 13:45:59 +08:00

3304 lines
106 KiB
Python

import copy
from dataclasses import dataclass
from pathlib import Path
from types import SimpleNamespace
from typing import cast
import httpx
import jwt
import pytest
import pytest_asyncio
from fastapi import FastAPI, Request
from fastapi.responses import PlainTextResponse
import astrbot.dashboard.services.config_service as config_service
from astrbot.core import file_token_service
from astrbot.dashboard.api.app import create_dashboard_asgi_app
from astrbot.dashboard.asgi_runtime import (
FastAPIAppAdapter,
g,
)
from astrbot.dashboard.asgi_runtime import (
request as dashboard_request,
)
from astrbot.dashboard.responses import ok
from astrbot.dashboard.services.api_key_service import ApiKeyService
from astrbot.dashboard.services.auth_service import DASHBOARD_JWT_COOKIE_NAME
from astrbot.dashboard.services.plugin_service import (
PLUGIN_UPDATE_SOURCE_REQUIRED_MESSAGE,
PluginServiceError,
)
from astrbot.dashboard.services.skills_service import SkillArchive
JWT_SECRET = "fastapi-v1-test-secret-with-32-bytes"
@dataclass
class FakeApiKey:
key_id: str
scopes: list[str] | None
class _FakeScalarResult:
def __init__(self, items: list[object]) -> None:
self.items = items
def all(self) -> list[object]:
return self.items
class _FakeDbResult:
def __init__(self, db: "FakeDb") -> None:
self.db = db
def fetchall(self) -> list[tuple[str]]:
return [(umo,) for umo in self.db.umo_ids]
def scalars(self) -> _FakeScalarResult:
return _FakeScalarResult(self.db.preferences)
class _FakeDbSession:
def __init__(self, db: "FakeDb") -> None:
self.db = db
async def execute(self, _statement) -> _FakeDbResult:
return _FakeDbResult(self.db)
class _FakeDbContext:
def __init__(self, db: "FakeDb") -> None:
self.db = db
async def __aenter__(self) -> _FakeDbSession:
return _FakeDbSession(self.db)
async def __aexit__(self, exc_type, exc, traceback) -> None:
return None
class FakeDb:
def __init__(self) -> None:
self.api_keys: dict[str, FakeApiKey] = {}
self.touched_key_ids: list[str] = []
self.umo_ids = ["webchat:FriendMessage:webchat!user!session-1"]
self.preferences: list[object] = []
async def get_active_api_key_by_hash(self, key_hash: str) -> FakeApiKey | None:
return self.api_keys.get(key_hash)
async def touch_api_key(self, key_id: str) -> None:
self.touched_key_ids.append(key_id)
async def get_attachment_by_id(self, _attachment_id: str):
return None
def get_db(self) -> _FakeDbContext:
return _FakeDbContext(self)
async def get_umo_aliases(self, _umos: list[str] | None = None) -> list[object]:
return []
def add_api_key(self, raw_key: str, scopes: list[str]) -> None:
self.api_keys[ApiKeyService.hash_key(raw_key)] = FakeApiKey(
key_id="config-key",
scopes=scopes,
)
class FakeLlmTools:
def __init__(self) -> None:
self.config = {
"mcpServers": {
"demo-server": {
"active": True,
"url": "https://example.com/demo-server",
},
"modelscope/demo": {
"active": True,
"url": "https://example.com/modelscope-demo",
},
}
}
self.mcp_server_runtime_view = {}
self.func_list = []
self.enabled_servers: list[tuple[str, dict]] = []
self.disabled_servers: list[str] = []
self.tested_configs: list[dict] = []
self.synced_modelscope_tokens: list[str] = []
def load_mcp_config(self) -> dict:
return copy.deepcopy(self.config)
def save_mcp_config(self, config: dict) -> bool:
self.config = copy.deepcopy(config)
return True
async def test_mcp_server_connection(self, config: dict) -> list[str]:
self.tested_configs.append(copy.deepcopy(config))
return ["demo_tool"]
async def sync_modelscope_mcp_servers(self, access_token: str) -> None:
self.synced_modelscope_tokens.append(access_token)
async def enable_mcp_server(
self,
name: str,
config: dict,
*,
timeout: int,
) -> None:
self.enabled_servers.append((name, copy.deepcopy(config)))
async def disable_mcp_server(self, name: str, *, timeout: int) -> None:
self.disabled_servers.append(name)
def iter_builtin_tools(self) -> list:
return []
def is_builtin_tool(self, _tool_name: str) -> bool:
return False
def activate_llm_tool(self, _tool_name: str, *, star_map) -> bool:
return True
def deactivate_llm_tool(self, _tool_name: str) -> bool:
return True
class FakeProviderManager:
def __init__(self, config: dict) -> None:
self.providers_config = config["provider"]
self.provider_sources_config = config["provider_sources"]
self.reloaded_providers: list[dict] = []
self.deleted_provider_filters: list[dict] = []
self.inst_map: dict[str, object] = {}
self.provider_insts: list[object] = []
self.stt_provider_insts: list[object] = []
self.tts_provider_insts: list[object] = []
self.set_provider_calls: list[dict] = []
self.llm_tools = FakeLlmTools()
def get_merged_provider_config(self, provider_config: dict) -> dict:
config = copy.deepcopy(provider_config)
source_id = config.get("provider_source_id")
if not source_id:
return config
source = next(
(
item
for item in self.provider_sources_config
if item.get("id") == source_id
),
None,
)
if not source:
return config
merged = {**source, **config}
merged["id"] = config["id"]
return merged
def get_provider_config_by_id(
self,
provider_id: str,
*,
merged: bool = False,
) -> dict | None:
for provider in self.providers_config:
if provider.get("id") != provider_id:
continue
if merged:
return self.get_merged_provider_config(provider)
return copy.deepcopy(provider)
return None
async def update_provider(self, origin_provider_id: str, new_config: dict) -> None:
next_id = new_config.get("id")
for provider in self.providers_config:
if provider.get("id") == next_id and next_id != origin_provider_id:
raise ValueError(f"Provider ID {next_id} already exists")
for idx, provider in enumerate(self.providers_config):
if provider.get("id") == origin_provider_id:
self.providers_config[idx] = copy.deepcopy(new_config)
await self.reload(new_config)
return
raise ValueError(f"Provider ID {origin_provider_id} not found")
async def create_provider(self, new_config: dict) -> None:
next_id = new_config.get("id")
if any(provider.get("id") == next_id for provider in self.providers_config):
raise ValueError(f"Provider ID {next_id} already exists")
self.providers_config.append(copy.deepcopy(new_config))
async def delete_provider(
self,
provider_id: str | None = None,
provider_source_id: str | None = None,
) -> None:
self.deleted_provider_filters.append(
{"provider_id": provider_id, "provider_source_id": provider_source_id}
)
if provider_id:
self.providers_config[:] = [
provider
for provider in self.providers_config
if provider.get("id") != provider_id
]
if provider_source_id:
self.providers_config[:] = [
provider
for provider in self.providers_config
if provider.get("provider_source_id") != provider_source_id
]
async def reload(self, provider: dict) -> None:
self.reloaded_providers.append(copy.deepcopy(provider))
async def set_provider(self, provider_id: str, provider_type, umo: str) -> None:
self.set_provider_calls.append(
{
"provider_id": provider_id,
"provider_type": provider_type,
"umo": umo,
}
)
class FakeProviderInstance:
def __init__(self, provider_id: str) -> None:
self.provider_id = provider_id
self.tested = False
def meta(self):
return SimpleNamespace(
id=self.provider_id,
model="kimi-k2-0905-preview",
provider_type=SimpleNamespace(value="chat_completion"),
)
async def test(self) -> None:
self.tested = True
@dataclass
class FakeConversation:
cid: str
user_id: str
platform_id: str = "webchat-main"
message_type: str = "FriendMessage"
title: str = "Demo conversation"
persona_id: str | None = "persona/foo"
history: str = "[]"
created_at: str = "2026-01-01T00:00:00"
updated_at: str = "2026-01-01T00:00:00"
class FakeConversationManager:
def __init__(self) -> None:
user_id = "webchat:FriendMessage:webchat!user!session-1"
self.conversations: dict[tuple[str, str], FakeConversation] = {
(user_id, "conversation/with/slash"): FakeConversation(
cid="conversation/with/slash",
user_id=user_id,
)
}
async def get_filtered_conversations(
self,
*,
page: int,
page_size: int,
platforms: list[str],
message_types: list[str],
search_query: str,
exclude_ids: list[str],
exclude_platforms: list[str],
):
conversations = list(self.conversations.values())
if platforms:
conversations = [
conversation
for conversation in conversations
if conversation.platform_id in platforms
]
if message_types:
conversations = [
conversation
for conversation in conversations
if conversation.message_type in message_types
]
if search_query:
conversations = [
conversation
for conversation in conversations
if search_query in conversation.title
]
conversations = [
conversation
for conversation in conversations
if conversation.cid not in exclude_ids
and conversation.platform_id not in exclude_platforms
]
start = (page - 1) * page_size
return conversations[start : start + page_size], len(conversations)
async def get_conversation(
self,
*,
unified_msg_origin: str,
conversation_id: str,
):
return self.conversations.get((unified_msg_origin, conversation_id))
async def update_conversation(
self,
*,
unified_msg_origin: str,
conversation_id: str,
title: str | None = None,
persona_id: str | None = None,
history=None,
) -> None:
conversation = self.conversations[(unified_msg_origin, conversation_id)]
if title is not None:
conversation.title = title
if persona_id is not None:
conversation.persona_id = persona_id
if history is not None:
conversation.history = history
async def delete_conversation(
self,
*,
unified_msg_origin: str,
conversation_id: str,
) -> None:
self.conversations.pop((unified_msg_origin, conversation_id), None)
class FakePlatform:
def __init__(self, platform_id: str) -> None:
self.platform_id = platform_id
self.config = {"webhook_uuid": "demo-hook"}
self.sent_messages = []
def meta(self):
return SimpleNamespace(id=self.platform_id, name=self.platform_id)
def unified_webhook(self) -> bool:
return True
async def webhook_callback(self, request_obj):
return {
"webhook_uuid": self.config["webhook_uuid"],
"method": request_obj.method,
"payload": await request_obj.get_json(silent=True),
}
async def send_by_session(self, session, message_chain) -> None:
self.sent_messages.append((session, message_chain))
class FakePersonaManager:
def __init__(self) -> None:
self.personas: dict[str, SimpleNamespace] = {
"persona/foo": self._persona(
persona_id="persona/foo",
system_prompt="Demo persona",
)
}
self.folders: dict[str, SimpleNamespace] = {}
self.sort_items: list[dict] = []
@staticmethod
def _persona(
*,
persona_id: str,
system_prompt: str,
begin_dialogs: list | None = None,
tools: list[str] | None = None,
skills: list[str] | None = None,
custom_error_message: str | None = None,
folder_id: str | None = None,
sort_order: int = 0,
) -> SimpleNamespace:
return SimpleNamespace(
persona_id=persona_id,
system_prompt=system_prompt,
begin_dialogs=begin_dialogs,
tools=tools,
skills=skills,
custom_error_message=custom_error_message,
folder_id=folder_id,
sort_order=sort_order,
created_at=None,
updated_at=None,
)
@staticmethod
def _folder(
*,
folder_id: str,
name: str,
parent_id: str | None = None,
description: str | None = None,
sort_order: int = 0,
) -> SimpleNamespace:
return SimpleNamespace(
folder_id=folder_id,
name=name,
parent_id=parent_id,
description=description,
sort_order=sort_order,
created_at=None,
updated_at=None,
)
async def get_all_personas(self) -> list[SimpleNamespace]:
return list(self.personas.values())
async def get_personas_by_folder(
self,
folder_id: str | None,
) -> list[SimpleNamespace]:
return [
persona
for persona in self.personas.values()
if persona.folder_id == folder_id
]
async def get_persona(self, persona_id: str):
return self.personas.get(persona_id)
async def create_persona(self, **kwargs):
persona = self._persona(**kwargs)
self.personas[persona.persona_id] = persona
return persona
async def update_persona(self, persona_id: str, **kwargs) -> None:
persona = self.personas[persona_id]
for key, value in kwargs.items():
if key in ("tools", "skills", "custom_error_message") or value is not None:
setattr(persona, key, value)
async def delete_persona(self, persona_id: str) -> None:
self.personas.pop(persona_id, None)
async def move_persona_to_folder(
self,
persona_id: str,
folder_id: str | None,
) -> None:
self.personas[persona_id].folder_id = folder_id
async def get_folders(self, parent_id: str | None) -> list[SimpleNamespace]:
return [
folder for folder in self.folders.values() if folder.parent_id == parent_id
]
async def get_folder_tree(self) -> list:
return []
async def get_folder(self, folder_id: str):
return self.folders.get(folder_id)
async def create_folder(self, **kwargs):
folder_id = kwargs.get("folder_id") or kwargs["name"]
folder = self._folder(folder_id=folder_id, **kwargs)
self.folders[folder.folder_id] = folder
return folder
async def update_folder(self, folder_id: str, **kwargs) -> None:
folder = self.folders[folder_id]
for key, value in kwargs.items():
if value is not None:
setattr(folder, key, value)
async def delete_folder(self, folder_id: str) -> None:
self.folders.pop(folder_id, None)
async def batch_update_sort_order(self, items: list[dict]) -> None:
self.sort_items = list(items)
class FakeUmopConfigRouter:
def __init__(self) -> None:
self.umop_to_conf_id: dict[str, str] = {}
async def update_routing_data(self, new_routing: dict[str, str]) -> None:
self.umop_to_conf_id = dict(new_routing)
async def update_route(self, umo: str, conf_id: str) -> None:
self.umop_to_conf_id[umo] = conf_id
async def delete_route(self, umo: str) -> None:
self.umop_to_conf_id.pop(umo, None)
class FakeAstrBotUpdator:
async def check_update(self, *_args, **_kwargs):
return None
async def get_releases(self):
return []
async def update(self, *_args, **_kwargs) -> None:
return None
async def download_update_package(self, *_args, **kwargs):
return kwargs.get("path", "temp.zip")
def apply_update_package(self, *_args, **_kwargs) -> None:
return None
class FakeAstrBotConfig(dict):
def save_config(self, post_config: dict) -> None:
self.clear()
self.update(copy.deepcopy(post_config))
def _build_fake_config() -> dict:
return FakeAstrBotConfig(
{
"platform": [
{
"id": "webchat-main",
"type": "webchat",
"enable": True,
"settings": {"session_timeout": 60},
}
],
"provider_sources": [
{
"id": "openai-source",
"type": "openai_chat_completion",
"provider_type": "chat_completion",
"api_base": "https://api.example.test/v1",
"key": ["test-key"],
}
],
"provider": [
{
"id": "gpt-mini",
"provider_source_id": "openai-source",
"model": "gpt-4o-mini",
"enable": True,
},
{
"id": "agent-runner",
"type": "dify",
"provider_type": "agent_runner",
"enable": False,
},
],
}
)
async def _request_json(request: Request, *, silent: bool = False):
try:
return await request.json()
except Exception:
if silent:
return None
raise
def _register_dashboard_alias_routes(
app: FastAPI,
config: dict,
provider_manager: FakeProviderManager,
) -> None:
def _alias_username(request: Request) -> str:
auth_header = request.headers.get("Authorization", "")
token = auth_header.removeprefix("Bearer ").strip()
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return payload["username"]
def alias_get(path: str):
return app.get(path, include_in_schema=False)
def alias_post(path: str):
return app.post(path, include_in_schema=False)
def alias_api_route(path: str, methods: list[str]):
return app.api_route(path, methods=methods, include_in_schema=False)
@alias_get("/api/config/platform/list")
async def dashboard_alias_platform_list():
return ok({"platforms": config["platform"]})
@alias_get("/api/config/provider/list")
async def dashboard_alias_provider_list(request: Request):
provider_type = request.query_params.get("provider_type")
provider_types = provider_type.split(",") if provider_type else []
provider_source_types = {
source["id"]: source.get("provider_type", "chat_completion")
for source in provider_manager.provider_sources_config
}
providers = []
for provider in provider_manager.providers_config:
source_id = provider.get("provider_source_id")
if source_id:
if provider_source_types.get(source_id) in provider_types:
providers.append(
provider_manager.get_merged_provider_config(provider)
)
continue
if provider.get("provider_type") in provider_types:
providers.append(provider)
return ok(providers)
@alias_get("/api/stat/start-time")
async def dashboard_alias_start_time():
return ok({"start_time": 1234567890})
@alias_get("/api/session/active-umos")
async def dashboard_alias_active_umos():
return ok(
{
"umos": ["webchat:FriendMessage:webchat!user!session-1"],
"umo_infos": [
{
"umo": "webchat:FriendMessage:webchat!user!session-1",
"platform": "webchat",
"message_type": "FriendMessage",
"session_id": "webchat!user!session-1",
}
],
}
)
@alias_get("/api/plugin/get")
async def dashboard_alias_plugin_list(request: Request):
return ok(
{
"plugins": [{"name": "astrbot_plugin_demo"}],
"alias_username": _alias_username(request),
}
)
@alias_post("/api/plugin/off")
async def dashboard_alias_plugin_off(request: Request):
return ok(
{
"payload": await _request_json(request),
"alias_username": _alias_username(request),
}
)
@alias_post("/api/plugin/on")
async def dashboard_alias_plugin_on(request: Request):
return ok(
{
"payload": await _request_json(request),
"alias_username": _alias_username(request),
}
)
@alias_get("/api/plugin/detail")
async def dashboard_alias_plugin_detail(request: Request):
return ok({"name": request.query_params.get("name")})
@alias_post("/api/plugin/uninstall")
async def dashboard_alias_plugin_uninstall(request: Request):
return ok({"payload": await _request_json(request)})
@alias_get("/api/plugin/readme")
async def dashboard_alias_plugin_readme(request: Request):
return ok({"name": request.query_params.get("name"), "content": "readme"})
@alias_get("/api/plugin/changelog")
async def dashboard_alias_plugin_changelog(request: Request):
return ok({"name": request.query_params.get("name"), "content": "changes"})
@alias_post("/api/plugin/reload")
async def dashboard_alias_plugin_reload(request: Request):
return ok({"payload": await _request_json(request)})
@alias_post("/api/plugin/update")
async def dashboard_alias_plugin_update(request: Request):
return ok({"payload": await _request_json(request)})
@alias_post("/api/plugin/check-compat")
async def dashboard_alias_plugin_version_support(request: Request):
return ok(
{
"payload": await _request_json(request),
"alias_username": _alias_username(request),
}
)
@alias_get("/api/config/get")
async def dashboard_alias_config_get(request: Request):
return ok(
{
"plugin_name": request.query_params.get("plugin_name"),
"schema": {"type": "object"},
}
)
@alias_post("/api/config/plugin/update")
async def dashboard_alias_plugin_config_update(request: Request):
return ok(
{
"plugin_name": request.query_params.get("plugin_name"),
"payload": await _request_json(request),
}
)
@alias_api_route("/api/plug/{plugin_path:path}", methods=["GET", "POST"])
async def dashboard_alias_plugin_extension(plugin_path: str, request: Request):
return ok(
{
"plugin_path": plugin_path,
"method": request.method,
"payload": await _request_json(request, silent=True),
"alias_username": _alias_username(request),
}
)
@alias_get("/api/config/file/get")
async def dashboard_alias_config_file_get(request: Request):
return ok(
{
"scope": request.query_params.get("scope"),
"name": request.query_params.get("name"),
"key": request.query_params.get("key"),
}
)
@alias_post("/api/config/file/upload")
async def dashboard_alias_config_file_upload(request: Request):
return ok(
{
"scope": request.query_params.get("scope"),
"name": request.query_params.get("name"),
"key": request.query_params.get("key"),
"payload": await _request_json(request, silent=True),
}
)
@alias_post("/api/config/file/delete")
async def dashboard_alias_config_file_delete(request: Request):
return ok(
{
"scope": request.query_params.get("scope"),
"name": request.query_params.get("name"),
"payload": await _request_json(request),
}
)
@alias_get("/api/skills")
async def dashboard_alias_skill_list():
return ok({"skills": [{"name": "demo_skill"}], "runtime": "local"})
@alias_post("/api/skills/update")
async def dashboard_alias_skill_update(request: Request):
return ok({"payload": await _request_json(request)})
@alias_post("/api/skills/delete")
async def dashboard_alias_skill_delete(request: Request):
return ok({"payload": await _request_json(request)})
@alias_get("/api/skills/download")
async def dashboard_alias_skill_download(request: Request):
return ok({"name": request.query_params.get("name")})
@alias_get("/api/skills/files")
async def dashboard_alias_skill_files(request: Request):
return ok(
{
"name": request.query_params.get("name"),
"path": request.query_params.get("path"),
}
)
@alias_get("/api/skills/file")
async def dashboard_alias_skill_file_get(request: Request):
return ok(
{
"name": request.query_params.get("name"),
"path": request.query_params.get("path"),
}
)
@alias_post("/api/skills/file")
async def dashboard_alias_skill_file_update(request: Request):
return ok({"payload": await _request_json(request)})
@alias_post("/api/config/provider/get_embedding_dim")
async def dashboard_alias_provider_embedding_dim(request: Request):
return ok({"payload": await _request_json(request)})
@alias_get("/api/file/{file_token}")
async def dashboard_alias_token_file(file_token: str):
return PlainTextResponse(f"token:{file_token}")
@pytest.fixture
def fake_db() -> FakeDb:
return FakeDb()
@pytest.fixture
def fake_core_lifecycle():
config = _build_fake_config()
provider_manager = FakeProviderManager(config)
platform = FakePlatform("webchat-main")
umop_config_router = FakeUmopConfigRouter()
reloaded_config_ids = []
platform_reload_configs = []
terminated_platform_ids = []
async def reload_pipeline_scheduler(config_id: str) -> None:
reloaded_config_ids.append(config_id)
async def reload_platform(config: dict) -> None:
platform_reload_configs.append(copy.deepcopy(config))
async def load_platform(config: dict) -> None:
platform_reload_configs.append(copy.deepcopy(config))
async def terminate_platform(platform_id: str) -> None:
terminated_platform_ids.append(platform_id)
demo_plugin = SimpleNamespace(
name="astrbot_plugin_demo",
repo=None,
author="demo",
desc="Demo plugin",
version="1.0.0",
reserved=False,
activated=True,
display_name="AstrBot Plugin Demo",
logo=None,
logo_path=None,
support_platforms=[],
astrbot_version="",
i18n={},
root_dir_name=None,
star_handler_full_names=[],
skills=[],
)
async def turn_off_plugin(plugin_name: str) -> None:
if plugin_name == demo_plugin.name:
demo_plugin.activated = False
async def turn_on_plugin(plugin_name: str) -> None:
if plugin_name == demo_plugin.name:
demo_plugin.activated = True
async def reload_plugin(plugin_name: str | None = None):
return True, f"reloaded {plugin_name or 'all'}"
def validate_astrbot_version_specifier(version_spec: str):
return True, f"supported: {version_spec}"
async def plugin_extension(plugin_path: str):
return ok(
{
"plugin_path": plugin_path,
"method": dashboard_request.method,
"payload": await dashboard_request.get_json(silent=True),
"username": g.username,
}
)
return SimpleNamespace(
astrbot_config=config,
astrbot_updator=FakeAstrBotUpdator(),
start_time=1234567890,
astrbot_config_mgr=SimpleNamespace(
confs={"default": config}, default_conf=config
),
reload_pipeline_scheduler=reload_pipeline_scheduler,
reloaded_config_ids=reloaded_config_ids,
platform_reload_configs=platform_reload_configs,
terminated_platform_ids=terminated_platform_ids,
umop_config_router=umop_config_router,
platform_manager=SimpleNamespace(
platform_insts=[platform],
fake_platform=platform,
reload=reload_platform,
load_platform=load_platform,
terminate_platform=terminate_platform,
get_all_stats=lambda: {
"platforms": [{"id": "webchat-main", "status": "running"}]
},
),
provider_manager=provider_manager,
persona_mgr=FakePersonaManager(),
conversation_manager=FakeConversationManager(),
platform_message_history_manager=SimpleNamespace(),
plugin_manager=SimpleNamespace(
context=SimpleNamespace(get_all_stars=lambda: [demo_plugin]),
failed_plugin_info=None,
failed_plugin_dict={},
turn_off_plugin=turn_off_plugin,
turn_on_plugin=turn_on_plugin,
reload=reload_plugin,
_validate_astrbot_version_specifier=validate_astrbot_version_specifier,
),
star_context=SimpleNamespace(
registered_web_apis=[
("/<path:plugin_path>", plugin_extension, ["GET", "POST"], "demo")
]
),
kb_manager=None,
)
@pytest.fixture
def asgi_app(fake_core_lifecycle, fake_db: FakeDb):
app = create_dashboard_asgi_app(
core_lifecycle=fake_core_lifecycle,
db=fake_db,
jwt_secret=JWT_SECRET,
)
app.state.dashboard_app_adapter = FastAPIAppAdapter(app)
_register_dashboard_alias_routes(
app,
fake_core_lifecycle.astrbot_config,
fake_core_lifecycle.provider_manager,
)
return app
@pytest_asyncio.fixture
async def asgi_client(asgi_app):
transport = httpx.ASGITransport(app=asgi_app)
async with httpx.AsyncClient(
transport=transport,
base_url="http://testserver",
) as client:
yield client
def _jwt_headers() -> dict[str, str]:
token = jwt.encode(
{"username": "fastapi-v1-test"},
JWT_SECRET,
algorithm="HS256",
)
return {"Authorization": f"Bearer {token}"}
@pytest.mark.asyncio
async def test_public_versions_route_uses_static_folder(
fake_core_lifecycle,
fake_db: FakeDb,
tmp_path: Path,
):
static_folder = tmp_path / "dist"
assets_folder = static_folder / "assets"
assets_folder.mkdir(parents=True)
(static_folder / "index.html").write_text("<!doctype html>", encoding="utf-8")
(assets_folder / "version").write_text("v9.8.7", encoding="utf-8")
app = create_dashboard_asgi_app(
core_lifecycle=fake_core_lifecycle,
db=fake_db,
jwt_secret=JWT_SECRET,
static_folder=str(static_folder),
)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(
transport=transport,
base_url="http://testserver",
) as client:
response = await client.get("/api/v1/stats/versions")
data = response.json()
assert response.status_code == 200
assert data["status"] == "ok"
assert data["data"]["webui_version"] == "v9.8.7"
assert data["data"]["astrbot_version"]
assert "astrbot_code_version" in data["data"]
def test_fastapi_app_adapter_registers_on_app_state():
app = FastAPI()
adapter = FastAPIAppAdapter(app)
assert app.state.dashboard_app_adapter is adapter
@pytest.mark.asyncio
async def test_v1_scope_dependencies_accept_dashboard_cookie(
asgi_client: httpx.AsyncClient,
):
token = jwt.encode(
{"username": "fastapi-v1-cookie-test"},
JWT_SECRET,
algorithm="HS256",
)
response = await asgi_client.get(
"/api/v1/bots",
headers={"Cookie": f"{DASHBOARD_JWT_COOKIE_NAME}={token}"},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert isinstance(data["data"]["bots"], list)
@pytest.mark.asyncio
async def test_v1_openapi_is_served_by_fastapi(asgi_client: httpx.AsyncClient):
response = await asgi_client.get("/api/v1/openapi.json")
assert response.status_code == 200
spec = response.json()
assert spec["openapi"].startswith("3.")
assert all(path.startswith("/api/v1/") for path in spec["paths"])
assert "/api/v1/bots" in spec["paths"]
assert "/api/v1/providers" in spec["paths"]
assert "/api/v1/plugins" in spec["paths"]
assert "/api/v1/conversations" in spec["paths"]
assert "/api/v1/mcp/servers" in spec["paths"]
assert "/api/v1/skills" in spec["paths"]
assert "/api/v1/file" in spec["paths"]
def test_static_openapi_v1_paths_include_api_version():
spec_path = Path(__file__).resolve().parents[1] / "openspec" / "openapi-v1.yaml"
in_paths = False
path_keys = []
for line in spec_path.read_text(encoding="utf-8").splitlines():
if line == "paths:":
in_paths = True
continue
if line == "components:":
in_paths = False
if in_paths and line.startswith(" /") and line.endswith(":"):
path_keys.append(line.strip()[:-1])
assert path_keys
assert all(path.startswith("/api/v1/") for path in path_keys)
@pytest.mark.asyncio
async def test_dashboard_static_dist_files_are_served(
fake_core_lifecycle,
fake_db: FakeDb,
tmp_path: Path,
):
static_folder = tmp_path / "dist"
assets_folder = static_folder / "assets"
assets_folder.mkdir(parents=True)
(static_folder / "index.html").write_text(
'<script type="module" src="/assets/index-demo.js"></script>',
encoding="utf-8",
)
(static_folder / "favicon.svg").write_text("<svg></svg>", encoding="utf-8")
(assets_folder / "index-demo.js").write_text(
"window.__astrbotStaticTest = true;",
encoding="utf-8",
)
(tmp_path / "secret.txt").write_text("outside static root", encoding="utf-8")
app = create_dashboard_asgi_app(
core_lifecycle=fake_core_lifecycle,
db=fake_db,
jwt_secret=JWT_SECRET,
static_folder=str(static_folder),
)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(
transport=transport,
base_url="http://testserver",
) as client:
asset_response = await client.get("/assets/index-demo.js")
favicon_response = await client.get("/favicon.svg")
page_response = await client.get("/config")
missing_response = await client.get("/assets/missing.js")
traversal_response = await client.get("/assets/%2E%2E/%2E%2E/secret.txt")
api_response = await client.get("/api/not-found")
assert asset_response.status_code == 200
assert "window.__astrbotStaticTest" in asset_response.text
assert favicon_response.status_code == 200
assert favicon_response.text == "<svg></svg>"
assert page_response.status_code == 200
assert "/assets/index-demo.js" in page_response.text
assert missing_response.status_code == 404
assert traversal_response.status_code == 404
assert api_response.status_code == 404
@pytest.mark.asyncio
async def test_v1_backup_path_rejects_traversal(asgi_client: httpx.AsyncClient):
download_response = await asgi_client.get(
"/api/v1/backups/%2E%2E/secret.zip",
params={"token": "demo"},
)
delete_response = await asgi_client.delete(
"/api/v1/backups/%2E%2E/secret.zip",
headers=_jwt_headers(),
)
assert download_response.status_code == 200
assert delete_response.status_code == 200
assert download_response.json()["status"] == "error"
assert delete_response.json()["status"] == "error"
assert "非法路径" in download_response.json()["message"]
assert "非法路径" in delete_response.json()["message"]
@pytest.mark.asyncio
async def test_v1_openapi_uses_pydantic_request_bodies(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.get("/api/v1/openapi.json")
assert response.status_code == 200
spec = response.json()
schemas = spec["components"]["schemas"]
assert "BotRegistrationRequest" in schemas
assert "ConfigContentRequest" in schemas
bot_registration = spec["paths"]["/api/v1/bot-types/{bot_type}/registration"][
"post"
]
assert bot_registration["parameters"][0]["name"] == "bot_type"
assert bot_registration["requestBody"]["content"]["application/json"]["schema"][
"$ref"
].endswith("/BotRegistrationRequest")
config_profile_update = spec["paths"]["/api/v1/config-profiles/{config_id}"]["put"]
assert config_profile_update["requestBody"]["content"]["application/json"][
"schema"
]["$ref"].endswith("/ConfigContentRequest")
system_config_update = spec["paths"]["/api/v1/system-config"]["put"]
assert system_config_update["requestBody"]["content"]["application/json"]["schema"][
"$ref"
].endswith("/ConfigContentRequest")
open_api_file_upload = spec["paths"]["/api/v1/file"]["post"]
assert open_api_file_upload["requestBody"]["content"]["multipart/form-data"][
"schema"
]["$ref"].endswith("/Body_uploadOpenApiFile")
assert open_api_file_upload["x-astrbot-scope"] == "file"
@pytest.mark.asyncio
async def test_v1_conversation_path_id_allows_slash(asgi_client: httpx.AsyncClient):
response = await asgi_client.get(
"/api/v1/conversations/conversation%2Fwith%2Fslash",
params={"user_id": "webchat:FriendMessage:webchat!user!session-1"},
headers=_jwt_headers(),
)
assert response.status_code == 200
payload = response.json()
assert payload["status"] == "ok"
assert payload["data"]["cid"] == "conversation/with/slash"
@pytest.mark.asyncio
async def test_v1_conversation_detail_requires_user_id(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.get(
"/api/v1/conversations/conversation%2Fwith%2Fslash",
headers=_jwt_headers(),
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_dashboard_alias_conversation_detail_uses_fastapi_service(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.post(
"/api/conversation/detail",
json={
"user_id": "webchat:FriendMessage:webchat!user!session-1",
"cid": "conversation/with/slash",
},
headers=_jwt_headers(),
)
assert response.status_code == 200
payload = response.json()
assert payload["status"] == "ok"
assert payload["data"]["cid"] == "conversation/with/slash"
@pytest.mark.asyncio
async def test_v1_bots_matches_dashboard_platform_alias_list(
asgi_client: httpx.AsyncClient,
):
headers = _jwt_headers()
dashboard_alias_response = await asgi_client.get(
"/api/config/platform/list",
headers=headers,
)
v1_response = await asgi_client.get("/api/v1/bots", headers=headers)
assert dashboard_alias_response.status_code == 200
assert v1_response.status_code == 200
dashboard_alias_data = dashboard_alias_response.json()
v1_data = v1_response.json()
assert dashboard_alias_data["status"] == "ok"
assert v1_data["status"] == "ok"
assert v1_data["data"]["bots"] == dashboard_alias_data["data"]["platforms"]
@pytest.mark.asyncio
async def test_v1_bot_stats_match_platform_manager(asgi_client: httpx.AsyncClient):
response = await asgi_client.get("/api/v1/bots/stats", headers=_jwt_headers())
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"]["platforms"] == [{"id": "webchat-main", "status": "running"}]
@pytest.mark.asyncio
async def test_v1_config_routes_can_replace_all_routes(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
routing = {
"webchat-main:private:*": "default",
"webchat-main:group:demo": "group-conf",
}
response = await asgi_client.put(
"/api/v1/config-routes",
headers=_jwt_headers(),
json={"routing": routing},
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
assert fake_core_lifecycle.umop_config_router.umop_to_conf_id == routing
list_response = await asgi_client.get(
"/api/v1/config-routes",
headers=_jwt_headers(),
)
assert list_response.status_code == 200
assert list_response.json()["data"]["routing"] == routing
@pytest.mark.asyncio
async def test_v1_active_umos_uses_session_service(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.get(
"/api/v1/sessions/active-umos",
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"]["umos"] == ["webchat:FriendMessage:webchat!user!session-1"]
assert data["data"]["umo_infos"][0]["platform"] == "webchat"
@pytest.mark.asyncio
async def test_v1_system_config_update_preserves_independent_bot_provider_sections(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
monkeypatch: pytest.MonkeyPatch,
):
def fake_save_config(post_config: dict, config: FakeAstrBotConfig, is_core=False):
config.save_config(post_config)
monkeypatch.setattr(config_service, "save_config", fake_save_config)
original_platform = copy.deepcopy(fake_core_lifecycle.astrbot_config["platform"])
original_provider_sources = copy.deepcopy(
fake_core_lifecycle.astrbot_config["provider_sources"]
)
original_providers = copy.deepcopy(fake_core_lifecycle.astrbot_config["provider"])
payload = copy.deepcopy(fake_core_lifecycle.astrbot_config)
payload["platform"] = []
payload["provider_sources"] = []
payload["provider"] = []
payload["provider_settings"] = {"default_provider_id": "gpt-mini"}
response = await asgi_client.put(
"/api/v1/system-config",
headers=_jwt_headers(),
json=payload,
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
assert fake_core_lifecycle.astrbot_config["platform"] == original_platform
assert (
fake_core_lifecycle.astrbot_config["provider_sources"]
== original_provider_sources
)
assert fake_core_lifecycle.astrbot_config["provider"] == original_providers
assert fake_core_lifecycle.astrbot_config["provider_settings"] == {
"default_provider_id": "gpt-mini"
}
assert fake_core_lifecycle.reloaded_config_ids == ["default"]
@pytest.mark.asyncio
async def test_v1_system_config_returns_system_metadata(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.get(
"/api/v1/system-config",
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert "system_group" in data["data"]["metadata"]
assert "platform_group" not in data["data"]["metadata"]
@pytest.mark.asyncio
async def test_v1_providers_matches_dashboard_provider_alias_list(
asgi_client: httpx.AsyncClient,
):
headers = _jwt_headers()
dashboard_alias_response = await asgi_client.get(
"/api/config/provider/list?provider_type=chat_completion",
headers=headers,
)
v1_response = await asgi_client.get(
"/api/v1/providers?capability=chat",
headers=headers,
)
assert dashboard_alias_response.status_code == 200
assert v1_response.status_code == 200
dashboard_alias_data = dashboard_alias_response.json()
v1_data = v1_response.json()
assert dashboard_alias_data["status"] == "ok"
assert v1_data["status"] == "ok"
assert v1_data["data"]["providers"] == dashboard_alias_data["data"]
@pytest.mark.asyncio
async def test_v1_provider_source_rename_updates_provider_refs(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setattr(
"astrbot.dashboard.services.config_service.save_config",
lambda *_args, **_kwargs: None,
)
response = await asgi_client.put(
"/api/v1/provider-sources/openai-source",
json={
"config": {
"id": "openai-renamed",
"type": "openai_chat_completion",
"provider_type": "chat_completion",
"api_base": "https://api.example.test/v1",
"key": ["test-key"],
}
},
headers=_jwt_headers(),
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
config = fake_core_lifecycle.astrbot_config
assert config["provider_sources"][0]["id"] == "openai-renamed"
assert config["provider"][0]["provider_source_id"] == "openai-renamed"
assert (
fake_core_lifecycle.provider_manager.provider_sources_config[0]["id"]
== "openai-renamed"
)
assert fake_core_lifecycle.provider_manager.reloaded_providers == [
config["provider"][0]
]
@pytest.mark.asyncio
async def test_v1_provider_update_keeps_dashboard_id_rename_behavior(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
response = await asgi_client.put(
"/api/v1/providers/gpt-mini",
json={
"config": {
"id": "gpt-renamed",
"provider_source_id": "openai-source",
"model": "gpt-4o-mini",
"enable": True,
}
},
headers=_jwt_headers(),
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
config = fake_core_lifecycle.astrbot_config
assert config["provider"][0]["id"] == "gpt-renamed"
assert fake_core_lifecycle.provider_manager.reloaded_providers == [
config["provider"][0]
]
@pytest.mark.asyncio
async def test_v1_create_standalone_provider_matches_dashboard_alias_capability(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
response = await asgi_client.post(
"/api/v1/providers",
json={
"config": {
"id": "tts-main",
"type": "edge_tts",
"provider_type": "text_to_speech",
"enable": True,
}
},
headers=_jwt_headers(),
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
assert fake_core_lifecycle.astrbot_config["provider"][-1] == {
"id": "tts-main",
"type": "edge_tts",
"provider_type": "text_to_speech",
"enable": True,
}
@pytest.mark.asyncio
async def test_v1_safe_provider_routes_accept_slash_ids(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setattr(config_service, "save_config", lambda *_args, **_kwargs: None)
source_id = "https://example.com/source"
provider_id = "qianxun/kimi-k2-0905-preview"
config = fake_core_lifecycle.astrbot_config
config["provider_sources"].append(
{
"id": source_id,
"type": "openai_chat_completion",
"provider_type": "chat_completion",
"api_base": "https://api.example.test/v1",
"key": ["test-key"],
}
)
config["provider"].append(
{
"id": provider_id,
"provider_source_id": source_id,
"model": "kimi-k2-0905-preview",
"enable": True,
}
)
provider_instance = FakeProviderInstance(provider_id)
fake_core_lifecycle.provider_manager.inst_map[provider_id] = provider_instance
async def fake_list_models(_service, requested_source_id: str):
return {"provider_source_id": requested_source_id, "models": ["model/a"]}
monkeypatch.setattr(
config_service.ProviderConfigService,
"list_provider_source_models",
fake_list_models,
)
headers = _jwt_headers()
get_response = await asgi_client.get(
"/api/v1/providers/by-id",
params={"provider_id": provider_id, "merged": True},
headers=headers,
)
schema_response = await asgi_client.get(
"/api/v1/providers/schema",
headers=headers,
)
path_test_response = await asgi_client.post(
"/api/v1/providers/qianxun%2Fkimi-k2-0905-preview/test",
headers=headers,
)
safe_test_response = await asgi_client.post(
"/api/v1/providers/test",
json={"provider_id": provider_id},
headers=headers,
)
enabled_response = await asgi_client.patch(
"/api/v1/providers/enabled",
json={"provider_id": provider_id, "enabled": False},
headers=headers,
)
embedding_response = await asgi_client.post(
"/api/v1/providers/embedding-dimension",
json={"provider_id": provider_id, "provider_config": {"model": "model/a"}},
headers=headers,
)
source_models_response = await asgi_client.get(
"/api/v1/provider-sources/models",
params={"source_id": source_id},
headers=headers,
)
source_providers_response = await asgi_client.get(
"/api/v1/provider-sources/providers",
params={"source_id": source_id},
headers=headers,
)
assert get_response.status_code == 200
assert get_response.json()["data"]["provider"]["id"] == provider_id
assert schema_response.status_code == 200
assert "config_schema" in schema_response.json()["data"]
assert path_test_response.status_code == 200
assert path_test_response.json()["data"]["status"] == "available"
assert safe_test_response.status_code == 200
assert safe_test_response.json()["data"]["status"] == "available"
assert provider_instance.tested is True
assert enabled_response.status_code == 200
assert config["provider"][-1]["enable"] is False
assert embedding_response.status_code == 400
assert embedding_response.json()["status"] == "error"
assert embedding_response.json()["message"] in {
"提供商适配器加载失败,请检查提供商类型配置或查看服务端日志",
"提供商不是 EmbeddingProvider 类型",
}
assert source_models_response.status_code == 200
assert source_models_response.json()["data"]["provider_source_id"] == source_id
assert source_providers_response.status_code == 200
assert source_providers_response.json()["data"]["providers"][0]["id"] == provider_id
@pytest.mark.asyncio
async def test_v1_safe_bot_routes_accept_slash_ids(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setattr(config_service, "save_config", lambda *_args, **_kwargs: None)
bot_id = "group/a"
fake_core_lifecycle.astrbot_config["platform"].append(
{"id": bot_id, "type": "webchat", "enable": True}
)
headers = _jwt_headers()
get_response = await asgi_client.get(
"/api/v1/bots/by-id",
params={"bot_id": bot_id},
headers=headers,
)
enabled_response = await asgi_client.patch(
"/api/v1/bots/enabled",
json={"bot_id": bot_id, "enabled": False},
headers=headers,
)
test_response = await asgi_client.post(
"/api/v1/bots/test",
json={"bot_id": bot_id},
headers=headers,
)
delete_response = await asgi_client.delete(
"/api/v1/bots/by-id",
params={"bot_id": bot_id},
headers=headers,
)
assert get_response.status_code == 200
assert get_response.json()["data"]["bot"]["id"] == bot_id
assert enabled_response.status_code == 200
assert fake_core_lifecycle.platform_reload_configs[-1]["id"] == bot_id
assert fake_core_lifecycle.platform_reload_configs[-1]["enable"] is False
assert test_response.status_code == 200
assert test_response.json()["data"] == {"id": bot_id, "status": "unsupported"}
assert delete_response.status_code == 200
assert fake_core_lifecycle.terminated_platform_ids == [bot_id]
@pytest.mark.asyncio
async def test_v1_config_scope_includes_bot_and_provider(
asgi_client: httpx.AsyncClient,
fake_db: FakeDb,
):
config_key = "abk_fastapi_v1_config"
fake_db.add_api_key(config_key, scopes=["config"])
bot_response = await asgi_client.get(
"/api/v1/bots",
headers={"X-API-Key": config_key},
)
provider_response = await asgi_client.get(
"/api/v1/providers/schema",
headers={"X-API-Key": config_key},
)
assert bot_response.status_code == 200
assert provider_response.status_code == 200
bot_key = "abk_fastapi_v1_bot"
fake_db.add_api_key(bot_key, scopes=["bot"])
response = await asgi_client.get(
"/api/v1/bots",
headers={"X-API-Key": bot_key},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert isinstance(data["data"]["bots"], list)
assert fake_db.touched_key_ids == ["config-key", "config-key", "config-key"]
@pytest.mark.asyncio
async def test_dashboard_alias_route_still_works_through_asgi_app(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.get("/api/stat/start-time")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"]["start_time"] == 1234567890
@pytest.mark.asyncio
async def test_v1_plugins_accept_api_key(
asgi_client: httpx.AsyncClient,
fake_db: FakeDb,
):
raw_key = "abk_fastapi_v1_plugin"
fake_db.add_api_key(raw_key, scopes=["plugin"])
response = await asgi_client.get(
"/api/v1/plugins",
headers={"X-API-Key": raw_key},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert [item["name"] for item in data["data"]] == ["astrbot_plugin_demo"]
@pytest.mark.asyncio
async def test_v1_plugin_enabled_patch_calls_service(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
response = await asgi_client.patch(
"/api/v1/plugins/astrbot_plugin_demo/enabled",
json={"enabled": False},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["message"] == "停用成功。"
plugin = fake_core_lifecycle.plugin_manager.context.get_all_stars()[0]
assert plugin.activated is False
@pytest.mark.asyncio
async def test_v1_plugin_version_support_check_uses_service(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.post(
"/api/v1/plugins/version-support/check",
json={"plugin_ids": ["astrbot_plugin_demo"]},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"] == {
"supported": True,
"message": "supported: ",
"astrbot_version": "",
}
@pytest.mark.asyncio
async def test_v1_plugin_validate_repo_uses_service(
asgi_app: FastAPI,
asgi_client: httpx.AsyncClient,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
captured = {}
async def fake_validate_plugin_repo(payload):
captured["payload"] = payload
return {
"valid": True,
"name": "astrbot_plugin_demo",
"version": "1.2.3",
}, "插件校验通过。"
monkeypatch.setattr(
plugin_service,
"validate_plugin_repo",
fake_validate_plugin_repo,
)
response = await asgi_client.post(
"/api/v1/plugins/validate/repo",
json={
"url": "https://github.com/AstrBotDevs/astrbot-plugin-demo",
"proxy": "https://proxy.example",
},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["message"] == "插件校验通过。"
assert data["data"]["version"] == "1.2.3"
assert captured["payload"] == {
"url": "https://github.com/AstrBotDevs/astrbot-plugin-demo",
"proxy": "https://proxy.example",
}
@pytest.mark.asyncio
async def test_v1_plugin_url_install_accepts_download_url_and_missing_body(
asgi_app: FastAPI,
asgi_client: httpx.AsyncClient,
monkeypatch: pytest.MonkeyPatch,
):
captured_payloads = []
plugin_service = asgi_app.state.services.plugins
async def fake_install_plugin(payload):
captured_payloads.append(payload)
if not payload.get("url"):
raise RuntimeError("missing url")
return {"name": "astrbot_plugin_demo"}, "安装成功。"
monkeypatch.setattr(plugin_service, "install_plugin", fake_install_plugin)
response = await asgi_client.post(
"/api/v1/plugins/install/url",
json={
"url": "https://github.com/AstrBotDevs/astrbot-plugin-demo",
"download_url": "https://cdn.example/plugin.zip",
"ignore_version_check": True,
"install_method": "market",
"registry_url": "https://example.com/plugins.json",
"market_plugin_id": "AstrBotDevs/astrbot-plugin-demo",
},
headers=_jwt_headers(),
)
empty_body_response = await asgi_client.post(
"/api/v1/plugins/install/url",
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert captured_payloads[0] == {
"url": "https://github.com/AstrBotDevs/astrbot-plugin-demo",
"download_url": "https://cdn.example/plugin.zip",
"proxy": None,
"ignore_version_check": True,
"install_method": "market",
"registry_url": "https://example.com/plugins.json",
"market_plugin_id": "AstrBotDevs/astrbot-plugin-demo",
}
assert empty_body_response.status_code == 200
empty_body_data = empty_body_response.json()
assert empty_body_data["status"] == "error"
assert empty_body_data["message"] == "插件操作失败,请查看服务端日志。"
assert "missing url" not in str(empty_body_data)
@pytest.mark.asyncio
async def test_plugin_service_market_install_uses_registry_entry(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
captured = {}
async def fake_get_online_plugins(*, custom_registry, force_refresh):
captured["registry_url"] = custom_registry
captured["force_refresh"] = force_refresh
return {
"$meta": {
"schema_version": 1,
"name": "Test Market",
"version": "2026.06.27",
},
"astrbot-plugin-demo": {
"author": "AstrBotDevs",
"repo": "https://github.com/AstrBotDevs/astrbot-plugin-demo",
"download_url": "https://cdn.example/market-plugin.zip",
},
}, None
async def fake_install_plugin(
repo_url,
proxy="",
ignore_version_check=False,
download_url="",
):
captured["repo_url"] = repo_url
captured["proxy"] = proxy
captured["ignore_version_check"] = ignore_version_check
captured["download_url"] = download_url
return {"name": "astrbot_plugin_demo"}
async def fake_persist_plugin_install_source(
plugin_info,
payload,
*,
fallback_method,
repo_url,
download_url,
):
captured["persist_payload"] = payload
captured["persist_fallback_method"] = fallback_method
captured["persist_repo_url"] = repo_url
captured["persist_download_url"] = download_url
async def fake_sync_skills_after_plugin_change():
captured["synced"] = True
monkeypatch.setattr(plugin_service, "get_online_plugins", fake_get_online_plugins)
monkeypatch.setattr(
plugin_service.plugin_manager,
"install_plugin",
fake_install_plugin,
raising=False,
)
monkeypatch.setattr(
plugin_service,
"persist_plugin_install_source",
fake_persist_plugin_install_source,
)
monkeypatch.setattr(
plugin_service,
"sync_skills_after_plugin_change",
fake_sync_skills_after_plugin_change,
)
result, message = await plugin_service.install_plugin(
{
"url": "https://github.com/SomeoneElse/wrong-plugin",
"download_url": "https://cdn.example/wrong-plugin.zip",
"install_method": "market",
"registry_url": "https://example.com/plugins.json",
"market_plugin_id": "AstrBotDevs/astrbot-plugin-demo",
"proxy": "https://proxy.example",
"ignore_version_check": True,
}
)
assert result == {"name": "astrbot_plugin_demo"}
assert message == "安装成功。"
assert captured["registry_url"] == "https://example.com/plugins.json"
assert captured["force_refresh"] is False
assert captured["repo_url"] == "https://github.com/AstrBotDevs/astrbot-plugin-demo"
assert captured["download_url"] == "https://cdn.example/market-plugin.zip"
assert captured["proxy"] == "https://proxy.example"
assert captured["ignore_version_check"] is True
assert captured["persist_fallback_method"] == "github"
assert (
captured["persist_repo_url"]
== "https://github.com/AstrBotDevs/astrbot-plugin-demo"
)
assert captured["persist_download_url"] == "https://cdn.example/market-plugin.zip"
assert (
captured["persist_payload"]["registry_url"]
== "https://example.com/plugins.json"
)
assert (
captured["persist_payload"]["market_plugin_id"]
== "AstrBotDevs/astrbot-plugin-demo"
)
assert captured["synced"] is True
@pytest.mark.asyncio
async def test_plugin_service_validate_plugin_repo_fetches_metadata_file(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
import astrbot.dashboard.services.plugin_service as plugin_service_module
from astrbot.core.star.updator import PluginUpdator
plugin_service = asgi_app.state.services.plugins
captured: dict[str, object] = {"urls": []}
updater = PluginUpdator.__new__(PluginUpdator)
async def fake_resolve_github_source_branch(repo_url: str):
assert repo_url == "https://github.com/AstrBotDevs/astrbot-plugin-demo"
return "AstrBotDevs", "astrbot-plugin-demo", "trunk"
plugin_service.plugin_manager.updator = SimpleNamespace(
parse_github_url=updater.parse_github_url,
resolve_github_source_branch=fake_resolve_github_source_branch,
validate_plugin_metadata=PluginUpdator.validate_plugin_metadata,
)
class FakeContent:
def __init__(self, text: str):
self._text = text
async def read(self, size: int) -> bytes:
return self._text.encode("utf-8")[:size]
class FakeResponse:
def __init__(self, status: int, *, text: str = "", payload=None):
self.status = status
self._text = text
self._payload = payload or {}
self.headers = {}
self.content = FakeContent(text)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
async def json(self):
return self._payload
async def text(self):
return self._text
class FakeClientSession:
def __init__(self, **kwargs):
captured["session_kwargs"] = kwargs
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
def get(self, url: str):
cast(list[str], captured["urls"]).append(url)
if url.endswith("/metadata.yaml"):
return FakeResponse(404)
if url.endswith("/metadata.yml"):
return FakeResponse(
200,
text="\n".join(
[
"name: astrbot_plugin_demo",
"description: Demo plugin",
"version: 2.0.0",
"author: AstrBotDevs",
"repo: https://github.com/AstrBotDevs/astrbot-plugin-demo",
]
),
)
return FakeResponse(404)
monkeypatch.setattr(
plugin_service_module.aiohttp,
"ClientSession",
FakeClientSession,
)
result, message = await plugin_service.validate_plugin_repo(
{
"url": "AstrBotDevs/astrbot-plugin-demo",
"proxy": "https://proxy.example/",
}
)
assert message == "插件校验通过。"
assert result["metadata_entry"] == "metadata.yml"
assert result["metadata_branch"] == "trunk"
assert result["desc"] == "Demo plugin"
assert result["version"] == "2.0.0"
assert (
"https://proxy.example/https://raw.githubusercontent.com/"
"AstrBotDevs/astrbot-plugin-demo/trunk/metadata.yml"
in cast(list[str], captured["urls"])
)
session_kwargs = cast(dict[str, object], captured["session_kwargs"])
assert "timeout" in session_kwargs
@pytest.mark.asyncio
async def test_plugin_service_validate_plugin_repo_rejects_large_metadata_file(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
import astrbot.dashboard.services.plugin_service as plugin_service_module
from astrbot.core.star.updator import PluginUpdator
plugin_service = asgi_app.state.services.plugins
async def fake_resolve_github_source_branch(repo_url: str):
assert repo_url == "https://github.com/AstrBotDevs/astrbot-plugin-demo"
return "AstrBotDevs", "astrbot-plugin-demo", "main"
plugin_service.plugin_manager.updator = SimpleNamespace(
resolve_github_source_branch=fake_resolve_github_source_branch,
validate_plugin_metadata=PluginUpdator.validate_plugin_metadata,
)
class FakeContent:
async def read(self, size: int) -> bytes: # noqa: ARG002
raise AssertionError("metadata body should not be read when too large")
class FakeResponse:
status = 200
headers = {
"Content-Length": str(plugin_service_module.PLUGIN_METADATA_MAX_BYTES + 1)
}
content = FakeContent()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
class FakeClientSession:
def __init__(self, **kwargs): # noqa: ARG002
return None
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
def get(self, url: str): # noqa: ARG002
return FakeResponse()
monkeypatch.setattr(
plugin_service_module.aiohttp,
"ClientSession",
FakeClientSession,
)
with pytest.raises(PluginServiceError, match="超过 1MB"):
await plugin_service.validate_plugin_repo(
{"url": "https://github.com/AstrBotDevs/astrbot-plugin-demo"}
)
@pytest.mark.asyncio
async def test_plugin_service_validate_plugin_repo_hides_internal_errors(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
import astrbot.dashboard.services.plugin_service as plugin_service_module
from astrbot.core.star.updator import PluginUpdator
plugin_service = asgi_app.state.services.plugins
async def fake_resolve_github_source_branch(repo_url: str):
assert repo_url == "https://github.com/AstrBotDevs/astrbot-plugin-demo"
return "AstrBotDevs", "astrbot-plugin-demo", "main"
plugin_service.plugin_manager.updator = SimpleNamespace(
resolve_github_source_branch=fake_resolve_github_source_branch,
validate_plugin_metadata=PluginUpdator.validate_plugin_metadata,
)
class FakeClientSession:
def __init__(self, **kwargs): # noqa: ARG002
return None
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
def get(self, url: str): # noqa: ARG002
raise RuntimeError("secret stack trace")
monkeypatch.setattr(
plugin_service_module.aiohttp,
"ClientSession",
FakeClientSession,
)
with pytest.raises(PluginServiceError) as exc_info:
await plugin_service.validate_plugin_repo(
{"url": "https://github.com/AstrBotDevs/astrbot-plugin-demo"}
)
assert exc_info.value.public_message == "插件校验失败,请查看服务端日志。"
assert "secret stack trace" not in exc_info.value.public_message
@pytest.mark.asyncio
async def test_plugin_service_bind_market_source_validates_and_persists(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
plugin = SimpleNamespace(
name="astrbot_plugin_demo",
root_dir_name="astrbot_plugin_demo",
repo="https://github.com/AstrBotDevs/astrbot-plugin-demo",
)
captured = {}
async def fake_get_online_plugins(*, custom_registry, force_refresh):
captured["registry_url"] = custom_registry
captured["force_refresh"] = force_refresh
return {
"$meta": {
"schema_version": 1,
"name": "Test Market",
"version": "2026.06.27",
},
"astrbot-plugin-demo": {
"author": "AstrBotDevs",
"repo": "https://github.com/AstrBotDevs/astrbot-plugin-demo.git",
"download_url": "https://cdn.example/plugin.zip",
},
}, None
async def fake_get_plugin_install_sources():
return {"astrbot_plugin_demo": {"installed_at": "2026-06-26T00:00:00+00:00"}}
async def fake_save_plugin_install_sources(records):
captured["records"] = records
monkeypatch.setattr(plugin_service, "find_plugin_by_name", lambda name: plugin)
monkeypatch.setattr(plugin_service, "get_online_plugins", fake_get_online_plugins)
monkeypatch.setattr(
plugin_service,
"get_plugin_install_sources",
fake_get_plugin_install_sources,
)
monkeypatch.setattr(
plugin_service,
"save_plugin_install_sources",
fake_save_plugin_install_sources,
)
record, message = await plugin_service.bind_plugin_market_source(
{
"name": "astrbot_plugin_demo",
"registry_url": "https://example.com/plugins.json",
"market_plugin_id": "AstrBotDevs/astrbot-plugin-demo",
}
)
assert message == "插件源已更新。"
assert captured["registry_url"] == "https://example.com/plugins.json"
assert captured["force_refresh"] is False
assert record["install_method"] == "market"
assert record["registry_url"] == "https://example.com/plugins.json"
assert record["market_plugin_id"] == "AstrBotDevs/astrbot-plugin-demo"
assert record["repo"] == "https://github.com/AstrBotDevs/astrbot-plugin-demo.git"
assert record["download_url"] == "https://cdn.example/plugin.zip"
assert record["installed_at"] == "2026-06-26T00:00:00+00:00"
assert captured["records"]["astrbot_plugin_demo"] == record
@pytest.mark.asyncio
async def test_plugin_service_bind_repo_source_persists_github_method(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
plugin = SimpleNamespace(
name="astrbot_plugin_demo",
root_dir_name="astrbot_plugin_demo",
repo="https://github.com/AstrBotDevs/astrbot-plugin-demo",
)
captured = {}
async def fake_get_plugin_install_sources():
return {"astrbot_plugin_demo": {"installed_at": "2026-06-26T00:00:00+00:00"}}
async def fake_save_plugin_install_sources(records):
captured["records"] = records
monkeypatch.setattr(plugin_service, "find_plugin_by_name", lambda name: plugin)
monkeypatch.setattr(
plugin_service,
"get_plugin_install_sources",
fake_get_plugin_install_sources,
)
monkeypatch.setattr(
plugin_service,
"save_plugin_install_sources",
fake_save_plugin_install_sources,
)
record, message = await plugin_service.bind_plugin_market_source(
{
"name": "astrbot_plugin_demo",
"install_method": "github",
}
)
assert message == "插件源已更新。"
assert record["install_method"] == "github"
assert record["registry_url"] is None
assert record["registry_name"] == "Repository"
assert record["repo"] == "https://github.com/AstrBotDevs/astrbot-plugin-demo"
assert record["installed_at"] == "2026-06-26T00:00:00+00:00"
assert captured["records"]["astrbot_plugin_demo"] == record
@pytest.mark.asyncio
async def test_plugin_service_bind_market_source_rejects_repo_mismatch(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
plugin = SimpleNamespace(
name="astrbot_plugin_demo",
root_dir_name="astrbot_plugin_demo",
repo="https://github.com/AstrBotDevs/astrbot-plugin-demo",
)
async def fake_get_online_plugins(*, custom_registry, force_refresh):
return {
"$meta": {
"schema_version": 1,
"name": "Test Market",
"version": "2026.06.27",
},
"astrbot-plugin-demo": {
"author": "AstrBotDevs",
"repo": "https://github.com/SomeoneElse/astrbot-plugin-demo",
},
}, None
monkeypatch.setattr(plugin_service, "find_plugin_by_name", lambda name: plugin)
monkeypatch.setattr(plugin_service, "get_online_plugins", fake_get_online_plugins)
with pytest.raises(Exception) as exc_info:
await plugin_service.bind_plugin_market_source(
{
"name": "astrbot_plugin_demo",
"market_plugin_id": "AstrBotDevs/astrbot-plugin-demo",
}
)
assert "插件仓库地址与所选插件源不一致" in str(exc_info.value)
def test_plugin_service_repo_identifier_accepts_github_url_without_scheme(
asgi_app: FastAPI,
):
plugin_service = asgi_app.state.services.plugins
assert (
plugin_service.repo_identifier_from_url("github.com/AstrBotDevs/demo.git")
== "AstrBotDevs/demo"
)
def test_plugin_service_resolves_market_entry_by_repo_identifier(
asgi_app: FastAPI,
):
plugin_service = asgi_app.state.services.plugins
record = {
"repo": "https://github.com/AstrBotDevs/astrbot-plugin-demo.git",
}
market_data = {
"$meta": {"schema_version": 1},
"astrbot-plugin-demo": {
"author": "AstrBotDevs",
"repo": "https://www.github.com/AstrBotDevs/astrbot-plugin-demo",
},
}
entry = plugin_service.resolve_market_plugin_entry(market_data, record)
assert entry is not None
assert entry["author"] == "AstrBotDevs"
assert entry["name"] == "astrbot-plugin-demo"
assert entry["repo"] == "https://www.github.com/AstrBotDevs/astrbot-plugin-demo"
@pytest.mark.asyncio
async def test_plugin_service_persist_install_source_resolves_registry_before_read(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
plugin = SimpleNamespace(
name="astrbot_plugin_demo",
root_dir_name="astrbot_plugin_demo",
repo="https://github.com/AstrBotDevs/astrbot-plugin-demo",
)
events = []
captured = {}
async def fake_resolve_registry_name(registry_url):
events.append(("resolve", registry_url))
return "Custom"
async def fake_get_plugin_install_sources():
events.append(("get", None))
return {}
async def fake_save_plugin_install_sources(records):
events.append(("save", None))
captured["records"] = records
monkeypatch.setattr(plugin_service, "find_plugin_by_name", lambda name: plugin)
monkeypatch.setattr(
plugin_service,
"resolve_registry_name",
fake_resolve_registry_name,
)
monkeypatch.setattr(
plugin_service,
"get_plugin_install_sources",
fake_get_plugin_install_sources,
)
monkeypatch.setattr(
plugin_service,
"save_plugin_install_sources",
fake_save_plugin_install_sources,
)
await plugin_service.persist_plugin_install_source(
{"name": "astrbot_plugin_demo"},
{
"registry_url": "https://example.com/plugins.json",
"install_method": "market",
"market_plugin_id": "AstrBotDevs/astrbot-plugin-demo",
},
fallback_method="url",
repo_url="https://github.com/AstrBotDevs/astrbot-plugin-demo",
download_url="",
)
assert events == [
("resolve", "https://example.com/plugins.json"),
("get", None),
("save", None),
]
record = captured["records"]["astrbot_plugin_demo"]
assert record["registry_name"] == "Custom"
def test_plugin_service_missing_install_source_is_implicit_for_display(
asgi_app: FastAPI,
):
plugin_service = asgi_app.state.services.plugins
plugin = SimpleNamespace(
name="astrbot_plugin_demo",
root_dir_name="astrbot_plugin_demo",
repo="https://github.com/AstrBotDevs/astrbot-plugin-demo",
reserved=False,
)
record = plugin_service.resolve_effective_plugin_install_source(plugin, {})
assert record["install_method"] == "market"
assert record["registry_url"] is None
assert record["registry_name"] == "Default"
assert record["repo"] == "https://github.com/AstrBotDevs/astrbot-plugin-demo"
assert record["implicit"] is True
assert record["name"] == "astrbot_plugin_demo"
assert record["marketplace_name"] == "astrbot-plugin-demo"
@pytest.mark.asyncio
async def test_plugin_service_update_missing_source_requires_selection(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
plugin = SimpleNamespace(
name="astrbot_plugin_demo",
root_dir_name="astrbot_plugin_demo",
repo="https://github.com/AstrBotDevs/astrbot-plugin-demo",
reserved=False,
)
async def fake_get_plugin_install_sources():
return {}
monkeypatch.setattr(plugin_service, "find_plugin_by_name", lambda name: plugin)
monkeypatch.setattr(
plugin_service,
"get_plugin_install_sources",
fake_get_plugin_install_sources,
)
with pytest.raises(PluginServiceError) as exc_info:
await plugin_service.resolve_market_update_info("astrbot_plugin_demo")
assert exc_info.value.public_message == PLUGIN_UPDATE_SOURCE_REQUIRED_MESSAGE
@pytest.mark.asyncio
async def test_plugin_service_update_github_source_uses_plugin_repo(
asgi_app: FastAPI,
monkeypatch: pytest.MonkeyPatch,
):
plugin_service = asgi_app.state.services.plugins
plugin = SimpleNamespace(
name="astrbot_plugin_demo",
root_dir_name="astrbot_plugin_demo",
repo="https://github.com/AstrBotDevs/astrbot-plugin-demo",
reserved=False,
)
async def fake_get_plugin_install_sources():
return {
"astrbot_plugin_demo": {
"install_method": "github",
"repo": "https://github.com/AstrBotDevs/astrbot-plugin-demo",
}
}
monkeypatch.setattr(plugin_service, "find_plugin_by_name", lambda name: plugin)
monkeypatch.setattr(
plugin_service,
"get_plugin_install_sources",
fake_get_plugin_install_sources,
)
update_info = await plugin_service.resolve_market_update_info("astrbot_plugin_demo")
assert update_info["repo"] == "https://github.com/AstrBotDevs/astrbot-plugin-demo"
assert update_info["download_url"] == ""
assert update_info["record"]["install_method"] == "github"
@pytest.mark.asyncio
async def test_v1_plugin_update_all_hides_internal_exceptions(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.post(
"/api/v1/plugins/update",
json={"plugin_ids": ["astrbot_plugin_demo"]},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
result = data["data"]["results"][0]
assert result["status"] == "error"
assert result["message"] == "更新失败,请查看服务端日志。"
assert "AttributeError" not in str(data)
assert "update_plugin" not in str(data)
@pytest.mark.asyncio
async def test_v1_plugin_extension_maps_nested_plugin_path(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.post(
"/api/v1/plugins/extensions/astrbot_plugin_demo/api/action",
json={"value": "demo"},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"] == {
"plugin_path": "astrbot_plugin_demo/api/action",
"method": "POST",
"payload": {"value": "demo"},
"username": "fastapi-v1-test",
}
@pytest.mark.asyncio
async def test_v1_plugin_extension_supports_astrbot_web_api(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
from astrbot.api.web import json_response
from astrbot.api.web import request as plugin_request
async def astrbot_web_plugin_extension(item_id: str):
return json_response(
{
"item_id": item_id,
"path_value": plugin_request.path_params["item_id"],
"path": plugin_request.path,
"method": plugin_request.method,
"limit": plugin_request.query.get("limit", 20, type=int),
"tags": plugin_request.query.getlist("tag"),
"payload": await plugin_request.json(default={}),
"username": plugin_request.username,
"plugin_name": plugin_request.plugin_name,
},
status_code=201,
)
fake_core_lifecycle.star_context.registered_web_apis = [
("/web/<item_id>", astrbot_web_plugin_extension, ["POST"], "web")
]
response = await asgi_client.post(
"/api/v1/plugins/extensions/web/demo-item?limit=7&tag=one&tag=two",
json={"value": "demo"},
headers=_jwt_headers(),
)
assert response.status_code == 201
data = response.json()
assert data == {
"item_id": "demo-item",
"path_value": "demo-item",
"path": "/api/v1/plugins/extensions/web/demo-item",
"method": "POST",
"limit": 7,
"tags": ["one", "two"],
"payload": {"value": "demo"},
"username": "fastapi-v1-test",
"plugin_name": "web",
}
@pytest.mark.asyncio
async def test_v1_plugin_extension_astrbot_web_api_reads_form_and_files(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
from astrbot.api.web import PluginUploadFile, json_response
from astrbot.api.web import request as plugin_request
async def astrbot_web_upload_extension():
form = await plugin_request.form()
files = await plugin_request.files()
upload: PluginUploadFile | None = files.get("file")
assert isinstance(upload, PluginUploadFile)
return json_response(
{
"tags": form.getlist("tag"),
"filename": upload.filename,
"content_type": upload.content_type,
"content": (await upload.read()).decode("utf-8"),
}
)
fake_core_lifecycle.star_context.registered_web_apis = [
("/upload", astrbot_web_upload_extension, ["POST"], "upload")
]
response = await asgi_client.post(
"/api/v1/plugins/extensions/upload",
files=[
("tag", (None, "one")),
("tag", (None, "two")),
("file", ("demo.txt", b"hello", "text/plain")),
],
headers=_jwt_headers(),
)
assert response.status_code == 200
assert response.json() == {
"tags": ["one", "two"],
"filename": "demo.txt",
"content_type": "text/plain",
"content": "hello",
}
def test_astrbot_web_request_requires_plugin_context():
from astrbot.api.web import request as plugin_request
with pytest.raises(RuntimeError, match="plugin Web API handler"):
_ = plugin_request.method
def test_astrbot_web_request_proxy_exposes_typed_methods():
from typing import get_type_hints
from astrbot.api.web import (
PluginMultiDict,
PluginRequestProxy,
PluginUploadFile,
)
from astrbot.api.web import request as plugin_request
assert isinstance(plugin_request, PluginRequestProxy)
assert get_type_hints(type(plugin_request).form)["return"] == PluginMultiDict[str]
assert (
get_type_hints(type(plugin_request).files)["return"]
== PluginMultiDict[PluginUploadFile]
)
@pytest.mark.asyncio
async def test_v1_plugin_extension_supports_quart_request_context(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
from quart import g as quart_g
from quart import jsonify as quart_jsonify
from quart import request as quart_request
async def quart_plugin_extension(item_id: str):
return quart_jsonify(
{
"status": "ok",
"data": {
"item_id": item_id,
"path": quart_request.path,
"method": quart_request.method,
"source": quart_request.args.get("source"),
"payload": await quart_request.get_json(),
"username": quart_g.username,
},
}
)
fake_core_lifecycle.star_context.registered_web_apis = [
("/quart/<item_id>", quart_plugin_extension, ["POST"], "quart")
]
response = await asgi_client.post(
"/api/v1/plugins/extensions/quart/demo-item?source=v1",
json={"value": "demo"},
headers=_jwt_headers(),
)
assert response.status_code == 200
assert response.headers["content-type"].startswith("application/json")
data = response.json()
assert data["status"] == "ok"
assert data["data"] == {
"item_id": "demo-item",
"path": "/api/plug/quart/demo-item",
"method": "POST",
"source": "v1",
"payload": {"value": "demo"},
"username": "fastapi-v1-test",
}
@pytest.mark.asyncio
async def test_multipart_parts_preserves_duplicate_form_values():
from starlette.datastructures import FormData
from astrbot.dashboard.api.multipart import multipart_parts
class FakeRequest:
async def form(self):
return FormData([("tag", "one"), ("tag", "two")])
form, files = await multipart_parts(FakeRequest())
assert form.getlist("tag") == ["one", "two"]
assert not files
@pytest.mark.asyncio
async def test_v1_plugin_config_file_routes_reach_service_layer(
asgi_client: httpx.AsyncClient,
):
headers = _jwt_headers()
list_response = await asgi_client.get(
"/api/v1/plugins/astrbot_plugin_demo/config-files/assets",
headers=headers,
)
upload_response = await asgi_client.post(
"/api/v1/plugins/astrbot_plugin_demo/config-files/assets",
json={"filename": "demo.txt"},
headers=headers,
)
delete_response = await asgi_client.request(
"DELETE",
"/api/v1/plugins/astrbot_plugin_demo/config-files",
json={"path": "demo.txt"},
headers=headers,
)
assert list_response.status_code == 400
assert list_response.json()["status"] == "error"
assert upload_response.status_code == 400
assert upload_response.json()["status"] == "error"
assert delete_response.status_code == 400
assert delete_response.json()["status"] == "error"
@pytest.mark.asyncio
async def test_v1_safe_plugin_routes_accept_slash_ids(
asgi_app: FastAPI,
asgi_client: httpx.AsyncClient,
monkeypatch: pytest.MonkeyPatch,
):
plugin_id = "plugin/foo"
headers = _jwt_headers()
plugin_service = asgi_app.state.services.plugins
config_display_service = asgi_app.state.services.config_display
config_file_service = asgi_app.state.services.config_files
async def fake_get_plugin_detail(**kwargs):
return {"name": kwargs["plugin_name"]}
async def fake_set_plugin_enabled(data, *, enabled: bool):
return {"payload": {"name": data["name"], "enabled": enabled}}
async def fake_update_plugin(data):
return {"payload": data}
def fake_get_plugin_readme(name: str):
return {"name": name, "content": "readme"}, "ok"
async def fake_get_configs(name: str):
return {"schema": {"name": name}}
def fake_list_config_files(*, scope: str, name: str, key_path: str):
return {"scope": scope, "name": name, "key": key_path}
monkeypatch.setattr(plugin_service, "get_plugin_detail", fake_get_plugin_detail)
monkeypatch.setattr(plugin_service, "set_plugin_enabled", fake_set_plugin_enabled)
monkeypatch.setattr(plugin_service, "update_plugin", fake_update_plugin)
monkeypatch.setattr(plugin_service, "get_plugin_readme", fake_get_plugin_readme)
monkeypatch.setattr(config_display_service, "get_configs", fake_get_configs)
monkeypatch.setattr(
config_file_service,
"list_config_files",
fake_list_config_files,
)
detail_response = await asgi_client.get(
"/api/v1/plugins/by-id",
params={"plugin_id": plugin_id},
headers=headers,
)
enabled_response = await asgi_client.patch(
"/api/v1/plugins/enabled",
json={"plugin_id": plugin_id, "enabled": False},
headers=headers,
)
update_response = await asgi_client.post(
"/api/v1/plugins/update",
json={"plugin_id": plugin_id, "reinstall": True},
headers=headers,
)
readme_response = await asgi_client.get(
"/api/v1/plugins/readme",
params={"plugin_id": plugin_id},
headers=headers,
)
schema_response = await asgi_client.get(
"/api/v1/plugins/config/schema",
params={"plugin_id": plugin_id},
headers=headers,
)
config_files_response = await asgi_client.get(
"/api/v1/plugins/config-files",
params={"plugin_id": plugin_id, "config_key": "assets/path"},
headers=headers,
)
assert detail_response.status_code == 200
assert detail_response.json()["data"]["name"] == plugin_id
assert enabled_response.status_code == 200
assert enabled_response.json()["data"]["payload"] == {
"name": plugin_id,
"enabled": False,
}
assert update_response.status_code == 200
assert update_response.json()["data"]["payload"] == {
"name": plugin_id,
"reinstall": True,
}
assert readme_response.status_code == 200
assert readme_response.json()["data"]["name"] == plugin_id
assert schema_response.status_code == 200
assert schema_response.json()["data"]["plugin_name"] == plugin_id
assert config_files_response.status_code == 200
assert config_files_response.json()["data"] == {
"scope": "plugin",
"name": plugin_id,
"key": "assets/path",
}
@pytest.mark.asyncio
async def test_v1_safe_plugin_source_delete_accepts_slash_ids(
asgi_client: httpx.AsyncClient,
monkeypatch: pytest.MonkeyPatch,
):
source_id = "https://example.com/source"
sources = [{"id": source_id}, {"id": "keep"}]
async def fake_global_get(_key, _default=None):
return list(sources)
async def fake_global_put(_key, value):
sources[:] = value
monkeypatch.setattr(
"astrbot.dashboard.services.plugin_service.sp.global_get",
fake_global_get,
)
monkeypatch.setattr(
"astrbot.dashboard.services.plugin_service.sp.global_put",
fake_global_put,
)
response = await asgi_client.delete(
"/api/v1/plugin-sources/by-id",
params={"source_id": source_id},
headers=_jwt_headers(),
)
assert response.status_code == 200
assert response.json()["data"]["sources"] == [{"id": "keep"}]
@pytest.mark.asyncio
async def test_v1_command_patch_updates_service(
asgi_app: FastAPI,
asgi_client: httpx.AsyncClient,
monkeypatch: pytest.MonkeyPatch,
):
async def fake_toggle(handler_full_name: str | None, enabled):
return {
"handler_full_name": handler_full_name,
"enabled": enabled,
}
monkeypatch.setattr(
asgi_app.state.services.commands,
"toggle_command",
fake_toggle,
)
response = await asgi_client.patch(
"/api/v1/commands/plugin.handler",
json={"enabled": False},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"] == {
"handler_full_name": "plugin.handler",
"enabled": False,
}
@pytest.mark.asyncio
async def test_v1_bot_type_registration_uses_platform_service(
asgi_app: FastAPI,
asgi_client: httpx.AsyncClient,
monkeypatch: pytest.MonkeyPatch,
):
async def fake_registration(platform_type: str, payload: dict):
return {"platform_type": platform_type, "payload": payload}
monkeypatch.setattr(
asgi_app.state.services.platforms,
"handle_platform_registration",
fake_registration,
)
response = await asgi_client.post(
"/api/v1/bot-types/webchat/registration",
json={"registration_code": "abc123"},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"] == {
"platform_type": "webchat",
"payload": {"registration_code": "abc123"},
}
@pytest.mark.asyncio
async def test_v1_token_file_is_public(
asgi_client: httpx.AsyncClient,
tmp_path: Path,
):
token_file = tmp_path / "token-file.txt"
token_file.write_text("token:demo-token", encoding="utf-8")
file_token = await file_token_service.register_file(str(token_file), timeout=60)
response = await asgi_client.get(f"/api/v1/files/tokens/{file_token}")
assert response.status_code == 200
assert response.text == "token:demo-token"
assert response.headers["content-type"].startswith("text/plain")
def test_v1_openapi_alias_websocket_routes_are_mounted(asgi_app):
assert str(asgi_app.url_path_for("chat_ws")) == "/api/v1/chat/ws"
assert str(asgi_app.url_path_for("live_chat_ws")) == "/api/v1/live-chat/ws"
assert str(asgi_app.url_path_for("unified_chat_ws")) == "/api/v1/unified-chat/ws"
def test_dashboard_config_aliases_are_registered_on_fastapi(asgi_app):
assert (
str(asgi_app.url_path_for("dashboard_alias_platform_list"))
== "/api/config/platform/list"
)
assert (
str(asgi_app.url_path_for("dashboard_alias_provider_list"))
== "/api/config/provider/list"
)
assert (
str(asgi_app.url_path_for("update_dashboard_alias_provider_source"))
== "/api/config/provider_sources/update"
)
@pytest.mark.asyncio
async def test_v1_mcp_enabled_patch_updates_stored_active_flag(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
response = await asgi_client.patch(
"/api/v1/mcp/servers/demo-server/enabled",
json={"enabled": False},
headers=_jwt_headers(),
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["message"] == "Successfully updated MCP server demo-server"
mcp_servers = fake_core_lifecycle.provider_manager.llm_tools.config["mcpServers"]
assert mcp_servers["demo-server"]["active"] is False
@pytest.mark.asyncio
async def test_v1_safe_mcp_routes_accept_slash_server_names(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
server_name = "modelscope/demo"
headers = _jwt_headers()
fake_tools = fake_core_lifecycle.provider_manager.llm_tools
enabled_response = await asgi_client.patch(
"/api/v1/mcp/servers/enabled",
json={"server_name": server_name, "enabled": False},
headers=headers,
)
assert enabled_response.status_code == 200
assert fake_tools.config["mcpServers"][server_name]["active"] is False
test_response = await asgi_client.post(
"/api/v1/mcp/servers/test",
json={"server_name": server_name},
headers=headers,
)
assert test_response.status_code == 200
assert test_response.json()["data"] == ["demo_tool"]
assert fake_tools.tested_configs[-1] == {
"active": False,
"url": "https://example.com/modelscope-demo",
}
delete_response = await asgi_client.delete(
"/api/v1/mcp/servers/by-name",
params={"server_name": server_name},
headers=headers,
)
assert delete_response.status_code == 200
assert server_name not in fake_tools.config["mcpServers"]
sync_response = await asgi_client.post(
"/api/v1/mcp/providers/modelscope/sync",
json={"access_token": "token"},
headers=headers,
)
assert sync_response.status_code == 200
assert fake_tools.synced_modelscope_tokens == ["token"]
@pytest.mark.asyncio
async def test_v1_mcp_scope_accepts_api_key(
asgi_client: httpx.AsyncClient,
fake_db: FakeDb,
):
raw_key = "abk_fastapi_v1_mcp"
fake_db.add_api_key(raw_key, scopes=["mcp"])
response = await asgi_client.get(
"/api/v1/mcp/servers",
headers={"X-API-Key": raw_key},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert any(server["name"] == "demo-server" for server in data["data"])
@pytest.mark.asyncio
async def test_v1_skill_scope_accepts_api_key_and_rejects_plural_scope(
asgi_app: FastAPI,
asgi_client: httpx.AsyncClient,
fake_db: FakeDb,
monkeypatch: pytest.MonkeyPatch,
):
monkeypatch.setattr(
asgi_app.state.services.skills,
"get_skills",
lambda: {"skills": [{"name": "demo_skill"}]},
)
plural_key = "abk_fastapi_v1_skills"
fake_db.add_api_key(plural_key, scopes=["skills"])
plural_response = await asgi_client.get(
"/api/v1/skills",
headers={"X-API-Key": plural_key},
)
assert plural_response.status_code == 403
data = plural_response.json()
assert data["status"] == "error"
assert data["message"] == "Insufficient API key scope"
raw_key = "abk_fastapi_v1_skill"
fake_db.add_api_key(raw_key, scopes=["skill"])
response = await asgi_client.get(
"/api/v1/skills",
headers={"X-API-Key": raw_key},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"]["skills"] == [{"name": "demo_skill"}]
@pytest.mark.asyncio
async def test_v1_safe_skill_routes_accept_slash_names(
asgi_app: FastAPI,
asgi_client: httpx.AsyncClient,
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
):
skill_name = "skill/foo"
headers = _jwt_headers()
skill_service = asgi_app.state.services.skills
archive_path = tmp_path / "skill.zip"
archive_path.write_bytes(b"zip")
async def fake_update_skill(data):
return {"payload": data}
async def fake_delete_skill(data):
return {"payload": data}
def fake_prepare_skill_archive(name: str):
assert name == skill_name
return SkillArchive(path=archive_path, filename="skill.zip")
def fake_list_skill_files(name: str, path: str):
return {"name": name, "path": path}
def fake_get_skill_file(name: str, path: str):
return {"name": name, "path": path}
async def fake_update_skill_file(data):
return {"payload": data}
monkeypatch.setattr(skill_service, "update_skill", fake_update_skill)
monkeypatch.setattr(skill_service, "delete_skill", fake_delete_skill)
monkeypatch.setattr(
skill_service,
"prepare_skill_archive",
fake_prepare_skill_archive,
)
monkeypatch.setattr(skill_service, "list_skill_files", fake_list_skill_files)
monkeypatch.setattr(skill_service, "get_skill_file", fake_get_skill_file)
monkeypatch.setattr(skill_service, "update_skill_file", fake_update_skill_file)
enabled_response = await asgi_client.patch(
"/api/v1/skills/by-name",
json={"skill_name": skill_name, "enabled": False},
headers=headers,
)
archive_response = await asgi_client.get(
"/api/v1/skills/archive",
params={"skill_name": skill_name},
headers=headers,
)
files_response = await asgi_client.get(
"/api/v1/skills/files",
params={"skill_name": skill_name, "path": "src"},
headers=headers,
)
file_response = await asgi_client.get(
"/api/v1/skills/file",
params={"skill_name": skill_name, "path": "src/main.py"},
headers=headers,
)
update_file_response = await asgi_client.put(
"/api/v1/skills/file",
json={"skill_name": skill_name, "path": "src/main.py", "content": "print(1)"},
headers=headers,
)
delete_response = await asgi_client.delete(
"/api/v1/skills/by-name",
params={"skill_name": skill_name},
headers=headers,
)
assert enabled_response.status_code == 200
assert enabled_response.json()["data"]["payload"] == {
"name": skill_name,
"active": False,
}
assert archive_response.status_code == 200
assert archive_response.content == b"zip"
assert files_response.status_code == 200
assert files_response.json()["data"] == {"name": skill_name, "path": "src"}
assert file_response.status_code == 200
assert file_response.json()["data"] == {
"name": skill_name,
"path": "src/main.py",
}
assert update_file_response.status_code == 200
assert update_file_response.json()["data"]["payload"] == {
"name": skill_name,
"path": "src/main.py",
"content": "print(1)",
}
assert delete_response.status_code == 200
assert delete_response.json()["data"]["payload"] == {"name": skill_name}
@pytest.mark.asyncio
async def test_v1_safe_persona_routes_accept_slash_ids(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
persona_id = "persona/foo"
headers = _jwt_headers()
persona_mgr = fake_core_lifecycle.persona_mgr
detail_response = await asgi_client.get(
"/api/v1/personas/by-id",
params={"persona_id": persona_id},
headers=headers,
)
update_response = await asgi_client.put(
"/api/v1/personas/by-id",
json={"persona_id": persona_id, "name": "Demo Persona"},
headers=headers,
)
delete_response = await asgi_client.delete(
"/api/v1/personas/by-id",
params={"persona_id": persona_id},
headers=headers,
)
assert detail_response.status_code == 200
assert detail_response.json()["data"]["persona_id"] == persona_id
assert detail_response.json()["data"]["system_prompt"] == "Demo persona"
assert update_response.status_code == 200
assert update_response.json()["data"] == {"message": "人格更新成功"}
assert delete_response.status_code == 200
assert delete_response.json()["data"] == {"message": "人格删除成功"}
assert persona_id not in persona_mgr.personas
@pytest.mark.asyncio
async def test_v1_persona_by_id_update_preserves_explicit_null_tools_and_skills(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
):
persona_id = "persona/foo"
headers = _jwt_headers()
persona = fake_core_lifecycle.persona_mgr.personas[persona_id]
persona.tools = ["tool-a"]
persona.skills = ["skill-a"]
response = await asgi_client.put(
"/api/v1/personas/by-id",
json={"persona_id": persona_id, "tools": None, "skills": None},
headers=headers,
)
assert response.status_code == 200
assert response.json()["data"] == {"message": "人格更新成功"}
assert persona.tools is None
assert persona.skills is None
@pytest.mark.asyncio
async def test_v1_im_routes_use_im_scope_and_running_platform(
asgi_client: httpx.AsyncClient,
fake_core_lifecycle,
fake_db: FakeDb,
):
raw_key = "abk_fastapi_v1_im"
fake_db.add_api_key(raw_key, scopes=["im"])
bots_response = await asgi_client.get(
"/api/v1/im/bots",
headers={"X-API-Key": raw_key},
)
send_response = await asgi_client.post(
"/api/v1/im/messages",
json={
"umo": "webchat-main:FriendMessage:test-session",
"message": "hello",
},
headers={"X-API-Key": raw_key},
)
assert bots_response.status_code == 200
assert send_response.status_code == 200
assert bots_response.json()["data"]["bot_ids"] == ["webchat-main"]
sent_messages = fake_core_lifecycle.platform_manager.fake_platform.sent_messages
assert len(sent_messages) == 1
session, message_chain = sent_messages[0]
assert str(session) == "webchat-main:FriendMessage:test-session"
assert message_chain.chain[0].text == "hello"
@pytest.mark.asyncio
async def test_v1_platform_webhook_is_public_route(
asgi_client: httpx.AsyncClient,
):
response = await asgi_client.post(
"/api/v1/webhooks/platforms/demo-hook",
json={"challenge": "ping"},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["data"] == {
"webhook_uuid": "demo-hook",
"method": "POST",
"payload": {"challenge": "ping"},
}