"""
混合检索引擎 —— BM25 全文检索 + kNN 向量检索，通过 RRF 融合排序，支持权限过滤和内容去重。
Hybrid search engine – BM25 + kNN with RRF fusion, permission filtering,
and content deduplication.

双层检索架构:
  - 在内容层 (gov_doc_chunks) 按 acl_ids 过滤并检索
  - 按 content_hash 分组去重
  - 查询文档层 (gov_doc_meta) 获取版本信息
  - 根据权限匹配精度选择每个 content_hash 的最佳版本

Two-layer architecture:
  - Searches ``gov_doc_chunks`` (content layer) with ``acl_ids`` filter
  - Groups results by ``content_hash`` for deduplication
  - Queries ``gov_doc_meta`` (document layer) for version info
  - Selects best version per content_hash based on permission priority
"""

from __future__ import annotations

from datetime import date
from typing import Any

from app.config import settings
from app.core.embedding import EmbeddingService
from app.core.permission import PermissionContext
from app.infrastructure.es_client import ESClient, HYBRID_RRF_PIPELINE
from app.utils.logger import get_logger

logger = get_logger(__name__)

# Permission-match precision: a lower value means a more direct match
# (U_ = user, O_ = organisation, D_ = department, A_ = area, R_ = role).
_ACL_PREFIX_PRIORITY = {"U_": 0, "O_": 1, "D_": 2, "A_": 3, "R_": 4}
# Absolute score floor used by _filter_llm_relevance.
_LLM_FIXED_SCORE_THRESHOLD = 0.5
# Fraction of the best score in a result set that _filter_llm_relevance also
# requires; the effective cutoff is the larger of the two values.
_LLM_RELATIVE_SCORE_RATIO = 0.5


def _normalize_search_scope(value: Any) -> str:
    """将检索范围参数标准化为 title/content/doc_number/all 之一。"""
    if value in {"title", "content", "doc_number"}:
        return value
    return "all"


def _escape_wildcard_query(value: str) -> str:
    """转义通配符查询中的特殊字符，防止注入。"""
    return value.replace("\\", "\\\\").replace("*", "\\*").replace("?", "\\?")


def _build_text_query_clause(query_text: str, search_scope: Any) -> dict[str, Any]:
    """Build the ES text-query clause (match / wildcard / multi_match) for a scope.

    Args:
        query_text: The user's query string.
        search_scope: Raw scope value; normalized via ``_normalize_search_scope``.

    Returns:
        A ``match`` clause on the scoped field for ``title``/``content``, a
        contains-style ``wildcard`` clause for ``doc_number``, and a
        ``multi_match`` over title (boosted x3) and content otherwise.
    """
    scope = _normalize_search_scope(search_scope)

    if scope in ("title", "content"):
        # The scope name is also the name of the field being matched.
        return {"match": {scope: {"query": query_text}}}

    if scope == "doc_number":
        pattern = f"*{_escape_wildcard_query(query_text.strip())}*"
        return {"wildcard": {"doc_number": {"value": pattern}}}

    multi_match = {
        "query": query_text,
        "fields": ["title^3", "content"],
        "type": "best_fields",
    }
    return {"multi_match": multi_match}


