"""Unit tests for the ingest pipeline's OpenSearch interactions.

These tests run against the real OpenSearch instance (via ASGI lifespan)
and directly verify that the pipeline's ES/OpenSearch calls work correctly.
They catch API-level incompatibilities (e.g. Elasticsearch `document=` vs
OpenSearch `body=` parameter naming) that higher-level HTTP tests miss
because:
  1. _write_pending_meta silently swallows exceptions
  2. Celery tasks never execute during ASGI unit tests

Run:  pytest tests/test_pipeline_unit.py -v
"""

from __future__ import annotations

import uuid
from datetime import datetime, timezone

import pytest
from httpx import AsyncClient

from app.config import settings


def _unique_id(prefix: str = "test") -> str:
    return f"{prefix}-{uuid.uuid4().hex[:12]}"


# ──────────────────────────────────────────────────────────────────────────────
# 1. raw.index() — the exact call that was broken by document= vs body=
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
class TestRawIndex:
    """Verify that OpenSearch raw.index(body=...) works correctly."""

    async def test_index_meta_document(self, client: AsyncClient):
        """raw.index(body=...) should successfully write a meta document.

        This is the exact call pattern used in:
          - ingest.py _write_pending_meta()
          - ingest_pipeline.py _write_doc_meta()
        The previous bug used `document=body` (Elasticsearch 8.x keyword)
        instead of `body=body` (OpenSearch), causing silent failures.

        The document is removed in a ``finally`` block so that a failing
        assertion does not leave test data behind in the meta index.
        """
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        doc_id = _unique_id("idx-test")
        now_iso = datetime.now(timezone.utc).isoformat()
        body = {
            "doc_id": doc_id,
            "content_hash": "test_hash_abc123",
            "title": "Pipeline 单元测试文档",
            "doc_number": "TEST-001",
            "issuing_org": "测试部门",
            "doc_type": "通知",
            "subject_words": ["测试"],
            "signer": "测试人",
            "publish_date": "2026-01-01",
            "acl_ids": ["A_01", "D_01"],
            "status": "pending",
            "task_id": "fake-task-id",
            "created_at": now_iso,
            "updated_at": now_iso,
        }

        try:
            # This is the exact call that was broken — must NOT raise
            await es_client.raw.index(
                index=settings.es_meta_index,
                id=doc_id,
                body=body,
            )

            # Verify it was actually written
            resp = await es_client.raw.get(
                index=settings.es_meta_index,
                id=doc_id,
            )
            # The client may return a plain dict or a response object with .body
            raw = resp if isinstance(resp, dict) else resp.body
            source = raw["_source"]
            assert source["doc_id"] == doc_id
            assert source["title"] == "Pipeline 单元测试文档"
            assert source["status"] == "pending"
            assert source["acl_ids"] == ["A_01", "D_01"]
        finally:
            # Cleanup runs even when an assertion above fails; a missing
            # document (404) is tolerated.
            await es_client.raw.delete(
                index=settings.es_meta_index, id=doc_id, ignore=[404]
            )

    async def test_index_rejects_document_kwarg(self, client: AsyncClient):
        """Confirm that the old ES8 `document=` keyword is NOT accepted.

        This is the regression guard: if someone accidentally re-introduces
        `document=body`, this test will catch it immediately.
        """
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        doc_id = _unique_id("reject-test")
        body = {"doc_id": doc_id, "title": "should fail"}

        try:
            with pytest.raises(TypeError, match="document"):
                await es_client.raw.index(
                    index=settings.es_meta_index,
                    id=doc_id,
                    document=body,  # type: ignore[call-arg]
                )
        finally:
            # If the client unexpectedly accepted `document=`, the call may
            # have written a document; remove it so the failure leaves no
            # residue. A 404 (nothing written) is the expected case.
            await es_client.raw.delete(
                index=settings.es_meta_index, id=doc_id, ignore=[404]
            )


