Files
SillyTavern_replica/backend/services/fiction_event_plan_service.py

423 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
爽文事件规划fiction.event_plan— LLM 调用逻辑。
按事件顺序生成,每完成一个事件即持久化并广播进度。
"""
from __future__ import annotations
import json
import logging
import random
import re
from typing import Any, AsyncIterator, Dict, List, Optional
from langchain_core.messages import HumanMessage, SystemMessage
from models.fiction_models import (
ChapterPlanItem,
CoarseOutlineEvent,
EventPlanEntry,
FictionBookMetadata,
FlowStepsPlan,
)
from services.fiction_event_plan_progress import emit as emit_progress
from services.fiction_event_plan_progress import subscribe as subscribe_progress
from services.fiction_metadata_service import fiction_metadata_service
from services.fiction_prompt_utils import resolve_prompt
from services.fiction_service import fiction_service
from services.studio_step_respond import resolve_api_config
from utils.llm_client import LLMClient
logger = logging.getLogger(__name__)
_llm_client = LLMClient()
_JSON_FENCE = re.compile(r"```(?:json)?\s*([\s\S]*?)```", re.IGNORECASE)
def _extract_json(raw: str) -> Dict[str, Any]:
text = (raw or "").strip()
if not text:
raise ValueError("模型返回为空")
fence = _JSON_FENCE.search(text)
if fence:
text = fence.group(1).strip()
try:
return json.loads(text)
except json.JSONDecodeError:
start = text.find("{")
end = text.rfind("}")
if start >= 0 and end > start:
return json.loads(text[start : end + 1])
raise ValueError("无法解析模型返回的 JSON")
def _validate_api_config(api_config: Dict[str, str]) -> None:
if not api_config.get("api_key"):
raise ValueError("API Key 未配置,请先在 API 配置页面保存密钥")
if not api_config.get("api_url"):
raise ValueError("API 地址未配置,请先在 API 配置页面保存 mainLLM")
def _format_guide_global_layers(layers: List[str]) -> str:
entries = fiction_service.get_guide_global_entries().entries
filtered = [e for e in entries if e.layer in layers]
lines: List[str] = []
for entry in filtered:
lines.append(f"[{entry.layer}] {entry.title}\n{entry.content}")
return "\n\n".join(lines) if lines else "(无全局指南)"
def _format_book_guide(book_id: str) -> str:
guide = fiction_service.get_book_guide(book_id)
parts = [
f"主角人设:{guide.persona}",
f"核心爽点:{guide.highlight}",
f"用户体验:{guide.experience}",
f"创作禁区:{guide.forbiddenZones}",
]
return "\n".join(parts)
def _get_flow_by_id(flow_id: str):
catalog = fiction_service.get_emotion_catalog()
for flow in catalog.flows:
if flow.id == flow_id:
return flow
return None
def _format_flow_steps(flow) -> str:
lines = [f"情绪流:{flow.intro} (id: {flow.id})"]
for step in flow.steps or []:
lines.append(f" [{step.key}] {step.text}")
return "\n".join(lines)
def _build_event_plan_messages(
book_id: str,
event_id: str,
event_title: str,
event_summary: str,
emotion_flow_id: str,
) -> List[Any]:
settings = fiction_service.get_book_settings(book_id)
user_prompt = settings.prompts.eventPlan or fiction_service.get_default_settings().prompts.eventPlan
system_prompt = resolve_prompt("eventPlan", user_prompt)
guide_l2 = _format_guide_global_layers(["L2"])
book_guide = _format_book_guide(book_id)
flow = _get_flow_by_id(emotion_flow_id)
flow_text = _format_flow_steps(flow) if flow else f"情绪流 id: {emotion_flow_id}"
user_content = f"""## 全局创作指南L2仅用于事件规划
{guide_l2}
## 本书 Guide 世界书
{book_guide}
## 当前粗纲事件
- id: {event_id}
- title: {event_title}
- summary: {event_summary}
## 为本事件随机选定的情绪流
{flow_text}
请输出 flowStepsPlan起承转合与 chapterPlan章节级 brief
输出 JSON 示例:
{{
"flowStepsPlan": {{
"": "本阶段规划…",
"": "",
"": "",
"": ""
}},
"chapterPlan": [
{{ "seq": 1, "phaseKey": "", "phaseSlice": "", "brief": "本章要点", "status": "planned" }}
]
}}"""
return [
SystemMessage(content=system_prompt),
HumanMessage(content=user_content),
]
def _normalize_flow_steps_plan(raw: Any) -> FlowStepsPlan:
data = raw if isinstance(raw, dict) else {}
return FlowStepsPlan(
=str(data.get("") or data.get("qi") or ""),
=str(data.get("") or data.get("cheng") or ""),
=str(data.get("") or data.get("zhuan") or ""),
=str(data.get("") or data.get("he") or ""),
)
def _normalize_chapter_plan(raw: Any) -> List[ChapterPlanItem]:
if not isinstance(raw, list):
return []
items: List[ChapterPlanItem] = []
for idx, item in enumerate(raw):
if not isinstance(item, dict):
continue
seq = item.get("seq")
if not isinstance(seq, int):
seq = idx + 1
status = str(item.get("status") or "planned")
items.append(
ChapterPlanItem(
seq=seq,
phaseKey=str(item.get("phaseKey") or item.get("phase_key") or ""),
phaseSlice=str(item.get("phaseSlice") or item.get("phase_slice") or ""),
brief=str(item.get("brief") or ""),
status=status,
)
)
items.sort(key=lambda c: c.seq)
return items
def _parse_event_plan_response(data: Dict[str, Any], emotion_flow_id: str) -> EventPlanEntry:
flow_steps = _normalize_flow_steps_plan(data.get("flowStepsPlan"))
chapter_plan = _normalize_chapter_plan(data.get("chapterPlan"))
if not chapter_plan:
raise ValueError("chapterPlan 为空")
return EventPlanEntry(
emotionFlowId=emotion_flow_id,
flowStepsPlan=flow_steps,
chapterPlan=chapter_plan,
)
def _event_fully_planned(entry: EventPlanEntry) -> bool:
return bool(entry.chapterPlan)
def _resolve_targets(
metadata: FictionBookMetadata,
*,
event_id: Optional[str] = None,
) -> List[CoarseOutlineEvent]:
coarse_events = metadata.coarseOutline.events
if not coarse_events:
raise ValueError("请先生成粗纲")
events_map = dict(metadata.events or {})
if event_id:
targets = [e for e in coarse_events if e.id == event_id]
if not targets:
raise ValueError(f"粗纲中不存在事件: {event_id}")
return targets
return [
e
for e in coarse_events
if e.id not in events_map or not _event_fully_planned(events_map[e.id])
]
def event_planned_payload(evt: CoarseOutlineEvent, entry: EventPlanEntry) -> Dict[str, Any]:
phases: List[str] = []
fsp = entry.flowStepsPlan
for key in ("", "", "", ""):
if getattr(fsp, key, ""):
phases.append(key)
return {
"type": "event_planned",
"eventId": evt.id,
"title": evt.title,
"chapterCount": len(entry.chapterPlan),
"phases": phases,
}
def build_progress_snapshot(book_id: str) -> Dict[str, Any]:
"""已规划事件的目录快照(不含 brief 剧透)。"""
metadata = fiction_metadata_service.get_metadata(book_id)
coarse_events = metadata.coarseOutline.events
events_map = metadata.events or {}
items: List[Dict[str, Any]] = []
for evt in coarse_events:
entry = events_map.get(evt.id)
if entry and _event_fully_planned(entry):
items.append(event_planned_payload(evt, entry))
run = fiction_metadata_service.get_run(book_id)
progress = run.progress or {}
return {
"type": "snapshot",
"items": items,
"done": progress.get("done", len(items)),
"total": progress.get("total", len(coarse_events)),
}
async def _generate_one_event(
book_id: str,
evt: CoarseOutlineEvent,
*,
resolved: Dict[str, str],
allowed: List[str],
) -> EventPlanEntry:
emotion_flow_id = random.choice(allowed)
messages = _build_event_plan_messages(
book_id,
evt.id,
evt.title,
evt.summary,
emotion_flow_id,
)
response = await _llm_client.chat_completion(
messages=messages,
api_url=resolved["api_url"],
api_key=resolved["api_key"],
model=resolved.get("model", "gpt-4o-mini"),
temperature=0.7,
max_tokens=8000,
request_timeout=120,
stream=False,
)
content = response["choices"][0]["message"]["content"]
data = _extract_json(content)
return _parse_event_plan_response(data, emotion_flow_id)
async def iter_event_plan(
book_id: str,
*,
profile_id: Optional[str] = None,
api_config: Optional[Dict[str, str]] = None,
event_id: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
"""按事件逐个生成,每完成一个即保存并 yield 进度事件。"""
resolved = resolve_api_config(profile_id, api_config)
_validate_api_config(resolved)
metadata = fiction_metadata_service.get_metadata(book_id)
meta = fiction_service.get_book_meta(book_id)
allowed = list(meta.allowedFlowIds or [])
if not allowed:
raise ValueError("本书未配置 allowedFlowIds")
targets = _resolve_targets(metadata, event_id=event_id)
coarse_events = metadata.coarseOutline.events
coarse_total = len(coarse_events)
if not targets:
logger.info("Event plans already exist for book %s, skipping generation", book_id)
fiction_metadata_service.set_pipeline_stage(
book_id, status="idle", pipeline_stage="event_plan_done"
)
fiction_metadata_service.clear_pipeline_progress(book_id)
done_evt: Dict[str, Any] = {"type": "done", "done": coarse_total, "total": coarse_total}
emit_progress(book_id, done_evt)
yield done_evt
return
run = fiction_metadata_service.get_run(book_id)
if run.status == "error":
fiction_metadata_service.clear_pipeline_error(book_id)
fiction_metadata_service.set_pipeline_stage(
book_id, status="running", pipeline_stage="event_plan"
)
events_map = dict(metadata.events or {})
def _planned_count() -> int:
return sum(
1
for e in coarse_events
if e.id in events_map and _event_fully_planned(events_map[e.id])
)
initial_done = _planned_count()
fiction_metadata_service.set_pipeline_progress(
book_id, done=initial_done, total=coarse_total
)
started: Dict[str, Any] = {
"type": "started",
"done": initial_done,
"total": coarse_total,
"pending": len(targets),
}
emit_progress(book_id, started)
yield started
try:
for evt in targets:
entry = await _generate_one_event(
book_id, evt, resolved=resolved, allowed=allowed
)
events_map[evt.id] = entry
metadata.events = events_map
fiction_metadata_service.save_metadata(book_id, metadata)
done_count = _planned_count()
fiction_metadata_service.set_pipeline_progress(
book_id, done=done_count, total=coarse_total
)
payload = event_planned_payload(evt, entry)
emit_progress(book_id, payload)
yield payload
fiction_metadata_service.set_pipeline_stage(
book_id, status="idle", pipeline_stage="event_plan_done"
)
fiction_metadata_service.clear_pipeline_progress(book_id)
done_evt = {"type": "done", "done": _planned_count(), "total": coarse_total}
emit_progress(book_id, done_evt)
yield done_evt
except Exception as exc:
completed = [
eid for eid, ent in events_map.items() if _event_fully_planned(ent)
]
err_evt: Dict[str, Any] = {
"type": "error",
"message": str(exc),
"completedEvents": completed,
"done": _planned_count(),
"total": coarse_total,
}
emit_progress(book_id, err_evt)
fiction_metadata_service.set_pipeline_stage(
book_id, status="error", pipeline_stage="event_plan"
)
yield err_evt
raise
async def run_event_plan(
book_id: str,
*,
profile_id: Optional[str] = None,
api_config: Optional[Dict[str, str]] = None,
event_id: Optional[str] = None,
) -> FictionBookMetadata:
async for _event in iter_event_plan(
book_id,
profile_id=profile_id,
api_config=api_config,
event_id=event_id,
):
pass
return fiction_metadata_service.get_metadata(book_id)
async def stream_event_plan_subscribe(book_id: str) -> AsyncIterator[Dict[str, Any]]:
"""订阅进行中的事件纲要进度(先快照,再实时)。"""
snapshot = build_progress_snapshot(book_id)
yield snapshot
run = fiction_metadata_service.get_run(book_id)
if run.status == "running" and run.pipelineStage == "event_plan":
async for event in subscribe_progress(book_id):
if event.get("type") == "snapshot":
continue
yield event
elif snapshot["items"] or snapshot.get("done", 0) > 0:
yield {
"type": "done",
"done": snapshot["done"],
"total": snapshot["total"],
}