def _coerce_score(value: Any) -> float | None:
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _filter_llm_relevance(
    documents: list[dict[str, Any]],
    *,
    preserve_source_types: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Filter final returned documents for LLM callers without changing recall."""
    if not documents:
        return []

    scored = [
        score
        for doc in documents
        if (score := _coerce_score(doc.get("score"))) is not None
    ]
    if not scored:
        return documents

    threshold = max(
        _LLM_FIXED_SCORE_THRESHOLD,
        max(scored) * _LLM_RELATIVE_SCORE_RATIO,
    )
    preserved = preserve_source_types or set()
    return [
        doc
        for doc in documents
        if doc.get("_source_type") in preserved
        or doc.get("source_type") in preserved
        or ((_coerce_score(doc.get("score")) or 0.0) >= threshold)
    ]


def _best_match_priority(doc_acl_ids: list[str], user_tokens: list[str]) -> int:
    """Return the best (lowest) permission priority for a doc given user tokens."""
    best = 99
    acl_set = set(doc_acl_ids)
    for token in user_tokens:
        if token in acl_set:
            prefix = token[:2]
            best = min(best, _ACL_PREFIX_PRIORITY.get(prefix, 99))
    return best


class SearchEngine:
    """混合检索引擎，支持权限感知检索和基于 content_hash 的内容去重。

    Hybrid search with permission-aware retrieval and content deduplication."""

    def __init__(self, es_client: ESClient, embedding_service: EmbeddingService):
        self._es = es_client
        self._embedding = embedding_service

    async def search(
        self,
        query: str,
        perm: PermissionContext,
        *,
        filters: dict[str, Any] | None = None,
        page: int = 1,
        page_size: int = 20,
        llm: bool = False,
    ) -> dict[str, Any]:
        """执行混合检索：生成查询向量 → 构建 ES 查询 → 按 content_hash 分组 → 批量获取版本 → 选择最佳版本。

        Execute a hybrid search with permission filtering and content dedup.

        Returns:
            {
                "total": int,         # unique content items
                "page": int,
                "page_size": int,
                "documents": [...],   # one per unique content_hash
                "aggregations": {...},
            }
        """
        search_scope = _normalize_search_scope((filters or {}).get("search_scope"))

        # Step 1: 判断是否需要向量检索 — search_scope 与 RRF 能力联合决策
        # Determine if vector search is needed based on scope and RRF capability
        need_vector = (
            search_scope in {"all", "content"}
            and self._es.should_use_hybrid
        )

        # Step 2: 生成查询向量（仅在需要时），失败则降级为纯 BM25
        query_vector: list[float] | None = None
        if need_vector:
            try:
                query_vector = await self._embedding.embed_single(query)
            except Exception as emb_err:
                logger.warning(
                    "embedding_failed_bm25_fallback",
                    error=str(emb_err),
                    query=query[:40],
                )

        # Step 3: 构建并执行检索 — hybrid 路径使用 ESClient.hybrid_search 自动探测/熔断
        if query_vector is not None:
            # 构建 hybrid 查询体
            es_body = self._build_query(
                query_text=query,
                query_vector=query_vector,
                perm=perm,
                filters=filters or {},
                page=page,
                page_size=page_size,
            )
            # 通过 ESClient.hybrid_search 执行，内置能力探测与自动熔断
            response, ok = await self._es.hybrid_search(
                es_body,
                index=settings.es_chunk_index,
                pipeline=HYBRID_RRF_PIPELINE,
            )
            if not ok:
                # hybrid 失败（能力不支持或瞬时错误）→ 回退到纯 BM25
                logger.info("search_hybrid_to_bm25_fallback")
                es_body = self._build_query(
                    query_text=query,
                    query_vector=None,
                    perm=perm,
                    filters=filters or {},
                    page=page,
                    page_size=page_size,
                )
                response = await self._es.raw.search(
                    index=settings.es_chunk_index,
                    body=es_body,
                )
        else:
            # 纯 BM25 模式（scope 不含向量或 RRF 已标记为不可用）
            es_body = self._build_query(
                query_text=query,
                query_vector=None,
                perm=perm,
                filters=filters or {},
                page=page,
                page_size=page_size,
            )
            response = await self._es.raw.search(
                index=settings.es_chunk_index,
                body=es_body,
            )

        raw = response if isinstance(response, dict) else response.body

        # Step 4: Group chunk hits by content_hash
        use_collapse = "collapse" in es_body
        if use_collapse:
            all_groups = self._extract_collapse_groups(raw)
        else:
            all_groups = self._aggregate_by_content_hash(raw, page_size * 10)

        score_threshold = settings.search_score_threshold
        if not llm and score_threshold > 0:
            all_groups = [
                g for g in all_groups if (g.get("score") or 0) >= score_threshold
            ]

        if use_collapse:
            content_groups = all_groups
        else:
            offset = (page - 1) * page_size
            content_groups = all_groups[offset:offset + page_size]

        # Step 5: 批量获取版本信息（优化：不再额外查询 BM25/kNN 分数，减少 4 次 ES 请求为 2 次）
        # Batch fetch version info only – eliminated separate BM25/kNN score
        # queries to reduce ES round-trips from 4 to 2.
        if content_groups:
            content_hashes = [g["content_hash"] for g in content_groups]
            versions_map = await self._fetch_versions(content_hashes, perm)
        else:
            versions_map = {}

        # Step 6: 构建最终结果（bm25_score/knn_score 保留为 None，保持接口兼容）
        # Build final results – bm25_score/knn_score kept as None for API compat.
        documents = self._build_document_results(
            content_groups, versions_map, perm,
        )
        if llm:
            documents = _filter_llm_relevance(documents)

        # Step 7: Aggregations (content-level counts)
        aggregations = self._extract_aggregations(raw)

        # Total unique content items
        if use_collapse and (llm or score_threshold <= 0):
            # BM25 collapse mode without page-local score filtering: use ES aggregation count
            total_content = (
                raw.get("aggregations", {})
                .get("total_content", {})
                .get("value", len(documents))
            )
        else:
            # Hybrid mode or page-local score-filtered BM25: use actual group count
            total_content = len(all_groups)

        return {
            "total": total_content,
            "page": page,
            "page_size": page_size,
            "documents": documents,
            "aggregations": aggregations,
        }

    # ── Query building ────────────────────────────────────────────────────

    def _build_query(
        self,
        query_text: str,
        query_vector: list[float] | None,
        perm: PermissionContext,
        filters: dict[str, Any],
        page: int,
        page_size: int,
    ) -> dict[str, Any]:
        """构建 OpenSearch 查询体：有向量时走 hybrid RRF 模式，无向量时走 BM25 + collapse 模式。

        Build the OpenSearch hybrid query with permission + facet filters."""

        combined_filter = self._build_combined_filter(perm, filters)

        search_scope = _normalize_search_scope(filters.get("search_scope"))

        query_clause = _build_text_query_clause(query_text, search_scope)

        # --- BM25 retriever ---
        bm25_query: dict[str, Any] = {
            "bool": {
                "must": [query_clause],
                "filter": [combined_filter],
            }
        }

        # --- Highlight config ---
        highlight_cfg: dict[str, Any] = {
            "fields": {
                "content": {
                    "fragment_size": 200,
                    "number_of_fragments": 3,
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                },
                "title": {
                    "number_of_fragments": 0,
                    "pre_tags": ["<em>"],
                    "post_tags": ["</em>"],
                },
            },
        }

        # --- Aggregations: content-level counts ---
        aggs: dict[str, Any] = {
            "total_content": {
                "cardinality": {"field": "content_hash"},
            },
            "by_org": {
                "terms": {"field": "issuing_org", "size": 20},
                "aggs": {"unique_content": {"cardinality": {"field": "content_hash"}}},
            },
            "by_type": {
                "terms": {"field": "doc_type", "size": 10},
                "aggs": {"unique_content": {"cardinality": {"field": "content_hash"}}},
            },
            "by_category": {
                "terms": {"field": "knowledge_category", "size": 12},
                "aggs": {"unique_content": {"cardinality": {"field": "content_hash"}}},
            },
            "by_subject_words": {
                "terms": {"field": "subject_words", "size": 20},
                "aggs": {"unique_content": {"cardinality": {"field": "content_hash"}}},
            },
            "by_year": {
                "date_histogram": {
                    "field": "publish_date",
                    "calendar_interval": "year",
                    "format": "yyyy",
                    "min_doc_count": 1,
                    "order": {"_key": "desc"},
                },
                "aggs": {"unique_content": {"cardinality": {"field": "content_hash"}}},
            },
        }

        # --- Build body based on retrieval mode ---
        if query_vector is not None and search_scope in {"all", "content"}:
            # OpenSearch hybrid query — RRF fusion via search pipeline
            # Need enough chunks to cover (page * page_size) unique documents.
            # Each document may have many chunks, so use a large multiplier.
            fetch_size = min(page * page_size * 50, 10000)
            body: dict[str, Any] = {
                "size": fetch_size,
                "query": {
                    "hybrid": {
                        "queries": [
                            bm25_query,
                            {
                                "knn": {
                                    "content_vector": {
                                        "vector": query_vector,
                                        "k": fetch_size,
                                        "filter": combined_filter,
                                    },
                                },
                            },
                        ],
                    },
                },
                "_source": {"excludes": ["content_vector"]},
                "highlight": highlight_cfg,
                "aggs": aggs,
            }
        else:
            # BM25-only mode: collapse on content_hash for true content-level paging
            body = {
                "size": page_size,
                "from": (page - 1) * page_size,
                "query": bm25_query,
                "_source": {"excludes": ["content_vector"]},
                "collapse": {
                    "field": "content_hash",
                    "inner_hits": {
                        "name": "best_chunks",
                        "size": 3,
                        "_source": False,
                        "highlight": highlight_cfg,
                    },
                },
                "aggs": aggs,
            }

        return body

    # ── Filter building & individual score queries ───────────────────────

    @staticmethod
    def _build_combined_filter(
        perm: PermissionContext, filters: dict[str, Any],
    ) -> dict[str, Any]:
        """构建 ACL 权限过滤与业务维度（机构/类型/类目/日期/主题词）联合过滤条件。

        Build the ACL + facet combined filter clause."""
        acl_filter = perm.build_es_filter()
        facet_filters: list[dict[str, Any]] = []

        if filters.get("issuing_org"):
            val = filters["issuing_org"]
            if isinstance(val, list):
                facet_filters.append({"terms": {"issuing_org": val}})
            else:
                facet_filters.append({"term": {"issuing_org": val}})

        if filters.get("doc_type"):
            val = filters["doc_type"]
            if isinstance(val, list):
                facet_filters.append({"terms": {"doc_type": val}})
            else:
                facet_filters.append({"term": {"doc_type": val}})

        if filters.get("knowledge_category"):
            val = filters["knowledge_category"]
            if isinstance(val, list):
                facet_filters.append({"terms": {"knowledge_category": val}})
            else:
                facet_filters.append({"term": {"knowledge_category": val}})

        if filters.get("document_scene_type"):
            val = filters["document_scene_type"]
            if isinstance(val, list):
                facet_filters.append({"terms": {"document_scene_type": val}})
            else:
                facet_filters.append({"term": {"document_scene_type": val}})

        if filters.get("signer"):
            signer = str(filters["signer"]).strip()
            if signer:
                facet_filters.append({
                    "wildcard": {
                        "signer": {
                            "value": f"*{_escape_wildcard_query(signer)}*"
                        }
                    }
                })

        if filters.get("publish_year"):
            try:
                year = int(filters["publish_year"])
            except (TypeError, ValueError):
                year = None
            if year:
                facet_filters.append({
                    "range": {
                        "publish_date": {
                            "gte": f"{year}-01-01",
                            "lt": f"{year + 1}-01-01",
                        }
                    }
                })

        if filters.get("doc_number"):
            facet_filters.append({
                "wildcard": {
                    "doc_number": {
                        "value": f"*{_escape_wildcard_query(str(filters['doc_number']).strip())}*"
                    }
                }
            })

        if filters.get("date_from") or filters.get("date_to"):
            date_range: dict[str, str] = {}
            if filters.get("date_from"):
                df = filters["date_from"]
                date_range["gte"] = df.isoformat() if isinstance(df, date) else str(df)
            if filters.get("date_to"):
                dt = filters["date_to"]
                date_range["lte"] = dt.isoformat() if isinstance(dt, date) else str(dt)
            facet_filters.append({"range": {"publish_date": date_range}})

        if filters.get("subject_words"):
            # Fuzzy match: any subject_word containing the user keyword matches
            sw_clauses = [
                {
                    "wildcard": {
                        "subject_words": {
                            "value": f"*{_escape_wildcard_query(w.strip())}*"
                        }
                    }
                }
                for w in filters["subject_words"]
                if w.strip()
            ]
            if sw_clauses:
                facet_filters.append(
                    {"bool": {"should": sw_clauses, "minimum_should_match": 1}}
                )

        all_filters = [acl_filter] + facet_filters
        return (
            {"bool": {"must": all_filters}} if len(all_filters) > 1 else all_filters[0]
        )

    async def _score_bm25(
        self,
        query_text: str,
        perm: PermissionContext,
        filters: dict[str, Any],
        size: int,
    ) -> dict[str, float]:
        """Run a lightweight BM25-only query, return {content_hash: best_score}.

        内部调试用方法，不在主搜索路径中调用。
        Debug-only helper – no longer invoked in the main search path.
        """
        combined_filter = self._build_combined_filter(perm, filters)
        body: dict[str, Any] = {
            "size": size,
            "query": {
                "bool": {
                    "must": [_build_text_query_clause(query_text, filters.get("search_scope"))],
                    "filter": [combined_filter],
                }
            },
            "_source": ["content_hash"],
        }
        try:
            resp = await self._es.raw.search(index=settings.es_chunk_index, body=body)
            raw = resp if isinstance(resp, dict) else resp.body
        except Exception as e:
            logger.warning("bm25_score_query_failed", error=str(e))
            return {}

        scores: dict[str, float] = {}
        for hit in raw.get("hits", {}).get("hits", []):
            ch = hit.get("_source", {}).get("content_hash", "")
            s = hit.get("_score") or 0.0
            if ch and (ch not in scores or s > scores[ch]):
                scores[ch] = s
        return scores

    async def _score_knn(
        self,
        query_vector: list[float],
        perm: PermissionContext,
        filters: dict[str, Any],
        size: int,
    ) -> dict[str, float]:
        """Run a lightweight kNN-only query, return {content_hash: best_score}.

        内部调试用方法，不在主搜索路径中调用。
        Debug-only helper – no longer invoked in the main search path.
        """
        combined_filter = self._build_combined_filter(perm, filters)
        body: dict[str, Any] = {
            "size": size,
            "query": {
                "knn": {
                    "content_vector": {
                        "vector": query_vector,
                        "k": size,
                        "filter": combined_filter,
                    }
                }
            },
            "_source": ["content_hash"],
        }
        try:
            resp = await self._es.raw.search(index=settings.es_chunk_index, body=body)
            raw = resp if isinstance(resp, dict) else resp.body
        except Exception as e:
            logger.warning("knn_score_query_failed", error=str(e))
            return {}

        scores: dict[str, float] = {}
        for hit in raw.get("hits", {}).get("hits", []):
            ch = hit.get("_source", {}).get("content_hash", "")
            s = hit.get("_score") or 0.0
            if ch and (ch not in scores or s > scores[ch]):
                scores[ch] = s
        return scores

    # ── Result grouping ───────────────────────────────────────────────────

    def _aggregate_by_content_hash(
        self, raw_response: dict[str, Any], top_n: int,
    ) -> list[dict[str, Any]]:
        """Group chunk-level hits by content_hash (RRF mode).

        Returns list of dicts with best score, highlights, and chunk metadata
        per unique content_hash.
        """
        hits = raw_response.get("hits", {}).get("hits", [])
        groups: dict[str, dict[str, Any]] = {}

        for hit in hits:
            source = hit.get("_source", {})
            content_hash = source.get("content_hash", "")
            score = hit.get("_score", 0.0)

            highlight = hit.get("highlight", {})
            content_hl = highlight.get("content", [])
            title_hl = highlight.get("title", [])

            if content_hash not in groups:
                groups[content_hash] = {
                    "content_hash": content_hash,
                    "doc_ids": source.get("doc_ids", []),
                    "title": title_hl[0] if title_hl else source.get("title", ""),
                    "doc_number": source.get("doc_number"),
                    "knowledge_category": source.get("knowledge_category"),
                    "issuing_org": source.get("issuing_org"),
                    "doc_type": source.get("doc_type"),
                    "publish_date": source.get("publish_date"),
                    "source_system": source.get("source_system"),
                    "source_article_id": source.get("source_article_id"),
                    "source_attachment_id": source.get("source_attachment_id"),
                    "source_site_code": source.get("source_site_code"),
                    "source_target_code": source.get("source_target_code"),
                    "source_url": source.get("source_url"),
                    "source_metadata": source.get("source_metadata") or {},
                    "subject_words": source.get("subject_words") or [],
                    "score": score,
                    "highlights": [],
                    "matched_chunks": [],
                }

            group = groups[content_hash]

            if score and score > (group["score"] or 0):
                group["score"] = score

            for hl in content_hl:
                if len(group["highlights"]) < 3 and hl not in group["highlights"]:
                    group["highlights"].append(hl)

            # Capture chunk-level positional info
            if content_hl and len(group["matched_chunks"]) < 3:
                group["matched_chunks"].append({
                    "text": content_hl[0] if content_hl else "",
                    "page_number": source.get("page_number"),
                    "page_numbers": source.get("page_numbers") or [],
                    "heading_hierarchy": source.get("heading_hierarchy") or [],
                    "element_type": source.get("element_type") or "",
                    "chunk_index": source.get("chunk_index"),
                })

        return sorted(
            groups.values(),
            key=lambda g: g.get("score") or 0,
            reverse=True,
        )[:top_n]

    def _extract_collapse_groups(
        self, raw_response: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Extract content groups from a collapse-on-content_hash response."""
        hits = raw_response.get("hits", {}).get("hits", [])
        groups: list[dict[str, Any]] = []

        for hit in hits:
            source = hit.get("_source", {})
            score = hit.get("_score")

            inner_list = (
                hit.get("inner_hits", {})
                .get("best_chunks", {})
                .get("hits", {})
                .get("hits", [])
            )
            title_hl: str | None = None
            content_hls: list[str] = []
            matched_chunks: list[dict] = []
            for inner in inner_list:
                ih = inner.get("highlight", {})
                inner_src = inner.get("_source", {})
                if not title_hl and ih.get("title"):
                    title_hl = ih["title"][0]
                frags = ih.get("content", [])
                for frag in frags:
                    if len(content_hls) < 3 and frag not in content_hls:
                        content_hls.append(frag)
                if frags and len(matched_chunks) < 3:
                    matched_chunks.append({
                        "text": frags[0],
                        "page_number": inner_src.get("page_number"),
                        "page_numbers": inner_src.get("page_numbers") or [],
                        "heading_hierarchy": inner_src.get("heading_hierarchy") or [],
                        "element_type": inner_src.get("element_type") or "",
                        "chunk_index": inner_src.get("chunk_index"),
                    })

            groups.append({
                "content_hash": source.get("content_hash", ""),
                "doc_ids": source.get("doc_ids", []),
                "title": title_hl or source.get("title", ""),
                "doc_number": source.get("doc_number"),
                "knowledge_category": source.get("knowledge_category"),
                "issuing_org": source.get("issuing_org"),
                "doc_type": source.get("doc_type"),
                "publish_date": source.get("publish_date"),
                "source_system": source.get("source_system"),
                "source_article_id": source.get("source_article_id"),
                "source_attachment_id": source.get("source_attachment_id"),
                "source_site_code": source.get("source_site_code"),
                "source_target_code": source.get("source_target_code"),
                "source_url": source.get("source_url"),
                "source_metadata": source.get("source_metadata") or {},
                "subject_words": source.get("subject_words") or [],
                "score": score,
                "highlights": content_hls,
                "matched_chunks": matched_chunks,
            })

        return groups

    # ── Version resolution ────────────────────────────────────────────────

    async def _fetch_versions(
        self,
        content_hashes: list[str],
        perm: PermissionContext,
    ) -> dict[str, list[dict[str, Any]]]:
        """批量获取多个 content_hash 对应的文档版本列表，按用户权限过滤。

        Batch-fetch meta docs for content_hashes, filtered by user's ACL.

        Returns ``{content_hash: [meta_source, ...]}``.
        """
        if not content_hashes:
            return {}

        acl_filter = perm.build_es_filter()

        resp = await self._es.raw.search(
            index=settings.es_meta_index,
            body={
                "query": {
                    "bool": {
                        "must": [
                            {"terms": {"content_hash": content_hashes}},
                            {"term": {"status": "completed"}},
                        ],
                        "filter": [acl_filter],
                    }
                },
                "size": len(content_hashes) * 20,
                "_source": [
                    "doc_id", "content_hash", "title", "doc_number",
                    "issuing_org", "doc_type", "publish_date", "acl_ids",
                    "signer", "subject_words", "knowledge_category",
                    "source_system", "source_article_id", "source_attachment_id",
                    "source_site_code", "source_target_code", "source_url",
                    "source_metadata", "related_docs",
                ],
            },
        )
        raw = resp if isinstance(resp, dict) else resp.body

        versions_map: dict[str, list[dict[str, Any]]] = {}
        for hit in raw.get("hits", {}).get("hits", []):
            source = hit["_source"]
            ch = source.get("content_hash", "")
            versions_map.setdefault(ch, []).append(source)

        return versions_map

    def _build_document_results(
        self,
        content_groups: list[dict[str, Any]],
        versions_map: dict[str, list[dict[str, Any]]],
        perm: PermissionContext,
        *,
        bm25_scores: dict[str, float] | None = None,
        knn_scores: dict[str, float] | None = None,
    ) -> list[dict[str, Any]]:
        """构建最终结果：对每个 content_hash 选择权限匹配度最高的文档版本。

        Build final results with best-version selection per content_hash.

        Picks the version with the most direct permission match:
        U_ > O_ > D_ > A_ > R_
        """
        results: list[dict[str, Any]] = []

        for group in content_groups:
            content_hash = group["content_hash"]
            versions = versions_map.get(content_hash, [])

            if versions:
                # Pick the version with the most direct permission match
                best_version = min(
                    versions,
                    key=lambda v: _best_match_priority(
                        v.get("acl_ids", []), perm.acl_tokens,
                    ),
                )
                doc_id = best_version["doc_id"]
                title = group.get("title") or best_version.get("title", "")
                doc_number = best_version.get("doc_number")
                knowledge_category = best_version.get("knowledge_category")
                issuing_org = best_version.get("issuing_org")
                doc_type = best_version.get("doc_type")
                publish_date = best_version.get("publish_date")
                source_system = best_version.get("source_system")
                source_article_id = best_version.get("source_article_id")
                source_attachment_id = best_version.get("source_attachment_id")
                source_site_code = best_version.get("source_site_code")
                source_target_code = best_version.get("source_target_code")
                source_url = best_version.get("source_url")
                source_metadata = best_version.get("source_metadata") or {}
                related_docs = best_version.get("related_docs") or []
                subject_words = best_version.get("subject_words") or []
                version_count = len(versions)
            else:
                # Fallback to chunk metadata
                doc_ids = group.get("doc_ids", [])
                doc_id = doc_ids[0] if doc_ids else ""
                title = group.get("title", "")
                doc_number = group.get("doc_number")
                knowledge_category = group.get("knowledge_category")
                issuing_org = group.get("issuing_org")
                doc_type = group.get("doc_type")
                publish_date = group.get("publish_date")
                source_system = group.get("source_system")
                source_article_id = group.get("source_article_id")
                source_attachment_id = group.get("source_attachment_id")
                source_site_code = group.get("source_site_code")
                source_target_code = group.get("source_target_code")
                source_url = group.get("source_url")
                source_metadata = group.get("source_metadata") or {}
                related_docs = group.get("related_docs") or []
                subject_words = group.get("subject_words") or []
                version_count = 1

            results.append({
                "doc_id": doc_id,
                "content_hash": content_hash,
                "version_count": version_count,
                "title": title,
                "doc_number": doc_number,
                "knowledge_category": knowledge_category,
                "issuing_org": issuing_org,
                "doc_type": doc_type,
                "publish_date": publish_date,
                "source_system": source_system,
                "source_article_id": source_article_id,
                "source_attachment_id": source_attachment_id,
                "source_site_code": source_site_code,
                "source_target_code": source_target_code,
                "source_url": source_url,
                "source_metadata": source_metadata,
                "related_docs": related_docs,
                "subject_words": subject_words,
                "score": group.get("score"),
                "bm25_score": (bm25_scores or {}).get(content_hash),
                "knn_score": (knn_scores or {}).get(content_hash),
                "highlights": group.get("highlights", []),
                "matched_chunks": group.get("matched_chunks", []),
            })

        return results

    # ── Aggregation extraction ────────────────────────────────────────────

    def _extract_aggregations(
        self, raw_response: dict[str, Any],
    ) -> dict[str, list[dict[str, Any]]]:
        """提取聚合结果（按机构/类型/类目/年份），使用内容级去重计数。

        Extract aggregation buckets with content-level unique counts."""
        aggs = raw_response.get("aggregations", {})
        result: dict[str, list[dict[str, Any]]] = {}

        if "by_org" in aggs:
            result["by_org"] = [
                {
                    "key": b["key"],
                    "count": b.get("unique_content", {}).get("value", b["doc_count"]),
                }
                for b in aggs["by_org"].get("buckets", [])
            ]

        if "by_type" in aggs:
            result["by_type"] = [
                {
                    "key": b["key"],
                    "count": b.get("unique_content", {}).get("value", b["doc_count"]),
                }
                for b in aggs["by_type"].get("buckets", [])
            ]

        if "by_category" in aggs:
            result["by_category"] = [
                {
                    "key": b["key"],
                    "count": b.get("unique_content", {}).get("value", b["doc_count"]),
                }
                for b in aggs["by_category"].get("buckets", [])
                if b.get("key")
            ]

        if "by_subject_words" in aggs:
            result["by_subject_words"] = [
                {
                    "key": b["key"],
                    "count": b.get("unique_content", {}).get("value", b["doc_count"]),
                }
                for b in aggs["by_subject_words"].get("buckets", [])
                if b.get("key")
            ]

        if "by_year" in aggs:
            result["by_year"] = [
                {
                    "key": b["key_as_string"],
                    "count": b.get("unique_content", {}).get("value", b["doc_count"]),
                }
                for b in aggs["by_year"].get("buckets", [])
            ]

        return result