# ──────────────────────────────────────────────────────────────────────────────
# 2. raw.update() with script — used by _update_meta_status
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
class TestRawUpdate:
    """Verify that raw.update(body=...) scripted upsert works."""

    async def test_scripted_upsert(self, client: AsyncClient):
        """Scripted upsert should create a new document if it doesn't exist.

        The first update targets a fresh id, so the ``upsert`` body is what
        gets stored; the second update targets the now-existing document, so
        the script runs and changes ``status``. The document is deleted in a
        ``finally`` block so a failed assertion does not leak test data.
        """
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        doc_id = _unique_id("upsert-test")
        now_iso = datetime.now(timezone.utc).isoformat()

        try:
            await es_client.raw.update(
                index=settings.es_meta_index,
                id=doc_id,
                body={
                    "script": {
                        "lang": "painless",
                        "source": (
                            "ctx._source.status = params.status; "
                            "ctx._source.updated_at = params.updated_at"
                        ),
                        "params": {"status": "processing", "updated_at": now_iso},
                    },
                    "upsert": {
                        "doc_id": doc_id,
                        "status": "processing",
                        "created_at": now_iso,
                        "updated_at": now_iso,
                    },
                },
            )

            # Verify
            resp = await es_client.raw.get(index=settings.es_meta_index, id=doc_id)
            # The client may return a plain dict or a response object with .body
            raw = resp if isinstance(resp, dict) else resp.body
            assert raw["_source"]["status"] == "processing"

            # Update again — this time it should update, not upsert
            await es_client.raw.update(
                index=settings.es_meta_index,
                id=doc_id,
                body={
                    "script": {
                        "lang": "painless",
                        "source": "ctx._source.status = params.status",
                        "params": {"status": "completed"},
                    },
                    "upsert": {"doc_id": doc_id, "status": "completed"},
                },
            )

            resp = await es_client.raw.get(index=settings.es_meta_index, id=doc_id)
            raw = resp if isinstance(resp, dict) else resp.body
            assert raw["_source"]["status"] == "completed"
        finally:
            # Cleanup runs even when an update or assertion above fails.
            await es_client.raw.delete(
                index=settings.es_meta_index, id=doc_id, ignore=[404]
            )


# ──────────────────────────────────────────────────────────────────────────────
# 3. bulk_index_chunks — verify the opensearchpy bulk helper
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
class TestBulkIndex:
    """Verify that ESClient.bulk_index_chunks works with OpenSearch."""

    async def test_bulk_index_and_search(self, client: AsyncClient):
        """Bulk-indexing chunks should succeed and be searchable.

        Three chunks sharing a unique ``content_hash`` are indexed, then
        counted with a term query on that hash. Cleanup happens in a
        ``finally`` block so a failed assertion does not leave chunks in the
        shared chunk index.
        """
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        content_hash = _unique_id("bulk-hash")
        chunks = [
            {
                "chunk_id": f"{content_hash}_chunk_{i}",
                "content_hash": content_hash,
                "doc_ids": ["bulk-test-doc"],
                "chunk_index": i,
                "content": f"这是第{i+1}段测试内容",
                # OpenSearch cosinesimil rejects zero vectors — use small non-zero
                "content_vector": [0.01 * (i + 1)]
                + [0.001] * (settings.embedding_dimensions - 1),
                "acl_ids": ["A_01"],
                "title": "批量测试",
                "doc_number": "BULK-001",
                "issuing_org": "测试部门",
                "doc_type": "通知",
                "subject_words": [],
                "signer": "",
                "publish_date": None,
            }
            for i in range(3)
        ]

        try:
            success, errors = await es_client.bulk_index_chunks(chunks)
            assert success == 3
            assert len(errors) == 0

            # Refresh and verify
            await es_client.raw.indices.refresh(index=settings.es_chunk_index)
            resp = await es_client.raw.search(
                index=settings.es_chunk_index,
                body={"query": {"term": {"content_hash": content_hash}}},
            )
            # The client may return a plain dict or a response object with .body
            raw = resp if isinstance(resp, dict) else resp.body
            assert raw["hits"]["total"]["value"] == 3
        finally:
            # Cleanup. Refresh first so chunks indexed just before a failure
            # are visible to the delete-by-query search.
            await es_client.raw.indices.refresh(index=settings.es_chunk_index)
            await es_client.raw.delete_by_query(
                index=settings.es_chunk_index,
                body={"query": {"term": {"content_hash": content_hash}}},
            )


