"""Retry + FETCH-05 consecutive-block abort — v2 (crawl_target) aligned."""
from __future__ import annotations

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from govcrawler import pipeline
from govcrawler.fetcher.browser import FetchResult
from govcrawler.models import Base

from tests._v2fixtures import make_site_and_target


def _ok_fr() -> FetchResult:
    return FetchResult(
        url="https://x/list", final_url="https://x/list", status=200, html="<ok>",
        fetched_at=0.0, duration_ms=10, is_challenge=False, strategy="httpx",
    )


@pytest.fixture
def db(tmp_path, monkeypatch):
    monkeypatch.setenv("DB_URL", "sqlite:///" + str(tmp_path / "r.db"))
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    monkeypatch.setenv("USER_AGENT", "TestBot/1.0")
    monkeypatch.setattr(pipeline.HostThrottle, "wait", lambda self, url, **kw: 0.0)
    import time as _t
    monkeypatch.setattr(_t, "sleep", lambda s: None)
    monkeypatch.setattr(pipeline._time, "sleep", lambda s: None)

    engine = create_engine("sqlite:///" + str(tmp_path / "r.db"), future=True)
    Base.metadata.create_all(engine)
    SM = sessionmaker(bind=engine, expire_on_commit=False)
    with SM() as s:
        _, target = make_site_and_target(s, site_code="gdqy", column_id="szfwj")
        s.commit()
        target_code = target.target_code

    monkeypatch.setattr(pipeline, "get_sessionmaker", lambda: SM)
    # install a default 4-entry list
    urls = [
        "https://x/a/post_1.html", "https://x/a/post_2.html",
        "https://x/a/post_3.html", "https://x/a/post_4.html",
    ]
    entries = [pipeline._ListEntry(url=u, item=None) for u in urls]

    def _fake_list(rt, **_kw):
        return "https://x/list", entries, _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)
    monkeypatch.setattr(pipeline, "_list_via_adapter", _fake_list)
    return {"target_code": target_code}


