"""
入库管道 —— 负责文档从上传到索引的完整处理流程。
Document ingest pipeline – orchestrates the full document processing flow.

双层架构:
  - 文档层 (gov_doc_meta): 按 doc_id 存储业务元数据和独立 ACL
  - 内容层 (gov_doc_chunks): 按 content_hash 存储共享分块和合并后的 ACL

处理流程:
  文件 → SHA-256 去重检查 → [重复内容快路径 | 完整处理路径] → 元数据写入 → ACL 重算

Two-layer architecture:
  - Document layer (gov_doc_meta): per doc_id, business metadata + own ACL
  - Content layer (gov_doc_chunks): per content_hash, shared chunks + merged ACL

Pipeline:
  File → SHA-256 check → [dedup fast path | full path] → meta write → ACL recompute

Supports multiple document formats via Docling + Java converter service.
"""

from __future__ import annotations

import asyncio
import hashlib
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any

import httpx

from app.config import settings
from app.core.chunker import DocumentChunker, chunks_from_docling
from app.core.docling_processor import DoclingProcessor
from app.core.embedding import EmbeddingService
from app.core.ingest_trace_recorder import IngestTraceRecorder
from app.infrastructure.embedding_client import EmbeddingClient
from app.infrastructure.es_client import ESClient
from app.utils.file_type import (
    detect_file_type,
    is_docling_format,
    is_plain_text,
    is_supported,
    needs_conversion,
    get_conversion_target,
)
from app.prompts.document_analysis import (
    SYSTEM_PROMPT as ANALYSIS_SYSTEM_PROMPT,
    _USER_PROMPT_TEMPLATE as ANALYSIS_USER_PROMPT_TEMPLATE,
)
from app.prompts.metadata_extraction import (
    SYSTEM_PROMPT as META_SYSTEM_PROMPT,
    _USER_PROMPT_TEMPLATE as META_USER_PROMPT_TEMPLATE,
)
from app.prompts.summary_generation import (
    SYSTEM_PROMPT as SUM_SYSTEM_PROMPT,
    USER_PROMPT as SUM_USER_PROMPT_TEMPLATE,
)
from app.utils.logger import get_logger

if TYPE_CHECKING:
    from app.core.document_analyzer import DocumentAnalyzer
    from app.core.graph_builder import GraphBuilder
    from app.core.graph_query_service import GraphQueryService
    from app.core.metadata_extractor import MetadataExtractor
    from app.core.service_guide_extractor import ServiceGuideExtractor
    from app.core.summary_generator import SummaryGenerator

logger = get_logger(__name__)


def compute_content_hash(file_path: str | Path) -> str:
    """Compute the SHA-256 hash of a file.

    安全修复：使用 SHA-256 替代 MD5，避免哈希碰撞风险。
    """
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            sha256.update(chunk)
    return sha256.hexdigest()


class _NoOpRecorder:
    """空操作记录器，在未提供追踪记录器时作为占位使用，所有方法均为空操作。

    Fallback when no trace recorder is provided — all methods are no-ops."""

    has_stage_failure: bool = False

    async def record(self, *a, **kw) -> None:
        pass

    async def record_stage_start(self, *a, **kw) -> None:
        pass

    async def record_stage_complete(self, *a, **kw) -> None:
        pass

    async def record_stage_failed(self, *a, **kw) -> None:
        pass

    async def update_trace_fields(self, *a, **kw) -> None:
        pass

    async def record_artifact(self, *a, **kw) -> str:
        return ""


