"""Business-logic service for graph admin management.

Handles type definitions, entity CRUD, dedup/merge, relationships,
placeholders, and statistics via Neo4j Cypher queries.

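Knowledge-graph management service module.
Provides the graph operations used by the admin console, including:
- CRUD for entity / relationship type definitions (persisted as
  _EntityType / _RelationType meta nodes)
- Searching, editing, deleting, and deduplicating/merging entity nodes
- Creating and deleting relationships
- Managing placeholder document nodes
- Aggregating graph statistics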
"""

from __future__ import annotations

import json
import re
from typing import Any

from app.config import settings
from app.core.graph_schema_loader import get_schema
from app.infrastructure.neo4j_client import Neo4jClient, reload_valid_types
from app.utils.logger import get_logger

logger = get_logger(__name__)

# Safe identifier pattern for Neo4j labels / rel types: one ASCII letter or
# underscore followed by up to 49 word characters.  The pattern is anchored
# with \Z rather than $ because $ also matches just before a trailing newline,
# which would let a value such as "Label\n" through into an interpolated
# Cypher statement.
_SAFE_IDENTIFIER_RE = re.compile(r"^[A-Za-z_]\w{0,49}\Z")


def _parse_ps(raw: Any) -> dict:
    """Parse a ``properties_schema`` value read from Neo4j into a dict.

    The value is stored as a JSON string; a dict is passed through unchanged.

    Args:
        raw: The stored value (JSON text, an already-decoded dict, or an
            empty/None value).

    Returns:
        The decoded mapping, or ``{}`` when ``raw`` is empty, cannot be
        decoded as JSON, or decodes to something other than a JSON object.
    """
    if not raw:
        return {}
    if isinstance(raw, dict):
        return raw
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError as well as undecodable bytes.
        return {}
    # Valid JSON that is not an object (list, number, string) would break the
    # declared ``dict`` return type for callers, so treat it as "no schema".
    return parsed if isinstance(parsed, dict) else {}

# ── Default type definitions (loaded from graph_schema.yaml) ───────────────


class TypeDefinitionReadOnlyError(ValueError):
    """Signals that a type-definition CRUD operation was attempted.

    For now, type definitions are managed only through graph_schema.yaml; the
    admin side may not create, delete, or rename them.
    """

    def __init__(self, operation: str = "modify") -> None:
        # ``operation`` names the refused action and is embedded in the
        # user-facing message.
        message = (
            f"类型定义仅通过 graph_schema.yaml 管理，不允许通过管理 API {operation}。"
            "修改类型定义请编辑 graph_schema.yaml 后重启服务。"
        )
        super().__init__(message)


def _load_default_entity_types() -> list[dict[str, Any]]:
    """Return every entity type declared in config/graph_schema.yaml.

    The unfiltered list is used (types in non-active phases included) so that
    phase_3 types are also written to the meta nodes.
    """
    # Optional fields, in output order, with the value used when the schema
    # entry omits them.
    optional_defaults = (
        ("description", ""),
        ("zh_name", ""),
        ("icon", ""),
        ("color", ""),
        ("key_property", "name"),
        ("phase", "phase_0"),
    )
    loaded: list[dict[str, Any]] = []
    for entity_type in get_schema().all_entity_types_unfiltered():
        entry: dict[str, Any] = {"name": entity_type["name"]}
        for field, fallback in optional_defaults:
            entry[field] = entity_type.get(field, fallback)
        loaded.append(entry)
    return loaded


def _load_default_rel_types() -> list[dict[str, Any]]:
    """Return every relationship type declared in config/graph_schema.yaml.

    The unfiltered list is used (types in non-active phases included) so that
    phase_3 relationship types are also written to the meta nodes.
    """
    loaded: list[dict[str, Any]] = []
    for rel_type in get_schema().all_rel_types_unfiltered():
        entry: dict[str, Any] = {"name": rel_type["name"]}
        entry["description"] = rel_type.get("description", "")
        entry["zh_name"] = rel_type.get("zh_name", "")
        # A fresh list per entry, so the defaults are never shared.
        entry["source_labels"] = rel_type.get("source_labels", [])
        entry["target_labels"] = rel_type.get("target_labels", [])
        entry["phase"] = rel_type.get("phase", "phase_0")
        loaded.append(entry)
    return loaded


class GraphAdminService:
    """Admin-level operations on the knowledge graph.

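    Knowledge-graph admin service exposing the graph operations the admin
    console needs: type management, entity CRUD, dedup/merge, and statistics.
    Keeps in-memory caches of entity and relationship types to reduce the
    number of Neo4j queries.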
    """

    def __init__(self, neo4j: Neo4jClient) -> None:
        self._neo4j = neo4j
        self._entity_types_cache: dict[str, dict[str, Any]] = {}
        self._rel_types_cache: dict[str, dict[str, Any]] = {}

    # ====================================================================
    # Type cache management
    # ====================================================================

    async def ensure_default_types(self) -> None:
        """Upsert the schema's default _EntityType / _RelationType meta nodes.

        Every type (phase_3 included) is written. For entity types, zh_name,
        icon and color are set only when the node is first created, so values
        customised afterwards are not overwritten; description, key_property
        and phase are re-synced from the schema on every run because they are
        part of the schema definition rather than display metadata. For
        relationship types, zh_name is create-only while description,
        source/target labels and phase are always re-synced. ``is_default`` is
        set to true on every node touched here.
        """
        entity_upsert = (
            "MERGE (t:_EntityType {name: $name}) "
            "ON CREATE SET t.description = $desc, t.zh_name = $zh_name, "
            "t.icon = $icon, t.color = $color, "
            "t.key_property = $key_property, t.phase = $phase, "
            "t.is_default = true "
            "ON MATCH SET t.description = $desc, "
            "t.key_property = $key_property, t.phase = $phase, "
            "t.is_default = true"
        )
        relation_upsert = (
            "MERGE (t:_RelationType {name: $name}) "
            "ON CREATE SET t.description = $desc, t.zh_name = $zh_name, "
            "t.source_labels = $src, t.target_labels = $tgt, "
            "t.phase = $phase, t.is_default = true "
            "ON MATCH SET t.description = $desc, "
            "t.source_labels = $src, t.target_labels = $tgt, "
            "t.phase = $phase, t.is_default = true"
        )

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            for entity_type in _load_default_entity_types():
                entity_params: dict[str, Any] = {
                    "name": entity_type["name"],
                    "desc": entity_type.get("description", ""),
                    "zh_name": entity_type.get("zh_name", ""),
                    "icon": entity_type.get("icon", ""),
                    "color": entity_type.get("color", ""),
                    "key_property": entity_type.get("key_property", "name"),
                    "phase": entity_type.get("phase", "phase_0"),
                }
                await session.run(entity_upsert, **entity_params)

            for rel_type in _load_default_rel_types():
                relation_params: dict[str, Any] = {
                    "name": rel_type["name"],
                    "desc": rel_type.get("description", ""),
                    "zh_name": rel_type.get("zh_name", ""),
                    "src": rel_type.get("source_labels", []),
                    "tgt": rel_type.get("target_labels", []),
                    "phase": rel_type.get("phase", "phase_0"),
                }
                await session.run(relation_upsert, **relation_params)

        logger.info("default_types_ensured")

    async def refresh_type_cache(self) -> None:
        """Reload type definitions from Neo4j and refresh the validation sets.

        The caches carry metadata such as zh_name and key_property so callers
        can serve type information without another query. The module-level
        whitelist passed to ``reload_valid_types`` is built from every cached
        type name, so it is not restricted to the active phases.
        """
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            # Entity types
            entity_result = await session.run(
                "MATCH (t:_EntityType) RETURN t.name AS name, "
                "t.description AS description, t.zh_name AS zh_name, "
                "t.icon AS icon, t.color AS color, "
                "t.key_property AS key_property, t.phase AS phase, "
                "t.properties_schema AS ps"
            )
            entity_cache: dict[str, dict[str, Any]] = {}
            for row in await entity_result.data():
                type_name = row["name"]
                if not type_name:
                    # Skip meta nodes that have no usable name.
                    continue
                entity_cache[type_name] = {
                    "name": type_name,
                    "description": row["description"] or "",
                    "zh_name": row["zh_name"] or "",
                    "icon": row["icon"] or "",
                    "color": row["color"] or "",
                    "key_property": row["key_property"] or "name",
                    "phase": row["phase"] or "phase_0",
                    "properties_schema": _parse_ps(row["ps"]),
                }
            self._entity_types_cache = entity_cache

            # Relationship types
            relation_result = await session.run(
                "MATCH (t:_RelationType) RETURN t.name AS name, "
                "t.description AS description, t.zh_name AS zh_name, "
                "t.source_labels AS source_labels, "
                "t.target_labels AS target_labels, "
                "t.phase AS phase"
            )
            relation_cache: dict[str, dict[str, Any]] = {}
            for row in await relation_result.data():
                type_name = row["name"]
                if not type_name:
                    continue
                relation_cache[type_name] = {
                    "name": type_name,
                    "description": row["description"] or "",
                    "zh_name": row["zh_name"] or "",
                    "source_labels": list(row["source_labels"] or []),
                    "target_labels": list(row["target_labels"] or []),
                    "phase": row["phase"] or "phase_0",
                }
            self._rel_types_cache = relation_cache

        # The module-level validation sets must include ALL cached types.
        known_labels = set(self._entity_types_cache.keys())
        known_rel_types = set(self._rel_types_cache.keys())
        reload_valid_types(node_labels=known_labels, rel_types=known_rel_types)
        logger.info(
            "type_cache_refreshed",
            entity_types=len(self._entity_types_cache),
            rel_types=len(self._rel_types_cache),
        )

    def get_entity_type_names(self) -> set[str]:
        return set(self._entity_types_cache.keys()) | {"Document"}

    def get_rel_type_names(self) -> set[str]:
        return set(self._rel_types_cache.keys())

    # ====================================================================
    # Entity type CRUD
    # ====================================================================

    async def list_entity_types(self) -> list[dict[str, Any]]:
        """Return every entity type definition together with its instance count.

        Each item carries zh_name (the short Chinese label shown by the
        frontend) and key_property. Instances are counted as nodes carrying a
        label equal to the type name, excluding the type meta nodes.
        """
        query = (
            "MATCH (t:_EntityType) "
            "OPTIONAL MATCH (n) WHERE any(l IN labels(n) WHERE l = t.name) "
            "AND NOT n:_EntityType AND NOT n:_RelationType "
            "RETURN t.name AS name, t.description AS description, "
            "t.zh_name AS zh_name, "
            "t.icon AS icon, t.color AS color, "
            "t.key_property AS key_property, "
            "t.properties_schema AS ps, count(n) AS instance_count "
            "ORDER BY t.name"
        )
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            outcome = await session.run(query)
            rows = await outcome.data()

        type_defs: list[dict[str, Any]] = []
        for row in rows:
            type_defs.append(
                {
                    "name": row["name"],
                    # Missing properties are normalised to display defaults.
                    "description": row["description"] or "",
                    "zh_name": row["zh_name"] or "",
                    "icon": row["icon"] or "",
                    "color": row["color"] or "",
                    "key_property": row["key_property"] or "name",
                    "properties_schema": _parse_ps(row["ps"]),
                    "instance_count": row["instance_count"],
                }
            )
        return type_defs

    async def create_entity_type(
        self, name: str, description: str = "", icon: str = "",
        color: str = "", properties_schema: dict | None = None,
    ) -> dict[str, Any]:
        """Always refuse: entity types cannot be created through this API.

        Type definitions are managed exclusively via graph_schema.yaml, so the
        arguments are accepted but never used.

        Raises:
            TypeDefinitionReadOnlyError: On every call.
        """
        refusal = TypeDefinitionReadOnlyError("创建实体类型")
        raise refusal

    async def update_entity_type(self, name: str, updates: dict[str, Any]) -> dict[str, Any]:
        """Change the display metadata (zh_name, icon, color) of an entity type.

        Only display metadata may be edited. new_name, description,
        properties_schema and key_property belong to the schema definition
        and are rejected; renaming is disabled.

        Args:
            name: Name of the ``_EntityType`` meta node to update.
            updates: Field/value pairs; ``None`` values are ignored.

        Returns:
            ``{"name": name}``.

        Raises:
            TypeDefinitionReadOnlyError: If ``updates`` contains a schema field.
            ValueError: If no entity type with that name exists.
        """
        # Schema-definition fields must not be modified here.
        schema_fields = {"new_name", "description", "properties_schema", "key_property"}
        attempted = schema_fields & set(updates.keys())
        if attempted:
            raise TypeDefinitionReadOnlyError(
                f"修改字段 {attempted}（这些属于 schema 定义，请编辑 graph_schema.yaml）"
            )

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            # Make sure the type exists before attempting any write.
            lookup = await session.run(
                "MATCH (t:_EntityType {name: $name}) RETURN t", name=name
            )
            existing = await lookup.single()
            if not existing:
                raise ValueError(f"Entity type '{name}' not found")

            # Collect one SET assignment per supplied display field.
            query_params: dict[str, Any] = {"name": name}
            assignments: list[str] = []
            for field in ("zh_name", "icon", "color"):
                value = updates.get(field)
                if value is None:
                    continue
                assignments.append(f"t.{field} = ${field}")
                query_params[field] = value

            if assignments:
                statement = "MATCH (t:_EntityType {name: $name}) SET " + ", ".join(assignments)
                await session.run(statement, **query_params)

        await self.refresh_type_cache()
        return {"name": name}

    async def delete_entity_type(self, name: str) -> dict[str, Any]:
        """Always refuse: entity types cannot be deleted through this API.

        Type definitions are managed exclusively via graph_schema.yaml.

        Raises:
            TypeDefinitionReadOnlyError: On every call.
        """
        refusal = TypeDefinitionReadOnlyError("删除实体类型")
        raise refusal

    # ====================================================================
    # Relationship type CRUD
    # ====================================================================

    async def list_rel_types(self) -> list[dict[str, Any]]:
        """Return every relationship type definition with its instance count.

        Each item carries zh_name (the short Chinese label shown by the
        frontend). Definitions and counts are fetched in a single query to
        avoid an N+1 query pattern.
        """
        query = (
            "MATCH (t:_RelationType) "
            "OPTIONAL MATCH ()-[r]->() WHERE type(r) = t.name "
            "WITH t, count(r) AS cnt "
            "RETURN t.name AS name, t.description AS description, "
            "t.zh_name AS zh_name, "
            "t.source_labels AS source_labels, t.target_labels AS target_labels, "
            "cnt AS instance_count "
            "ORDER BY t.name"
        )
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            outcome = await session.run(query)
            rows = await outcome.data()

        type_defs: list[dict[str, Any]] = []
        for row in rows:
            type_defs.append(
                {
                    "name": row["name"],
                    "description": row["description"] or "",
                    "zh_name": row["zh_name"] or "",
                    # Copy the label lists; missing values become empty lists.
                    "source_labels": list(row["source_labels"] or []),
                    "target_labels": list(row["target_labels"] or []),
                    "instance_count": row["instance_count"],
                }
            )
        return type_defs

    async def create_rel_type(
        self, name: str, description: str = "",
        source_labels: list[str] | None = None,
        target_labels: list[str] | None = None,
    ) -> dict[str, Any]:
        """Always refuse: relationship types cannot be created through this API.

        Type definitions are managed exclusively via graph_schema.yaml, so the
        arguments are accepted but never used.

        Raises:
            TypeDefinitionReadOnlyError: On every call.
        """
        refusal = TypeDefinitionReadOnlyError("创建关系类型")
        raise refusal

    async def update_rel_type(self, name: str, updates: dict[str, Any]) -> dict[str, Any]:
        """Change the display metadata (zh_name only) of a relationship type.

        Only zh_name may be edited. new_name, description, source_labels and
        target_labels belong to the schema definition and are rejected;
        renaming is disabled.

        Args:
            name: Name of the ``_RelationType`` meta node to update.
            updates: Field/value pairs; a ``None`` zh_name is ignored.

        Returns:
            ``{"name": name}``.

        Raises:
            TypeDefinitionReadOnlyError: If ``updates`` contains a schema field.
            ValueError: If no relationship type with that name exists.
        """
        # Schema-definition fields must not be modified here.
        schema_fields = {"new_name", "description", "source_labels", "target_labels"}
        attempted = schema_fields & set(updates.keys())
        if attempted:
            raise TypeDefinitionReadOnlyError(
                f"修改字段 {attempted}（这些属于 schema 定义，请编辑 graph_schema.yaml）"
            )

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            # Make sure the type exists before attempting any write.
            lookup = await session.run(
                "MATCH (t:_RelationType {name: $name}) RETURN t", name=name
            )
            existing = await lookup.single()
            if not existing:
                raise ValueError(f"Relationship type '{name}' not found")

            # zh_name is the only editable field.
            query_params: dict[str, Any] = {"name": name}
            assignments: list[str] = []
            new_zh_name = updates.get("zh_name")
            if new_zh_name is not None:
                assignments.append("t.zh_name = $zh_name")
                query_params["zh_name"] = new_zh_name

            if assignments:
                statement = "MATCH (t:_RelationType {name: $name}) SET " + ", ".join(assignments)
                await session.run(statement, **query_params)

        await self.refresh_type_cache()
        return {"name": name}

    async def delete_rel_type(self, name: str) -> dict[str, Any]:
        """Always refuse: relationship types cannot be deleted through this API.

        Type definitions are managed exclusively via graph_schema.yaml.

        Raises:
            TypeDefinitionReadOnlyError: On every call.
        """
        refusal = TypeDefinitionReadOnlyError("删除关系类型")
        raise refusal

    # ====================================================================
    # Statistics
    # ====================================================================

    async def get_graph_stats(self) -> dict[str, Any]:
        """Collect aggregate counts describing the graph.

        Returns:
            A dict with ``total_nodes`` (type meta nodes excluded),
            ``total_relationships``, ``orphan_nodes`` (non-Document,
            non-meta nodes without any relationship), ``placeholder_count``
            (Document nodes flagged ``is_placeholder``), ``node_counts``
            (per label, descending) and ``rel_counts`` (per relationship
            type, descending).
        """
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:

            async def single_value(query: str, column: str) -> Any:
                # Run a one-row aggregate and read one column; 0 if no row.
                row = await (await session.run(query)).single()
                return row[column] if row else 0

            async def all_rows(query: str) -> list[dict[str, Any]]:
                # Run a query and materialise every row as a dict.
                return await (await session.run(query)).data()

            # Total nodes (excluding meta nodes)
            total_nodes = await single_value(
                "MATCH (n) WHERE NOT n:_EntityType AND NOT n:_RelationType "
                "RETURN count(n) AS total",
                "total",
            )

            # Total relationships
            total_rels = await single_value(
                "MATCH ()-[r]->() RETURN count(r) AS total", "total"
            )

            # Per-label counts
            label_rows = await all_rows(
                "MATCH (n) WHERE NOT n:_EntityType AND NOT n:_RelationType "
                "UNWIND labels(n) AS lbl "
                "WITH lbl WHERE lbl <> '_EntityType' AND lbl <> '_RelationType' "
                "RETURN lbl AS label, count(*) AS cnt ORDER BY cnt DESC"
            )
            node_counts = [
                {"label": row["label"], "count": row["cnt"]} for row in label_rows
            ]

            # Per-rel-type counts
            type_rows = await all_rows(
                "MATCH ()-[r]->() RETURN type(r) AS typ, count(r) AS cnt ORDER BY cnt DESC"
            )
            rel_counts = [
                {"type": row["typ"], "count": row["cnt"]} for row in type_rows
            ]

            # Orphan nodes (non-Document with no relationships)
            orphan_nodes = await single_value(
                "MATCH (n) WHERE NOT n:Document "
                "AND NOT n:_EntityType AND NOT n:_RelationType "
                "AND NOT (n)--() RETURN count(n) AS cnt",
                "cnt",
            )

            # Placeholder count
            placeholder_count = await single_value(
                "MATCH (d:Document {is_placeholder: true}) RETURN count(d) AS cnt",
                "cnt",
            )

        stats: dict[str, Any] = {
            "total_nodes": total_nodes,
            "total_relationships": total_rels,
            "orphan_nodes": orphan_nodes,
            "placeholder_count": placeholder_count,
            "node_counts": node_counts,
            "rel_counts": rel_counts,
        }
        return stats

    async def get_graph_health(self, es_doc_ids: list[str]) -> dict[str, Any]:
        """Compute health indicators for the graph.

        Args:
            es_doc_ids: Document ids to compare against the graph (the
                parameter name suggests they come from Elasticsearch).

        Returns:
            A dict with ``no_relation_ratio`` (share of non-Document,
            non-meta nodes that have no relationship, rounded to 4 places),
            ``duplicate_candidates`` (always 0 here), ``missing_in_graph``
            (distinct supplied ids with no non-placeholder Document node) and
            ``total_es_docs`` (length of ``es_doc_ids``).
        """
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            # Non-Document entities without relationships / total
            entity_result = await session.run(
                "MATCH (n) WHERE NOT n:Document "
                "AND NOT n:_EntityType AND NOT n:_RelationType "
                "WITH count(n) AS total, "
                "     sum(CASE WHEN NOT (n)--() THEN 1 ELSE 0 END) AS no_rel "
                "RETURN total, no_rel"
            )
            entity_row = await entity_result.single()

            # Neo4j doc_ids
            doc_result = await session.run(
                "MATCH (d:Document) WHERE d.is_placeholder IS NULL OR d.is_placeholder = false "
                "RETURN collect(d.doc_id) AS ids"
            )
            doc_row = await doc_result.single()

        if entity_row:
            total_entities = entity_row["total"]
            isolated = entity_row["no_rel"]
        else:
            total_entities = 0
            isolated = 0

        if total_entities > 0:
            ratio = round(isolated / total_entities, 4)
        else:
            # Avoid dividing by zero on an empty graph.
            ratio = 0.0

        graph_ids = set(doc_row["ids"]) if doc_row else set()
        missing = len(set(es_doc_ids).difference(graph_ids))

        return {
            "no_relation_ratio": ratio,
            "duplicate_candidates": 0,  # Filled by detect_duplicates on-demand
            "missing_in_graph": missing,
            "total_es_docs": len(es_doc_ids),
        }

    # ====================================================================
    # Entity CRUD
    # ====================================================================

    async def list_entities(
        self,
        label: str | None = None,
        name: str | None = None,
        page: int = 1,
        page_size: int = 20,
        sort_by: str = "connection_count",
    ) -> dict[str, Any]:
        skip = (page - 1) * page_size

        # Build match clause
        base_conditions: list[str] = []
        # 校验标签标识符格式，防止 Cypher 注入
        if label and label in self.get_entity_type_names() and _SAFE_IDENTIFIER_RE.match(label):
            match = f"MATCH (n:{label})"
        else:
            match = "MATCH (n)"
            base_conditions.extend([
                "NOT n:_EntityType",
                "NOT n:_RelationType",
                "NOT n:Document",
            ])

        params: dict[str, Any] = {"skip": skip, "limit": page_size}

        if name:
            base_conditions.append("n.name CONTAINS $name_filter")
            params["name_filter"] = name

        where_clause = f" WHERE {' AND '.join(base_conditions)}" if base_conditions else ""

        if sort_by == "name":
            order = "n.name ASC"
        else:
            order = "conn DESC"

        cypher = (
            f"{match}{where_clause} "
            f"WITH n, size([(n)-[]-() | 1]) AS conn "
            f"ORDER BY {order} "
            f"SKIP $skip LIMIT $limit "
            "RETURN elementId(n) AS id, labels(n) AS labels, "
            "properties(n) AS props, conn"
        )
        count_cypher = (
            f"{match}{where_clause} RETURN count(n) AS total"
        )

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            result = await session.run(cypher, **params)
            records = await result.data()

            count_result = await session.run(count_cypher, **params)
            count_rec = await count_result.single()
            total = count_rec["total"] if count_rec else 0

        items = [
            {
                "id": r["id"],
                "labels": [l for l in r["labels"] if l not in ("_EntityType", "_RelationType")],
                "name": r["props"].get("name", ""),
                "properties": dict(r["props"]),
                "connection_count": r["conn"],
            }
            for r in records
        ]
        return {"items": items, "total": total, "page": page, "page_size": page_size}

    async def get_entity_detail(self, entity_id: str) -> dict[str, Any] | None:
        """Return a node with its 1-hop neighbours and related documents.

        Args:
            entity_id: Element id of the node to look up.

        Returns:
            ``None`` when no node matches; otherwise a dict with ``id``,
            ``labels``, ``properties``, ``neighbors`` (id, labels, name,
            rel_type, direction) and ``related_docs`` (doc_id, title,
            doc_number, rel_type of directly connected non-placeholder
            Document nodes).
        """
        meta_labels = ("_EntityType", "_RelationType")

        def without_meta(labels: Any) -> list[str]:
            # Hide the internal type-definition labels from the output.
            return [lbl for lbl in labels if lbl not in meta_labels]

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            # Node + 1-hop neighbors
            node_result = await session.run(
                "MATCH (n) WHERE elementId(n) = $eid "
                "OPTIONAL MATCH (n)-[r]-(m) "
                "WHERE NOT m:_EntityType AND NOT m:_RelationType "
                "RETURN n, "
                "collect(DISTINCT {"
                "  id: elementId(m), labels: labels(m), "
                "  name: m.name, rel_type: type(r), "
                "  direction: CASE WHEN startNode(r) = n THEN 'out' ELSE 'in' END"
                "}) AS neighbors",
                eid=entity_id,
            )
            row = await node_result.single()
            if row is None:
                return None

            node = row["n"]
            # Drop entries without an id (no neighbour was actually matched).
            neighbour_maps = [entry for entry in row["neighbors"] if entry.get("id")]

            # Related documents (1-hop via any relationship)
            doc_result = await session.run(
                "MATCH (n)-[r]-(d:Document) WHERE elementId(n) = $eid "
                "AND (d.is_placeholder IS NULL OR d.is_placeholder = false) "
                "RETURN d.doc_id AS doc_id, d.title AS title, "
                "d.doc_number AS doc_number, type(r) AS rel_type",
                eid=entity_id,
            )
            doc_rows = await doc_result.data()

        neighbors: list[dict[str, Any]] = []
        for entry in neighbour_maps:
            neighbors.append(
                {
                    "id": entry["id"],
                    "labels": without_meta(entry["labels"] or []),
                    "name": entry.get("name") or "",
                    "rel_type": entry.get("rel_type") or "",
                    "direction": entry.get("direction") or "",
                }
            )

        related_docs: list[dict[str, Any]] = []
        for doc in doc_rows:
            related_docs.append(
                {
                    "doc_id": doc["doc_id"] or "",
                    "title": doc["title"] or "",
                    "doc_number": doc["doc_number"] or "",
                    "rel_type": doc["rel_type"] or "",
                }
            )

        return {
            "id": node.element_id,
            "labels": without_meta(node.labels),
            "properties": dict(node),
            "neighbors": neighbors,
            "related_docs": related_docs,
        }

    async def create_entity(self, label: str, properties: dict[str, Any]) -> dict[str, Any]:
        if label not in self.get_entity_type_names():
            raise ValueError(f"Unknown entity type: {label}")
        if not _SAFE_IDENTIFIER_RE.match(label):
            raise ValueError(f"Invalid label: {label}")
        if not properties.get("name"):
            raise ValueError("Property 'name' is required")

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            try:
                result = await session.run(
                    f"CREATE (n:{label} $props) "
                    "RETURN elementId(n) AS id, labels(n) AS labels, properties(n) AS props",
                    props=properties,
                )
                rec = await result.single()
            except Exception as exc:
                err = str(exc)
                if "already exists" in err.lower() or "constraint" in err.lower():
                    raise ValueError(f"Entity with name '{properties['name']}' already exists") from exc
                raise

        return {
            "id": rec["id"],
            "labels": list(rec["labels"]),
            "properties": dict(rec["props"]),
        }

    async def update_entity(self, entity_id: str, properties: dict[str, Any]) -> dict[str, Any]:
        result = await self._neo4j.update_node(entity_id, properties)
        if result is None:
            raise ValueError(f"Entity {entity_id} not found")
        return result

    async def delete_entity(self, entity_id: str) -> dict[str, Any]:
        result = await self._neo4j.delete_node(entity_id)
        return {"id": entity_id, **result}

    # ====================================================================
    # Duplicate detection & merge
    # ====================================================================

    async def detect_duplicates(
        self,
        label: str | None = None,
        threshold: float = 0.85,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Detect duplicate entity candidates via alias matching and fuzzy names.

        Two passes are run over the selected entities:

        1. Exact match (case-insensitive, whitespace-stripped) between any
           name or alias of two different entities; similarity is 1.0.
        2. ``rapidfuzz.fuzz.WRatio`` on the primary names of entities whose
           names start with the same character (bucketing limits the number
           of pairwise comparisons).

        Args:
            label: Restrict detection to this label. It is used only when it
                is a known entity type and a safe identifier; otherwise all
                nodes except Document and the type meta nodes are scanned.
            threshold: Minimum WRatio score (scaled to 0..1) for pass 2.
            limit: Maximum number of candidates returned; values below zero
                are treated as zero.

        Returns:
            Candidate pairs sorted by similarity (highest first). Returns an
            empty list when rapidfuzz is not installed.
        """
        try:
            from rapidfuzz import fuzz
        except ImportError:
            logger.error("rapidfuzz not installed; dedup unavailable")
            return []
        from itertools import combinations

        # Validate the label as an identifier to prevent Cypher injection,
        # since it is interpolated into the statement.
        if label and label in self.get_entity_type_names() and _SAFE_IDENTIFIER_RE.match(label):
            match = f"MATCH (n:{label})"
        else:
            match = (
                "MATCH (n) WHERE NOT n:Document "
                "AND NOT n:_EntityType AND NOT n:_RelationType"
            )

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            result = await session.run(
                f"{match} RETURN elementId(n) AS id, labels(n) AS labels, "
                "n.name AS name, n.aliases AS aliases"
            )
            records = await result.data()

        if not records:
            return []

        # Build the entity list and an index: normalised name/alias -> entities.
        entities: list[dict[str, Any]] = []
        alias_map: dict[str, list[int]] = {}
        for idx, rec in enumerate(records):
            name = rec.get("name")
            if not isinstance(name, str):
                # Non-string names cannot be normalised or fuzzy-compared.
                name = ""
            raw_aliases = rec.get("aliases") or []
            if isinstance(raw_aliases, str):
                raw_aliases = [raw_aliases]
            elif not isinstance(raw_aliases, (list, tuple)):
                raw_aliases = []
            aliases = [alias for alias in raw_aliases if isinstance(alias, str)]
            entities.append({
                "id": rec["id"],
                "labels": rec["labels"],
                "name": name,
                "aliases": aliases,
            })
            # Register each distinct normalised key once per entity, so an
            # entity whose name also appears among its own aliases is never
            # paired with itself.
            own_keys: set[str] = set()
            for text in (name, *aliases):
                key = text.strip().lower()
                if key and key not in own_keys:
                    own_keys.add(key)
                    alias_map.setdefault(key, []).append(idx)

        candidates: list[dict[str, Any]] = []
        seen_pairs: set[tuple[str, ...]] = set()

        def make_candidate(a_idx: int, b_idx: int, similarity: float, match_type: str) -> dict[str, Any]:
            # Shape of one reported pair; the label is entity A's first label.
            ent_a, ent_b = entities[a_idx], entities[b_idx]
            return {
                "entity_a_id": ent_a["id"],
                "entity_a_name": ent_a["name"],
                "entity_b_id": ent_b["id"],
                "entity_b_name": ent_b["name"],
                "similarity": similarity,
                "match_type": match_type,
                "label": ent_a["labels"][0] if ent_a["labels"] else "",
            }

        # Pass 1: exact match between the names/aliases of two entities.
        for indices in alias_map.values():
            for a_idx, b_idx in combinations(indices, 2):
                pair_key = tuple(sorted((entities[a_idx]["id"], entities[b_idx]["id"])))
                if pair_key in seen_pairs:
                    continue
                seen_pairs.add(pair_key)
                candidates.append(make_candidate(a_idx, b_idx, 1.0, "alias_match"))

        # Pass 2: fuzzy similarity (rapidfuzz WRatio) for the remaining pairs,
        # bucketed by first character to reduce the O(n^2) comparison count.
        buckets: dict[str, list[int]] = {}
        for idx, ent in enumerate(entities):
            if ent["name"]:
                buckets.setdefault(ent["name"][0].lower(), []).append(idx)

        for bucket in buckets.values():
            for a_idx, b_idx in combinations(bucket, 2):
                pair_key = tuple(sorted((entities[a_idx]["id"], entities[b_idx]["id"])))
                if pair_key in seen_pairs:
                    continue
                sim = fuzz.WRatio(entities[a_idx]["name"], entities[b_idx]["name"]) / 100.0
                if sim >= threshold:
                    seen_pairs.add(pair_key)
                    candidates.append(
                        make_candidate(a_idx, b_idx, round(sim, 3), "edit_distance")
                    )

        # Sort by similarity descending
        candidates.sort(key=lambda c: c["similarity"], reverse=True)
        # A negative limit would otherwise slice from the end of the list.
        return candidates[:max(limit, 0)]

    async def merge_entities(
        self,
        primary_id: str,
        secondary_id: str,
        add_alias: bool = True,
    ) -> dict[str, Any]:
        """Merge the secondary entity into the primary one.

        Relationships attached to the secondary node are re-created on the
        primary node (same type, direction and properties), the secondary's
        name and aliases are optionally appended to the primary's ``aliases``
        property, and finally the secondary node is deleted.

        Args:
            primary_id: elementId of the node that survives the merge.
            secondary_id: elementId of the node that is merged and deleted.
            add_alias: When true, record the secondary's name and aliases on
                the primary node.

        Returns:
            A dict with ``primary_id``, ``migrated_relationships`` (number of
            relationships re-created on the primary) and ``aliases`` (the
            alias list written to the primary; empty when nothing was written).

        Raises:
            ValueError: If both ids are identical, or if either node does not
                exist.
        """
        if primary_id == secondary_id:
            # Merging a node into itself would end with the node deleting itself.
            raise ValueError("Cannot merge an entity into itself")

        # NOTE(review): every statement below is a separate session.run call, so a
        # failure midway can leave a partially merged graph — consider wrapping
        # the whole merge in one explicit transaction.
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            # Load the secondary's name/aliases (needed for the alias step).
            sec_result = await session.run(
                "MATCH (n) WHERE elementId(n) = $eid "
                "RETURN n.name AS name, n.aliases AS aliases",
                eid=secondary_id,
            )
            sec_rec = await sec_result.single()
            if sec_rec is None:
                raise ValueError(f"Secondary entity {secondary_id} not found")

            # Verify the primary exists BEFORE anything destructive happens;
            # otherwise no relationship would be migrated but the secondary
            # would still be deleted.
            pri_result = await session.run(
                "MATCH (n) WHERE elementId(n) = $pid RETURN n.aliases AS aliases",
                pid=primary_id,
            )
            pri_rec = await pri_result.single()
            if pri_rec is None:
                raise ValueError(f"Primary entity {primary_id} not found")

            sec_name = sec_rec["name"] or ""
            sec_aliases = sec_rec["aliases"] or []
            if isinstance(sec_aliases, str):
                sec_aliases = [sec_aliases]

            # Fetch all relationships of the secondary, rebuild them on the
            # primary, then delete the secondary. (An earlier CALL-subquery
            # approach was incompatible with some Neo4j versions and was removed.)
            rels_result = await session.run(
                "MATCH (s)-[r]-(t) WHERE elementId(s) = $sid "
                "RETURN elementId(t) AS tid, type(r) AS rtype, properties(r) AS rprops, "
                "CASE WHEN startNode(r) = s THEN 'out' ELSE 'in' END AS direction",
                sid=secondary_id,
            )
            rels_data = await rels_result.data()

            migrated = 0
            for rel in rels_data:
                tid = rel["tid"]
                # Skip edges that would become self-loops on the primary, and
                # self-loops on the secondary (their other end is about to be
                # deleted, so re-creating them is pointless).
                if tid in (primary_id, secondary_id):
                    continue

                rtype = rel["rtype"]
                # The type is interpolated into the query text, so only types
                # matching the safe-identifier pattern are migrated; others
                # are dropped together with the secondary node.
                if not _SAFE_IDENTIFIER_RE.match(rtype):
                    continue

                if rel["direction"] == "out":
                    pattern = f"(p)-[r:{rtype}]->(t)"
                else:
                    pattern = f"(t)-[r:{rtype}]->(p)"

                # MERGE avoids duplicating an edge the primary already has.
                await session.run(
                    "MATCH (p) WHERE elementId(p) = $pid "
                    "MATCH (t) WHERE elementId(t) = $tid "
                    f"MERGE {pattern} SET r += $props",
                    pid=primary_id, tid=tid, props=rel["rprops"] or {},
                )
                migrated += 1

            # Add the secondary's name and aliases to the primary.
            aliases: list[str] = []
            if add_alias and sec_name:
                pri_aliases = pri_rec["aliases"] or []
                # Normalise a scalar alias BEFORE building the list; calling
                # list() on a string would split it into single characters.
                if isinstance(pri_aliases, str):
                    pri_aliases = [pri_aliases]

                # dict.fromkeys de-duplicates while keeping first-seen order.
                aliases = list(dict.fromkeys([*pri_aliases, sec_name, *sec_aliases]))
                await session.run(
                    "MATCH (n) WHERE elementId(n) = $pid SET n.aliases = $aliases",
                    pid=primary_id, aliases=aliases,
                )

            # Delete the secondary together with its remaining relationships.
            await session.run(
                "MATCH (n) WHERE elementId(n) = $sid DETACH DELETE n",
                sid=secondary_id,
            )

        return {
            "primary_id": primary_id,
            "migrated_relationships": migrated,
            "aliases": aliases,
        }

    # ====================================================================
    # Relationships
    # ====================================================================

    async def list_relationships(
        self,
        rel_type: str | None = None,
        source_id: str | None = None,
        target_id: str | None = None,
        page: int = 1,
        page_size: int = 20,
    ) -> dict[str, Any]:
        """Return one page of relationships, optionally filtered.

        Args:
            rel_type: Restrict to this relationship type. The filter is only
                applied when the type is listed by ``get_rel_type_names()``
                and matches the safe-identifier pattern; otherwise all types
                are returned.
            source_id: elementId the start node must have.
            target_id: elementId the end node must have.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.

        Returns:
            A dict with ``items`` (one dict per relationship), ``total``
            (count of all matching relationships), ``page`` and ``page_size``
            (the effective, clamped values).
        """
        # Negative SKIP/LIMIT values are rejected by Cypher, so clamp first.
        page = max(page, 1)
        page_size = max(page_size, 0)
        skip = (page - 1) * page_size

        # rel_type ends up in the query text, hence the whitelist + pattern check.
        if rel_type and rel_type in self.get_rel_type_names() and _SAFE_IDENTIFIER_RE.match(rel_type):
            match = f"MATCH (a)-[r:{rel_type}]->(b)"
        else:
            match = "MATCH (a)-[r]->(b)"

        where_parts: list[str] = []
        filter_params: dict[str, Any] = {}
        if source_id:
            where_parts.append("elementId(a) = $sid")
            filter_params["sid"] = source_id
        if target_id:
            where_parts.append("elementId(b) = $tid")
            filter_params["tid"] = target_id

        where_clause = f" WHERE {' AND '.join(where_parts)}" if where_parts else ""

        # The relationship id is a tie-breaker: ordering by type alone leaves
        # rows of the same type in unspecified order, so SKIP/LIMIT pages could
        # overlap or miss rows.
        cypher = (
            f"{match}{where_clause} "
            "RETURN elementId(r) AS id, type(r) AS rtype, properties(r) AS props, "
            "elementId(a) AS src_id, a.name AS src_name, labels(a) AS src_labels, "
            "elementId(b) AS tgt_id, b.name AS tgt_name, labels(b) AS tgt_labels "
            "ORDER BY rtype, id SKIP $skip LIMIT $limit"
        )
        count_cypher = f"{match}{where_clause} RETURN count(r) AS total"

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            result = await session.run(cypher, skip=skip, limit=page_size, **filter_params)
            records = await result.data()

            # The count query only needs the filter parameters.
            count_result = await session.run(count_cypher, **filter_params)
            count_rec = await count_result.single()
            total = count_rec["total"] if count_rec else 0

        items = [
            {
                "id": r["id"],
                "type": r["rtype"],
                "source_id": r["src_id"],
                "source_name": r["src_name"] or "",
                "source_labels": list(r["src_labels"] or []),
                "target_id": r["tgt_id"],
                "target_name": r["tgt_name"] or "",
                "target_labels": list(r["tgt_labels"] or []),
                "properties": dict(r["props"] or {}),
            }
            for r in records
        ]
        return {"items": items, "total": total, "page": page, "page_size": page_size}

    async def create_relationship(
        self,
        source_id: str,
        target_id: str,
        rel_type: str,
        properties: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Create a directed ``rel_type`` relationship between two nodes.

        Args:
            source_id: elementId of the start node.
            target_id: elementId of the end node.
            rel_type: Relationship type; must be listed by
                ``get_rel_type_names()`` and match the safe-identifier pattern
                because it is interpolated into the query text.
            properties: Optional properties set on the new relationship.

        Returns:
            A dict with the new relationship's ``id`` and ``type`` plus the
            given ``source_id`` and ``target_id``.

        Raises:
            ValueError: If the type is unknown or not a safe identifier, or
                if the query returns no row (source or target not matched).
        """
        known_types = self.get_rel_type_names()
        if rel_type not in known_types:
            raise ValueError(f"Unknown relationship type: {rel_type}")
        if not _SAFE_IDENTIFIER_RE.match(rel_type):
            raise ValueError(f"Invalid relationship type: {rel_type}")

        cypher = (
            "MATCH (a) WHERE elementId(a) = $sid "
            "MATCH (b) WHERE elementId(b) = $tid "
            f"CREATE (a)-[r:{rel_type}]->(b) SET r += $props "
            "RETURN elementId(r) AS id, type(r) AS rtype"
        )
        props = properties if properties else {}

        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            cursor = await session.run(cypher, sid=source_id, tid=target_id, props=props)
            created = await cursor.single()
            # No row means at least one of the two MATCH clauses found nothing.
            if not created:
                raise ValueError("Source or target entity not found")

        summary: dict[str, Any] = {"id": created["id"], "type": created["rtype"]}
        summary["source_id"] = source_id
        summary["target_id"] = target_id
        return summary

    async def delete_relationship(self, rel_id: str) -> bool:
        """Delete the relationship identified by ``rel_id``.

        Delegates to ``self._neo4j.delete_relationship_by_id`` and hands back
        whatever that call returns.
        """
        outcome = await self._neo4j.delete_relationship_by_id(rel_id)
        return outcome

    # ====================================================================
    # Placeholders
    # ====================================================================

    async def list_placeholders(self) -> list[dict[str, Any]]:
        """List placeholder documents together with the documents citing them.

        Selects ``Document`` nodes flagged ``is_placeholder: true`` and, for
        each, collects the non-placeholder documents that have a
        ``REFERENCES`` relationship pointing at it.

        Returns:
            One dict per placeholder with ``id``, ``doc_id``, ``doc_number``
            and ``referencers`` (a list of ``{"doc_id", "title"}`` dicts).
        """
        cypher = (
            "MATCH (d:Document {is_placeholder: true}) "
            "OPTIONAL MATCH (src:Document)-[:REFERENCES]->(d) "
            "WHERE src.is_placeholder IS NULL OR src.is_placeholder = false "
            "RETURN elementId(d) AS id, d.doc_id AS doc_id, "
            "d.doc_number AS doc_number, "
            "collect({doc_id: src.doc_id, title: src.title}) AS referencers"
        )
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            cursor = await session.run(cypher)
            rows = await cursor.data()

        placeholders: list[dict[str, Any]] = []
        for row in rows:
            entry: dict[str, Any] = {
                "id": row["id"],
                "doc_id": row["doc_id"] or "",
                "doc_number": row["doc_number"] or "",
                "referencers": [],
            }
            for ref in row["referencers"]:
                # A placeholder without referencers still yields one collected
                # map whose values are all null; drop such entries.
                if not ref.get("doc_id"):
                    continue
                entry["referencers"].append(
                    {"doc_id": ref["doc_id"] or "", "title": ref["title"] or ""}
                )
            placeholders.append(entry)
        return placeholders

    async def link_placeholder(
        self, placeholder_id: str, real_doc_id: str
    ) -> dict[str, Any]:
        """Re-point references from a placeholder to a real document.

        Incoming ``REFERENCES`` relationships of the placeholder are re-created
        (with their properties) on the document whose ``doc_id`` equals
        ``real_doc_id``; the placeholder node is then deleted.

        Args:
            placeholder_id: elementId of a ``Document`` node flagged
                ``is_placeholder = true``.
            real_doc_id: ``doc_id`` property of the target document. It must
                identify a node other than the placeholder itself.

        Returns:
            A dict with ``placeholder_id``, ``real_doc_id`` and
            ``migrated_relationships`` (number of relationships moved).

        Raises:
            ValueError: If the placeholder is not found, or no other
                ``Document`` carries ``real_doc_id``.
        """
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            # Find the placeholder.
            check = await session.run(
                "MATCH (d:Document) WHERE elementId(d) = $pid "
                "AND d.is_placeholder = true RETURN d",
                pid=placeholder_id,
            )
            if await check.single() is None:
                raise ValueError(f"Placeholder {placeholder_id} not found")

            # Ensure the real document exists. The placeholder itself is
            # excluded: if it carried the same doc_id, the references would be
            # re-attached to it and then destroyed when it is deleted below.
            real_check = await session.run(
                "MATCH (d:Document {doc_id: $did}) "
                "WHERE elementId(d) <> $pid "
                "RETURN d LIMIT 1",
                did=real_doc_id, pid=placeholder_id,
            )
            if await real_check.single() is None:
                raise ValueError(f"Real document {real_doc_id} not found")

            # Migrate incoming REFERENCES relationships (same exclusion as above).
            migrate_result = await session.run(
                "MATCH (src)-[r:REFERENCES]->(ph:Document) "
                "WHERE elementId(ph) = $pid "
                "MATCH (real:Document {doc_id: $did}) "
                "WHERE elementId(real) <> $pid "
                "CREATE (src)-[r2:REFERENCES]->(real) SET r2 = properties(r) "
                "DELETE r "
                "RETURN count(*) AS migrated",
                pid=placeholder_id, did=real_doc_id,
            )
            rec = await migrate_result.single()
            migrated = rec["migrated"] if rec else 0

            # Delete the placeholder along with any relationships left on it.
            await session.run(
                "MATCH (d:Document) WHERE elementId(d) = $pid DETACH DELETE d",
                pid=placeholder_id,
            )

        return {
            "placeholder_id": placeholder_id,
            "real_doc_id": real_doc_id,
            "migrated_relationships": migrated,
        }

    async def delete_placeholder(self, placeholder_id: str) -> bool:
        """Delete a placeholder document and all of its relationships.

        Only a ``Document`` node with the given elementId that is flagged
        ``is_placeholder = true`` is matched.

        Returns:
            True when the query reports at least one deleted node, else False.
        """
        cypher = (
            "MATCH (d:Document) WHERE elementId(d) = $pid "
            "AND d.is_placeholder = true "
            "DETACH DELETE d RETURN count(d) AS cnt"
        )
        async with self._neo4j.driver.session(database=settings.neo4j_database) as session:
            cursor = await session.run(cypher, pid=placeholder_id)
            row = await cursor.single()

        if not row:
            return False
        return bool(row["cnt"] > 0)