def test_retry_succeeds_on_second_attempt(monkeypatch, db):
    attempts: list[int] = []

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        attempts.append(1)
        if sum(attempts) == 1:
            return {"status": "failed", "reason": "server_error", "http_status": 503}
        return {"status": "ready", "article_id": len(attempts), "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    r = pipeline.crawl_target(db["target_code"], max_items=1)
    assert r["items_new"] == 1
    assert sum(attempts) == 2   # 1 transient failure + 1 success


def test_list_batch_prefilters_duplicate_url_before_fetch(monkeypatch, db):
    from govcrawler.models import Article, CrawlTarget
    from govcrawler.utils.url_norm import url_hash as compute_url_hash

    duplicate_url = "https://x/a/post_1.html"
    with pipeline.get_sessionmaker()() as s:
        t = s.query(CrawlTarget).filter_by(target_code=db["target_code"]).one()
        s.add(Article(
            site_id=t.site_id,
            target_id=t.id,
            url=duplicate_url,
            url_hash=compute_url_hash(duplicate_url),
            title="existing",
            status="ready",
            has_attachment=False,
        ))
        s.commit()

    def _boom(**_kw):
        raise AssertionError("duplicate list URL should be skipped before fetch")

    monkeypatch.setattr(pipeline, "fetch_and_store", _boom)

    r = pipeline.crawl_target(db["target_code"])

    assert r["items_skipped"] == 1
    assert r["items_new"] == 0
    assert r["results"] == [{"url": duplicate_url, "status": "skipped"}]


def test_no_retry_on_block_status_and_abort_after_3(monkeypatch, db):
    attempts: list[str] = []

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        attempts.append(url)
        return {"status": "failed", "reason": "challenge", "http_status": 412}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    r = pipeline.crawl_target(db["target_code"])
    assert r["status"] == "aborted"
    assert r["consecutive_blocks"] == 3
    assert len(attempts) == 3
    assert r["items_failed"] == 3


def test_block_counter_resets_on_success(monkeypatch, db):
    script = iter([
        {"status": "failed", "reason": "challenge", "http_status": 412},
        {"status": "ready", "article_id": 1, "http_status": 200},
        {"status": "failed", "reason": "challenge", "http_status": 412},
        {"status": "ready", "article_id": 2, "http_status": 200},
    ])

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        return next(script)

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    r = pipeline.crawl_target(db["target_code"])
    assert r["status"] == "ok"
    assert r["items_new"] == 2
    assert r["items_failed"] == 2
    assert r["consecutive_blocks"] == 0


def test_retry_gives_up_after_max(monkeypatch, db):
    attempts: list[int] = []

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        attempts.append(1)
        return {"status": "failed", "reason": "server_error", "http_status": 500}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    r = pipeline.crawl_target(db["target_code"], max_items=1)
    assert len(attempts) == 1 + pipeline.MAX_RETRIES
    assert r["items_failed"] == 1


def test_host_cooldown_does_not_retry_same_url(monkeypatch, db):
    attempts: list[str] = []

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        attempts.append(url)
        return {
            "status": "failed",
            "reason": "host_cooldown: www.gd.gov.cn blocked",
            "http_status": 0,
        }

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    r = pipeline.crawl_target(db["target_code"], max_items=1)

    assert len(attempts) == 1
    assert r["status"] == "aborted"
    assert r["reason"] == "cooldown"
    assert r["retryable"] is True
    assert r["items_failed"] == 1
    assert r["results"][0]["reason"].startswith("host_cooldown:")


def test_retry_on_exception_then_success(monkeypatch, db):
    count = {"n": 0}

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        count["n"] += 1
        if count["n"] == 1:
            raise TimeoutError("boom")
        return {"status": "ready", "article_id": 1, "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    r = pipeline.crawl_target(db["target_code"], max_items=1)
    assert r["items_new"] == 1
    assert count["n"] == 2


def test_exhausted_exception_writes_crawl_log(monkeypatch, db):
    """Regression: when fetch_and_store raises on every retry, crawl_log
    must still carry a row so the admin '抓取日志' page shows why."""
    calls: list[int] = []

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        calls.append(1)
        raise TimeoutError("connect_timeout")

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    # capture insert_crawl_log calls
    seen: list[dict] = []
    real_insert = pipeline.insert_crawl_log

    def _spy(s, **kw):
        seen.append(kw)
        return real_insert(s, **kw)

    monkeypatch.setattr(pipeline, "insert_crawl_log", _spy)

    r = pipeline.crawl_target(db["target_code"], max_items=1)
    assert r["items_failed"] == 1
    assert len(calls) == 1 + pipeline.MAX_RETRIES
    # exactly one log row for the exhausted-exception path
    exc_rows = [row for row in seen if row.get("strategy") == "exception"]
    assert len(exc_rows) == 1
    assert exc_rows[0]["success"] is False
    assert "TimeoutError" in (exc_rows[0]["error_msg"] or "")
    assert "connect_timeout" in (exc_rows[0]["error_msg"] or "")


def test_adapter_default_interval_applies_when_target_null(monkeypatch, db):
    """target.interval_sec=NULL + adapter exposes DEFAULT_INTERVAL_SEC →
    HostThrottle receives the adapter value, not None."""
    # Seed a site with cms_adapter set so the resolution branch fires.
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from govcrawler.models import Base
    from tests._v2fixtures import make_site, make_target

    import tempfile, os
    tmp = tempfile.mkdtemp()
    os.environ["DB_URL"] = "sqlite:///" + os.path.join(tmp, "t.db")
    eng = create_engine(os.environ["DB_URL"], future=True)
    Base.metadata.create_all(eng)
    SM = sessionmaker(bind=eng, expire_on_commit=False)
    with SM() as s:
        site = make_site(
            s, site_code="gkm", cms_adapter="gkmlpt", yaml_path=None,
            base_url="https://g.example.com",
        )
        target = make_target(s, site=site, column_id="42", entry_url="https://g.example.com/x")
        s.commit()
        tc = target.target_code

    monkeypatch.setattr(pipeline, "get_sessionmaker", lambda: SM)

    # Stub adapter resolution + list flow
    class _StubAdapter:
        DEFAULT_INTERVAL_SEC = 7.5

    monkeypatch.setattr(pipeline, "get_adapter", lambda name: _StubAdapter)

    captured_interval: dict = {}
    original_init = pipeline.HostThrottle.__init__

    def _spy_init(self, interval_s=None, **kw):
        captured_interval["v"] = interval_s
        original_init(self, interval_s=interval_s, **kw)

    monkeypatch.setattr(pipeline.HostThrottle, "__init__", _spy_init)

    def _fake_list(rt, **_kw):
        return "https://g.example.com/list", [], _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)
    monkeypatch.setattr(pipeline, "_list_via_adapter", _fake_list)

    pipeline.crawl_target(tc, max_items=0)
    assert captured_interval["v"] == 7.5


def test_logical_failure_at_http_200_does_not_retry_or_early_stop(monkeypatch, db):
    """Regression: when fetch_and_store returns status='failed' with
    http_status=200 (e.g. content_text_too_short), we must NOT retry — the
    article was already inserted, so a retry would short-circuit via the
    url_hash dedup check and return status='skipped', which the main loop
    then misreads as a duplicate-boundary early-stop. Net effect was that
    a single failed article terminated the whole pass after item 1."""
    calls: list[str] = []

    def _fake(*, target_code, url, list_item=None, throttle=None, **_kw):
        calls.append(url)
        return {"status": "failed", "reason": "content_text_too_short:48", "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    r = pipeline.crawl_target(db["target_code"])
    # 4 entries, each tried exactly once (no retry on 200-logical-failure),
    # and the loop did NOT early-stop after the first failed item.
    assert len(calls) == 4
    assert r["items_failed"] == 4
    assert r["items_new"] == 0


def test_random_batch_fetches_next_list_batch_after_processing(monkeypatch, db):
    """WAF-sensitive targets can fetch a few list pages, randomize details
    inside that batch, then fetch the next list batch."""
    monkeypatch.setattr(pipeline, "_crawl_order_for", lambda rt: pipeline._CrawlOrder("random_batch", 2))
    monkeypatch.setattr(pipeline, "is_allowed", lambda url, ua: True)

    calls: list[tuple[int, int | None]] = []

    def _fake_list(rt, *, start_page=1, page_limit=None, **_kw):
        calls.append((start_page, page_limit))
        if start_page == 1:
            entries = [
                pipeline._ListEntry(url="https://x/a/post_1.html", item=None, page_num=1),
                pipeline._ListEntry(url="https://x/a/post_2.html", item=None, page_num=2),
            ]
        elif start_page == 3:
            entries = [pipeline._ListEntry(url="https://x/a/post_3.html", item=None, page_num=3)]
        else:
            entries = []
        return "https://x/list", entries, _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)
    monkeypatch.setattr(pipeline, "random", type("_R", (), {"shuffle": staticmethod(lambda xs: xs.reverse())}))

    seen: list[str] = []

    def _fake_fetch(*, target_code, url, list_item=None, throttle=None, **_kw):
        seen.append(url)
        return {"status": "ready", "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake_fetch)

    r = pipeline.crawl_target(db["target_code"])

    assert calls == [(1, 2), (3, 2)]
    assert seen == [
        "https://x/a/post_2.html",
        "https://x/a/post_1.html",
        "https://x/a/post_3.html",
    ]
    assert r["items_new"] == 3


def test_paged_batch_processes_each_list_batch_before_next_fetch(monkeypatch, db):
    monkeypatch.setattr(pipeline, "_crawl_order_for", lambda rt: pipeline._CrawlOrder("paged_batch", 2))
    monkeypatch.setattr(pipeline, "is_safe_to_fetch", lambda url: True)
    monkeypatch.setattr(pipeline, "is_allowed", lambda url, ua: True)

    events: list[str] = []

    def _fake_list(rt, *, start_page=1, page_limit=None, **_kw):
        events.append(f"list:{start_page}:{page_limit}")
        if start_page == 1:
            entries = [
                pipeline._ListEntry(url="https://x/a/post_1.html", item=None, page_num=1),
                pipeline._ListEntry(url="https://x/a/post_2.html", item=None, page_num=2),
            ]
        elif start_page == 3:
            entries = [
                pipeline._ListEntry(url="https://x/a/post_3.html", item=None, page_num=3),
            ]
        else:
            entries = []
        return "https://x/list", entries, _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)

    def _fake_fetch(*, target_code, url, list_item=None, throttle=None, **_kw):
        events.append(f"fetch:{url.rsplit('/', 1)[-1]}")
        return {"status": "ready", "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake_fetch)

    r = pipeline.crawl_target(db["target_code"])

    assert events == [
        "list:1:2",
        "fetch:post_1.html",
        "fetch:post_2.html",
        "list:3:2",
        "fetch:post_3.html",
    ]
    assert r["items_new"] == 3


def test_random_batch_does_not_stop_on_first_duplicate(monkeypatch, db):
    """Randomized batches cannot use the ordered-list early-stop rule.

    A duplicate picked first after shuffling does not prove the rest of the
    batch is old; keep processing the batch so new entries behind it are not
    missed.
    """
    monkeypatch.setattr(pipeline, "_crawl_order_for", lambda rt: pipeline._CrawlOrder("random_batch", 2))
    monkeypatch.setattr(pipeline, "is_safe_to_fetch", lambda url: True)
    monkeypatch.setattr(pipeline, "is_allowed", lambda url, ua: True)
    monkeypatch.setattr(pipeline, "random", type("_R", (), {"shuffle": staticmethod(lambda xs: None)}))

    def _fake_list(rt, *, start_page=1, page_limit=None, **_kw):
        assert start_page == 1
        assert page_limit == 2
        entries = [
            pipeline._ListEntry(url="https://x/a/post_old.html", item=None, page_num=1),
            pipeline._ListEntry(url="https://x/a/post_new.html", item=None, page_num=1),
        ]
        return "https://x/list", entries, _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)

    seen: list[str] = []

    def _fake_fetch(*, target_code, url, list_item=None, throttle=None, **_kw):
        seen.append(url)
        if url.endswith("post_old.html"):
            return {"status": "skipped", "reason": "duplicate_url_hash", "http_status": 0}
        return {"status": "ready", "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake_fetch)

    r = pipeline.crawl_target(db["target_code"])

    assert seen == ["https://x/a/post_old.html", "https://x/a/post_new.html"]
    assert r["items_skipped"] == 1
    assert r["items_new"] == 1


def test_random_batch_stops_after_all_duplicate_batch(monkeypatch, db):
    """When the whole random batch is self-duplicate, stop at the history
    boundary instead of walking every old page."""
    monkeypatch.setattr(pipeline, "_crawl_order_for", lambda rt: pipeline._CrawlOrder("random_batch", 2))
    monkeypatch.setattr(pipeline, "is_safe_to_fetch", lambda url: True)
    monkeypatch.setattr(pipeline, "is_allowed", lambda url, ua: True)
    monkeypatch.setattr(pipeline, "random", type("_R", (), {"shuffle": staticmethod(lambda xs: None)}))

    calls: list[tuple[int, int | None]] = []

    def _fake_list(rt, *, start_page=1, page_limit=None, **_kw):
        calls.append((start_page, page_limit))
        if start_page == 1:
            entries = [
                pipeline._ListEntry(url="https://x/a/post_old_1.html", item=None, page_num=1),
                pipeline._ListEntry(url="https://x/a/post_old_2.html", item=None, page_num=2),
            ]
        else:
            entries = [pipeline._ListEntry(url="https://x/a/post_new.html", item=None, page_num=3)]
        return "https://x/list", entries, _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)

    seen: list[str] = []

    def _fake_fetch(*, target_code, url, list_item=None, throttle=None, **_kw):
        seen.append(url)
        return {"status": "skipped", "reason": "duplicate_url_hash", "http_status": 0}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake_fetch)

    r = pipeline.crawl_target(db["target_code"])

    assert calls == [(1, 2)]
    assert seen == ["https://x/a/post_old_1.html", "https://x/a/post_old_2.html"]
    assert r["items_skipped"] == 2
    assert r["items_new"] == 0


