"""APScheduler wrappers for YAML columns and DB-backed crawl targets.

The project currently runs in a mixed configuration model:

* Legacy/YAML sites still use `config/sites/*.yaml`.
* 2.0/admin-managed sites use DB rows (`crawl_site` + `crawl_target`).

`build_scheduler()` is kept for the YAML path and existing tests. The production
`run_forever()` entrypoint uses `build_target_scheduler()` so the containerized
scheduler runs the same DB target model as the admin "run target" action.
"""
from __future__ import annotations

import hashlib
import logging
from typing import Callable

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select

from govcrawler.config.registry import _registry, reload as reload_registry
from govcrawler.db import get_sessionmaker
from govcrawler.models import CrawlSite, CrawlTarget

log = logging.getLogger(__name__)

WINDOW_START_HOUR = 1
WINDOW_END_HOUR = 5    # exclusive upper bound → last run ≤ 04:59
DEFAULT_SCHEDULE = "0 2 * * *"


def _stable_offset(site_id: str, column_id: str, window_minutes: int) -> int:
    """Deterministic spread: 0 ≤ offset < window_minutes based on stable hash."""
    h = hashlib.sha256(f"{site_id}/{column_id}".encode("utf-8")).digest()
    return int.from_bytes(h[:4], "big") % window_minutes


def spread_cron(site_id: str, column_id: str, cron_expr: str) -> str:
    """If cron_expr is the default (02:00 daily), spread across 01:00–05:00.

    Otherwise pass through unchanged — power users can set explicit crons in YAML.
    """
    if cron_expr.strip() != DEFAULT_SCHEDULE:
        return cron_expr
    window_minutes = (WINDOW_END_HOUR - WINDOW_START_HOUR) * 60
    off = _stable_offset(site_id, column_id, window_minutes)
    hour = WINDOW_START_HOUR + off // 60
    minute = off % 60
    return f"{minute} {hour} * * *"


def build_scheduler(
    job_fn: Callable[[str, str], None],
    *,
    apply_spread: bool = True,
) -> BlockingScheduler:
    """Create a BlockingScheduler with one job per enabled (site, column).

    `job_fn(site_id, column_id)` is the work to run — in production it's
    `lambda s, c: crawl_column(s, c)`, in tests a recorder.
    """
    reload_registry()
    sites = _registry()
    sched = BlockingScheduler(timezone="Asia/Shanghai")
    count = 0
    for site_id, site in sites.items():
        if not site.enabled:
            continue
        for col in site.columns:
            if not col.enabled:
                continue
            cron = spread_cron(site_id, col.column_id, col.schedule) if apply_spread else col.schedule
            trigger = CronTrigger.from_crontab(cron, timezone="Asia/Shanghai")
            job_id = f"{site_id}.{col.column_id}"
            sched.add_job(
                job_fn,
                trigger=trigger,
                id=job_id,
                args=[site_id, col.column_id],
                replace_existing=True,
                max_instances=1,
                coalesce=True,
                misfire_grace_time=3600,
            )
            log.info("scheduled job=%s cron=%r", job_id, cron)
            count += 1
    log.info("scheduler built jobs=%d sites=%d", count, len(sites))
    return sched


def _resolve_cron_for_target(
    target: CrawlTarget,
    site_code: str,
    yaml_path: str | None,
    site_schedule_cron: str | None,
    *,
    apply_spread: bool = True,
) -> str:
    """Resolve cron in priority order:
        target.schedule_cron →
        yaml column.schedule (with default_column fallback inside) →
        crawl_site.schedule_cron →
        DEFAULT_SCHEDULE.
    Mirrors the api's /api/jobs/scheduled resolver so what the operator sees
    there matches what apscheduler actually fires.
    """
    cron_expr = (target.schedule_cron or "").strip()
    if not cron_expr and yaml_path:
        try:
            from govcrawler.config import get_site_config
            cfg = get_site_config(site_code)
            if cfg is not None:
                prefix = f"{site_code}__"
                tail = (
                    target.target_code[len(prefix):]
                    if target.target_code.startswith(prefix)
                    else target.target_code
                )
                column_id = tail.split("__", 1)[-1]
                col = cfg.get_column(column_id)
                if col is not None and col.schedule:
                    cron_expr = col.schedule
        except Exception:
            pass
    if not cron_expr and site_schedule_cron:
        cron_expr = site_schedule_cron.strip()
    if not cron_expr:
        cron_expr = DEFAULT_SCHEDULE
    return (
        spread_cron(site_code, target.target_code, cron_expr)
        if apply_spread
        else cron_expr
    )


