"""End-to-end crawl pipeline — v2 schema aligned.

Entry points:
  * `crawl_target(target_code)`   — list page → iterate → fetch_and_store per item
  * `fetch_and_store(*, target_code, url)` — single-article fetch → parse → persist

Dispatch model (§7.5 of 2.0 design doc):
  * `crawl_site.cms_adapter` set (e.g. "gkmlpt")  → adapter-driven JSON list API
  * `crawl_site.yaml_path` set                    → legacy CSS list scraping

Both paths converge on the same per-article loop. Retry / block / robots logic
stays identical to 1.0.

Compared to 1.0:
  * Addressed by `target_code` (globally unique) instead of (site_id, column_id)
  * Article insert goes through `insert_article_from_contract` when a CrawlItem
    is available from the adapter; falls back to `insert_article(**kwargs)` for
    legacy yaml_path sites
  * `content_simhash` removed — §5.5 excludes it; RAG owns content de-dup
  * `insert_crawl_log` now keyed by `(site_pk:int, target_id:int)`
"""
from __future__ import annotations

import json
import logging
import random
import re
import time as _time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable
from urllib.parse import urlparse, urlunparse

from sqlalchemy.orm import Session

from govcrawler.adapters import get_adapter
from govcrawler.adapters.contract import CrawlItem, Status
from govcrawler.compliance import is_allowed, is_public_path, is_safe_to_fetch
from govcrawler.config.registry import get_detail_selectors, get_site_config
from govcrawler.db import get_sessionmaker
from govcrawler.fetcher.browser import FetchResult
from govcrawler.fetcher.chain import fetch_html
from govcrawler.fetcher.throttle import HostThrottle
from govcrawler.models import Article, CrawlSite, CrawlTarget, SiteDepartment
from govcrawler.observability import record_fetch
from govcrawler.parser.detail_parser import parse_detail
from govcrawler.parser.list_parser import ListItem, parse_list
from govcrawler.repositories import sites as sites_repo
from govcrawler.repositories import targets as targets_repo
from govcrawler.settings import get_settings
from govcrawler.storage.attachments import download_attachment
from govcrawler.storage.files import write_article_text, write_raw_html
from govcrawler.storage.repo import (
    get_article_by_url_hash,
    insert_article,
    insert_article_from_contract,
    insert_attachments,
    insert_crawl_log,
    upsert_article_standard_meta,
)
from govcrawler.utils.url_norm import url_hash as compute_url_hash

log = logging.getLogger(__name__)

MIN_CONTENT_TEXT_CHARS = 50

# Retry policy (FETCH-05)
MAX_RETRIES = 2
BACKOFF_BASE_S = 2.0
ABORT_AFTER_N_BLOCKS = 3
BLOCK_STATUSES = (403, 412, 429)
RETRIABLE_STATUSES = (500, 502, 503, 504)
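# Derived schedule: with MAX_RETRIES=2 and BACKOFF_BASE_S=2.0, _run_one makes
# at most 3 attempts per URL and sleeps BACKOFF_BASE_S * 2**attempt between
# them (2s, then 4s), retrying only transient failures (http_status 0 or one
# of RETRIABLE_STATUSES).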
PREFER_HTTPS_HOSTS = {"www.gd.gov.cn"}


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _interval_with_jitter(interval_sec: float | None, jitter_sec: float | None) -> float:
    base = max(0.0, float(interval_sec or 0.0))
    jitter = max(0.0, float(jitter_sec or 0.0))
    return base + (random.uniform(0.0, jitter) if jitter > 0 else 0.0)


def _parse_publish_time(raw: str) -> datetime | None:
    """Pull a date out of free-form publish_time strings.

    Real-world inputs span several gov-site flavors:
      '2026-03-30 09:40'
      '发布日期：2026-03-30 09:40'
      '2026/03/30　来源: …'
      '2026年03月30日'
      '时间： 2026-03-30 09:37 来源：本网'
    Strategy: normalize separators (Chinese / slashes) → re.search for
    the first `YYYY-MM-DD[ HH:MM[:SS]]` substring → strptime. Anchoring
    at position 0 (the old behavior) failed on every site that prefixes
    a label like '发布日期：' or '时间：'.
    """
    if not raw:
        return None
    s = raw.replace("/", "-").replace("年", "-").replace("月", "-").replace("日", " ")
    s = re.sub(r"\s+", " ", s).strip()
    m = re.search(r"\d{4}-\d{1,2}-\d{1,2}(?:\s+\d{1,2}:\d{2}(?::\d{2})?)?", s)
    if not m:
        return None
    chunk = m.group(0)
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
        try:
            return datetime.strptime(chunk, fmt)
        except Exception:
            continue
    return None


def _derive_article_key(url: str) -> str:
    """Last path segment, ext stripped — used as filename stem.

    e.g. `/a/b/post_2136593.html` → `post_2136593`.
    Falls back to url_hash[:16] if no usable stem.
    """
    tail = url.rstrip("/").rsplit("/", 1)[-1].split("?", 1)[0].split("#", 1)[0]
    stem = tail.rsplit(".", 1)[0] if "." in tail else tail
    return stem or compute_url_hash(url)[:16]


def _prefer_https_for_fetch(url: str) -> str:
    """Use HTTPS for hosts that publish HTTP detail links but serve the same
    content over HTTPS. gd.gov.cn list pages often contain http:// detail
    links; fetching the HTTPS equivalent avoids an extra CDN/WAF edge case.
    """
    p = urlparse(url)
    m = re.fullmatch(r"/gkml/content/post_(\d+)\.html", p.path or "")
    if p.netloc.lower() == "www.gd.gov.cn" and m:
        post_id = int(m.group(1))
        return urlunparse(p._replace(
            scheme="https",
            path=f"/gkmlpt/content/{post_id // 1_000_000}/{post_id // 1000}/post_{post_id}.html",
            query="",
            fragment="",
        ))
    if p.scheme == "http" and p.netloc.lower() in PREFER_HTTPS_HOSTS:
        return urlunparse(p._replace(scheme="https"))
    return url


def _is_http_url(url: str) -> bool:
    try:
        return urlparse(url).scheme.lower() in {"http", "https"}
    except Exception:
        return False


def _decompose_target_code(site_code: str, target_code: str) -> tuple[str | None, str]:
    """Split `<site>__<dept>__<col>` or `<site>__<col>` → (dept_path, column_id).

    Mirror of `config.sync._target_code`. Returns (None, column_id) when the
    target is site-level (no dept_path).
    """
    prefix = f"{site_code}__"
    if not target_code.startswith(prefix):
        raise ValueError(f"target_code {target_code!r} doesn't start with {prefix!r}")
    tail = target_code[len(prefix):]
    parts = tail.split("__", 1)
    if len(parts) == 1:
        return None, parts[0]
    return parts[0], parts[1]


def _resolve_detail_selectors(
    site: CrawlSite, target: CrawlTarget
) -> dict[str, Any] | None:
    """Pick detail selectors: target override → adapter default → legacy YAML."""
    # 1) per-target override
    override = (target.parser_override_json or {}).get("detail") if target.parser_override_json else None
    if override:
        return override
    # 2) adapter default (e.g. gkmlpt DEFAULT_DETAIL_SELECTORS)
    if site.cms_adapter:
        try:
            adapter = get_adapter(site.cms_adapter)
        except KeyError:
            adapter = None
        if adapter is not None:
            default = getattr(adapter, "DEFAULT_DETAIL_SELECTORS", None)
            if default:
                return default
    # 3) legacy YAML registry (1.0 compat for yaml_path sites)
    _, column_id = _decompose_target_code(site.site_code, target.target_code)
    return get_detail_selectors(site.site_code, column_id)


@dataclass
class _ResolvedTarget:
    site: CrawlSite
    target: CrawlTarget
    dept: SiteDepartment | None
    dept_path: str | None
    column_id: str


def _resolve_target(session: Session, target_code: str) -> _ResolvedTarget:
    target = targets_repo.get_by_code(session, target_code)
    if target is None:
        raise ValueError(f"no crawl_target with target_code={target_code!r}")
    site = sites_repo.get_by_id(session, target.site_id)
    if site is None:
        raise ValueError(f"crawl_target {target_code!r} points at missing site_id={target.site_id}")
    dept = None
    if target.site_department_id is not None:
        dept = session.get(SiteDepartment, target.site_department_id)
    dept_path, column_id = _decompose_target_code(site.site_code, target.target_code)
    return _ResolvedTarget(
        site=site, target=target, dept=dept,
        dept_path=dept_path, column_id=column_id,
    )


def _adapter_params_for(rt: _ResolvedTarget) -> dict[str, Any]:
    """Merged adapter params for a target.

    Site rows carry the shared adapter defaults. Some adapter sites need
    multiple targets that hit the same origin but use different list facets
    (for example the gov.cn policy document library: State Council documents
    vs State Council department documents). Store those
    narrow overrides under crawl_target.parser_override_json.adapter_params.
    The existing parser_override_json.detail convention remains unchanged.
    """
    params: dict[str, Any] = {}
    if isinstance(rt.site.adapter_params_json, dict):
        params.update(rt.site.adapter_params_json)
    override = rt.target.parser_override_json or {}
    if isinstance(override, dict):
        target_params = override.get("adapter_params")
        if isinstance(target_params, dict):
            params.update(target_params)
    return params


# ---------------------------------------------------------------------------
# single-article fetch + store
# ---------------------------------------------------------------------------
def fetch_and_store(
    *,
    target_code: str,
    url: str,
    list_item: CrawlItem | None = None,
    throttle: HostThrottle | None = None,
    log_fetch_failures: bool = True,
) -> dict:
    """End-to-end: fetch → parse → write files → download attachments → insert rows.

    `list_item` may be a `CrawlItem` produced by an adapter's `parse_list_response`
    — when present we thread its metadata (title, publish_time, source_raw,
    channel_name, doc_no …) into the Article row via `insert_article_from_contract`.
    """
    owns_throttle = throttle is None
    url_h = compute_url_hash(url)
    Session = get_sessionmaker()

    # dedup + resolve
    with Session() as s:
        rt = _resolve_target(s, target_code)
        existing = get_article_by_url_hash(s, url_h)
        if existing:
            # Distinguish self-dedup (this target already has it → real
            # historical boundary, OK to early-stop) from cross-target
            # dedup (some sibling target already pulled this URL — common
            # on news sites that file the same article into multiple
            # channels, e.g. 网易 domestic vs world). The main loop only
            # treats self-dedup as a stop signal.
            same_target = existing.target_id == rt.target.id
            return {
                "status": "skipped",
                "reason": "duplicate_url_hash" if same_target else "duplicate_other_target",
                "url_hash": url_h,
                "article_id": existing.id,
            }
        site_pk = rt.site.id
        target_id = rt.target.id
        site_code = rt.site.site_code
        target_code_norm = rt.target.target_code
        selectors = _resolve_detail_selectors(rt.site, rt.target)
        dept_id_fk = rt.target.dept_id  # local_department FK (may be None)
        # Snapshot the target's classification metadata so we can copy it
        # onto the article row — saves the RAG / search side from joining
        # against crawl_target on every read. Adapter-driven CrawlItems
        # may carry their own values, in which case we keep the CrawlItem
        # value (it usually came from the JSON list response and is more
        # specific to this article).
        target_channel_name = rt.target.channel_name
        target_channel_path = rt.target.channel_path
        target_content_category = rt.target.content_category
        target_content_subcategory = rt.target.content_subcategory
        target_interval_sec = rt.target.interval_sec
        target_interval_jitter_sec = rt.target.interval_jitter_sec

    if owns_throttle:
        throttle = HostThrottle(
            interval_s=target_interval_sec,
            jitter_s=target_interval_jitter_sec or 0,
        )
    assert throttle is not None

    adapter_fetch_detail = None
    if rt.site.cms_adapter:
        adapter_fetch_detail = getattr(get_adapter(rt.site.cms_adapter), "fetch_detail", None)

    fetch_url = _prefer_https_for_fetch(url)
    if adapter_fetch_detail is not None:
        throttle.wait(fetch_url)
        fr, fields = adapter_fetch_detail(rt, url=fetch_url, list_item=list_item)
    else:
        if selectors is None:
            raise ValueError(
                f"no detail selectors available for target={target_code}; "
                "set crawl_target.parser_override_json.detail or ensure the adapter "
                "defines DEFAULT_DETAIL_SELECTORS"
            )
        throttle.wait(fetch_url)
        fr = fetch_html(fetch_url)
        fields = None
    strategy = fr.strategy

    non_http_final = bool(fr.final_url) and not _is_http_url(fr.final_url)

    # -------- failure path --------
    if fr.error or fr.is_challenge or not fr.html or non_http_final:
        reason = (
            fr.error
            or (f"non_http_final_url:{fr.final_url}" if non_http_final else None)
            or ("ctct_challenge_unresolved" if fr.is_challenge else "empty_html")
        )
        if log_fetch_failures:
            with Session() as s:
                insert_crawl_log(
                    s,
                    site_pk=site_pk,
                    target_id=target_id,
                    article_url=fetch_url,
                    strategy=strategy,
                    http_status=fr.status,
                    duration_ms=fr.duration_ms,
                    success=False,
                    error_msg=reason,
                )
                s.commit()
        outcome = "challenge" if fr.is_challenge else "failed"
        record_fetch(
            site_id=site_code, column_id=target_code_norm, strategy=strategy,
            outcome=outcome, duration_ms=fr.duration_ms, http_status=fr.status,
        )
        return {
            "status": "failed",
            "reason": reason,
            "http_status": fr.status,
            "strategy": strategy,
            "duration_ms": fr.duration_ms,
        }

    canonical_url = fr.final_url or fetch_url
    final_url_hash = compute_url_hash(canonical_url)
    if final_url_hash != url_h:
        with Session() as s:
            existing_final = get_article_by_url_hash(s, final_url_hash)
            if existing_final:
                same_target = existing_final.target_id == target_id
                return {
                    "status": "skipped",
                    "reason": "duplicate_url_hash" if same_target else "duplicate_other_target",
                    "url_hash": final_url_hash,
                    "article_id": existing_final.id,
                }

    # -------- parse detail --------
    if fields is None:
        fields = parse_detail(fr.html, canonical_url, selectors)
    pub_dt = (
        (list_item.publish_time if list_item and list_item.publish_time else None)
        or _parse_publish_time(fields.publish_time_raw)
        or datetime.utcnow()
    )
    # strip tz for DB column (schema uses naive DateTime)
    if pub_dt.tzinfo is not None:
        pub_dt = pub_dt.replace(tzinfo=None)
    normalized_publish_date = fields.publish_date or pub_dt.date()
    detail_metadata = {"public_meta": fields.public_meta} if fields.public_meta else None

    article_key = _derive_article_key(canonical_url)
    raw_rel = write_raw_html(site_code, target_code_norm, pub_dt, article_key, fr.html)
    text_rel = write_article_text(
        site_code, target_code_norm, pub_dt, article_key, fields.content_text
    )

    settings = get_settings()
    attachment_throttle = HostThrottle(
        interval_s=min(float(target_interval_sec or 0.0), settings.attachment_throttle_cap_s),
        jitter_s=min(float(target_interval_jitter_sec or 0.0), settings.attachment_throttle_cap_s),
    )
    downloaded = []
    for au in fields.attachment_urls:
        attachment_url = _prefer_https_for_fetch(au)
        # Attachments are secondary. Space multiple attachment requests, but
        # cap the wait so a 120-180s detail throttle does not keep an already
        # fetched article out of the DB for minutes.
        try:
            attachment_throttle.wait(attachment_url)
        except Exception:
            pass
        try:
            d = download_attachment(
                attachment_url, site=site_code, column=target_code_norm,
                when=pub_dt, article_key=article_key,
            )
            downloaded.append(d)
        except Exception as e:
            log.warning("attachment download failed url=%s err=%s", attachment_url, e)

    # Ready when we have enough body text, OR a downloaded attachment, OR
    # an attachment URL was extracted (binary may have failed to fetch but
    # the article is still real — gd_gkmlpt has many "PDF-only" entries
    # whose detail page is just a download link), OR at least one inline
    # <img> in the article body — gov sites publish photo-only notices (e.g.
    # a mosquito-control publicity post that is just a 4-image set) where the
    # body text is empty but the page is a real article.
    # `len(content_html) >= 500` covers structured-table articles (qingxin
    # assistance-disclosure notices etc): legitimate posts whose body is a <table> of
    # records — extracted text is ~49 chars but content_html is 3000+ and
    # carries the real data. Without this, the pipeline marked them failed
    # and they got hidden from operators / RAG even though the structured
    # data was already in raw_html.
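    # A quick example of the rule below: a photo-only notice with empty
    # content_text but inline_image_count == 4 lands "ready"; a page with 20
    # chars of text, no attachments, no inline images and a short content_html
    # lands "failed".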
    status = (
        "ready"
        if (
            len(fields.content_text) >= MIN_CONTENT_TEXT_CHARS
            or downloaded
            or len(fields.attachment_urls) >= 1
            or fields.inline_image_count >= 1
            or len(fields.content_html or "") >= 500
        )
        else "failed"
    )

    # -------- persist --------
    with Session() as s:
        if list_item is not None:
            # Build a fully-populated CrawlItem with detail fields merged in,
            # then project via contract-driven insert.
            merged = list_item.model_copy(update={
                "url": canonical_url,
                "url_hash": final_url_hash,
                "title": list_item.title or fields.title or None,
                "publish_time": pub_dt,
                "source_raw": list_item.source_raw or fields.source or None,
                "publisher": list_item.publisher or fields.publisher or fields.source or None,
                "doc_no": list_item.doc_no or fields.doc_no or None,
                "index_no": list_item.index_no or fields.index_no or None,
                "publish_date": list_item.publish_date or normalized_publish_date,
                "effective_date": list_item.effective_date or fields.effective_date,
                "is_effective": (
                    list_item.is_effective
                    if list_item.is_effective is not None
                    else fields.is_effective
                ),
                "expiry_date": list_item.expiry_date or fields.expiry_date,
                "topic_words": list_item.topic_words or fields.topic_words or None,
                "open_category": list_item.open_category or fields.open_category or None,
                "metadata_json": (
                    {**(list_item.metadata_json or {}), **(detail_metadata or {})}
                    if (list_item.metadata_json or detail_metadata)
                    else None
                ),
                "content_text": fields.content_text or None,
                "raw_html_path": str(raw_rel),
                "text_path": str(text_rel),
                "has_attachment": bool(downloaded),
                "status": Status.READY if status == "ready" else Status.FAILED,
                "fetch_strategy": list_item.fetch_strategy or None,
                "target_id": target_id,
                "dept_id": dept_id_fk,
                # Inherit classification from the target unless the adapter
                # supplied its own (CrawlItem fields take precedence).
                "channel_name": list_item.channel_name or target_channel_name,
                "channel_path": list_item.channel_path or target_channel_path,
                "content_category": (
                    list_item.content_category
                    or fields.content_category
                    or target_content_category
                ),
                "content_subcategory": (
                    list_item.content_subcategory
                    or fields.content_subcategory
                    or target_content_subcategory
                ),
            })
            article = insert_article_from_contract(s, merged, site_pk=site_pk)
        else:
            article = insert_article(
                s,
                site_id=site_pk,
                target_id=target_id,
                dept_id=dept_id_fk,
                url=canonical_url,
                url_hash=final_url_hash,
                title=fields.title or None,
                publish_time=pub_dt,
                source_raw=fields.source or None,
                publisher=fields.publisher or fields.source or None,
                doc_no=fields.doc_no or None,
                index_no=fields.index_no or None,
                publish_date=normalized_publish_date,
                effective_date=fields.effective_date,
                is_effective=fields.is_effective,
                expiry_date=fields.expiry_date,
                topic_words=fields.topic_words or None,
                open_category=fields.open_category or None,
                metadata_json=detail_metadata,
                content_text=fields.content_text or None,
                raw_html_path=str(raw_rel),
                text_path=str(text_rel),
                has_attachment=bool(downloaded),
                status=status,
                fetch_strategy=strategy,
                # Carry the target's classification onto the article so RAG
                # / search filters can read these without joining
                # crawl_target. Yaml-path doesn't have a CrawlItem with
                # adapter-supplied values, so target is the only source.
                channel_name=target_channel_name,
                channel_path=target_channel_path,
                content_category=fields.content_category or target_content_category,
                content_subcategory=fields.content_subcategory or target_content_subcategory,
            )

        att_records = [
            {
                "file_name": d.file_name,
                "file_ext": d.file_ext,
                "size_bytes": d.size_bytes,
                "file_path": str(d.file_path),
                "file_hash": d.file_hash,
            }
            for d in downloaded
        ]
        upsert_article_standard_meta(s, article)
        if att_records:
            insert_attachments(s, article.id, att_records)

        error_msg = (
            None
            if status == "ready"
            else f"content_text_too_short:{len(fields.content_text)}"
        )
        insert_crawl_log(
            s,
            site_pk=site_pk,
            target_id=target_id,
            article_url=canonical_url,
            strategy=strategy,
            http_status=fr.status,
            duration_ms=fr.duration_ms,
            success=(status == "ready"),
            error_msg=error_msg,
        )
        s.commit()
        article_id = article.id

    record_fetch(
        site_id=site_code, column_id=target_code_norm, strategy=strategy,
        outcome=status, duration_ms=fr.duration_ms, http_status=fr.status,
    )
    return {
        "status": status,
        "article_id": article_id,
        "url_hash": final_url_hash,
        "title": fields.title,
        "used_fallback": fields.used_fallback,
        "fallback_engine": fields.fallback_engine,
        "raw_html_path": str(raw_rel),
        "text_path": str(text_rel),
        "attachments_downloaded": len(downloaded),
        "http_status": fr.status,
        "duration_ms": fr.duration_ms,
    }


# ---------------------------------------------------------------------------
# list-page drivers
# ---------------------------------------------------------------------------
@dataclass
class _ListEntry:
    url: str
    item: CrawlItem | None  # None for legacy CSS path
    # 1-based page index this URL came from. Used by the page-level
    # checkpoint feature: pipeline groups entries by page, marks each
    # page completed only after every entry in it is processed.
    page_num: int = 1


@dataclass(frozen=True)
class _CrawlOrder:
    mode: str = "ordered"
    batch_pages: int = 1


_RANDOM_BATCH_DEFAULTS = {
    "gd_wjk": {"mode": "random_batch", "batch_pages": 3},
    # gkmlpt API pages already contain many entries (often 100). With a
    # 180s detail/list interval, prefetching 3 pages delays the first article
    # by ~6 minutes. Randomize within the first page, then move to the next.
    "gd_gkmlpt": {"mode": "random_batch", "batch_pages": 1},
    # gov.cn policy library list pages are cheap but long. Process a few pages
    # at a time so a restart only loses a small in-memory URL batch, and page
    # checkpoints can resume without requiring a persisted pending-URL table.
    "gov_cn_zcwjk": {"mode": "paged_batch", "batch_pages": 3},
    "gov_cn_gjgzk": {"mode": "paged_batch", "batch_pages": 3},
}
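# crawl_order can also be set explicitly per site/target; a hypothetical shape,
# read by _crawl_order_for below from crawl_site.adapter_params_json or
# crawl_target.parser_override_json["adapter_params"]:
#     {"crawl_order": {"mode": "paged_batch", "batch_pages": 3}}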


def _crawl_order_for(rt: _ResolvedTarget) -> _CrawlOrder:
    raw: Any = None
    params = _adapter_params_for(rt)
    raw = params.get("crawl_order")

    site_cfg = get_site_config(rt.site.site_code)
    if raw is None and site_cfg is not None:
        col_cfg = site_cfg.get_column(rt.column_id)
        if col_cfg is not None:
            raw = getattr(col_cfg, "crawl_order", None)

    if raw is None:
        raw = _RANDOM_BATCH_DEFAULTS.get(rt.site.site_code)

    if raw is None:
        return _CrawlOrder()
    if hasattr(raw, "model_dump"):
        raw = raw.model_dump()
    if not isinstance(raw, dict):
        return _CrawlOrder()

    mode = str(raw.get("mode") or "ordered")
    try:
        batch_pages = int(raw.get("batch_pages") or 1)
    except Exception:
        batch_pages = 1
    return _CrawlOrder(mode=mode, batch_pages=max(1, batch_pages))


def _empty_list_result(url: str) -> FetchResult:
    return FetchResult(
        url=url, final_url=url, status=200, html="[]",
        fetched_at=_time.time(), duration_ms=0, is_challenge=False,
        strategy="synthetic",
    )


def _list_via_adapter(
    rt: _ResolvedTarget, *, interval_sec: float | None = None,
    interval_jitter_sec: float | None = None,
    max_items: int | None = None,
    stop_check: Callable[[], bool] | None = None,
    start_page: int = 1,
    page_limit: int | None = None,
) -> tuple[str, list[_ListEntry], FetchResult]:
    """Adapter path: walk pagination via ?page=N, projecting each page's
    CrawlItems and accumulating across pages.

    Stops when:
      • a page returns 0 articles                  (end of column)
      • a page returns only URLs we've already seen (CMS pagination loop)
      • a page fetch fails after page 1            (page-1 failure returns the error instead)
      • hard_max_pages is reached                   (safety cap, defaults to 50)

    The main loop's stop_on_duplicate handles the historical-boundary case:
    gkmlpt list responses are publish-time-descending, so once we accumulate
    enough URLs to reach known history, fetch_and_store hits dedup and the
    main loop exits without us needing to know how deep we've gone.

    `hard_max_pages` is sourced from crawl_site.adapter_params_json — the
    cms yaml `hard_max_pages: 50` shipped with gkmlpt is just a doc-level
    suggestion; if a site row carries an explicit value it wins.
    """
    adapter = get_adapter(rt.site.cms_adapter)
    params = _adapter_params_for(rt)
    hard_max = int(params.get("hard_max_pages") or 50)
    # Adapters that need POST / session warmup / non-trivial fetch can
    # expose `fetch_list_page(rt, *, page_num, params, page_size, interval_sec)`
    # returning (list_url, list_of_CrawlItem, FetchResult). When present we
    # delegate the entire fetch+parse to the adapter and skip the standard
    # GET-and-json.loads path. gkmlpt-style adapters leave it unset.
    custom_fetch = getattr(adapter, "fetch_list_page", None)

    seen_urls: set[str] = set()
    aggregated: list[_ListEntry] = []
    first_url: str | None = None
    first_fr: FetchResult | None = None

    list_throttle = max(0.0, float(interval_sec)) if interval_sec else 0.0
    start_page = max(1, int(start_page or 1))
    end_page = hard_max
    if page_limit is not None:
        end_page = min(hard_max, start_page + max(1, int(page_limit)) - 1)
    if start_page > hard_max:
        return rt.target.entry_url or rt.site.base_url or "", [], _empty_list_result(
            rt.target.entry_url or rt.site.base_url or "",
        )

    for page_num in range(start_page, end_page + 1):
        # Polite sleep between pages — without this we previously fired ~8
        # list-API requests within 4s on gd.gov.cn, which tripped the
        # prefecture-level WAF and blocked all subsequent article fetches
        # with ERR_CONNECTION_RESET. Sleep BEFORE pages 2..N (page 1 is
        # the first hit, no prior page to space from).
        if page_num > start_page and list_throttle > 0:
            _time.sleep(_interval_with_jitter(list_throttle, interval_jitter_sec))
        # Cancel poll between list pages — so a /cancel against a long
        # paginated walk (gd_wjk index_2..index_5 with interval_sec=60
        # = ~5 min) actually halts at the next page boundary instead of
        # waiting out the entire walk.
        if stop_check is not None and page_num > start_page and stop_check():
            log.info("_list_via_adapter cancelled by operator at page %d", page_num)
            break

        if custom_fetch is not None:
            try:
                list_url, items, fr = custom_fetch(
                    rt, page_num=page_num, params=params,
                    interval_sec=interval_sec,
                )
            except Exception as e:
                log.warning("adapter custom fetch failed page=%d err=%s", page_num, e)
                if page_num == start_page:
                    list_url = rt.target.entry_url or rt.site.base_url or ""
                    return list_url, [], FetchResult(
                        url=list_url,
                        final_url=list_url,
                        status=0,
                        html="",
                        fetched_at=_time.time(),
                        duration_ms=0,
                        is_challenge=False,
                        error=f"{type(e).__name__}: {e}",
                        strategy="exception",
                    )
                break
            if page_num == start_page:
                first_url = list_url
                first_fr = fr
                if (fr is None) or fr.error or (not items and not fr.html):
                    return list_url, [], fr
            elif fr is not None and (fr.error or (not items and not fr.html)):
                log.info("adapter pagination stop: page %d fetch failed url=%s",
                         page_num, list_url)
                break
            if not items:
                log.info("adapter pagination stop: page %d returned 0 items url=%s",
                         page_num, list_url)
                break
            new_in_page = 0
            for it in items:
                if it.url in seen_urls:
                    continue
                seen_urls.add(it.url)
                aggregated.append(_ListEntry(url=it.url, item=it, page_num=page_num))
                new_in_page += 1
            if new_in_page == 0:
                log.info("adapter pagination stop: page %d had no new urls", page_num)
                break
            if max_items is not None and len(aggregated) >= max_items:
                log.info("adapter pagination stop: reached max_items=%d at page %d",
                         max_items, page_num)
                break
            continue

        list_url = adapter.build_list_url(
            base_url=rt.site.base_url or "",
            dept_path=rt.dept_path or "",
            column_id=rt.column_id,
            page=page_num,
            sid=params.get("sid"),
            path_tpl=params.get("list_api_path_tpl"),
        )
        if page_num == start_page:
            first_url = list_url

        fr = fetch_html(list_url)
        if page_num == start_page:
            first_fr = fr
            if fr.error or not fr.html:
                return list_url, [], fr
        elif fr.error or not fr.html:
            log.info("adapter pagination stop: page %d fetch failed url=%s err=%s",
                     page_num, list_url, fr.error or "empty")
            break

        # When fetch_html falls through to playwright (ctct-shielded sites
        # like gdqy.gov.cn), the browser auto-renders any JSON-content URL
        # as '<html><head>…</head><body><pre>{…json…}</pre></body></html>'
        # — json.loads on that fails. Unwrap the <pre> body before parsing.
        body = fr.html or ""
        body_stripped = body.strip()
        if body_stripped.startswith("<"):
            import re as _re
            m = _re.search(r"<pre[^>]*>(.*?)</pre>", body, _re.S)
            if m:
                body = m.group(1).strip()
        try:
            payload = json.loads(body)
        except Exception as e:
            log.warning("adapter list JSON parse failed page=%d url=%s err=%s",
                        page_num, list_url, e)
            break

        items = adapter.parse_list_response(
            payload,
            site_id=rt.site.site_code,
            target_id=rt.target.id,
            dept_id=rt.target.dept_id,
        )
        if not items:
            log.info("adapter pagination stop: page %d returned 0 items url=%s",
                     page_num, list_url)
            break

        new_in_page = 0
        for it in items:
            if it.url in seen_urls:
                continue
            seen_urls.add(it.url)
            aggregated.append(_ListEntry(url=it.url, item=it, page_num=page_num))
            new_in_page += 1
        if new_in_page == 0:
            log.info("adapter pagination stop: page %d had no new urls (cms loop)",
                     page_num)
            break
        if max_items is not None and len(aggregated) >= max_items:
            log.info("adapter pagination stop: reached max_items=%d at page %d (cms loop)",
                     max_items, page_num)
            break

    assert first_fr is not None and first_url is not None
    return first_url, aggregated, first_fr


def _paginated_yaml_urls(list_url: str, pag) -> list[tuple[int, str]]:
    """Build the ordered list of list-page URLs to fetch in a single pass,
    based on the column's pagination config. Page 1 is always list_url
    itself; the next max_pages-1 pages, numbered from `start`, are derived per
    pagination type. max_pages caps total pages (including page 1)."""
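    # Illustrative expansion (pagination values hypothetical): for
    # list_url="https://example.gov.cn/col/index.html" and a path_pattern
    # config with pattern="index_{page}.html", start=2, max_pages=3 the
    # result is:
    #   [(1, ".../col/index.html"),
    #    (2, ".../col/index_2.html"),
    #    (3, ".../col/index_3.html")]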
    urls = [(1, list_url)]
    if pag is None or pag.type == "none" or pag.max_pages <= 1:
        return urls

    from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
    if pag.type == "page_param":
        for n in range(pag.start, pag.start + pag.max_pages - 1):
            u = urlparse(list_url)
            q = dict(parse_qsl(u.query))
            q[pag.param] = str(n)
            urls.append((n, urlunparse(u._replace(query=urlencode(q)))))
        return urls

    if pag.type == "path_pattern":
        if not pag.pattern or "{page}" not in pag.pattern:
            log.warning("path_pattern pagination missing valid pattern for url=%s", list_url)
            return urls
        u = urlparse(list_url)
        # Replace the last path segment (e.g. 'index.html' → 'index_2.html').
        # This matches every Qingyuan NF-CMS site's `/foo/index_N.html` style.
        head, _, _last = u.path.rpartition("/")
        for n in range(pag.start, pag.start + pag.max_pages - 1):
            new_seg = pag.pattern.replace("{page}", str(n))
            new_path = f"{head}/{new_seg}" if head else f"/{new_seg}"
            urls.append((n, urlunparse(u._replace(path=new_path))))
        return urls

    log.warning("unknown pagination type %r — falling back to first page only", pag.type)
    return urls


def _list_via_yaml(
    rt: _ResolvedTarget, *, interval_sec: float | None = None,
    interval_jitter_sec: float | None = None,
    max_items: int | None = None,
    stop_check: Callable[[], bool] | None = None,
    start_page: int = 1,
    page_limit: int | None = None,
) -> tuple[str, list[_ListEntry], FetchResult]:
    """Legacy CSS scrape path — reads list selectors from config/sites/*.yaml.

    Walks pagination according to col_cfg.pagination. Returns the canonical
    page-1 URL as `list_url` (used for robots / public-path checks) and the
    accumulated entries from every page that returned content. Stops early
    when a page returns 0 rows or repeats only URLs already seen earlier in
    the walk (avoids hammering empty or looping pages on small columns);
    entries that merely duplicate articles already stored in the DB are kept —
    the main loop's stop_on_duplicate handles that case correctly.
    """
    site_cfg = get_site_config(rt.site.site_code)
    if site_cfg is None:
        raise ValueError(
            f"yaml_path site {rt.site.site_code!r} has no config/sites YAML "
            "— legacy CSS path requires it"
        )
    col_cfg = site_cfg.get_column(rt.column_id)
    if col_cfg is None or col_cfg.list_selector is None:
        raise ValueError(
            f"yaml config missing list_selector for {rt.site.site_code}/{rt.column_id}"
        )
    list_url = rt.target.entry_url or col_cfg.list_url
    selectors = {
        "row": col_cfg.list_selector.row,
        "href": col_cfg.list_selector.href,
        "title": col_cfg.list_selector.title,
        "date": col_cfg.list_selector.date or "span::text",
    }

    start_page = max(1, int(start_page or 1))
    page_urls = [
        (n, u) for n, u in _paginated_yaml_urls(list_url, col_cfg.pagination)
        if n >= start_page
    ]
    if page_limit is not None:
        page_urls = page_urls[: max(1, int(page_limit))]
    if not page_urls:
        return list_url, [], _empty_list_result(list_url)
    aggregated: list[_ListEntry] = []
    seen_urls: set[str] = set()
    first_fr: FetchResult | None = None
    list_throttle = max(0.0, float(interval_sec)) if interval_sec else 0.0
    for i, (page_num, page_url) in enumerate(page_urls):
        # Same anti-WAF spacing as the adapter path. Without this the
        # 30-page pagination on a single yaml site fires 30 list requests
        # in <10s and trips IP-level rate limits on chatty hosts.
        if i > 0 and list_throttle > 0:
            _time.sleep(_interval_with_jitter(list_throttle, interval_jitter_sec))
        # Cancel poll between yaml list pages — same purpose as in
        # _list_via_adapter: long paginated walks (qbwj index_2..index_5
        # at 60s/page) need to halt at page boundaries instead of waiting
        # the whole walk out.
        if stop_check is not None and i > 0 and stop_check():
            log.info("_list_via_yaml cancelled by operator at page %d", page_num)
            break
        fr = fetch_html(page_url)
        if i == 0:
            first_fr = fr  # page 1's FetchResult is the canonical one
            if fr.error or not fr.html:
                return list_url, [], fr  # bail early — page 1 broken
        elif fr.error or not fr.html:
            log.info("pagination stop: page %d fetch failed url=%s err=%s",
                     page_num, page_url, fr.error or "empty")
            break
        items = parse_list(fr.html, page_url, selectors)
        if not items:
            log.info("pagination stop: page %d returned 0 rows url=%s", page_num, page_url)
            break
        new_in_page = 0
        for it in items:
            if it.url in seen_urls:
                continue
            seen_urls.add(it.url)
            aggregated.append(_ListEntry(url=it.url, item=None, page_num=page_num))
            new_in_page += 1
        # If a deeper page repeated only known URLs from earlier pages, the
        # column's listing is shorter than max_pages → no point fetching more.
        if new_in_page == 0:
            log.info("pagination stop: page %d had no new urls url=%s", page_num, page_url)
            break
        if max_items is not None and len(aggregated) >= max_items:
            log.info("pagination stop: reached max_items=%d at page %d url=%s",
                     max_items, page_num, page_url)
            break

    assert first_fr is not None
    return list_url, aggregated, first_fr


def crawl_target(
    target_code: str,
    *,
    max_items: int | None = None,
    stop_on_duplicate: bool = True,
    throttle: HostThrottle | None = None,
    force_disabled: bool = False,
    stop_check: "Callable[[], bool] | None" = None,
    job_id: str | None = None,
    resume_from_page: int = 0,
    progress_callback: "Callable[[int], None] | None" = None,
) -> dict:
    """Fetch list page, iterate article URLs, call fetch_and_store per item.

    `stop_on_duplicate=True` implements INC-03 early-stop: the list is
    publish-time ordered, so the first already-seen URL ends the pass.

    `stop_check` is a callable invoked at safe break points (between
    list pages, between articles). Returning True triggers a graceful
    shutdown — no further fetches, return {'status': 'cancelled', ...}
    with what we managed to ingest so far. Used by the task_queue
    worker to honor operator cancel without waiting for natural
    completion (was previously cosmetic — flag was set but pipeline
    didn't poll it).

    DEFENSE-IN-DEPTH enabled check: refuse to run when the site or target
    is disabled in DB. We learned this gap the hard way — programmatic
    callers (a python REPL, a misbehaving test, a buggy task_queue path)
    bypass the api's /run enabled gate AND the scheduler's pre-dispatch
    gate, but they still hit `crawl_target()`. The only place that
    catches everything is here. Pass `force_disabled=True` to override
    (used by the v2.1.3 pending-fix-selector retry flow where ops explicitly want
    to re-test a disabled target).
    """
    # Default to a no-op stop_check so the rest of the function can call
    # it unconditionally without `if stop_check is not None:` everywhere.
    if stop_check is None:
        stop_check = lambda: False  # noqa: E731
    Session = get_sessionmaker()
    with Session() as s:
        rt = _resolve_target(s, target_code)
        # Hard refuse if the site/target was disabled — even programmatic
        # callers must respect this. Lets the operator's "disable in UI"
        # action stop ALL fetch paths, not just the HTTP-triggered ones.
        if not force_disabled:
            if not rt.target.enabled:
                log.info("crawl_target refused: target=%s disabled", target_code)
                return {
                    "status": "skipped_disabled",
                    "reason": "target_disabled",
                    "target_code": target_code,
                    "items_seen": 0, "items_new": 0,
                }
            if not rt.site.enabled:
                log.info("crawl_target refused: site=%s disabled", rt.site.site_code)
                return {
                    "status": "skipped_disabled",
                    "reason": "site_disabled",
                    "target_code": target_code,
                    "site_code": rt.site.site_code,
                    "items_seen": 0, "items_new": 0,
                }
        # snapshot what we need after session closes
        site_pk = rt.site.id
        target_id = rt.target.id
        site_code = rt.site.site_code
        target_code_norm = rt.target.target_code
        cms_adapter = rt.site.cms_adapter
        respect_robots = rt.site.respect_robots
        interval_sec = rt.target.interval_sec
        interval_jitter_sec = rt.target.interval_jitter_sec
        track_checkpoint = bool(getattr(rt.target, "track_checkpoint", False))
        crawl_order = _crawl_order_for(rt)

    # Helper: write current progress to crawl_job row (best-effort).
    # Only fires when the target opted in AND we have a job_id to attach
    # the checkpoint to. DB error here must NOT kill the running crawl —
    # log + continue.
    current_page_saved = -1

    def _save_checkpoint(page_num: int) -> None:
        if not (track_checkpoint and job_id):
            return
        try:
            from govcrawler.models import CrawlJob
            with Session() as s:
                cj = s.get(CrawlJob, job_id)
                if cj is not None and page_num > (cj.last_completed_page or 0):
                    cj.last_completed_page = page_num
                    s.commit()
        except Exception:
            log.exception("checkpoint update failed job=%s page=%d", job_id, page_num)

    def _save_current_page(page_num: int) -> None:
        nonlocal current_page_saved
        page_num = max(0, int(page_num or 0))
        if page_num == current_page_saved:
            return
        current_page_saved = page_num
        if progress_callback is not None:
            try:
                progress_callback(page_num)
            except Exception:
                log.exception("progress callback failed job=%s page=%d", job_id, page_num)
        if not job_id:
            return
        try:
            from govcrawler.models import CrawlJob
            with Session() as s:
                cj = s.get(CrawlJob, job_id)
                if cj is not None and (cj.current_page or 0) != page_num:
                    cj.current_page = page_num
                    s.commit()
        except Exception:
            log.exception("current page update failed job=%s page=%d", job_id, page_num)

    ua = get_settings().user_agent

    # target.interval_sec (nullable) → adapter DEFAULT_INTERVAL_SEC (optional) →
    # HostThrottle default. Without this ladder, fresh crawl_target rows (no
    # per-target override) got themselves rate-limited on chatty CMS fleets
    # because HostThrottle received `None` and silently fell back to its 5s
    # default only after the first hit — the per-adapter hint lets ops seed
    # sensible values.
    if interval_sec is None and cms_adapter:
        try:
            adapter_mod = get_adapter(cms_adapter)
        except KeyError:
            adapter_mod = None
        if adapter_mod is not None:
            interval_sec = getattr(adapter_mod, "DEFAULT_INTERVAL_SEC", None)

    # ----- list page -----
    # Pass max_items down so list enumeration breaks early once enough URLs
    # are aggregated. Without this, sites with hard_max_pages=100 +
    # interval_sec=10 (e.g. gov_cn_xxgk) walked all 100 pages — ~18 min —
    # before the per-article loop even started, even when caller asked for
    # max_items=2. The cap is a soft hint (we may overshoot by one page's
    # worth) but bounds total list-walk time.
    random_batch = crawl_order.mode == "random_batch"
    paged_batch = crawl_order.mode == "paged_batch"
    batch_mode = random_batch or paged_batch
    start_page = resume_from_page + 1 if track_checkpoint and resume_from_page > 0 else 1
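    # e.g. a checkpointed job whose last completed page was 3 resumes at page
    # 4; non-checkpointed targets (or a fresh run) always start at page 1.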

    def _load_entries(page: int, page_limit: int | None = None):
        _save_current_page(page)
        if cms_adapter:
            return _list_via_adapter(
                rt, interval_sec=interval_sec, max_items=max_items,
                interval_jitter_sec=interval_jitter_sec,
                stop_check=stop_check, start_page=page, page_limit=page_limit,
            )
        return _list_via_yaml(
            rt, interval_sec=interval_sec, max_items=max_items,
            interval_jitter_sec=interval_jitter_sec,
            stop_check=stop_check, start_page=page, page_limit=page_limit,
        )

    list_url, entries, list_fr = _load_entries(
        start_page, crawl_order.batch_pages if batch_mode else None,
    )

    # COMP-03: reject non-public list URLs
    if not is_public_path(list_url):
        raise ValueError(f"refuse to crawl non-public list path: {list_url!r}")

    # COMP-02: robots.txt gate
    if respect_robots and not is_allowed(list_url, ua):
        log.warning("robots.txt disallows list_url=%s ua=%s — skipping", list_url, ua)
        return {
            "status": "robots_blocked",
            "target_code": target_code_norm,
            "list_url": list_url,
            "items_seen": 0,
            "items_new": 0,
        }

    if list_fr.error or not list_fr.html:
        return {
            "status": "list_failed",
            "reason": list_fr.error or "empty_html",
            "http_status": list_fr.status,
            "items_seen": 0,
            "items_new": 0,
        }

    throttle = throttle or HostThrottle(
        interval_s=interval_sec,
        jitter_s=interval_jitter_sec or 0,
    )
    log.info(
        "crawl_target start target=%s (adapter=%s) url=%s items=%d",
        target_code_norm, cms_adapter or "yaml", list_url, len(entries),
    )

    new_count = 0
    skipped_count = 0
    failed_count = 0
    consecutive_blocks = 0
    aborted = False
    results: list[dict] = []
    total_list_entries = len(entries)

    def _prefetch_existing_articles(batch_entries: list[_ListEntry]) -> dict[str, dict[str, Any]]:
        """Bulk URL-hash dedup for list entries before any detail fetch.

        fetch_and_store still keeps its own pre-fetch check as a single-item
        safety net, and its post-fetch final_url check handles redirect
        canonicalization. This batch precheck is the cheap path: compare the
        list URL set against Article.url_hash once, then skip known URLs
        without entering retry/fetch/parsing at all.
        """
        url_hash_by_url: dict[str, str] = {}
        for entry in batch_entries:
            if entry.url in url_hash_by_url:
                continue
            try:
                url_hash_by_url[entry.url] = compute_url_hash(entry.url)
            except ValueError:
                continue
        if not url_hash_by_url:
            return {}
        hash_to_urls: dict[str, list[str]] = {}
        for url, url_hash in url_hash_by_url.items():
            hash_to_urls.setdefault(url_hash, []).append(url)
        with Session() as s:
            rows = (
                s.query(Article.url_hash, Article.id, Article.target_id)
                .filter(Article.url_hash.in_(list(hash_to_urls.keys())))
                .all()
            )
        out: dict[str, dict[str, Any]] = {}
        for url_hash, article_id, article_target_id in rows:
            for url in hash_to_urls.get(url_hash, []):
                out[url] = {
                    "url_hash": url_hash,
                    "article_id": article_id,
                    "target_id": article_target_id,
                }
        return out

    def _run_one(entry: _ListEntry) -> dict:
        """fetch_and_store with exp-backoff retry on transient failures.

        When fetch_and_store raises (e.g. network stack TypeError, httpx
        ConnectError outside the adapter's own try/except), it never gets the
        chance to insert_crawl_log — so operators hitting the admin UI see
        crawl_log silent. We mirror the failure into crawl_log here so
        retry-exhausted exceptions show up next to normal failed rows.
        """
        last: dict = {"status": "failed", "reason": "no_attempt"}
        last_came_from_exception = False
        for attempt in range(MAX_RETRIES + 1):
            suppress_fetch_failure_log = attempt < MAX_RETRIES
            try:
                r = fetch_and_store(
                    target_code=target_code_norm,
                    url=entry.url,
                    list_item=entry.item,
                    throttle=throttle,
                    log_fetch_failures=not suppress_fetch_failure_log,
                )
            except Exception as e:
                last = {"status": "failed", "reason": f"{type(e).__name__}: {e}"}
                last_came_from_exception = True
                if attempt < MAX_RETRIES:
                    _time.sleep(BACKOFF_BASE_S * (2**attempt))
                    continue
                _log_run_one_exception(entry.url, last["reason"])
                return last

            last = r
            last_came_from_exception = False
            status = r.get("status")
            http_status = r.get("http_status") or 0
            reason = str(r.get("reason") or "")

            if status in ("ready", "skipped"):
                return r
            if reason.startswith("host_cooldown:"):
                if suppress_fetch_failure_log:
                    _log_run_one_failure(entry.url, r)
                return r
            if http_status in BLOCK_STATUSES:
                if suppress_fetch_failure_log:
                    _log_run_one_failure(entry.url, r)
                return r
            # Only retry transient network/gateway failures. A logical failure
            # at HTTP 200 (e.g. content_text_too_short) won't change on retry —
            # worse, fetch_and_store already inserted the failed Article on
            # attempt 0, so attempt 1 short-circuits via the url_hash dedup
            # check and returns status='skipped'. The main loop then treats
            # that as an early-stop duplicate and aborts the whole pass after
            # the first failed article. Restrict retry to http_status==0
            # (network error / exception path) and 5xx.
            if attempt < MAX_RETRIES and (
                http_status == 0 or http_status in RETRIABLE_STATUSES
            ):
                _time.sleep(BACKOFF_BASE_S * (2**attempt))
                continue
            if suppress_fetch_failure_log and status == "failed":
                _log_run_one_failure(entry.url, r)
            return r
        if last_came_from_exception:
            _log_run_one_exception(entry.url, last.get("reason", "unknown"))
        return last

    def _log_run_one_failure(url: str, r: dict) -> None:
        """Persist a failure when an intermediate attempt suppressed
        fetch_and_store's own crawl_log write but the result turned out to be
        non-retriable. Retriable failures that later succeed stay out of the
        operator-facing failed log."""
        try:
            with Session() as s:
                insert_crawl_log(
                    s,
                    site_pk=site_pk,
                    target_id=target_id,
                    article_url=url,
                    strategy=str(r.get("strategy") or "httpx"),
                    http_status=int(r.get("http_status") or 0),
                    duration_ms=int(r.get("duration_ms") or 0),
                    success=False,
                    error_msg=str(r.get("reason") or "failed")[:500],
                )
                s.commit()
        except Exception:
            log.exception("failed to persist crawl_log for failed url=%s", url)

    def _log_run_one_exception(url: str, reason: str) -> None:
        """Persist a crawl_log row for exception-path failures. Best-effort —
        we swallow DB errors so a log-write regression can't mask the real
        crawl error from callers."""
        try:
            with Session() as s:
                insert_crawl_log(
                    s,
                    site_pk=site_pk,
                    target_id=target_id,
                    article_url=url,
                    strategy="exception",
                    http_status=0,
                    duration_ms=0,
                    success=False,
                    error_msg=reason[:500],  # crawl_log.error_msg is TEXT but keep sane
                )
                s.commit()
        except Exception:
            log.exception("failed to persist crawl_log for exception url=%s", url)

    cancelled = False
    last_seen_page = resume_from_page

    def _process_entries(
        batch_entries: list[_ListEntry],
        *,
        ordered_checkpoint: bool,
        duplicate_stop_mode: str,
    ) -> str:
        nonlocal new_count, skipped_count, failed_count, consecutive_blocks
        nonlocal aborted, cancelled, last_seen_page
        existing_by_url = _prefetch_existing_articles(batch_entries)
        processed_count = 0
        duplicate_count = 0
        for i, entry in enumerate(batch_entries):
            _save_current_page(entry.page_num)
            if max_items is not None and len(results) >= max_items:
                return "max_items"

            if stop_check():
                log.info("crawl_target cancelled by operator at item %d/%d url=%s",
                         i, len(batch_entries), entry.url)
                cancelled = True
                return "cancelled"

            if ordered_checkpoint and entry.page_num > last_seen_page:
                if last_seen_page > 0:
                    _save_checkpoint(last_seen_page)
                last_seen_page = entry.page_num

            # SSRF + path guard. is_safe_to_fetch layers DNS / private-IP /
            # allowlist checks on top of the path blacklist. Without this, a
            # crafted yaml or admin row could trick us into fetching internal
            # services on the docker network or cloud metadata endpoints.
            if not is_safe_to_fetch(entry.url):
                results.append({"url": entry.url, "status": "skipped", "reason": "non_public_path"})
                skipped_count += 1
                continue
            if respect_robots and not is_allowed(entry.url, ua):
                results.append({"url": entry.url, "status": "skipped", "reason": "robots_blocked"})
                skipped_count += 1
                continue

            existing = existing_by_url.get(entry.url)
            if existing:
                processed_count += 1
                skipped_count += 1
                same_target = existing["target_id"] == target_id
                reason = "duplicate_url_hash" if same_target else "duplicate_other_target"
                results.append({"url": entry.url, "status": "skipped"})
                if stop_on_duplicate and reason == "duplicate_url_hash":
                    duplicate_count += 1
                    if duplicate_stop_mode == "single":
                        log.info("early stop: duplicate url_hash at url=%s", entry.url)
                        return "duplicate"
                continue

            r = _run_one(entry)
            processed_count += 1
            results.append({"url": entry.url, "status": r.get("status")})

            http_status = r.get("http_status") or 0
            if http_status in BLOCK_STATUSES:
                consecutive_blocks += 1
            else:
                consecutive_blocks = 0

            if r.get("status") == "skipped":
                skipped_count += 1
                if stop_on_duplicate and r.get("reason") == "duplicate_url_hash":
                    duplicate_count += 1
                    if duplicate_stop_mode == "single":
                        log.info("early stop: duplicate url_hash at url=%s", entry.url)
                        return "duplicate"
            elif r.get("status") == "ready":
                new_count += 1
            else:
                failed_count += 1

            if consecutive_blocks >= ABORT_AFTER_N_BLOCKS:
                log.warning("abort pass: %d consecutive block responses (FETCH-05)", consecutive_blocks)
                aborted = True
                return "aborted"
        if (
            stop_on_duplicate
            and duplicate_stop_mode == "batch"
            and processed_count > 0
            and duplicate_count == processed_count
        ):
            log.info("early stop: random batch is all duplicate url_hash entries")
            return "duplicate"
        return "done"

    if batch_mode:
        batch_entries = entries
        page = start_page
        while batch_entries:
            if random_batch:
                random.shuffle(batch_entries)
            status = _process_entries(
                batch_entries,
                ordered_checkpoint=False,
                duplicate_stop_mode="batch" if random_batch else "single",
            )
            if status in ("cancelled", "aborted", "duplicate", "max_items"):
                break
            max_page = max(e.page_num for e in batch_entries)
            last_seen_page = max(last_seen_page, max_page)
            _save_checkpoint(last_seen_page)
            if len({e.page_num for e in batch_entries}) < crawl_order.batch_pages:
                break
            page = max_page + 1
            _list_url, batch_entries, _list_fr = _load_entries(page, crawl_order.batch_pages)
            total_list_entries += len(batch_entries)
            if _list_fr.error or not _list_fr.html:
                break
    else:
        _process_entries(
            entries,
            ordered_checkpoint=True,
            duplicate_stop_mode="single",
        )
        if not cancelled and last_seen_page > 0:
            _save_checkpoint(last_seen_page)

    # update target.last_crawled_at
    with Session() as s:
        t = targets_repo.get_by_code(s, target_code_norm)
        if t is not None:
            t.last_crawled_at = datetime.utcnow()
            s.commit()

    if cancelled:
        final_status = "cancelled"
    elif aborted:
        final_status = "aborted"
    else:
        final_status = "ok"
    return {
        "status": final_status,
        "target_code": target_code_norm,
        "site_code": site_code,
        "items_seen": total_list_entries,
        "items_new": new_count,
        "items_skipped": skipped_count,
        "items_failed": failed_count,
        "consecutive_blocks": consecutive_blocks,
        "results": results,
    }
