"""End-to-end crawl pipeline — v2 schema aligned.

Entry points:
  * `crawl_target(target_code)`   — list page → iterate → fetch_and_store per item
  * `fetch_and_store(*, target_code, url)` — single-article fetch → parse → persist

Dispatch model (§7.5 of 2.0 design doc):
  * `crawl_site.cms_adapter` set (e.g. "gkmlpt")  → adapter-driven JSON list API
  * `crawl_site.yaml_path` set                    → legacy CSS list scraping

Both paths converge on the same per-article loop. Retry / block / robots logic
stays identical to 1.0.

Compared to 1.0:
  * Addressed by `target_code` (globally unique) instead of (site_id, column_id)
  * Article insert goes through `insert_article_from_contract` when a CrawlItem
    is available from the adapter; falls back to `insert_article(**kwargs)` for
    legacy yaml_path sites
  * `content_simhash` removed — §5.5 excludes it; RAG owns content de-dup
  * `insert_crawl_log` now keyed by `(site_pk:int, target_id:int)`
"""
from __future__ import annotations

import json
import logging
import re
import time as _time
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from sqlalchemy.orm import Session

from govcrawler.adapters import get_adapter
from govcrawler.adapters.contract import CrawlItem, Status
from govcrawler.compliance import is_allowed, is_public_path
from govcrawler.config.registry import get_detail_selectors, get_site_config
from govcrawler.db import get_sessionmaker
from govcrawler.fetcher.browser import FetchResult
from govcrawler.fetcher.chain import fetch_html
from govcrawler.fetcher.throttle import HostThrottle
from govcrawler.models import CrawlSite, CrawlTarget, SiteDepartment
from govcrawler.observability import record_fetch
from govcrawler.parser.detail_parser import parse_detail
from govcrawler.parser.list_parser import ListItem, parse_list
from govcrawler.repositories import sites as sites_repo
from govcrawler.repositories import targets as targets_repo
from govcrawler.settings import get_settings
from govcrawler.storage.attachments import download_attachment
from govcrawler.storage.files import write_article_text, write_raw_html
from govcrawler.storage.repo import (
    get_article_by_url_hash,
    insert_article,
    insert_article_from_contract,
    insert_attachments,
    insert_crawl_log,
)
from govcrawler.utils.url_norm import url_hash as compute_url_hash

log = logging.getLogger(__name__)

MIN_CONTENT_TEXT_CHARS = 50

# Retry policy (FETCH-05)
MAX_RETRIES = 2
BACKOFF_BASE_S = 2.0
ABORT_AFTER_N_BLOCKS = 3
BLOCK_STATUSES = (403, 412, 429)
RETRIABLE_STATUSES = (500, 502, 503, 504)


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------
def _parse_publish_time(raw: str) -> datetime | None:
    if not raw:
        return None
    s = raw.strip().replace("/", "-").replace("年", "-").replace("月", "-").replace("日", " ")
    s = re.sub(r"\s+", " ", s).strip()
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
        try:
            return datetime.strptime(s[: len(fmt) + 4], fmt)
        except Exception:
            continue
    return None


def _derive_article_key(url: str) -> str:
    """Last path segment, ext stripped — used as filename stem.

    e.g. `/a/b/post_2136593.html` → `post_2136593`.
    Falls back to url_hash[:16] if no usable stem.
    """
    tail = url.rstrip("/").rsplit("/", 1)[-1].split("?", 1)[0].split("#", 1)[0]
    stem = tail.rsplit(".", 1)[0] if "." in tail else tail
    return stem or compute_url_hash(url)[:16]


def _decompose_target_code(site_code: str, target_code: str) -> tuple[str | None, str]:
    """Split `<site>__<dept>__<col>` or `<site>__<col>` → (dept_path, column_id).

    Mirror of `config.sync._target_code`. Returns (None, column_id) when the
    target is site-level (no dept_path).
    """
    prefix = f"{site_code}__"
    if not target_code.startswith(prefix):
        raise ValueError(f"target_code {target_code!r} doesn't start with {prefix!r}")
    tail = target_code[len(prefix):]
    parts = tail.split("__", 1)
    if len(parts) == 1:
        return None, parts[0]
    return parts[0], parts[1]