# ──────────────────────────────────────────────────────────────────────────────
# 4. _write_pending_meta integration test — verify it actually succeeds
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
class TestWritePendingMeta:
    """Verify _write_pending_meta writes successfully (not silently failing)."""

    async def test_pending_meta_actually_written(
        self,
        client: AsyncClient,
        api_prefix: str,
        auth_headers: dict,
        example_pdf_path,
    ):
        """After webhook upload, the pending meta record should exist in ES.

        Previously, _write_pending_meta silently failed due to `document=body`
        but the endpoint still returned 200. This test verifies the write
        actually happened.

        Only the lookup of the meta record is translated into the
        "silently failed" message; field assertions run outside that handler
        so a wrong title or status is reported as what it is.
        """
        import json

        from app.infrastructure.es_client import ESClient

        # NOTE(review): `auth_headers` is requested as a fixture but is not
        # sent with the request below — confirm the webhook needs no auth.
        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        doc_id = _unique_id("pending-meta")

        # Upload via webhook (this internally calls _write_pending_meta)
        meta = json.dumps({
            "doc_id": doc_id,
            "title": "测试 pending meta 写入",
            "acl_ids": ["A_01"],
        })

        try:
            with open(example_pdf_path, "rb") as f:
                resp = await client.post(
                    f"{api_prefix}/ingest/webhook/document",
                    data={"metadata": meta},
                    files={"file": ("test.pdf", f, "application/pdf")},
                )
            assert resp.status_code == 200

            # Give ES a moment to refresh
            await es_client.raw.indices.refresh(index=settings.es_meta_index)

            # The key check: pending meta must actually exist in ES. Only the
            # lookup is guarded, so assertion failures below are not
            # misreported as a missing record.
            try:
                get_resp = await es_client.raw.get(
                    index=settings.es_meta_index,
                    id=doc_id,
                )
            except Exception as exc:
                pytest.fail(
                    "_write_pending_meta silently failed — the pending meta record "
                    "was NOT written to ES. This likely means es.raw.index() is using "
                    "an incompatible keyword argument (e.g. `document=` instead of `body=`). "
                    f"Underlying error: {exc!r}"
                )

            # The client may return a plain dict or a response object with .body
            raw = get_resp if isinstance(get_resp, dict) else get_resp.body
            source = raw["_source"]
            assert source["doc_id"] == doc_id
            # If a Celery worker is running (e.g. in Docker), it may
            # pick up the task immediately and flip status to "processing".
            # Both values prove _write_pending_meta succeeded.
            assert source["status"] in ("pending", "processing")
            assert source["title"] == "测试 pending meta 写入"
        finally:
            # Cleanup — also covers a failed upload or status-code assertion.
            await es_client.raw.delete(
                index=settings.es_meta_index,
                id=doc_id,
                ignore=[404],
            )


