"""Research engine — legacy research/qa plus plan-driven deep research.

研究引擎编排层。
作为系统核心查询入口，协调 ResearchRetriever（检索层）和格式化工具函数，
支持标准研究、问答和深度研究三种模式，以及多轮对话（基于可插拔会话存储）。
"""

from __future__ import annotations

from typing import Any, AsyncIterator

from app.api.schemas.research import ResearchChunk, ResearchPlan, ResearchTask
from app.config import settings
from app.core.embedding import EmbeddingService
from app.core.graph_query_planner import GraphEvidence, GraphQueryPlanner, QueryIntent
from app.core.graph_query_service import GraphQueryService
from app.core.permission import PermissionContext
from app.core.research_formatter import (
    _build_context_text,
    _build_context_with_citation_map,
    _build_deep_research_messages,
    _build_final_document_messages,
    _build_messages,
    _build_plan_brief,
    _build_section_hint,
    _build_task_brief,
    _build_task_query,
    _join_report_text,
    _join_section_text,
    _merge_unique_docs,
    _merge_unique_reference_docs,
    _normalize_research_plan,
    _normalize_research_report,
    _normalize_section_rerun_result,
    _resolve_explicit_doc_ids,
    _source_group_label,
)
from app.core.research_retriever import ResearchRetriever
from app.core.search_engine import _filter_llm_relevance
from app.infrastructure.es_client import ESClient
from app.infrastructure.llm_client import LLMClient
from app.infrastructure.redis_client import RedisClient
from app.infrastructure.session_store import (
    RedisResearchSessionStore,
    ResearchSessionStore,
)
from app.prompts.research_prompts import (
    RESEARCH_PLAN_SYSTEM,
    RESEARCH_PLAN_USER,
    RESEARCH_SECTION_RERUN_SYSTEM,
    RESEARCH_SECTION_RERUN_USER,
)
from app.utils.logger import get_logger

logger = get_logger(__name__)

# Maximum number of documents to include in LLM context
_MAX_CONTEXT_DOCS = 12
# Maximum turns to include in multi-turn history
_MAX_HISTORY_TURNS = 4
_DEPTH_DOC_LIMITS = {
    "standard": 8,
    "deep": 12,
    "comprehensive": 16,
}
# Quick mode uses smaller budgets regardless of depth_level
_QUICK_DOC_LIMIT = 5
_QUICK_MAX_SECTIONS = 3
_MAX_SERVICE_GUIDE_DOCS = 3
_SERVICE_GUIDE_QUERY_KEYWORDS = (
    "办事指南",
    "办理",
    "申请",
    "材料",
    "条件",
    "时限",
    "流程",
    "收费",
    "费用",
    "窗口",
    "咨询",
    "投诉",
    "预约",
    "快递",
    "网办",
    "在线办理",
)
_GUIDE_REFERENCE_FIELD_RULES: tuple[tuple[str, str, tuple[str, ...]], ...] = (
    ("materials", "申请材料", ("材料", "证件", "提交", "表格", "原件", "复印件", "证明")),
    ("fees", "收费项目", ("收费", "费用", "工本费", "多少钱", "金额")),
    ("time_limit", "办理时限", ("多久", "时限", "几天", "办结", "时间")),
    ("windows", "办理窗口", ("窗口", "地点", "哪里办", "大厅", "地址")),
    ("consultation", "咨询投诉", ("电话", "咨询", "投诉", "热线", "联系方式")),
    ("process", "办理流程", ("流程", "步骤", "怎么办", "如何办理", "办理路径")),
)