def _compute_desired_jobs(*, apply_spread: bool = True) -> dict[str, str]:
    """Return desired apscheduler state as `{job_id: cron_expression}` for
    every currently-enabled (site, target) row in DB. The reconciler diffs
    this against the live scheduler state — anything in here but missing
    from apscheduler gets added, anything in apscheduler but not here gets
    removed, anything where the cron changed gets rescheduled.
    """
    Session = get_sessionmaker()
    with Session() as session:
        rows = list(
            session.execute(
                select(
                    CrawlTarget,
                    CrawlSite.site_code,
                    CrawlSite.yaml_path,
                    CrawlSite.schedule_cron.label("site_schedule_cron"),
                )
                .join(CrawlSite, CrawlSite.id == CrawlTarget.site_id)
                .where(CrawlSite.enabled.is_(True))
                .where(CrawlTarget.enabled.is_(True))
                .order_by(CrawlSite.site_code, CrawlTarget.target_code)
            ).all()
        )
    out: dict[str, str] = {}
    for target, site_code, yaml_path, site_schedule_cron in rows:
        cron = _resolve_cron_for_target(
            target, site_code, yaml_path, site_schedule_cron,
            apply_spread=apply_spread,
        )
        out[f"target.{target.target_code}"] = cron
    return out


def _trigger_to_cron_str(trigger) -> str:
    """Best-effort serialize a CronTrigger back to a `m h dom mon dow` string
    for diffing. APScheduler's CronTrigger doesn't expose the original
    expression so we reconstruct from its fields. Order matches the cron
    expression we passed in.
    """
    try:
        fields = {f.name: str(f) for f in trigger.fields}
        return " ".join([
            fields.get("minute", "*"),
            fields.get("hour", "*"),
            fields.get("day", "*"),
            fields.get("month", "*"),
            fields.get("day_of_week", "*"),
        ])
    except Exception:
        return ""


def reconcile_jobs(
    sched: BlockingScheduler,
    job_fn: Callable[[str], None],
    *,
    apply_spread: bool = True,
    job_id_prefix: str = "target.",
) -> dict[str, int]:
    """Sync apscheduler state to DB. Add new enabled targets, remove disabled
    /deleted ones, reschedule when cron changed. Counts returned for ops
    visibility. Only touches jobs whose id starts with `job_id_prefix` —
    the periodic reconciler job itself uses a different prefix and is
    untouched.
    """
    desired = _compute_desired_jobs(apply_spread=apply_spread)
    current: dict[str, object] = {}
    for j in sched.get_jobs():
        if j.id.startswith(job_id_prefix):
            current[j.id] = j

    added = 0
    removed = 0
    rescheduled = 0
    for job_id, cron in desired.items():
        if job_id not in current:
            sched.add_job(
                job_fn,
                trigger=CronTrigger.from_crontab(cron, timezone="Asia/Shanghai"),
                id=job_id,
                args=[job_id[len(job_id_prefix):]],
                replace_existing=True,
                max_instances=1,
                coalesce=True,
                misfire_grace_time=3600,
            )
            log.info("reconcile: ADD job=%s cron=%r", job_id, cron)
            added += 1
        else:
            existing_cron = _trigger_to_cron_str(current[job_id].trigger)
            if existing_cron and existing_cron != cron:
                sched.reschedule_job(
                    job_id,
                    trigger=CronTrigger.from_crontab(cron, timezone="Asia/Shanghai"),
                )
                log.info("reconcile: RESCHEDULE job=%s cron=%r→%r",
                         job_id, existing_cron, cron)
                rescheduled += 1

    for job_id in list(current.keys()):
        if job_id not in desired:
            sched.remove_job(job_id)
            log.info("reconcile: REMOVE job=%s (no longer enabled in DB)", job_id)
            removed += 1

    if added or removed or rescheduled:
        log.info("reconcile done: +%d -%d ~%d (total=%d)",
                 added, removed, rescheduled, len(desired))
    return {"added": added, "removed": removed, "rescheduled": rescheduled,
            "total": len(desired)}


def build_target_scheduler(
    job_fn: Callable[[str], None],
    *,
    apply_spread: bool = True,
) -> BlockingScheduler:
    """Create a scheduler with one job per enabled DB `crawl_target`.

    Equivalent to constructing an empty scheduler and calling reconcile_jobs
    once. The startup pass is identical to a periodic reconcile run, which
    keeps "first run" and "Nth run" behavior aligned.
    """
    sched = BlockingScheduler(timezone="Asia/Shanghai")
    summary = reconcile_jobs(sched, job_fn, apply_spread=apply_spread)
    log.info("target scheduler built jobs=%d", summary["total"])
    return sched