# ──────────────────────────────────────────────────────────────────────────────
# 5. OpenSearch API compatibility smoke tests
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.asyncio
class TestOpenSearchCompatibility:
    """Smoke-test all OpenSearch client call patterns used in the codebase.

    Each test calls a real OpenSearch API method with the exact same
    parameter style as production code, ensuring no parameter naming
    mismatches exist.

    Every test that writes a document removes it in a ``finally`` block so
    that a failure does not leave test data in the meta index.
    """

    async def test_index_with_body(self, client: AsyncClient):
        """OpenSearch .index(body=...) should work."""
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        doc_id = _unique_id("compat")
        try:
            await es_client.raw.index(
                index=settings.es_meta_index,
                id=doc_id,
                body={"doc_id": doc_id, "status": "test"},
            )
        finally:
            await es_client.raw.delete(
                index=settings.es_meta_index, id=doc_id, ignore=[404]
            )

    async def test_get_with_source_filter(self, client: AsyncClient):
        """OpenSearch .get(_source=[...]) should work.

        Asserts that only the requested field comes back, so the test fails
        if the ``_source`` filter is ignored rather than merely accepted.
        """
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        doc_id = _unique_id("compat-get")
        try:
            await es_client.raw.index(
                index=settings.es_meta_index,
                id=doc_id,
                body={"doc_id": doc_id, "title": "filter test", "status": "test"},
            )
            resp = await es_client.raw.get(
                index=settings.es_meta_index,
                id=doc_id,
                _source=["title"],
            )
            # The client may return a plain dict or a response object with .body
            raw = resp if isinstance(resp, dict) else resp.body
            # Exactly the filtered field: doc_id and status must be excluded.
            assert raw["_source"] == {"title": "filter test"}
        finally:
            await es_client.raw.delete(
                index=settings.es_meta_index, id=doc_id, ignore=[404]
            )

    async def test_update_with_doc(self, client: AsyncClient):
        """OpenSearch .update(body={"doc": {...}}) should work."""
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        doc_id = _unique_id("compat-upd")
        try:
            await es_client.raw.index(
                index=settings.es_meta_index,
                id=doc_id,
                body={"doc_id": doc_id, "status": "old"},
                refresh=True,
            )
            await es_client.raw.update(
                index=settings.es_meta_index,
                id=doc_id,
                body={"doc": {"status": "new"}},
                refresh=True,
            )
            resp = await es_client.raw.get(index=settings.es_meta_index, id=doc_id)
            raw = resp if isinstance(resp, dict) else resp.body
            assert raw["_source"]["status"] == "new"
        finally:
            await es_client.raw.delete(
                index=settings.es_meta_index, id=doc_id, ignore=[404]
            )

    async def test_search_with_body(self, client: AsyncClient):
        """OpenSearch .search(body=...) should work."""
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        resp = await es_client.raw.search(
            index=settings.es_meta_index,
            body={"query": {"match_all": {}}, "size": 1},
        )
        raw = resp if isinstance(resp, dict) else resp.body
        assert "hits" in raw

    async def test_delete_by_query(self, client: AsyncClient):
        """OpenSearch .delete_by_query(body=...) should work."""
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        tag = _unique_id("compat-dbq")
        try:
            # refresh=True makes the document visible to the query below.
            await es_client.raw.index(
                index=settings.es_meta_index,
                id=tag,
                body={"doc_id": tag, "status": "deleteme"},
                refresh=True,
            )
            resp = await es_client.raw.delete_by_query(
                index=settings.es_meta_index,
                body={"query": {"term": {"doc_id": tag}}},
            )
            raw = resp if isinstance(resp, dict) else resp.body
            assert raw.get("deleted", 0) >= 1
        finally:
            # Fallback cleanup in case delete_by_query raised or matched
            # nothing; a 404 means the document is already gone.
            await es_client.raw.delete(
                index=settings.es_meta_index, id=tag, ignore=[404]
            )

    async def test_update_by_query_with_script(self, client: AsyncClient):
        """OpenSearch .update_by_query(body=...) with Painless script should work.

        This is the exact pattern used by ESClient.recompute_chunk_acl().
        """
        from app.infrastructure.es_client import ESClient

        es_client: ESClient = client.app.state.es_client  # type: ignore[attr-defined]
        tag = _unique_id("compat-ubq")
        try:
            await es_client.raw.index(
                index=settings.es_meta_index,
                id=tag,
                body={"doc_id": tag, "status": "before"},
                refresh=True,
            )
            await es_client.raw.update_by_query(
                index=settings.es_meta_index,
                body={
                    "query": {"term": {"doc_id": tag}},
                    "script": {
                        "lang": "painless",
                        "source": "ctx._source.status = params.new_status",
                        "params": {"new_status": "after"},
                    },
                },
                refresh=True,
            )
            resp = await es_client.raw.get(index=settings.es_meta_index, id=tag)
            raw = resp if isinstance(resp, dict) else resp.body
            assert raw["_source"]["status"] == "after"
        finally:
            await es_client.raw.delete(
                index=settings.es_meta_index, id=tag, ignore=[404]
            )