def _resolve_detail_selectors(
    site: CrawlSite, target: CrawlTarget
) -> dict[str, Any] | None:
    """Pick detail selectors: target override → adapter default → legacy YAML."""
    # 1) per-target override
    override = (target.parser_override_json or {}).get("detail") if target.parser_override_json else None
    if override:
        return override
    # 2) adapter default (e.g. gkmlpt DEFAULT_DETAIL_SELECTORS)
    if site.cms_adapter:
        try:
            adapter = get_adapter(site.cms_adapter)
        except KeyError:
            adapter = None
        if adapter is not None:
            default = getattr(adapter, "DEFAULT_DETAIL_SELECTORS", None)
            if default:
                return default
    # 3) legacy YAML registry (1.0 compat for yaml_path sites)
    _, column_id = _decompose_target_code(site.site_code, target.target_code)
    return get_detail_selectors(site.site_code, column_id)


@dataclass
class _ResolvedTarget:
    site: CrawlSite
    target: CrawlTarget
    dept: SiteDepartment | None
    dept_path: str | None
    column_id: str


def _resolve_target(session: Session, target_code: str) -> _ResolvedTarget:
    target = targets_repo.get_by_code(session, target_code)
    if target is None:
        raise ValueError(f"no crawl_target with target_code={target_code!r}")
    site = sites_repo.get_by_id(session, target.site_id)
    if site is None:
        raise ValueError(f"crawl_target {target_code!r} points at missing site_id={target.site_id}")
    dept = None
    if target.site_department_id is not None:
        dept = session.get(SiteDepartment, target.site_department_id)
    dept_path, column_id = _decompose_target_code(site.site_code, target.target_code)
    return _ResolvedTarget(
        site=site, target=target, dept=dept,
        dept_path=dept_path, column_id=column_id,
    )


