"""Graph query service — Neo4j-backed helpers for entity search and sub-graph retrieval.

Used by:
  - ResearchEngine  (entity-based doc discovery)
  - Graph API       (frontend knowledge-graph explorer)

知识图谱查询服务模块。
提供面向业务的图谱只读查询接口，包括：
- 实体全文搜索和邻域子图获取
- 通过实体发现关联文档
- 文档推荐（同机构/同主题/同区域）
- 政策依据链和修订历史追溯
- 事项知识卡片（条件/材料/时限/办理机构）
"""

from __future__ import annotations

from typing import Any

from app.config import settings
from app.core.graph_schema_loader import get_schema
from app.infrastructure.neo4j_client import Neo4jClient
from app.utils.logger import get_logger

logger = get_logger(__name__)


class GraphQueryService:
    """Query the Neo4j knowledge graph for entities and document relationships.

    知识图谱查询服务，封装了实体搜索、文档发现、推荐、
    政策链追溯和事项卡片等只读查询操作。
    """

    def __init__(self, neo4j_client: Neo4jClient) -> None:
        """Bind the service to the Neo4j client used by every query method.

        Parameters
        ----------
        neo4j_client:
            Client object; the query methods open sessions through its
            ``driver`` attribute.
        """
        self._neo4j = neo4j_client

    @staticmethod
    def _query_params(
        params: dict[str, Any] | None = None,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any]:
        query_params = dict(params or {})
        if acl_tokens is not None:
            query_params["acl_tokens"] = acl_tokens
        return query_params

    @staticmethod
    def _document_visibility_clause(
        alias: str,
        acl_tokens: list[str] | None,
    ) -> str:
        base = f"coalesce({alias}.is_placeholder, false) = false"
        if acl_tokens is None:
            return base
        if not acl_tokens:
            return f"({base} AND size(coalesce({alias}.acl_ids, [])) = 0)"
        return (
            f"({base} AND "
            f"(size(coalesce({alias}.acl_ids, [])) = 0 "
            f"OR any(token IN coalesce({alias}.acl_ids, []) WHERE token IN $acl_tokens)))"
        )

    def _path_node_visibility_clause(
        self,
        alias: str,
        acl_tokens: list[str] | None,
    ) -> str:
        return f"(NOT {alias}:Document OR {self._document_visibility_clause(alias, acl_tokens)})"

    def _visible_doc_exists_clause(
        self,
        node_alias: str,
        acl_tokens: list[str] | None,
    ) -> str:
        return (
            f"EXISTS {{ MATCH ({node_alias})-[]-(visible_doc:Document) "
            f"WHERE {self._document_visibility_clause('visible_doc', acl_tokens)} }}"
        )

    def _visible_governing_doc_exists_clause(
        self,
        matter_alias: str,
        acl_tokens: list[str] | None,
    ) -> str:
        return (
            f"EXISTS {{ MATCH (visible_doc:Document)-[:GOVERNS]->({matter_alias}) "
            f"WHERE {self._document_visibility_clause('visible_doc', acl_tokens)} }}"
        )

    @staticmethod
    def _compute_layout_positions(
        nodes: list[dict[str, Any]],
        edges: list[dict[str, Any]],
    ) -> None:
        """Use networkx spring_layout to pre-compute node x/y positions.

        Mutates each node dict in-place, adding ``x`` and ``y`` keys.
        Canvas size and repulsion strength adapt to node count:
        fewer nodes → tighter layout; many nodes → wider spread.
        """
        if not nodes:
            return
        import networkx as nx

        n = len(nodes)
        G = nx.Graph()
        node_ids = [nd["id"] for nd in nodes]
        G.add_nodes_from(node_ids)
        for e in edges:
            if e["source"] in G and e["target"] in G:
                G.add_edge(e["source"], e["target"])

        # 根据节点数自适应布局参数
        if n <= 30:
            k = 2.5 / (n ** 0.5) if n > 1 else 1.0
            width, height = 900.0, 700.0
        elif n <= 100:
            k = 3.5 / (n ** 0.5)
            width, height = 1400.0, 1100.0
        else:
            k = 5.0 / (n ** 0.5)
            width, height = 2000.0, 1600.0

        pos = nx.spring_layout(G, k=k, iterations=100, seed=42)

        # Scale to canvas dimensions with padding; convert to native float for JSON
        pad = 50.0
        for nd in nodes:
            xy = pos.get(nd["id"])
            if xy is not None:
                nd["x"] = float(pad + (xy[0] + 1) / 2 * (width - 2 * pad))
                nd["y"] = float(pad + (xy[1] + 1) / 2 * (height - 2 * pad))

    # ==================================================================
    # Entity search
    # ==================================================================

    async def search_entities(
        self,
        name: str,
        *,
        label: str | None = None,
        limit: int = 20,
        acl_tokens: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Search entity nodes by name, optionally restricted to one label.

        Parameters
        ----------
        name:
            Search term.  It is passed to the label's fulltext index when one
            is registered in the mapping below; otherwise it is matched as a
            substring of ``n.name`` via ``CONTAINS``.
        label:
            Optional node label.  A label that is not one of the schema's
            entity type names yields an empty result; the check also guards
            the label before it is interpolated into the query text.
        limit:
            Maximum number of results.
        acl_tokens:
            Forwarded to the visibility clause: only entities adjacent to at
            least one visible document are returned.

        Returns
        -------
        list of dicts with ``id``, ``labels``, ``properties``.
        """
        # Label -> Neo4j fulltext index name.
        # NOTE(review): on the fulltext path ``name`` reaches the index
        # procedure unmodified -- presumably it is parsed as a Lucene query,
        # so special characters may need escaping; confirm against callers.
        index_by_label = {
            "Organization": "org_fulltext",
            "Matter": "matter_fulltext",
            "Policy": "policy_fulltext",
            "Task": "task_fulltext",
            "Project": "project_fulltext",
            "System": "system_fulltext",
            "DataResource": "dataresource_fulltext",
            "Budget": "budget_fulltext",
            "Indicator": "indicator_fulltext",
            "Industry": "industry_fulltext",
        }
        has_visible_doc = self._visible_doc_exists_clause("n", acl_tokens)
        returning = (
            "RETURN elementId(n) AS eid, labels(n) AS lbs, properties(n) AS props "
            "LIMIT $limit"
        )
        # Unfiltered set so that phase_3 entity types are included.
        entity_types = get_schema().all_entity_type_names_unfiltered()

        if not label:
            # No label given: accept any schema entity label.
            any_entity = " OR ".join(f"n:{entity}" for entity in sorted(entity_types))
            cypher = (
                "MATCH (n) "
                f"WHERE ({any_entity}) "
                "AND n.name CONTAINS $name "
                f"AND {has_visible_doc} "
                f"{returning}"
            )
        elif label not in entity_types:
            # Unknown label: refuse rather than interpolate it.
            return []
        elif label in index_by_label:
            cypher = (
                f"CALL db.index.fulltext.queryNodes('{index_by_label[label]}', $name) "
                "YIELD node AS n, score "
                f"WHERE {has_visible_doc} "
                f"{returning}"
            )
        else:
            cypher = (
                f"MATCH (n:{label}) "
                "WHERE n.name CONTAINS $name "
                f"AND {has_visible_doc} "
                f"{returning}"
            )

        run_kwargs = self._query_params({"name": name, "limit": limit}, acl_tokens)
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            outcome = await session.run(cypher, **run_kwargs)
            rows = await outcome.data()

        return [
            {
                "id": row["eid"],
                "labels": list(row["lbs"]),
                "properties": dict(row["props"] or {}),
            }
            for row in rows
        ]

    async def get_entity(
        self,
        entity_id: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Look up one non-Document node by element id.

        The node is returned only when it is adjacent to at least one
        document that passes the visibility clause for ``acl_tokens``.

        Returns
        -------
        dict with ``id``, ``labels``, ``properties``, or ``None`` when no
        such node is found.
        """
        cypher = " ".join(
            (
                "MATCH (n)",
                "WHERE elementId(n) = $eid",
                "AND NOT n:Document",
                f"AND {self._visible_doc_exists_clause('n', acl_tokens)}",
                "RETURN elementId(n) AS eid, labels(n) AS lbs, properties(n) AS props",
                "LIMIT 1",
            )
        )
        run_kwargs = self._query_params({"eid": entity_id}, acl_tokens)
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            outcome = await session.run(cypher, **run_kwargs)
            found = await outcome.single()

        if found:
            return {
                "id": found["eid"],
                "labels": list(found["lbs"]),
                "properties": dict(found["props"] or {}),
            }
        return None

    # ==================================================================
    # Document discovery via entities
    # ==================================================================

    async def get_docs_by_entity(
        self,
        entity_name: str,
        entity_label: str,
        *,
        limit: int = 30,
        acl_tokens: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Return visible documents connected to an entity by any relationship.

        Parameters
        ----------
        entity_name:
            Matched against ``name`` of the entity -- or against ``doc_id``
            when ``entity_label`` is ``"Document"``.
        entity_label:
            ``"Document"`` or one of the schema's entity type names; any
            other value yields an empty list.
        limit:
            Maximum number of rows, newest ``publish_date`` first.
        acl_tokens:
            Forwarded to the document-visibility clause (applied to the
            anchor as well when the anchor is itself a document).

        Returns
        -------
        list of dicts with ``doc_id``, ``title``, ``doc_number``,
        ``rel_type`` (type of the connecting relationship).
        """
        doc_rule = self._document_visibility_clause("d", acl_tokens)
        if entity_label == "Document":
            anchor_pattern = "(e:Document {doc_id: $name})"
            anchor_rule = self._document_visibility_clause("e", acl_tokens)
            visibility = f"{anchor_rule} AND {doc_rule}"
        elif entity_label in get_schema().all_entity_type_names_unfiltered():
            # Label is allow-listed (unfiltered set, so phase_3 types are
            # included) before being interpolated into the query.
            anchor_pattern = f"(e:{entity_label} {{name: $name}})"
            visibility = doc_rule
        else:
            return []

        cypher = " ".join(
            (
                f"MATCH {anchor_pattern}-[r]-(d:Document)",
                f"WHERE {visibility}",
                "RETURN d.doc_id AS doc_id, d.title AS title,",
                "coalesce(d.doc_code, d.doc_number, '') AS doc_number, type(r) AS rel_type",
                "ORDER BY d.publish_date DESC",
                "LIMIT $limit",
            )
        )
        run_kwargs = self._query_params(
            {"name": entity_name, "limit": limit}, acl_tokens
        )
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            outcome = await session.run(cypher, **run_kwargs)
            rows = await outcome.data()

        documents: list[dict[str, Any]] = []
        for row in rows:
            if not row["doc_id"]:
                continue  # rows without a doc_id are dropped
            documents.append(
                {
                    "doc_id": row["doc_id"],
                    "title": row["title"] or "",
                    "doc_number": row["doc_number"] or "",
                    "rel_type": row["rel_type"],
                }
            )
        return documents

    async def find_doc_by_code(
        self,
        doc_code: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> str | None:
        """Resolve a document code to the ``doc_id`` of one visible document.

        The stripped code is matched as a substring of ``doc_code`` or
        ``doc_number``.  A document whose ``coalesce(doc_code, doc_number)``
        equals the code exactly is preferred; remaining ties go to the
        newest ``publish_date``.

        Returns
        -------
        The ``doc_id`` string, or ``None`` when the code is blank, nothing
        matches, or the matched document has no ``doc_id``.
        """
        code = doc_code.strip()
        if not code:
            return None

        visibility = self._document_visibility_clause("d", acl_tokens)
        cypher = f"""
        MATCH (d:Document)
        WHERE (
            coalesce(d.doc_code, '') CONTAINS $code
            OR coalesce(d.doc_number, '') CONTAINS $code
        )
          AND {visibility}
        RETURN d.doc_id AS doc_id
        ORDER BY CASE WHEN coalesce(d.doc_code, d.doc_number, '') = $code THEN 0 ELSE 1 END,
                 d.publish_date DESC
        LIMIT 1
        """

        run_kwargs = self._query_params({"code": code}, acl_tokens)
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            outcome = await session.run(cypher, **run_kwargs)
            found = await outcome.single()

        if not found:
            return None
        return found.get("doc_id") or None

    async def search_graph_for_docs(
        self,
        keywords: list[str],
        *,
        limit_per_keyword: int = 10,
        acl_tokens: list[str] | None = None,
    ) -> list[str]:
        """Find doc_ids connected to any entity whose name contains a keyword.

        Parameters
        ----------
        keywords:
            List of search terms.
        limit_per_keyword:
            Maximum documents per keyword.

        Returns
        -------
        Deduplicated list of ``doc_id`` strings.
        """
        if not keywords:
            return []

        # Build entity label filter dynamically from schema config (unfiltered)
        schema = get_schema()
        entity_names = sorted(schema.all_entity_type_names_unfiltered())
        or_clause = " OR ".join(f"e:{en}" for en in entity_names)

        cypher = f"""
        UNWIND $keywords AS kw
        MATCH (e)-[r]-(d:Document)
        WHERE ({or_clause})
          AND e.name CONTAINS kw
                    AND {self._document_visibility_clause('d', acl_tokens)}
        RETURN DISTINCT d.doc_id AS doc_id
        LIMIT $limit
        """
        total_limit = limit_per_keyword * len(keywords)
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            result = await session.run(
                cypher,
                **self._query_params(
                    {"keywords": keywords, "limit": total_limit},
                    acl_tokens,
                ),
            )
            records = await result.data()

        doc_ids = [r["doc_id"] for r in records if r.get("doc_id")]
        # Deduplicate while preserving order
        return list(dict.fromkeys(doc_ids))

    # ==================================================================
    # Document entity context
    # ==================================================================

    async def get_document_entities(
        self,
        doc_id: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, list[dict[str, Any]]]:
        """Group the non-Document neighbours of one visible document by label.

        Each neighbour is filed under its first label (``"Unknown"`` when it
        has none).  Rows arrive ordered by first label, then name.

        Returns
        -------
        dict mapping label -> list of dicts with ``id``, ``name``,
        ``rel_type`` and ``properties``.  Empty when the document does not
        exist or is not visible for ``acl_tokens``.
        """
        doc_rule = self._document_visibility_clause("d", acl_tokens)
        cypher = f"""
        MATCH (d:Document {{doc_id: $doc_id}})-[r]-(e)
        WHERE {doc_rule}
          AND NOT e:Document
        RETURN labels(e) AS labels, e.name AS name, elementId(e) AS eid,
               type(r) AS rel_type, properties(e) AS props
        ORDER BY labels(e)[0], e.name
        """
        run_kwargs = self._query_params({"doc_id": doc_id}, acl_tokens)
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            outcome = await session.run(cypher, **run_kwargs)
            rows = await outcome.data()

        by_label: dict[str, list[dict[str, Any]]] = {}
        for row in rows:
            bucket = row["labels"][0] if row["labels"] else "Unknown"
            by_label.setdefault(bucket, []).append(
                {
                    "id": row["eid"],
                    "name": row["name"] or "",
                    "rel_type": row["rel_type"],
                    "properties": row["props"] or {},
                }
            )
        return by_label

    # ==================================================================
    # Sub-graph for graph explorer
    # ==================================================================

    async def get_entity_neighborhood(
        self,
        entity_id: str,
        *,
        depth: int = 2,
        max_nodes: int = 100,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any]:
        """Return the subgraph neighbourhood of an entity node.

        Parameters
        ----------
        entity_id:
            Neo4j element ID of the start node.
        depth:
            Maximum path length from the start node; clamped to 1..5.
        max_nodes:
            Hard limit on the number of *distinct* nodes returned (at least
            1).  The start node is kept ahead of the others so truncation
            does not drop it.
        acl_tokens:
            Forwarded to the path-node visibility clause: paths running
            through a non-visible document are excluded.

        Returns
        -------
        ``{"nodes": [...], "edges": [...]}`` -- every edge's ``source`` and
        ``target`` refer to a node present in ``nodes``.
        """
        # Return scalar values from Cypher to avoid Node/Relationship object issues
        # when the async driver serialises them via result.single().
        # NOTE: Neo4j does not support parameters in variable-length path ranges
        # [*1..$depth] -- embed the (int-clamped) depth as a literal.
        depth_safe = max(1, min(int(depth), 5))  # cap at 5 hops
        max_nodes_safe = max(1, int(max_nodes))
        # The node list is deliberately NOT sliced inside Cypher: it is the
        # concatenation of two collections and still contains duplicates, so
        # slicing there could return fewer than max_nodes distinct nodes.
        # The volume stays bounded by the path LIMIT below.
        cypher = (
            f"MATCH path = (start)-[*1..{depth_safe}]-(neighbor) "
            "WHERE elementId(start) = $eid "
            f"AND all(node IN nodes(path) WHERE {self._path_node_visibility_clause('node', acl_tokens)}) "
            "WITH path LIMIT 5000 "
            "UNWIND relationships(path) AS rel "
            "WITH DISTINCT rel "
            "WITH "
            "  collect(DISTINCT {eid: elementId(startNode(rel)), lbs: labels(startNode(rel)), props: properties(startNode(rel))}) "
            "  + collect(DISTINCT {eid: elementId(endNode(rel)),   lbs: labels(endNode(rel)),   props: properties(endNode(rel))}) "
            "  AS all_nodes, "
            "  collect(DISTINCT {src: elementId(startNode(rel)), tgt: elementId(endNode(rel)), "
            "                    typ: type(rel), props: properties(rel)}) AS all_rels "
            "RETURN all_nodes AS nodes, all_rels AS rels"
        )
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            result = await session.run(
                cypher,
                **self._query_params({"eid": entity_id}, acl_tokens),
            )
            record = await result.single()

        if not record:
            return {"nodes": [], "edges": []}

        # Deduplicate nodes by eid (first occurrence wins).
        unique_nodes: dict[str, dict[str, Any]] = {}
        for n in (record["nodes"] or []):
            if n["eid"] not in unique_nodes:
                unique_nodes[n["eid"]] = {
                    "id": n["eid"],
                    "labels": list(n["lbs"]),
                    "properties": dict(n["props"] or {}),
                }

        # Stable sort: the requested start node first, everything else in its
        # original order -- then apply the node cap on distinct nodes.
        ordered = sorted(unique_nodes.values(), key=lambda nd: nd["id"] != entity_id)
        nodes = ordered[:max_nodes_safe]
        kept_ids = {nd["id"] for nd in nodes}

        seen_edges: set[tuple] = set()
        edges = []
        for r in (record["rels"] or []):
            # Skip edges whose endpoint was cut by the node cap; a dangling
            # edge would reference a node missing from the response.
            if r["src"] not in kept_ids or r["tgt"] not in kept_ids:
                continue
            key = (r["src"], r["tgt"], r["typ"])
            if key not in seen_edges:
                seen_edges.add(key)
                edges.append({
                    "source": r["src"],
                    "target": r["tgt"],
                    "type": r["typ"],
                    "properties": dict(r["props"] or {}),
                })
        return {"nodes": nodes, "edges": edges}

    async def get_docs_overview(
        self,
        *,
        label: str | None = None,
        entity_name: str | None = None,
        limit: int = 200,
        cursor: str | None = None,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any]:
        """Return a graph overview -- documents + their direct entity neighbours.

        When ``entity_name`` + ``label`` are provided, shows documents connected
        to that specific entity, plus all entities adjacent to those documents.
        Otherwise returns the most-connected documents and their entity neighbours.

        Supports offset-based cursor pagination for progressive loading.

        Parameters
        ----------
        label:
            Entity label of the anchor.  A label that is not one of the
            schema's entity type names is replaced by the first name yielded
            by the schema's label collection (``"Organization"`` when that
            collection is empty).
        entity_name:
            Name of the anchor entity.
        limit:
            Page size; values below 1 are raised to 1 so the cursor always
            advances.
        cursor:
            Offset returned as ``next_cursor`` by a previous call.  A
            malformed cursor is logged and treated as offset 0; a negative
            one is clamped to 0.
        acl_tokens:
            Forwarded to the document-visibility clause.

        Returns
        -------
        ``{"nodes": [...], "edges": [...], "next_cursor": "...", "total_count": N}``
        suitable for the G6 canvas.  Nodes carry pre-computed ``x``/``y``.
        """
        # A client-supplied cursor must never crash the query or reach Cypher
        # as a negative SKIP.
        try:
            skip = max(0, int(cursor)) if cursor else 0
        except (TypeError, ValueError):
            logger.warning(
                f"Ignoring malformed graph overview cursor {cursor!r}; starting from offset 0"
            )
            skip = 0
        # limit <= 0 would return no rows while next_cursor stays at `skip`.
        limit = max(1, int(limit))

        doc_rule = self._document_visibility_clause("d", acl_tokens)

        if entity_name and label:
            schema = get_schema()
            valid_labels = schema.all_entity_type_names_unfiltered()
            label = label if label in valid_labels else next(iter(valid_labels), "Organization")
            # Find anchor entity + its documents, then expand each document's
            # direct (non-Document) entity neighbours.
            # ORDER BY carries tie-breakers so that offset pagination is
            # deterministic when publish dates are equal.
            # NOTE(review): paging is over (anchor, document, relationship)
            # rows while the count query counts distinct documents -- confirm
            # whether multi-relationship anchors matter for callers.
            cypher = (
                f"MATCH (anchor:{label} {{name: $name}})-[r0]-(d:Document) "
                f"WHERE {doc_rule} "
                "WITH anchor, d, r0 "
                "ORDER BY d.publish_date DESC, d.doc_id, elementId(r0) "
                "SKIP $skip LIMIT $limit "
                "OPTIONAL MATCH (d)-[r1]-(e) WHERE NOT e:Document "
                "RETURN "
                "  elementId(anchor) AS anchor_eid, labels(anchor) AS anchor_lbs, properties(anchor) AS anchor_props, "
                "  elementId(d) AS doc_eid, labels(d) AS doc_lbs, properties(d) AS doc_props, "
                "  elementId(startNode(r0)) AS r0_src, elementId(endNode(r0)) AS r0_tgt, type(r0) AS r0_type, "
                "  elementId(e)            AS ent_eid, labels(e)   AS ent_lbs,   properties(e)   AS ent_props, "
                "  elementId(startNode(r1)) AS r1_src, elementId(endNode(r1)) AS r1_tgt, type(r1) AS r1_type"
            )
            # Count query
            count_cypher = (
                f"MATCH (anchor:{label} {{name: $name}})-[]-(d:Document) "
                f"WHERE {doc_rule} "
                "RETURN count(DISTINCT d) AS total"
            )
            params: dict[str, Any] = self._query_params(
                {"name": entity_name, "limit": limit, "skip": skip},
                acl_tokens,
            )
            count_params: dict[str, Any] = self._query_params(
                {"name": entity_name},
                acl_tokens,
            )
        else:
            # Top-N most-connected documents + their direct entity neighbours.
            # doc_id breaks degree ties so pages do not overlap or skip.
            cypher = (
                "MATCH (d:Document)-[r0]-() "
                f"WHERE {doc_rule} "
                "WITH d, count(r0) AS degree "
                "ORDER BY degree DESC, d.doc_id "
                "SKIP $skip LIMIT $limit "
                "OPTIONAL MATCH (d)-[r1]-(e) WHERE NOT e:Document "
                "RETURN "
                "  elementId(d) AS doc_eid, labels(d) AS doc_lbs, properties(d) AS doc_props, degree, "
                "  elementId(e)            AS ent_eid, labels(e)   AS ent_lbs,   properties(e)   AS ent_props, "
                "  elementId(startNode(r1)) AS r1_src, elementId(endNode(r1)) AS r1_tgt, type(r1) AS r1_type"
            )
            # Count query
            count_cypher = (
                "MATCH (d:Document)-[r0]-() "
                f"WHERE {doc_rule} "
                "WITH d, count(r0) AS degree "
                "RETURN count(d) AS total"
            )
            params = self._query_params({"limit": limit, "skip": skip}, acl_tokens)
            count_params = self._query_params({}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            result = await session.run(cypher, **params)
            records = await result.data()
            # Run count query for pagination metadata
            count_result = await session.run(count_cypher, **count_params)
            count_record = await count_result.single()

        total_count = count_record["total"] if count_record else 0

        seen_nodes: set[str] = set()
        seen_edges: set[tuple[str, str, str]] = set()
        nodes: list[dict[str, Any]] = []
        edges: list[dict[str, Any]] = []

        def _add_node(eid: Any, labels: Any, props: Any, **extra: Any) -> None:
            """Append a node once per element id; rows without the id are skipped."""
            if not eid or eid in seen_nodes:
                return
            seen_nodes.add(eid)
            nodes.append({
                "id": eid,
                "labels": list(labels or []),
                "properties": dict(props or {}),
                **extra,
            })

        def _add_edge(src: Any, tgt: Any, rel_type: Any) -> None:
            """Append an edge once per (source, target, type) triple."""
            if not (src and tgt and rel_type):
                return
            key = (src, tgt, rel_type)
            if key in seen_edges:
                return
            seen_edges.add(key)
            edges.append({"source": src, "target": tgt, "type": rel_type})

        for rec in records:
            # Document node (always carries a "degree" key; None in anchor mode)
            _add_node(
                rec.get("doc_eid"),
                rec.get("doc_lbs"),
                rec.get("doc_props"),
                degree=rec.get("degree"),
            )
            # Anchor entity node and anchor <-> document edge (anchor mode only)
            _add_node(rec.get("anchor_eid"), rec.get("anchor_lbs"), rec.get("anchor_props"))
            _add_edge(rec.get("r0_src"), rec.get("r0_tgt"), rec.get("r0_type"))
            # Neighbour entity node and document <-> entity edge
            _add_node(rec.get("ent_eid"), rec.get("ent_lbs"), rec.get("ent_props"))
            _add_edge(rec.get("r1_src"), rec.get("r1_tgt"), rec.get("r1_type"))

        # Pagination: compute next cursor
        next_offset = skip + limit
        next_cursor = str(next_offset) if next_offset < total_count else None

        # Pre-compute layout coordinates on the backend so the frontend can use
        # a preset layout and does not stall on large graphs.
        self._compute_layout_positions(nodes, edges)

        return {
            "nodes": nodes,
            "edges": edges,
            "next_cursor": next_cursor,
            "total_count": total_count,
        }

    # ==================================================================
    # Document recommendations & governance chains (Phase 0)
    # ==================================================================

    async def get_document_recommendations(
        self,
        doc_id: str,
        *,
        limit: int = 20,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any]:
        """Recommend documents that share an entity with the given document.

        Three lookups are made: same issuing Organization (``ISSUED_BY``),
        same PolicyTheme (``BELONGS_TO_THEME``) and same Region
        (``APPLIES_TO_REGION``).  They run concurrently through
        ``asyncio.gather``, each in its own session.

        Parameters
        ----------
        doc_id:
            ``doc_id`` of the source document; it is excluded from results.
        limit:
            Maximum rows per category, newest ``publish_date`` first.
        acl_tokens:
            Forwarded to the document-visibility clause, which is applied to
            both the source and the recommended documents.

        Returns
        -------
        dict with ``same_org``, ``same_theme``, ``same_region`` lists, each
        item containing ``doc_id``, ``title``, ``doc_code``,
        ``publish_date``, ``shared_entity`` and ``status``.
        """
        import asyncio

        # Shared WHERE clause: both documents visible, not the source itself,
        # and the other document's status is not the repealed marker.
        shared_filter = "WHERE " + " AND ".join(
            (
                self._document_visibility_clause("d", acl_tokens),
                "other.doc_id <> $doc_id",
                self._document_visibility_clause("other", acl_tokens),
                "coalesce(other.status, '') <> '已废止'",
            )
        )

        # Documents issued by the same organization.
        org_query = f"""
        MATCH (d:Document {{doc_id: $doc_id}})-[:ISSUED_BY]->(org:Organization)<-[:ISSUED_BY]-(other:Document)
        {shared_filter}
        RETURN other.doc_id AS doc_id, other.title AS title,
               other.doc_code AS doc_code, other.publish_date AS publish_date,
               org.name AS shared_entity, other.status AS status
        ORDER BY other.publish_date DESC
        LIMIT $limit
        """

        # Documents belonging to the same policy theme.
        theme_query = f"""
        MATCH (d:Document {{doc_id: $doc_id}})-[:BELONGS_TO_THEME]->(t:PolicyTheme)<-[:BELONGS_TO_THEME]-(other:Document)
        {shared_filter}
        RETURN other.doc_id AS doc_id, other.title AS title,
               other.doc_code AS doc_code, other.publish_date AS publish_date,
               t.name AS shared_entity, other.status AS status
        ORDER BY other.publish_date DESC
        LIMIT $limit
        """

        # Documents applying to the same region.
        region_query = f"""
        MATCH (d:Document {{doc_id: $doc_id}})-[:APPLIES_TO_REGION]->(r:Region)<-[:APPLIES_TO_REGION]-(other:Document)
        {shared_filter}
        RETURN other.doc_id AS doc_id, other.title AS title,
               other.doc_code AS doc_code, other.publish_date AS publish_date,
               r.name AS shared_entity, other.status AS status
        ORDER BY other.publish_date DESC
        LIMIT $limit
        """

        run_kwargs = self._query_params({"doc_id": doc_id, "limit": limit}, acl_tokens)

        async def fetch(query: str) -> list[dict[str, Any]]:
            """Run one query in its own session and normalise its rows."""
            async with self._neo4j.driver.session(
                database=settings.neo4j_database
            ) as session:
                outcome = await session.run(query, **run_kwargs)
                rows = await outcome.data()
                recommendations: list[dict[str, Any]] = []
                for row in rows:
                    if not row["doc_id"]:
                        continue  # rows without a doc_id are dropped
                    recommendations.append(
                        {
                            "doc_id": row["doc_id"],
                            "title": row["title"] or "",
                            "doc_code": row["doc_code"] or "",
                            "publish_date": row["publish_date"] or "",
                            "shared_entity": row["shared_entity"] or "",
                            "status": row.get("status") or "",
                        }
                    )
                return recommendations

        # Run the three lookups concurrently to cut total latency.
        categories = ("same_org", "same_theme", "same_region")
        results = await asyncio.gather(
            fetch(org_query),
            fetch(theme_query),
            fetch(region_query),
        )
        return dict(zip(categories, results))

    async def get_policy_chain(
        self,
        doc_id: str,
        *,
        max_depth: int = 5,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any]:
        """Follow outgoing ``BASED_ON`` relationships from a document.

        Only paths on which every document passes the visibility clause are
        considered.

        Parameters
        ----------
        doc_id:
            ``doc_id`` of the document the traversal starts from.
        max_depth:
            Maximum number of ``BASED_ON`` hops; clamped to 1..10.
        acl_tokens:
            Forwarded to the document-visibility clause.

        Returns
        -------
        dict with ``chain`` (distinct documents found on the matched paths,
        oldest ``publish_date`` first) and ``edges`` (distinct relationships
        as ``from_doc_id`` / ``to_doc_id`` / ``rel_type``).
        """
        hops = min(max(int(max_depth), 1), 10)
        node_rule = self._document_visibility_clause("node", acl_tokens)

        nodes_query = f"""
        MATCH path = (d:Document {{doc_id: $doc_id}})-[:BASED_ON*1..{hops}]->(ancestor:Document)
        WHERE all(node IN nodes(path) WHERE {node_rule})
        UNWIND nodes(path) AS n
        WITH DISTINCT n
        RETURN n.doc_id AS doc_id, n.title AS title, n.doc_code AS doc_code,
               n.publish_date AS publish_date, n.status AS status,
               coalesce(n.is_placeholder, false) AS is_placeholder
        ORDER BY n.publish_date ASC
        """

        rels_query = f"""
        MATCH path = (d:Document {{doc_id: $doc_id}})-[:BASED_ON*1..{hops}]->(ancestor:Document)
        WHERE all(node IN nodes(path) WHERE {node_rule})
        UNWIND relationships(path) AS rel
        WITH DISTINCT rel
        RETURN startNode(rel).doc_id AS from_doc_id,
               endNode(rel).doc_id AS to_doc_id,
               type(rel) AS rel_type
        """

        run_kwargs = self._query_params({"doc_id": doc_id}, acl_tokens)
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            node_outcome = await session.run(nodes_query, **run_kwargs)
            node_rows = await node_outcome.data()

            rel_outcome = await session.run(rels_query, **run_kwargs)
            rel_rows = await rel_outcome.data()

        chain: list[dict[str, Any]] = []
        for row in node_rows:
            if not row["doc_id"]:
                continue  # nodes without a doc_id are dropped
            chain.append(
                {
                    "doc_id": row["doc_id"],
                    "title": row["title"] or "",
                    "doc_code": row["doc_code"] or "",
                    "publish_date": row["publish_date"] or "",
                    "status": row["status"] or "",
                    "is_placeholder": row["is_placeholder"],
                }
            )

        edges: list[dict[str, Any]] = []
        for row in rel_rows:
            if not (row["from_doc_id"] and row["to_doc_id"]):
                continue  # both endpoints need a doc_id
            edges.append(
                {
                    "from_doc_id": row["from_doc_id"],
                    "to_doc_id": row["to_doc_id"],
                    "rel_type": row["rel_type"],
                }
            )

        return {"chain": chain, "edges": edges}

    async def get_revision_history(
        self,
        doc_id: str,
        *,
        max_depth: int = 5,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any]:
        """Gather documents linked to ``doc_id`` through AMENDS / REPEALS edges.

        The path pattern is undirected, so edges are followed in both
        directions, up to ``max_depth`` hops.

        Parameters
        ----------
        doc_id:
            ``doc_id`` of the document the traversal starts from.
        max_depth:
            Hop limit; coerced with ``int()`` and clamped to 1..10 before it
            is inlined into the query.
        acl_tokens:
            Passed to ``_document_visibility_clause``; every node on a matched
            path must satisfy that clause.

        Returns
        -------
        dict with ``documents`` (distinct documents on the matched paths,
        sorted by ``publish_date`` ascending) and ``edges`` (distinct
        relationships, ``rel_type`` being AMENDS or REPEALS).
        """
        hop_limit = max(1, min(int(max_depth), 10))
        node_filter = self._document_visibility_clause('node', acl_tokens)
        bound = self._query_params({"doc_id": doc_id}, acl_tokens)

        nodes_query = f"""
        MATCH path = (d:Document {{doc_id: $doc_id}})-[:AMENDS|REPEALS*1..{hop_limit}]-(related:Document)
        WHERE all(node IN nodes(path) WHERE {node_filter})
        UNWIND nodes(path) AS n
        WITH DISTINCT n
        RETURN n.doc_id AS doc_id, n.title AS title, n.doc_code AS doc_code,
               n.publish_date AS publish_date, n.status AS status,
               coalesce(n.is_placeholder, false) AS is_placeholder
        ORDER BY n.publish_date ASC
        """

        rels_query = f"""
        MATCH path = (d:Document {{doc_id: $doc_id}})-[:AMENDS|REPEALS*1..{hop_limit}]-(related:Document)
        WHERE all(node IN nodes(path) WHERE {node_filter})
        UNWIND relationships(path) AS rel
        WITH DISTINCT rel
        RETURN startNode(rel).doc_id AS from_doc_id,
               endNode(rel).doc_id AS to_doc_id,
               type(rel) AS rel_type
        """

        # Nodes first, then relationships, both on the same session.
        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            node_cursor = await session.run(nodes_query, **bound)
            node_rows = await node_cursor.data()

            rel_cursor = await session.run(rels_query, **bound)
            rel_rows = await rel_cursor.data()

        documents: list[dict[str, Any]] = []
        for row in node_rows:
            if not row["doc_id"]:
                # Rows without a doc_id are dropped.
                continue
            documents.append(
                {
                    "doc_id": row["doc_id"],
                    "title": row["title"] or "",
                    "doc_code": row["doc_code"] or "",
                    "publish_date": row["publish_date"] or "",
                    "status": row["status"] or "",
                    "is_placeholder": row["is_placeholder"],
                }
            )

        edges: list[dict[str, Any]] = []
        for row in rel_rows:
            # An edge is kept only when both endpoints carry a doc_id.
            if not (row["from_doc_id"] and row["to_doc_id"]):
                continue
            edges.append(
                {
                    "from_doc_id": row["from_doc_id"],
                    "to_doc_id": row["to_doc_id"],
                    "rel_type": row["rel_type"],
                }
            )

        return {"documents": documents, "edges": edges}

    # ==================================================================
    # Matter queries (Phase 1)
    # ==================================================================

    async def search_matters(
        self,
        query: str,
        *,
        limit: int = 20,
        acl_tokens: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Find Matter nodes whose ``name`` CONTAINS ``query``.

        A matter is returned only when at least one governing document passes
        the visibility clause built from ``acl_tokens``; up to three such
        ``doc_id`` values are attached as ``sample_doc_ids``.

        Parameters
        ----------
        query:
            Substring bound to ``$search_term``.
        limit:
            Bound to the query's ``LIMIT``.
        acl_tokens:
            Passed to ``_document_visibility_clause`` for the governing
            documents.

        Returns
        -------
        list of summary dicts (``matter_id``, ``name``, ``description``,
        ``matter_type``, ``status``, ``sample_doc_ids``) sorted by name
        ascending.
        """
        doc_filter = self._document_visibility_clause("d", acl_tokens)
        statement = """
        MATCH (m:Matter)
        WHERE m.name CONTAINS $search_term
        OPTIONAL MATCH (m)<-[:GOVERNS]-(d:Document)
                WHERE __DOC_VISIBILITY__
        WITH m, collect(DISTINCT d.doc_id)[..3] AS sample_ids
                WHERE size(sample_ids) > 0
        RETURN
          coalesce(m.matter_id, '') AS matter_id,
          m.name AS name,
          coalesce(m.description, '') AS description,
          coalesce(m.matter_type, '') AS matter_type,
          coalesce(m.status, '') AS status,
          sample_ids AS sample_doc_ids
        ORDER BY m.name ASC
        LIMIT $limit
        """.replace("__DOC_VISIBILITY__", doc_filter)

        bound = self._query_params({"search_term": query, "limit": limit}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            cursor = await session.run(statement, **bound)
            rows = await cursor.data()

        matches: list[dict[str, Any]] = []
        for row in rows:
            if not row["name"]:
                # Rows without a name are dropped.
                continue
            sample_ids = [doc for doc in (row["sample_doc_ids"] or []) if doc]
            matches.append(
                {
                    "matter_id": row["matter_id"],
                    "name": row["name"] or "",
                    "description": row["description"],
                    "matter_type": row["matter_type"],
                    "status": row["status"],
                    "sample_doc_ids": sample_ids,
                }
            )
        return matches

    async def get_governed_matters(
        self,
        doc_id: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """List the Matter nodes a document points at via GOVERNS.

        Parameters
        ----------
        doc_id:
            ``doc_id`` of the governing document; a falsy value short-circuits
            to an empty list without querying.
        acl_tokens:
            Passed to ``_document_visibility_clause`` for the document itself.

        Returns
        -------
        list of ``{"matter_id", "name"}`` dicts ordered by matter name.
        """
        if not doc_id:
            return []

        doc_filter = self._document_visibility_clause('d', acl_tokens)
        statement = " ".join(
            [
                "MATCH (d:Document {doc_id: $doc_id})-[:GOVERNS]->(m:Matter)",
                f"WHERE {doc_filter}",
                "RETURN coalesce(m.matter_id, '') AS matter_id,",
                "coalesce(m.name, '') AS name",
                "ORDER BY m.name ASC",
            ]
        )
        bound = self._query_params({"doc_id": doc_id}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            cursor = await session.run(statement, **bound)
            rows = await cursor.data()

        matters: list[dict[str, Any]] = []
        for row in rows:
            # Keep a row as long as it has an id or a name.
            if not (row.get("matter_id") or row.get("name")):
                continue
            matters.append(
                {
                    "matter_id": row["matter_id"],
                    "name": row["name"] or "",
                }
            )
        return matters

    async def get_matter_card(
        self,
        matter_id: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Assemble a matter knowledge card from a series of small queries.

        Parameters
        ----------
        matter_id:
            Value matched against ``Matter.matter_id``.
        acl_tokens:
            Used for the matter gate (``_visible_governing_doc_exists_clause``)
            and for filtering the governing documents.

        Returns
        -------
        ``None`` when no Matter with this id passes the gate; otherwise a dict
        with the basic fields plus ``conditions``, ``materials``,
        ``time_limits``, ``target_groups``, ``handled_by``,
        ``governing_docs`` and ``regions``.
        """
        gated_params = self._query_params({"mid": matter_id}, acl_tokens)

        # Neighbour sections that only need the matter id, in card order.
        neighbour_sections = (
            (
                "conditions",
                "MATCH (m:Matter {matter_id: $mid})-[:HAS_CONDITION]->(c:Condition) "
                "RETURN properties(c) AS props ORDER BY c.name",
            ),
            (
                "materials",
                "MATCH (m:Matter {matter_id: $mid})-[:REQUIRES_MATERIAL]->(mat:Material) "
                "RETURN properties(mat) AS props ORDER BY mat.name",
            ),
            (
                "time_limits",
                "MATCH (m:Matter {matter_id: $mid})-[:HAS_TIME_LIMIT]->(tl:TimeLimit) "
                "RETURN properties(tl) AS props ORDER BY tl.name",
            ),
            (
                "target_groups",
                "MATCH (m:Matter {matter_id: $mid})-[:APPLIES_TO_TARGET]->(tg:TargetGroup) "
                "RETURN properties(tg) AS props ORDER BY tg.name",
            ),
            (
                "handled_by",
                "MATCH (m:Matter {matter_id: $mid})-[:HANDLED_BY]->(org:Organization) "
                "RETURN properties(org) AS props ORDER BY org.name",
            ),
        )

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:

            async def fetch_props(statement: str) -> list[dict[str, Any]]:
                """Run a ``properties(x) AS props`` query and return plain dicts."""
                cursor = await session.run(statement, mid=matter_id)
                return [dict(row["props"]) for row in await cursor.data()]

            # Basic info; doubles as the existence / visibility gate.
            gate = self._visible_governing_doc_exists_clause('m', acl_tokens)
            cursor = await session.run(
                f"MATCH (m:Matter {{matter_id: $mid}}) "
                f"WHERE {gate} "
                "RETURN m.matter_id AS matter_id, m.name AS name, "
                "coalesce(m.description, '') AS description, "
                "coalesce(m.matter_type, '') AS matter_type, "
                "coalesce(m.status, '') AS status",
                **gated_params,
            )
            head = await cursor.single()
            if not head:
                return None

            card: dict[str, Any] = {
                field: head[field]
                for field in ("matter_id", "name", "description", "matter_type", "status")
            }

            for section, statement in neighbour_sections:
                card[section] = await fetch_props(statement)

            # Governing documents, filtered by the document visibility clause.
            doc_filter = self._document_visibility_clause('d', acl_tokens)
            cursor = await session.run(
                "MATCH (d:Document)-[:GOVERNS]->(m:Matter {matter_id: $mid}) "
                f"WHERE {doc_filter} "
                "RETURN d.doc_id AS doc_id, coalesce(d.title, '') AS title, "
                "coalesce(d.doc_code, '') AS doc_code, "
                "coalesce(d.publish_date, '') AS publish_date, "
                "coalesce(d.status, '') AS status "
                "ORDER BY "
                "  CASE WHEN d.status IN ['有效', '部分失效'] THEN 0 ELSE 1 END, "
                "  d.publish_date DESC, d.title",
                **gated_params,
            )
            card["governing_docs"] = [dict(row) for row in await cursor.data()]

            card["regions"] = await fetch_props(
                "MATCH (m:Matter {matter_id: $mid})-[:APPLIES_TO_REGION]->(r:Region) "
                "RETURN properties(r) AS props ORDER BY r.name"
            )

        return card

    async def get_matter_requirements(
        self,
        matter_id: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Fetch the conditions, materials and time limits attached to a matter.

        Parameters
        ----------
        matter_id:
            Value matched against ``Matter.matter_id``.
        acl_tokens:
            Used only for the matter gate
            (``_visible_governing_doc_exists_clause``).

        Returns
        -------
        ``None`` when no Matter with this id passes the gate; otherwise a dict
        with ``matter_id``, ``name``, ``conditions``, ``materials`` and
        ``time_limits``, each list ordered by name.
        """
        # Section name -> query, in the order the keys are added to the result.
        section_queries = {
            "conditions": (
                "MATCH (m:Matter {matter_id: $mid})-[:HAS_CONDITION]->(c:Condition) "
                "RETURN properties(c) AS props ORDER BY c.name"
            ),
            "materials": (
                "MATCH (m:Matter {matter_id: $mid})-[:REQUIRES_MATERIAL]->(mat:Material) "
                "RETURN properties(mat) AS props ORDER BY mat.name"
            ),
            "time_limits": (
                "MATCH (m:Matter {matter_id: $mid})-[:HAS_TIME_LIMIT]->(tl:TimeLimit) "
                "RETURN properties(tl) AS props ORDER BY tl.name"
            ),
        }

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            # Existence / visibility gate.
            gate = self._visible_governing_doc_exists_clause('m', acl_tokens)
            cursor = await session.run(
                f"MATCH (m:Matter {{matter_id: $mid}}) "
                f"WHERE {gate} "
                "RETURN m.matter_id AS matter_id, m.name AS name",
                **self._query_params({"mid": matter_id}, acl_tokens),
            )
            head = await cursor.single()
            if not head:
                return None

            result: dict[str, Any] = {
                "matter_id": head["matter_id"],
                "name": head["name"],
            }

            for section, statement in section_queries.items():
                cursor = await session.run(statement, mid=matter_id)
                rows = await cursor.data()
                result[section] = [dict(row["props"]) for row in rows]

        return result

    async def get_same_theme_documents(
        self,
        doc_id: str,
        *,
        limit: int = 20,
        acl_tokens: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Find other documents attached to the same PolicyTheme as ``doc_id``.

        Both the source document and each candidate must pass the visibility
        clause.  The source document itself and candidates whose status is
        ``'已废止'`` are excluded.

        Parameters
        ----------
        doc_id:
            ``doc_id`` of the source document.
        limit:
            Bound to the query's ``LIMIT``.
        acl_tokens:
            Passed to ``_document_visibility_clause`` for both documents.

        Returns
        -------
        list of dicts with ``doc_id``, ``title``, ``doc_code``,
        ``publish_date``, ``theme_name`` and ``status``, ordered by
        ``publish_date`` descending.
        """
        statement = """
        MATCH (d:Document {doc_id: $doc_id})-[:BELONGS_TO_THEME]->(t:PolicyTheme)<-[:BELONGS_TO_THEME]-(other:Document)
        WHERE __DOC_VISIBILITY_D__
          AND other.doc_id <> $doc_id
          AND __DOC_VISIBILITY_OTHER__
          AND coalesce(other.status, '') <> '已废止'
        RETURN other.doc_id AS doc_id, other.title AS title,
               other.doc_code AS doc_code, other.publish_date AS publish_date,
               t.name AS theme_name, other.status AS status
        ORDER BY other.publish_date DESC
        LIMIT $limit
        """
        # Fill each placeholder with the visibility clause for its alias.
        placeholder_aliases = (
            ("__DOC_VISIBILITY_D__", "d"),
            ("__DOC_VISIBILITY_OTHER__", "other"),
        )
        for placeholder, alias in placeholder_aliases:
            statement = statement.replace(
                placeholder,
                self._document_visibility_clause(alias, acl_tokens),
            )

        bound = self._query_params({"doc_id": doc_id, "limit": limit}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            cursor = await session.run(statement, **bound)
            rows = await cursor.data()

        related: list[dict[str, Any]] = []
        for row in rows:
            if not row["doc_id"]:
                continue
            related.append(
                {
                    "doc_id": row["doc_id"],
                    "title": row["title"] or "",
                    "doc_code": row["doc_code"] or "",
                    "publish_date": row["publish_date"] or "",
                    "theme_name": row["theme_name"] or "",
                    "status": row.get("status") or "",
                }
            )
        return related

    # ==================================================================
    # Entity key-based document discovery (Sprint 4)
    # ==================================================================

    async def get_docs_by_entity_key(
        self,
        entity_key: str,
        entity_label: str,
        *,
        limit: int = 30,
        acl_tokens: list[str] | None = None,
    ) -> list[dict[str, Any]]:
        """Find documents linked to an entity located by its schema key property.

        The entity node is matched on the property that the schema reports as
        the label's key (``schema.key_prop_for``), not on ``name``.  The
        ``"Document"`` label is delegated to ``get_docs_by_entity()``.

        Parameters
        ----------
        entity_key:
            Value of the label's key property.
        entity_label:
            Node label (e.g. ``"Person"``); labels not listed by
            ``schema.all_node_labels_unfiltered()`` yield an empty list.
        limit:
            Bound to the query's ``LIMIT``.
        acl_tokens:
            Passed to ``_document_visibility_clause`` for the documents.

        Returns
        -------
        list of dicts with ``doc_id``, ``title``, ``doc_number`` and
        ``rel_type``, ordered by ``publish_date`` descending.
        """
        schema = get_schema()

        # Only schema-known labels may be interpolated into the query text.
        if entity_label not in schema.all_node_labels_unfiltered():
            return []

        if entity_label == "Document":
            return await self.get_docs_by_entity(
                entity_key, "Document", limit=limit, acl_tokens=acl_tokens
            )

        # The property name comes from the schema, never from the caller.
        key_prop = schema.key_prop_for(entity_label)
        doc_filter = self._document_visibility_clause('d', acl_tokens)

        statement = " ".join(
            [
                f"MATCH (e:{entity_label} {{{key_prop}: $entity_key}})-[r]-(d:Document)",
                f"WHERE {doc_filter}",
                "RETURN d.doc_id AS doc_id, d.title AS title,",
                "coalesce(d.doc_code, d.doc_number, '') AS doc_number, type(r) AS rel_type",
                "ORDER BY d.publish_date DESC",
                "LIMIT $limit",
            ]
        )
        bound = self._query_params(
            {"entity_key": entity_key, "limit": limit}, acl_tokens
        )

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            cursor = await session.run(statement, **bound)
            rows = await cursor.data()

        docs: list[dict[str, Any]] = []
        for row in rows:
            if not row["doc_id"]:
                continue
            docs.append(
                {
                    "doc_id": row["doc_id"],
                    "title": row["title"] or "",
                    "doc_number": row["doc_number"] or "",
                    "rel_type": row["rel_type"],
                }
            )
        return docs

    async def resolve_entity_by_exact_name(
        self,
        name: str,
        label: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> str | None:
        """Map an exact entity name to the value of its schema key property.

        Parameters
        ----------
        name:
            Exact value matched against the node's ``name`` property.
        label:
            Node label; must be listed by
            ``schema.all_node_labels_unfiltered()``.
        acl_tokens:
            Passed to ``_visible_doc_exists_clause`` for the matched node.

        Returns
        -------
        The key property value when exactly one node matches.  ``None`` for an
        unknown label, no match, or several matches (the ambiguous case is
        logged as a warning).
        """
        schema = get_schema()

        # Only schema-known labels may be interpolated into the query text.
        if label not in schema.all_node_labels_unfiltered():
            return None

        key_prop = schema.key_prop_for(label)
        node_filter = self._visible_doc_exists_clause('n', acl_tokens)
        statement = " ".join(
            [
                f"MATCH (n:{label} {{name: $name}})",
                f"WHERE {node_filter}",
                f"RETURN n.{key_prop} AS entity_key",
            ]
        )

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            cursor = await session.run(
                statement,
                **self._query_params({"name": name}, acl_tokens),
            )
            rows = await cursor.data()

        match_count = len(rows)
        if match_count == 1:
            return rows[0]["entity_key"]
        if match_count > 1:
            # Several nodes share this name: refuse to pick one.
            logger.warning(
                "resolve_entity_ambiguous",
                name=name, label=label, count=match_count,
            )
        return None

    # ==================================================================
    # Entity card queries (Sprint 4 — Policy / Task / Project)
    # ==================================================================

    def _resolve_key_prop(self, label: str) -> str:
        """Return the key property name the loaded schema reports for ``label``."""
        schema = get_schema()
        return schema.key_prop_for(label)

    async def get_policy_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Build a policy knowledge card from a series of small queries.

        Parameters
        ----------
        entity_key:
            Value of Policy's key property as resolved from the schema.
        acl_tokens:
            Used for the visibility clauses on the policy, its neighbours and
            its source documents.  The related-themes query is not filtered.

        Returns
        -------
        ``None`` when no Policy matches and passes the visibility gate;
        otherwise a dict with ``name``, ``summary``, ``policy_id``,
        ``properties``, ``assigned_orgs``, ``implementing_tasks``,
        ``supported_projects``, ``related_themes`` and ``source_docs``.
        """
        key_prop = self._resolve_key_prop("Policy")
        # Node pattern shared by every query below.
        policy_node = f"(p:Policy {{{key_prop}: $entity_key}})"
        bound = self._query_params({"entity_key": entity_key}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:

            async def fetch(statement: str) -> list[dict[str, Any]]:
                """Run an ACL-parameterised query and return its rows."""
                cursor = await session.run(statement, **bound)
                return await cursor.data()

            # Basic info; doubles as the existence / visibility gate.
            cursor = await session.run(
                f"MATCH {policy_node} "
                f"WHERE {self._visible_doc_exists_clause('p', acl_tokens)} "
                "RETURN p.name AS name, coalesce(p.summary, '') AS summary, "
                "coalesce(p.policy_id, '') AS policy_id, properties(p) AS props",
                **bound,
            )
            head = await cursor.single()
            if not head:
                return None

            card: dict[str, Any] = {
                "name": head["name"],
                "summary": head["summary"],
                "policy_id": head["policy_id"],
                "properties": dict(head["props"] or {}),
            }

            # Responsible organizations (Policy -[ASSIGNED_TO]-> Organization).
            org_rows = await fetch(
                f"MATCH {policy_node}-[:ASSIGNED_TO]->(org:Organization) "
                f"WHERE {self._visible_doc_exists_clause('org', acl_tokens)} "
                "RETURN org.name AS name ORDER BY org.name"
            )
            card["assigned_orgs"] = [row["name"] for row in org_rows if row["name"]]

            # Tasks implementing the policy (Task -[IMPLEMENTS]-> Policy).
            task_rows = await fetch(
                f"MATCH (t:Task)-[:IMPLEMENTS]->{policy_node} "
                f"WHERE {self._visible_doc_exists_clause('t', acl_tokens)} "
                "RETURN t.name AS name, coalesce(t.task_id, '') AS task_id ORDER BY t.name"
            )
            card["implementing_tasks"] = [
                {"name": row["name"], "task_id": row["task_id"]}
                for row in task_rows
                if row["name"]
            ]

            # Projects supported by the policy (Project -[SUPPORTED_BY]-> Policy).
            project_rows = await fetch(
                f"MATCH (proj:Project)-[:SUPPORTED_BY]->{policy_node} "
                f"WHERE {self._visible_doc_exists_clause('proj', acl_tokens)} "
                "RETURN proj.name AS name, coalesce(proj.project_id, '') AS project_id ORDER BY proj.name"
            )
            card["supported_projects"] = [
                {"name": row["name"], "project_id": row["project_id"]}
                for row in project_rows
                if row["name"]
            ]

            # Related themes: run with the entity key only, no visibility clause.
            cursor = await session.run(
                f"MATCH {policy_node}-[:RELATES_TO_THEME]->(t:PolicyTheme) "
                "RETURN t.name AS name ORDER BY t.name",
                entity_key=entity_key,
            )
            theme_rows = await cursor.data()
            card["related_themes"] = [row["name"] for row in theme_rows if row["name"]]

            # Source documents (Document -[CONTAINS_POLICY]-> Policy).
            doc_rows = await fetch(
                f"MATCH (d:Document)-[:CONTAINS_POLICY]->{policy_node} "
                f"WHERE {self._document_visibility_clause('d', acl_tokens)} "
                "RETURN d.doc_id AS doc_id, coalesce(d.title, '') AS title, "
                "coalesce(d.doc_code, '') AS doc_code, "
                "coalesce(d.publish_date, '') AS publish_date "
                "ORDER BY d.publish_date DESC"
            )
            card["source_docs"] = [dict(row) for row in doc_rows if row["doc_id"]]

        return card

    async def get_task_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Build a task knowledge card from a series of small queries.

        Parameters
        ----------
        entity_key:
            Value of Task's key property as resolved from the schema.
        acl_tokens:
            Used for the visibility clauses on the task, its neighbours and
            its source documents.

        Returns
        -------
        ``None`` when no Task matches and passes the visibility gate;
        otherwise a dict with ``name``, ``task_id``, ``properties``,
        ``lead_orgs``, ``assist_orgs``, ``implementing_policies``,
        ``evaluating_indicators``, ``budgets`` and ``source_docs``.
        """
        key_prop = self._resolve_key_prop("Task")
        # Node pattern shared by every query below.
        task_node = f"(t:Task {{{key_prop}: $entity_key}})"
        bound = self._query_params({"entity_key": entity_key}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:

            async def fetch(statement: str) -> list[dict[str, Any]]:
                """Run an ACL-parameterised query and return its rows."""
                cursor = await session.run(statement, **bound)
                return await cursor.data()

            # Basic info; doubles as the existence / visibility gate.
            cursor = await session.run(
                f"MATCH {task_node} "
                f"WHERE {self._visible_doc_exists_clause('t', acl_tokens)} "
                "RETURN t.name AS name, coalesce(t.task_id, '') AS task_id, "
                "properties(t) AS props",
                **bound,
            )
            head = await cursor.single()
            if not head:
                return None

            card: dict[str, Any] = {
                "name": head["name"],
                "task_id": head["task_id"],
                "properties": dict(head["props"] or {}),
            }

            # Lead organizations (Task -[LEAD_BY]-> Organization).
            lead_rows = await fetch(
                f"MATCH {task_node}-[:LEAD_BY]->(org:Organization) "
                f"WHERE {self._visible_doc_exists_clause('org', acl_tokens)} "
                "RETURN org.name AS name ORDER BY org.name"
            )
            card["lead_orgs"] = [row["name"] for row in lead_rows if row["name"]]

            # Assisting organizations (Task -[ASSISTED_BY]-> Organization).
            assist_rows = await fetch(
                f"MATCH {task_node}-[:ASSISTED_BY]->(org:Organization) "
                f"WHERE {self._visible_doc_exists_clause('org', acl_tokens)} "
                "RETURN org.name AS name ORDER BY org.name"
            )
            card["assist_orgs"] = [row["name"] for row in assist_rows if row["name"]]

            # Policies the task implements (Task -[IMPLEMENTS]-> Policy).
            policy_rows = await fetch(
                f"MATCH {task_node}-[:IMPLEMENTS]->(p:Policy) "
                f"WHERE {self._visible_doc_exists_clause('p', acl_tokens)} "
                "RETURN p.name AS name, coalesce(p.policy_id, '') AS policy_id ORDER BY p.name"
            )
            card["implementing_policies"] = [
                {"name": row["name"], "policy_id": row["policy_id"]}
                for row in policy_rows
                if row["name"]
            ]

            # Indicators evaluating the task (Indicator -[EVALUATES]-> Task).
            indicator_rows = await fetch(
                f"MATCH (ind:Indicator)-[:EVALUATES]->{task_node} "
                f"WHERE {self._visible_doc_exists_clause('ind', acl_tokens)} "
                "RETURN ind.name AS name ORDER BY ind.name"
            )
            card["evaluating_indicators"] = [
                row["name"] for row in indicator_rows if row["name"]
            ]

            # Budgets funding the task (Budget -[FUNDS]-> Task).
            budget_rows = await fetch(
                f"MATCH (b:Budget)-[:FUNDS]->{task_node} "
                f"WHERE {self._visible_doc_exists_clause('b', acl_tokens)} "
                "RETURN b.name AS name, coalesce(b.amount, '') AS amount ORDER BY b.name"
            )
            card["budgets"] = [
                {"name": row["name"], "amount": row["amount"]}
                for row in budget_rows
                if row["name"]
            ]

            # Source documents (Document -[DEPLOYS_TASK]-> Task).
            doc_rows = await fetch(
                f"MATCH (d:Document)-[:DEPLOYS_TASK]->{task_node} "
                f"WHERE {self._document_visibility_clause('d', acl_tokens)} "
                "RETURN d.doc_id AS doc_id, coalesce(d.title, '') AS title, "
                "coalesce(d.doc_code, '') AS doc_code, "
                "coalesce(d.publish_date, '') AS publish_date "
                "ORDER BY d.publish_date DESC"
            )
            card["source_docs"] = [dict(row) for row in doc_rows if row["doc_id"]]

        return card

    async def get_project_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Return a project knowledge card.

        Includes: implementing org, location, supporting policy, budget,
        tech stack, and source documents.
        """
        key_prop = self._resolve_key_prop("Project")

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            # ── Project basic info ──
            res = await session.run(
                f"MATCH (proj:Project {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('proj', acl_tokens)} "
                "RETURN proj.name AS name, coalesce(proj.project_id, '') AS project_id, "
                "properties(proj) AS props",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            proj_rec = await res.single()
            if not proj_rec:
                return None

            card: dict[str, Any] = {
                "name": proj_rec["name"],
                "project_id": proj_rec["project_id"],
                "properties": dict(proj_rec["props"] or {}),
            }

            # ── Implementing org ──
            res = await session.run(
                f"MATCH (proj:Project {{{key_prop}: $entity_key}})-[:IMPLEMENTED_BY]->(org:Organization) "
                f"WHERE {self._visible_doc_exists_clause('org', acl_tokens)} "
                "RETURN org.name AS name ORDER BY org.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["implementing_orgs"] = [r["name"] for r in await res.data() if r["name"]]

            # ── Location ──
            res = await session.run(
                f"MATCH (proj:Project {{{key_prop}: $entity_key}})-[:LOCATED_IN]->(r:Region) "
                f"WHERE {self._visible_doc_exists_clause('r', acl_tokens)} "
                "RETURN r.name AS name ORDER BY r.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["locations"] = [r["name"] for r in await res.data() if r["name"]]

            # ── Supporting policy ──
            res = await session.run(
                f"MATCH (proj:Project {{{key_prop}: $entity_key}})-[:SUPPORTED_BY]->(p:Policy) "
                f"WHERE {self._visible_doc_exists_clause('p', acl_tokens)} "
                "RETURN p.name AS name, coalesce(p.policy_id, '') AS policy_id ORDER BY p.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["supporting_policies"] = [
                {"name": r["name"], "policy_id": r["policy_id"]}
                for r in await res.data() if r["name"]
            ]

            # ── Budget ──
            res = await session.run(
                f"MATCH (b:Budget)-[:FUNDS]->(proj:Project {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('b', acl_tokens)} "
                "RETURN b.name AS name, coalesce(b.amount, '') AS amount ORDER BY b.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["budgets"] = [
                {"name": r["name"], "amount": r["amount"]}
                for r in await res.data() if r["name"]
            ]

            # ── Tech stack (phase_3 may not exist yet) ──
            res = await session.run(
                f"MATCH (proj:Project {{{key_prop}: $entity_key}})-[:USES_TECH]->(tech:Technology) "
                f"WHERE {self._visible_doc_exists_clause('tech', acl_tokens)} "
                "RETURN tech.name AS name ORDER BY tech.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["technologies"] = [r["name"] for r in await res.data() if r["name"]]

            # ── Evaluating indicators ──
            res = await session.run(
                f"MATCH (ind:Indicator)-[:EVALUATES]->(proj:Project {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('ind', acl_tokens)} "
                "RETURN ind.name AS name ORDER BY ind.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["evaluating_indicators"] = [r["name"] for r in await res.data() if r["name"]]

            # ── Source documents (Document -[APPROVES_PROJECT]-> Project) ──
            res = await session.run(
                f"MATCH (d:Document)-[:APPROVES_PROJECT]->(proj:Project {{{key_prop}: $entity_key}}) "
                f"WHERE {self._document_visibility_clause('d', acl_tokens)} "
                "RETURN d.doc_id AS doc_id, coalesce(d.title, '') AS title, "
                "coalesce(d.doc_code, '') AS doc_code, "
                "coalesce(d.publish_date, '') AS publish_date "
                "ORDER BY d.publish_date DESC",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["source_docs"] = [dict(r) for r in await res.data() if r["doc_id"]]

        return card

    # ==================================================================
    # System card
    # ==================================================================

    async def get_system_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Build the knowledge card for one System entity.

        The card holds the system's name and raw node properties, followed by
        the names of its operating organizations (``OPERATED_BY``), the data
        resources it manages (``MANAGES``) and the technologies it uses
        (``USES_TECH``).  ``related_docs`` is always an empty list in this
        iteration (no reliable Document path).

        Args:
            entity_key: Value matched against the System key property chosen
                by ``_resolve_key_prop("System")``.
            acl_tokens: ACL tokens handed to ``_visible_doc_exists_clause``
                and ``_query_params`` for every query.

        Returns:
            The card dict, or ``None`` when the root lookup yields no record.
        """
        key_prop = self._resolve_key_prop("System")
        # Every query binds the same parameters, so build them once.
        params = self._query_params({"entity_key": entity_key}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:

            async def fetch_names(cypher: str) -> list[Any]:
                """Run *cypher* and collect the truthy ``name`` column values."""
                result = await session.run(cypher, **params)
                return [row["name"] for row in await result.data() if row["name"]]

            # ── System basic info ──
            root_result = await session.run(
                f"MATCH (s:System {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('s', acl_tokens)} "
                "RETURN s.name AS name, properties(s) AS props",
                **params,
            )
            system_rec = await root_result.single()
            if not system_rec:
                return None

            # ── Operating org (OPERATED_BY) ──
            operators = await fetch_names(
                f"MATCH (s:System {{{key_prop}: $entity_key}})-[:OPERATED_BY]->(org:Organization) "
                f"WHERE {self._visible_doc_exists_clause('org', acl_tokens)} "
                "RETURN org.name AS name ORDER BY org.name"
            )

            # ── Managed data resources (MANAGES) ──
            managed = await fetch_names(
                f"MATCH (s:System {{{key_prop}: $entity_key}})-[:MANAGES]->(dr:DataResource) "
                f"WHERE {self._visible_doc_exists_clause('dr', acl_tokens)} "
                "RETURN dr.name AS name ORDER BY dr.name"
            )

            # ── Technologies (USES_TECH, phase_3) ──
            technologies = await fetch_names(
                f"MATCH (s:System {{{key_prop}: $entity_key}})-[:USES_TECH]->(tech:Technology) "
                f"WHERE {self._visible_doc_exists_clause('tech', acl_tokens)} "
                "RETURN tech.name AS name ORDER BY tech.name"
            )

            # Key order matches the order in which the sections are queried.
            card: dict[str, Any] = {
                "name": system_rec["name"],
                "properties": dict(system_rec["props"] or {}),
                "operated_by": operators,
                "managed_data": managed,
                "technologies": technologies,
                "related_docs": [],
            }

        return card

    # ==================================================================
    # DataResource card
    # ==================================================================

    async def get_data_resource_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Build the knowledge card for one DataResource entity.

        Besides the resource's name and raw node properties, the card lists
        the systems that manage it (``System -[MANAGES]-> DataResource``) and
        the standards linked to it (``Standard -[CONFORMED_BY]->
        DataResource``); both are reverse lookups.  ``related_docs`` is always
        an empty list in this iteration (no reliable Document path).

        Args:
            entity_key: Value matched against the DataResource key property
                chosen by ``_resolve_key_prop("DataResource")``.
            acl_tokens: ACL tokens handed to ``_visible_doc_exists_clause``
                and ``_query_params`` for every query.

        Returns:
            The card dict, or ``None`` when the root lookup yields no record.
        """
        key_prop = self._resolve_key_prop("DataResource")
        # All three queries share one parameter set.
        params = self._query_params({"entity_key": entity_key}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:

            async def names_for(cypher: str) -> list[Any]:
                """Run *cypher* and collect the truthy ``name`` column values."""
                result = await session.run(cypher, **params)
                return [row["name"] for row in await result.data() if row["name"]]

            # ── DataResource basic info ──
            root_result = await session.run(
                f"MATCH (dr:DataResource {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('dr', acl_tokens)} "
                "RETURN dr.name AS name, properties(dr) AS props",
                **params,
            )
            resource_rec = await root_result.single()
            if not resource_rec:
                return None

            # ── Managing systems (System -[MANAGES]-> DataResource, reverse) ──
            managing_systems = await names_for(
                f"MATCH (sys:System)-[:MANAGES]->(dr:DataResource {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('sys', acl_tokens)} "
                "RETURN sys.name AS name ORDER BY sys.name"
            )

            # ── Conforming standards (Standard -[CONFORMED_BY]-> DataResource, reverse) ──
            standards = await names_for(
                f"MATCH (std:Standard)-[:CONFORMED_BY]->(dr:DataResource {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('std', acl_tokens)} "
                "RETURN std.name AS name ORDER BY std.name"
            )

            # Key order matches the order in which the sections are queried.
            card: dict[str, Any] = {
                "name": resource_rec["name"],
                "properties": dict(resource_rec["props"] or {}),
                "managed_by_systems": managing_systems,
                "conforms_to": standards,
                "related_docs": [],
            }

        return card

    # ==================================================================
    # Budget card
    # ==================================================================

    async def get_budget_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Build the knowledge card for one Budget entity.

        The card contains the budget's name and raw node properties, the
        tasks and projects it funds (``FUNDS``), and up to 20 related
        documents found through the 2-hop heuristic
        ``Budget -> FUNDS -> Task <- DEPLOYS_TASK <- Document``.

        Args:
            entity_key: Value matched against the Budget key property chosen
                by ``_resolve_key_prop("Budget")``.
            acl_tokens: ACL tokens handed to the visibility clauses and
                ``_query_params`` for every query.

        Returns:
            The card dict, or ``None`` when the root lookup yields no record.
        """
        key_prop = self._resolve_key_prop("Budget")
        # All queries bind the same parameters, so build them once.
        params = self._query_params({"entity_key": entity_key}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:

            async def fetch_rows(cypher: str) -> list[dict[str, Any]]:
                """Run *cypher* and return every record as a plain dict."""
                result = await session.run(cypher, **params)
                return await result.data()

            # ── Budget basic info ──
            root_result = await session.run(
                f"MATCH (b:Budget {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('b', acl_tokens)} "
                "RETURN b.name AS name, properties(b) AS props",
                **params,
            )
            budget_rec = await root_result.single()
            if not budget_rec:
                return None

            card: dict[str, Any] = {
                "name": budget_rec["name"],
                "properties": dict(budget_rec["props"] or {}),
            }

            # ── Funded tasks (FUNDS -> Task) ──
            task_rows = await fetch_rows(
                f"MATCH (b:Budget {{{key_prop}: $entity_key}})-[:FUNDS]->(t:Task) "
                f"WHERE {self._visible_doc_exists_clause('t', acl_tokens)} "
                "RETURN t.name AS name, coalesce(t.task_id, '') AS task_id ORDER BY t.name"
            )
            card["funded_tasks"] = [
                {"name": row["name"], "task_id": row["task_id"]}
                for row in task_rows
                if row["name"]
            ]

            # ── Funded projects (FUNDS -> Project) ──
            project_rows = await fetch_rows(
                f"MATCH (b:Budget {{{key_prop}: $entity_key}})-[:FUNDS]->(proj:Project) "
                f"WHERE {self._visible_doc_exists_clause('proj', acl_tokens)} "
                "RETURN proj.name AS name, coalesce(proj.project_id, '') AS project_id ORDER BY proj.name"
            )
            card["funded_projects"] = [
                {"name": row["name"], "project_id": row["project_id"]}
                for row in project_rows
                if row["name"]
            ]

            # ── Related docs (2-hop: Budget->FUNDS->Task<-DEPLOYS_TASK<-Document) ──
            doc_rows = await fetch_rows(
                f"MATCH (b:Budget {{{key_prop}: $entity_key}})-[:FUNDS]->(t:Task)"
                "<-[:DEPLOYS_TASK]-(d:Document) "
                f"WHERE {self._document_visibility_clause('d', acl_tokens)} "
                "RETURN DISTINCT d.doc_id AS doc_id, coalesce(d.title, '') AS title, "
                "coalesce(d.doc_code, '') AS doc_code "
                "ORDER BY title LIMIT 20"
            )
            # Rows without a doc_id are dropped.
            card["related_docs"] = [dict(row) for row in doc_rows if row["doc_id"]]

        return card

    # ==================================================================
    # Indicator card
    # ==================================================================

    async def get_indicator_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Build the knowledge card for one Indicator entity.

        The card contains the indicator's name and raw node properties, the
        tasks and projects it evaluates (``EVALUATES``), and up to 20 related
        documents found through the 2-hop heuristic
        ``Indicator -> EVALUATES -> Task <- DEPLOYS_TASK <- Document``.

        Args:
            entity_key: Value matched against the Indicator key property
                chosen by ``_resolve_key_prop("Indicator")``.
            acl_tokens: ACL tokens handed to the visibility clauses and
                ``_query_params`` for every query.

        Returns:
            The card dict, or ``None`` when the root lookup yields no record.
        """
        key_prop = self._resolve_key_prop("Indicator")
        # All queries bind the same parameters, so build them once.
        params = self._query_params({"entity_key": entity_key}, acl_tokens)

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:

            async def fetch_rows(cypher: str) -> list[dict[str, Any]]:
                """Run *cypher* and return every record as a plain dict."""
                result = await session.run(cypher, **params)
                return await result.data()

            # ── Indicator basic info ──
            root_result = await session.run(
                f"MATCH (i:Indicator {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('i', acl_tokens)} "
                "RETURN i.name AS name, properties(i) AS props",
                **params,
            )
            indicator_rec = await root_result.single()
            if not indicator_rec:
                return None

            card: dict[str, Any] = {
                "name": indicator_rec["name"],
                "properties": dict(indicator_rec["props"] or {}),
            }

            # ── Evaluated tasks (EVALUATES -> Task) ──
            task_rows = await fetch_rows(
                f"MATCH (i:Indicator {{{key_prop}: $entity_key}})-[:EVALUATES]->(t:Task) "
                f"WHERE {self._visible_doc_exists_clause('t', acl_tokens)} "
                "RETURN t.name AS name, coalesce(t.task_id, '') AS task_id ORDER BY t.name"
            )
            card["evaluated_tasks"] = [
                {"name": row["name"], "task_id": row["task_id"]}
                for row in task_rows
                if row["name"]
            ]

            # ── Evaluated projects (EVALUATES -> Project) ──
            project_rows = await fetch_rows(
                f"MATCH (i:Indicator {{{key_prop}: $entity_key}})-[:EVALUATES]->(proj:Project) "
                f"WHERE {self._visible_doc_exists_clause('proj', acl_tokens)} "
                "RETURN proj.name AS name, coalesce(proj.project_id, '') AS project_id ORDER BY proj.name"
            )
            card["evaluated_projects"] = [
                {"name": row["name"], "project_id": row["project_id"]}
                for row in project_rows
                if row["name"]
            ]

            # ── Related docs (2-hop: Indicator->EVALUATES->Task<-DEPLOYS_TASK<-Document) ──
            doc_rows = await fetch_rows(
                f"MATCH (i:Indicator {{{key_prop}: $entity_key}})-[:EVALUATES]->(t:Task)"
                "<-[:DEPLOYS_TASK]-(d:Document) "
                f"WHERE {self._document_visibility_clause('d', acl_tokens)} "
                "RETURN DISTINCT d.doc_id AS doc_id, coalesce(d.title, '') AS title, "
                "coalesce(d.doc_code, '') AS doc_code "
                "ORDER BY title LIMIT 20"
            )
            # Rows without a doc_id are dropped.
            card["related_docs"] = [dict(row) for row in doc_rows if row["doc_id"]]

        return card

    # ==================================================================
    # Industry card
    # ==================================================================

    async def get_industry_card(
        self,
        entity_key: str,
        *,
        acl_tokens: list[str] | None = None,
    ) -> dict[str, Any] | None:
        """Return an Industry knowledge card.

        Includes: supporting policies, located regions, managing orgs,
        related docs (2-hop via Policy).

        Every neighbour query is filtered through
        ``_visible_doc_exists_clause`` (documents through
        ``_document_visibility_clause``) so the whole card is built under the
        same ``acl_tokens``.

        Args:
            entity_key: Value matched against the Industry key property
                chosen by ``_resolve_key_prop("Industry")``.
            acl_tokens: ACL tokens handed to the visibility clauses and
                ``_query_params`` for every query.

        Returns:
            The card dict, or ``None`` when the root lookup yields no record.
        """
        key_prop = self._resolve_key_prop("Industry")

        async with self._neo4j.driver.session(
            database=settings.neo4j_database
        ) as session:
            # ── Industry basic info ──
            res = await session.run(
                f"MATCH (ind:Industry {{{key_prop}: $entity_key}}) "
                f"WHERE {self._visible_doc_exists_clause('ind', acl_tokens)} "
                "RETURN ind.name AS name, properties(ind) AS props",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            rec = await res.single()
            if not rec:
                return None

            card: dict[str, Any] = {
                "name": rec["name"],
                "properties": dict(rec["props"] or {}),
            }

            # ── Supporting policies (INDUSTRY_SUPPORTED_BY -> Policy) ──
            res = await session.run(
                f"MATCH (ind:Industry {{{key_prop}: $entity_key}})-[:INDUSTRY_SUPPORTED_BY]->(p:Policy) "
                f"WHERE {self._visible_doc_exists_clause('p', acl_tokens)} "
                "RETURN p.name AS name, coalesce(p.policy_id, '') AS policy_id ORDER BY p.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["supported_by_policies"] = [
                {"name": r["name"], "policy_id": r["policy_id"]}
                for r in await res.data() if r["name"]
            ]

            # ── Located regions ──
            # Regions get the same visibility filter and ACL parameters as
            # every other neighbour; without it this section would be the
            # only part of the card that ignores the caller's acl_tokens.
            res = await session.run(
                f"MATCH (ind:Industry {{{key_prop}: $entity_key}})-[:INDUSTRY_LOCATED_IN]->(r:Region) "
                f"WHERE {self._visible_doc_exists_clause('r', acl_tokens)} "
                "RETURN r.name AS name ORDER BY r.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["located_in"] = [r["name"] for r in await res.data() if r["name"]]

            # ── Managing orgs ──
            res = await session.run(
                f"MATCH (ind:Industry {{{key_prop}: $entity_key}})-[:INDUSTRY_MANAGED_BY]->(org:Organization) "
                f"WHERE {self._visible_doc_exists_clause('org', acl_tokens)} "
                "RETURN org.name AS name ORDER BY org.name",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            card["managed_by"] = [r["name"] for r in await res.data() if r["name"]]

            # ── Related docs (2-hop: Industry->INDUSTRY_SUPPORTED_BY->Policy<-CONTAINS_POLICY<-Document) ──
            res = await session.run(
                f"MATCH (ind:Industry {{{key_prop}: $entity_key}})-[:INDUSTRY_SUPPORTED_BY]->(p:Policy)"
                "<-[:CONTAINS_POLICY]-(d:Document) "
                f"WHERE {self._document_visibility_clause('d', acl_tokens)} "
                "RETURN DISTINCT d.doc_id AS doc_id, coalesce(d.title, '') AS title, "
                "coalesce(d.doc_code, '') AS doc_code "
                "ORDER BY title LIMIT 20",
                **self._query_params({"entity_key": entity_key}, acl_tokens),
            )
            # Rows without a doc_id are dropped.
            card["related_docs"] = [dict(r) for r in await res.data() if r["doc_id"]]

        return card
