"""Threshold checks against `crawl_log` + `article` — v2 schema aligned.

Rules (design doc §4.7, field names updated to v2):
  R1: per-target 24h 成功率 < 80%（样本 ≥ 5 条）
  R2: 同站 1h 内 412/403/429 比例 > 30%（样本 ≥ 10 条）
  R3: target 24h 内无新 'ready' 文章，但前一个 24h 有 → 选择器可能失效

Schema change from 1.0:
  * `CrawlLog.column_id` (str) → `target_id` (int FK to crawl_target.id)
  * `Article.column_id` (str) → `target_id` (int FK)
  * `site_id` is now int FK → we resolve display codes/names via joins
"""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Sequence

from sqlalchemy import and_, case, func, select
from sqlalchemy.orm import Session

from govcrawler.db import get_sessionmaker
from govcrawler.models import Article, CrawlLog, CrawlSite, CrawlTarget

# Thresholds (design doc §4.7)
R1_SUCCESS_FLOOR = 0.80  # R1: alert when 24h success rate drops below this
R1_MIN_SAMPLES = 5  # R1: need at least this many log rows to judge a target
R2_BLOCK_CEILING = 0.30  # R2: alert when blocked-status share exceeds this
R2_MIN_SAMPLES = 10  # R2: need at least this many log rows per site in 1h
R2_BLOCK_STATUSES = (403, 412, 429)  # HTTP statuses treated as "blocked"

WINDOW_24H = timedelta(hours=24)  # R1/R3 lookback window
WINDOW_1H = timedelta(hours=1)  # R2 lookback window

# R3 false-positive suppression: if a target's most recent 'ready' article
# was published longer ago than this, the column is inherently low-update
# (org info pages, long-running notices, archives) — "no new articles in
# 24h" is its steady state, so do not alert. Policy columns usually post
# within 2 weeks; a column silent for 30+ days is not a broken selector.
R3_STALE_TARGET_QUIET_DAYS = 30


@dataclass
class AlertRule:
    """One triggered alert produced by a threshold rule (R1/R2/R3)."""

    # Rule identifier, e.g. "R1_SUCCESS_RATE" / "R2_BLOCK_RATE" / "R3_STALE_TARGET".
    code: str
    site_code: str | None  # crawl_site.site_code for human display
    target_code: str | None  # crawl_target.target_code; None for site-wide rules
    # Pre-formatted human-readable alert text (Chinese, includes thresholds).
    message: str


# ---------------------------------------------------------------------------
# R1 — per-target success rate
# ---------------------------------------------------------------------------
def _check_success_rate(session: Session, now: datetime) -> list[AlertRule]:
    """Rule R1: flag (site, target) pairs whose success rate over the last
    24h fell below ``R1_SUCCESS_FLOOR``, provided at least
    ``R1_MIN_SAMPLES`` crawl-log rows exist in the window."""
    window_start = now - WINDOW_24H
    query = (
        select(
            CrawlSite.site_code,
            CrawlTarget.target_code,
            func.count(CrawlLog.id).label("total"),
            func.sum(case((CrawlLog.success.is_(True), 1), else_=0)).label("ok"),
        )
        .select_from(CrawlLog)
        .join(CrawlSite, CrawlSite.id == CrawlLog.site_id, isouter=True)
        .join(CrawlTarget, CrawlTarget.id == CrawlLog.target_id, isouter=True)
        .where(CrawlLog.occurred_at >= window_start)
        .group_by(CrawlSite.site_code, CrawlTarget.target_code)
    )

    alerts: list[AlertRule] = []
    for site_code, target_code, raw_total, raw_ok in session.execute(query):
        n = int(raw_total or 0)
        succeeded = int(raw_ok or 0)
        if n < R1_MIN_SAMPLES:
            continue  # too few samples to judge this target
        rate = succeeded / n if n else 0.0
        if rate >= R1_SUCCESS_FLOOR:
            continue
        alerts.append(
            AlertRule(
                code="R1_SUCCESS_RATE",
                site_code=site_code,
                target_code=target_code,
                message=(
                    f"[R1] 24h 成功率偏低 site={site_code} target={target_code} "
                    f"rate={rate:.0%} (ok={succeeded}/{n}, 阈值 {R1_SUCCESS_FLOOR:.0%})"
                ),
            )
        )
    return alerts


# ---------------------------------------------------------------------------
# R2 — per-site block rate
# ---------------------------------------------------------------------------
def _check_block_rate(session: Session, now: datetime) -> list[AlertRule]:
    """Rule R2: per-site blocked-request rate over the last hour.

    Counts crawl-log rows whose HTTP status is one of ``R2_BLOCK_STATUSES``
    and alerts when their share exceeds ``R2_BLOCK_CEILING``, given at
    least ``R2_MIN_SAMPLES`` rows for the site in the window.

    Fix: the alert message hard-coded "412/403" while the filter also
    counts 429 (see ``R2_BLOCK_STATUSES``) — the label is now derived
    from the constant so the message cannot drift from the logic again.
    """
    since = now - WINDOW_1H
    # Human-readable status list, always in sync with the actual filter.
    blocked_label = "/".join(str(s) for s in R2_BLOCK_STATUSES)
    stmt = (
        select(
            CrawlSite.site_code,
            func.count(CrawlLog.id).label("total"),
            func.sum(
                case((CrawlLog.http_status.in_(R2_BLOCK_STATUSES), 1), else_=0)
            ).label("blocked"),
        )
        .select_from(CrawlLog)
        .join(CrawlSite, CrawlSite.id == CrawlLog.site_id, isouter=True)
        .where(CrawlLog.occurred_at >= since)
        .group_by(CrawlSite.site_code)
    )
    out: list[AlertRule] = []
    for site_code, total, blocked in session.execute(stmt).all():
        total = int(total or 0)
        blocked = int(blocked or 0)
        if total < R2_MIN_SAMPLES:
            continue  # not enough traffic this hour to judge the site
        rate = blocked / total if total else 0.0
        if rate > R2_BLOCK_CEILING:
            out.append(AlertRule(
                code="R2_BLOCK_RATE",
                site_code=site_code, target_code=None,
                message=(
                    f"[R2] 1h {blocked_label} 比例偏高 site={site_code} "
                    f"rate={rate:.0%} (blocked={blocked}/{total}, 阈值 {R2_BLOCK_CEILING:.0%})"
                ),
            ))
    return out


