"""End-to-end crawl pipeline — v2 schema aligned.

Entry points:
  * `crawl_target(target_code)`   — list page → iterate → fetch_and_store per item
  * `fetch_and_store(*, target_code, url)` — single-article fetch → parse → persist

Dispatch model (§7.5 of 2.0 design doc):
  * `crawl_site.cms_adapter` set (e.g. "gkmlpt")  → adapter-driven JSON list API
  * `crawl_site.yaml_path` set                    → legacy CSS list scraping

Both paths converge on the same per-article loop. Retry / block / robots logic
stays identical to 1.0.

Compared to 1.0:
  * Addressed by `target_code` (globally unique) instead of (site_id, column_id)
  * Article insert goes through `insert_article_from_contract` when a CrawlItem
    is available from the adapter; falls back to `insert_article(**kwargs)` for
    legacy yaml_path sites
  * `content_simhash` removed — §5.5 excludes it; RAG owns content de-dup
  * `insert_crawl_log` now keyed by `(site_pk:int, target_id:int)`
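
Minimal usage sketch (the target_code and URL below are illustrative, not real
rows; both entry points return plain dicts carrying a "status" key):

    summary = crawl_target("gd_gkmlpt__col1", max_items=5)
    result = fetch_and_store(
        target_code="gd_gkmlpt__col1",
        url="http://www.gd.gov.cn/gkmlpt/content/0/1/post_1.html",
    )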
"""
from __future__ import annotations

import json
import logging
import random
import re
import time as _time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path, PurePosixPath
from typing import Any, Callable
from urllib.parse import urlparse, urlunparse

from sqlalchemy.orm import Session

from govcrawler.adapters import get_adapter
from govcrawler.adapters.contract import CrawlItem, Status
from govcrawler.compliance import is_allowed, is_public_path, is_safe_to_fetch
from govcrawler.config.registry import get_detail_selectors, get_site_config
from govcrawler.db import get_sessionmaker
from govcrawler.fetcher.browser import FetchResult
from govcrawler.fetcher.chain import fetch_html
from govcrawler.fetcher.throttle import HostThrottle
from govcrawler.models import Article, CrawlSite, CrawlTarget, SiteDepartment
from govcrawler.observability import record_fetch
from govcrawler.parser.detail_parser import parse_detail
from govcrawler.parser.list_parser import ListItem, parse_list
from govcrawler.repositories import sites as sites_repo
from govcrawler.repositories import targets as targets_repo
from govcrawler.settings import get_settings
from govcrawler.storage.attachments import download_attachment
from govcrawler.storage.files import write_article_text, write_raw_html
from govcrawler.storage.paths import to_os_path
from govcrawler.storage.repo import (
    get_article_by_url_hash,
    insert_article,
    insert_article_from_contract,
    insert_attachments,
    insert_crawl_log,
    upsert_article_standard_meta,
)
from govcrawler.timeutil import now_cn_naive
from govcrawler.utils.url_norm import url_hash as compute_url_hash

log = logging.getLogger(__name__)

MIN_CONTENT_TEXT_CHARS = 50

# Retry policy (FETCH-05)
MAX_RETRIES = 2
BACKOFF_BASE_S = 2.0
ABORT_AFTER_N_BLOCKS = 3
BLOCK_STATUSES = (403, 412, 429)
RETRIABLE_STATUSES = (500, 502, 503, 504)
PREFER_HTTPS_HOSTS = {"www.gd.gov.cn"}
GENERIC_ATTACHMENT_NAME_RE = re.compile(
    r"^(?:附件|附表|附件\d+(?:[_-]\d+)*|附表\d+|post[_-]?\d+|\d+)"
    r"(?:\.[A-Za-z0-9]{1,8})?$",
    re.IGNORECASE,
)
SPACE_BEFORE_EXT_RE = re.compile(r"\s+(\.[A-Za-z0-9]{1,8})$")


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _interval_with_jitter(interval_sec: float | None, jitter_sec: float | None) -> float:
    base = max(0.0, float(interval_sec or 0.0))
    jitter = max(0.0, float(jitter_sec or 0.0))
    return base + (random.uniform(0.0, jitter) if jitter > 0 else 0.0)


def _is_gd_target(site_code: str | None, target_code: str | None) -> bool:
    return bool(
        (site_code or "").startswith("gd")
        or (target_code or "").startswith("gd")
    )


def _is_cn_work_time(now: datetime, *, start_hour: int, end_hour: int) -> bool:
    if now.weekday() >= 5:
        return False
    return int(start_hour) <= now.hour < int(end_hour)


def _effective_target_intervals(
    *,
    site_code: str,
    target_code: str,
    interval_sec: float | None,
    interval_jitter_sec: float | None,
    settings: Any | None = None,
    now: datetime | None = None,
) -> tuple[float | None, float | None, dict[str, Any] | None]:
    """Return runtime intervals after dynamic policy.

    Guangdong provincial targets get a temporary worktime slowdown. The DB
    value remains the configured baseline; only this run's sleeps are longer.
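
    Worked example (numbers are illustrative; the real multiplier comes from
    settings.gd_worktime_interval_multiplier):

        interval_sec=60, interval_jitter_sec=10, multiplier=2.0
        → on a gd_* target during CN weekday work hours this run sleeps
          120s plus up to 20s jitter; the stored target values stay 60/10.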
    """
    if interval_sec is None and interval_jitter_sec is None:
        return interval_sec, interval_jitter_sec, None
    settings = settings or get_settings()
    multiplier = max(1.0, float(getattr(settings, "gd_worktime_interval_multiplier", 1.0) or 1.0))
    now = now or now_cn_naive()
    if not _is_gd_target(site_code, target_code):
        return interval_sec, interval_jitter_sec, None
    if not _is_cn_work_time(
        now,
        start_hour=int(getattr(settings, "gd_worktime_start_hour", 8)),
        end_hour=int(getattr(settings, "gd_worktime_end_hour", 18)),
    ):
        return interval_sec, interval_jitter_sec, None
    if multiplier <= 1.0:
        return interval_sec, interval_jitter_sec, None

    effective_interval = (
        None if interval_sec is None else max(0.0, float(interval_sec) * multiplier)
    )
    effective_jitter = (
        None if interval_jitter_sec is None else max(0.0, float(interval_jitter_sec) * multiplier)
    )
    return effective_interval, effective_jitter, {
        "policy": "gd_worktime_slowdown",
        "multiplier": multiplier,
        "configured_interval_sec": interval_sec,
        "configured_interval_jitter_sec": interval_jitter_sec,
        "effective_interval_sec": effective_interval,
        "effective_interval_jitter_sec": effective_jitter,
    }


def _parse_publish_time(raw: str) -> datetime | None:
    """Pull a date out of free-form publish_time strings.

    Real-world inputs span several gov-site flavors:
      '2026-03-30 09:40'
      '发布日期：2026-03-30 09:40'
      '2026/03/30　来源: …'
      '2026年03月30日'
      '时间： 2026-03-30 09:37 来源：本网'
    Strategy: normalize separators (Chinese / slashes) → re.search for
    the first `YYYY-MM-DD[ HH:MM[:SS]]` substring → strptime. Anchoring
    at position 0 (the old behavior) failed on every site that prefixes
    a label like '发布日期：' or '时间：'.
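
    Examples (derived from the parsing rules above):

        _parse_publish_time('发布日期：2026-03-30 09:40') → datetime(2026, 3, 30, 9, 40)
        _parse_publish_time('2026年03月30日') → datetime(2026, 3, 30, 0, 0)
        _parse_publish_time('来源：本网') → None  (no date present)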
    """
    if not raw:
        return None
    s = raw.replace("/", "-").replace("年", "-").replace("月", "-").replace("日", " ")
    s = re.sub(r"\s+", " ", s).strip()
    m = re.search(r"\d{4}-\d{1,2}-\d{1,2}(?:\s+\d{1,2}:\d{2}(?::\d{2})?)?", s)
    if not m:
        return None
    chunk = m.group(0)
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
        try:
            return datetime.strptime(chunk, fmt)
        except Exception:
            continue
    return None


def _derive_article_key(url: str) -> str:
    """Last path segment, ext stripped — used as filename stem.

    e.g. `/a/b/post_2136593.html` → `post_2136593`.
    Falls back to url_hash[:16] if no usable stem.
    """
    tail = url.rstrip("/").rsplit("/", 1)[-1].split("?", 1)[0].split("#", 1)[0]
    stem = tail.rsplit(".", 1)[0] if "." in tail else tail
    return stem or compute_url_hash(url)[:16]


def _delete_written_data_file(rel_path: object) -> None:
    if not rel_path:
        return
    data_dir = Path(get_settings().data_dir)
    try:
        abs_path = to_os_path(data_dir, PurePosixPath(str(rel_path)))
        abs_path.resolve().relative_to(data_dir.resolve())
    except Exception:
        return
    try:
        if abs_path.is_file():
            abs_path.unlink()
    except OSError:
        log.warning("delete refetch temp file failed path=%s", abs_path, exc_info=True)


def _delete_refetch_outputs(raw_rel: object, text_rel: object, downloaded: list[Any]) -> None:
    _delete_written_data_file(raw_rel)
    _delete_written_data_file(text_rel)
    for item in downloaded:
        _delete_written_data_file(getattr(item, "file_path", None))


def _prefer_https_for_fetch(url: str) -> str:
    """Use HTTPS for hosts that publish HTTP detail links but serve the same
    content over HTTPS. gd.gov.cn list pages often contain http:// detail
    links; fetching the HTTPS equivalent avoids an extra CDN/WAF edge case.

    Legacy gd.gov.cn /gkml/content/post_N.html pages are an exception: they
    are tiny JavaScript redirect shells, and production has repeatedly seen
    HTTPS resets on the real /gkmlpt/content/... target. Rewrite those shells
    straight to the HTTP gkmlpt URL so httpx can fetch the real body without
    falling through to Playwright for every historical article.
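
    Illustrative rewrites (the post id is an example value):

        http://www.gd.gov.cn/gkml/content/post_2136593.html
            → http://www.gd.gov.cn/gkmlpt/content/2/2136/post_2136593.html
        http://www.gd.gov.cn/some/other/page.html
            → https://www.gd.gov.cn/some/other/page.html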
    """
    p = urlparse(url)
    m = re.fullmatch(r"/gkml/content/post_(\d+)\.html", p.path or "")
    if p.netloc.lower() == "www.gd.gov.cn" and m:
        post_id = int(m.group(1))
        return urlunparse(p._replace(
            scheme="http",
            path=f"/gkmlpt/content/{post_id // 1_000_000}/{post_id // 1000}/post_{post_id}.html",
            query="",
            fragment="",
        ))
    if p.scheme == "http" and p.netloc.lower() in PREFER_HTTPS_HOSTS:
        return urlunparse(p._replace(scheme="https"))
    return url


def _is_http_url(url: str) -> bool:
    try:
        return urlparse(url).scheme.lower() in {"http", "https"}
    except Exception:
        return False


def _clean_attachment_name(name: str | None) -> str | None:
    if not name:
        return None
    cleaned = SPACE_BEFORE_EXT_RE.sub(r"\1", re.sub(r"\s+", " ", name).strip())
    return cleaned or None


def _preferred_attachment_name(fields: Any, url: str) -> str | None:
    attachment_names = getattr(fields, "attachment_names", None) or {}
    name = _clean_attachment_name(
        attachment_names.get(url)
        or attachment_names.get(_prefer_https_for_fetch(url))
    )
    if (
        len(fields.attachment_urls or []) == 1
        and fields.title
        and (not name or GENERIC_ATTACHMENT_NAME_RE.fullmatch(name))
    ):
        return fields.title
    return name


def _decompose_target_code(site_code: str, target_code: str) -> tuple[str | None, str]:
    """Split `<site>__<dept>__<col>` or `<site>__<col>` → (dept_path, column_id).

    Mirror of `config.sync._target_code`. Returns (None, column_id) when the
    target is site-level (no dept_path).
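
    Examples (site/target codes are illustrative):

        _decompose_target_code("gdqy", "gdqy__wsj__zcwj") → ("wsj", "zcwj")
        _decompose_target_code("gdqy", "gdqy__zcwj")      → (None, "zcwj")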
    """
    prefix = f"{site_code}__"
    if not target_code.startswith(prefix):
        raise ValueError(f"target_code {target_code!r} doesn't start with {prefix!r}")
    tail = target_code[len(prefix):]
    parts = tail.split("__", 1)
    if len(parts) == 1:
        return None, parts[0]
    return parts[0], parts[1]


def _resolve_detail_selectors(
    site: CrawlSite, target: CrawlTarget
) -> dict[str, Any] | None:
    """Pick detail selectors: target override → adapter default → legacy YAML."""
    # 1) per-target override
    override = (target.parser_override_json or {}).get("detail") if target.parser_override_json else None
    if override:
        return override
    # 2) adapter default (e.g. gkmlpt DEFAULT_DETAIL_SELECTORS)
    if site.cms_adapter:
        try:
            adapter = get_adapter(site.cms_adapter)
        except KeyError:
            adapter = None
        if adapter is not None:
            default = getattr(adapter, "DEFAULT_DETAIL_SELECTORS", None)
            if default:
                return default
    # 3) legacy YAML registry (1.0 compat for yaml_path sites)
    _, column_id = _decompose_target_code(site.site_code, target.target_code)
    return get_detail_selectors(site.site_code, column_id)


@dataclass
class _ResolvedTarget:
    site: CrawlSite
    target: CrawlTarget
    dept: SiteDepartment | None
    dept_path: str | None
    column_id: str


def _resolve_target(session: Session, target_code: str) -> _ResolvedTarget:
    target = targets_repo.get_by_code(session, target_code)
    if target is None:
        raise ValueError(f"no crawl_target with target_code={target_code!r}")
    site = sites_repo.get_by_id(session, target.site_id)
    if site is None:
        raise ValueError(f"crawl_target {target_code!r} points at missing site_id={target.site_id}")
    dept = None
    if target.site_department_id is not None:
        dept = session.get(SiteDepartment, target.site_department_id)
    dept_path, column_id = _decompose_target_code(site.site_code, target.target_code)
    return _ResolvedTarget(
        site=site, target=target, dept=dept,
        dept_path=dept_path, column_id=column_id,
    )


def _adapter_params_for(rt: _ResolvedTarget) -> dict[str, Any]:
    """Merged adapter params for a target.

    Site rows carry the shared adapter defaults. Some adapter sites need
    multiple targets that hit the same origin but use different list facets
    (for example the gov.cn 政策文件库 policy library: State Council documents
    vs State Council department documents). Store those narrow overrides under
    crawl_target.parser_override_json.adapter_params.
    The existing parser_override_json.detail convention remains unchanged.
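
    Merge sketch (keys are illustrative, not a specific adapter's schema):

        site.adapter_params_json        = {"sid": 1, "hard_max_pages": 30}
        target parser_override_json     = {"adapter_params": {"sid": 7}}
        _adapter_params_for(rt)         → {"sid": 7, "hard_max_pages": 30}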
    """
    params: dict[str, Any] = {}
    if isinstance(rt.site.adapter_params_json, dict):
        params.update(rt.site.adapter_params_json)
    override = rt.target.parser_override_json or {}
    if isinstance(override, dict):
        target_params = override.get("adapter_params")
        if isinstance(target_params, dict):
            params.update(target_params)
    return params


# ---------------------------------------------------------------------------
# single-article fetch + store
# ---------------------------------------------------------------------------
def fetch_and_store(
    *,
    target_code: str,
    url: str,
    list_item: CrawlItem | None = None,
    throttle: HostThrottle | None = None,
    log_fetch_failures: bool = True,
    replace_article_id: int | None = None,
) -> dict:
    """End-to-end: fetch → parse → write files → download attachments → insert rows.

    `list_item` may be a `CrawlItem` produced by an adapter's `parse_list_response`
    — when present we thread its metadata (title, publish_time, source_raw,
    channel_name, doc_no …) into the Article row via `insert_article_from_contract`.
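
    The return value is a plain dict; callers branch on result["status"]
    ("ready" / "failed" / "skipped") and, for skips, on result["reason"]
    ("duplicate_url_hash" vs "duplicate_other_target"). Minimal sketch
    (the URL is illustrative):

        r = fetch_and_store(target_code=target.target_code,
                            url="http://example.gov.cn/post_1.html")
        if r["status"] == "skipped" and r["reason"] == "duplicate_url_hash":
            ...  # historical boundary reached for this target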
    """
    owns_throttle = throttle is None
    url_h = compute_url_hash(url)
    Session = get_sessionmaker()
    if replace_article_id is not None and list_item is not None:
        raise ValueError("replace_article_id is only supported for direct article fetches")

    # dedup + resolve
    with Session() as s:
        rt = _resolve_target(s, target_code)
        if replace_article_id is not None:
            replace_article = s.get(Article, replace_article_id)
            if replace_article is None:
                raise ValueError(f"article not found for replace_article_id={replace_article_id}")
            if replace_article.target_id != rt.target.id:
                raise ValueError(
                    f"article {replace_article_id} does not belong to target={target_code!r}"
                )
        existing = get_article_by_url_hash(s, url_h)
        if existing and existing.id != replace_article_id:
            # Distinguish self-dedup (this target already has it → real
            # historical boundary, OK to early-stop) from cross-target
            # dedup (some sibling target already pulled this URL — common
            # on news sites that file the same article into multiple
            # channels, e.g. 网易 domestic vs world). The main loop only
            # treats self-dedup as a stop signal.
            same_target = existing.target_id == rt.target.id
            return {
                "status": "skipped",
                "reason": "duplicate_url_hash" if same_target else "duplicate_other_target",
                "url_hash": url_h,
                "article_id": existing.id,
            }
        site_pk = rt.site.id
        target_id = rt.target.id
        site_code = rt.site.site_code
        target_code_norm = rt.target.target_code
        selectors = _resolve_detail_selectors(rt.site, rt.target)
        dept_id_fk = rt.target.dept_id  # local_department FK (may be None)
        # Snapshot the target's classification metadata so we can copy it
        # onto the article row — saves the RAG / search side from joining
        # against crawl_target on every read. Adapter-driven CrawlItems
        # may carry their own values, in which case we keep the CrawlItem
        # value (it usually came from the JSON list response and is more
        # specific to this article).
        target_channel_name = rt.target.channel_name
        target_channel_path = rt.target.channel_path
        target_content_category = rt.target.content_category
        target_content_subcategory = rt.target.content_subcategory
        target_interval_sec = rt.target.interval_sec
        target_interval_jitter_sec = rt.target.interval_jitter_sec
        target_interval_sec, target_interval_jitter_sec, _ = _effective_target_intervals(
            site_code=site_code,
            target_code=target_code_norm,
            interval_sec=target_interval_sec,
            interval_jitter_sec=target_interval_jitter_sec,
        )

    if owns_throttle:
        throttle = HostThrottle(
            interval_s=target_interval_sec,
            jitter_s=target_interval_jitter_sec or 0,
        )
    assert throttle is not None

    adapter_fetch_detail = None
    if rt.site.cms_adapter:
        adapter_fetch_detail = getattr(get_adapter(rt.site.cms_adapter), "fetch_detail", None)

    fetch_url = _prefer_https_for_fetch(url)
    if adapter_fetch_detail is not None:
        throttle.wait(fetch_url)
        fr, fields = adapter_fetch_detail(rt, url=fetch_url, list_item=list_item)
    else:
        if selectors is None:
            raise ValueError(
                f"no detail selectors available for target={target_code}; "
                "set crawl_target.parser_override_json.detail or ensure the adapter "
                "defines DEFAULT_DETAIL_SELECTORS"
            )
        throttle.wait(fetch_url)
        fr = fetch_html(fetch_url)
        fields = None
    if (
        fr.error
        and fetch_url != url
        and urlparse(fetch_url).netloc.lower() == "www.gd.gov.cn"
        and urlparse(url).scheme.lower() == "http"
    ):
        # gd.gov.cn sometimes resets HTTPS from the production network while
        # the original HTTP article URL still works. Keep the HTTPS preference
        # for canonical redirects, but fall back before declaring the row dead.
        throttle.wait(url)
        if adapter_fetch_detail is not None:
            fr, fields = adapter_fetch_detail(rt, url=url, list_item=list_item)
        else:
            fr = fetch_html(url)
    strategy = fr.strategy

    non_http_final = bool(fr.final_url) and not _is_http_url(fr.final_url)

    # -------- failure path --------
    if fr.error or fr.is_challenge or not fr.html or non_http_final:
        reason = (
            fr.error
            or (f"non_http_final_url:{fr.final_url}" if non_http_final else None)
            or ("ctct_challenge_unresolved" if fr.is_challenge else "empty_html")
        )
        if log_fetch_failures:
            with Session() as s:
                insert_crawl_log(
                    s,
                    site_pk=site_pk,
                    target_id=target_id,
                    article_url=fetch_url,
                    strategy=strategy,
                    http_status=fr.status,
                    duration_ms=fr.duration_ms,
                    success=False,
                    error_msg=reason,
                )
                s.commit()
        outcome = "challenge" if fr.is_challenge else "failed"
        record_fetch(
            site_id=site_code, column_id=target_code_norm, strategy=strategy,
            outcome=outcome, duration_ms=fr.duration_ms, http_status=fr.status,
        )
        return {
            "status": "failed",
            "reason": reason,
            "http_status": fr.status,
            "strategy": strategy,
            "duration_ms": fr.duration_ms,
        }

    canonical_url = fr.final_url or fetch_url
    final_url_hash = compute_url_hash(canonical_url)
    if final_url_hash != url_h:
        with Session() as s:
            existing_final = get_article_by_url_hash(s, final_url_hash)
            if existing_final and existing_final.id != replace_article_id:
                same_target = existing_final.target_id == target_id
                return {
                    "status": "skipped",
                    "reason": "duplicate_url_hash" if same_target else "duplicate_other_target",
                    "url_hash": final_url_hash,
                    "article_id": existing_final.id,
                }

    # -------- parse detail --------
    if fields is None:
        fields = parse_detail(fr.html, canonical_url, selectors)
    pub_dt = (
        (list_item.publish_time if list_item and list_item.publish_time else None)
        or _parse_publish_time(fields.publish_time_raw)
        or now_cn_naive()
    )
    # strip tz for DB column (schema uses naive DateTime)
    if pub_dt.tzinfo is not None:
        pub_dt = pub_dt.replace(tzinfo=None)
    normalized_publish_date = fields.publish_date or pub_dt.date()
    detail_metadata = {"public_meta": fields.public_meta} if fields.public_meta else None

    article_key = _derive_article_key(canonical_url)
    if replace_article_id is not None:
        article_key = f"{article_key}_refetch_{replace_article_id}_{int(_time.time() * 1000)}"
    raw_rel = write_raw_html(site_code, target_code_norm, pub_dt, article_key, fr.html)
    text_rel = write_article_text(
        site_code, target_code_norm, pub_dt, article_key, fields.content_text,
        title=fields.title or (list_item.title if list_item else None),
    )

    settings = get_settings()
    attachment_throttle = HostThrottle(
        interval_s=min(float(target_interval_sec or 0.0), settings.attachment_throttle_cap_s),
        jitter_s=min(float(target_interval_jitter_sec or 0.0), settings.attachment_throttle_cap_s),
    )
    downloaded = []
    for au in fields.attachment_urls:
        # Do not apply the detail-page HTTPS rewrite to attachment URLs.
        # Historical gd.gov.cn PDF resources under /GD_ZWGKRESOURCES/ are
        # published as HTTP; the HTTPS endpoint may reset the connection.
        attachment_url = au
        # Attachments are secondary. Space multiple attachment requests, but
        # cap the wait so a 120-180s detail throttle does not keep an already
        # fetched article out of the DB for minutes.
        try:
            attachment_throttle.wait(attachment_url)
        except Exception:
            pass
        try:
            d = download_attachment(
                attachment_url, site=site_code, column=target_code_norm,
                when=pub_dt, article_key=article_key,
                preferred_name=_preferred_attachment_name(fields, au),
            )
            downloaded.append(d)
        except Exception as e:
            log.warning("attachment download failed url=%s err=%s", attachment_url, e)

    # Ready when we have enough body text, OR a downloaded attachment, OR
    # an attachment URL was extracted (binary may have failed to fetch but
    # the article is still real — gd_gkmlpt has many "PDF-only" entries
    # whose detail page is just a download link), OR at least one inline
    # <img> in the article body — gov sites publish photo-only notices
    # (e.g. a 4-image mosquito-control publicity post) where the body text
    # is empty but the page is a real article.
    # `len(content_html) >= 500` covers structured-table articles (qingxin
    # assistance-disclosure notices etc): legitimate posts whose entire body
    # is a <table> of records — extracted text is ~49 chars but content_html
    # is 3000+ and carries the real data. Without this, the pipeline marked
    # them failed and they got hidden from operators / RAG even though the
    # structured data was already in raw_html.
    status = (
        "ready"
        if (
            len(fields.content_text) >= MIN_CONTENT_TEXT_CHARS
            or downloaded
            or len(fields.attachment_urls) >= 1
            or fields.inline_image_count >= 1
            or len(fields.content_html or "") >= 500
        )
        else "failed"
    )

    if replace_article_id is not None and status != "ready":
        error_msg = f"content_text_too_short:{len(fields.content_text)}"
        _delete_refetch_outputs(raw_rel, text_rel, downloaded)
        with Session() as s:
            insert_crawl_log(
                s,
                site_pk=site_pk,
                target_id=target_id,
                article_url=canonical_url,
                strategy=strategy,
                http_status=fr.status,
                duration_ms=fr.duration_ms,
                success=False,
                error_msg=error_msg,
            )
            s.commit()
        record_fetch(
            site_id=site_code, column_id=target_code_norm, strategy=strategy,
            outcome="failed", duration_ms=fr.duration_ms, http_status=fr.status,
        )
        return {
            "status": "failed",
            "reason": error_msg,
            "article_id": replace_article_id,
            "url_hash": final_url_hash,
            "title": fields.title,
            "used_fallback": fields.used_fallback,
            "fallback_engine": fields.fallback_engine,
            "http_status": fr.status,
            "duration_ms": fr.duration_ms,
        }

    # -------- persist --------
    with Session() as s:
        if list_item is not None:
            # Build a fully-populated CrawlItem with detail fields merged in,
            # then project via contract-driven insert.
            merged = list_item.model_copy(update={
                "url": canonical_url,
                "url_hash": final_url_hash,
                "title": list_item.title or fields.title or None,
                "publish_time": pub_dt,
                "source_raw": list_item.source_raw or fields.source or None,
                "publisher": list_item.publisher or fields.publisher or fields.source or None,
                "doc_no": list_item.doc_no or fields.doc_no or None,
                "index_no": list_item.index_no or fields.index_no or None,
                "publish_date": list_item.publish_date or normalized_publish_date,
                "effective_date": list_item.effective_date or fields.effective_date,
                "is_effective": (
                    list_item.is_effective
                    if list_item.is_effective is not None
                    else fields.is_effective
                ),
                "expiry_date": list_item.expiry_date or fields.expiry_date,
                "topic_words": list_item.topic_words or fields.topic_words or None,
                "open_category": list_item.open_category or fields.open_category or None,
                "metadata_json": (
                    {**(list_item.metadata_json or {}), **(detail_metadata or {})}
                    if (list_item.metadata_json or detail_metadata)
                    else None
                ),
                "content_text": fields.content_text or None,
                "raw_html_path": str(raw_rel),
                "text_path": str(text_rel),
                "has_attachment": bool(downloaded),
                "status": Status.READY if status == "ready" else Status.FAILED,
                "fetch_strategy": list_item.fetch_strategy or None,
                "target_id": target_id,
                "dept_id": dept_id_fk,
                # Inherit classification from the target unless the adapter
                # supplied its own (CrawlItem fields take precedence).
                "channel_name": list_item.channel_name or target_channel_name,
                "channel_path": list_item.channel_path or target_channel_path,
                "content_category": (
                    list_item.content_category
                    or fields.content_category
                    or target_content_category
                ),
                "content_subcategory": (
                    list_item.content_subcategory
                    or fields.content_subcategory
                    or target_content_subcategory
                ),
            })
            article = insert_article_from_contract(s, merged, site_pk=site_pk)
        else:
            article_kwargs = {
                "site_id": site_pk,
                "target_id": target_id,
                "dept_id": dept_id_fk,
                "url": canonical_url,
                "url_hash": final_url_hash,
                "title": fields.title or None,
                "publish_time": pub_dt,
                "source_raw": fields.source or None,
                "publisher": fields.publisher or fields.source or None,
                "doc_no": fields.doc_no or None,
                "index_no": fields.index_no or None,
                "publish_date": normalized_publish_date,
                "effective_date": fields.effective_date,
                "is_effective": fields.is_effective,
                "expiry_date": fields.expiry_date,
                "topic_words": fields.topic_words or None,
                "open_category": fields.open_category or None,
                "metadata_json": detail_metadata,
                "content_text": fields.content_text or None,
                "raw_html_path": str(raw_rel),
                "text_path": str(text_rel),
                "has_attachment": bool(downloaded),
                "status": status,
                "fetch_strategy": strategy,
                # Carry the target's classification onto the article so RAG
                # / search filters can read these without joining
                # crawl_target. Yaml-path doesn't have a CrawlItem with
                # adapter-supplied values, so target is the only source.
                "channel_name": target_channel_name,
                "channel_path": target_channel_path,
                "content_category": fields.content_category or target_content_category,
                "content_subcategory": fields.content_subcategory or target_content_subcategory,
            }
            if replace_article_id is None:
                article = insert_article(s, **article_kwargs)
            else:
                article = s.get(Article, replace_article_id)
                if article is None:
                    raise ValueError(f"article not found for replace_article_id={replace_article_id}")
                for att in list(article.attachments):
                    s.delete(att)
                for key, value in article_kwargs.items():
                    setattr(article, key, value)
                now = now_cn_naive()
                article.fetched_at = now
                article.updated_at = now
                article.exported_to_rag_at = None
                article.rag_export_status = None
                article.rag_export_started_at = None
                article.rag_export_finished_at = None
                article.rag_export_error = None
                article.rag_export_task_ids = []
                s.flush()

        att_records = [
            {
                "file_name": d.file_name,
                "file_ext": d.file_ext,
                "size_bytes": d.size_bytes,
                "file_path": str(d.file_path),
                "file_hash": d.file_hash,
                "source_url": d.source_url,
            }
            for d in downloaded
        ]
        standard_meta = upsert_article_standard_meta(s, article)
        if replace_article_id is not None and standard_meta is None and article.standard_meta is not None:
            s.delete(article.standard_meta)
        if att_records:
            insert_attachments(s, article.id, att_records)

        error_msg = (
            None
            if status == "ready"
            else f"content_text_too_short:{len(fields.content_text)}"
        )
        insert_crawl_log(
            s,
            site_pk=site_pk,
            target_id=target_id,
            article_url=canonical_url,
            strategy=strategy,
            http_status=fr.status,
            duration_ms=fr.duration_ms,
            success=(status == "ready"),
            error_msg=error_msg,
        )
        s.commit()
        article_id = article.id

    record_fetch(
        site_id=site_code, column_id=target_code_norm, strategy=strategy,
        outcome=status, duration_ms=fr.duration_ms, http_status=fr.status,
    )
    return {
        "status": status,
        "article_id": article_id,
        "url_hash": final_url_hash,
        "title": fields.title,
        "used_fallback": fields.used_fallback,
        "fallback_engine": fields.fallback_engine,
        "raw_html_path": str(raw_rel),
        "text_path": str(text_rel),
        "attachments_downloaded": len(downloaded),
        "http_status": fr.status,
        "duration_ms": fr.duration_ms,
    }


# ---------------------------------------------------------------------------
# list-page drivers
# ---------------------------------------------------------------------------
@dataclass
class _ListEntry:
    url: str
    item: CrawlItem | None  # None for legacy CSS path
    # 1-based page index this URL came from. Used by the page-level
    # checkpoint feature: pipeline groups entries by page, marks each
    # page completed only after every entry in it is processed.
    page_num: int = 1


@dataclass(frozen=True)
class _CrawlOrder:
    mode: str = "ordered"
    batch_pages: int = 1


_RANDOM_BATCH_DEFAULTS = {
    "gd_wjk": {"mode": "random_batch", "batch_pages": 3},
    # gkmlpt API pages already contain many entries (often 100). With a
    # 180s detail/list interval, prefetching 3 pages delays the first article
    # by ~6 minutes. Randomize within the first page, then move to the next.
    "gd_gkmlpt": {"mode": "random_batch", "batch_pages": 1},
    # gov.cn policy library list pages are cheap but long. Process a few pages
    # at a time so a restart only loses a small in-memory URL batch, and page
    # checkpoints can resume without requiring a persisted pending-URL table.
    "gov_cn_zcwjk": {"mode": "paged_batch", "batch_pages": 3},
    "gov_cn_gjgzk": {"mode": "paged_batch", "batch_pages": 3},
}
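# A site or target can opt into a non-default order without touching this
# table: _crawl_order_for() reads an adapter_params "crawl_order" entry first,
# then the legacy YAML column config, and only then falls back to the defaults
# above. Illustrative row value (shape only, not a real site):
#
#   crawl_site.adapter_params_json = {
#       "crawl_order": {"mode": "paged_batch", "batch_pages": 2},
#   }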


def _crawl_order_for(rt: _ResolvedTarget) -> _CrawlOrder:
    raw: Any = None
    params = _adapter_params_for(rt)
    raw = params.get("crawl_order")

    site_cfg = get_site_config(rt.site.site_code)
    if raw is None and site_cfg is not None:
        col_cfg = site_cfg.get_column(rt.column_id)
        if col_cfg is not None:
            raw = getattr(col_cfg, "crawl_order", None)

    if raw is None:
        raw = _RANDOM_BATCH_DEFAULTS.get(rt.site.site_code)

    if raw is None:
        return _CrawlOrder()
    if hasattr(raw, "model_dump"):
        raw = raw.model_dump()
    if not isinstance(raw, dict):
        return _CrawlOrder()

    mode = str(raw.get("mode") or "ordered")
    try:
        batch_pages = int(raw.get("batch_pages") or 1)
    except Exception:
        batch_pages = 1
    return _CrawlOrder(mode=mode, batch_pages=max(1, batch_pages))


def _empty_list_result(url: str) -> FetchResult:
    return FetchResult(
        url=url, final_url=url, status=200, html="[]",
        fetched_at=_time.time(), duration_ms=0, is_challenge=False,
        strategy="synthetic",
    )


def _list_via_adapter(
    rt: _ResolvedTarget, *, interval_sec: float | None = None,
    interval_jitter_sec: float | None = None,
    max_items: int | None = None,
    stop_check: Callable[[], bool] | None = None,
    start_page: int = 1,
    page_limit: int | None = None,
) -> tuple[str, list[_ListEntry], FetchResult]:
    """Adapter path: walk pagination via ?page=N, projecting each page's
    CrawlItems and accumulating across pages.

    Stops when:
      • a page returns 0 articles                   (end of column)
      • a page returns only URLs we've already seen  (CMS pagination loop)
      • a page after page 1 fails to fetch           (treated as transient;
        a page-1 failure is returned as a hard error)
      • hard_max_pages is reached                    (safety cap, defaults to 50)

    The main loop's stop_on_duplicate handles the historical-boundary case:
    gkmlpt list responses are publish-time-descending, so once we accumulate
    enough URLs to reach known history, fetch_and_store hits dedup and the
    main loop exits without us needing to know how deep we've gone.

    `hard_max_pages` is sourced from crawl_site.adapter_params_json — the
    cms yaml `hard_max_pages: 50` shipped with gkmlpt is just a doc-level
    suggestion; if a site row carries an explicit value it wins.
    """
    adapter = get_adapter(rt.site.cms_adapter)
    params = _adapter_params_for(rt)
    hard_max = int(params.get("hard_max_pages") or 50)
    # Adapters that need POST / session warmup / non-trivial fetch can
    # expose `fetch_list_page(rt, *, page_num, params, interval_sec)` (the
    # keyword args the pipeline passes below) returning
    # (list_url, list_of_CrawlItem, FetchResult). When present we
    # delegate the entire fetch+parse to the adapter and skip the standard
    # GET-and-json.loads path. gkmlpt-style adapters leave it unset.
    custom_fetch = getattr(adapter, "fetch_list_page", None)
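    # Minimal sketch of such a hook (post_search_api / to_crawl_item /
    # to_fetch_result are hypothetical helpers, not real project code):
    #
    #     def fetch_list_page(rt, *, page_num, params, interval_sec=None):
    #         resp = post_search_api(rt.site.base_url, page=page_num, **params)
    #         items = [to_crawl_item(row) for row in resp["articles"]]
    #         return resp["list_url"], items, to_fetch_result(resp)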

    seen_urls: set[str] = set()
    aggregated: list[_ListEntry] = []
    first_url: str | None = None
    first_fr: FetchResult | None = None

    list_throttle = max(0.0, float(interval_sec)) if interval_sec else 0.0
    start_page = max(1, int(start_page or 1))
    end_page = hard_max
    if page_limit is not None:
        end_page = min(hard_max, start_page + max(1, int(page_limit)) - 1)
    if start_page > hard_max:
        return rt.target.entry_url or rt.site.base_url or "", [], _empty_list_result(
            rt.target.entry_url or rt.site.base_url or "",
        )

    for page_num in range(start_page, end_page + 1):
        # Polite sleep between pages — without this we previously fired ~8
        # list-API requests within 4s on gd.gov.cn, which tripped the
        # prefecture-level WAF and blocked all subsequent article fetches
        # with ERR_CONNECTION_RESET. Sleep BEFORE pages 2..N (page 1 is
        # the first hit, no prior page to space from).
        if page_num > start_page and list_throttle > 0:
            _time.sleep(_interval_with_jitter(list_throttle, interval_jitter_sec))
        # Cancel poll between list pages — so a /cancel against a long
        # paginated walk (gd_wjk index_2..index_5 with interval_sec=60
        # = ~5 min) actually halts at the next page boundary instead of
        # waiting out the entire walk.
        if stop_check is not None and page_num > start_page and stop_check():
            log.info("_list_via_adapter cancelled by operator at page %d", page_num)
            break

        if custom_fetch is not None:
            try:
                list_url, items, fr = custom_fetch(
                    rt, page_num=page_num, params=params,
                    interval_sec=interval_sec,
                )
            except Exception as e:
                log.warning("adapter custom fetch failed page=%d err=%s", page_num, e)
                if page_num == start_page:
                    list_url = rt.target.entry_url or rt.site.base_url or ""
                    return list_url, [], FetchResult(
                        url=list_url,
                        final_url=list_url,
                        status=0,
                        html="",
                        fetched_at=_time.time(),
                        duration_ms=0,
                        is_challenge=False,
                        error=f"{type(e).__name__}: {e}",
                        strategy="exception",
                    )
                break
            if page_num == start_page:
                first_url = list_url
                first_fr = fr
                if (fr is None) or fr.error or (not items and not fr.html):
                    return list_url, [], fr
            elif fr is not None and (fr.error or (not items and not fr.html)):
                log.info("adapter pagination stop: page %d fetch failed url=%s",
                         page_num, list_url)
                break
            if not items:
                log.info("adapter pagination stop: page %d returned 0 items url=%s",
                         page_num, list_url)
                break
            new_in_page = 0
            for it in items:
                if it.url in seen_urls:
                    continue
                seen_urls.add(it.url)
                aggregated.append(_ListEntry(url=it.url, item=it, page_num=page_num))
                new_in_page += 1
            if new_in_page == 0:
                log.info("adapter pagination stop: page %d had no new urls", page_num)
                break
            if max_items is not None and len(aggregated) >= max_items:
                log.info("adapter pagination stop: reached max_items=%d at page %d",
                         max_items, page_num)
                break
            continue

        list_url = adapter.build_list_url(
            base_url=rt.site.base_url or "",
            dept_path=rt.dept_path or "",
            column_id=rt.column_id,
            page=page_num,
            sid=params.get("sid"),
            path_tpl=params.get("list_api_path_tpl"),
        )
        if page_num == start_page:
            first_url = list_url

        fr = fetch_html(list_url)
        if page_num == start_page:
            first_fr = fr
            if fr.error or not fr.html:
                return list_url, [], fr
        elif fr.error or not fr.html:
            log.info("adapter pagination stop: page %d fetch failed url=%s err=%s",
                     page_num, list_url, fr.error or "empty")
            break

        # When fetch_html falls through to playwright (ctct-shielded sites
        # like gdqy.gov.cn), the browser auto-renders any JSON-content URL
        # as '<html><head>…</head><body><pre>{…json…}</pre></body></html>'
        # — json.loads on that fails. Unwrap the <pre> body before parsing.
        body = fr.html or ""
        body_stripped = body.strip()
        if body_stripped.startswith("<"):
            m = re.search(r"<pre[^>]*>(.*?)</pre>", body, re.S)
            if m:
                body = m.group(1).strip()
        try:
            payload = json.loads(body)
        except Exception as e:
            log.warning("adapter list JSON parse failed page=%d url=%s err=%s",
                        page_num, list_url, e)
            break

        items = adapter.parse_list_response(
            payload,
            site_id=rt.site.site_code,
            target_id=rt.target.id,
            dept_id=rt.target.dept_id,
        )
        if not items:
            log.info("adapter pagination stop: page %d returned 0 items url=%s",
                     page_num, list_url)
            break

        new_in_page = 0
        for it in items:
            if it.url in seen_urls:
                continue
            seen_urls.add(it.url)
            aggregated.append(_ListEntry(url=it.url, item=it, page_num=page_num))
            new_in_page += 1
        if new_in_page == 0:
            log.info("adapter pagination stop: page %d had no new urls (cms loop)",
                     page_num)
            break
        if max_items is not None and len(aggregated) >= max_items:
            log.info("adapter pagination stop: reached max_items=%d at page %d (cms loop)",
                     max_items, page_num)
            break

    assert first_fr is not None and first_url is not None
    return first_url, aggregated, first_fr


def _paginated_yaml_urls(list_url: str, pag) -> list[tuple[int, str]]:
    """Build the ordered list of list-page URLs to fetch in a single pass,
    based on the column's pagination config. Page 1 is always list_url
    itself; pages [start..start+max_pages-1] are derived per pagination
    type. max_pages caps total pages (including page 1)."""
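    # Worked example (config values and URL are illustrative): page_param with
    # param="page", start=2, max_pages=3 against http://x.gov.cn/list yields
    #   [(1, ".../list"), (2, ".../list?page=2"), (3, ".../list?page=3")]
    # i.e. three pages total, counting page 1.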
    urls = [(1, list_url)]
    if pag is None or pag.type == "none" or pag.max_pages <= 1:
        return urls

    from urllib.parse import parse_qsl, urlencode
    if pag.type == "page_param":
        for n in range(pag.start, pag.start + pag.max_pages - 1):
            u = urlparse(list_url)
            q = dict(parse_qsl(u.query))
            q[pag.param] = str(n)
            urls.append((n, urlunparse(u._replace(query=urlencode(q)))))
        return urls

    if pag.type == "path_pattern":
        if not pag.pattern or "{page}" not in pag.pattern:
            log.warning("path_pattern pagination missing valid pattern for url=%s", list_url)
            return urls
        u = urlparse(list_url)
        # Replace the last path segment (e.g. 'index.html' → 'index_2.html').
        # This matches every 清远 NF-CMS site's `/foo/index_N.html` style.
        head, _, _last = u.path.rpartition("/")
        for n in range(pag.start, pag.start + pag.max_pages - 1):
            new_seg = pag.pattern.replace("{page}", str(n))
            new_path = f"{head}/{new_seg}" if head else f"/{new_seg}"
            urls.append((n, urlunparse(u._replace(path=new_path))))
        return urls

    log.warning("unknown pagination type %r — falling back to first page only", pag.type)
    return urls


def _list_via_yaml(
    rt: _ResolvedTarget, *, interval_sec: float | None = None,
    interval_jitter_sec: float | None = None,
    max_items: int | None = None,
    stop_check: Callable[[], bool] | None = None,
    start_page: int = 1,
    page_limit: int | None = None,
) -> tuple[str, list[_ListEntry], FetchResult]:
    """Legacy CSS scrape path — reads list selectors from config/sites/*.yaml.

    Walks pagination according to col_cfg.pagination. Returns the canonical
    page-1 URL as `list_url` (used for robots / public-path checks) and the
    accumulated entries from every page that returned content. Stops early
    when a page returns 0 rows (avoids hammering 5 empty pages on small
    columns) but keeps going when a page just contains all-duplicate URLs —
    the main loop's stop_on_duplicate handles that case correctly.
    """
    site_cfg = get_site_config(rt.site.site_code)
    if site_cfg is None:
        raise ValueError(
            f"yaml_path site {rt.site.site_code!r} has no config/sites YAML "
            "— legacy CSS path requires it"
        )
    col_cfg = site_cfg.get_column(rt.column_id)
    if col_cfg is None or col_cfg.list_selector is None:
        raise ValueError(
            f"yaml config missing list_selector for {rt.site.site_code}/{rt.column_id}"
        )
    list_url = rt.target.entry_url or col_cfg.list_url
    selectors = {
        "row": col_cfg.list_selector.row,
        "href": col_cfg.list_selector.href,
        "title": col_cfg.list_selector.title,
        "date": col_cfg.list_selector.date or "span::text",
    }

    start_page = max(1, int(start_page or 1))
    page_urls = [
        (n, u) for n, u in _paginated_yaml_urls(list_url, col_cfg.pagination)
        if n >= start_page
    ]
    if page_limit is not None:
        page_urls = page_urls[: max(1, int(page_limit))]
    if not page_urls:
        return list_url, [], _empty_list_result(list_url)
    aggregated: list[_ListEntry] = []
    seen_urls: set[str] = set()
    first_fr: FetchResult | None = None
    list_throttle = max(0.0, float(interval_sec)) if interval_sec else 0.0
    for i, (page_num, page_url) in enumerate(page_urls):
        # Same anti-WAF spacing as the adapter path. Without this the
        # 30-page pagination on a single yaml site fires 30 list requests
        # in <10s and trips IP-level rate limits on chatty hosts.
        if i > 0 and list_throttle > 0:
            _time.sleep(_interval_with_jitter(list_throttle, interval_jitter_sec))
        # Cancel poll between yaml list pages — same purpose as in
        # _list_via_adapter: long paginated walks (qbwj index_2..index_5
        # at 60s/page) need to halt at page boundaries instead of waiting
        # the whole walk out.
        if stop_check is not None and i > 0 and stop_check():
            log.info("_list_via_yaml cancelled by operator at page %d", page_num)
            break
        fr = fetch_html(page_url)
        if i == 0:
            first_fr = fr  # page 1's FetchResult is the canonical one
            if fr.error or not fr.html:
                return list_url, [], fr  # bail early — page 1 broken
        elif fr.error or not fr.html:
            log.info("pagination stop: page %d fetch failed url=%s err=%s",
                     page_num, page_url, fr.error or "empty")
            break
        items = parse_list(fr.html, page_url, selectors)
        if not items:
            log.info("pagination stop: page %d returned 0 rows url=%s", page_num, page_url)
            break
        new_in_page = 0
        for it in items:
            if it.url in seen_urls:
                continue
            seen_urls.add(it.url)
            aggregated.append(_ListEntry(url=it.url, item=None, page_num=page_num))
            new_in_page += 1
        # If a deeper page repeated only known URLs from earlier pages, the
        # column's listing is shorter than max_pages → no point fetching more.
        if new_in_page == 0:
            log.info("pagination stop: page %d had no new urls url=%s", page_num, page_url)
            break
        if max_items is not None and len(aggregated) >= max_items:
            log.info("pagination stop: reached max_items=%d at page %d url=%s",
                     max_items, page_num, page_url)
            break

    assert first_fr is not None
    return list_url, aggregated, first_fr


def crawl_target(
    target_code: str,
    *,
    max_items: int | None = None,
    stop_on_duplicate: bool = True,
    throttle: HostThrottle | None = None,
    force_disabled: bool = False,
    stop_check: "Callable[[], bool] | None" = None,
    job_id: str | None = None,
    resume_from_page: int = 0,
    progress_callback: "Callable[[int], None] | None" = None,
) -> dict:
    """Fetch list page, iterate article URLs, call fetch_and_store per item.

    `stop_on_duplicate=True` implements INC-03 early-stop: the list is
    publish-time ordered, so the first already-seen URL ends the pass.

    `stop_check` is a callable invoked at safe break points (between
    list pages, between articles). Returning True triggers a graceful
    shutdown — no further fetches, return {'status': 'cancelled', ...}
    with what we managed to ingest so far. Used by the task_queue
    worker to honor operator cancel without waiting for natural
    completion (was previously cosmetic — flag was set but pipeline
    didn't poll it).

    DEFENSE-IN-DEPTH enabled check: refuse to run when the site or target
    is disabled in DB. We learned this gap the hard way — programmatic
    callers (a python REPL, a misbehaving test, a buggy task_queue path)
    bypass the api's /run enabled gate AND the scheduler's pre-dispatch
    gate, but they still hit `crawl_target()`. The only place that
    catches everything is here. Pass `force_disabled=True` to override
    (used by the v2.1.3 "selectors pending repair" / 待修选择器 retry flow
    where ops explicitly want
    to re-test a disabled target).
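
    Cancellation sketch (threading.Event is just one way to supply a zero-arg
    bool callable; the target code is illustrative):

        cancel = threading.Event()
        result = crawl_target("gdqy__zcwj", stop_check=cancel.is_set)
        # another thread calls cancel.set(); the crawl returns
        # {'status': 'cancelled', ...} at the next safe break point.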
    """
    # Default to a no-op stop_check so the rest of the function can call
    # it unconditionally without `if stop_check is not None:` everywhere.
    if stop_check is None:
        stop_check = lambda: False  # noqa: E731
    Session = get_sessionmaker()
    with Session() as s:
        rt = _resolve_target(s, target_code)
        # Hard refuse if the site/target was disabled — even programmatic
        # callers must respect this. Lets the operator's "disable in UI"
        # action stop ALL fetch paths, not just the HTTP-triggered ones.
        if not force_disabled:
            if not rt.target.enabled:
                log.info("crawl_target refused: target=%s disabled", target_code)
                return {
                    "status": "skipped_disabled",
                    "reason": "target_disabled",
                    "target_code": target_code,
                    "items_seen": 0, "items_new": 0,
                }
            if not rt.site.enabled:
                log.info("crawl_target refused: site=%s disabled", rt.site.site_code)
                return {
                    "status": "skipped_disabled",
                    "reason": "site_disabled",
                    "target_code": target_code,
                    "site_code": rt.site.site_code,
                    "items_seen": 0, "items_new": 0,
                }
        # snapshot what we need after session closes
        site_pk = rt.site.id
        target_id = rt.target.id
        site_code = rt.site.site_code
        target_code_norm = rt.target.target_code
        cms_adapter = rt.site.cms_adapter
        respect_robots = rt.site.respect_robots
        interval_sec = rt.target.interval_sec
        interval_jitter_sec = rt.target.interval_jitter_sec
        track_checkpoint = bool(getattr(rt.target, "track_checkpoint", False))
        crawl_order = _crawl_order_for(rt)

    # Helper: write current progress to crawl_job row (best-effort).
    # Only fires when the target opted in AND we have a job_id to attach
    # the checkpoint to. DB error here must NOT kill the running crawl —
    # log + continue.
    current_page_saved = -1

    def _save_checkpoint(page_num: int) -> None:
        if not (track_checkpoint and job_id):
            return
        try:
            from govcrawler.models import CrawlJob
            with Session() as s:
                cj = s.get(CrawlJob, job_id)
                if cj is not None and page_num > (cj.last_completed_page or 0):
                    cj.last_completed_page = page_num
                    s.commit()
        except Exception:
            log.exception("checkpoint update failed job=%s page=%d", job_id, page_num)

    def _save_current_page(page_num: int) -> None:
        nonlocal current_page_saved
        page_num = max(0, int(page_num or 0))
        if page_num == current_page_saved:
            return
        current_page_saved = page_num
        if progress_callback is not None:
            try:
                progress_callback(page_num)
            except Exception:
                log.exception("progress callback failed job=%s page=%d", job_id, page_num)
        if not job_id:
            return
        try:
            from govcrawler.models import CrawlJob
            with Session() as s:
                cj = s.get(CrawlJob, job_id)
                if cj is not None and (cj.current_page or 0) != page_num:
                    cj.current_page = page_num
                    s.commit()
        except Exception:
            log.exception("current page update failed job=%s page=%d", job_id, page_num)

    settings = get_settings()
    ua = settings.user_agent

    # target.interval_sec (nullable) → adapter DEFAULT_INTERVAL_SEC (optional) →
    # HostThrottle default. Without this ladder, fresh crawl_target rows (no
    # per-target override) rate-limited themselves on chatty CMS fleets because
    # HostThrottle got `None` and silently fell back to 5s only after the first
    # hit — the per-adapter hint lets ops seed sensible values.
    if interval_sec is None and cms_adapter:
        try:
            adapter_mod = get_adapter(cms_adapter)
        except KeyError:
            adapter_mod = None
        if adapter_mod is not None:
            interval_sec = getattr(adapter_mod, "DEFAULT_INTERVAL_SEC", None)

    interval_sec, interval_jitter_sec, interval_policy = _effective_target_intervals(
        site_code=site_code,
        target_code=target_code_norm,
        interval_sec=interval_sec,
        interval_jitter_sec=interval_jitter_sec,
        settings=settings,
    )
    if interval_policy is not None:
        log.info(
            "dynamic interval policy target=%s policy=%s configured=%s+%s effective=%.1f+%.1f",
            target_code_norm,
            interval_policy["policy"],
            interval_policy["configured_interval_sec"],
            interval_policy["configured_interval_jitter_sec"],
            interval_policy["effective_interval_sec"] or 0,
            interval_policy["effective_interval_jitter_sec"] or 0,
        )

    # ----- list page -----
    # Pass max_items down so list enumeration breaks early once enough URLs
    # are aggregated. Without this, sites with hard_max_pages=100 +
    # interval_sec=10 (e.g. gov_cn_xxgk) walked all 100 pages — ~18 min —
    # before the per-article loop even started, even when caller asked for
    # max_items=2. The cap is a soft hint (we may overshoot by one page's
    # worth) but bounds total list-walk time.
    random_batch = crawl_order.mode == "random_batch"
    paged_batch = crawl_order.mode == "paged_batch"
    batch_mode = random_batch or paged_batch
    start_page = resume_from_page + 1 if track_checkpoint and resume_from_page > 0 else 1

    def _load_entries(page: int, page_limit: int | None = None):
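        """Fetch one window of list pages starting at `page`.

        Dispatches to the CMS adapter when `cms_adapter` is set, otherwise to
        the legacy yaml selector path. Records `page` as the current page
        first, then returns a (list_url, entries, fetch_result) tuple.
        """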
        _save_current_page(page)
        if cms_adapter:
            return _list_via_adapter(
                rt, interval_sec=interval_sec, max_items=max_items,
                interval_jitter_sec=interval_jitter_sec,
                stop_check=stop_check, start_page=page, page_limit=page_limit,
            )
        return _list_via_yaml(
            rt, interval_sec=interval_sec, max_items=max_items,
            interval_jitter_sec=interval_jitter_sec,
            stop_check=stop_check, start_page=page, page_limit=page_limit,
        )

    list_url, entries, list_fr = _load_entries(
        start_page, crawl_order.batch_pages if batch_mode else None,
    )

    # COMP-03: reject non-public list URLs
    if not is_public_path(list_url):
        raise ValueError(f"refuse to crawl non-public list path: {list_url!r}")

    # COMP-02: robots.txt gate
    if respect_robots and not is_allowed(list_url, ua):
        log.warning("robots.txt disallows list_url=%s ua=%s — skipping", list_url, ua)
        return {
            "status": "robots_blocked",
            "target_code": target_code_norm,
            "list_url": list_url,
            "items_seen": 0,
            "items_new": 0,
        }

    if list_fr.error or not list_fr.html:
        return {
            "status": "list_failed",
            "target_code": target_code_norm,
            "list_url": list_url,
            "reason": list_fr.error or "empty_html",
            "http_status": list_fr.status,
            "items_seen": 0,
            "items_new": 0,
        }

    throttle = throttle or HostThrottle(
        interval_s=interval_sec,
        jitter_s=interval_jitter_sec or 0,
    )
    log.info(
        "crawl_target start target=%s (adapter=%s) url=%s items=%d",
        target_code_norm, cms_adapter or "yaml", list_url, len(entries),
    )

    new_count = 0
    skipped_count = 0
    failed_count = 0
    consecutive_blocks = 0
    aborted = False
    abort_reason: str | None = None
    retryable_abort = False
    abort_checkpoint_page: int | None = None
    results: list[dict] = []
    total_list_entries = len(entries)

    def _prefetch_existing_articles(batch_entries: list[_ListEntry]) -> dict[str, dict[str, Any]]:
        """Bulk URL-hash dedup for list entries before any detail fetch.

        fetch_and_store still keeps its own pre-fetch check as a single-item
        safety net, and its post-fetch final_url check handles redirect
        canonicalization. This batch precheck is the cheap path: compare the
        list URL set against Article.url_hash once, then skip known URLs
        without entering retry/fetch/parsing at all.
        """
        url_hash_by_url: dict[str, str] = {}
        for entry in batch_entries:
            if entry.url in url_hash_by_url:
                continue
            try:
                url_hash_by_url[entry.url] = compute_url_hash(entry.url)
            except ValueError:
                continue
        if not url_hash_by_url:
            return {}
        hash_to_urls: dict[str, list[str]] = {}
        for url, url_hash in url_hash_by_url.items():
            hash_to_urls.setdefault(url_hash, []).append(url)
        with Session() as s:
            rows = (
                s.query(Article.url_hash, Article.id, Article.target_id)
                .filter(Article.url_hash.in_(list(hash_to_urls.keys())))
                .all()
            )
        out: dict[str, dict[str, Any]] = {}
        for url_hash, article_id, article_target_id in rows:
            for url in hash_to_urls.get(url_hash, []):
                out[url] = {
                    "url_hash": url_hash,
                    "article_id": article_id,
                    "target_id": article_target_id,
                }
        return out

    def _run_one(entry: _ListEntry) -> dict:
        """fetch_and_store with exp-backoff retry on transient failures.

        When fetch_and_store raises (e.g. a network-stack TypeError, or an
        httpx ConnectError outside the adapter's own try/except), it never
        gets the chance to call insert_crawl_log, so operators checking the
        admin UI see no crawl_log row for the failure. We mirror the failure
        into crawl_log here so retry-exhausted exceptions show up next to
        normal failed rows.
        """
        last: dict = {"status": "failed", "reason": "no_attempt"}
        last_came_from_exception = False
        for attempt in range(MAX_RETRIES + 1):
            suppress_fetch_failure_log = attempt < MAX_RETRIES
            try:
                r = fetch_and_store(
                    target_code=target_code_norm,
                    url=entry.url,
                    list_item=entry.item,
                    throttle=throttle,
                    log_fetch_failures=not suppress_fetch_failure_log,
                )
            except Exception as e:
                last = {"status": "failed", "reason": f"{type(e).__name__}: {e}"}
                last_came_from_exception = True
                if attempt < MAX_RETRIES:
                    _time.sleep(BACKOFF_BASE_S * (2**attempt))
                    continue
                _log_run_one_exception(entry.url, last["reason"])
                return last

            last = r
            last_came_from_exception = False
            status = r.get("status")
            http_status = r.get("http_status") or 0
            reason = str(r.get("reason") or "")

            if status in ("ready", "skipped"):
                return r
            if reason.startswith("host_cooldown:"):
                if suppress_fetch_failure_log:
                    _log_run_one_failure(entry.url, r)
                return r
            if http_status in BLOCK_STATUSES:
                if suppress_fetch_failure_log:
                    _log_run_one_failure(entry.url, r)
                return r
            # Only retry transient network/gateway failures. A logical failure
            # at HTTP 200 (e.g. content_text_too_short) won't change on retry —
            # worse, fetch_and_store already inserted the failed Article on
            # attempt 0, so attempt 1 short-circuits via the url_hash dedup
            # check and returns status='skipped'. The main loop then treats
            # that as an early-stop duplicate and aborts the whole pass after
            # the first failed article. Restrict retry to http_status==0
            # (network error / exception path) and 5xx.
            if attempt < MAX_RETRIES and (
                http_status == 0 or http_status in RETRIABLE_STATUSES
            ):
                _time.sleep(BACKOFF_BASE_S * (2**attempt))
                continue
            if suppress_fetch_failure_log and status == "failed":
                _log_run_one_failure(entry.url, r)
            return r
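        # Unreachable with the current retry bounds: every path in the final
        # attempt returns above. Kept as a defensive fallback should the loop
        # bounds change.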
        if last_came_from_exception:
            _log_run_one_exception(entry.url, last.get("reason", "unknown"))
        return last

    def _log_run_one_failure(url: str, r: dict) -> None:
        """Persist a failure when an intermediate attempt suppressed
        fetch_and_store's own crawl_log write but the result turned out to be
        non-retriable. Retriable failures that later succeed stay out of the
        operator-facing failed log."""
        try:
            with Session() as s:
                insert_crawl_log(
                    s,
                    site_pk=site_pk,
                    target_id=target_id,
                    article_url=url,
                    strategy=str(r.get("strategy") or "httpx"),
                    http_status=int(r.get("http_status") or 0),
                    duration_ms=int(r.get("duration_ms") or 0),
                    success=False,
                    error_msg=str(r.get("reason") or "failed")[:500],
                )
                s.commit()
        except Exception:
            log.exception("failed to persist crawl_log for failed url=%s", url)

    def _log_run_one_exception(url: str, reason: str) -> None:
        """Persist a crawl_log row for exception-path failures. Best-effort —
        we swallow DB errors so a log-write regression can't mask the real
        crawl error from callers."""
        try:
            with Session() as s:
                insert_crawl_log(
                    s,
                    site_pk=site_pk,
                    target_id=target_id,
                    article_url=url,
                    strategy="exception",
                    http_status=0,
                    duration_ms=0,
                    success=False,
                    error_msg=reason[:500],  # crawl_log.error_msg is TEXT but keep sane
                )
                s.commit()
        except Exception:
            log.exception("failed to persist crawl_log for exception url=%s", url)

    cancelled = False
    last_seen_page = resume_from_page

    def _process_entries(
        batch_entries: list[_ListEntry],
        *,
        ordered_checkpoint: bool,
        duplicate_stop_mode: str,
    ) -> str:
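        """Run the per-article loop over one batch of list entries.

        Returns a sentinel consumed by the caller: "max_items" (global result
        cap hit), "cancelled", "duplicate", "aborted", or "done". With
        stop_on_duplicate, duplicate_stop_mode="single" stops the pass on the
        first duplicate url_hash, while "batch" stops only when every
        processed entry in the batch was a duplicate. ordered_checkpoint=True
        checkpoints the previous page each time the entries advance to a new
        page.
        """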
        nonlocal new_count, skipped_count, failed_count, consecutive_blocks
        nonlocal aborted, abort_reason, retryable_abort, abort_checkpoint_page
        nonlocal cancelled, last_seen_page
        existing_by_url = _prefetch_existing_articles(batch_entries)
        processed_count = 0
        duplicate_count = 0
        for i, entry in enumerate(batch_entries):
            _save_current_page(entry.page_num)
            if max_items is not None and len(results) >= max_items:
                return "max_items"

            if stop_check():
                log.info("crawl_target cancelled by operator at item %d/%d url=%s",
                         i, len(batch_entries), entry.url)
                cancelled = True
                return "cancelled"

            if ordered_checkpoint and entry.page_num > last_seen_page:
                if last_seen_page > 0:
                    _save_checkpoint(last_seen_page)
                last_seen_page = entry.page_num

            # SSRF + path guard. is_safe_to_fetch layers DNS / private-IP /
            # allowlist checks on top of the path blacklist. Without this, a
            # crafted yaml or admin row could trick us into fetching internal
            # services on the docker network or cloud metadata endpoints.
            if not is_safe_to_fetch(entry.url):
                results.append({"url": entry.url, "status": "skipped", "reason": "non_public_path"})
                skipped_count += 1
                continue
            if respect_robots and not is_allowed(entry.url, ua):
                results.append({"url": entry.url, "status": "skipped", "reason": "robots_blocked"})
                skipped_count += 1
                continue

            existing = existing_by_url.get(entry.url)
            if existing:
                processed_count += 1
                skipped_count += 1
                same_target = existing["target_id"] == target_id
                reason = "duplicate_url_hash" if same_target else "duplicate_other_target"
                results.append({"url": entry.url, "status": "skipped"})
                if stop_on_duplicate and reason == "duplicate_url_hash":
                    duplicate_count += 1
                    if duplicate_stop_mode == "single":
                        log.info("early stop: duplicate url_hash at url=%s", entry.url)
                        return "duplicate"
                continue

            r = _run_one(entry)
            processed_count += 1
            item_result = {"url": entry.url, "status": r.get("status")}
            if r.get("reason"):
                item_result["reason"] = r.get("reason")
            if r.get("http_status") is not None:
                item_result["http_status"] = r.get("http_status")
            results.append(item_result)

            http_status = r.get("http_status") or 0
            if http_status in BLOCK_STATUSES:
                consecutive_blocks += 1
            else:
                consecutive_blocks = 0

            if r.get("status") == "skipped":
                skipped_count += 1
                if stop_on_duplicate and r.get("reason") == "duplicate_url_hash":
                    duplicate_count += 1
                    if duplicate_stop_mode == "single":
                        log.info("early stop: duplicate url_hash at url=%s", entry.url)
                        return "duplicate"
            elif r.get("status") == "ready":
                new_count += 1
            else:
                failed_count += 1

            reason = str(r.get("reason") or "")
            if reason.startswith("host_cooldown:"):
                safe_page = max(0, entry.page_num - 1) if ordered_checkpoint else max(0, last_seen_page)
                safe_page = max(0, int(safe_page or 0), int(resume_from_page or 0))
                if safe_page > 0:
                    _save_checkpoint(safe_page)
                aborted = True
                abort_reason = "cooldown"
                retryable_abort = True
                abort_checkpoint_page = safe_page
                log.warning(
                    "abort pass: host cooldown target=%s page=%s checkpoint=%s reason=%s",
                    target_code_norm, entry.page_num, safe_page, reason,
                )
                return "aborted"

            if consecutive_blocks >= ABORT_AFTER_N_BLOCKS:
                log.warning("abort pass: %d consecutive block responses (FETCH-05)", consecutive_blocks)
                aborted = True
                abort_reason = "blocked"
                return "aborted"
        if (
            stop_on_duplicate
            and duplicate_stop_mode == "batch"
            and processed_count > 0
            and duplicate_count == processed_count
        ):
            log.info("early stop: random batch is all duplicate url_hash entries")
            return "duplicate"
        return "done"

    if batch_mode:
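        # Walk the list in windows of crawl_order.batch_pages pages:
        # random_batch shuffles each window before processing and (when
        # stop_on_duplicate is set) stops only when a whole window is
        # duplicates; paged_batch keeps list order and stops on the first
        # duplicate. After each fully processed window the highest page seen
        # is checkpointed and the next window is loaded.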
        batch_entries = entries
        page = start_page
        while batch_entries:
            if random_batch:
                random.shuffle(batch_entries)
            status = _process_entries(
                batch_entries,
                ordered_checkpoint=False,
                duplicate_stop_mode="batch" if random_batch else "single",
            )
            if status in ("cancelled", "aborted", "duplicate", "max_items"):
                break
            max_page = max(e.page_num for e in batch_entries)
            last_seen_page = max(last_seen_page, max_page)
            _save_checkpoint(last_seen_page)
            if len({e.page_num for e in batch_entries}) < crawl_order.batch_pages:
                break
            page = max_page + 1
            _list_url, batch_entries, _list_fr = _load_entries(page, crawl_order.batch_pages)
            total_list_entries += len(batch_entries)
            if _list_fr.error or not _list_fr.html:
                break
    else:
        _process_entries(
            entries,
            ordered_checkpoint=True,
            duplicate_stop_mode="single",
        )
        if not cancelled and not aborted and last_seen_page > 0:
            _save_checkpoint(last_seen_page)

    # update target.last_crawled_at
    with Session() as s:
        t = targets_repo.get_by_code(s, target_code_norm)
        if t is not None:
            t.last_crawled_at = now_cn_naive()
            s.commit()

    if cancelled:
        final_status = "cancelled"
    elif aborted:
        final_status = "aborted"
    else:
        final_status = "ok"
    out = {
        "status": final_status,
        "target_code": target_code_norm,
        "site_code": site_code,
        "items_seen": total_list_entries,
        "items_new": new_count,
        "items_skipped": skipped_count,
        "items_failed": failed_count,
        "consecutive_blocks": consecutive_blocks,
        "interval_policy": interval_policy,
        "results": results,
    }
    if abort_reason:
        out["reason"] = abort_reason
    if retryable_abort:
        out["retryable"] = True
    if abort_checkpoint_page is not None:
        out["checkpoint_page"] = abort_checkpoint_page
    return out