def test_checkpoint_resume_starts_list_fetch_after_completed_page(monkeypatch, db):
    """Resume page must be pushed into list enumeration itself, not just
    filtered after pages 1..N were already fetched."""
    from govcrawler.models import CrawlTarget

    SM = pipeline.get_sessionmaker()
    with SM() as s:
        t = s.query(CrawlTarget).filter_by(target_code=db["target_code"]).one()
        t.track_checkpoint = True
        s.commit()

    monkeypatch.setattr(pipeline, "_crawl_order_for", lambda rt: pipeline._CrawlOrder())
    calls: list[int] = []

    def _fake_list(rt, *, start_page=1, **_kw):
        calls.append(start_page)
        return "https://x/list", [], _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)

    pipeline.crawl_target(db["target_code"], resume_from_page=4)

    assert calls == [5]


def test_adapter_params_merge_target_overrides(db):
    from govcrawler.models import CrawlTarget

    SM = pipeline.get_sessionmaker()
    with SM() as s:
        t = s.query(CrawlTarget).filter_by(target_code=db["target_code"]).one()
        t.site.adapter_params_json = {
            "policy_profile": "zcwjk",
            "hard_max_pages": 100,
            "page_size": 20,
        }
        t.parser_override_json = {
            "detail": {"content": "div.article"},
            "adapter_params": {
                "hard_max_pages": 635,
                "zcwjk_t": "zhengcelibrary_bm",
                "zcwjk_cat_key": "bumenfile",
            },
        }
        s.commit()

        rt = pipeline._resolve_target(s, db["target_code"])
        params = pipeline._adapter_params_for(rt)

    assert params["policy_profile"] == "zcwjk"
    assert params["page_size"] == 20
    assert params["hard_max_pages"] == 635
    assert params["zcwjk_t"] == "zhengcelibrary_bm"
    assert params["zcwjk_cat_key"] == "bumenfile"


def test_host_throttle_none_falls_back_to_default():
    """HostThrottle(None) must not crash on the 2nd wait() call."""
    from govcrawler.fetcher.throttle import HostThrottle, DEFAULT_INTERVAL_S

    t = HostThrottle(interval_s=None)
    assert t.interval_s == DEFAULT_INTERVAL_S

    slept: list[float] = []
    # wait() calls now() twice per invocation (once to compute sleep_s, once
    # to stamp _last_by_host). Two invocations → need 4 values.
    fake_now = iter([0.0, 0.1, 1.0, 1.1])
    t.wait("https://h/a", sleep=slept.append, now=lambda: next(fake_now))
    # same host — second call exercises the `last is not None` branch where
    # the bug used to live (None * float → TypeError).
    t.wait("https://h/a", sleep=slept.append, now=lambda: next(fake_now))