# ---------------------------------------------------------------------------
# single-article fetch + store
# ---------------------------------------------------------------------------
def fetch_and_store(
    *,
    target_code: str,
    url: str,
    list_item: CrawlItem | None = None,
    throttle: HostThrottle | None = None,
) -> dict:
    """End-to-end: fetch → parse → write files → download attachments → insert rows.

    `list_item` may be a `CrawlItem` produced by an adapter's `parse_list_response`
    — when present we thread its metadata (title, publish_time, source_raw,
    channel_name, doc_no …) into the Article row via `insert_article_from_contract`.
    """
    throttle = throttle or HostThrottle()
    url_h = compute_url_hash(url)
    Session = get_sessionmaker()

    # dedup + resolve
    with Session() as s:
        existing = get_article_by_url_hash(s, url_h)
        if existing:
            return {
                "status": "skipped",
                "reason": "duplicate_url_hash",
                "url_hash": url_h,
                "article_id": existing.id,
            }
        rt = _resolve_target(s, target_code)
        site_pk = rt.site.id
        target_id = rt.target.id
        site_code = rt.site.site_code
        target_code_norm = rt.target.target_code
        selectors = _resolve_detail_selectors(rt.site, rt.target)
        dept_id_fk = rt.target.dept_id  # local_department FK (may be None)

    if selectors is None:
        raise ValueError(
            f"no detail selectors available for target={target_code}; "
            "set crawl_target.parser_override_json.detail or ensure the adapter "
            "defines DEFAULT_DETAIL_SELECTORS"
        )

    throttle.wait(url)
    fr: FetchResult = fetch_html(url)
    strategy = fr.strategy

    # -------- failure path --------
    if fr.error or fr.is_challenge or not fr.html:
        with Session() as s:
            insert_crawl_log(
                s,
                site_pk=site_pk,
                target_id=target_id,
                article_url=url,
                strategy=strategy,
                http_status=fr.status,
                duration_ms=fr.duration_ms,
                success=False,
                error_msg=(
                    fr.error
                    or ("ctct_challenge_unresolved" if fr.is_challenge else "empty_html")
                ),
            )
            s.commit()
        outcome = "challenge" if fr.is_challenge else "failed"
        record_fetch(
            site_id=site_code, column_id=target_code_norm, strategy=strategy,
            outcome=outcome, duration_ms=fr.duration_ms, http_status=fr.status,
        )
        return {
            "status": "failed",
            "reason": fr.error or ("challenge" if fr.is_challenge else "empty_html"),
            "http_status": fr.status,
        }

    # -------- parse detail --------
    fields = parse_detail(fr.html, url, selectors)
    pub_dt = (
        (list_item.publish_time if list_item and list_item.publish_time else None)
        or _parse_publish_time(fields.publish_time_raw)
        or datetime.utcnow()
    )
    # strip tz for DB column (schema uses naive DateTime)
    if pub_dt.tzinfo is not None:
        pub_dt = pub_dt.replace(tzinfo=None)

    article_key = _derive_article_key(url)
    raw_rel = write_raw_html(site_code, target_code_norm, pub_dt, article_key, fr.html)
    text_rel = write_article_text(
        site_code, target_code_norm, pub_dt, article_key, fields.content_text
    )

    downloaded = []
    for au in fields.attachment_urls:
        try:
            d = download_attachment(
                au, site=site_code, column=target_code_norm,
                when=pub_dt, article_key=article_key,
            )
            downloaded.append(d)
        except Exception as e:
            log.warning("attachment download failed url=%s err=%s", au, e)

    status = (
        "ready"
        if len(fields.content_text) >= MIN_CONTENT_TEXT_CHARS or downloaded
        else "failed"
    )

    # -------- persist --------
    with Session() as s:
        if list_item is not None:
            # Build a fully-populated CrawlItem with detail fields merged in,
            # then project via contract-driven insert.
            merged = list_item.model_copy(update={
                "url": url,
                "url_hash": url_h,
                "title": list_item.title or fields.title or None,
                "publish_time": pub_dt,
                "source_raw": list_item.source_raw or fields.source or None,
                "content_text": fields.content_text or None,
                "raw_html_path": str(raw_rel),
                "text_path": str(text_rel),
                "has_attachment": bool(downloaded),
                "status": Status.READY if status == "ready" else Status.FAILED,
                "fetch_strategy": list_item.fetch_strategy or None,
                "target_id": target_id,
                "dept_id": dept_id_fk,
            })
            article = insert_article_from_contract(s, merged, site_pk=site_pk)
        else:
            article = insert_article(
                s,
                site_id=site_pk,
                target_id=target_id,
                dept_id=dept_id_fk,
                url=url,
                url_hash=url_h,
                title=fields.title or None,
                publish_time=pub_dt,
                source_raw=fields.source or None,
                content_text=fields.content_text or None,
                raw_html_path=str(raw_rel),
                text_path=str(text_rel),
                has_attachment=bool(downloaded),
                status=status,
                fetch_strategy=strategy,
            )

        att_records = [
            {
                "file_name": d.file_name,
                "file_ext": d.file_ext,
                "size_bytes": d.size_bytes,
                "file_path": str(d.file_path),
                "file_hash": d.file_hash,
            }
            for d in downloaded
        ]
        if att_records:
            insert_attachments(s, article.id, att_records)

        error_msg = (
            None
            if status == "ready"
            else f"content_text_too_short:{len(fields.content_text)}"
        )
        insert_crawl_log(
            s,
            site_pk=site_pk,
            target_id=target_id,
            article_url=url,
            strategy=strategy,
            http_status=fr.status,
            duration_ms=fr.duration_ms,
            success=(status == "ready"),
            error_msg=error_msg,
        )
        s.commit()
        article_id = article.id

    record_fetch(
        site_id=site_code, column_id=target_code_norm, strategy=strategy,
        outcome=status, duration_ms=fr.duration_ms, http_status=fr.status,
    )
    return {
        "status": status,
        "article_id": article_id,
        "url_hash": url_h,
        "title": fields.title,
        "used_fallback": fields.used_fallback,
        "fallback_engine": fields.fallback_engine,
        "raw_html_path": str(raw_rel),
        "text_path": str(text_rel),
        "attachments_downloaded": len(downloaded),
        "http_status": fr.status,
        "duration_ms": fr.duration_ms,
    }


# ---------------------------------------------------------------------------
# list-page drivers
# ---------------------------------------------------------------------------
@dataclass
class _ListEntry:
    url: str
    item: CrawlItem | None  # None for legacy CSS path