def run_forever() -> None:
    """Entry point for `python -m govcrawler schedule`.

    Jobs are dispatched to the api process via HTTP so they go through the
    per-site task queue (see govcrawler/api/task_queue.py). This means
    cron-driven crawls share the same serialization rules as manual runs
    from the dashboard — same site never runs two targets concurrently,
    which is what prevents anti-bot triggers.

    SCHEDULER_API_URL controls the base URL; in docker-compose the api
    service is reachable as http://api:8787. If api is down we SKIP this
    cron tick entirely — the inline fallback was removed because it
    bypassed the per-site task queue (allowing parallel runs of the same
    site, which trips WAFs) and the enabled-flag check.
    """
    import os

    import httpx

    api_url = os.environ.get("SCHEDULER_API_URL", "http://api:8787").rstrip("/")

    # Authenticate to the api with the same Basic Auth credentials the admin
    # UI uses. Without this, when production sets ADMIN_USER/ADMIN_PASSWORD
    # the api returns 401 to every cron-driven /admin/api/targets/.../run
    # call and the entire scheduled-crawl chain silently dies.
    admin_user = os.environ.get("ADMIN_USER", "")
    admin_pw = os.environ.get("ADMIN_PASSWORD", "")
    api_auth = (admin_user, admin_pw) if admin_user and admin_pw else None
    if api_auth is None:
        log.warning(
            "scheduler: ADMIN_USER/ADMIN_PASSWORD not set — calls to api will "
            "fail if Basic Auth is enabled. Set them in the .env shared with "
            "the api container.",
        )

    def _job(target_code: str) -> None:
        # Runtime enabled-flag gate. apscheduler registers all jobs ONCE at
        # process start, so disabling a site/target via the admin UI does
        # NOT remove the in-memory cron — without this check, stale jobs
        # keep firing forever (until scheduler restart). Worse, when api is
        # also flaky the inline fallback below would spawn playwright
        # subprocesses on every fire and they can leak under load (we saw
        # ~5000 zombie processes accumulate in the scheduler container in
        # ~18h). Cheap DB lookup here makes "disable in UI" actually stop
        # the crawl within one cron tick.
        try:
            from govcrawler.db import get_sessionmaker
            from govcrawler.models import CrawlSite, CrawlTarget
            S = get_sessionmaker()
            with S() as s:
                t = (
                    s.query(CrawlTarget)
                    .join(CrawlSite, CrawlSite.id == CrawlTarget.site_id)
                    .filter(CrawlTarget.target_code == target_code)
                    .first()
                )
                if t is None:
                    log.info("scheduler: target_code=%s no longer exists — skip", target_code)
                    return
                if not t.enabled:
                    log.info("scheduler: target=%s disabled — skip", target_code)
                    return
                if not t.site.enabled:
                    log.info("scheduler: target=%s site=%s disabled — skip",
                             target_code, t.site.site_code)
                    return
        except Exception as e:
            log.warning("scheduler enabled-check failed target=%s — proceeding. err=%s",
                        target_code, e)

        # Primary path: enqueue via api → task queue guarantees per-site
        # concurrency=1 across manual + cron triggers.
        try:
            r = httpx.post(
                f"{api_url}/admin/api/targets/{target_code}/run",
                timeout=10.0,
                auth=api_auth,
            )
            if r.status_code == 200:
                body = r.json()
                log.info("crawl enqueued target=%s job=%s (via api)", target_code, body.get("job_id"))
                return
            # CRITICAL: api responded but rejected (e.g. 409 disabled, 404 not
            # found, 422 validation). DO NOT fall through to inline — that
            # bypasses every business rule the api enforces. Inline fallback
            # is only for "api unreachable" (network exception below).
            log.warning("api enqueue rejected target=%s status=%s body=%s — NOT falling back",
                        target_code, r.status_code, r.text[:200])
            return
        except Exception as e:
            # api unreachable. We deliberately do NOT fall back to running
            # crawl_target() inline here. The inline path:
            #   1. bypasses the api's per-site task queue → multiple targets
            #      of the same site fire in parallel (apscheduler's default
            #      ThreadPoolExecutor has 10 workers), violating the "one
            #      site, one task at a time" invariant we rely on for
            #      anti-bot pacing.
            #   2. bypasses the api's enabled-flag check.
            #   3. on ctct sites, each fire spawns a chromium subprocess;
            #      under load these can leak and accumulate (we saw 4974
            #      zombie processes in 18h before this was fixed).
            # api is co-located with scheduler in docker compose; if it's
            # really down the operator will notice within minutes. Until
            # then we just skip — the next cron tick will retry.
            log.warning(
                "api enqueue failed target=%s — SKIPPING (next cron tick will retry). err=%s",
                target_code, e,
            )
            return

    sched = build_target_scheduler(_job)

    # Periodic reconciler: re-reads DB every RECONCILE_INTERVAL_SEC and
    # syncs apscheduler in-memory state to it. Without this, enabling a
    # disabled target / changing cron / creating a new target in the admin
    # UI required a full scheduler container restart to take effect.
    # Default 300s — operator-visible UI changes propagate within 5 min.
    # Override via SCHEDULER_RECONCILE_SEC env var.
    import os as _os
    reconcile_interval = int(_os.environ.get("SCHEDULER_RECONCILE_SEC", "300"))

    def _reconcile_tick() -> None:
        try:
            reconcile_jobs(sched, _job)
        except Exception:
            log.exception("reconcile tick crashed — keeping previous in-memory state")

    sched.add_job(
        _reconcile_tick,
        trigger="interval",
        seconds=reconcile_interval,
        id="admin.reconcile_jobs",
        replace_existing=True,
        max_instances=1,
        coalesce=True,
        misfire_grace_time=60,
    )
    log.info("registered admin reconcile job interval=%ds", reconcile_interval)

    log.info("starting BlockingScheduler — dispatching via %s", api_url)
    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        log.info("scheduler stopped by user")
