feat: enhance SDK plugin configuration handling and logging

This commit is contained in:
whatevertogo
2026-03-19 04:47:06 +08:00
parent cb593a539f
commit 48a20240c9

View File

@@ -55,6 +55,7 @@ import copy
import importlib
import inspect
import json
import logging
import os
import re
import shutil
@@ -105,6 +106,7 @@ OptionalInnerType: TypeAlias = Literal["str", "int", "float", "bool"] | None
HandlerKind: TypeAlias = Literal["handler", "hook", "tool", "session"]
DiscoverySeverity: TypeAlias = Literal["warning", "error"]
DiscoveryPhase: TypeAlias = Literal["discovery", "load", "lifecycle", "reload"]
_LOGGER = logging.getLogger(__name__)
def _default_python_version() -> str:
@@ -502,17 +504,74 @@ def _normalize_config_value(field_schema: dict[str, Any], value: Any) -> Any:
return copy.deepcopy(value) if value is not None else default_value
def load_plugin_config(plugin: PluginSpec) -> dict[str, Any]:
"""加载插件配置,返回普通字典"""
def load_plugin_config_schema(plugin: PluginSpec) -> dict[str, Any]:
"""加载插件配置 schema解析失败时记录日志并返回空对象"""
schema_path = plugin.plugin_dir / CONFIG_SCHEMA_FILE
if not schema_path.exists():
return {}
try:
schema_payload = json.loads(schema_path.read_text(encoding="utf-8"))
except Exception:
schema_payload = {}
schema = schema_payload if isinstance(schema_payload, dict) else {}
except json.JSONDecodeError as exc:
_LOGGER.warning(
"Failed to parse SDK plugin config schema %s: %s",
schema_path,
exc,
)
return {}
except OSError as exc:
_LOGGER.warning(
"Failed to read SDK plugin config schema %s: %s",
schema_path,
exc,
)
return {}
if not isinstance(schema_payload, dict):
_LOGGER.warning(
"SDK plugin config schema %s must be a JSON object, got %s",
schema_path,
type(schema_payload).__name__,
)
return {}
return schema_payload
def save_plugin_config(
plugin: PluginSpec,
payload: dict[str, Any],
*,
schema: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""按 schema 归一化并写回插件配置。"""
active_schema = (
load_plugin_config_schema(plugin) if schema is None else dict(schema)
)
normalized = {
key: _normalize_config_value(field_schema, payload.get(key))
for key, field_schema in active_schema.items()
if isinstance(field_schema, dict)
}
config_path = _plugin_config_path(plugin.plugin_dir, plugin.name)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(
json.dumps(normalized, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return normalized
def load_plugin_config(
plugin: PluginSpec,
*,
schema: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""加载插件配置,返回普通字典。"""
active_schema = (
load_plugin_config_schema(plugin) if schema is None else dict(schema)
)
if not active_schema:
return {}
config_path = _plugin_config_path(plugin.plugin_dir, plugin.name)
try:
@@ -521,21 +580,29 @@ def load_plugin_config(plugin: PluginSpec) -> dict[str, Any]:
if config_path.exists()
else {}
)
except Exception:
except json.JSONDecodeError as exc:
_LOGGER.warning(
"Failed to parse SDK plugin config %s: %s",
config_path,
exc,
)
existing_payload = {}
except OSError as exc:
_LOGGER.warning(
"Failed to read SDK plugin config %s: %s",
config_path,
exc,
)
existing_payload = {}
existing = existing_payload if isinstance(existing_payload, dict) else {}
normalized = {
key: _normalize_config_value(field_schema, existing.get(key))
for key, field_schema in schema.items()
for key, field_schema in active_schema.items()
if isinstance(field_schema, dict)
}
if not config_path.exists() or normalized != existing:
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(
json.dumps(normalized, ensure_ascii=False, indent=2),
encoding="utf-8",
)
save_plugin_config(plugin, normalized, schema=active_schema)
return normalized