"""Redis async client wrapper for permission caching and pub/sub.

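Main responsibilities of this module:
1. User permission cache -- avoids re-parsing the JWT to rebuild ACL tokens on every request.
2. Task message publish/subscribe -- lightweight task notifications outside of Celery.
3. Health check -- used by startup and readiness probes.
It also exposes thin helpers for distributed locks and Redis Streams.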
"""

from __future__ import annotations

import json
from typing import Any

import redis.asyncio as aioredis

from app.config import settings
from app.utils.logger import get_logger

# Module-level logger shared by everything in this file.
logger = get_logger(__name__)

# Key prefixes
# Prefix prepended to a user ID to form that user's permission-cache key.
_PERM_PREFIX = "zm:perm:user:"
# Pub/sub channel that task messages are published to and subscribed from.
_TASK_CHANNEL = "zm:tasks"


class RedisClient:
    """Thin async wrapper around ``redis.asyncio.Redis``.

    Provides a permission cache, task pub/sub messaging, a distributed lock
    factory, a few Redis Stream helpers and a health check.
    """

    def __init__(self, client: aioredis.Redis) -> None:
        """Wrap an already-constructed ``redis.asyncio.Redis`` client."""
        self._r = client

    @classmethod
    def from_settings(cls) -> RedisClient:
        """Build a client from the global application settings.

        The pool size is read from ``settings.redis_max_connections`` when
        that attribute exists and defaults to 20 otherwise.  Responses are
        decoded to ``str`` (``decode_responses=True``).
        """
        # Bound the connection pool so that load spikes cannot open an
        # unlimited number of connections.
        max_connections = getattr(settings, "redis_max_connections", 20)
        client = aioredis.from_url(
            settings.redis_url,
            decode_responses=True,
            max_connections=max_connections,
        )
        return cls(client)

    @property
    def raw(self) -> aioredis.Redis:
        """The underlying ``redis.asyncio.Redis`` client."""
        return self._r

    async def close(self) -> None:
        """Close the underlying client."""
        await self._r.aclose()

    # ── Permission cache ─────────────────────────────────────────────────

    async def get_user_permissions(self, user_id: str) -> dict[str, Any] | None:
        """Retrieve cached permission data for *user_id*.

        Returns ``None`` on a cache miss, and also (after logging a warning)
        when the cached value is not valid JSON or does not decode to a
        dict.  The cached value is expected to be a dict with an
        ``acl_tokens`` key (list of prefixed IDs).
        """
        key = f"{_PERM_PREFIX}{user_id}"
        raw = await self._r.get(key)
        if raw is None:
            return None
        try:
            cached = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            cached = None
        # Valid JSON that is not an object (list, string, number, null) is
        # just as unusable to callers as malformed JSON: treat both as a
        # corrupt entry instead of returning a value that breaks the
        # declared ``dict | None`` contract.
        if not isinstance(cached, dict):
            logger.warning("redis_bad_perm_cache", user_id=user_id)
            return None
        return cached

    async def set_user_permissions(
        self,
        user_id: str,
        permissions: dict[str, Any],
        *,
        ttl: int | None = None,
    ) -> None:
        """Cache *permissions* for *user_id* with a TTL in seconds.

        When *ttl* is ``None`` or ``0`` the default
        ``settings.redis_permissions_ttl`` is used.
        """
        key = f"{_PERM_PREFIX}{user_id}"
        # ``or`` (rather than ``is None``) is deliberate: Redis rejects a
        # zero expiry, so 0 falls back to the configured default as well.
        ttl = ttl or settings.redis_permissions_ttl
        await self._r.set(key, json.dumps(permissions, ensure_ascii=False), ex=ttl)

    async def invalidate_user_permissions(self, user_id: str) -> None:
        """Remove the cached permissions for a single user."""
        await self._r.delete(f"{_PERM_PREFIX}{user_id}")

    # ── Task queue pub/sub ───────────────────────────────────────────────

    async def publish_task(self, task_payload: dict[str, Any]) -> None:
        """Publish a JSON-encoded task message to the shared Redis channel."""
        await self._r.publish(_TASK_CHANNEL, json.dumps(task_payload, ensure_ascii=False))
        # Log only the keys so payload values never end up in the logs.
        logger.debug("redis_task_published", payload_keys=list(task_payload.keys()))

    async def subscribe_tasks(self) -> aioredis.client.PubSub:
        """Return an async pub/sub object subscribed to the task channel.

        If subscribing fails, the pub/sub object is closed before the error
        is re-raised so that it does not leak.  On success the caller owns
        the returned object and is responsible for closing it.

        Usage::

            pubsub = await redis_client.subscribe_tasks()
            async for message in pubsub.listen():
                if message["type"] == "message":
                    data = json.loads(message["data"])
                    ...
        """
        pubsub = self._r.pubsub()
        try:
            await pubsub.subscribe(_TASK_CHANNEL)
        except BaseException:
            # Includes cancellation: nobody else holds a reference to this
            # object, so it must be released here.
            await pubsub.aclose()
            raise
        return pubsub

    # ── Distributed lock ────────────────────────────────────────────────

    def lock(self, name: str, timeout: int = 30) -> aioredis.lock.Lock:
        """Return a Redis distributed lock for protecting critical sections.

        The returned object is an async context manager::

            async with redis_client.lock("my_lock", timeout=30):
                ...

        This method itself is synchronous: the underlying ``Redis.lock()``
        only builds the lock object, so there is nothing to ``await`` here.
        *timeout* is forwarded unchanged to ``Redis.lock()``.
        """
        return self._r.lock(name, timeout=timeout)

    # ── Redis Stream helpers ─────────────────────────────────────────────

    async def xadd(self, key: str, fields: dict[str, str], maxlen: int | None = None) -> str:
        """Append an entry to a Redis Stream. Returns the entry ID."""
        return await self._r.xadd(key, fields, maxlen=maxlen)

    async def xread(
        self,
        streams: dict[str, str],
        count: int = 10,
        block: int = 2000,
    ) -> list:
        """Read entries from Redis Streams. ``block`` in milliseconds."""
        return await self._r.xread(streams, count=count, block=block)

    async def expire(self, key: str, seconds: int) -> bool:
        """Set a TTL on a key."""
        return await self._r.expire(key, seconds)

    # ── Generic helpers ──────────────────────────────────────────────────

    async def ping(self) -> bool:
        """Return ``True`` if the Redis server is reachable.

        Never raises: any failure is logged and reported as ``False`` so the
        method can be used directly as a health probe.
        """
        try:
            return bool(await self._r.ping())
        except Exception as exc:
            # Best-effort by design, but leave a trace of why the probe failed.
            logger.warning("redis_ping_failed", error=str(exc))
            return False
