"""APScheduler wrappers for YAML columns and DB-backed crawl targets.

The project currently runs in a mixed configuration model:

* Legacy/YAML sites still use `config/sites/*.yaml`.
* 2.0/admin-managed sites use DB rows (`crawl_site` + `crawl_target`).

`build_scheduler()` is kept for the YAML path and existing tests. The production
`run_forever()` entrypoint uses `build_target_scheduler()` so the containerized
scheduler runs the same DB target model as the admin "run target" action.
"""
from __future__ import annotations

import hashlib
import logging
from typing import Callable

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select

from govcrawler.config.registry import _registry, reload as reload_registry
from govcrawler.db import get_sessionmaker
from govcrawler.models import CrawlSite, CrawlTarget

log = logging.getLogger(__name__)

# Nightly window, in hours, across which `spread_cron` distributes jobs that
# use the default schedule. Both schedulers in this module build their cron
# triggers in the "Asia/Shanghai" timezone, so these are Shanghai-local hours.
WINDOW_START_HOUR = 1
WINDOW_END_HOUR = 5    # exclusive upper bound → last run ≤ 04:59
# Cron expression that means "no explicit schedule": only this exact value is
# rewritten by `spread_cron`; any other expression is used as written.
DEFAULT_SCHEDULE = "0 2 * * *"


def _stable_offset(site_id: str, column_id: str, window_minutes: int) -> int:
    """Deterministic spread: 0 ≤ offset < window_minutes based on stable hash."""
    h = hashlib.sha256(f"{site_id}/{column_id}".encode("utf-8")).digest()
    return int.from_bytes(h[:4], "big") % window_minutes


def spread_cron(site_id: str, column_id: str, cron_expr: str) -> str:
    """Spread the default daily schedule across the nightly crawl window.

    If `cron_expr` is the default (`DEFAULT_SCHEDULE`, 02:00 daily) it is
    replaced by a daily slot inside the half-open window
    [WINDOW_START_HOUR:00, WINDOW_END_HOUR:00) — 01:00 through 04:59 with the
    current constants. The slot is derived from `_stable_offset`, so a given
    (site_id, column_id) pair always lands on the same minute.

    Otherwise the expression is passed through unchanged — power users can set
    explicit crons in YAML.

    Args:
        site_id: Site identifier used as the first half of the hash key.
        column_id: Column (or target) identifier used as the second half.
        cron_expr: Five-field crontab expression.

    Returns:
        The spread ``"<minute> <hour> * * *"`` expression for the default
        schedule, or `cron_expr` itself (untouched) for anything else.
    """
    # Compare on whitespace-normalised fields rather than raw text so that an
    # equivalent spelling of the default ("0  2 * * *", tab-separated, padded)
    # is still recognised and spread instead of piling up at 02:00.
    if " ".join(cron_expr.split()) != DEFAULT_SCHEDULE:
        return cron_expr
    window_minutes = (WINDOW_END_HOUR - WINDOW_START_HOUR) * 60
    off = _stable_offset(site_id, column_id, window_minutes)
    # off < window_minutes, so the hour never reaches WINDOW_END_HOUR.
    hour, minute = WINDOW_START_HOUR + off // 60, off % 60
    return f"{minute} {hour} * * *"


def build_scheduler(
    job_fn: Callable[[str, str], None],
    *,
    apply_spread: bool = True,
) -> BlockingScheduler:
    """Build a BlockingScheduler holding one cron job per enabled (site, column).

    This is the YAML/registry path: `reload_registry()` is called first, then
    the sites returned by `_registry()` are walked. Disabled sites and disabled
    columns are skipped.

    Args:
        job_fn: Callable invoked as `job_fn(site_id, column_id)` — in
            production `lambda s, c: crawl_column(s, c)`, in tests a recorder.
        apply_spread: When true, each column's schedule goes through
            `spread_cron`; when false it is used exactly as configured.

    Returns:
        The populated scheduler. It is not started here.
    """
    reload_registry()
    registry = _registry()
    scheduler = BlockingScheduler(timezone="Asia/Shanghai")

    # Lazily yield every enabled column of every enabled site.
    enabled_columns = (
        (sid, column)
        for sid, site_cfg in registry.items()
        if site_cfg.enabled
        for column in site_cfg.columns
        if column.enabled
    )

    scheduled = 0
    for sid, column in enabled_columns:
        if apply_spread:
            cron = spread_cron(sid, column.column_id, column.schedule)
        else:
            cron = column.schedule
        cron_trigger = CronTrigger.from_crontab(cron, timezone="Asia/Shanghai")
        job_id = f"{sid}.{column.column_id}"
        scheduler.add_job(
            job_fn,
            trigger=cron_trigger,
            id=job_id,
            args=[sid, column.column_id],
            replace_existing=True,
            max_instances=1,
            coalesce=True,
            misfire_grace_time=3600,
        )
        log.info("scheduled job=%s cron=%r", job_id, cron)
        scheduled += 1

    # `sites` here counts every registry entry, including disabled ones.
    log.info("scheduler built jobs=%d sites=%d", scheduled, len(registry))
    return scheduler