def _list_via_adapter(rt: _ResolvedTarget, *, page: int = 1) -> tuple[str, list[_ListEntry], FetchResult]:
    """Adapter path: build JSON URL, fetch, project to CrawlItems."""
    adapter = get_adapter(rt.site.cms_adapter)
    params = rt.site.adapter_params_json or {}
    list_url = adapter.build_list_url(
        base_url=rt.site.base_url or "",
        dept_path=rt.dept_path or "",
        column_id=rt.column_id,
        page=page,
        sid=params.get("sid"),
        path_tpl=params.get("list_api_path_tpl"),
    )

    fr = fetch_html(list_url)
    if fr.error or not fr.html:
        return list_url, [], fr

    try:
        payload = json.loads(fr.html)
    except Exception as e:
        log.warning("adapter list JSON parse failed url=%s err=%s", list_url, e)
        return list_url, [], fr

    items = adapter.parse_list_response(
        payload,
        site_id=rt.site.site_code,
        target_id=rt.target.id,
        dept_id=rt.target.dept_id,
    )
    return list_url, [_ListEntry(url=it.url, item=it) for it in items], fr


def _list_via_yaml(rt: _ResolvedTarget) -> tuple[str, list[_ListEntry], FetchResult]:
    """Legacy CSS scrape path — reads list selectors from config/sites/*.yaml."""
    site_cfg = get_site_config(rt.site.site_code)
    if site_cfg is None:
        raise ValueError(
            f"yaml_path site {rt.site.site_code!r} has no config/sites YAML "
            "— legacy CSS path requires it"
        )
    col_cfg = site_cfg.get_column(rt.column_id)
    if col_cfg is None or col_cfg.list_selector is None:
        raise ValueError(
            f"yaml config missing list_selector for {rt.site.site_code}/{rt.column_id}"
        )
    list_url = rt.target.entry_url or col_cfg.list_url

    fr = fetch_html(list_url)
    if fr.error or not fr.html:
        return list_url, [], fr

    selectors = {
        "row": col_cfg.list_selector.row,
        "href": col_cfg.list_selector.href,
        "title": col_cfg.list_selector.title,
        "date": col_cfg.list_selector.date or "span::text",
    }
    items: list[ListItem] = parse_list(fr.html, list_url, selectors)
    return list_url, [_ListEntry(url=it.url, item=None) for it in items], fr