class ResearchEngine:
    """Orchestrates the Research-mode pipeline.

    Parameters
    ----------
    es_client:
        Async Elasticsearch client.
    embedding_service:
        Used to generate query vectors for ES kNN.
    graph_service:
        Neo4j-backed entity / document discovery.
    llm_client:
        LLM for keyword extraction and answer streaming.
    redis_client:
        Used as fallback session storage when a dedicated session store is not provided.

    研究引擎，编排完整的 Research 模式流水线。
    协调 ES 检索、图谱查询、向量化、LLM 推理等组件，
    支持标准研究和深度研究两种模式以及多轮对话。
    """

    def __init__(
        self,
        es_client: ESClient,
        embedding_service: EmbeddingService,
        graph_service: GraphQueryService,
        llm_client: LLMClient,
        redis_client: RedisClient | None = None,
        session_store: ResearchSessionStore | None = None,
        planner: GraphQueryPlanner | None = None,
    ) -> None:
        self._es = es_client
        self._embedding = embedding_service
        self._graph = graph_service
        self._llm = llm_client
        if session_store is not None:
            self._session_store = session_store
        elif redis_client is not None:
            self._session_store = RedisResearchSessionStore(
                redis_client,
                ttl_seconds=settings.research_session_ttl_seconds,
            )
        else:
            raise ValueError("ResearchEngine requires a session_store or redis_client")
        self._planner = planner or GraphQueryPlanner()

        # 组合检索器
        self._retriever = ResearchRetriever(
            es_client=es_client,
            embedding_service=embedding_service,
            graph_service=graph_service,
            llm_client=llm_client,
            planner=self._planner,
        )

    # ------------------------------------------------------------------
    # 向后兼容：保留旧的下划线方法名作为委托，供测试 mock 使用
    # ------------------------------------------------------------------

    async def _extract_keywords(self, question: str) -> list[str]:
        return await self._retriever.extract_keywords(question)

    async def _es_search(
        self, question: str, perm: PermissionContext
    ) -> list[dict[str, Any]]:
        return await self._retriever.es_search(question, perm)

    async def _fetch_graph_docs(
        self,
        doc_ids: list[str],
        question: str | None,
        perm: PermissionContext,
    ) -> list[dict[str, Any]]:
        return await self._retriever.fetch_graph_docs(doc_ids, question, perm)

    @staticmethod
    def _build_acl_filter(perm: PermissionContext) -> dict[str, Any]:
        build_filter = getattr(perm, "build_es_filter", None)
        if callable(build_filter):
            return build_filter()

        acl_tokens = [token for token in getattr(perm, "acl_tokens", []) if token]
        return {
            "bool": {
                "should": [
                    {"bool": {"must_not": [{"exists": {"field": "acl_ids"}}]}},
                    {"terms": {"acl_ids": acl_tokens}},
                ],
                "minimum_should_match": 1,
            }
        }

    @staticmethod
    def _should_collect_service_guide_evidence(
        query_text: str,
        *,
        plan: Any | None = None,
        doc_ids: list[str] | None = None,
        matter_ids: list[str] | None = None,
    ) -> bool:
        if doc_ids or matter_ids:
            return True
        if plan is not None and getattr(plan, "intent", None) == QueryIntent.MATTER_DETAIL:
            return True
        return any(keyword in query_text for keyword in _SERVICE_GUIDE_QUERY_KEYWORDS)

    @staticmethod
    def _guide_source_has_field(source: dict[str, Any], field_key: str) -> bool:
        if field_key == "materials":
            return bool(source.get("material_names") or source.get("materials"))
        if field_key == "fees":
            return bool(source.get("fee_names") or source.get("fees"))
        if field_key == "time_limit":
            basic_info = source.get("basic_info") or {}
            return any(
                value is not None and value != ""
                for value in (
                    source.get("promised_time_limit_days"),
                    source.get("legal_time_limit_days"),
                    (basic_info.get("promised_time_limit") or {}).get("raw_text"),
                    (basic_info.get("legal_time_limit") or {}).get("raw_text"),
                )
            )
        if field_key == "windows":
            return bool(source.get("window_names") or source.get("service_windows"))
        if field_key == "consultation":
            consultation = source.get("consultation_and_supervision") or {}
            return bool(
                consultation.get("consultation_phones")
                or consultation.get("consultation_urls")
                or consultation.get("complaint_phones")
                or consultation.get("complaint_urls")
            )
        if field_key == "process":
            process_info = source.get("process_info") or {}
            return bool(
                process_info.get("summary")
                or process_info.get("step_titles")
                or process_info.get("raw_text")
            )
        return False

    @classmethod
    def _build_guide_match_metadata(cls, query_text: str, source: dict[str, Any]) -> dict[str, Any]:
        normalized_query = (query_text or "").strip()
        matched_fields: list[str] = []
        matched_field_labels: list[str] = []

        for field_key, label, keywords in _GUIDE_REFERENCE_FIELD_RULES:
            if not cls._guide_source_has_field(source, field_key):
                continue
            if any(keyword in normalized_query for keyword in keywords):
                matched_fields.append(field_key)
                matched_field_labels.append(label)

        summary = ""
        if matched_field_labels:
            visible_labels = matched_field_labels[:4]
            summary = f"命中{'、'.join(visible_labels)}"
            if len(matched_field_labels) > 4:
                summary += "等字段"

        return {
            "matched_fields": matched_fields,
            "matched_field_labels": matched_field_labels,
            "summary": summary,
        }

    @classmethod
    def _guide_reference_docs(cls, hits: list[dict[str, Any]], query_text: str) -> list[dict[str, Any]]:
        docs: list[dict[str, Any]] = []
        for hit in hits:
            source = hit.get("_source", {})
            doc_id = source.get("doc_id", "")
            if not doc_id:
                continue
            title = (
                (source.get("document_info") or {}).get("title")
                or source.get("matter_name")
                or f"办事指南：{doc_id}"
            )
            match_meta = cls._build_guide_match_metadata(query_text, source)
            docs.append(
                {
                    "doc_id": doc_id,
                    "title": title,
                    "doc_number": source.get("implementation_code") or source.get("guide_version"),
                    "profile_id": source.get("profile_id", hit.get("_id", "")),
                    "source_url": source.get("source_url"),
                    "source_metadata": source.get("source_metadata") or {},
                    "source_system": source.get("source_system"),
                    "score": hit.get("_score"),
                    "_source_type": "guide",
                    "matched_fields": match_meta["matched_fields"],
                    "matched_field_labels": match_meta["matched_field_labels"],
                    "summary": match_meta["summary"],
                }
            )
        return docs

    @staticmethod
    def _reference_payload(doc: dict[str, Any]) -> dict[str, Any]:
        source_group = str(doc.get("_source_type") or doc.get("source_group") or "search")
        is_guide = source_group == "guide"
        return {
            "source_group": source_group,
            "source_label": _source_group_label(source_group),
            "profile_id": doc.get("profile_id") if is_guide else None,
            "matched_fields": doc.get("matched_fields") if is_guide else None,
            "matched_field_labels": doc.get("matched_field_labels") if is_guide else None,
            "summary": doc.get("summary") if is_guide else None,
            "matched_chunks": doc.get("matched_chunks") or [],
            "source_url": doc.get("source_url"),
            "source_metadata": doc.get("source_metadata") or {},
            "source_system": doc.get("source_system"),
            "source_site_code": doc.get("source_site_code"),
            "source_target_code": doc.get("source_target_code"),
        }

    @classmethod
    def _session_reference_doc(cls, doc: dict[str, Any]) -> dict[str, Any]:
        return {
            "doc_id": doc["doc_id"],
            "title": doc.get("title", ""),
            "doc_number": doc.get("doc_number", ""),
            "relevance_score": doc.get("score"),
            **cls._reference_payload(doc),
        }

    @staticmethod
    def _format_service_guide_evidence(hits: list[dict[str, Any]]) -> str:
        if not hits:
            return ""

        sections: list[str] = []
        for hit in hits:
            source = hit.get("_source", {})
            matter_name = source.get("matter_name") or (source.get("document_info") or {}).get("title") or "未命名事项"
            lines = [f"### 事项：{matter_name}"]

            linked_matter_ids = [mid for mid in source.get("linked_matter_ids", []) if mid]
            if linked_matter_ids:
                lines.append(f"- 绑定 Matter：{'、'.join(linked_matter_ids[:3])}")

            implementing_subject = source.get("implementing_subject") or ""
            if implementing_subject:
                lines.append(f"- 实施主体：{implementing_subject}")

            service_objects = [item for item in source.get("service_objects", []) if item]
            if service_objects:
                lines.append(f"- 服务对象：{'、'.join(service_objects[:4])}")

            service_modes = [item for item in source.get("service_modes", []) if item]
            service_flags: list[str] = []
            if source.get("express_supported"):
                service_flags.append("支持快递")
            if source.get("reservation_supported"):
                service_flags.append("支持预约")
            if service_modes or service_flags:
                lines.append(f"- 办理方式：{'、'.join(service_modes[:4] + service_flags[:2])}")

            materials = [name for name in source.get("material_names", []) if name]
            if materials:
                lines.append(f"- 申请材料：{'；'.join(materials[:5])}")

            fees = [name for name in source.get("fee_names", []) if name]
            if fees:
                lines.append(f"- 收费项目：{'；'.join(fees[:4])}")

            time_limit_parts: list[str] = []
            if source.get("promised_time_limit_days"):
                time_limit_parts.append(f"承诺 {source['promised_time_limit_days']} 日")
            if source.get("legal_time_limit_days"):
                time_limit_parts.append(f"法定 {source['legal_time_limit_days']} 日")
            if time_limit_parts:
                lines.append(f"- 办理时限：{'；'.join(time_limit_parts)}")

            process_info = source.get("process_info") or {}
            step_titles = [item for item in process_info.get("step_titles", []) if item]
            if step_titles:
                lines.append(f"- 流程步骤：{' → '.join(step_titles[:6])}")
            elif process_info.get("summary"):
                lines.append(f"- 流程摘要：{process_info.get('summary')}")

            window_names = [name for name in source.get("window_names", []) if name]
            if window_names:
                lines.append(f"- 办理窗口：{'；'.join(window_names[:3])}")

            laws = [name for name in source.get("legal_basis_names", []) if name]
            if laws:
                lines.append(f"- 依据：{'；'.join(laws[:4])}")

            consultation = source.get("consultation_and_supervision") or {}
            consultation_phones = [phone for phone in consultation.get("consultation_phones", []) if phone]
            if consultation_phones:
                lines.append(f"- 咨询电话：{'、'.join(consultation_phones[:3])}")

            doc_id = source.get("doc_id", "")
            if doc_id:
                lines.append(f"- 来源文档：{doc_id}")

            sections.append("\n".join(lines))

        return "## 办事指南结构化证据\n\n" + "\n\n".join(sections)

    async def _collect_service_guide_evidence(
        self,
        query_text: str,
        perm: PermissionContext,
        *,
        plan: Any | None = None,
        doc_ids: list[str] | None = None,
        matter_ids: list[str] | None = None,
    ) -> tuple[str, list[dict[str, Any]]]:
        explicit_doc_ids = [doc_id for doc_id in dict.fromkeys(doc_ids or []) if doc_id]
        explicit_matter_ids = [matter_id for matter_id in dict.fromkeys(matter_ids or []) if matter_id]
        if not self._should_collect_service_guide_evidence(
            query_text,
            plan=plan,
            doc_ids=explicit_doc_ids,
            matter_ids=explicit_matter_ids,
        ):
            return "", []

        matter_name = ""
        if plan is not None and getattr(plan, "intent", None) == QueryIntent.MATTER_DETAIL:
            matter_name = (getattr(plan, "matter_query", "") or "").strip()

        try:
            guide_hits = await self._es.search_service_guides(
                acl_filter=self._build_acl_filter(perm),
                query_text=query_text,
                doc_ids=explicit_doc_ids,
                matter_ids=explicit_matter_ids,
                matter_name=matter_name,
                size=_MAX_SERVICE_GUIDE_DOCS,
            )
            if not guide_hits and explicit_matter_ids and matter_name:
                guide_hits = await self._es.search_service_guides(
                    acl_filter=self._build_acl_filter(perm),
                    query_text=query_text,
                    matter_name=matter_name,
                    size=_MAX_SERVICE_GUIDE_DOCS,
                )
        except Exception as exc:
            logger.warning("service_guide_evidence_failed", error=str(exc))
            return "", []

        if not guide_hits:
            return "", []

        return self._format_service_guide_evidence(guide_hits), self._guide_reference_docs(guide_hits, query_text)

    async def _collect_deep_research_materials(
        self,
        task: ResearchTask,
        plan: ResearchPlan,
        perm: PermissionContext,
        *,
        seed_doc_ids: list[str] | None = None,
        extra_doc_ids: list[str] | None = None,
        section_hint: str | None = None,
        explicit_query: str | None = None,
    ) -> dict[str, Any]:
        """收集深度研究所需的全部材料：显式纳入 + ES 检索 + 图谱补充，合并去重后返回。"""
        acl_tokens = getattr(perm, "acl_tokens", [])
        query_text = _build_task_query(task, plan)
        if section_hint:
            query_text = "\n".join(part for part in [query_text, section_hint] if part)

        keywords = await self._extract_keywords(query_text)
        planner_plan = self._planner.plan(query_text, keywords)

        graph_evidence = GraphEvidence()
        if planner_plan.intent != QueryIntent.GENERAL:
            try:
                graph_evidence = await self._planner.collect_evidence(
                    planner_plan,
                    self._graph,
                    acl_tokens=acl_tokens,
                )
            except Exception as exc:
                logger.warning("deep_research_graph_evidence_failed", error=str(exc))
                graph_evidence = GraphEvidence()

        # Respect enable_kb_search flag: skip ES when disabled
        if task.enable_kb_search is False:
            es_docs = []
        else:
            es_docs = await self._es_search(query_text, perm)
            for doc in es_docs:
                doc["_source_type"] = "search"
        es_doc_ids = {doc["doc_id"] for doc in es_docs}

        explicit_doc_ids = _resolve_explicit_doc_ids(
            task,
            plan,
            seed_doc_ids=seed_doc_ids,
            extra_doc_ids=extra_doc_ids,
        )
        explicit_docs: list[dict[str, Any]] = []
        if explicit_doc_ids:
            explicit_docs = await self._fetch_graph_docs(explicit_doc_ids[:8], explicit_query, perm)
            for doc in explicit_docs:
                doc["_source_type"] = "seed"

        graph_doc_ids: list[str] = []
        if keywords:
            try:
                graph_doc_ids = await self._graph.search_graph_for_docs(
                    keywords,
                    limit_per_keyword=8,
                    acl_tokens=acl_tokens,
                )
            except Exception as exc:
                logger.warning("deep_research_graph_doc_discovery_failed", error=str(exc))
                graph_doc_ids = []

        if graph_evidence.doc_ids:
            graph_doc_ids.extend(graph_evidence.doc_ids)

        scoped_ids = set(explicit_doc_ids)
        graph_doc_ids = [
            doc_id
            for doc_id in dict.fromkeys(graph_doc_ids)
            if doc_id and doc_id not in es_doc_ids and doc_id not in scoped_ids
        ]

        graph_docs: list[dict[str, Any]] = []
        if graph_doc_ids:
            graph_docs = await self._fetch_graph_docs(graph_doc_ids[:8], query_text, perm)
            for doc in graph_docs:
                doc["_source_type"] = "graph"

        if task.mode == "quick":
            context_limit = _QUICK_DOC_LIMIT
        else:
            context_limit = _DEPTH_DOC_LIMITS.get(task.depth_level, _MAX_CONTEXT_DOCS)
        all_docs = _merge_unique_docs(explicit_docs, es_docs, graph_docs)[:context_limit]
        guide_evidence_text, guide_docs = await self._collect_service_guide_evidence(
            query_text,
            perm,
            plan=planner_plan,
            doc_ids=explicit_doc_ids,
            matter_ids=list(
                dict.fromkeys(list(plan.included_matter_ids or []) + list(task.required_matter_ids or []))
            ),
        )
        return {
            "query_text": query_text,
            "keywords": keywords,
            "graph_evidence": graph_evidence,
            "all_docs": all_docs,
            "guide_docs": guide_docs,
            "guide_evidence_text": guide_evidence_text,
        }

    # ==================================================================
    # Public entry point
    # ==================================================================

    def research(
        self,
        question: str,
        perm: PermissionContext,
        *,
        session_id: str | None = None,
        seed_doc_ids: list[str] | None = None,
    ) -> AsyncIterator[ResearchChunk]:
        """标准研究模式入口：流式输出研究结果块。

        Stream research answer chunks for the given question.

        Yields
        ------
        ResearchChunk objects with type in:
        ``thinking`` | ``reference`` | ``text`` | ``done`` | ``error``
        """
        return self._stream(
            question,
            perm,
            session_id=session_id,
            seed_doc_ids=seed_doc_ids,
        )

    def qa(
        self,
        question: str,
        perm: PermissionContext,
        *,
        session_id: str | None = None,
        seed_doc_ids: list[str] | None = None,
        notebook_mode: bool = False,
        notebook_config: dict | None = None,
    ) -> AsyncIterator[ResearchChunk]:
        """问答模式入口：与 research 共享流水线，但使用问答专用的系统提示词。"""
        return self._stream(
            question,
            perm,
            session_id=session_id,
            seed_doc_ids=seed_doc_ids,
            mode="notebook" if notebook_mode else "qa",
            notebook_config=notebook_config,
        )

    async def qa_search(
        self,
        question: str,
        perm: PermissionContext,
        *,
        seed_doc_ids: list[str] | None = None,
        llm: bool = False,
    ) -> dict[str, Any]:
        """QA 材料检索：执行 Phase 1-3（关键词提取 → 图谱规划 → 多路召回 → 合并），
        返回结构化的检索结果和上下文文本，跳过 LLM 生成。

        Returns a dict with keys:
        - ``documents``: merged reference doc list
        - ``context_text``: formatted context ready for LLM
        - ``keywords``: extracted keywords
        - ``intent``: graph query intent
        - ``graph_evidence_text``: graph structured evidence (if any)
        - ``guide_evidence_text``: service guide evidence (if any)
        """
        acl_tokens = getattr(perm, "acl_tokens", [])

        # ── Phase 1: Keyword extraction + Graph planning ──────────
        keywords = await self._extract_keywords(question)
        logger.info("qa_search_keywords", question=question[:80], keywords=keywords)

        graph_evidence = GraphEvidence()
        plan = self._planner.plan(question, keywords)
        if plan.intent != QueryIntent.GENERAL:
            try:
                graph_evidence = await self._planner.collect_evidence(
                    plan, self._graph, acl_tokens=acl_tokens,
                )
            except Exception as exc:
                logger.warning("qa_search_graph_evidence_failed", error=str(exc))
                graph_evidence = GraphEvidence()

        # ── Phase 2: Multi-source retrieval ───────────────────────
        es_docs = await self._es_search(question, perm)
        es_doc_ids = {d["doc_id"] for d in es_docs}
        for doc in es_docs:
            doc["_source_type"] = "search"

        explicit_docs: list[dict[str, Any]] = []
        explicit_doc_ids = [
            doc_id for doc_id in dict.fromkeys(seed_doc_ids or []) if doc_id
        ]
        if explicit_doc_ids:
            explicit_docs = await self._fetch_graph_docs(
                explicit_doc_ids[:8], None, perm,
            )
            for doc in explicit_docs:
                doc["_source_type"] = "seed"

        graph_doc_ids: list[str] = []
        if keywords:
            try:
                graph_doc_ids = await self._graph.search_graph_for_docs(
                    keywords, limit_per_keyword=8, acl_tokens=acl_tokens,
                )
            except Exception as exc:
                logger.warning("qa_search_graph_doc_discovery_failed", error=str(exc))
                graph_doc_ids = []

        if graph_evidence.doc_ids:
            graph_doc_ids.extend(graph_evidence.doc_ids)
        graph_doc_ids = [
            d for d in dict.fromkeys(graph_doc_ids) if d not in es_doc_ids
        ]

        graph_docs: list[dict[str, Any]] = []
        if graph_doc_ids:
            graph_docs = await self._fetch_graph_docs(
                graph_doc_ids[:8], question, perm,
            )
            for doc in graph_docs:
                doc["_source_type"] = "graph"

        # ── Phase 3: Merge + context build ────────────────────────
        all_docs = _merge_unique_docs(explicit_docs, es_docs, graph_docs)[:_MAX_CONTEXT_DOCS]
        guide_evidence_text, guide_docs = await self._collect_service_guide_evidence(
            question, perm, plan=plan, doc_ids=explicit_doc_ids,
        )

        reference_docs = _merge_unique_reference_docs(all_docs, guide_docs)
        if llm:
            reference_docs = _filter_llm_relevance(
                reference_docs,
                preserve_source_types={"seed"},
            )
            kept_doc_ids = {
                doc["doc_id"]
                for doc in reference_docs
                if doc.get("_source_type") != "guide"
            }
            kept_guide_ids = {
                doc.get("profile_id") or doc["doc_id"]
                for doc in reference_docs
                if doc.get("_source_type") == "guide"
            }
            all_docs = [doc for doc in all_docs if doc["doc_id"] in kept_doc_ids]
            guide_docs = [
                doc
                for doc in guide_docs
                if (doc.get("profile_id") or doc["doc_id"]) in kept_guide_ids
            ]
            if not guide_docs:
                guide_evidence_text = ""

        # Build context text (same format as LLM input)
        context_parts = [_build_context_text(all_docs)]
        if graph_evidence.text:
            context_parts.append(graph_evidence.text)
        if guide_evidence_text:
            context_parts.append(guide_evidence_text)
        context_text = "\n\n".join(part for part in context_parts if part)

        # Build structured document list for response
        documents = []
        for doc in reference_docs:
            documents.append({
                "doc_id": doc["doc_id"],
                "title": doc.get("title", ""),
                "doc_number": doc.get("doc_number"),
                "issuing_org": doc.get("issuing_org"),
                "doc_type": doc.get("doc_type"),
                "publish_date": doc.get("publish_date"),
                "score": doc.get("score"),
                "passages": doc.get("passages", []),
                "source_type": doc.get("_source_type", "search"),
                "source_url": doc.get("source_url"),
                "source_metadata": doc.get("source_metadata") or {},
                "source_system": doc.get("source_system"),
                "source_site_code": doc.get("source_site_code"),
                "source_target_code": doc.get("source_target_code"),
                **self._reference_payload(doc),
            })

        return {
            "documents": documents,
            "context_text": context_text,
            "keywords": keywords,
            "intent": plan.intent.value,
            "graph_evidence_text": graph_evidence.text or "",
            "guide_evidence_text": guide_evidence_text,
        }

    async def build_plan(
        self,
        task: ResearchTask,
        *,
        seed_doc_ids: list[str] | None = None,
    ) -> ResearchPlan:
        """生成结构化研究计划：提取关键词 → 图谱意图规划 → LLM 生成计划 → 标准化输出。

        Generate a structured research plan with safe fallbacks."""
        keywords = await self._extract_keywords(task.question)
        planner_plan = self._planner.plan(task.question, keywords)

        try:
            result = await self._llm.chat_json(
                [
                    {"role": "system", "content": RESEARCH_PLAN_SYSTEM},
                    {
                        "role": "user",
                        "content": RESEARCH_PLAN_USER.format(
                            task_brief=_build_task_brief(task),
                            keywords="、".join(keywords) or "无",
                            intent=planner_plan.intent.value,
                            seed_doc_count=len(seed_doc_ids or []),
                        ),
                    },
                ],
                temperature=0.2,
                max_tokens=1200,
            )
        except Exception as exc:
            logger.warning("research_plan_generation_failed", error=str(exc))
            result = {}

        return _normalize_research_plan(
            task,
            result,
            keywords=keywords,
            planner_plan=planner_plan,
            seed_doc_ids=seed_doc_ids,
        )

    async def run_deep_research(
        self,
        task: ResearchTask,
        plan: ResearchPlan,
        perm: PermissionContext,
        *,
        session_id: str | None = None,
        seed_doc_ids: list[str] | None = None,
    ) -> AsyncIterator[ResearchChunk]:
        """深度研究模式：按已确认的研究计划执行多源检索，生成结构化研究报告并通过 SSE 流式输出。

        Execute a confirmed research plan and emit structured SSE events."""
        acl_tokens = getattr(perm, "acl_tokens", [])
        user_id = getattr(perm, "user_id", "")
        try:
            yield ResearchChunk(
                type="progress",
                stage="init",
                status="running",
                content="正在装载研究任务与已确认计划…",
            )
            yield ResearchChunk(type="plan", payload=plan.model_dump())

            query_text = _build_task_query(task, plan)
            keywords = await self._extract_keywords(query_text)
            planner_plan = self._planner.plan(query_text, keywords)

            graph_evidence = GraphEvidence()
            if planner_plan.intent != QueryIntent.GENERAL:
                yield ResearchChunk(
                    type="progress",
                    stage="graph",
                    status="running",
                    content="正在补充图谱结构化证据…",
                )
                try:
                    graph_evidence = await self._planner.collect_evidence(
                        planner_plan,
                        self._graph,
                        acl_tokens=acl_tokens,
                    )
                except Exception as exc:
                    logger.warning("deep_research_graph_evidence_failed", error=str(exc))
                    graph_evidence = GraphEvidence()

            yield ResearchChunk(
                type="progress",
                stage="retrieve",
                status="running",
                content="正在检索混合检索结果…",
            )
            if task.enable_kb_search is False:
                es_docs = []
            else:
                es_docs = await self._es_search(query_text, perm)
                for doc in es_docs:
                    doc["_source_type"] = "search"
            es_doc_ids = {d["doc_id"] for d in es_docs}

            explicit_doc_ids = _resolve_explicit_doc_ids(
                task,
                plan,
                seed_doc_ids=seed_doc_ids,
            )
            explicit_docs: list[dict[str, Any]] = []
            if explicit_doc_ids:
                yield ResearchChunk(
                    type="progress",
                    stage="scope",
                    status="running",
                    content="正在装载显式纳入资料…",
                )
                explicit_docs = await self._fetch_graph_docs(explicit_doc_ids[:8], None, perm)
                for doc in explicit_docs:
                    doc["_source_type"] = "seed"

            graph_doc_ids: list[str] = []
            if keywords:
                try:
                    graph_doc_ids = await self._graph.search_graph_for_docs(
                        keywords,
                        limit_per_keyword=8,
                        acl_tokens=acl_tokens,
                    )
                except Exception as exc:
                    logger.warning("deep_research_graph_doc_discovery_failed", error=str(exc))
                    graph_doc_ids = []

            if graph_evidence.doc_ids:
                graph_doc_ids.extend(graph_evidence.doc_ids)

            scoped_ids = set(explicit_doc_ids)
            graph_doc_ids = [
                doc_id
                for doc_id in dict.fromkeys(graph_doc_ids)
                if doc_id and doc_id not in es_doc_ids and doc_id not in scoped_ids
            ]

            graph_docs: list[dict[str, Any]] = []
            if graph_doc_ids:
                yield ResearchChunk(
                    type="progress",
                    stage="graph",
                    status="running",
                    content="正在补充图谱关联文档…",
                )
                graph_docs = await self._fetch_graph_docs(graph_doc_ids[:8], query_text, perm)
                for doc in graph_docs:
                    doc["_source_type"] = "graph"

            if task.mode == "quick":
                context_limit = _QUICK_DOC_LIMIT
            else:
                context_limit = _DEPTH_DOC_LIMITS.get(task.depth_level, _MAX_CONTEXT_DOCS)
            all_docs = _merge_unique_docs(explicit_docs, es_docs, graph_docs)[:context_limit]
            guide_evidence_text, guide_docs = await self._collect_service_guide_evidence(
                query_text,
                perm,
                plan=planner_plan,
                doc_ids=explicit_doc_ids,
                matter_ids=list(
                    dict.fromkeys(list(plan.included_matter_ids or []) + list(task.required_matter_ids or []))
                ),
            )

            if not all_docs and not guide_docs:
                yield ResearchChunk(
                    type="error",
                    content="未检索到与当前研究任务相关的可用材料，请调整研究范围或补充显式资料。",
                )
                yield ResearchChunk(type="done")
                return

            for source_group, label in (
                ("seed", "显式纳入资料"),
                ("search", "混合检索命中"),
                ("graph", "图谱补充材料"),
                ("guide", "办事指南结构化证据"),
            ):
                grouped = (
                    guide_docs
                    if source_group == "guide"
                    else [doc for doc in all_docs if doc.get("_source_type") == source_group]
                )
                if not grouped:
                    continue
                yield ResearchChunk(
                    type="source_group",
                    title=label,
                    content=f"纳入 {len(grouped)} 份材料",
                    payload={
                        "group": source_group,
                        "count": len(grouped),
                        "doc_ids": [doc["doc_id"] for doc in grouped],
                    },
                )

            reference_docs = _merge_unique_reference_docs(all_docs, guide_docs)
            for doc in reference_docs:
                yield ResearchChunk(
                    type="reference",
                    doc_id=doc["doc_id"],
                    title=doc.get("title", ""),
                    doc_number=doc.get("doc_number"),
                    relevance_score=doc.get("score"),
                    payload=self._reference_payload(doc),
                )

            yield ResearchChunk(
                type="progress",
                stage="analysis",
                status="running",
                content="正在汇总发现、冲突和章节报告…",
            )

            doc_context, citation_map = _build_context_with_citation_map(all_docs)
            context_parts = [doc_context]
            if graph_evidence.text:
                context_parts.append(graph_evidence.text)
            if guide_evidence_text:
                context_parts.append(guide_evidence_text)
            context_text = "\n\n".join(part for part in context_parts if part)

            history = (
                await self._load_session(session_id, user_id)
                if session_id else []
            )
            result = await self._llm.chat_json(
                _build_deep_research_messages(task, plan, context_text, history),
                temperature=0.2,
                max_tokens=settings.llm_max_tokens,
            )
            report = _normalize_research_report(result, all_docs, plan, citation_map)

            if report["executive_summary"]:
                yield ResearchChunk(type="summary", content=report["executive_summary"])

            for finding in report["findings"]:
                yield ResearchChunk(
                    type="finding",
                    title=finding["title"],
                    content=finding["content"],
                    strength=finding["strength"],
                    payload={"source_doc_ids": finding["source_doc_ids"]},
                )

            for conflict in report["conflicts"]:
                yield ResearchChunk(
                    type="conflict",
                    title=conflict["title"],
                    content=conflict["content"],
                    severity=conflict["severity"],
                    payload={"source_doc_ids": conflict["source_doc_ids"]},
                )

            for question in report["open_questions"]:
                yield ResearchChunk(
                    type="open_question",
                    title=question["question"],
                    content=question["reason"],
                )

            for section in report["sections"]:
                yield ResearchChunk(
                    type="section",
                    title=section["title"],
                    content=section["content"],
                    payload={
                        "summary": section["summary"],
                        "source_doc_ids": section["source_doc_ids"],
                    },
                )

            if report["one_page_summary"]:
                yield ResearchChunk(
                    type="report",
                    title="一页式摘要",
                    content=report["one_page_summary"],
                    payload={"kind": "one_page_summary"},
                )

            if report["recommended_next_steps"]:
                yield ResearchChunk(
                    type="follow_up",
                    content="\n".join(report["recommended_next_steps"]),
                    payload={"items": report["recommended_next_steps"]},
                )

            if session_id:
                refs = [self._session_reference_doc(doc) for doc in reference_docs]
                full_answer = _join_report_text(report)
                await self._save_session(
                    session_id,
                    user_id,
                    query_text,
                    full_answer,
                    refs,
                )

            yield ResearchChunk(
                type="progress",
                stage="complete",
                status="completed",
                content="研究报告已生成。",
            )

            # ── Generate final deliverable document ─────────────────
            yield ResearchChunk(
                type="progress",
                stage="document",
                status="running",
                content="正在生成交付物文档...",
            )
            try:
                final_doc_messages = _build_final_document_messages(
                    task, plan, report, reference_docs,
                    output_template=getattr(task, 'output_template', None),
                )
                doc_parts: list[str] = []
                async for text_chunk in self._llm.chat(
                    final_doc_messages,
                    temperature=0.3,
                    max_tokens=settings.llm_max_tokens,
                ):
                    doc_parts.append(text_chunk)
                final_document_md = "".join(doc_parts)
                yield ResearchChunk(
                    type="final_document",
                    content=final_document_md,
                )
                yield ResearchChunk(
                    type="progress",
                    stage="document",
                    status="completed",
                    content="交付物文档已生成。",
                )
            except Exception as doc_exc:
                logger.warning("final_document_generation_failed: %s", doc_exc)
                yield ResearchChunk(
                    type="progress",
                    stage="document",
                    status="completed",
                    content="交付物文档生成失败，不影响研究结果。",
                )

            yield ResearchChunk(
                type="done",
                payload={"citation_map": report.get("citation_map") or []},
            )

        except Exception as exc:
            # 安全修复：不向客户端暴露内部异常详情，仅记录日志
            logger.exception("deep_research_error")
            yield ResearchChunk(type="error", content="研究过程中发生内部错误，请稍后重试")
            yield ResearchChunk(type="done")

    async def rerun_section(
        self,
        task: ResearchTask,
        plan: ResearchPlan,
        section_title: str,
        perm: PermissionContext,
        *,
        section_summary: str | None = None,
        source_doc_ids: list[str] | None = None,
        session_id: str | None = None,
        seed_doc_ids: list[str] | None = None,
    ) -> AsyncIterator[ResearchChunk]:
        """章节局部重跑：针对指定章节重新收集证据并重写内容，不影响其他章节。

        Re-run a single report section with focused evidence collection."""
        user_id = getattr(perm, "user_id", "")
        try:
            yield ResearchChunk(
                type="progress",
                stage="init",
                status="running",
                content="正在准备章节局部重跑…",
            )
            yield ResearchChunk(
                type="progress",
                stage="retrieve",
                status="running",
                content="正在围绕目标章节重取证据…",
            )

            section_hint = _build_section_hint(section_title, section_summary)
            materials = await self._collect_deep_research_materials(
                task,
                plan,
                perm,
                seed_doc_ids=seed_doc_ids,
                extra_doc_ids=source_doc_ids,
                section_hint=section_hint,
                explicit_query=section_hint,
            )
            all_docs = materials["all_docs"]
            guide_docs = materials["guide_docs"]

            if not all_docs and not guide_docs:
                yield ResearchChunk(
                    type="error",
                    content="未检索到可用于重跑该章节的材料，请补充章节证据范围后重试。",
                )
                yield ResearchChunk(type="done")
                return

            for source_group, label in (
                ("seed", "显式纳入资料"),
                ("search", "混合检索命中"),
                ("graph", "图谱补充材料"),
                ("guide", "办事指南结构化证据"),
            ):
                grouped = (
                    guide_docs
                    if source_group == "guide"
                    else [doc for doc in all_docs if doc.get("_source_type") == source_group]
                )
                if not grouped:
                    continue
                yield ResearchChunk(
                    type="source_group",
                    title=label,
                    content=f"纳入 {len(grouped)} 份材料",
                    payload={
                        "group": source_group,
                        "count": len(grouped),
                        "doc_ids": [doc["doc_id"] for doc in grouped],
                    },
                )

            reference_docs = _merge_unique_reference_docs(all_docs, guide_docs)
            for doc in reference_docs:
                yield ResearchChunk(
                    type="reference",
                    doc_id=doc["doc_id"],
                    title=doc.get("title", ""),
                    doc_number=doc.get("doc_number"),
                    relevance_score=doc.get("score"),
                    payload=self._reference_payload(doc),
                )

            yield ResearchChunk(
                type="progress",
                stage="analysis",
                status="running",
                content="正在重写目标章节…",
            )

            context_parts = [_build_context_text(all_docs)]
            graph_evidence = materials["graph_evidence"]
            if graph_evidence.text:
                context_parts.append(graph_evidence.text)
            if materials["guide_evidence_text"]:
                context_parts.append(materials["guide_evidence_text"])
            context_text = "\n\n".join(part for part in context_parts if part)

            result = await self._llm.chat_json(
                [
                    {"role": "system", "content": RESEARCH_SECTION_RERUN_SYSTEM},
                    {
                        "role": "user",
                        "content": RESEARCH_SECTION_RERUN_USER.format(
                            task_brief=_build_task_brief(task),
                            plan_brief=_build_plan_brief(plan),
                            section_title=section_title,
                            section_summary=section_summary or "无",
                            context=context_text,
                        ),
                    },
                ],
                temperature=0.2,
                max_tokens=settings.llm_max_tokens,
            )
            rerun_result = _normalize_section_rerun_result(result, all_docs, section_title)
            section = rerun_result["section"]

            if not section["source_doc_ids"] and source_doc_ids:
                available_doc_ids = {doc["doc_id"] for doc in all_docs}
                section["source_doc_ids"] = [
                    doc_id for doc_id in source_doc_ids if doc_id in available_doc_ids
                ][:8]

            yield ResearchChunk(
                type="section",
                title=section["title"],
                content=section["content"],
                payload={
                    "summary": section["summary"],
                    "source_doc_ids": section["source_doc_ids"],
                    "rerun": True,
                },
            )

            if rerun_result["notes"]:
                yield ResearchChunk(
                    type="follow_up",
                    content="\n".join(rerun_result["notes"]),
                    payload={
                        "items": rerun_result["notes"],
                        "kind": "section_rerun_notes",
                    },
                )

            if session_id:
                refs = [self._session_reference_doc(doc) for doc in reference_docs]
                await self._save_session(
                    session_id,
                    user_id,
                    f"{task.question}（章节重跑：{section_title}）",
                    _join_section_text(section),
                    refs,
                )

            yield ResearchChunk(
                type="progress",
                stage="complete",
                status="completed",
                content="章节局部重跑已完成。",
            )
            yield ResearchChunk(type="done")

        except Exception as exc:
            logger.error("research_section_rerun_error", error=str(exc))
            yield ResearchChunk(type="error", content=f"章节局部重跑失败：{exc}")
            yield ResearchChunk(type="done")

    async def _stream(
        self,
        question: str,
        perm: PermissionContext,
        *,
        session_id: str | None,
        seed_doc_ids: list[str] | None = None,
        mode: str = "research",
        notebook_config: dict | None = None,
    ) -> AsyncIterator[ResearchChunk]:
        """标准研究/问答的核心流水线：关键词提取 → 图谱规划 → ES 混合检索 → 图谱补充 → LLM 流式回答。"""
        acl_tokens = getattr(perm, "acl_tokens", [])
        user_id = getattr(perm, "user_id", "")
        try:
            # ── Step 1: acknowledge + keyword extraction ───────────────
            _thinking_msg = {
                "research": "正在理解研究问题…",
                "notebook": "正在分析来源材料…",
            }.get(mode, "正在分析问答问题…")
            yield ResearchChunk(type="thinking", content=_thinking_msg)

            keywords = await self._extract_keywords(question)
            logger.info("research_keywords", question=question[:80], keywords=keywords)

            # ── Step 1.5: Graph query planning (skip in notebook mode) ─
            graph_evidence = GraphEvidence()
            plan = self._planner.plan(question, keywords)
            if mode != "notebook":
                logger.info(
                    "research_query_plan",
                    intent=plan.intent.value,
                    doc_code=plan.doc_code,
                    entity_name=plan.entity_name,
                    entity_label=plan.entity_label,
                    matter_query=plan.matter_query,
                )
                if plan.intent != QueryIntent.GENERAL:
                    yield ResearchChunk(type="thinking", content="正在查询图谱结构化证据…")
                    try:
                        graph_evidence = await self._planner.collect_evidence(
                            plan,
                            self._graph,
                            acl_tokens=acl_tokens,
                        )
                    except Exception as exc:
                        logger.warning("research_graph_evidence_failed", error=str(exc))
                        graph_evidence = GraphEvidence()

            # ── Step 2: ES hybrid retrieval (skip in notebook mode) ────
            es_docs: list[dict[str, Any]] = []
            if mode != "notebook":
                yield ResearchChunk(type="thinking", content="正在检索相关公文…")
                es_docs = await self._es_search(question, perm)
                for doc in es_docs:
                    doc["_source_type"] = "search"
            es_doc_ids = {d["doc_id"] for d in es_docs}

            explicit_docs: list[dict[str, Any]] = []
            explicit_doc_ids = [doc_id for doc_id in dict.fromkeys(seed_doc_ids or []) if doc_id]
            if explicit_doc_ids:
                yield ResearchChunk(type="thinking", content="正在载入导入资料…")
                # In notebook mode: use all sources (up to 10) and do query-aware retrieval
                # In normal mode: cap at 8 and skip relevance matching for seed docs
                _seed_limit = len(explicit_doc_ids) if mode == "notebook" else 8
                _seed_question = question if mode == "notebook" else None
                explicit_docs = await self._fetch_graph_docs(
                    explicit_doc_ids[:_seed_limit],
                    _seed_question,
                    perm,
                )
                for doc in explicit_docs:
                    doc["_source_type"] = "seed"

            # ── Step 3: Graph enrichment (skip in notebook mode) ─────
            graph_doc_ids: list[str] = []
            if mode != "notebook" and keywords:
                try:
                    graph_doc_ids = await self._graph.search_graph_for_docs(
                        keywords,
                        limit_per_keyword=8,
                        acl_tokens=acl_tokens,
                    )
                except Exception as exc:
                    logger.warning("research_graph_doc_discovery_failed", error=str(exc))
                    graph_doc_ids = []

            if mode != "notebook" and graph_evidence.doc_ids:
                graph_doc_ids.extend(graph_evidence.doc_ids)

            # Remove docs already in ES results to avoid duplicates
            graph_doc_ids = [d for d in dict.fromkeys(graph_doc_ids) if d not in es_doc_ids]

            # ── Step 4: Fetch chunks for graph-only docs ───────────────
            graph_docs: list[dict[str, Any]] = []
            if graph_doc_ids:
                graph_docs = await self._fetch_graph_docs(
                    graph_doc_ids[:8], question, perm
                )
                for doc in graph_docs:
                    doc["_source_type"] = "graph"

            # ── Step 5: Merge and cap ──────────────────────────────────
            all_docs = _merge_unique_docs(explicit_docs, es_docs, graph_docs)[:_MAX_CONTEXT_DOCS]
            guide_evidence_text: str = ""
            guide_docs: list[dict[str, Any]] = []
            if mode != "notebook":
                guide_evidence_text, guide_docs = await self._collect_service_guide_evidence(
                    question,
                    perm,
                    plan=plan,
                    doc_ids=explicit_doc_ids,
                )

            if not all_docs and not guide_docs:
                no_result_msg = (
                    "来源材料中未找到与您问题相关的内容，请尝试调整问题或添加更多来源。"
                    if mode == "notebook"
                    else "未检索到与您问题相关的公文材料，请尝试更换关键词或调整权限范围。"
                )
                yield ResearchChunk(type="text", content=no_result_msg)
                yield ResearchChunk(type="done")
                return

            # ── Step 6: Emit reference chunks ─────────────────────────
            for source_group, label in (
                ("seed", "显式纳入资料"),
                ("search", "混合检索命中"),
                ("graph", "图谱补充材料"),
                ("guide", "办事指南结构化证据"),
            ):
                grouped = (
                    guide_docs
                    if source_group == "guide"
                    else [doc for doc in all_docs if doc.get("_source_type") == source_group]
                )
                if not grouped:
                    continue
                yield ResearchChunk(
                    type="source_group",
                    title=label,
                    content=f"纳入 {len(grouped)} 份材料",
                    payload={
                        "group": source_group,
                        "count": len(grouped),
                        "doc_ids": [doc["doc_id"] for doc in grouped],
                    },
                )

            # ── Step 7: Build LLM context ──────────────────────────────
            _gen_msg = {
                "research": "正在生成研究分析报告…",
                "notebook": "正在基于来源生成回答…",
            }.get(mode, "正在生成问答结果…")
            yield ResearchChunk(type="thinking", content=_gen_msg)

            # Build context text with citation map (passage-level [N] numbering)
            _ctx_text, _citation_map = _build_context_with_citation_map(all_docs)
            context_parts = [_ctx_text]
            if graph_evidence.text:
                context_parts.append(graph_evidence.text)
            if guide_evidence_text:
                context_parts.append(guide_evidence_text)
            context_text = "\n\n".join(part for part in context_parts if part)

            # Emit references
            if mode == "notebook":
                # Notebook: emit passage-level references so [1][2][3] from
                # the same document each have a matching reference entry.
                logger.info(
                    "notebook_citation_map",
                    count=len(_citation_map),
                    doc_ids=[c.get("doc_id", "") for c in _citation_map],
                )
                for _cm_entry in _citation_map:
                    yield ResearchChunk(
                        type="reference",
                        doc_id=_cm_entry.get("doc_id", ""),
                        title=_cm_entry.get("title", ""),
                        doc_number=_cm_entry.get("doc_number"),
                        payload=_cm_entry,
                    )
            else:
                # Normal mode: emit document-level references (deduplicated)
                reference_docs = _merge_unique_reference_docs(all_docs, guide_docs)
                for doc in reference_docs:
                    yield ResearchChunk(
                        type="reference",
                        doc_id=doc["doc_id"],
                        title=doc.get("title", ""),
                        doc_number=doc.get("doc_number"),
                        relevance_score=doc.get("score"),
                        payload=self._reference_payload(doc),
                    )

            # ── Step 8: Load session history ───────────────────────────
            history = (
                await self._load_session(session_id, user_id)
                if session_id else []
            )

            # ── Step 9: Build messages ─────────────────────────────────
            if mode == "notebook":
                from app.prompts.notebook_prompts import (
                    NOTEBOOK_CHAT_USER,
                    build_notebook_chat_system,
                )
                _nb_cfg = notebook_config or {}
                _nb_system = build_notebook_chat_system(
                    goal=_nb_cfg.get("goal", ""),
                    style=_nb_cfg.get("style", ""),
                    answer_length=_nb_cfg.get("answer_length", "medium"),
                )
                messages: list[dict[str, str]] = [
                    {"role": "system", "content": _nb_system},
                ]
                for turn in history:
                    messages.append({"role": "user", "content": turn["question"]})
                    messages.append({"role": "assistant", "content": turn["answer"]})
                messages.append({
                    "role": "user",
                    "content": NOTEBOOK_CHAT_USER.format(
                        context_text=context_text, question=question,
                    ),
                })
            else:
                messages = _build_messages(question, context_text, history, mode=mode)

            # ── Step 10: Stream LLM answer ─────────────────────────────
            # 使用 list 收集 token 后 join，避免字符串拼接的 O(n²) 性能问题
            answer_parts: list[str] = []
            async for token in self._llm.chat(
                messages,
                temperature=0.3,
                max_tokens=settings.llm_max_tokens,
            ):
                answer_parts.append(token)
                yield ResearchChunk(type="text", content=token)
            full_answer = "".join(answer_parts)

            # ── Step 11: Persist session turn ──────────────────────────
            if session_id:
                _persist_docs = all_docs if mode == "notebook" else reference_docs
                refs = [self._session_reference_doc(doc) for doc in _persist_docs]
                await self._save_session(
                    session_id,
                    user_id,
                    question,
                    full_answer,
                    refs,
                )

            yield ResearchChunk(type="done")

        except Exception as exc:
            # 安全修复：不向客户端暴露内部异常详情，仅记录日志
            logger.exception(f"{mode}_error")
            yield ResearchChunk(
                type="error",
                content="研究过程中发生内部错误，请稍后重试",
            )
            yield ResearchChunk(type="done")

    # ==================================================================
    # Session management
    # ==================================================================

    async def _load_session(
        self,
        session_id: str,
        user_id: str,
    ) -> list[dict[str, Any]]:
        """从会话存储中加载多轮对话历史，仅保留最近 N 轮。

        Load conversation history from the configured session store."""
        try:
            history = await self._session_store.load_session(session_id, user_id)
            if history:
                return history[-_MAX_HISTORY_TURNS:]
        except Exception as exc:
            logger.warning(
                "session_load_failed",
                session_id=session_id,
                user_id=user_id,
                error=str(exc),
            )
        return []

    async def _save_session(
        self,
        session_id: str,
        user_id: str,
        question: str,
        answer: str,
        refs: list[dict[str, Any]],
    ) -> None:
        """将当前问答轮次追加到会话存储中，用于后续多轮对话。

        Append a Q&A turn to the configured session store."""
        try:
            await self._session_store.save_session(
                session_id,
                user_id,
                question,
                answer,
                refs,
            )
        except Exception as exc:
            logger.warning(
                "session_save_failed",
                session_id=session_id,
                user_id=user_id,
                error=str(exc),
            )