# ---------------------------------------------------------------------------
# R3 — per-target stale selector probe
# ---------------------------------------------------------------------------
def _active_targets(
    session: Session, *, since: datetime, until: datetime | None = None
) -> set[tuple[str | None, str | None]]:
    """Return the set of (site_code, target_code) pairs with at least one
    'ready' article fetched in [since, until) — or [since, now) when
    *until* is omitted."""
    conditions = [Article.status == "ready", Article.fetched_at >= since]
    if until is not None:
        conditions.append(Article.fetched_at < until)
    stmt = (
        select(CrawlSite.site_code, CrawlTarget.target_code)
        .select_from(Article)
        .join(CrawlSite, CrawlSite.id == Article.site_id, isouter=True)
        .join(CrawlTarget, CrawlTarget.id == Article.target_id, isouter=True)
        .where(*conditions)
        .group_by(CrawlSite.site_code, CrawlTarget.target_code)
    )
    return {(site, target) for site, target in session.execute(stmt)}


def _latest_article_publish_per_target(
    session: Session,
) -> dict[tuple[str | None, str | None], datetime]:
    """Map each (site_code, target_code) to max(publish_time) over its
    'ready' articles.

    R3 uses this to suppress alerts for columns whose newest post is
    already ancient (org-info pages / long-running notices / archives):
    for those, 24h with no new article is the steady state, not a
    selector regression. Groups whose publish_time is all NULL are
    omitted from the result."""
    stmt = (
        select(
            CrawlSite.site_code,
            CrawlTarget.target_code,
            func.max(Article.publish_time).label("latest_pub"),
        )
        .select_from(Article)
        .join(CrawlSite, CrawlSite.id == Article.site_id, isouter=True)
        .join(CrawlTarget, CrawlTarget.id == Article.target_id, isouter=True)
        .where(Article.status == "ready")
        .group_by(CrawlSite.site_code, CrawlTarget.target_code)
    )
    latest: dict[tuple[str | None, str | None], datetime] = {}
    for site, target, published in session.execute(stmt):
        if published is not None:
            latest[(site, target)] = published
    return latest


def _check_stale_targets(session: Session, now: datetime) -> list[AlertRule]:
    """Rule R3: a target produced 'ready' articles in the previous 24h
    window but none in the current one — the selector may have broken."""
    current_start = now - WINDOW_24H
    previous_start = now - 2 * WINDOW_24H

    previously_active = _active_targets(
        session, since=previous_start, until=current_start
    )
    currently_active = _active_targets(session, since=current_start)
    newest_publish = _latest_article_publish_per_target(session)

    quiet_cutoff = now - timedelta(days=R3_STALE_TARGET_QUIET_DAYS)
    went_silent = previously_active - currently_active

    alerts: list[AlertRule] = []
    for site_code, target_code in sorted(
        went_silent, key=lambda pair: (pair[0] or "", pair[1] or "")
    ):
        newest = newest_publish.get((site_code, target_code))
        # Low-update column: its newest 'ready' article is older than the
        # quiet threshold, so a 24h gap is expected — suppress the alert.
        if newest is not None and newest < quiet_cutoff:
            continue
        alerts.append(AlertRule(
            code="R3_STALE_TARGET",
            site_code=site_code,
            target_code=target_code,
            message=(
                f"[R3] 栏目 24h 无新文章但上 24h 有 "
                f"site={site_code} target={target_code} — 选择器可能失效"
            ),
        ))
    return alerts


# ---------------------------------------------------------------------------
# public entry
# ---------------------------------------------------------------------------
def run_checks(
    *,
    now: datetime | None = None,
    session: Session | None = None,
) -> Sequence[AlertRule]:
    """Run all three rules (R1/R2/R3); return a flat list of AlertRule.

    Args:
        now: evaluation time. Defaults to naive ``datetime.utcnow()`` —
            kept naive because stored timestamps are compared against it
            directly. NOTE(review): assumes DB timestamps are naive UTC;
            confirm before switching to timezone-aware datetimes.
        session: externally managed Session; when omitted, a fresh one is
            opened via get_sessionmaker() and closed on exit.

    Fixes vs. previous version: replaced the truthiness default
    (``now or datetime.utcnow()``) with an explicit ``is None`` check,
    and deduplicated the rule pipeline that was repeated in both session
    branches.
    """
    if now is None:
        now = datetime.utcnow()

    def _run_all(s: Session) -> list[AlertRule]:
        # Single source of truth for the rule list.
        return (
            _check_success_rate(s, now)
            + _check_block_rate(s, now)
            + _check_stale_targets(s, now)
        )

    if session is not None:
        return _run_all(session)
    SessionMaker = get_sessionmaker()
    with SessionMaker() as s:
        return _run_all(s)
