"""Crawl targets — CRUD, toggle, manual run, parser override, category, dry-run."""
from __future__ import annotations

import json
import logging
import re
from typing import Any
from urllib.parse import urlparse

from fastapi import Body, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.orm import Session

from govcrawler.models import CrawlJob, CrawlSite, CrawlTarget, LocalDepartment, SiteDepartment
from govcrawler.repositories import sites as sites_repo
from govcrawler.repositories import targets as targets_repo


# Column-name → 一级分类 (top-level category) keyword map. Ordered by specificity:
# the first tuple with any keyword appearing in the column name wins. Lets
# bulk-create auto-fill content_category instead of leaving every new target
# NULL. Operators can still override per-target via the editor (or the bulk-edit
# UI). When no keyword matches, content_category stays NULL (the "其他" catch-all).
_CATEGORY_RULES: tuple[tuple[str, tuple[str, ...]], ...] = (
    ("法规文件", ("规章", "规范", "文件", "法规", "法律", "条例", "办法")),
    ("公告",     ("公告", "通知", "公示")),
    ("公开报告", ("年报", "报告", "公报", "白皮书", "总结")),
    ("数据",     ("统计", "数据", "指标", "公开数据")),
    ("财政",     ("财政", "预算", "决算", "三公", "审计", "采购")),
    ("人事",     ("人事", "招聘", "招考", "录用", "干部", "任免")),
    ("规划",     ("规划", "计划", "纲要", "方案")),
    ("资讯",     ("要闻", "动态", "新闻", "资讯", "信息", "工作动态")),
)


def _classify_column_name(name: str) -> str | None:
    """Return a category label for a column name, or None when nothing
    matches and the caller should leave content_category NULL."""
    if not name:
        return None
    for category, kws in _CATEGORY_RULES:
        if any(kw in name for kw in kws):
            return category
    return None
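
# Illustrative examples (hypothetical column names) for the rules above:
#   _classify_column_name("财政预决算公开") -> "财政"
#   _classify_column_name("部门动态") -> "资讯"
#   _classify_column_name("规范性文件") -> "法规文件" (first matching tuple wins)
#   _classify_column_name("联系我们") -> None (caller leaves content_category NULL)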


def _ensure_site_department(
    s: Session,
    *,
    site: "CrawlSite",
    dept_path: str,
    local_dept_id: int | None,
    dept_display_name: str | None,
) -> SiteDepartment:
    """Upsert a site_department row keyed by (site_id, dept_path).

    The 部门绑定对账 (department-binding reconciliation) page lists site_department
    rows; if we create crawl_target rows without a matching site_department row,
    the dept_path becomes invisible to that UI. dept_binding flips to 'mapped'
    iff local_dept_id is provided (the CHECK constraint requires that pairing).
    """
    row = s.execute(
        select(SiteDepartment)
        .where(SiteDepartment.site_id == site.id, SiteDepartment.dept_path == dept_path)
    ).scalar_one_or_none()
    desired_binding = "mapped" if local_dept_id is not None else "pending"
    if row is None:
        row = SiteDepartment(
            site_id=site.id,
            dept_path=dept_path,
            local_dept_id=local_dept_id,
            dept_binding=desired_binding,
            dept_display_name=dept_display_name,
        )
        s.add(row)
        s.flush()
        return row
    # Existing row: only promote pending→mapped when caller now has a dept_id.
    # Never demote mapped→pending here — operators may have intentionally bound
    # via the dept-reconciliation UI.
    if local_dept_id is not None and row.dept_binding == "pending":
        row.local_dept_id = local_dept_id
        row.dept_binding = "mapped"
    if dept_display_name and not row.dept_display_name:
        row.dept_display_name = dept_display_name
    return row
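
# Binding-state transitions handled above (a summary of the code, not new behavior):
#   no existing row + dept_id        -> insert as "mapped"
#   no existing row, no dept_id      -> insert as "pending"
#   existing "pending" row + dept_id -> promote to "mapped"
#   existing "mapped" row            -> never demoted here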

from ._common import (
    _job_total,
    _infer_legacy_validator_key,
    _normalize_str,
    _serialize_target,
    _session,
    _validate_target_payload,
    iso_cn,
    router,
)

log = logging.getLogger(__name__)


@router.get("/api/targets/lookup")
def lookup_target(
    entry_url: str | None = Query(None, description="exact entry_url match"),
    target_code: str | None = Query(None, description="exact target_code match"),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Cheap dedup probe — used by the simplified new-target modal to warn
    the operator before POST'ing a duplicate. Either filter is allowed; if
    nothing matches the response is `{"exists": false}`.
    """
    if not entry_url and not target_code:
        raise HTTPException(400, "entry_url or target_code required")
    stmt = (
        select(CrawlTarget, CrawlSite.site_code.label("site_code"))
        .join(CrawlSite, CrawlSite.id == CrawlTarget.site_id, isouter=True)
        .limit(5)
    )
    if entry_url:
        stmt = stmt.where(CrawlTarget.entry_url == entry_url)
    if target_code:
        stmt = stmt.where(CrawlTarget.target_code == target_code)
    rows = s.execute(stmt).all()
    if not rows:
        return {"exists": False}
    return {
        "exists": True,
        "matches": [
            {
                "target_code": r.CrawlTarget.target_code,
                "target_name": r.CrawlTarget.target_name,
                "site_code": r.site_code,
                "entry_url": r.CrawlTarget.entry_url,
                "enabled": r.CrawlTarget.enabled,
            }
            for r in rows
        ],
    }
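
# Illustrative round trip (hypothetical target_code and URL):
#   GET /api/targets/lookup?target_code=fogang__2396
#   -> {"exists": true, "matches": [{"target_code": "fogang__2396", "target_name": "...",
#       "site_code": "fogang", "entry_url": "...", "enabled": true}]}
#   GET /api/targets/lookup?entry_url=http://unknown.example/list
#   -> {"exists": false}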


@router.post("/api/targets/{target_code}/toggle")
def toggle_target(
    target_code: str, enabled: bool = Query(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    row = targets_repo.set_enabled(s, target_code, enabled)
    if row is None:
        raise HTTPException(404, f"target not found: {target_code}")
    s.commit()
    return {"target_code": target_code, "enabled": enabled}


@router.delete("/api/targets/{target_code}")
def delete_target(
    target_code: str, s: Session = Depends(_session),
) -> dict[str, Any]:
    row = targets_repo.get_by_code(s, target_code)
    if row is None:
        raise HTTPException(404, f"target not found: {target_code}")
    # Article.target_id + CrawlLog.target_id are ondelete=SET NULL, so historical
    # rows survive with a null target_id pointer. Safe to hard-delete.
    s.delete(row)
    s.commit()
    return {"deleted": True, "target_code": target_code}


@router.post("/api/targets/bulk-delete")
def bulk_delete_targets(
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Hard-delete multiple crawl_target rows.

    Matches the single-target delete semantics: historical Article / CrawlLog
    rows are preserved by the DB FK behavior, while the target definitions are
    removed. Missing target_codes are reported per item instead of aborting the
    whole batch.
    """
    target_codes = payload.get("target_codes") or []
    if not isinstance(target_codes, list) or not target_codes:
        raise HTTPException(400, "target_codes must be a non-empty array")
    if len(target_codes) > 500:
        raise HTTPException(400, "target_codes must be <= 500")

    seen: set[str] = set()
    deleted: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for raw_code in target_codes:
        code = (raw_code or "").strip()
        if not code:
            continue
        if code in seen:
            skipped.append({"target_code": code, "reason": "duplicate"})
            continue
        seen.add(code)
        row = targets_repo.get_by_code(s, code)
        if row is None:
            skipped.append({"target_code": code, "reason": "not_found"})
            continue
        s.delete(row)
        deleted.append({"target_code": code})
    s.commit()
    return {
        "deleted_count": len(deleted),
        "skipped_count": len(skipped),
        "deleted": deleted,
        "skipped": skipped,
    }


def _latest_checkpoint_page(s: Session, target_code: str) -> int:
    row = (
        s.query(CrawlJob)
        .filter(CrawlJob.target_code == target_code)
        .filter(CrawlJob.force.is_(True))
        .filter(CrawlJob.last_completed_page > 0)
        .order_by(CrawlJob.last_completed_page.desc(), CrawlJob.job_id.desc())
        .first()
    )
    return int(row.last_completed_page or 0) if row is not None else 0


@router.post("/api/targets/{target_code}/run")
async def run_target(
    target_code: str,
    max_items: int | None = Query(None, ge=1),
    force: bool = Query(False, description="ignore stop_on_duplicate — walk all pages even if all entries are dedup-skipped"),
    resume_from_latest_checkpoint: bool = Query(
        False,
        description="start a new force crawl from the latest saved checkpoint for this target",
    ),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Enqueue a manual crawl. Per-site concurrency = 1 — if this site is
    currently running another target, the new job waits in the FIFO so we
    don't trigger anti-bot rules by hammering the same origin.

    `force=true` is the 全量抓取 (full crawl) mode — the pipeline ignores the
    'first duplicate marks the boundary, stop' rule and processes every list
    entry. Useful when historical pagination has just been enabled and the
    target's page-1 articles are already all in the DB.
    """
    t = targets_repo.get_by_code(s, target_code)
    if t is None:
        raise HTTPException(404, f"target not found: {target_code}")
    site_code = t.site.site_code if t.site else None
    if not site_code:
        raise HTTPException(409, "target has no site_code")
    # Enforce the enabled flags here so cron-triggered runs (the scheduler still
    # has the job registered until restart) and manual UI clicks both respect
    # the on/off toggle without requiring a scheduler restart.
    if not t.enabled:
        raise HTTPException(409, f"target {target_code!r} is disabled")
    if t.site is not None and not t.site.enabled:
        raise HTTPException(409, f"site {site_code!r} is disabled")
    resume_from_page = 0
    if resume_from_latest_checkpoint:
        if not force:
            raise HTTPException(400, "resume_from_latest_checkpoint requires force=true")
        if not t.track_checkpoint:
            raise HTTPException(409, f"target {target_code!r} does not track checkpoints")
        resume_from_page = _latest_checkpoint_page(s, target_code)
        if resume_from_page <= 0:
            raise HTTPException(409, f"target {target_code!r} has no saved checkpoint")

    from govcrawler.api.task_queue import get_queue
    job_id = await get_queue().submit(
        site_code=site_code,
        target_code=target_code,
        source="manual",
        force=force,
        resume_from_page=resume_from_page,
    )
    return {
        "queued": True,
        "job_id": job_id,
        "site_code": site_code,
        "target_code": target_code,
        "force": force,
        "resume_from_latest_checkpoint": resume_from_latest_checkpoint,
        "resume_from_page": resume_from_page,
    }
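
# Illustrative manual full re-crawl resumed from a saved checkpoint
# (hypothetical target_code):
#   POST /api/targets/fogang__qyfgczj__2396/run?force=true&resume_from_latest_checkpoint=true
#   -> {"queued": true, "job_id": ..., "force": true, "resume_from_page": <last_completed_page>, ...}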


@router.post("/api/targets/bulk-run")
async def bulk_run_targets(
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Enqueue manual crawls for multiple targets at once. Per-site
    concurrency stays 1 — jobs for the same site_code go through the
    same FIFO queue so we don't slam any single host."""
    target_codes = payload.get("target_codes") or []
    force = bool(payload.get("force"))
    if not isinstance(target_codes, list) or not target_codes:
        raise HTTPException(400, "target_codes must be a non-empty array")
    if len(target_codes) > 500:
        raise HTTPException(400, "target_codes must be <= 500")

    from govcrawler.api.task_queue import get_queue
    queue = get_queue()
    queued: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for code in target_codes:
        code = (code or "").strip()
        if not code:
            continue
        t = targets_repo.get_by_code(s, code)
        if t is None:
            skipped.append({"target_code": code, "reason": "not_found"})
            continue
        site_code = t.site.site_code if t.site else None
        if not site_code:
            skipped.append({"target_code": code, "reason": "no_site_code"})
            continue
        if not t.enabled:
            skipped.append({"target_code": code, "reason": "target_disabled"})
            continue
        if t.site is not None and not t.site.enabled:
            skipped.append({"target_code": code, "reason": "site_disabled"})
            continue
        job_id = await queue.submit(
            site_code=site_code, target_code=code, source="manual", force=force,
        )
        queued.append({"target_code": code, "site_code": site_code, "job_id": job_id})
    return {
        "queued_count": len(queued),
        "skipped_count": len(skipped),
        "queued": queued,
        "skipped": skipped,
    }


@router.get("/api/targets/{target_code}/crawl-jobs")
def target_crawl_jobs(
    target_code: str,
    force: bool | None = Query(None, description="true=full crawl only, false=incremental only"),
    status: str | None = Query(None, description="queued|running|done|failed|cancelled"),
    limit: int = Query(100, ge=1, le=500),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    if targets_repo.get_by_code(s, target_code) is None:
        raise HTTPException(404, f"target not found: {target_code}")
    stmt = select(CrawlJob).where(CrawlJob.target_code == target_code)
    if force is not None:
        stmt = stmt.where(CrawlJob.force.is_(force))
    if status:
        stmt = stmt.where(CrawlJob.status == status)
    stmt = stmt.order_by(CrawlJob.enqueued_at.desc()).limit(limit)

    jobs = []
    for row in s.execute(stmt).scalars():
        result = row.result_json if isinstance(row.result_json, dict) else {}
        jobs.append({
            "job_id": row.job_id,
            "site_code": row.site_code,
            "target_code": row.target_code,
            "source": row.source,
            "status": row.status,
            "force": bool(row.force),
            "stop_requested": bool(row.stop_requested),
            "attempt_count": row.attempt_count or 0,
            "current_page": row.current_page or 0,
            "last_completed_page": row.last_completed_page or 0,
            "enqueued_at": iso_cn(row.enqueued_at),
            "started_at": iso_cn(row.started_at),
            "finished_at": iso_cn(row.finished_at),
            "error_msg": row.error_msg,
            "items_seen": _job_total(result),
            "items_new": result.get("items_new"),
            "items_skipped": result.get("items_skipped"),
            "items_failed": result.get("items_failed"),
            "result_status": result.get("status"),
        })
    return {"target_code": target_code, "count": len(jobs), "jobs": jobs}


@router.post("/api/targets")
def create_target(
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    fields = _validate_target_payload(payload, partial=False)
    site_code = fields.pop("site_code")
    target_code = fields.pop("target_code")
    assert site_code is not None and target_code is not None
    if targets_repo.get_by_code(s, target_code) is not None:
        raise HTTPException(409, f"target already exists: {target_code}")
    site = sites_repo.get_by_code(s, site_code)
    if site is None:
        raise HTTPException(404, f"site not found: {site_code}")
    dept_id = fields.get("dept_id")
    if dept_id is not None and s.get(LocalDepartment, dept_id) is None:
        raise HTTPException(404, f"local_department not found: {dept_id}")
    site_department_id = fields.pop("site_department_id", None)
    if site_department_id is not None and s.get(SiteDepartment, site_department_id) is None:
        raise HTTPException(404, f"site_department not found: {site_department_id}")
    row = targets_repo.upsert_by_code(
        s,
        target_code=target_code,
        site_id=site.id,
        site_department_id=site_department_id,
        **fields,
    )
    s.commit()
    s.refresh(row)
    return {"target": _serialize_target(row), "site_code": site.site_code}


@router.put("/api/targets/{target_code}")
def update_target(
    target_code: str,
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    if "target_code" in payload and payload["target_code"] != target_code:
        raise HTTPException(400, "target_code in path/body must match")
    row = targets_repo.get_by_code(s, target_code)
    if row is None:
        raise HTTPException(404, f"target not found: {target_code}")
    fields = _validate_target_payload(payload, partial=True)
    site_code = fields.pop("site_code", None)
    if site_code is not None:
        site = sites_repo.get_by_code(s, site_code)
        if site is None:
            raise HTTPException(404, f"site not found: {site_code}")
        row.site_id = site.id
    dept_id = fields.get("dept_id")
    if dept_id is not None and s.get(LocalDepartment, dept_id) is None:
        raise HTTPException(404, f"local_department not found: {dept_id}")
    if "site_department_id" in fields:
        site_department_id = fields.pop("site_department_id")
        if site_department_id is not None and s.get(SiteDepartment, site_department_id) is None:
            raise HTTPException(404, f"site_department not found: {site_department_id}")
        row.site_department_id = site_department_id
    for key, value in fields.items():
        if key not in {"site_code", "target_code"}:
            setattr(row, key, value)
    s.commit()
    s.refresh(row)
    return {"target": _serialize_target(row), "site_code": row.site.site_code if row.site else None}


# ---------- Parser override (crawl_target.parser_override_json) ----------

@router.get("/api/targets/{target_code}/parser")
def get_target_parser(
    target_code: str, s: Session = Depends(_session),
) -> dict[str, Any]:
    t = targets_repo.get_by_code(s, target_code)
    if t is None:
        raise HTTPException(404, f"target not found: {target_code}")
    return {
        "target_code": t.target_code,
        "parser_override": t.parser_override_json or {},
    }


@router.put("/api/targets/{target_code}/parser")
def put_target_parser(
    target_code: str,
    payload: dict[str, Any] = Body(..., description='{"detail": {"title": "...", ...}}'),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Overwrite `parser_override_json`. Pass {} to clear (stores NULL)."""
    if not isinstance(payload, dict):
        raise HTTPException(400, "body must be a JSON object")
    if payload and "detail" in payload and not isinstance(payload["detail"], dict):
        raise HTTPException(400, '"detail" must be an object if present')

    t = targets_repo.get_by_code(s, target_code)
    if t is None:
        raise HTTPException(404, f"target not found: {target_code}")
    t.parser_override_json = payload or None
    s.commit()
    return {
        "target_code": target_code,
        "parser_override": t.parser_override_json or {},
    }


@router.put("/api/targets/{target_code}/category")
def put_target_category(
    target_code: str,
    payload: dict[str, Any] = Body(
        ..., description='{"content_category": "...", "content_subcategory": "..."}'
    ),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Update a target's content_category / content_subcategory."""
    allowed = {"content_category", "content_subcategory"}
    unknown = set(payload.keys()) - allowed
    if unknown:
        raise HTTPException(400, f"unknown keys: {sorted(unknown)}")

    t = targets_repo.get_by_code(s, target_code)
    if t is None:
        raise HTTPException(404, f"target not found: {target_code}")

    for k in allowed:
        if k in payload:
            v = payload[k]
            if v is not None and not isinstance(v, str):
                raise HTTPException(400, f"{k} must be string or null")
            setattr(t, k, v or None)
    s.commit()
    return {
        "target_code": target_code,
        "content_category": t.content_category,
        "content_subcategory": t.content_subcategory,
    }


@router.post("/api/targets/{target_code}/dry-run")
def dry_run_target(
    target_code: str,
    payload: dict[str, Any] = Body(default={}),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Best-effort validation for a target.

    If the target_code still maps cleanly to legacy YAML `(site_id, column_id)`,
    reuse `govcrawler.validator.validate`. Otherwise fall back to lightweight
    field checks so the admin UI has a meaningful pre-save sanity gate.
    """
    t = targets_repo.get_by_code(s, target_code)
    if t is None:
        raise HTTPException(404, f"target not found: {target_code}")

    sample_url = _normalize_str(payload.get("sample_article_url")) if "sample_article_url" in payload else t.sample_article_url
    entry_url = _normalize_str(payload.get("entry_url")) if "entry_url" in payload else t.entry_url
    result: dict[str, Any] = {
        "target_code": t.target_code,
        "entry_url": entry_url,
        "sample_article_url": sample_url,
        "mode": "basic",
        "ok": True,
        "checks": [],
    }
    if not entry_url:
        result["ok"] = False
        result["checks"].append({"level": "error", "message": "entry_url 为空"})
    else:
        result["checks"].append({"level": "info", "message": "entry_url 已填写"})
    if sample_url:
        result["checks"].append({"level": "info", "message": "sample_article_url 已填写"})
    else:
        result["checks"].append({"level": "warn", "message": "未提供 sample_article_url，只做基础校验"})

    inferred = _infer_legacy_validator_key(target_code)
    if inferred:
        from govcrawler.validator import validate

        site_id, column_id = inferred
        v = validate(site_id, column_id, url=sample_url) if sample_url else validate(site_id, column_id, max_detail=1)
        result["validator"] = v
        result["mode"] = "validator"
        result["ok"] = bool(v.get("ok")) and result["ok"]
        if v.get("error"):
            result["checks"].append({"level": "error", "message": str(v["error"])})
        else:
            if "detail" in v:
                detail = v["detail"]
                if detail.get("title"):
                    result["checks"].append({"level": "info", "message": "标题命中"})
                if detail.get("content_text_length", 0) >= 50:
                    result["checks"].append({"level": "info", "message": "正文长度通过"})
                else:
                    result["checks"].append({"level": "warn", "message": "正文长度较短"})
            if "list_items_parsed" in v:
                result["checks"].append({"level": "info", "message": f"列表解析 {v['list_items_parsed']} 条"})
    else:
        result["checks"].append({"level": "warn", "message": "target_code 无法映射旧 validator，仅完成基础字段校验"})
    return result
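
# Illustrative basic-mode result (entry_url set, no sample_article_url, no legacy
# validator mapping): {"mode": "basic", "ok": true, "checks": [
#   {"level": "info", "message": "entry_url 已填写"},
#   {"level": "warn", "message": "未提供 sample_article_url，只做基础校验"},
#   {"level": "warn", "message": "target_code 无法映射旧 validator，仅完成基础字段校验"}], ...}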


# ---------- Column discovery + bulk create (gkmlpt dept index → targets) ----------

_TREE_RE = re.compile(r"TREE\s*:\s*(\[.*?\])\s*,\s*\n", re.DOTALL)
_SID_RE = re.compile(r"SID\s*:\s*'([^']+)'")
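
# Illustrative inline CMS config these patterns pick up (shape inferred from the
# regexes, values hypothetical):
#   TREE: [{"id": 2396, "name": "机构信息", "parent": 0, "children": []}],
#   SID: 'abcd1234',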


def _flatten_tree(nodes: list[dict[str, Any]], depth: int = 0,
                  ancestors: list[str] | None = None) -> list[dict[str, Any]]:
    """Walk the gkmlpt TREE tree, returning a flat list of columns.

    Skips entries with non-empty jump_url (external links — not crawlable).
    Carries the ancestor name chain so each column can store a human-readable
    `path` like "机构信息｜内设机构".
    """
    ancestors = ancestors or []
    out: list[dict[str, Any]] = []
    for n in nodes or []:
        jump = (n.get("jump_url") or "").strip()
        is_external = bool(jump)
        name = (n.get("name") or "").strip()
        path_chain = ancestors + [name] if name else ancestors
        out.append({
            "id": str(n.get("id")),
            "name": name,
            "path": "｜".join(path_chain),
            "parent": n.get("parent") or 0,
            "depth": depth,
            "is_external": is_external,
            "has_children": bool(n.get("children")),
        })
        if n.get("children"):
            out.extend(_flatten_tree(n["children"], depth + 1, path_chain))
    return out
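
# Illustrative flattening (hypothetical two-level tree):
#   _flatten_tree([{"id": 1, "name": "机构信息",
#                   "children": [{"id": 2, "name": "内设机构", "parent": 1}]}])
#   -> [{"id": "1", "name": "机构信息", "path": "机构信息", "parent": 0, "depth": 0,
#        "is_external": False, "has_children": True},
#       {"id": "2", "name": "内设机构", "path": "机构信息｜内设机构", "parent": 1, "depth": 1,
#        "is_external": False, "has_children": False}]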


@router.post("/api/targets/discover")
def discover_columns(payload: dict[str, Any] = Body(...)) -> dict[str, Any]:
    """Fetch a gkmlpt dept index page and return all columns under it.

    Accepts a URL like `http://<host>/<dept_path>/gkmlpt/index` (no column
    anchor). Reads the inline `TREE: […]` JSON + `SID` config shipped by the
    CMS and returns a flat, depth-annotated column list so the UI can show a
    picker and bulk-create crawl_targets.
    """
    entry_url = _normalize_str(payload.get("entry_url"))
    if not entry_url:
        raise HTTPException(400, "entry_url is required")
    try:
        parsed = urlparse(entry_url)
    except Exception:
        raise HTTPException(400, f"malformed URL: {entry_url}")
    if not parsed.scheme or not parsed.netloc:
        raise HTTPException(400, f"URL must include scheme + host: {entry_url}")
    segs = [p for p in parsed.path.split("/") if p]
    if "gkmlpt" not in segs:
        raise HTTPException(400, "URL 不是 gkmlpt 公开目录平台的栏目入口")
    dept_path = segs[0] if segs and segs[0] != "gkmlpt" else ""

    # Use the fetcher chain (httpx → playwright auto-fallback) so this works
    # against ctct-shielded hosts like gdqy.gov.cn. Naked httpx returns 412
    # + an obfuscated challenge page on those, killing TREE detection.
    from govcrawler.fetcher.chain import fetch_html as _fetch_html_chain
    fr = _fetch_html_chain(entry_url)
    if fr.error or not fr.html:
        raise HTTPException(502, f"fetch index failed: {fr.error or 'empty html'}")
    if fr.is_challenge:
        raise HTTPException(502, "页面仍是 ctct 挑战页 — playwright 解锁失败")
    html = fr.html

    tree_m = _TREE_RE.search(html)
    if not tree_m:
        raise HTTPException(400, "页面未找到 TREE 结构 — 该 URL 可能不是 gkmlpt 入口")
    try:
        tree = json.loads(tree_m.group(1))
    except json.JSONDecodeError as e:
        raise HTTPException(500, f"TREE JSON parse failed: {e}")
    sid_m = _SID_RE.search(html)
    sid = sid_m.group(1) if sid_m else None

    columns = _flatten_tree(tree)
    return {
        "entry_url": entry_url,
        "dept_path": dept_path,
        "sid": sid,
        "count": len(columns),
        "columns": columns,
    }


@router.post("/api/targets/bulk-create")
def bulk_create_targets(
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Create one crawl_target per column_id picked from discover.

    Body shape:
      {
        "site_code": "fogang",
        "dept_path": "qyfgczj",
        "entry_url_base": "http://www.fogang.gov.cn/qyfgczj/gkmlpt/index",
        "columns": [{"id":"2396","name":"机构信息"}, ...],
        "dept_id": 123 | null,
        "expected_cadence_days": 30,
        "interval_sec": 30,        # per-request base delay (politeness)
        "interval_jitter_sec": 10, # random extra delay, 0..N seconds
        "enabled": true
      }
    Idempotent: existing target_code is skipped; returns created + skipped lists.
    """
    site_code = _normalize_str(payload.get("site_code"))
    dept_path = _normalize_str(payload.get("dept_path")) or ""
    entry_url_base = _normalize_str(payload.get("entry_url_base"))
    columns = payload.get("columns") or []
    if not site_code or not entry_url_base or not columns:
        raise HTTPException(400, "site_code, entry_url_base, columns are required")
    if not isinstance(columns, list):
        raise HTTPException(400, "columns must be an array")
    if len(columns) > 500:
        raise HTTPException(400, "columns must be <= 500")

    site = sites_repo.get_by_code(s, site_code)
    if site is None:
        raise HTTPException(404, f"site not found: {site_code}")

    dept_id = payload.get("dept_id")
    dept_row = None
    if dept_id is not None:
        if not isinstance(dept_id, int) or dept_id <= 0:
            raise HTTPException(400, "dept_id must be positive int or null")
        dept_row = s.get(LocalDepartment, dept_id)
        if dept_row is None:
            raise HTTPException(404, f"local_department not found: {dept_id}")

    cadence = payload.get("expected_cadence_days")
    cadence = int(cadence) if isinstance(cadence, int) and cadence > 0 else 30
    interval = payload.get("interval_sec")
    interval = int(interval) if isinstance(interval, int) and interval >= 0 else 30
    jitter = payload.get("interval_jitter_sec")
    jitter = int(jitter) if isinstance(jitter, int) and jitter >= 0 else 0
    enabled = payload.get("enabled")
    enabled = bool(enabled) if isinstance(enabled, bool) else True

    # Short Chinese label for the dept — used in the human-readable target_name.
    # Prefer short_name, then dept_name, then full_name.
    dept_label = ""
    if dept_row is not None:
        dept_label = (dept_row.short_name or dept_row.dept_name or dept_row.full_name or "").strip()

    # Upsert the dept_path-level site_department row up front so every target
    # we create can be linked to it (otherwise the 部门绑定对账 page won't
    # see this dept_path even though crawl_target rows exist for it).
    site_dept_row: SiteDepartment | None = None
    if dept_path:
        site_dept_row = _ensure_site_department(
            s,
            site=site,
            dept_path=dept_path,
            local_dept_id=dept_id,
            dept_display_name=dept_label or None,
        )

    base = entry_url_base.rstrip("#").split("#", 1)[0]
    created: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for col in columns:
        col_id = str(col.get("id") or "").strip()
        col_name = _normalize_str(col.get("name"))
        col_path = _normalize_str(col.get("path")) or col_name
        if not col_id:
            continue
        parts = [site_code]
        if dept_path:
            parts.append(dept_path)
        parts.append(col_id)
        target_code = "__".join(parts)
        if targets_repo.get_by_code(s, target_code) is not None:
            skipped.append({"target_code": target_code, "reason": "exists"})
            continue
        # Human-readable name: "佛冈县政数局-机构信息" when dept is bound;
        # falls back to just the column name otherwise.
        human_name = (
            f"{dept_label}-{col_name}" if dept_label and col_name
            else (col_name or None)
        )
        # Auto-classify by column name. gkmlpt columns use Chinese names
        # like '部门动态' / '财政预决算', so the same keyword map fits.
        content_category = _classify_column_name(col_name or col_id)
        row = targets_repo.upsert_by_code(
            s,
            target_code=target_code,
            site_id=site.id,
            site_department_id=site_dept_row.id if site_dept_row is not None else None,
            target_name=human_name,
            entry_url=f"{base}#{col_id}",
            dept_id=dept_id,
            channel_name=col_name,
            channel_path=col_path,
            content_category=content_category,
            expected_cadence_days=cadence,
            interval_sec=interval,
            interval_jitter_sec=jitter,
            enabled=enabled,
        )
        s.flush()
        created.append({"target_code": row.target_code, "target_name": row.target_name})
    s.commit()
    return {
        "created_count": len(created),
        "skipped_count": len(skipped),
        "created": created,
        "skipped": skipped,
    }


@router.post("/api/targets/bulk-create-yaml")
def bulk_create_yaml_targets(
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Yaml-path counterpart to /api/targets/bulk-create. Used by the
    HTML discover flow: accepts a list of {column_id, name, list_url}
    items selected from the discover-html tree and creates one
    crawl_target per item. Yaml is NOT modified — pipeline relies on
    the site's `default_column` block to resolve selectors for any
    column_id not enumerated in yaml, and on `crawl_target.entry_url`
    for the list URL.

    Body:
      {
        "site_code": "qingxin_zwgk",
        "columns": [{"column_id":"gfxwj","name":"规范性文件","list_url":"https://..."}, ...],
        "dept_id": 123 | null,
        "expected_cadence_days": 30,
        "interval_sec": 10,
        "enabled": true
      }
    Idempotent on target_code; existing rows go in `skipped`.
    """
    site_code = _normalize_str(payload.get("site_code"))
    columns = payload.get("columns") or []
    if not site_code or not columns:
        raise HTTPException(400, "site_code and columns are required")
    if not isinstance(columns, list):
        raise HTTPException(400, "columns must be an array")
    if len(columns) > 500:
        raise HTTPException(400, "columns must be <= 500")

    site = sites_repo.get_by_code(s, site_code)
    if site is None:
        raise HTTPException(404, f"site not found: {site_code}")

    dept_id = payload.get("dept_id")
    dept_row = None
    if dept_id is not None:
        if not isinstance(dept_id, int) or dept_id <= 0:
            raise HTTPException(400, "dept_id must be positive int or null")
        dept_row = s.get(LocalDepartment, dept_id)
        if dept_row is None:
            raise HTTPException(404, f"local_department not found: {dept_id}")
    dept_label = ""
    if dept_row is not None:
        dept_label = (dept_row.short_name or dept_row.dept_name or dept_row.full_name or "").strip()

    cadence = payload.get("expected_cadence_days")
    cadence = int(cadence) if isinstance(cadence, int) and cadence > 0 else 30
    interval = payload.get("interval_sec")
    interval = int(interval) if isinstance(interval, int) and interval >= 0 else 10
    jitter = payload.get("interval_jitter_sec")
    jitter = int(jitter) if isinstance(jitter, int) and jitter >= 0 else 0
    enabled = payload.get("enabled")
    enabled = bool(enabled) if isinstance(enabled, bool) else True

    created: list[dict[str, Any]] = []
    skipped: list[dict[str, Any]] = []
    for col in columns:
        col_id = _normalize_str(col.get("column_id"))
        col_name = _normalize_str(col.get("name"))
        list_url = _normalize_str(col.get("list_url"))
        if not col_id or not list_url:
            continue
        target_code = f"{site_code}__{col_id}"
        if targets_repo.get_by_code(s, target_code) is not None:
            skipped.append({"target_code": target_code, "reason": "exists"})
            continue
        human_name = (
            f"{dept_label}-{col_name}" if dept_label and col_name
            else (col_name or col_id)
        )
        # channel_path: URL path relative to the host. Lets the admin list show
        # which directory a target lives under without having to expand entry_url.
        try:
            channel_path = urlparse(list_url).path or list_url
        except Exception:
            channel_path = list_url
        # Auto-classify by column name (e.g. '规范性文件' → 法规文件).
        content_category = _classify_column_name(col_name or col_id)
        row = targets_repo.upsert_by_code(
            s,
            target_code=target_code,
            site_id=site.id,
            target_name=human_name,
            entry_url=list_url,
            dept_id=dept_id,
            channel_name=col_name or col_id,
            channel_path=channel_path,
            content_category=content_category,
            expected_cadence_days=cadence,
            interval_sec=interval,
            interval_jitter_sec=jitter,
            enabled=enabled,
        )
        s.flush()
        created.append({"target_code": row.target_code, "target_name": row.target_name})
    s.commit()
    return {
        "created_count": len(created),
        "skipped_count": len(skipped),
        "created": created,
        "skipped": skipped,
    }