class IngestPipeline:
    """文档入库管道，协调 解析 → 分块 → 向量化 → 索引 → 图谱构建 的完整流程。

    Orchestrates document ingestion: parse → chunk → embed → index → graph."""

    def __init__(
        self,
        es_client: ESClient,
        embedding_service: EmbeddingService,
        *,
        docling_processor: DoclingProcessor | None = None,
        chunker: DocumentChunker | None = None,
        graph_builder: "GraphBuilder | None" = None,
        graph_query_service: "GraphQueryService | None" = None,
        metadata_extractor: "MetadataExtractor | None" = None,
        summary_generator: "SummaryGenerator | None" = None,
        document_analyzer: "DocumentAnalyzer | None" = None,
        service_guide_extractor: "ServiceGuideExtractor | None" = None,
        mysql_client: "Any | None" = None,
    ):
        self._es = es_client
        self._embedding = embedding_service
        self._docling = docling_processor or DoclingProcessor()
        self._chunker = chunker or DocumentChunker()
        self._graph_builder = graph_builder
        self._graph_query = graph_query_service
        self._metadata_extractor = metadata_extractor
        self._summary_generator = summary_generator
        self._document_analyzer = document_analyzer
        self._service_guide_extractor = service_guide_extractor
        self._mysql = mysql_client
        # 复用 httpx 连接池，避免每次转换请求都创建新连接
        # Reuse httpx connection pool to avoid creating a new connection per conversion request
        self._http_client = httpx.AsyncClient(timeout=settings.converter_timeout)

    async def close(self) -> None:
        """关闭内部 HTTP 连接池，释放资源。 / Close the internal HTTP connection pool."""
        await self._http_client.aclose()

    async def _update_meta_status(
        self,
        doc_id: str,
        status: str,
        *,
        error: str | None = None,
    ) -> None:
        """Upsert a status field on the meta index document."""
        now_iso = datetime.now(timezone.utc).isoformat()
        script_parts = [
            "ctx._source.status = params.status",
            "ctx._source.updated_at = params.updated_at",
            "if (params.error != null) { ctx._source.error = params.error }",
        ]
        params: dict[str, Any] = {
            "status": status,
            "updated_at": now_iso,
            "error": error[:2000] if error else None,
        }
        upsert_body: dict[str, Any] = {
            "doc_id": doc_id,
            "status": status,
            "created_at": now_iso,
            "updated_at": now_iso,
        }
        if error is not None:
            upsert_body["error"] = error[:2000]
        try:
            await self._es.raw.update(
                index=settings.es_meta_index,
                id=doc_id,
                body={
                    "script": {
                        "lang": "painless",
                        "source": "; ".join(script_parts),
                        "params": params,
                    },
                    "upsert": upsert_body,
                },
            )
        except Exception as exc:
            logger.warning(
                "meta_status_update_failed",
                doc_id=doc_id,
                status=status,
                error=str(exc),
            )

    async def ingest_document(
        self,
        doc_id: str,
        file_path: str | Path,
        metadata: dict[str, Any],
        trace_id: str | None = None,
        recorder: IngestTraceRecorder | _NoOpRecorder | None = None,
    ) -> dict[str, Any]:
        """执行单文档入库流程，包括文件类型检测、SHA-256 去重、格式转换、解析分块、向量化、索引写入和图谱构建。

        Full document ingest pipeline with content deduplication.

        Args:
            doc_id: Unique document identifier.
            file_path: Path to the document file (any supported format).
            metadata: Document metadata from OA webhook.
            trace_id: Trace ID for structured event recording (Phase A = task_id).
            recorder: External recorder instance.  When provided the pipeline
                reuses it instead of creating a new one, keeping seq and
                started_at in sync across the whole trace lifecycle.

        Returns:
            dict with ingest results.
        """
        start_time = datetime.now(timezone.utc)
        logger.info("ingest_start", doc_id=doc_id, file_path=str(file_path))
        rec: IngestTraceRecorder | _NoOpRecorder
        if recorder is not None:
            rec = recorder
        elif trace_id:
            rec = IngestTraceRecorder(
                self._es, trace_id=trace_id, doc_id=doc_id, task_id=trace_id,
            )
        else:
            rec = _NoOpRecorder()

        metadata = dict(metadata)
        auto_extract = bool(metadata.pop("auto_extract", False))

        # Resolve knowledge_category → knowledge_category_code if missing
        kc = metadata.get("knowledge_category", "")
        if kc and not metadata.get("knowledge_category_code"):
            from app.core.graph_schema_loader import get_schema
            metadata["knowledge_category_code"] = get_schema().knowledge_category_code(kc)

        await self._update_meta_status(doc_id, "processing")

        try:
            file_path = Path(file_path)

            # Step 0: Detect and validate file type
            await rec.record_stage_start("file_type_detected")
            file_type = detect_file_type(file_path)
            if not is_supported(file_type):
                await rec.record_stage_failed(
                    "file_type_detected",
                    summary=f"不支持的文件类型: {file_type}",
                    error_code="INGEST_UNSUPPORTED_FILE",
                    error_message=f"Unsupported: {file_type}",
                )
                raise ValueError(f"Unsupported file type: {file_type} (file: {file_path.name})")
            await rec.record_stage_complete(
                "file_type_detected",
                summary=f"文件类型: {file_type}",
                details={"file_type": file_type, "file_name": file_path.name},
            )
            await rec.update_trace_fields({"file_type": file_type})

            logger.info("ingest_file_type", doc_id=doc_id, file_type=file_type)

            # Step 0.5: Compute content hash
            await rec.record_stage_start("content_hash_computed")
            # 将同步哈希计算放到线程池，避免阻塞事件循环
            # Run synchronous hash computation in thread pool to avoid blocking the event loop
            content_hash = await asyncio.to_thread(compute_content_hash, file_path)
            await rec.record_stage_complete(
                "content_hash_computed",
                summary=f"SHA-256: {content_hash[:12]}...",
                details={"content_hash": content_hash},
            )
            await rec.update_trace_fields({"content_hash": content_hash})

            logger.info("ingest_hash", doc_id=doc_id, content_hash=content_hash[:12])

            # Step 0.6: Store file by content_hash (content-addressed)
            await rec.record_stage_start("file_stored")
            storage_dir = settings.file_storage_path
            storage_dir.mkdir(parents=True, exist_ok=True)
            ext = file_path.suffix.lstrip(".").lower() or file_type
            content_file_path = storage_dir / f"{content_hash}.{ext}"
            file_already_existed = content_file_path.exists()
            if not file_already_existed:
                # 将同步文件复制放到线程池，避免阻塞事件循环
                # Run synchronous file copy in thread pool to avoid blocking the event loop
                await asyncio.to_thread(shutil.copy2, str(file_path), str(content_file_path))
            await rec.record_stage_complete(
                "file_stored",
                summary=f"文件存储: {content_hash}.{ext}"
                + (" (已存在)" if file_already_existed else ""),
                details={
                    "target_path": f"{content_hash}.{ext}",
                    "already_existed": file_already_existed,
                    "file_size": content_file_path.stat().st_size,
                },
            )

            # Step 1: Check for duplicate content (SHA-256 dedup)
            await rec.record_stage_start("dedup_checked")
            existing = await self._es.find_by_content_hash(content_hash)

            if existing:
                await rec.record_stage_complete(
                    "dedup_checked",
                    summary=f"命中重复内容，已有文档: {existing.get('doc_id', '?')}",
                    details={
                        "is_duplicate": True,
                        "existing_doc_id": existing.get("doc_id"),
                        "existing_chunk_count": existing.get("chunk_count", 0),
                    },
                )
                # ── Fast path: content already processed ──
                return await self._handle_duplicate(
                    doc_id=doc_id,
                    content_hash=content_hash,
                    metadata=metadata,
                    existing=existing,
                    start_time=start_time,
                    file_type=file_type,
                    file_path=content_file_path,
                    rec=rec,
                )

            await rec.record_stage_complete(
                "dedup_checked",
                summary="新内容，走完整处理路径",
                details={"is_duplicate": False},
            )

            # ── Full path: new content ──
            return await self._handle_new_content(
                doc_id=doc_id,
                content_hash=content_hash,
                metadata=metadata,
                file_path=file_path,
                content_file_path=content_file_path,
                file_type=file_type,
                auto_extract=auto_extract,
                start_time=start_time,
                rec=rec,
            )

        except Exception as e:
            elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()
            error_msg = str(e)
            logger.error(
                "ingest_failed",
                doc_id=doc_id,
                error=error_msg,
                duration=round(elapsed, 2),
            )
            await self._update_meta_status(doc_id, "failed", error=error_msg)

            await rec.record(
                "ingest_failed", "failed",
                summary=f"入库失败: {error_msg[:100]}",
                error_code="INGEST_PIPELINE_FAILED",
                error_message=error_msg,
            )

            return {
                "doc_id": doc_id,
                "status": "failed",
                "error": error_msg,
                "error_code": "INGEST_PIPELINE_FAILED",
                "duration_seconds": round(elapsed, 2),
            }

    async def _handle_duplicate(
        self,
        doc_id: str,
        content_hash: str,
        metadata: dict[str, Any],
        existing: dict[str, Any],
        start_time: datetime,
        file_type: str,
        file_path: Path,
        rec: IngestTraceRecorder | _NoOpRecorder,
    ) -> dict[str, Any]:
        """重复内容快路径：跳过解析/分块/向量化，仅写入文档元数据并重算 ACL。

        Handle ingestion when content already exists (fast path)."""
        logger.info(
            "ingest_duplicate_detected",
            doc_id=doc_id,
            content_hash=content_hash[:12],
            existing_doc=existing.get("doc_id"),
        )

        summary = ""
        full_text = ""
        if (self._summary_generator and settings.summary_enabled) or self._service_guide_extractor:
            full_text = await self._get_text_from_chunks(content_hash)

        if self._summary_generator and settings.summary_enabled:
            await rec.record_stage_start("summary_generated")
            try:
                if full_text:
                    summary = await self._summary_generator.generate(full_text, metadata)
                await rec.record_stage_complete(
                    "summary_generated",
                    summary=f"摘要生成完成 ({len(summary)} 字)" if summary else "跳过摘要生成",
                    details={
                        "summary_length": len(summary),
                        "prompt_template": {
                            "system_prompt": SUM_SYSTEM_PROMPT,
                            "user_prompt_template": SUM_USER_PROMPT_TEMPLATE.template,
                        },
                    },
                )
            except Exception as sum_exc:
                # 摘要生成失败应记录为 failed 而非 completed / Record as failed, not completed
                await rec.record_stage_failed(
                    "summary_generated",
                    summary=f"摘要生成失败 (dedup): {sum_exc}",
                    error_code="INGEST_SUMMARY_FAILED",
                    error_message=str(sum_exc),
                )

        await self._run_service_guide_stage(
            doc_id=doc_id,
            content_hash=content_hash,
            metadata=metadata,
            full_text=full_text,
            rec=rec,
            patch_chunks=True,
        )
        if metadata.get("service_guide_status") == "completed":
            await self._sync_service_guide_matter_bindings(
                guide_doc_id=doc_id,
                source_doc_id=existing.get("doc_id") or doc_id,
            )

        # Write doc meta
        await rec.record_stage_start("doc_meta_written")
        ext = file_path.suffix.lstrip(".").lower() or file_type
        await self._write_doc_meta(
            doc_id=doc_id,
            metadata=metadata,
            content_hash=content_hash,
            chunk_count=existing.get("chunk_count", 0),
            page_count=existing.get("page_count", 0),
            file_path=f"{content_hash}.{ext}",
            file_type=file_type,
            summary=summary,
        )
        await rec.record_stage_complete(
            "doc_meta_written",
            summary="文档元数据写入完成 (dedup)",
            details={
                "chunk_count": existing.get("chunk_count", 0),
                "title": metadata.get("title", ""),
                "doc_number": metadata.get("doc_number", ""),
                "issuing_org": metadata.get("issuing_org", ""),
                "doc_type": metadata.get("doc_type", ""),
                "acl_ids": metadata.get("acl_ids", []),
                "knowledge_category": metadata.get("knowledge_category", ""),
            },
        )

        # Recompute ACL
        await rec.record_stage_start("chunk_acl_recomputed")
        await self._es.recompute_chunk_acl(content_hash)
        await rec.record_stage_complete(
            "chunk_acl_recomputed",
            summary="ACL 权限重新计算完成",
            details={"content_hash": content_hash[:12], "acl_ids": metadata.get("acl_ids", [])},
        )

        # Ingest completed
        elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()
        await rec.record(
            "ingest_completed", "completed",
            summary=f"入库完成 (重复内容快路径)，耗时 {elapsed:.1f}s",
            details={"deduplicated": True},
        )

        result = {
            "doc_id": doc_id,
            "status": "completed",
            "deduplicated": True,
            "content_hash": content_hash,
            "chunk_count": existing.get("chunk_count", 0),
            "page_count": existing.get("page_count", 0),
            "file_type": file_type,
            "service_guide_status": metadata.get("service_guide_status", ""),
            "duration_seconds": round(elapsed, 2),
        }
        logger.info("ingest_complete_dedup", **result)
        return result

    async def _handle_new_content(
        self,
        doc_id: str,
        content_hash: str,
        metadata: dict[str, Any],
        file_path: Path,
        content_file_path: Path,
        file_type: str,
        auto_extract: bool,
        start_time: datetime,
        rec: IngestTraceRecorder | _NoOpRecorder,
    ) -> dict[str, Any]:
        """新内容完整处理路径：格式转换 → 解析 → 分块 → 元数据提取 → 摘要生成 → 向量化 → 索引 → 图谱构建。

        Handle ingestion of new content — full processing path."""

        # Step 2: Determine processing path based on file type
        processing_path = file_path

        # If conversion is needed, call Java converter service
        if needs_conversion(file_type):
            await rec.record_stage_start("document_converted", service="converter")
            try:
                original_filename = metadata.get("original_filename") or file_path.name
                processing_path = await self._convert_document(
                    file_path, file_type, original_filename=original_filename,
                    doc_id=doc_id, task_id=metadata.get("task_id", ""),
                )
                await rec.record_stage_complete(
                    "document_converted",
                    summary=f"格式转换完成: {file_type} → {processing_path.suffix}",
                    details={
                        "from_type": file_type,
                        "to_path": str(processing_path),
                        "output_size": processing_path.stat().st_size,
                    },
                    service="converter",
                )
                logger.info("ingest_converted", doc_id=doc_id,
                            from_type=file_type, to_path=str(processing_path))
            except Exception as conv_exc:
                await rec.record_stage_failed(
                    "document_converted",
                    summary=f"格式转换失败: {conv_exc}",
                    error_code="INGEST_CONVERT_FAILED",
                    error_message=str(conv_exc),
                    service="converter",
                )
                raise

        # Step 3: Parse and chunk
        await rec.record_stage_start("document_parsed", service="docling")
        parse_method = "plain_text" if is_plain_text(file_type) else "docling"

        if is_plain_text(file_type):
            full_text = self._read_text_file(file_path)
            page_count = 0
            if not full_text.strip():
                await rec.record_stage_failed(
                    "document_parsed",
                    summary="解析失败: 空文本",
                    error_code="INGEST_EMPTY_TEXT",
                    error_message="Empty text",
                )
                raise ValueError(f"Empty text extracted from: {file_path}")
        else:
            # 将同步 Docling 解析放到线程池，避免阻塞事件循环
            # Run synchronous Docling processing in thread pool to avoid blocking the event loop
            processed = await asyncio.to_thread(self._docling.process, processing_path)
            full_text = processed.full_text
            page_count = processed.page_count
            if not full_text.strip():
                await rec.record_stage_failed(
                    "document_parsed",
                    summary=f"解析失败: 空文本 (processed={processing_path.name})",
                    error_code="INGEST_EMPTY_TEXT",
                    error_message=f"Empty text from {processing_path}",
                )
                raise ValueError(
                    f"Empty text extracted from: {processing_path}"
                    f" (original: {file_path})"
                )

        await rec.record_stage_complete(
            "document_parsed",
            summary=f"解析完成: {len(full_text)} 字, {page_count} 页 ({parse_method})",
            details={
                "parse_method": parse_method,
                "total_chars": len(full_text),
                "page_count": page_count,
            },
            service="docling" if parse_method == "docling" else "worker",
        )

        # Chunking
        await rec.record_stage_start("document_chunked")
        if is_plain_text(file_type):
            chunk_metadata = self._build_chunk_metadata(metadata)
            chunk_metadata["file_type"] = file_type
            chunks = self._chunker.chunk_document(
                text=full_text,
                content_hash=content_hash,
                doc_ids=[doc_id],
                metadata=chunk_metadata,
            )
        else:
            chunk_metadata = self._build_chunk_metadata(metadata)
            chunks = chunks_from_docling(
                processed_chunks=processed.chunks,
                content_hash=content_hash,
                doc_ids=[doc_id],
                metadata=chunk_metadata,
                file_type=file_type,
            )

        if not chunks:
            await rec.record_stage_failed(
                "document_chunked",
                summary="分块失败: 没有产出 chunk",
                error_code="INGEST_CHUNK_EMPTY",
                error_message="No chunks produced",
            )
            raise ValueError(f"No chunks produced for document: {doc_id}")

        chunk_tokens = [getattr(c, "token_estimate", 0) or len(c.content) for c in chunks]
        avg_tokens = sum(chunk_tokens) // len(chunk_tokens) if chunk_tokens else 0
        await rec.record_stage_complete(
            "document_chunked",
            summary=f"分块完成: {len(chunks)} 个 chunk，平均 {avg_tokens} tokens",
            details={
                "chunk_count": len(chunks),
                "avg_tokens": avg_tokens,
                "max_tokens": max(chunk_tokens) if chunk_tokens else 0,
                "min_tokens": min(chunk_tokens) if chunk_tokens else 0,
            },
        )

        logger.info("ingest_parsed_and_chunked", doc_id=doc_id,
                     chars=len(full_text), pages=page_count, chunks=len(chunks))

        # Step 3.5+3.6: Document analysis (metadata extraction + summary generation)
        # 统一文档分析：合并元数据提取和摘要生成为单次 LLM 调用
        _auto_fields = [
            "title", "doc_number", "issuing_org", "doc_type",
            "document_scene_type", "publish_date", "signer", "subject_words",
        ]
        summary = ""

        # Unified path: single LLM call for both metadata + summary
        if auto_extract and settings.summary_enabled and self._document_analyzer is not None:
            await rec.record_stage_start("document_analyzed", service="llm")
            try:
                extracted_meta, summary, prompt_info = await self._document_analyzer.analyze(full_text)
                filled_fields = []
                for fld in _auto_fields:
                    if not metadata.get(fld) and extracted_meta.get(fld):
                        metadata[fld] = extracted_meta[fld]
                        filled_fields.append(fld)
                await rec.record_stage_complete(
                    "document_analyzed",
                    summary=f"文档分析完成，填充 {len(filled_fields)} 字段，摘要 {len(summary)} 字",
                    details={
                        "filled_fields": filled_fields,
                        "extracted_title": extracted_meta.get("title", "")[:50],
                        "extracted_doc_number": extracted_meta.get("doc_number", ""),
                        "summary_length": len(summary),
                        "summary_preview": summary[:100],
                        "input_chars": min(len(full_text), settings.analysis_max_content_chars),
                        "prompt_template": prompt_info,
                    },
                    service="llm",
                )
            except Exception as exc:
                await rec.record_stage_failed(
                    "document_analyzed",
                    summary=f"文档分析失败: {exc}",
                    error_code="INGEST_ANALYSIS_FAILED",
                    error_message=str(exc),
                    service="llm",
                )
                # Non-blocking: continue without extracted metadata/summary
        else:
            # Fallback: separate LLM calls when only one feature is active
            if auto_extract and self._metadata_extractor is not None:
                await rec.record_stage_start("metadata_extracted", service="llm")
                try:
                    extracted_meta = await self._metadata_extractor.extract(full_text)
                    filled_fields = []
                    for fld in _auto_fields:
                        if not metadata.get(fld) and extracted_meta.get(fld):
                            metadata[fld] = extracted_meta[fld]
                            filled_fields.append(fld)
                    await rec.record_stage_complete(
                        "metadata_extracted",
                        summary=f"LLM 元数据提取完成，填充 {len(filled_fields)} 个字段",
                        details={
                            "filled_fields": filled_fields,
                            "extracted_title": extracted_meta.get("title", "")[:50],
                            "extracted_doc_number": extracted_meta.get("doc_number", ""),
                            "prompt_template": {
                                "system_prompt": META_SYSTEM_PROMPT,
                                "user_prompt_template": META_USER_PROMPT_TEMPLATE,
                            },
                        },
                        service="llm",
                    )
                except Exception as meta_exc:
                    await rec.record_stage_failed(
                        "metadata_extracted",
                        summary=f"元数据提取失败: {meta_exc}",
                        error_code="INGEST_METADATA_EXTRACT_FAILED",
                        error_message=str(meta_exc),
                        service="llm",
                    )

            if settings.summary_enabled and self._summary_generator is not None:
                await rec.record_stage_start("summary_generated", service="llm")
                try:
                    summary = await self._summary_generator.generate(full_text, metadata)
                    await rec.record_stage_complete(
                        "summary_generated",
                        summary=f"摘要生成完成 ({len(summary)} 字)",
                        details={
                            "summary_length": len(summary),
                            "preview": summary[:100],
                            "prompt_template": {
                                "system_prompt": SUM_SYSTEM_PROMPT,
                                "user_prompt_template": SUM_USER_PROMPT_TEMPLATE.template,
                            },
                        },
                        service="llm",
                    )
                except Exception as sum_exc:
                    await rec.record_stage_failed(
                        "summary_generated",
                        summary=f"摘要生成失败: {sum_exc}",
                        error_code="INGEST_SUMMARY_FAILED",
                        error_message=str(sum_exc),
                        service="llm",
                    )

        await self._run_service_guide_stage(
            doc_id=doc_id,
            content_hash=content_hash,
            metadata=metadata,
            full_text=full_text,
            rec=rec,
            patch_chunks=False,
        )
        self._refresh_chunk_metadata(chunks, metadata, file_type)

        # Step 4: Generate embeddings
        await rec.record_stage_start("embedding_generated", service="opensearch")
        chunk_texts = [c.content for c in chunks]
        vectors = await self._embedding.embed_chunks(chunk_texts)
        await rec.record_stage_complete(
            "embedding_generated",
            summary=f"向量化完成: {len(vectors)} 个向量",
            details={
                "vector_count": len(vectors),
                "chunk_count": len(chunks),
                "model": settings.embedding_model,
            },
            service="opensearch",
        )

        logger.info("ingest_embedded", doc_id=doc_id, vectors=len(vectors))

        # Step 5: Build ES documents and bulk index
        await rec.record_stage_start("chunks_indexed", service="opensearch")
        now_iso = datetime.now(timezone.utc).isoformat()
        es_docs = []
        for chunk, vector in zip(chunks, vectors):
            doc = chunk.to_dict()
            doc["content_vector"] = vector
            doc["created_at"] = now_iso
            doc["updated_at"] = now_iso
            es_docs.append(doc)

        success_count, errors = await self._es.bulk_index_chunks(es_docs)

        if errors:
            logger.error(
                "ingest_es_errors",
                doc_id=doc_id,
                errors=len(errors),
                sample=errors[:2],
            )
            await rec.record_stage_complete(
                "chunks_indexed",
                summary=f"索引部分失败: 成功 {success_count}, 失败 {len(errors)}",
                details={
                    "success_count": success_count,
                    "error_count": len(errors),
                    "error_sample": [str(e)[:200] for e in errors[:3]],
                },
                service="opensearch",
            )
        else:
            await rec.record_stage_complete(
                "chunks_indexed",
                summary=f"索引完成: {success_count} 个 chunk",
                details={"success_count": success_count, "error_count": 0},
                service="opensearch",
            )

        # Step 6: Write document metadata
        await rec.record_stage_start("doc_meta_written")
        ext = content_file_path.suffix.lstrip(".")
        await self._write_doc_meta(
            doc_id=doc_id,
            metadata=metadata,
            content_hash=content_hash,
            chunk_count=len(chunks),
            page_count=page_count,
            file_path=content_file_path.name,
            file_type=file_type,
            summary=summary,
        )
        await rec.record_stage_complete(
            "doc_meta_written",
            summary="文档元数据写入完成",
            details={
                "chunk_count": len(chunks),
                "page_count": page_count,
                "title": metadata.get("title", ""),
                "doc_number": metadata.get("doc_number", ""),
                "issuing_org": metadata.get("issuing_org", ""),
                "doc_type": metadata.get("doc_type", ""),
                "acl_ids": metadata.get("acl_ids", []),
                "knowledge_category": metadata.get("knowledge_category", ""),
            },
        )

        # Step 6.5: Recompute chunk ACL (merge ACL from all doc metas sharing this content_hash)
        await rec.record_stage_start("chunk_acl_recomputed")
        await self._es.recompute_chunk_acl(content_hash)
        await rec.record_stage_complete(
            "chunk_acl_recomputed",
            summary="ACL 权限重新计算完成",
            details={"content_hash": content_hash[:12], "acl_ids": metadata.get("acl_ids", [])},
        )

        # Step 7: Build knowledge graph (optional, non-blocking failure)
        graph_result: dict[str, Any] | None = None
        if self._graph_builder is not None and settings.graph_build_enabled:
            await rec.record_stage_start("graph_extract_llm", service="llm")
            try:
                # Resolve scene_type for scene-based extraction:
                # priority: document_scene_type > knowledge_category_code > ""
                scene_type = (
                    metadata.get("document_scene_type")
                    or metadata.get("knowledge_category_code")
                    or ""
                )
                graph_result = await self._graph_builder.build_graph(
                    doc_id=doc_id,
                    metadata=metadata,
                    content=full_text,
                    scene_type=scene_type,
                )
                entity_count = graph_result.get("entity_count", 0)
                relation_count = graph_result.get("relation_count", 0)
                ref_count = graph_result.get("referenced_doc_count", 0)

                if graph_result.get("status") == "completed":
                    await rec.record_stage_complete(
                        "graph_extract_llm",
                        summary=f"图谱提取完成: {entity_count} 实体, {relation_count} 关系, {ref_count} 引用",
                        details=graph_result,
                        service="llm",
                    )
                    # 使用配对的 start/complete 确保时间线显示完整的 graph_written 阶段
                    # Use paired start/complete so the timeline shows a complete graph_written stage
                    await rec.record_stage_start("graph_written", service="neo4j")
                    await rec.record_stage_complete(
                        "graph_written",
                        summary="图谱写入 Neo4j 完成",
                        details=graph_result,
                        service="neo4j",
                    )
                    if metadata.get("service_guide_status") == "completed":
                        graph_result["linked_matter_ids"] = await self._sync_service_guide_matter_bindings(
                            guide_doc_id=doc_id,
                        )
                else:
                    await rec.record_stage_failed(
                        "graph_extract_llm",
                        summary=f"图谱提取失败: {graph_result.get('error', '')}",
                        error_code="INGEST_GRAPH_LLM_FAILED",
                        error_message=graph_result.get("error", ""),
                        service="llm",
                    )
            except Exception as graph_exc:
                logger.warning(
                    "ingest_graph_skipped",
                    doc_id=doc_id,
                    error=str(graph_exc),
                )
                graph_result = {"status": "skipped", "error": str(graph_exc)}
                await rec.record_stage_failed(
                    "graph_extract_llm",
                    summary=f"图谱构建异常: {graph_exc}",
                    error_code="INGEST_GRAPH_WRITE_FAILED",
                    error_message=str(graph_exc),
                    service="neo4j",
                )

        # Clean up converted temp file
        # 临时文件清理失败不影响主流程，但记录日志以便排查
        if needs_conversion(file_type) and processing_path != file_path:
            try:
                processing_path.unlink(missing_ok=True)
            except Exception:
                logger.debug("Failed to clean temp file", exc_info=True)

        elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()

        # Record ingest_completed
        await rec.record(
            "ingest_completed", "completed",
            summary=f"入库完成，共 {len(chunks)} chunk，耗时 {elapsed:.1f}s",
            details={
                "chunk_count": len(chunks),
                "page_count": page_count,
                "es_indexed": success_count,
                "graph_status": graph_result.get("status") if graph_result else "skipped",
            },
        )

        # Determine final status: partial_failed if any non-blocking stage failed
        final_status = "completed"
        if rec.has_stage_failure:
            final_status = "partial_failed"

        result: dict[str, Any] = {
            "doc_id": doc_id,
            "status": final_status,
            "deduplicated": False,
            "content_hash": content_hash,
            "chunk_count": len(chunks),
            "page_count": page_count,
            "file_type": file_type,
            "es_indexed": success_count,
            "es_errors": len(errors),
            "service_guide_status": metadata.get("service_guide_status", ""),
            "duration_seconds": round(elapsed, 2),
        }
        if graph_result:
            result["graph"] = graph_result

        logger.info("ingest_complete", **{k: v for k, v in result.items() if k != "graph"})
        return result

    async def _convert_document(
        self,
        file_path: Path,
        file_type: str,
        *,
        original_filename: str | None = None,
        doc_id: str = "",
        task_id: str = "",
    ) -> Path:
        """Call Java converter service to convert doc/xls/ppt/wps/et/ofd."""
        target_format = get_conversion_target(file_type)
        if not target_format:
            raise ValueError(f"No conversion target for format: {file_type}")

        upload_name = original_filename or file_path.name
        file_size = file_path.stat().st_size if file_path.exists() else None

        url = f"{settings.converter_base_url}/api/convert"
        logger.info("converter_calling", url=url, file_type=file_type, target=target_format)

        error_message: str | None = None
        success = False
        duration_ms: int | None = None
        output_path_str: str | None = None

        try:
            with open(file_path, "rb") as f:
                response = await self._http_client.post(
                    url,
                    files={"file": (upload_name, f)},
                    data={"targetFormat": target_format},
                )

            if response.status_code != 200:
                error_message = f"Converter service returned {response.status_code}: {response.text}"
                raise RuntimeError(error_message)

            result = response.json()
            duration_ms = result.get("durationMs")

            if not result.get("success"):
                error_message = f"Conversion failed: {result.get('error', 'unknown error')}"
                raise RuntimeError(error_message)

            output_path = Path(result["outputPath"])
            if not output_path.exists():
                error_message = f"Converted file not found: {output_path}"
                raise FileNotFoundError(error_message)

            output_path_str = str(output_path)
            success = True
            logger.info("converter_success", output=output_path_str, duration_ms=duration_ms)
            return output_path

        except Exception:
            # Re-raise after logging; the finally block records to MySQL
            raise

        finally:
            await self._record_convert_log(
                doc_id=doc_id,
                task_id=task_id,
                source_format=file_type,
                target_format=target_format,
                source_path=str(file_path),
                output_path=output_path_str,
                success=success,
                duration_ms=duration_ms,
                file_size=file_size,
                error_message=error_message,
            )

    async def _record_convert_log(
        self,
        *,
        doc_id: str,
        task_id: str,
        source_format: str,
        target_format: str,
        source_path: str | None,
        output_path: str | None,
        success: bool,
        duration_ms: int | None,
        file_size: int | None,
        error_message: str | None,
    ) -> None:
        """Record a document conversion log entry to MySQL (best-effort)."""
        if self._mysql is None:
            return
        try:
            await self._mysql.execute(
                """
                INSERT INTO convert_log
                    (doc_id, task_id, source_format, target_format,
                     source_path, output_path, success, duration_ms,
                     file_size, error_message)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    doc_id, task_id, source_format, target_format,
                    source_path, output_path, int(success), duration_ms,
                    file_size, error_message,
                ),
            )
        except Exception as exc:
            logger.warning("convert_log_write_failed", error=str(exc))

    # 文本文件大小上限（50MB），超过此限制将拒绝处理以防止内存溢出
    # Max text file size (50MB); files exceeding this limit are rejected to prevent OOM
    _MAX_TEXT_FILE_SIZE = 50 * 1024 * 1024

    def _read_text_file(self, file_path: Path) -> str:
        """Read a text file with encoding detection."""
        # 检查文件大小，防止超大文件耗尽内存
        # Check file size to prevent memory exhaustion from oversized files
        file_size = file_path.stat().st_size
        if file_size > self._MAX_TEXT_FILE_SIZE:
            raise ValueError(
                f"Text file too large ({file_size / 1024 / 1024:.1f}MB > "
                f"{self._MAX_TEXT_FILE_SIZE / 1024 / 1024:.0f}MB limit): {file_path.name}"
            )
        raw = file_path.read_bytes()

        # Try UTF-8 first
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            pass

        # Use charset-normalizer for detection
        try:
            from charset_normalizer import from_bytes
            result = from_bytes(raw).best()
            if result:
                return str(result)
        except ImportError:
            pass

        # Last resort: GBK (common for Chinese documents)
        return raw.decode("gbk", errors="replace")

    def _build_chunk_metadata(self, metadata: dict[str, Any]) -> dict[str, Any]:
        """Extract the fields that should be copied to each chunk."""
        return {
            "title": metadata.get("title", ""),
            "doc_number": metadata.get("doc_number", ""),
            "issuing_org": metadata.get("issuing_org", ""),
            "doc_type": metadata.get("doc_type", ""),
            "document_scene_type": metadata.get("document_scene_type", ""),
            "subject_words": metadata.get("subject_words", []),
            "signer": metadata.get("signer", ""),
            "publish_date": metadata.get("publish_date") or None,
            "acl_ids": metadata.get("acl_ids", []),
            "knowledge_category": metadata.get("knowledge_category", ""),
            "knowledge_category_code": metadata.get("knowledge_category_code", ""),
            "source_system": metadata.get("source_system", ""),
            "source_article_id": metadata.get("source_article_id", ""),
            "source_attachment_id": metadata.get("source_attachment_id", ""),
            "source_site_code": metadata.get("source_site_code", ""),
            "source_target_code": metadata.get("source_target_code", ""),
            "source_url": metadata.get("source_url", ""),
        }

    def _refresh_chunk_metadata(
        self,
        chunks: list[Any],
        metadata: dict[str, Any],
        file_type: str,
    ) -> None:
        """Refresh chunk metadata after auto-analysis mutates document metadata."""
        chunk_metadata = self._build_chunk_metadata(metadata)
        chunk_metadata["file_type"] = file_type
        for chunk in chunks:
            if hasattr(chunk, "metadata") and isinstance(chunk.metadata, dict):
                chunk.metadata.update(chunk_metadata)

    async def _preserve_existing_service_guide_metadata(
        self,
        doc_id: str,
        metadata: dict[str, Any],
    ) -> bool:
        """Preserve the last successful guide summary fields for a doc when re-extraction degrades."""
        existing_guide = await self._es.find_service_guide_by_doc_id(
            doc_id,
            source_fields=["profile_id", "matter_name", "scene_type"],
        )
        if not existing_guide:
            return False

        if existing_guide.get("profile_id"):
            metadata["guide_profile_id"] = existing_guide["profile_id"]
        if existing_guide.get("matter_name"):
            metadata["guide_matter_name"] = existing_guide["matter_name"]
        if not metadata.get("document_scene_type") and existing_guide.get("scene_type"):
            metadata["document_scene_type"] = existing_guide["scene_type"]
        return True

    async def _sync_service_guide_matter_bindings(
        self,
        *,
        guide_doc_id: str,
        source_doc_id: str | None = None,
    ) -> list[str]:
        """Backfill linked_matter_ids onto the guide profile after graph resolution."""
        if not guide_doc_id or self._graph_query is None:
            return []

        binding_source_doc_id = (source_doc_id or guide_doc_id).strip()
        if not binding_source_doc_id:
            return []

        try:
            governed_matters = await self._graph_query.get_governed_matters(binding_source_doc_id)
            matter_ids = list(
                dict.fromkeys(
                    matter.get("matter_id", "")
                    for matter in governed_matters
                    if matter.get("matter_id")
                )
            )
            await self._es.bind_service_guide_matters(guide_doc_id, matter_ids)
            logger.info(
                "service_guide_matter_bindings_synced",
                guide_doc_id=guide_doc_id,
                source_doc_id=binding_source_doc_id,
                matter_count=len(matter_ids),
            )
            return matter_ids
        except Exception as exc:
            logger.warning(
                "service_guide_matter_binding_failed",
                guide_doc_id=guide_doc_id,
                source_doc_id=binding_source_doc_id,
                error=str(exc),
            )
            return []

    async def _run_service_guide_stage(
        self,
        *,
        doc_id: str,
        content_hash: str,
        metadata: dict[str, Any],
        full_text: str,
        rec: IngestTraceRecorder | _NoOpRecorder,
        patch_chunks: bool,
    ) -> None:
        """Extract and index a service guide profile when the content matches the scene."""
        if self._service_guide_extractor is None:
            return

        await rec.record_stage_start("service_guide_extracted")
        try:
            from app.core.service_guide_extractor import ServiceGuideExtractionInput

            payload = ServiceGuideExtractionInput(
                doc_id=doc_id,
                content_hash=content_hash,
                title=metadata.get("title", ""),
                doc_type=metadata.get("doc_type", ""),
                knowledge_category=metadata.get("knowledge_category", ""),
                source_url=metadata.get("source_url", ""),
                acl_ids=list(metadata.get("acl_ids", []) or []),
                metadata=metadata,
                plain_text=full_text,
                markdown_text=full_text,
            )
            extracted = await self._service_guide_extractor.extract(payload)

            if extracted.detected and extracted.profile:
                profile_doc = extracted.profile.model_dump(mode="json")
                metadata["document_scene_type"] = extracted.scene_type
                metadata["service_guide_status"] = "completed"
                metadata["guide_profile_id"] = extracted.profile.profile_id
                metadata["guide_matter_name"] = extracted.profile.matter_name
                await self._es.delete_service_guides_by_doc_id(doc_id)
                await self._es.raw.index(
                    index=settings.es_service_guide_index,
                    id=extracted.profile.profile_id,
                    body=profile_doc,
                    refresh=True,
                )
                if patch_chunks and metadata.get("document_scene_type"):
                    await self._es.patch_chunks_by_content_hash(
                        content_hash,
                        {"document_scene_type": metadata.get("document_scene_type", "")},
                    )
                await rec.record_stage_complete(
                    "service_guide_extracted",
                    summary=(
                        f"办事指南抽取完成: {metadata.get('guide_matter_name', '') or metadata.get('title', '')}"
                    ),
                    details={
                        "profile_id": metadata.get("guide_profile_id", ""),
                        "matter_name": metadata.get("guide_matter_name", ""),
                        "materials": len(extracted.profile.materials),
                        "fees": len(extracted.profile.fees),
                        "windows": len(extracted.profile.service_windows),
                        "confidence": extracted.detection_confidence,
                        "warnings": extracted.warnings,
                    },
                )
                return

            if extracted.detected:
                metadata["service_guide_status"] = "failed"
                preserved_existing = await self._preserve_existing_service_guide_metadata(doc_id, metadata)
                metadata.setdefault("guide_profile_id", "")
                metadata.setdefault("guide_matter_name", "")
                if not metadata.get("document_scene_type") and extracted.scene_type and extracted.scene_type != "other":
                    metadata["document_scene_type"] = extracted.scene_type
                if patch_chunks and metadata.get("document_scene_type"):
                    await self._es.patch_chunks_by_content_hash(
                        content_hash,
                        {"document_scene_type": metadata.get("document_scene_type", "")},
                    )
                await rec.record_stage_failed(
                    "service_guide_extracted",
                    summary="识别到标准办事指南，但结构化抽取未成功生成 profile",
                    details={
                        "scene_type": extracted.scene_type,
                        "confidence": extracted.detection_confidence,
                        "reasons": extracted.detection_reasons,
                        "warnings": extracted.warnings,
                        "preserved_existing_guide": preserved_existing,
                        "profile_id": metadata.get("guide_profile_id", ""),
                    },
                    error_code="INGEST_SERVICE_GUIDE_PROFILE_UNAVAILABLE",
                    error_message="service guide detected but profile unavailable",
                )
                return

            metadata["service_guide_status"] = "skipped"
            metadata["guide_profile_id"] = ""
            metadata["guide_matter_name"] = ""
            await self._es.delete_service_guides_by_doc_id(doc_id)
            if patch_chunks and metadata.get("document_scene_type"):
                await self._es.patch_chunks_by_content_hash(
                    content_hash,
                    {"document_scene_type": metadata.get("document_scene_type", "")},
                )
            await rec.record_stage_complete(
                "service_guide_extracted",
                summary="未识别为标准办事指南，跳过结构化抽取",
                details={
                    "scene_type": extracted.scene_type,
                    "confidence": extracted.detection_confidence,
                    "reasons": extracted.detection_reasons,
                },
            )
        except Exception as exc:
            metadata["service_guide_status"] = "failed"
            metadata["guide_profile_id"] = ""
            metadata["guide_matter_name"] = ""
            await rec.record_stage_failed(
                "service_guide_extracted",
                summary=f"办事指南抽取失败: {exc}",
                error_code="INGEST_SERVICE_GUIDE_FAILED",
                error_message=str(exc),
            )

    async def _write_doc_meta(
        self,
        doc_id: str,
        metadata: dict[str, Any],
        content_hash: str,
        chunk_count: int,
        page_count: int,
        file_path: str,
        file_type: str = "",
        summary: str = "",
    ) -> None:
        """Write / update the document-level metadata index."""
        now_iso = datetime.now(timezone.utc).isoformat()
        original_filename = metadata.get("original_filename", "")
        # Use original filename (without extension) as title fallback
        title = metadata.get("title", "")
        if not title and original_filename:
            title = Path(original_filename).stem
        body: dict[str, Any] = {
            "doc_id": doc_id,
            "content_hash": content_hash,
            "title": title,
            "original_filename": original_filename,
            "doc_number": metadata.get("doc_number", ""),
            "issuing_org": metadata.get("issuing_org", ""),
            "doc_type": metadata.get("doc_type", ""),
            "document_scene_type": metadata.get("document_scene_type", ""),
            "subject_words": metadata.get("subject_words", []),
            "signer": metadata.get("signer", ""),
            "publish_date": metadata.get("publish_date") or None,
            "acl_ids": metadata.get("acl_ids", []),
            "knowledge_category": metadata.get("knowledge_category", ""),
            "knowledge_category_code": metadata.get("knowledge_category_code", ""),
            "source_system": metadata.get("source_system", ""),
            "source_article_id": metadata.get("source_article_id", ""),
            "source_attachment_id": metadata.get("source_attachment_id", ""),
            "source_site_code": metadata.get("source_site_code", ""),
            "source_target_code": metadata.get("source_target_code", ""),
            "source_url": metadata.get("source_url", ""),
            "source_metadata": metadata.get("source_metadata", {}),
            "service_guide_status": metadata.get("service_guide_status", ""),
            "guide_profile_id": metadata.get("guide_profile_id", ""),
            "guide_matter_name": metadata.get("guide_matter_name", ""),
            "related_docs": metadata.get("related_docs", []),
            "summary": summary or None,
            "chunk_count": chunk_count,
            "page_count": page_count,
            "file_path": file_path,
            "file_type": file_type,
            "status": "completed",
            "created_at": now_iso,
            "updated_at": now_iso,
        }

        await self._es.raw.index(
            index=settings.es_meta_index,
            id=doc_id,
            body=body,
        )
        logger.info("doc_meta_written", doc_id=doc_id, content_hash=content_hash[:12],
                     file_type=file_type)

    async def _get_text_from_chunks(self, content_hash: str) -> str:
        """Reconstruct full text from indexed chunks (for dedup summary)."""
        try:
            resp = await self._es.raw.search(
                index=settings.es_chunk_index,
                body={
                    "query": {"term": {"content_hash": content_hash}},
                    "sort": [{"chunk_index": {"order": "asc"}}],
                    "size": 2000,
                    "_source": ["content"],
                },
            )
            raw = resp if isinstance(resp, dict) else resp.body
            hits = raw.get("hits", {}).get("hits", [])
            return "\n\n".join(h["_source"]["content"] for h in hits)
        except Exception:
            # 获取已索引分块文本失败，返回空字符串不阻塞调用方
            logger.warning("Failed to get text from chunks", exc_info=True)
            return ""


# ── Factory helper for Celery tasks ──────────────────────────────────────────


def create_pipeline(
    *,
    with_graph: bool = True,
    redis_client: Any = None,
    mysql_client: Any = None,
) -> IngestPipeline:
    """工厂方法：为 Celery worker 创建一个带有完整客户端的入库管道实例。

    Create an IngestPipeline with fresh clients (for use in Celery workers).

    Args:
        with_graph: 是否初始化图谱构建器。
        redis_client: 可选的 RedisClient，注入到 ESClient 用于 ACL 重算的分布式锁。
                      Optional RedisClient injected into ESClient for ACL recompute locking.
        mysql_client: 可选的 MySQLClient，用于记录文档转换日志。
                      Optional MySQLClient for conversion log recording.
    """
    from opensearchpy import AsyncOpenSearch

    # 使用配置的超时值，避免硬编码 / Use configured timeout instead of hardcoded value
    es_kwargs: dict[str, Any] = {"hosts": [settings.es_host], "timeout": settings.es_request_timeout}
    if settings.es_username:
        es_kwargs["http_auth"] = (settings.es_username, settings.es_password)

    # 注入 RedisClient 到 ESClient，用于 ACL 重算时的分布式锁
    # Inject RedisClient into ESClient for distributed locking on ACL recompute
    es_client = ESClient(AsyncOpenSearch(**es_kwargs), redis_client=redis_client)
    embedding_client = EmbeddingClient()
    embedding_service = EmbeddingService(
        embedding_client,
        batch_size=settings.embedding_batch_size,
    )

    from app.core.document_analyzer import DocumentAnalyzer
    from app.core.metadata_extractor import MetadataExtractor
    from app.core.service_guide_extractor import ServiceGuideExtractor
    from app.core.summary_generator import SummaryGenerator
    from app.infrastructure.llm_client import LLMClient

    llm_client = LLMClient()
    # 统一文档分析器：合并元数据提取 + 摘要生成为单次 LLM 调用
    # Unified analyzer: single LLM call for metadata extraction + summary generation
    document_analyzer = DocumentAnalyzer(llm_client)
    # 保留独立的提取器/生成器作为回退（仅启用其中一项功能时使用）
    # Keep standalone extractor/generator as fallback when only one feature is active
    metadata_extractor = MetadataExtractor(llm_client)
    summary_generator = SummaryGenerator(llm_client) if settings.summary_enabled else None
    service_guide_extractor = ServiceGuideExtractor()

    graph_builder = None
    graph_query_service = None
    if with_graph and settings.graph_build_enabled:
        from app.core.graph_builder import GraphBuilder
        from app.core.graph_query_service import GraphQueryService
        from app.infrastructure.neo4j_client import Neo4jClient

        neo4j_client = Neo4jClient.from_settings()
        graph_builder = GraphBuilder(
            llm_client,
            neo4j_client,
            max_content_chars=settings.graph_max_content_chars,
        )
        graph_query_service = GraphQueryService(neo4j_client)

    docling_processor = DoclingProcessor()

    return IngestPipeline(
        es_client=es_client,
        embedding_service=embedding_service,
        docling_processor=docling_processor,
        metadata_extractor=metadata_extractor,
        summary_generator=summary_generator,
        document_analyzer=document_analyzer,
        service_guide_extractor=service_guide_extractor,
        graph_builder=graph_builder,
        graph_query_service=graph_query_service,
        mysql_client=mysql_client,
    )
