"""Task-queue admin endpoints — list / cancel crawl jobs + show what cron
will fire next. The runtime queue lives in the api process (TaskQueue);
the cron schedule lives in the scheduler process. We can't introspect
apscheduler across processes, so the /api/jobs/scheduled endpoint
recomputes next-fire times on the api side using the same rules
(target.schedule_cron → yaml column.schedule → site.schedule_cron →
DEFAULT_SCHEDULE, optionally spread).
"""
from __future__ import annotations

from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo

from apscheduler.triggers.cron import CronTrigger
from fastapi import Body, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.orm import Session

from govcrawler.api.task_queue import get_queue
from govcrawler.config import get_site_config
from govcrawler.models import CrawlJob, CrawlSite, CrawlTarget
from govcrawler.scheduler import DEFAULT_SCHEDULE, spread_cron

from ._common import _session, iso_cn, router


@router.get("/api/jobs")
def list_jobs(
    site: str | None = Query(None, description="filter by site_code"),
    status: str | None = Query(None, description="queued|running|done|failed|cancelled"),
    include_history: bool = Query(True),
    limit: int = Query(200, ge=1, le=5000),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """List queue jobs plus a per-queue summary, enriched with DB metadata.

    Jobs and the summary come from the in-process task queue. Each job dict
    is then decorated with ``site_name``, the target's name/channel fields
    and, when a durable ``CrawlJob`` row exists, its ``current_page`` /
    ``last_completed_page``. Summary rows get the matching ``*_name`` and
    ``running_*_page`` fields.

    Returns a dict with ``count``, ``queue_summary`` and ``jobs``.
    """
    q = get_queue()
    jobs = [
        j.to_dict()
        for j in q.list_jobs(
            site=site, status=status, include_history=include_history, limit=limit,
        )
    ]
    summary = q.queue_summary()

    # Collect every site/target code that will be looked up below, so each
    # table is queried once.
    site_codes = {j["site_code"] for j in jobs if j.get("site_code")}
    target_codes = {j["target_code"] for j in jobs if j.get("target_code")}
    for row in summary:
        site_codes.update(row.get("site_codes") or [])
        # "site_code" is read back when decorating the summary row, so it
        # must be part of the lookup set too.
        for key in ("site_code", "running_site", "next_queued_site"):
            if row.get(key):
                site_codes.add(row[key])
        for key in ("running_target", "next_queued_target"):
            if row.get(key):
                target_codes.add(row[key])

    sites = {}
    if site_codes:
        sites = {
            row.site_code: row.site_name
            for row in s.execute(
                select(CrawlSite).where(CrawlSite.site_code.in_(site_codes))
            ).scalars()
        }
    targets = {}
    if target_codes:
        targets = {
            row.target_code: {
                "target_name": row.target_name,
                "channel_name": row.channel_name,
                "channel_path": row.channel_path,
            }
            for row in s.execute(
                select(CrawlTarget).where(CrawlTarget.target_code.in_(target_codes))
            ).scalars()
        }
    for j in jobs:
        j["site_name"] = sites.get(j.get("site_code"))
        j.update(targets.get(j.get("target_code")) or {})

    # Durable checkpoint rows: fetch for the listed jobs AND for any job the
    # summary reports as running, since the running job can be filtered out
    # of ``jobs`` by the status/limit parameters.
    durable_ids = {j["job_id"] for j in jobs if j.get("job_id")}
    durable_ids.update(
        row["running_job_id"] for row in summary if row.get("running_job_id")
    )
    durable_jobs = {}
    if durable_ids:
        durable_jobs = {
            row.job_id: row
            for row in s.execute(
                select(CrawlJob).where(CrawlJob.job_id.in_(durable_ids))
            ).scalars()
        }
    for j in jobs:
        durable = durable_jobs.get(j.get("job_id"))
        if durable is None:
            continue
        j["current_page"] = durable.current_page or 0
        j["last_completed_page"] = durable.last_completed_page or 0

    for row in summary:
        row["site_name"] = sites.get(row.get("site_code"))
        row["running_site_name"] = sites.get(row.get("running_site"))
        row["next_queued_site_name"] = sites.get(row.get("next_queued_site"))
        running = targets.get(row.get("running_target")) or {}
        next_queued = targets.get(row.get("next_queued_target")) or {}
        row["running_target_name"] = running.get("target_name")
        row["next_queued_target_name"] = next_queued.get("target_name")
        durable = durable_jobs.get(row.get("running_job_id"))
        if durable is not None:
            row["running_current_page"] = durable.current_page or 0
            row["running_last_completed_page"] = durable.last_completed_page or 0
    return {
        "count": len(jobs),
        "queue_summary": summary,
        "jobs": jobs,
    }


@router.post("/api/jobs/{job_id}/cancel")
async def cancel_job(job_id: str) -> dict[str, Any]:
    """Ask the task queue to cancel ``job_id`` and relay its answer.

    Returns the queue's result dict when it reports ``ok``. Otherwise raises
    HTTP 404 if the reported reason is ``not_found`` and HTTP 409 for any
    other reason, using the reason as the error detail.
    """
    outcome = await get_queue().cancel(job_id)
    if outcome.get("ok"):
        return outcome
    if outcome.get("reason") == "not_found":
        status_code = 404
    else:
        status_code = 409
    raise HTTPException(status_code, outcome.get("reason", "cannot cancel"))


@router.get("/api/jobs/{job_id}/checkpoint")
def get_checkpoint(
    job_id: str, s: Session = Depends(_session),
) -> dict[str, Any]:
    """Report the checkpoint state stored on the durable crawl_job row.

    The UI uses this to render "completed page N / resuming from page M"
    hints next to a job. The target's track_checkpoint flag is included so
    the UI can decide whether to show the editor at all (only meaningful
    for opt-in targets).

    Raises HTTP 404 when no durable row exists for ``job_id``.
    """
    from govcrawler.models import CrawlJob, CrawlTarget

    job_row = s.get(CrawlJob, job_id)
    if job_row is None:
        raise HTTPException(404, "job not found in durable store")

    target_row = (
        s.query(CrawlTarget).filter_by(target_code=job_row.target_code).first()
    )
    # A missing target row is reported as "not tracking" rather than an error.
    if target_row:
        tracks_checkpoint = bool(target_row.track_checkpoint)
    else:
        tracks_checkpoint = False

    payload: dict[str, Any] = {"job_id": job_id}
    payload["target_code"] = job_row.target_code
    payload["status"] = job_row.status
    payload["attempt_count"] = job_row.attempt_count
    payload["current_page"] = job_row.current_page or 0
    payload["last_completed_page"] = job_row.last_completed_page or 0
    payload["track_checkpoint"] = tracks_checkpoint
    return payload


@router.patch("/api/jobs/{job_id}/checkpoint")
async def patch_checkpoint(
    job_id: str,
    body: dict = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Manual override of last_completed_page on a job that is not running.

    Useful when ops know the target station was lost mid-page and want to
    force re-do that page on resume.

    Body: {"last_completed_page": <int >= 0>}

    Raises:
        HTTPException 400: ``last_completed_page`` is missing, not an
            integer (JSON booleans are rejected too), or negative.
        HTTPException 404: no durable row exists for ``job_id``.
        HTTPException 409: the job is running — wait for it to finish or
            cancel first to avoid racing the worker's own checkpoint writes.
    """
    from govcrawler.models import CrawlJob

    page = body.get("last_completed_page")
    # bool is a subclass of int, so JSON true/false would otherwise pass the
    # isinstance check and be stored as a page number.
    if isinstance(page, bool) or not isinstance(page, int) or page < 0:
        raise HTTPException(400, "last_completed_page must be an int >= 0")
    cj = s.get(CrawlJob, job_id)
    if cj is None:
        raise HTTPException(404, "job not found")
    if cj.status == "running":
        raise HTTPException(
            409, "cannot patch checkpoint while job is running — cancel first",
        )
    cj.last_completed_page = page
    s.commit()
    return {"ok": True, "job_id": job_id, "last_completed_page": page}


def _decompose_target_code(site_code: str, target_code: str) -> tuple[str | None, str]:
    """Mirror of pipeline._decompose_target_code — kept private here to
    avoid pulling pipeline (and its heavy imports) into the api hot path."""
    prefix = f"{site_code}__"
    if not target_code.startswith(prefix):
        return None, target_code
    tail = target_code[len(prefix):]
    parts = tail.split("__", 1)
    if len(parts) == 1:
        return None, parts[0]
    return parts[0], parts[1]


@router.get("/api/jobs/scheduled")
def list_scheduled_jobs(
    site: str | None = Query(None, description="filter by site_code"),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """List all enabled crawl_target rows with their cron expression and
    next firing time, recomputed here so the admin UI can show 'what's
    scheduled' without IPC into the scheduler.

    A target's cron source (in priority order):
      1. crawl_target.schedule_cron   -> cron_source "target_override"
      2. yaml column.schedule         -> cron_source "yaml_column"
         (only consulted when the site has a yaml_path)
      3. crawl_site.schedule_cron     -> cron_source "site_default"
      4. DEFAULT_SCHEDULE             -> cron_source "default"
    The chosen expression is then run through spread_cron(); the result is
    reported as ``cron_expr_effective`` and used to compute ``next_fire_at``.

    Items are sorted by ``next_fire_at``; targets whose cron could not be
    parsed have ``next_fire_at`` = None and sort last.
    """
    tz = ZoneInfo("Asia/Shanghai")
    now = datetime.now(tz)

    stmt = (
        select(
            CrawlTarget,
            CrawlSite.site_code,
            CrawlSite.site_name,
            CrawlSite.yaml_path,
            CrawlSite.cms_adapter,
            CrawlSite.schedule_cron.label("site_schedule_cron"),
        )
        .join(CrawlSite, CrawlSite.id == CrawlTarget.site_id)
        .where(CrawlSite.enabled.is_(True))
        .where(CrawlTarget.enabled.is_(True))
        .order_by(CrawlSite.site_code, CrawlTarget.target_code)
    )
    if site:
        stmt = stmt.where(CrawlSite.site_code == site)

    items: list[dict[str, Any]] = []
    for (target, site_code, site_name, yaml_path, cms_adapter,
         site_schedule_cron) in s.execute(stmt).all():
        # 1. Per-target override
        cron_expr = (target.schedule_cron or "").strip()
        cron_source = "target_override" if cron_expr else None
        # 2. yaml column.schedule (or yaml default_column.schedule via the
        # synthesized fallback inside SiteConfig.get_column)
        if not cron_expr and yaml_path:
            site_cfg = get_site_config(site_code)
            if site_cfg is not None:
                _, column_id = _decompose_target_code(site_code, target.target_code)
                col = site_cfg.get_column(column_id)
                if col is not None and col.schedule:
                    cron_expr = col.schedule
                    cron_source = "yaml_column"
        # 3. crawl_site.schedule_cron — applies to gkmlpt-adapter sites or
        # any site that just wants a DB-side cron without a yaml file.
        # Strip BEFORE testing: a whitespace-only value must fall through to
        # the default instead of being reported as "site_default".
        site_cron = (site_schedule_cron or "").strip()
        if not cron_expr and site_cron:
            cron_expr = site_cron
            cron_source = "site_default"
        # 4. codebase default
        if not cron_expr:
            cron_expr = DEFAULT_SCHEDULE
            cron_source = "default"

        spread_expr = spread_cron(site_code, target.target_code, cron_expr)
        # Best effort: an unparsable expression is listed with no next-fire
        # time rather than failing the whole listing.
        try:
            trigger = CronTrigger.from_crontab(spread_expr, timezone=tz)
            next_fire = trigger.get_next_fire_time(None, now)
        except Exception:
            next_fire = None

        items.append({
            "target_code": target.target_code,
            "target_name": target.target_name,
            "site_code": site_code,
            "site_name": site_name,
            "cms_adapter": cms_adapter,
            "yaml_path": yaml_path,
            "cron_expr_raw": cron_expr,
            "cron_expr_effective": spread_expr,
            "cron_source": cron_source,
            "next_fire_at": next_fire.isoformat() if next_fire else None,
            "last_crawled_at": iso_cn(target.last_crawled_at),
        })

    # ISO strings sort chronologically; the sentinel pushes None entries to
    # the end.
    items.sort(key=lambda x: x["next_fire_at"] or "9999-12-31")
    return {"count": len(items), "items": items}
