"""Search endpoints – hybrid BM25 + vector search with permission filtering.

Hybrid retrieval endpoint module.
Supports RRF fusion of BM25 text search and kNN vector search, with built-in
ACL permission filtering, content_hash deduplication, multi-dimensional facet
filters, and result caching.
"""

from __future__ import annotations

import re
from typing import Annotated, Any

from fastapi import APIRouter, Depends, Request

from app.api.deps import UserContext, get_current_user, get_redis_client
from app.api.schemas.search import (
    DocumentResult,
    SearchRequest,
    SearchResponse,
    SuggestRequest,
    SuggestResponse,
)
from app.config import settings
from app.core.embedding import EmbeddingService
from app.core.permission import PermissionService
from app.core.search_engine import SearchEngine
from app.infrastructure.embedding_client import EmbeddingClient
from app.infrastructure.es_client import ESClient
from app.infrastructure.redis_client import RedisClient
from app.utils.logger import get_logger
from app.utils.query_cache import get_cached_search, set_cached_search

logger = get_logger(__name__)

router = APIRouter(prefix="/search", tags=["search"])

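# Matches the <em>/</em> wrapper tags that Elasticsearch highlighting places around matched terms.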
_HIGHLIGHT_EM_TAG_RE = re.compile(r"</?em\b[^>]*>", re.IGNORECASE)


def _strip_highlight_tags(value: str | None) -> str:
    if not value:
        return ""
    return _HIGHLIGHT_EM_TAG_RE.sub("", value)


def _normalize_matched_chunks(
    chunks: list[dict[str, Any]] | None,
    *,
    escape_html: bool,
) -> list[dict[str, Any]]:
    if not chunks:
        return []
    if not escape_html:
        return chunks
    return [
        {
            **chunk,
            "text": _strip_highlight_tags(str(chunk.get("text") or "")),
        }
        for chunk in chunks
    ]


def _normalize_highlights(
    highlights: list[str] | None,
    *,
    escape_html: bool,
) -> list[str]:
    if not highlights:
        return []
    if not escape_html:
        return highlights
    return [_strip_highlight_tags(str(item)) for item in highlights]


async def _get_search_engine(request: Request) -> SearchEngine:
    """Resolve the SearchEngine from app.state clients.

    Must be async so FastAPI runs it in the event loop (not a thread pool).
    This matters because EmbeddingService.__init__ creates an
    asyncio.Semaphore, which requires a running event loop on Python 3.9.

    EmbeddingService receives the Redis instance to enable the vector cache
    (TTL 1h), so identical queries avoid repeated calls to the external
    Embedding API.
    """
    es_client: ESClient = request.app.state.es_client
    embedding_client: EmbeddingClient = request.app.state.embedding_client
    redis_client: RedisClient = request.app.state.redis_client
    embedding_service = EmbeddingService(embedding_client, redis=redis_client.raw)
    return SearchEngine(es_client=es_client, embedding_service=embedding_service)


async def _get_permission_service(request: Request) -> PermissionService:
    """Resolve the PermissionService from app.state clients."""
    redis_client: RedisClient = request.app.state.redis_client
    return PermissionService(redis_client=redis_client)


@router.post("", response_model=SearchResponse)
async def hybrid_search(
    body: SearchRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    search_engine: Annotated[SearchEngine, Depends(_get_search_engine)],
    perm_service: Annotated[PermissionService, Depends(_get_permission_service)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
) -> SearchResponse:
    """Execute a hybrid (BM25 + kNN) search with RRF fusion.

    - Applies ACL permission filter (acl_ids terms match)
    - Groups results by content_hash for deduplication
    - Selects the best version per content_hash (most direct permission match)
    - Supports facet filters (issuing_org, doc_type, knowledge_category,
      document_scene_type, signer, publish_year, doc_number, date_range,
      subject_words)
    - Caches search results per user (TTL 2 min) for performance
    - Returns highlighted snippets, version_count, and aggregation stats
    """
    # Resolve permissions
    perm = await perm_service.resolve(user)

    logger.info(
        "search_request",
        user_id=user.user_id,
        query=body.query[:50],
        page=body.page,
    )

    # Build filters dict from request
    filters: dict[str, Any] = {}
    if body.filters:
        if body.filters.issuing_org:
            filters["issuing_org"] = body.filters.issuing_org
        if body.filters.doc_type:
            filters["doc_type"] = body.filters.doc_type
        if body.filters.knowledge_category:
            filters["knowledge_category"] = body.filters.knowledge_category
        if body.filters.document_scene_type:
            filters["document_scene_type"] = body.filters.document_scene_type
        if body.filters.signer and body.filters.signer.strip():
            filters["signer"] = body.filters.signer.strip()
        if body.filters.publish_year:
            filters["publish_year"] = body.filters.publish_year
        if body.filters.doc_number and body.filters.doc_number.strip():
            filters["doc_number"] = body.filters.doc_number.strip()
        if body.filters.date_from:
            # Convert date to string for serialization
            filters["date_from"] = (
                body.filters.date_from.isoformat()
                if hasattr(body.filters.date_from, "isoformat")
                else str(body.filters.date_from)
            )
        if body.filters.date_to:
            filters["date_to"] = (
                body.filters.date_to.isoformat()
                if hasattr(body.filters.date_to, "isoformat")
                else str(body.filters.date_to)
            )
        if body.filters.subject_words:
            filters["subject_words"] = body.filters.subject_words
        if body.filters.search_scope != "all":
            filters["search_scope"] = body.filters.search_scope

    # ── Try search result cache ──────────────────────────────────────
    cached_result = None
    if settings.search_cache_ttl > 0:
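        # Cache entries are scoped per user and keyed by query, filters, pagination, and the llm flag.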
        cached_result = await get_cached_search(
            redis_client.raw,
            user.user_id,
            body.query,
            filters,
            body.page,
            page_size=body.page_size,
            llm=body.llm,
        )

    if cached_result:
        logger.debug("search_cache_hit", user_id=user.user_id, query=body.query[:40])
        result = cached_result
    else:
        # Execute hybrid search
        result = await search_engine.search(
            query=body.query,
            perm=perm,
            filters=filters,
            page=body.page,
            page_size=body.page_size,
            llm=body.llm,
        )
        # Store in cache
        if settings.search_cache_ttl > 0:
            await set_cached_search(
                redis_client.raw,
                user.user_id,
                body.query,
                filters,
                body.page,
                result,
                page_size=body.page_size,
                llm=body.llm,
            )

    # Map to response model
    documents = [
        DocumentResult(
            doc_id=doc["doc_id"],
            content_hash=doc.get("content_hash", ""),
            version_count=doc.get("version_count", 1),
            title=(
                _strip_highlight_tags(doc.get("title", ""))
                if body.escape_html
                else doc.get("title", "")
            ),
            doc_number=doc.get("doc_number"),
            knowledge_category=doc.get("knowledge_category"),
            issuing_org=doc.get("issuing_org"),
            doc_type=doc.get("doc_type"),
            publish_date=doc.get("publish_date"),
            subject_words=doc.get("subject_words") or [],
            score=doc.get("score"),
            bm25_score=doc.get("bm25_score"),
            knn_score=doc.get("knn_score"),
            highlights=_normalize_highlights(
                doc.get("highlights", []),
                escape_html=body.escape_html,
            ),
            matched_chunks=_normalize_matched_chunks(
                doc.get("matched_chunks", []),
                escape_html=body.escape_html,
            ),
        )
        for doc in result.get("documents", [])
    ]

    return SearchResponse(
        total=result.get("total", len(documents)),
        page=body.page,
        page_size=body.page_size,
        documents=documents,
        aggregations=result.get("aggregations", {}),
    )


@router.get("/filter-options")
async def filter_options(
    user: Annotated[UserContext, Depends(get_current_user)],
    request: Request,
    perm_service: Annotated[PermissionService, Depends(_get_permission_service)],
) -> dict[str, Any]:
    """Return available filter options for the search UI.

    Exposes ``knowledge_categories`` and ``doc_types`` from graph_schema.yaml,
    plus top ``subject_words`` aggregated from completed documents the user is
    allowed to access, so the frontend filter panel can render its options
    dynamically instead of hardcoding them.
    """
    from app.core.graph_schema_loader import get_schema

    schema = get_schema()
    es_client: ESClient = request.app.state.es_client
    perm = await perm_service.resolve(user)
    categories = [
        {"label": name, "value": name}
        for name in schema.knowledge_category_mapping
    ]
    doc_types = [
        {"label": dt, "value": dt}
        for dt in schema.doc_types
    ]

    subject_words: list[dict[str, str]] = []

    async def _search_subject_word_buckets(
        index: str,
        *,
        include_completed_or_missing_status: bool,
    ) -> list[dict[str, Any]]:
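        """Aggregate the top ``subject_words`` buckets from ``index``.

        Relies on ``acl_filter`` from the enclosing scope, which is assigned
        in the ``try`` block below before this helper is first awaited.
        """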
        query_filters: list[dict[str, Any]] = [acl_filter]
        if include_completed_or_missing_status:
            query_filters.append(
                {
                    "bool": {
                        "should": [
                            {"term": {"status": "completed"}},
                            {"bool": {"must_not": [{"exists": {"field": "status"}}]}},
                        ],
                        "minimum_should_match": 1,
                    }
                }
            )

        response = await es_client.raw.search(
            index=index,
            body={
                "size": 0,
                "query": {
                    "bool": {
                        "filter": query_filters,
                    }
                },
                "aggs": {
                    "subject_words": {
                        "terms": {
                            "field": "subject_words",
                            "size": 50,
                            "order": {"_count": "desc"},
                        }
                    }
                },
            },
        )
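        # The ES client may return a plain dict or a response object exposing ``.body``; normalize to a dict.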
        raw = response if isinstance(response, dict) else response.body
        return raw.get("aggregations", {}).get("subject_words", {}).get("buckets", [])

    try:
        acl_filter = perm.build_es_filter()
        buckets = await _search_subject_word_buckets(
            settings.es_meta_index,
            include_completed_or_missing_status=True,
        )
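        # Fall back to the chunk index when the metadata index yields no non-empty subject_words.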
        if not any(str(bucket.get("key") or "").strip() for bucket in buckets):
            buckets = await _search_subject_word_buckets(
                settings.es_chunk_index,
                include_completed_or_missing_status=False,
            )

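        # Keep non-empty terms, ranked by descending doc_count then alphabetically, and take the top 20.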
        ranked_subject_words = sorted(
            (
                bucket for bucket in buckets
                if str(bucket.get("key") or "").strip()
            ),
            key=lambda bucket: (-int(bucket.get("doc_count") or 0), str(bucket.get("key") or "")),
        )[:20]
        subject_words = [
            {"label": bucket["key"], "value": bucket["key"]}
            for bucket in ranked_subject_words
        ]
    except Exception as exc:
        logger.warning("search_filter_options_subject_words_failed", error=str(exc))

    return {
        "knowledge_categories": categories,
        "doc_types": doc_types,
        "document_scene_types": [
            {"label": "标准办事指南", "value": "standard_service_guide"},
            {"label": "其他", "value": "other"},
        ],
        "subject_words": subject_words,
    }


@router.post("/suggest", response_model=SuggestResponse)
async def suggest(
    body: SuggestRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    request: Request,
    perm_service: Annotated[PermissionService, Depends(_get_permission_service)],
) -> SuggestResponse:
    """Return query auto-complete suggestions based on document titles.

    Uses an ES ``match_phrase_prefix`` query on the title field with a terms
    aggregation on ``title.keyword``, filtered by the user's permissions.
    """
    es_client: ESClient = request.app.state.es_client
    perm = await perm_service.resolve(user)

    acl_filter = perm.build_es_filter()

    try:
        # Use the configured index name instead of hardcoding it, consistent with the other endpoints.
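        # size=0: hits are not needed; suggestions come from the terms aggregation buckets.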
        response = await es_client.raw.search(
            index=settings.es_meta_index,
            body={
                "size": 0,
                "query": {
                    "bool": {
                        "must": [
                            {
                                "match_phrase_prefix": {
                                    "title": {
                                        "query": body.query,
                                        "max_expansions": 20,
                                    }
                                }
                            }
                        ],
                        "filter": [acl_filter],
                    }
                },
                "aggs": {
                    "title_suggestions": {
                        "terms": {
                            "field": "title.keyword",
                            "size": body.size,
                            "order": {"_count": "desc"},
                        }
                    }
                },
            },
        )

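        # Normalize the client response to a plain dict before reading aggregations.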
        raw = response if isinstance(response, dict) else response.body
        buckets = raw.get("aggregations", {}).get("title_suggestions", {}).get("buckets", [])
        suggestions = [b["key"] for b in buckets]

    except Exception as e:
        logger.warning("suggest_error", error=str(e))
        suggestions = []

    return SuggestResponse(suggestions=suggestions)