def crawl_target(
    target_code: str,
    *,
    max_items: int | None = None,
    stop_on_duplicate: bool = True,
    throttle: HostThrottle | None = None,
) -> dict:
    """Fetch list page, iterate article URLs, call fetch_and_store per item.

    `stop_on_duplicate=True` implements INC-03 early-stop: the list is
    publish-time ordered, so the first already-seen URL ends the pass.
    """
    Session = get_sessionmaker()
    with Session() as s:
        rt = _resolve_target(s, target_code)
        # snapshot what we need after session closes
        site_pk = rt.site.id
        target_id = rt.target.id
        site_code = rt.site.site_code
        target_code_norm = rt.target.target_code
        cms_adapter = rt.site.cms_adapter
        respect_robots = rt.site.respect_robots
        interval_sec = rt.target.interval_sec

    ua = get_settings().user_agent

    # target.interval_sec (nullable) → adapter DEFAULT_INTERVAL_SEC (optional) →
    # HostThrottle default. Without this ladder, fresh crawl_target rows (no
    # per-target override) rate-limited themselves on chatty CMS fleets because
    # HostThrottle got `None` and silently fell back to 5s only after the first
    # hit — the per-adapter hint lets ops seed sensible values.
    if interval_sec is None and cms_adapter:
        try:
            adapter_mod = get_adapter(cms_adapter)
        except KeyError:
            adapter_mod = None
        if adapter_mod is not None:
            interval_sec = getattr(adapter_mod, "DEFAULT_INTERVAL_SEC", None)

    # ----- list page -----
    if cms_adapter:
        list_url, entries, list_fr = _list_via_adapter(rt)
    else:
        list_url, entries, list_fr = _list_via_yaml(rt)

    # COMP-03: reject non-public list URLs
    if not is_public_path(list_url):
        raise ValueError(f"refuse to crawl non-public list path: {list_url!r}")

    # COMP-02: robots.txt gate
    if respect_robots and not is_allowed(list_url, ua):
        log.warning("robots.txt disallows list_url=%s ua=%s — skipping", list_url, ua)
        return {
            "status": "robots_blocked",
            "target_code": target_code_norm,
            "list_url": list_url,
            "items_seen": 0,
            "items_new": 0,
        }

    if list_fr.error or not list_fr.html:
        return {
            "status": "list_failed",
            "reason": list_fr.error or "empty_html",
            "http_status": list_fr.status,
            "items_seen": 0,
            "items_new": 0,
        }

    throttle = throttle or HostThrottle(interval_s=interval_sec)
    log.info(
        "crawl_target start target=%s (adapter=%s) url=%s items=%d",
        target_code_norm, cms_adapter or "yaml", list_url, len(entries),
    )

    new_count = 0
    skipped_count = 0
    failed_count = 0
    consecutive_blocks = 0
    aborted = False
    results: list[dict] = []

    def _run_one(entry: _ListEntry) -> dict:
        """fetch_and_store with exp-backoff retry on transient failures.

        When fetch_and_store raises (e.g. network stack TypeError, httpx
        ConnectError outside the adapter's own try/except), it never gets the
        chance to insert_crawl_log — so operators hitting the admin UI see
        crawl_log silent. We mirror the failure into crawl_log here so
        retry-exhausted exceptions show up next to normal failed rows.
        """
        last: dict = {"status": "failed", "reason": "no_attempt"}
        last_came_from_exception = False
        for attempt in range(MAX_RETRIES + 1):
            try:
                r = fetch_and_store(
                    target_code=target_code_norm,
                    url=entry.url,
                    list_item=entry.item,
                    throttle=throttle,
                )
            except Exception as e:
                last = {"status": "failed", "reason": f"{type(e).__name__}: {e}"}
                last_came_from_exception = True
                if attempt < MAX_RETRIES:
                    _time.sleep(BACKOFF_BASE_S * (2**attempt))
                    continue
                _log_run_one_exception(entry.url, last["reason"])
                return last

            last = r
            last_came_from_exception = False
            status = r.get("status")
            http_status = r.get("http_status") or 0

            if status in ("ready", "skipped"):
                return r
            if http_status in BLOCK_STATUSES:
                return r
            if attempt < MAX_RETRIES and (
                http_status in RETRIABLE_STATUSES or status == "failed"
            ):
                _time.sleep(BACKOFF_BASE_S * (2**attempt))
                continue
            return r
        if last_came_from_exception:
            _log_run_one_exception(entry.url, last.get("reason", "unknown"))
        return last

    def _log_run_one_exception(url: str, reason: str) -> None:
        """Persist a crawl_log row for exception-path failures. Best-effort —
        we swallow DB errors so a log-write regression can't mask the real
        crawl error from callers."""
        try:
            with Session() as s:
                insert_crawl_log(
                    s,
                    site_pk=site_pk,
                    target_id=target_id,
                    article_url=url,
                    strategy="exception",
                    http_status=0,
                    duration_ms=0,
                    success=False,
                    error_msg=reason[:500],  # crawl_log.error_msg is TEXT but keep sane
                )
                s.commit()
        except Exception:
            log.exception("failed to persist crawl_log for exception url=%s", url)

    for i, entry in enumerate(entries):
        if max_items is not None and i >= max_items:
            break

        if not is_public_path(entry.url):
            results.append({"url": entry.url, "status": "skipped", "reason": "non_public_path"})
            skipped_count += 1
            continue
        if respect_robots and not is_allowed(entry.url, ua):
            results.append({"url": entry.url, "status": "skipped", "reason": "robots_blocked"})
            skipped_count += 1
            continue

        r = _run_one(entry)
        results.append({"url": entry.url, "status": r.get("status")})

        http_status = r.get("http_status") or 0
        if http_status in BLOCK_STATUSES:
            consecutive_blocks += 1
        else:
            consecutive_blocks = 0

        if r.get("status") == "skipped":
            skipped_count += 1
            if stop_on_duplicate:
                log.info("early stop: duplicate url_hash at url=%s", entry.url)
                break
        elif r.get("status") == "ready":
            new_count += 1
        else:
            failed_count += 1

        if consecutive_blocks >= ABORT_AFTER_N_BLOCKS:
            log.warning("abort pass: %d consecutive block responses (FETCH-05)", consecutive_blocks)
            aborted = True
            break

    # update target.last_crawled_at
    with Session() as s:
        t = targets_repo.get_by_code(s, target_code_norm)
        if t is not None:
            t.last_crawled_at = datetime.utcnow()
            s.commit()

    return {
        "status": "aborted" if aborted else "ok",
        "target_code": target_code_norm,
        "site_code": site_code,
        "items_seen": len(entries),
        "items_new": new_count,
        "items_skipped": skipped_count,
        "items_failed": failed_count,
        "consecutive_blocks": consecutive_blocks,
        "results": results,
    }
