"""OpenSearch client wrapper for gov-doc chunk indexing and hybrid search.

Migrated from Elasticsearch 8.19 to OpenSearch 2.19 for permanent free RRF support.

OpenSearch 客户端封装模块。
负责文档 chunk 的索引管理、混合检索（BM25 + kNN 向量融合）、
文档元数据 CRUD、入库追踪记录查询，以及基于 content_hash 的
内容去重与 ACL 权限重算。
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from opensearchpy import AsyncOpenSearch
from opensearchpy.helpers import async_bulk

from app.config import settings
from app.utils.logger import get_logger

if TYPE_CHECKING:
    from app.infrastructure.redis_client import RedisClient

logger = get_logger(__name__)

# ── Index settings & mappings ────────────────────────────────────────────────

# Synonym groups for the search-time filter; each string is one
# comma-separated group of government-body names or environment terms.
_GOV_SYNONYM_GROUPS: list[str] = [
    "发改委,国家发展和改革委员会,发展改革委",
    "工信部,工业和信息化部",
    "财政部,财政厅",
    "住建部,住房和城乡建设部",
    "环保,生态环境,环境保护",
]

# Index-time analyzer: IK max-word tokenizer + lowercase, no synonyms.
_IK_INDEX_ANALYZER: dict[str, Any] = {
    "type": "custom",
    "tokenizer": "ik_max_word",
    "filter": ["lowercase"],
}

# Search-time analyzer: IK smart tokenizer + lowercase + synonym expansion.
_IK_SEARCH_ANALYZER: dict[str, Any] = {
    "type": "custom",
    "tokenizer": "ik_smart",
    "filter": ["lowercase", "gov_synonym"],
}

# Analysis settings spread into the "settings" section of the index
# definitions that use IK-analyzed text fields.
_IK_ANALYZER_SETTINGS: dict[str, Any] = {
    "analysis": {
        "analyzer": {
            "ik_max_index": _IK_INDEX_ANALYZER,
            "ik_smart_synonym": _IK_SEARCH_ANALYZER,
        },
        "filter": {
            # No "updateable" flag is set here; the original author's note
            # says it was removed during the OpenSearch migration.
            "gov_synonym": {
                "type": "synonym",
                "synonyms": _GOV_SYNONYM_GROUPS,
            },
        },
    },
}

# Index definition (settings + mappings) for the document-chunk index.
# Holds the analyzed chunk text, its embedding vector, ACL ids and metadata
# denormalized from the owning document.
GOV_DOC_CHUNKS_MAPPING: dict[str, Any] = {
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": settings.es_number_of_replicas,  # replica count is configurable; >= 1 recommended in production
        "refresh_interval": "5s",
        "index.knn": True,  # Required for OpenSearch knn_vector fields
        **_IK_ANALYZER_SETTINGS,
    },
    "mappings": {
        "properties": {
            # ── Identity ──
            "chunk_id": {"type": "keyword"},
            "content_hash": {"type": "keyword"},
            "doc_ids": {"type": "keyword"},
            "chunk_index": {"type": "integer"},
            # ── Content ──
            # Full text: IK max-word analysis at index time, IK smart +
            # synonym expansion at search time.
            "content": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
            },
            # 1024-dimensional embedding indexed with HNSW (lucene engine,
            # cosine similarity).
            "content_vector": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "engine": "lucene",
                    "parameters": {
                        "m": 16,
                        "ef_construction": 256,
                    },
                },
            },
            # ── ACL (unified prefixed IDs) ──
            "acl_ids": {"type": "keyword"},
            # ── Denormalized metadata (from representative document) ──
            "title": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 512,
                    },
                },
            },
            "doc_number": {"type": "keyword"},
            "issuing_org": {"type": "keyword"},
            "doc_type": {"type": "keyword"},
            "subject_words": {"type": "keyword"},
            "signer": {"type": "keyword"},
            "publish_date": {
                "type": "date",
                "format": "yyyy-MM-dd||yyyy/MM/dd||epoch_millis",
            },
            # ── Knowledge category ──
            "knowledge_category": {"type": "keyword"},
            "knowledge_category_code": {"type": "keyword"},
            "document_scene_type": {"type": "keyword"},
            # ── External source provenance ──
            "source_system": {"type": "keyword"},
            "source_article_id": {"type": "keyword"},
            "source_attachment_id": {"type": "keyword"},
            "source_site_code": {"type": "keyword"},
            "source_target_code": {"type": "keyword"},
            "source_url": {"type": "keyword", "index": False},
            # ── Structure metadata (multi-format support) ──
            "page_number": {"type": "integer"},
            "page_numbers": {"type": "integer"},
            "heading_hierarchy": {"type": "keyword"},
            "element_type": {"type": "keyword"},
            "file_type": {"type": "keyword"},
            "created_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "updated_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
        },
    },
}

# Index definition (settings + mappings) for per-document metadata records:
# bibliographic fields, ACL ids, source provenance, processing status and a
# nested list of related documents.
GOV_DOC_META_MAPPING: dict[str, Any] = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": settings.es_number_of_replicas,  # replica count is configurable; >= 1 recommended in production
        **_IK_ANALYZER_SETTINGS,
    },
    "mappings": {
        "properties": {
            "doc_id": {"type": "keyword"},
            "content_hash": {"type": "keyword"},
            "title": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            "doc_number": {"type": "keyword"},
            "issuing_org": {"type": "keyword"},
            "doc_type": {"type": "keyword"},
            "subject_words": {"type": "keyword"},
            "signer": {"type": "keyword"},
            "publish_date": {
                "type": "date",
                "format": "yyyy-MM-dd||yyyy/MM/dd||epoch_millis",
            },
            "acl_ids": {"type": "keyword"},
            "knowledge_category": {"type": "keyword"},
            "knowledge_category_code": {"type": "keyword"},
            "document_scene_type": {"type": "keyword"},
            # External source provenance (same field set as the chunk index,
            # plus the non-indexed source_metadata object).
            "source_system": {"type": "keyword"},
            "source_article_id": {"type": "keyword"},
            "source_attachment_id": {"type": "keyword"},
            "source_site_code": {"type": "keyword"},
            "source_target_code": {"type": "keyword"},
            "source_url": {"type": "keyword", "index": False},
            "source_metadata": {"type": "object", "enabled": False},
            "service_guide_status": {"type": "keyword"},
            "guide_profile_id": {"type": "keyword"},
            "guide_matter_name": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            "summary": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
            },
            "chunk_count": {"type": "integer"},
            "page_count": {"type": "integer"},
            "file_path": {"type": "keyword", "index": False},
            "file_type": {"type": "keyword"},
            # Processing state; find_by_content_hash filters on the value
            # "completed".
            "status": {"type": "keyword"},
            "task_id": {"type": "keyword"},
            "error": {"type": "text", "index": False},
            "created_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "updated_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "related_docs": {
                "type": "nested",
                "properties": {
                    "doc_id": {"type": "keyword"},
                    "title": {
                        "type": "text",
                        "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
                    },
                    "relation_type": {"type": "keyword"},
                },
            },
        },
    },
}

# Index definition (settings + mappings) for structured service-guide
# profiles. Top-level scalar fields are the searchable/filterable summary;
# the "object"/"enabled": False entries and the nested collections carry the
# detailed guide sections.
GOV_SERVICE_GUIDES_MAPPING: dict[str, Any] = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": settings.es_number_of_replicas,
        "refresh_interval": "5s",
        **_IK_ANALYZER_SETTINGS,
    },
    "mappings": {
        # Strict dynamic mapping: only the fields declared below are part of
        # the schema; new top-level fields must be added here first.
        "dynamic": "strict",
        "properties": {
            # ── Profile identity, versioning and access control ──
            "schema_version": {"type": "keyword"},
            "profile_id": {"type": "keyword"},
            "doc_id": {"type": "keyword"},
            "content_hash": {"type": "keyword"},
            "acl_ids": {"type": "keyword"},
            "scene_type": {"type": "keyword"},
            "source": {"type": "keyword"},
            "source_url": {"type": "keyword", "index": False},
            "is_current": {"type": "boolean"},
            "guide_version": {"type": "keyword"},
            "extractor_version": {"type": "keyword"},
            "extracted_at": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
            "updated_at": {"type": "date", "format": "strict_date_optional_time||epoch_millis"},
            # ── Matter naming and codes ──
            "matter_name": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            "colloquial_names": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            "matter_type": {"type": "keyword"},
            "implementation_code": {"type": "keyword"},
            "basic_code": {"type": "keyword"},
            "business_item_code": {"type": "keyword"},
            "matter_version": {"type": "keyword"},
            "implementing_subject": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            "implementing_subject_nature": {"type": "keyword"},
            "delegated_department": {
                "type": "text",
                "analyzer": "ik_max_index",
                "search_analyzer": "ik_smart_synonym",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            # ── Service channels, time limits and flattened name lists ──
            "service_objects": {"type": "keyword"},
            "service_modes": {"type": "keyword"},
            "online_depth": {"type": "keyword"},
            "hall_required": {"type": "boolean"},
            "express_supported": {"type": "boolean"},
            "reservation_supported": {"type": "boolean"},
            "must_onsite": {"type": "boolean"},
            "must_onsite_reason": {"type": "text", "analyzer": "ik_max_index", "search_analyzer": "ik_smart_synonym"},
            "visit_count_to_hall": {"type": "integer"},
            "promised_time_limit_days": {"type": "integer"},
            "legal_time_limit_days": {"type": "integer"},
            "handled_org_names": {"type": "keyword"},
            "region_names": {"type": "keyword"},
            "linked_matter_ids": {"type": "keyword"},
            "material_names": {"type": "keyword"},
            "fee_names": {"type": "keyword"},
            "window_names": {"type": "keyword"},
            "legal_basis_names": {"type": "keyword"},
            "cross_region_scope": {"type": "keyword"},
            "cross_region_summary": {"type": "keyword"},
            "guide_search_text": {"type": "text", "analyzer": "ik_max_index", "search_analyzer": "ik_smart_synonym"},
            "needs_review": {"type": "boolean"},
            "completeness_score": {"type": "float"},
            "confidence_score": {"type": "float"},
            # ── Detailed sections: disabled objects and nested collections ──
            "document_info": {"type": "object", "enabled": False},
            "matter_identity": {"type": "object", "enabled": False},
            "basic_info": {"type": "object", "enabled": False},
            "cross_region_service": {
                "type": "nested",
                "properties": {
                    "service_scope_type": {"type": "keyword"},
                    "regions_summary": {"type": "keyword"},
                    "regions_detail": {"type": "keyword", "index": False},
                    "regions_truncated": {"type": "boolean"},
                    "service_modes": {"type": "keyword"},
                    "notes": {"type": "text", "index": False},
                    "raw_text": {"type": "text", "index": False},
                },
            },
            "review_info": {"type": "object", "enabled": False},
            "result_info": {"type": "nested", "properties": {"outcome_name": {"type": "keyword", "ignore_above": 512}, "outcome_type": {"type": "keyword"}, "template_name": {"type": "keyword", "ignore_above": 512}, "sample_name": {"type": "keyword", "ignore_above": 512}, "electronic_certificate_status": {"type": "keyword"}, "notes": {"type": "text", "index": False}}},
            "acceptance_info": {"type": "object", "enabled": False},
            "process_info": {"type": "object", "enabled": False},
            "materials": {
                "type": "nested",
                "properties": {
                    "guide_material_id": {"type": "keyword"},
                    "material_name": {
                        "type": "text",
                        "analyzer": "ik_max_index",
                        "search_analyzer": "ik_smart_synonym",
                        "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
                    },
                    "linked_material_id": {"type": "keyword"},
                    "requirement_level": {"type": "keyword"},
                    "original_count": {"type": "integer"},
                    "copy_count": {"type": "integer"},
                    "form_types": {"type": "keyword"},
                    "paper_spec": {"type": "keyword"},
                    "electronic_license_linked": {"type": "boolean"},
                    "exempt_submission": {"type": "boolean"},
                    "reusable_previous_submission": {"type": "boolean"},
                    "material_type": {"type": "keyword"},
                    "source_channel": {"type": "keyword"},
                    "fill_instructions": {"type": "text", "analyzer": "ik_max_index", "search_analyzer": "ik_smart_synonym"},
                    "notes": {"type": "text", "index": False},
                    "applicable_conditions": {"type": "keyword"},
                    "blank_form_available": {"type": "boolean"},
                    "sample_available": {"type": "boolean"},
                    "download_hint": {"type": "text", "index": False},
                    "raw_row_text": {"type": "text", "index": False},
                },
            },
            "fees": {"type": "nested", "properties": {"fee_name": {"type": "text", "analyzer": "ik_max_index", "search_analyzer": "ik_smart_synonym", "fields": {"keyword": {"type": "keyword", "ignore_above": 512}}}, "amount_text": {"type": "keyword", "ignore_above": 512}, "amount_value": {"type": "float"}, "currency": {"type": "keyword"}, "charging_body": {"type": "keyword"}, "charging_method": {"type": "keyword"}, "reducible": {"type": "boolean"}, "notes": {"type": "text", "index": False}}},
            "legal_basis": {"type": "nested", "properties": {"law_name": {"type": "keyword", "ignore_above": 512}, "document_no": {"type": "keyword", "ignore_above": 512}, "article_no": {"type": "keyword", "ignore_above": 512}, "issuing_body": {"type": "keyword", "ignore_above": 512}, "effective_date": {"type": "date", "format": "yyyy-MM-dd||yyyy/MM/dd||strict_date_optional_time||epoch_millis"}, "article_content": {"type": "text", "index": False}}},
            "rights_and_obligations": {"type": "object", "enabled": False},
            "remedies": {"type": "object", "enabled": False},
            "consultation_and_supervision": {"type": "object", "enabled": False},
            "service_windows": {"type": "nested", "properties": {"window_name": {"type": "keyword", "ignore_above": 512}, "location": {"type": "keyword", "ignore_above": 512}, "office_phone": {"type": "keyword", "ignore_above": 128}, "office_hours": {"type": "keyword", "ignore_above": 512}, "navigation": {"type": "text", "index": False}, "scope": {"type": "keyword", "ignore_above": 512}}},
            "bindings": {"type": "object", "enabled": False},
            "quality": {"type": "object", "enabled": False},
            "raw_sections": {"type": "object", "enabled": False},
        },
    },
}

# ── Ingest trace mappings ────────────────────────────────────────────────────

# Index definition for ingest trace records: one summary document per trace
# with identifiers, status/stage, timing and error fields. It does not
# include the IK analysis settings; "title" uses the default analyzer.
GOV_DOC_INGEST_TRACES_MAPPING: dict[str, Any] = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": settings.es_number_of_replicas,  # replica count is configurable; >= 1 recommended in production
        "refresh_interval": "5s",
    },
    "mappings": {
        "properties": {
            "trace_id": {"type": "keyword"},
            "doc_id": {"type": "keyword"},
            "task_id": {"type": "keyword"},
            "content_hash": {"type": "keyword"},
            "source_type": {"type": "keyword"},
            "status": {"type": "keyword"},
            "current_stage": {"type": "keyword"},
            "file_type": {"type": "keyword"},
            "original_filename": {"type": "keyword"},
            "title": {
                "type": "text",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            "operator": {"type": "keyword"},
            "attempt_count": {"type": "integer"},
            "latest_seq": {"type": "integer"},
            "started_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "finished_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "duration_ms": {"type": "long"},
            "error_code": {"type": "keyword"},
            # Kept in the stored document but not indexed for search.
            "error_message": {"type": "text", "index": False},
            "artifact_count": {"type": "integer"},
            "created_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "updated_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
        },
    },
}

# Index definition for ingest event records: individual events that carry a
# trace_id, stage, sequence/attempt numbers, severity and error fields.
GOV_DOC_INGEST_EVENTS_MAPPING: dict[str, Any] = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": settings.es_number_of_replicas,  # replica count is configurable; >= 1 recommended in production
        "refresh_interval": "5s",
    },
    "mappings": {
        "properties": {
            "event_id": {"type": "keyword"},
            "trace_id": {"type": "keyword"},
            "doc_id": {"type": "keyword"},
            "task_id": {"type": "keyword"},
            "stage": {"type": "keyword"},
            "event_type": {"type": "keyword"},
            "status": {"type": "keyword"},
            "seq": {"type": "integer"},
            "attempt": {"type": "integer"},
            "service": {"type": "keyword"},
            "operator": {"type": "keyword"},
            "file_type": {"type": "keyword"},
            "summary": {
                "type": "text",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 512}},
            },
            "duration_ms": {"type": "long"},
            "severity": {"type": "keyword"},
            "error_code": {"type": "keyword"},
            "error_message": {"type": "text", "index": False},
            "artifact_refs": {"type": "keyword"},
            # Free-form payload: mapped as a disabled object, so its inner
            # fields are not added to the mapping.
            "details": {"type": "object", "enabled": False},
            "timestamp": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "created_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
        },
    },
}

# Index definition for ingest artifact records: payloads or payload
# references attached to a trace/event, with retention, redaction and
# storage-location fields.
GOV_DOC_INGEST_ARTIFACTS_MAPPING: dict[str, Any] = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": settings.es_number_of_replicas,  # replica count is configurable; >= 1 recommended in production
        "refresh_interval": "5s",
    },
    "mappings": {
        "properties": {
            "artifact_id": {"type": "keyword"},
            "trace_id": {"type": "keyword"},
            "event_id": {"type": "keyword"},
            "doc_id": {"type": "keyword"},
            "stage": {"type": "keyword"},
            "artifact_type": {"type": "keyword"},
            "retention_level": {"type": "keyword"},
            "is_redacted": {"type": "boolean"},
            "content_encoding": {"type": "keyword"},
            # Payload fields are kept in the stored document but are not
            # searchable (index disabled / object disabled).
            "preview_text": {"type": "text", "index": False},
            "payload_json": {"type": "object", "enabled": False},
            "payload_text": {"type": "text", "index": False},
            "storage_backend": {"type": "keyword"},
            "storage_path": {"type": "keyword", "index": False},
            "content_bytes": {"type": "long"},
            "expires_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
            "created_at": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis",
            },
        },
    },
}

# ── RRF search pipeline (created once at startup) ───────────────────────────

# Name under which the hybrid-search pipeline is registered.
HYBRID_RRF_PIPELINE = "hybrid_rrf_pipeline"

# Body of the search pipeline that fuses the sub-query results of a hybrid
# query with reciprocal rank fusion (RRF).
#
# RRF is rank-based, so it is configured through the dedicated
# ``score-ranker-processor`` (introduced in OpenSearch 2.19) rather than the
# score-based ``normalization-processor``, whose combination techniques are
# the arithmetic/geometric/harmonic means only. ``rank_constant`` sits
# directly under ``combination``; 60 is also the documented default.
_RRF_PIPELINE_BODY: dict[str, Any] = {
    "description": "RRF hybrid search pipeline for BM25 + kNN fusion",
    "phase_results_processors": [
        {
            "score-ranker-processor": {
                "combination": {
                    "technique": "rrf",
                    "rank_constant": 60,
                },
            }
        }
    ],
}


# ── Client class ─────────────────────────────────────────────────────────────


class ESClient:
    """Thin async wrapper around :class:`AsyncOpenSearch`.

    Provides higher-level helpers on top of the async OpenSearch client:
    index management, bulk writes, content-dedup lookups, ACL recomputation
    and ingest tracing.
    """

    # Keywords used to tell a definite "RRF not supported" error apart from a
    # transient failure; matched as substrings of the lowercased error text.
    _RRF_CAPABILITY_ERROR_KEYWORDS = ("illegal_argument_exception", "hybrid", "pipeline")

    def __init__(
        self,
        es: AsyncOpenSearch,
        *,
        redis_client: "RedisClient | None" = None,
    ) -> None:
        self._es = es
        # 可选的 Redis 客户端，用于 ACL 重算时的分布式锁
        # Optional Redis client for distributed locking during ACL recomputation
        self._redis: "RedisClient | None" = redis_client
        # tri-state RRF 能力状态：unknown=未探测, available=可用, unavailable=不支持
        self._rrf_status: str = "unknown"

    @classmethod
    def from_settings(
        cls,
        *,
        redis_client: "RedisClient | None" = None,
    ) -> "ESClient":
        """Build an ESClient whose connection options come from ``settings``.

        Side effect: the proxy environment variables (``HTTP_PROXY``,
        ``HTTPS_PROXY``, ``ALL_PROXY`` and their lowercase forms) are removed
        from ``os.environ`` for the whole process before the client is
        created, so every HTTP library in this process stops seeing them.
        The original rationale: with a system proxy configured (notably on
        Windows development machines) requests to local services may be
        routed through the proxy and fail with SSL errors, and the author
        notes the client offers no per-instance switch to disable proxies.
        Code in this process that needs a proxy for external services must
        configure it on its own client instead of relying on these variables.

        Args:
            redis_client: Optional RedisClient forwarded to the constructor.

        Returns:
            A new instance wrapping a freshly created ``AsyncOpenSearch``.
        """
        import os

        proxy_variables = (
            "HTTP_PROXY",
            "HTTPS_PROXY",
            "ALL_PROXY",
            "http_proxy",
            "https_proxy",
            "all_proxy",
        )
        for variable in proxy_variables:
            os.environ.pop(variable, None)

        # TLS certificate verification is driven by configuration rather than
        # hard-coded off; the SSL warning flag follows the same setting.
        client_options: dict[str, Any] = dict(
            hosts=[settings.es_host],
            timeout=settings.es_request_timeout,
            verify_certs=settings.es_verify_certs,
            ssl_show_warn=settings.es_verify_certs,
        )
        # Basic auth is only sent when a username is configured.
        if settings.es_username:
            client_options["http_auth"] = (settings.es_username, settings.es_password)

        connection = AsyncOpenSearch(**client_options)
        return cls(connection, redis_client=redis_client)

    @property
    def raw(self) -> AsyncOpenSearch:
        """Expose the underlying ``AsyncOpenSearch`` for advanced queries.

        Returns the same client object that was passed to the constructor.
        """
        return self._es

    async def close(self) -> None:
        """Close the underlying ``AsyncOpenSearch`` client."""
        await self._es.close()

    # ── RRF 能力状态与 hybrid 搜索接口 ────────────────────────────────────

    @property
    def should_use_hybrid(self) -> bool:
        """当前是否应尝试 hybrid 检索。仅在明确不支持时返回 False。

        Returns True unless RRF has been definitively determined as unavailable.
        When status is 'unknown', returns True to allow runtime probing.
        """
        return self._rrf_status != "unavailable"

    def _is_rrf_capability_error(self, err_msg_lower: str) -> bool:
        """判断错误信息是否表示 RRF 能力明确不支持（而非瞬时故障）。"""
        return any(kw in err_msg_lower for kw in self._RRF_CAPABILITY_ERROR_KEYWORDS)

    async def hybrid_search(
        self,
        body: dict[str, Any],
        *,
        index: str,
        pipeline: str,
    ) -> tuple[dict[str, Any] | None, bool]:
        """执行 hybrid 搜索，内置 tri-state 能力探测与自动熔断。

        Attempt a hybrid search with built-in capability probing and circuit-breaking.

        返回值:
            (response, ok) — ok=True 表示 hybrid 成功；ok=False 表示调用方应回退到 BM25。

        状态机逻辑:
            - available: 直接执行，成功返回结果；明确能力错误则熔断为 unavailable
            - unknown: 尝试探测，成功提升为 available，能力错误降级为 unavailable，
              其他错误保持 unknown（不永久锁定，下次仍可重试）
            - unavailable: 直接返回 (None, False)，不发起请求
        """
        if self._rrf_status == "unavailable":
            return None, False

        try:
            response = await self._es.search(
                index=index,
                body=body,
                params={"search_pipeline": pipeline},
            )
            # 成功：若之前是 unknown，提升为 available
            if self._rrf_status == "unknown":
                self._rrf_status = "available"
                logger.info("rrf_capability_confirmed", rrf_status=self._rrf_status)
            return response, True
        except Exception as exc:
            err_msg = str(exc).lower()
            if self._is_rrf_capability_error(err_msg):
                # 明确的能力错误 → 熔断，后续不再尝试
                prev = self._rrf_status
                self._rrf_status = "unavailable"
                logger.warning(
                    "rrf_capability_fused",
                    error=str(exc),
                    prev_status=prev,
                    rrf_status=self._rrf_status,
                )
                return None, False
            # 瞬时错误（网络、超时等）→ 保持当前状态，返回失败但不永久锁定
            logger.warning(
                "hybrid_search_transient_error",
                error=str(exc),
                rrf_status=self._rrf_status,
            )
            return None, False

    # ── Index management ─────────────────────────────────────────────────

    async def create_indices(self) -> None:
        """Ensure all required indices and the RRF search pipeline exist.

        For each configured index: create it with its full definition when
        it is missing; otherwise run the provenance-mapping update on it.
        Afterwards the hybrid RRF pipeline is PUT and the outcome is recorded
        in the RRF capability state: success sets "available", a recognised
        capability error sets "unavailable", and any other error leaves the
        state as it was so it can be probed at query time.
        """
        index_definitions = (
            (settings.es_chunk_index, GOV_DOC_CHUNKS_MAPPING),
            (settings.es_meta_index, GOV_DOC_META_MAPPING),
            (settings.es_service_guide_index, GOV_SERVICE_GUIDES_MAPPING),
            (settings.es_trace_index, GOV_DOC_INGEST_TRACES_MAPPING),
            (settings.es_event_index, GOV_DOC_INGEST_EVENTS_MAPPING),
            (settings.es_artifact_index, GOV_DOC_INGEST_ARTIFACTS_MAPPING),
        )
        for name, definition in index_definitions:
            if await self._es.indices.exists(index=name):
                logger.info("es_index_exists", index=name)
                await self._ensure_source_provenance_mapping(name)
                continue
            await self._es.indices.create(index=name, body=definition)
            logger.info("es_index_created", index=name)

        # Register the RRF pipeline and record the capability from the result.
        try:
            await self._es.http.put(
                f"/_search/pipeline/{HYBRID_RRF_PIPELINE}",
                body=_RRF_PIPELINE_BODY,
            )
            self._rrf_status = "available"
            logger.info(
                "search_pipeline_created",
                pipeline=HYBRID_RRF_PIPELINE,
                rrf_status=self._rrf_status,
            )
        except Exception as exc:
            detail = str(exc)
            if not self._is_rrf_capability_error(detail.lower()):
                # Transient failure (timeout, 5xx, ...): keep the current
                # state and leave probing to the first hybrid search.
                logger.warning(
                    "search_pipeline_create_failed_transient",
                    error=detail,
                    rrf_status=self._rrf_status,
                )
                return
            # Definite "not supported" answer: mark RRF as unavailable.
            self._rrf_status = "unavailable"
            logger.warning(
                "search_pipeline_not_supported",
                error=detail,
                rrf_status=self._rrf_status,
            )

    async def _ensure_source_provenance_mapping(self, index_name: str) -> None:
        """Best-effort: add the source provenance fields to an existing index.

        Only the meta index and the chunk index are touched; for any other
        index name the method returns without sending a request. The meta
        index additionally receives the non-indexed ``source_metadata``
        object. A failing mapping update is logged as a warning and not
        re-raised.

        Args:
            index_name: Name of the already-existing index to update.
        """
        targets_meta = index_name == settings.es_meta_index
        if not targets_meta and index_name != settings.es_chunk_index:
            return

        provenance_fields: dict[str, Any] = {
            "source_system": {"type": "keyword"},
            "source_article_id": {"type": "keyword"},
            "source_attachment_id": {"type": "keyword"},
            "source_site_code": {"type": "keyword"},
            "source_target_code": {"type": "keyword"},
            "source_url": {"type": "keyword", "index": False},
        }
        if targets_meta:
            provenance_fields["source_metadata"] = {"type": "object", "enabled": False}

        try:
            await self._es.indices.put_mapping(
                index=index_name,
                body={"properties": provenance_fields},
            )
            logger.info("es_source_provenance_mapping_ensured", index=index_name)
        except Exception as exc:
            logger.warning(
                "es_source_provenance_mapping_failed",
                index=index_name,
                error=str(exc),
            )

    # ── Bulk operations ──────────────────────────────────────────────────

    async def bulk_index_chunks(
        self,
        chunks: list[dict[str, Any]],
        *,
        index: str | None = None,
    ) -> tuple[int, list[Any]]:
        """Bulk-index chunk documents, using ``chunk_id`` as the document id.

        Args:
            chunks: Chunk documents; each must contain a ``chunk_id`` key.
            index: Target index; falls back to the configured chunk index
                when omitted or empty.

        Returns:
            ``(success_count, errors)`` as reported by ``async_bulk``. Item
            errors are collected rather than raised (``raise_on_error=False``)
            and a sample of them is logged as a warning.
        """
        destination = index or settings.es_chunk_index
        actions = (
            {"_index": destination, "_id": document["chunk_id"], "_source": document}
            for document in chunks
        )
        indexed, failures = await async_bulk(self._es, actions, raise_on_error=False)
        if failures:
            logger.warning("es_bulk_errors", count=len(failures), sample=failures[:3])
        return indexed, failures

    # ── Content-hash queries ──────────────────────────────────────────────

    async def find_by_content_hash(self, content_hash: str) -> dict[str, Any] | None:
        """Look up a completed meta document by its content hash.

        Used to decide whether identical content has already been ingested.
        Only meta documents whose ``status`` is ``"completed"`` are considered.

        Returns:
            The ``_source`` of the first hit (restricted to ``doc_id``,
            ``chunk_count``, ``page_count``, ``file_path`` and ``file_type``),
            or ``None`` when nothing matches.
        """
        term_filters = [
            {"term": {"content_hash": content_hash}},
            {"term": {"status": "completed"}},
        ]
        wanted_fields = ["doc_id", "chunk_count", "page_count", "file_path", "file_type"]
        search_body = {
            "query": {"bool": {"must": term_filters}},
            "size": 1,
            "_source": wanted_fields,
        }

        response = await self._es.search(index=settings.es_meta_index, body=search_body)
        # The client may hand back a plain dict or a response object exposing ``.body``.
        payload = response if isinstance(response, dict) else response.body
        matches = payload.get("hits", {}).get("hits", [])
        return matches[0]["_source"] if matches else None

    async def patch_chunks_by_content_hash(
        self,
        content_hash: str,
        fields: dict[str, Any],
    ) -> int:
        """Patch denormalized metadata on every chunk sharing a content_hash.

        Entries of *fields* whose value is ``None`` are dropped. The remaining
        entries are written onto the ``_source`` of each matching chunk, and
        ``updated_at`` is set to the current UTC time (it always wins over an
        ``updated_at`` key supplied in *fields*).

        Args:
            content_hash: Hash identifying the chunk group to patch.
            fields: Field name -> new value.

        Returns:
            The ``updated`` count reported by update_by_query, or ``0`` when
            *content_hash* is empty or no non-``None`` field is left to write.
        """
        doc_fields = {k: v for k, v in fields.items() if v is not None}
        if not content_hash or not doc_fields:
            return 0

        # The script text is constant and field names travel only as params.
        # Compared with interpolating each key into the script source this
        # (1) cannot turn a key into executable Painless, (2) works for keys
        # that are not valid identifiers, and (3) lets the cluster compile the
        # script once instead of once per distinct set of keys.
        script_source = (
            "ctx._source.putAll(params.fields); "
            "ctx._source.updated_at = params.updated_at"
        )
        resp = await self._es.update_by_query(
            index=settings.es_chunk_index,
            body={
                "query": {"term": {"content_hash": content_hash}},
                "script": {
                    "lang": "painless",
                    "source": script_source,
                    "params": {
                        "fields": doc_fields,
                        "updated_at": datetime.now(timezone.utc).isoformat(),
                    },
                },
            },
            refresh=True,
            # Skip documents that hit a version conflict instead of aborting.
            conflicts="proceed",
        )
        raw = resp if isinstance(resp, dict) else resp.body
        return int(raw.get("updated", 0))

    async def sync_service_guide_acl(self, doc_id: str, acl_ids: list[str]) -> int:
        """Overwrite ``acl_ids`` on the service-guide documents of a doc_id.

        Each matching guide document also gets ``updated_at`` set to the
        current UTC time.

        Returns:
            The ``updated`` count reported by update_by_query, or ``0`` when
            *doc_id* is empty.
        """
        if not doc_id:
            return 0

        painless = {
            "lang": "painless",
            "source": "ctx._source.acl_ids = params.acl_ids; ctx._source.updated_at = params.updated_at",
            "params": {
                "acl_ids": acl_ids,
                "updated_at": datetime.now(timezone.utc).isoformat(),
            },
        }
        request_body = {"query": {"term": {"doc_id": doc_id}}, "script": painless}

        response = await self._es.update_by_query(
            index=settings.es_service_guide_index,
            body=request_body,
            refresh=True,
            conflicts="proceed",
        )
        payload = response if isinstance(response, dict) else response.body
        return int(payload.get("updated", 0))

    async def bind_service_guide_matters(
        self,
        doc_id: str,
        matter_ids: list[str],
    ) -> int:
        """Write linked Matter IDs onto the guide documents of a doc_id.

        Falsy IDs are dropped and duplicates removed while keeping first-seen
        order. The resulting list is stored both in ``linked_matter_ids`` and
        in ``bindings.matter_ids`` (``bindings`` is created when missing), and
        ``updated_at`` is refreshed.

        Returns:
            The ``updated`` count reported by update_by_query, or ``0`` when
            *doc_id* is empty.
        """
        if not doc_id:
            return 0

        # A dict preserves insertion order, giving an order-stable de-duplication.
        seen: dict[str, None] = {}
        for matter_id in matter_ids:
            if matter_id:
                seen.setdefault(matter_id, None)
        ordered_ids = list(seen)

        script_source = " ".join(
            [
                "ctx._source.linked_matter_ids = params.matter_ids;",
                "if (ctx._source.bindings == null) { ctx._source.bindings = new HashMap(); }",
                "ctx._source.bindings.matter_ids = params.matter_ids;",
                "ctx._source.updated_at = params.updated_at",
            ]
        )
        request_body = {
            "query": {"term": {"doc_id": doc_id}},
            "script": {
                "lang": "painless",
                "source": script_source,
                "params": {
                    "matter_ids": ordered_ids,
                    "updated_at": datetime.now(timezone.utc).isoformat(),
                },
            },
        }

        response = await self._es.update_by_query(
            index=settings.es_service_guide_index,
            body=request_body,
            refresh=True,
            conflicts="proceed",
        )
        payload = response if isinstance(response, dict) else response.body
        return int(payload.get("updated", 0))

    async def delete_service_guides_by_doc_id(self, doc_id: str) -> int:
        """Remove every service-guide document bound to a doc_id.

        Returns:
            The ``deleted`` count reported by delete_by_query, or ``0`` when
            *doc_id* is empty.
        """
        if not doc_id:
            return 0

        selector = {"query": {"term": {"doc_id": doc_id}}}
        response = await self._es.delete_by_query(
            index=settings.es_service_guide_index,
            body=selector,
            refresh=True,
        )
        payload = response if isinstance(response, dict) else response.body
        return int(payload.get("deleted", 0))

    async def find_service_guide_by_doc_id(
        self,
        doc_id: str,
        *,
        source_fields: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Return the newest service guide document bound to a doc_id."""
        if not doc_id:
            return None

        body: dict[str, Any] = {
            "query": {"term": {"doc_id": doc_id}},
            "size": 1,
            "sort": [{"updated_at": {"order": "desc"}}],
        }
        if source_fields is not None:
            body["_source"] = source_fields

        resp = await self._es.search(
            index=settings.es_service_guide_index,
            body=body,
        )
        raw = resp if isinstance(resp, dict) else resp.body
        hits = raw.get("hits", {}).get("hits", [])
        if not hits:
            return None
        return hits[0].get("_source", {})

    async def search_service_guides(
        self,
        *,
        acl_filter: dict[str, Any],
        query_text: str = "",
        doc_ids: list[str] | None = None,
        matter_ids: list[str] | None = None,
        matter_name: str = "",
        size: int = 3,
    ) -> list[dict[str, Any]]:
        """Search current guide profiles for research and QA evidence."""
        unique_doc_ids = list(dict.fromkeys(doc_id for doc_id in (doc_ids or []) if doc_id))
        unique_matter_ids = list(dict.fromkeys(mid for mid in (matter_ids or []) if mid))

        anchor_should: list[dict[str, Any]] = []
        should: list[dict[str, Any]] = []
        filter_clauses: list[dict[str, Any]] = [acl_filter]
        if unique_doc_ids:
            anchor_should.append({"terms": {"doc_id": unique_doc_ids}})
        if unique_matter_ids:
            anchor_should.append({"terms": {"linked_matter_ids": unique_matter_ids}})
        if anchor_should:
            filter_clauses.append(
                {
                    "bool": {
                        "should": anchor_should,
                        "minimum_should_match": 1,
                    }
                }
            )
        if matter_name:
            should.append({"match_phrase": {"matter_name": matter_name}})
        if query_text:
            should.append(
                {
                    "multi_match": {
                        "query": query_text,
                        "fields": [
                            "matter_name^4",
                            "colloquial_names^3",
                            "implementing_subject^2",
                            "guide_search_text^2",
                        ],
                        "type": "best_fields",
                    }
                }
            )

        if not anchor_should and not should:
            return []

        query: dict[str, Any] = {
            "bool": {
                "must": [{"term": {"is_current": True}}],
                "filter": filter_clauses,
            }
        }
        if should:
            query["bool"]["should"] = should
            if not anchor_should:
                query["bool"]["minimum_should_match"] = 1

        resp = await self._es.search(
            index=settings.es_service_guide_index,
            body={
                "query": query,
                "size": size,
                "sort": [
                    "_score",
                    {"completeness_score": {"order": "desc", "missing": "_last"}},
                    {"confidence_score": {"order": "desc", "missing": "_last"}},
                    {"updated_at": {"order": "desc"}},
                ],
                "_source": [
                    "profile_id",
                    "doc_id",
                    "matter_name",
                    "colloquial_names",
                    "implementation_code",
                    "guide_version",
                    "service_objects",
                    "service_modes",
                    "implementing_subject",
                    "express_supported",
                    "reservation_supported",
                    "promised_time_limit_days",
                    "legal_time_limit_days",
                    "material_names",
                    "fee_names",
                    "window_names",
                    "legal_basis_names",
                    "linked_matter_ids",
                    "document_info",
                    "basic_info",
                    "process_info",
                    "consultation_and_supervision",
                    "materials",
                    "fees",
                    "service_windows",
                    "legal_basis",
                    "updated_at",
                ],
            },
        )
        raw = resp if isinstance(resp, dict) else resp.body
        return raw.get("hits", {}).get("hits", [])

    async def get_all_metas_by_content_hash(
        self,
        content_hash: str,
        *,
        exclude_doc_id: str | None = None,
    ) -> list[dict[str, Any]]:
        """Return all completed meta documents that share a content_hash.

        Args:
            content_hash: Hash to look up.
            exclude_doc_id: When set, the meta document with this ``_id`` is
                left out (useful in deletion scenarios).

        Returns:
            ``_source`` dicts restricted to ``doc_id`` and ``acl_ids``. A
            single query capped at 10000 hits is issued, so larger groups are
            truncated; a warning is logged once 90% of the cap is reached.
        """
        size_cap = 10000

        bool_clause: dict[str, Any] = {
            "must": [
                {"term": {"content_hash": content_hash}},
                {"term": {"status": "completed"}},
            ],
        }
        if exclude_doc_id:
            bool_clause["must_not"] = [{"term": {"_id": exclude_doc_id}}]

        response = await self._es.search(
            index=settings.es_meta_index,
            body={
                "query": {"bool": bool_clause},
                "size": size_cap,
                "_source": ["doc_id", "acl_ids"],
            },
        )
        payload = response if isinstance(response, dict) else response.body
        matches = payload.get("hits", {}).get("hits", [])

        # Getting close to the cap means the result may have been truncated.
        found = len(matches)
        if found >= size_cap * 0.9:
            logger.warning(
                "content_hash_meta_near_limit",
                content_hash=content_hash[:12],
                count=found,
                limit=size_cap,
            )
        return [match["_source"] for match in matches]

    async def _recompute_chunk_acl_inner(
        self,
        content_hash: str,
        *,
        exclude_doc_id: str | None = None,
    ) -> None:
        """Recompute chunk ACLs for a content_hash without taking any lock.

        Collects the ``acl_ids`` and ``doc_id`` of every completed meta
        document sharing *content_hash* (optionally leaving out
        *exclude_doc_id*) and overwrites ``acl_ids`` (sorted union) and
        ``doc_ids`` on all chunks with that hash. When no meta document
        remains, both fields are written as empty lists.

        Intended to be called by ``recompute_chunk_acl`` (and other callers
        that already hold the content_hash lock).

        Args:
            content_hash: Hash identifying the chunk group.
            exclude_doc_id: Meta document to ignore while merging.
        """
        metas = await self.get_all_metas_by_content_hash(
            content_hash, exclude_doc_id=exclude_doc_id,
        )

        merged_acl: set[str] = set()
        doc_ids: list[str] = []
        for meta in metas:
            # ``acl_ids`` may be missing, explicitly null, or stored as a bare
            # string (``_source`` is returned exactly as it was indexed).
            # ``.get(key, [])`` would pass a null straight to ``set.update``
            # (TypeError), and a bare string would be split into characters.
            acl_value = meta.get("acl_ids") or []
            if isinstance(acl_value, str):
                acl_value = [acl_value]
            merged_acl.update(acl_value)
            doc_ids.append(meta["doc_id"])

        # update_by_query only accepts refresh=true/false; "wait_for" is limited
        # to single-document operations such as index/update/delete.
        await self._es.update_by_query(
            index=settings.es_chunk_index,
            body={
                "query": {"term": {"content_hash": content_hash}},
                "script": {
                    "lang": "painless",
                    "source": (
                        "ctx._source.acl_ids = params.acl_ids; "
                        "ctx._source.doc_ids = params.doc_ids"
                    ),
                    "params": {
                        "acl_ids": sorted(merged_acl),
                        "doc_ids": doc_ids,
                    },
                },
            },
            refresh=True,
            # bulk_index_chunks and ACL recomputation may run close together in
            # the ingest pipeline and produce version conflicts; conflicting
            # documents are skipped and corrected by the next recomputation.
            conflicts="proceed",
        )
        logger.info(
            "chunk_acl_recomputed",
            content_hash=content_hash[:12],
            doc_count=len(doc_ids),
            acl_count=len(merged_acl),
        )

    async def recompute_chunk_acl(
        self,
        content_hash: str,
        *,
        exclude_doc_id: str | None = None,
    ) -> None:
        """Recompute acl_ids on all chunks for a content_hash.

        重算指定 content_hash 对应所有 chunk 的 ACL 权限。
        使用 Redis 分布式锁（content_hash 粒度）防止并发重算导致 ACL 不一致。
        若未注入 RedisClient 则直接执行（兼容无锁场景）。

        This is the ONLY method that modifies chunks' ACL — called on ingest,
        deletion, and permission-update to guarantee consistency.
        Uses a Redis distributed lock (content_hash granularity) to prevent
        concurrent recomputation from producing inconsistent ACL.
        Falls back to no-lock execution if RedisClient is not injected.
        """
        if self._redis is not None:
            # 以 content_hash 为粒度加锁，不同 content_hash 之间互不阻塞
            # Lock at content_hash granularity — different hashes don't block each other
            lock_name = f"zm:acl_recompute:{content_hash}"
            async with self._redis.lock(lock_name, timeout=60):
                await self._recompute_chunk_acl_inner(
                    content_hash, exclude_doc_id=exclude_doc_id,
                )
        else:
            await self._recompute_chunk_acl_inner(
                content_hash, exclude_doc_id=exclude_doc_id,
            )

    # ── Ingest trace queries ────────────────────────────────────────────────

    async def query_ingest_traces(
        self,
        *,
        page: int = 1,
        page_size: int = 20,
        doc_id: str | None = None,
        trace_id: str | None = None,
        task_id: str | None = None,
        status: str | None = None,
        current_stage: str | None = None,
        file_type: str | None = None,
        source_type: str | None = None,
        has_error: bool | None = None,
        start_time: str | None = None,
        end_time: str | None = None,
        keyword: str | None = None,
    ) -> dict[str, Any]:
        """Query ingest traces with filters and pagination.

        All supplied filters are AND-ed; with no filter at all every trace
        matches. Results are ordered by ``started_at`` descending.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Records per page; negative values are treated as 0.
            doc_id, trace_id, task_id, status, current_stage, file_type,
                source_type: Exact-match (``term``) filters, applied when truthy.
            has_error: When truthy, restricts to status ``failed`` or
                ``partial_failed``.
            start_time, end_time: Inclusive bounds on ``started_at``.
            keyword: Text matched against ``title``, ``original_filename``,
                ``doc_id`` and ``trace_id``.

        Returns:
            ``{"total", "page", "page_size", "records"}`` where ``records`` is
            the list of hit ``_source`` dicts and ``page``/``page_size`` are
            the effective (clamped) values.
        """
        exact_filters = (
            ("doc_id", doc_id),
            ("trace_id", trace_id),
            ("task_id", task_id),
            ("status", status),
            ("current_stage", current_stage),
            ("file_type", file_type),
            ("source_type", source_type),
        )
        must: list[dict[str, Any]] = [
            {"term": {field: value}} for field, value in exact_filters if value
        ]

        # NOTE(review): has_error=False is treated like None (no filter); confirm
        # whether False is expected to exclude failed traces instead.
        if has_error:
            must.append({"terms": {"status": ["failed", "partial_failed"]}})

        if start_time or end_time:
            time_range: dict[str, str] = {}
            if start_time:
                time_range["gte"] = start_time
            if end_time:
                time_range["lte"] = end_time
            must.append({"range": {"started_at": time_range}})

        if keyword:
            must.append({
                "multi_match": {
                    "query": keyword,
                    "fields": ["title", "original_filename", "doc_id", "trace_id"],
                }
            })

        query = {"bool": {"must": must}} if must else {"match_all": {}}

        # Clamp paging input: a negative ``from`` or ``size`` is rejected by the
        # search API, so out-of-range values fall back to the nearest valid one.
        page = max(page, 1)
        page_size = max(page_size, 0)
        from_idx = (page - 1) * page_size

        resp = await self._es.search(
            index=settings.es_trace_index,
            body={
                "query": query,
                "sort": [{"started_at": {"order": "desc"}}],
                "from": from_idx,
                "size": page_size,
            },
        )
        raw = resp if isinstance(resp, dict) else resp.body
        hits = raw.get("hits", {})
        total = hits.get("total", {}).get("value", 0)
        records = [h["_source"] for h in hits.get("hits", [])]
        return {"total": total, "page": page, "page_size": page_size, "records": records}

    async def get_ingest_trace(self, trace_id: str) -> dict[str, Any] | None:
        """Fetch one ingest trace by its document ID.

        Returns:
            The trace ``_source``, or ``None`` when the lookup fails for any
            reason (the failure is logged as a warning).
        """
        try:
            response = await self._es.get(index=settings.es_trace_index, id=trace_id)
            payload = response if isinstance(response, dict) else response.body
            return payload.get("_source")
        except Exception:
            # Any lookup failure is reported to the caller as "not found".
            logger.warning("Failed to get ingest trace", exc_info=True)
            return None

    async def query_ingest_events(
        self,
        trace_id: str,
        *,
        page: int = 1,
        page_size: int = 100,
        stage: str | None = None,
        status: str | None = None,
        event_type: str | None = None,
    ) -> dict[str, Any]:
        """Query the events of a trace, ordered by ``seq`` ascending.

        Args:
            trace_id: Trace whose events are listed (always applied).
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Records per page; negative values are treated as 0.
            stage, status, event_type: Optional exact-match filters, applied
                when truthy.

        Returns:
            ``{"total", "trace_id", "records"}`` where ``records`` is the list
            of hit ``_source`` dicts.
        """
        must: list[dict[str, Any]] = [{"term": {"trace_id": trace_id}}]
        optional_filters = (
            ("stage", stage),
            ("status", status),
            ("event_type", event_type),
        )
        must.extend({"term": {field: value}} for field, value in optional_filters if value)

        # Clamp paging input: a negative ``from`` or ``size`` is rejected by the
        # search API, so out-of-range values fall back to the nearest valid one.
        page = max(page, 1)
        page_size = max(page_size, 0)
        from_idx = (page - 1) * page_size

        resp = await self._es.search(
            index=settings.es_event_index,
            body={
                "query": {"bool": {"must": must}},
                "sort": [{"seq": {"order": "asc"}}],
                "from": from_idx,
                "size": page_size,
            },
        )
        raw = resp if isinstance(resp, dict) else resp.body
        hits = raw.get("hits", {})
        total = hits.get("total", {}).get("value", 0)
        records = [h["_source"] for h in hits.get("hits", [])]
        return {"total": total, "trace_id": trace_id, "records": records}

    async def get_ingest_event(self, event_id: str) -> dict[str, Any] | None:
        """Fetch one ingest event by its document ID.

        Returns:
            The event ``_source``, or ``None`` when the lookup fails for any
            reason (the failure is logged as a warning).
        """
        try:
            resp = await self._es.get(index=settings.es_event_index, id=event_id)
            raw = resp if isinstance(resp, dict) else resp.body
            return raw.get("_source")
        except Exception:
            # Callers still see "not found", but the failure is no longer
            # swallowed silently (same handling as get_ingest_trace).
            logger.warning("Failed to get ingest event", exc_info=True)
            return None

    async def query_ingest_artifacts(
        self,
        trace_id: str,
        *,
        page: int = 1,
        page_size: int = 50,
        artifact_type: str | None = None,
        stage: str | None = None,
    ) -> dict[str, Any]:
        """Query the artifacts of a trace, ordered by ``created_at`` ascending.

        Args:
            trace_id: Trace whose artifacts are listed (always applied).
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Records per page; negative values are treated as 0.
            artifact_type, stage: Optional exact-match filters, applied when
                truthy.

        Returns:
            ``{"total", "trace_id", "records"}`` where ``records`` is the list
            of hit ``_source`` dicts.
        """
        must: list[dict[str, Any]] = [{"term": {"trace_id": trace_id}}]
        optional_filters = (
            ("artifact_type", artifact_type),
            ("stage", stage),
        )
        must.extend({"term": {field: value}} for field, value in optional_filters if value)

        # Clamp paging input: a negative ``from`` or ``size`` is rejected by the
        # search API, so out-of-range values fall back to the nearest valid one.
        page = max(page, 1)
        page_size = max(page_size, 0)
        from_idx = (page - 1) * page_size

        resp = await self._es.search(
            index=settings.es_artifact_index,
            body={
                "query": {"bool": {"must": must}},
                "sort": [{"created_at": {"order": "asc"}}],
                "from": from_idx,
                "size": page_size,
            },
        )
        raw = resp if isinstance(resp, dict) else resp.body
        hits = raw.get("hits", {})
        total = hits.get("total", {}).get("value", 0)
        records = [h["_source"] for h in hits.get("hits", [])]
        return {"total": total, "trace_id": trace_id, "records": records}

    async def get_ingest_artifact(self, artifact_id: str) -> dict[str, Any] | None:
        """Fetch one ingest artifact by its document ID.

        Returns:
            The artifact ``_source``, or ``None`` when the lookup fails for
            any reason (the failure is logged as a warning).
        """
        try:
            resp = await self._es.get(
                index=settings.es_artifact_index, id=artifact_id,
            )
            raw = resp if isinstance(resp, dict) else resp.body
            return raw.get("_source")
        except Exception:
            # Callers still see "not found", but the failure is no longer
            # swallowed silently (same handling as get_ingest_trace).
            logger.warning("Failed to get ingest artifact", exc_info=True)
            return None

    async def get_trace_stats(self) -> dict[str, Any]:
        """Return aggregate statistics over the ingest-trace index.

        One aggregation-only search (``size: 0``) yields:

        * ``today_total``: traces whose ``started_at`` lies within the current
          day (``now/d`` up to, but excluding, ``now+1d/d``);
        * ``running`` / ``completed`` / ``failed``: counts per status over the
          whole index, where ``failed`` adds ``failed`` and ``partial_failed``;
        * ``avg_duration_ms``: rounded mean ``duration_ms`` of traces with
          status ``completed`` or ``partial_failed``; ``0`` when unavailable.
        """
        aggregations = {
            "today_total": {
                "filter": {"range": {"started_at": {"gte": "now/d", "lt": "now+1d/d"}}},
            },
            "by_status": {
                "terms": {"field": "status", "size": 10},
            },
            "avg_duration": {
                "filter": {
                    "terms": {"status": ["completed", "partial_failed"]},
                },
                "aggs": {
                    "avg_ms": {"avg": {"field": "duration_ms"}},
                },
            },
        }
        response = await self._es.search(
            index=settings.es_trace_index,
            body={"size": 0, "aggs": aggregations},
        )
        payload = response if isinstance(response, dict) else response.body
        results = payload.get("aggregations", {})

        started_today = results.get("today_total", {}).get("doc_count", 0)
        per_status: dict[str, int] = {
            bucket["key"]: bucket["doc_count"]
            for bucket in results.get("by_status", {}).get("buckets", [])
        }
        mean_ms = results.get("avg_duration", {}).get("avg_ms", {}).get("value")
        failed_total = per_status.get("failed", 0) + per_status.get("partial_failed", 0)

        return {
            "today_total": started_today,
            "running": per_status.get("running", 0),
            "completed": per_status.get("completed", 0),
            "failed": failed_total,
            # A missing (None) or zero average is reported as 0.
            "avg_duration_ms": round(mean_ms) if mean_ms else 0,
        }

    # ── Document deletion (content-aware) ──────────────────────────────────

    async def _delete_document_inner(self, doc_id: str) -> dict[str, Any]:
        """Delete a document and its dependent records without taking any lock.

        Inner implementation of document deletion, called by
        ``delete_document`` (under the content_hash lock when one is used).

        Execution order: read meta -> delete meta -> delete service guides ->
        query remaining metas -> either delete the chunks (last reference) or
        recompute their ACL (content still shared).

        Returns:
            ``{"deleted_chunks": int, "deleted_meta": 0 | 1,
            "deleted_guides": int}``.
        """
        # Step 1: load the meta record to obtain its content_hash.
        meta_loaded = False
        content_hash = ""
        try:
            meta_resp = await self._es.get(index=settings.es_meta_index, id=doc_id)
            raw = meta_resp if isinstance(meta_resp, dict) else meta_resp.body
            source = raw.get("_source", {})
            content_hash = source.get("content_hash", "")
            meta_loaded = True
        except Exception:
            # Meta may already be missing while guide documents still remain.
            # Continue cleanup for guide index instead of returning early.
            pass

        # Step 2: delete the meta record first so the later "remaining metas"
        # query no longer includes this document.
        deleted_meta = False
        if meta_loaded:
            try:
                await self._es.delete(
                    index=settings.es_meta_index,
                    id=doc_id,
                    refresh=True,
                )
                deleted_meta = True
            except Exception:
                # Best effort: a failed meta delete is logged and does not
                # abort the rest of the cleanup.
                # NOTE(review): in that case the remaining-meta query below may
                # still return this document, so its chunks would be kept.
                logger.warning("Failed to delete meta document", exc_info=True)

        deleted_chunks = 0
        deleted_guides = 0

        # Guide cleanup runs even when the meta record could not be loaded.
        try:
            deleted_guides = await self.delete_service_guides_by_doc_id(doc_id)
        except Exception:
            logger.warning("Failed to delete service guide document", exc_info=True)

        if meta_loaded and content_hash:
            # Step 3: query the metas that still reference this content_hash.
            remaining_metas = await self.get_all_metas_by_content_hash(content_hash)

            if not remaining_metas:
                # Last reference — delete all chunks for this content_hash.
                resp = await self._es.delete_by_query(
                    index=settings.es_chunk_index,
                    body={"query": {"term": {"content_hash": content_hash}}},
                    refresh=True,
                )
                raw_resp = resp if isinstance(resp, dict) else resp.body
                deleted_chunks = raw_resp.get("deleted", 0)
            else:
                # Other documents still reference this content — recompute the
                # ACL via the lock-free inner method (the caller is expected to
                # hold the lock already).
                await self._recompute_chunk_acl_inner(content_hash)
        elif meta_loaded:
            # Legacy: no content_hash, fall back to doc_id-based chunk deletion.
            resp = await self._es.delete_by_query(
                index=settings.es_chunk_index,
                body={"query": {"term": {"doc_ids": doc_id}}},
                refresh=True,
            )
            raw_resp = resp if isinstance(resp, dict) else resp.body
            deleted_chunks = raw_resp.get("deleted", 0)

        logger.info(
            "es_document_deleted",
            doc_id=doc_id,
            content_hash=content_hash[:12] if content_hash else "",
            chunks=deleted_chunks,
            guides=deleted_guides,
            meta=deleted_meta,
        )
        return {
            "deleted_chunks": deleted_chunks,
            "deleted_meta": int(deleted_meta),
            "deleted_guides": deleted_guides,
        }

    async def delete_document(self, doc_id: str) -> dict[str, Any]:
        """Delete a document, handling shared content correctly.

        The deletion itself (delete meta -> query remaining metas -> delete
        chunks or recompute ACL) is performed by ``_delete_document_inner``.
        When a Redis client is available and the document's ``content_hash``
        can be read, the deletion runs under the lock
        ``zm:acl_recompute:<content_hash>`` (60 s timeout) — the same lock name
        ``recompute_chunk_acl`` uses — to avoid races between concurrent
        deletion and ACL recomputation. Otherwise it runs unlocked.

        Returns:
            The result dict of ``_delete_document_inner``.
        """
        if self._redis is None:
            # No lock provider injected: run unlocked.
            return await self._delete_document_inner(doc_id)

        # The lock name depends on content_hash, so it has to be read once
        # before the lock can be taken.
        try:
            meta_resp = await self._es.get(
                index=settings.es_meta_index,
                id=doc_id,
                _source=["content_hash"],
            )
            payload = meta_resp if isinstance(meta_resp, dict) else meta_resp.body
            lock_hash = payload.get("_source", {}).get("content_hash", "")
        except Exception:
            lock_hash = ""

        if not lock_hash:
            # Nothing to lock on (meta unreadable or no content_hash).
            return await self._delete_document_inner(doc_id)

        async with self._redis.lock(f"zm:acl_recompute:{lock_hash}", timeout=60):
            return await self._delete_document_inner(doc_id)
