"""APScheduler wrapper: build a BlockingScheduler from YAML site configs.

Each enabled (site, column) becomes one cron job. The cron expression comes from
the column's `schedule` field. To satisfy the design-doc "01:00–05:00 错峰"
(staggered, off-peak) constraint, `spread_cron()` rewrites the hour/minute for
any column whose schedule is the default daily 02:00, placing it inside the
01:00–05:00 window at an offset derived from a stable hash of
(site_id, column_id).
"""
from __future__ import annotations

import hashlib
import logging
from typing import Callable

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

from govcrawler.config.registry import _registry, reload as reload_registry

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Hour-of-day bounds of the window that default-scheduled jobs are spread over
# (see spread_cron).
WINDOW_START_HOUR = 1
WINDOW_END_HOUR = 5    # exclusive upper bound → last run ≤ 04:59
# Cron expression treated as "the default"; spread_cron only rewrites schedules
# equal to this value and passes every other expression through untouched.
DEFAULT_SCHEDULE = "0 2 * * *"


def _stable_offset(site_id: str, column_id: str, window_minutes: int) -> int:
    """Map a (site_id, column_id) pair to a repeatable minute offset.

    The UTF-8 bytes of ``"<site_id>/<column_id>"`` are hashed with SHA-256; the
    first four digest bytes are read as a big-endian unsigned integer and
    reduced modulo ``window_minutes``, so the result always satisfies
    ``0 <= offset < window_minutes`` and is identical across runs.
    """
    key = f"{site_id}/{column_id}".encode("utf-8")
    digest = hashlib.sha256(key).digest()
    # Four bytes (32 bits) are plenty of entropy for a window of a few hundred minutes.
    prefix = int.from_bytes(digest[:4], "big")
    return prefix % window_minutes


def spread_cron(site_id: str, column_id: str, cron_expr: str) -> str:
    """Return the cron expression to use for one (site, column) job.

    When ``cron_expr`` (ignoring surrounding whitespace) equals
    ``DEFAULT_SCHEDULE``, a daily expression is produced whose hour/minute lie
    in the ``WINDOW_START_HOUR``–``WINDOW_END_HOUR`` window at a position
    chosen by ``_stable_offset``. Any other expression is handed back exactly
    as given, so explicit crons set in YAML are respected.
    """
    is_default = cron_expr.strip() == DEFAULT_SCHEDULE
    if is_default:
        span_minutes = 60 * (WINDOW_END_HOUR - WINDOW_START_HOUR)
        # Split the minute offset into whole hours past the window start plus
        # the remaining minutes within that hour.
        extra_hours, minute = divmod(
            _stable_offset(site_id, column_id, span_minutes), 60
        )
        hour = WINDOW_START_HOUR + extra_hours
        return f"{minute} {hour} * * *"
    return cron_expr


def build_scheduler(
    job_fn: Callable[[str, str], None],
    *,
    apply_spread: bool = True,
) -> BlockingScheduler:
    """Assemble a BlockingScheduler holding one cron job per enabled column.

    The site registry is reloaded first; then every enabled column of every
    enabled site is registered under the job id ``"<site_id>.<column_id>"``.
    The scheduler is returned without being started.

    Args:
        job_fn: Callable invoked as ``job_fn(site_id, column_id)`` when a job
            fires — the crawl function in production, a recorder in tests.
        apply_spread: When true (the default), each column's schedule is passed
            through ``spread_cron`` before the trigger is built; when false the
            schedule is used verbatim.

    Returns:
        The configured scheduler, created with timezone ``Asia/Shanghai``.
    """
    reload_registry()
    registry = _registry()
    scheduler = BlockingScheduler(timezone="Asia/Shanghai")

    # Options applied identically to every job registered below.
    job_options = {
        "replace_existing": True,
        "max_instances": 1,
        "coalesce": True,
        "misfire_grace_time": 3600,
    }

    # Lazily walk the registry, yielding only enabled columns of enabled sites.
    enabled_columns = (
        (site_id, column)
        for site_id, site in registry.items()
        if site.enabled
        for column in site.columns
        if column.enabled
    )

    scheduled = 0
    for site_id, column in enabled_columns:
        column_id = column.column_id
        cron = column.schedule
        if apply_spread:
            cron = spread_cron(site_id, column_id, cron)
        job_id = f"{site_id}.{column_id}"
        scheduler.add_job(
            job_fn,
            trigger=CronTrigger.from_crontab(cron, timezone="Asia/Shanghai"),
            id=job_id,
            args=[site_id, column_id],
            **job_options,
        )
        log.info("scheduled job=%s cron=%r", job_id, cron)
        scheduled += 1

    # The site count covers every registry entry, enabled or not.
    log.info("scheduler built jobs=%d sites=%d", scheduled, len(registry))
    return scheduler


def run_forever() -> None:
    """Build the scheduler and block running it.

    Entry point for `python -m govcrawler schedule`. Each job calls
    ``crawl_column`` and logs a short summary of its result; any exception
    raised by a job is logged with its traceback and not re-raised.
    ``KeyboardInterrupt`` / ``SystemExit`` raised out of the scheduler are
    caught and logged, after which the function returns.
    """
    from govcrawler.pipeline import crawl_column

    # Fields of the crawl result that are echoed into the completion log line.
    summary_keys = ("status", "items_seen", "items_new", "items_skipped")

    def _job(site_id: str, column_id: str) -> None:
        try:
            result = crawl_column(site_id, column_id)
            summary = {key: result.get(key) for key in summary_keys}
            log.info(
                "crawl job done site=%s column=%s result=%s",
                site_id,
                column_id,
                summary,
            )
        except Exception:
            # Top-level job boundary: record the failure instead of propagating it.
            log.exception("crawl job crashed site=%s column=%s", site_id, column_id)

    scheduler = build_scheduler(_job)
    log.info("starting BlockingScheduler — Ctrl-C to exit")
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("scheduler stopped by user")