def build_target_scheduler(
    job_fn: Callable[[str], None],
    *,
    apply_spread: bool = True,
) -> BlockingScheduler:
    """Create a scheduler with one job per enabled DB `crawl_target`.

    The job function receives `target_code`, matching `pipeline.crawl_target`.
    Only targets that are enabled and belong to an enabled site are loaded.
    Targets without an explicit cron (NULL, empty or whitespace-only
    `schedule_cron`) use the legacy daily 02:00 default and are spread across
    the same 01:00-05:00 window.

    A target whose cron expression cannot be parsed is logged and skipped so
    that one bad row does not prevent every other target from being scheduled.

    Args:
        job_fn: Callable invoked as `job_fn(target_code)` when a job fires.
        apply_spread: When true, each target's cron goes through `spread_cron`
            (keyed by site code and target code); when false it is used as is.

    Returns:
        The populated scheduler. It is not started here.
    """
    sched = BlockingScheduler(timezone="Asia/Shanghai")
    Session = get_sessionmaker()
    with Session() as session:
        # `.all()` materialises the rows before the session is closed.
        rows = session.execute(
            select(CrawlTarget, CrawlSite.site_code)
            .join(CrawlSite, CrawlSite.id == CrawlTarget.site_id)
            .where(CrawlSite.enabled.is_(True))
            .where(CrawlTarget.enabled.is_(True))
            .order_by(CrawlSite.site_code, CrawlTarget.target_code)
        ).all()

    count = 0
    skipped = 0
    for target, site_code in rows:
        # A whitespace-only value is truthy but is not a usable expression;
        # treat it like NULL/empty and fall back to the default schedule.
        cron_expr = (target.schedule_cron or "").strip() or DEFAULT_SCHEDULE
        cron = (
            spread_cron(site_code, target.target_code, cron_expr)
            if apply_spread
            else cron_expr
        )
        job_id = f"target.{target.target_code}"
        try:
            trigger = CronTrigger.from_crontab(cron, timezone="Asia/Shanghai")
        except ValueError:
            # from_crontab rejects malformed expressions with ValueError.
            # Skip just this target rather than aborting the whole build.
            log.exception("skipping target job=%s invalid cron=%r", job_id, cron)
            skipped += 1
            continue
        sched.add_job(
            job_fn,
            trigger=trigger,
            id=job_id,
            args=[target.target_code],
            replace_existing=True,
            max_instances=1,
            coalesce=True,
            misfire_grace_time=3600,
        )
        log.info("scheduled target job=%s cron=%r", job_id, cron)
        count += 1

    log.info("target scheduler built jobs=%d", count)
    if skipped:
        log.warning("target scheduler skipped invalid cron targets=%d", skipped)
    return sched


def run_forever() -> None:
    """Entry point for `python -m govcrawler schedule`.

    Builds the DB-target scheduler with a job wrapper around
    `pipeline.crawl_target` and blocks in `BlockingScheduler.start()` until a
    KeyboardInterrupt or SystemExit is raised.
    """
    from govcrawler.pipeline import crawl_target

    # Fields of the crawl result that are echoed into the completion log line.
    summary_keys = ("status", "items_seen", "items_new", "items_skipped")

    def _job(target_code: str) -> None:
        # Every exception is caught and logged here, so a failing crawl never
        # propagates out of the job function.
        try:
            outcome = crawl_target(target_code)
            summary = {key: outcome.get(key) for key in summary_keys}
            log.info("crawl job done target=%s result=%s", target_code, summary)
        except Exception:
            log.exception("crawl job crashed target=%s", target_code)

    scheduler = build_target_scheduler(_job)
    log.info("starting BlockingScheduler — Ctrl-C to exit")
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("scheduler stopped by user")
