"""Retry + FETCH-05 consecutive-block abort — v2 (crawl_target) aligned."""
from __future__ import annotations

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from govcrawler import pipeline
from govcrawler.fetcher.browser import FetchResult
from govcrawler.models import Base

from tests._v2fixtures import make_site_and_target


def _ok_fr() -> FetchResult:
    """Build a minimal successful (non-challenge) FetchResult for list fetches."""
    fields = {
        "url": "https://x/list",
        "final_url": "https://x/list",
        "status": 200,
        "html": "<ok>",
        "fetched_at": 0.0,
        "duration_ms": 10,
        "is_challenge": False,
        "strategy": "httpx",
    }
    return FetchResult(**fields)


@pytest.fixture
def db(tmp_path, monkeypatch):
    """Provision a throwaway SQLite DB with one crawl target.

    Disables host throttling and all sleeping, and stubs both list-discovery
    paths to return a fixed 4-entry post list. Yields the target_code.
    """
    db_url = "sqlite:///" + str(tmp_path / "r.db")
    monkeypatch.setenv("DB_URL", db_url)
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    monkeypatch.setenv("USER_AGENT", "TestBot/1.0")
    # Neutralize every wait/sleep so retry tests run instantly.
    monkeypatch.setattr(pipeline.HostThrottle, "wait", lambda self, url, **kw: 0.0)
    import time as _t
    monkeypatch.setattr(_t, "sleep", lambda s: None)
    monkeypatch.setattr(pipeline._time, "sleep", lambda s: None)

    engine = create_engine(db_url, future=True)
    Base.metadata.create_all(engine)
    session_factory = sessionmaker(bind=engine, expire_on_commit=False)
    with session_factory() as sess:
        _, target = make_site_and_target(sess, site_code="gdqy", column_id="szfwj")
        sess.commit()
        code = target.target_code

    monkeypatch.setattr(pipeline, "get_sessionmaker", lambda: session_factory)
    # install a default 4-entry list
    post_urls = [f"https://x/a/post_{i}.html" for i in range(1, 5)]
    stub_entries = [pipeline._ListEntry(url=u, item=None) for u in post_urls]

    def _stub_list(rt):
        return "https://x/list", stub_entries, _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _stub_list)
    monkeypatch.setattr(pipeline, "_list_via_adapter", _stub_list)
    return {"target_code": code}


def test_retry_succeeds_on_second_attempt(monkeypatch, db):
    """A transient 503 on the first attempt is retried and then succeeds."""
    calls = {"n": 0}

    def _fake(*, target_code, url, list_item=None, throttle=None):
        calls["n"] += 1
        if calls["n"] == 1:
            return {"status": "failed", "reason": "server_error", "http_status": 503}
        return {"status": "ready", "article_id": calls["n"], "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    result = pipeline.crawl_target(db["target_code"], max_items=1)
    assert result["items_new"] == 1
    assert calls["n"] == 2   # 1 transient failure + 1 success


def test_no_retry_on_block_status_and_abort_after_3(monkeypatch, db):
    """Challenge (412) responses are not retried; 3 in a row abort the run."""
    seen_urls: list[str] = []

    def _fake(*, target_code, url, list_item=None, throttle=None):
        seen_urls.append(url)
        return {"status": "failed", "reason": "challenge", "http_status": 412}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    result = pipeline.crawl_target(db["target_code"])
    assert result["status"] == "aborted"
    assert result["consecutive_blocks"] == 3
    # each URL attempted exactly once — no per-item retry on a block status
    assert len(seen_urls) == 3
    assert result["items_failed"] == 3


def test_block_counter_resets_on_success(monkeypatch, db):
    """A successful fetch between challenges resets the consecutive counter."""
    responses = [
        {"status": "failed", "reason": "challenge", "http_status": 412},
        {"status": "ready", "article_id": 1, "http_status": 200},
        {"status": "failed", "reason": "challenge", "http_status": 412},
        {"status": "ready", "article_id": 2, "http_status": 200},
    ]

    def _fake(*, target_code, url, list_item=None, throttle=None):
        # consume the scripted responses in order, one per call
        return responses.pop(0)

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    result = pipeline.crawl_target(db["target_code"])
    assert result["status"] == "ok"
    assert result["items_new"] == 2
    assert result["items_failed"] == 2
    assert result["consecutive_blocks"] == 0


def test_retry_gives_up_after_max(monkeypatch, db):
    """Transient 500s are retried MAX_RETRIES times, then the item fails."""
    calls = {"n": 0}

    def _fake(*, target_code, url, list_item=None, throttle=None):
        calls["n"] += 1
        return {"status": "failed", "reason": "server_error", "http_status": 500}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    result = pipeline.crawl_target(db["target_code"], max_items=1)
    assert calls["n"] == 1 + pipeline.MAX_RETRIES
    assert result["items_failed"] == 1


def test_retry_on_exception_then_success(monkeypatch, db):
    """An exception from fetch_and_store is treated as transient and retried."""
    attempts: list[int] = []

    def _fake(*, target_code, url, list_item=None, throttle=None):
        attempts.append(1)
        if len(attempts) == 1:
            raise TimeoutError("boom")
        return {"status": "ready", "article_id": 1, "http_status": 200}

    monkeypatch.setattr(pipeline, "fetch_and_store", _fake)
    result = pipeline.crawl_target(db["target_code"], max_items=1)
    assert result["items_new"] == 1
    assert len(attempts) == 2


def test_exhausted_exception_writes_crawl_log(monkeypatch, db):
    """Regression: when fetch_and_store raises on every retry, crawl_log
    must still get a row so the admin crawl-log ('抓取日志') page shows why."""
    calls = {"n": 0}

    def _always_raise(*, target_code, url, list_item=None, throttle=None):
        calls["n"] += 1
        raise TimeoutError("connect_timeout")

    monkeypatch.setattr(pipeline, "fetch_and_store", _always_raise)
    # capture insert_crawl_log calls
    logged: list[dict] = []
    orig_insert = pipeline.insert_crawl_log

    def _recording_insert(s, **kw):
        logged.append(kw)
        return orig_insert(s, **kw)

    monkeypatch.setattr(pipeline, "insert_crawl_log", _recording_insert)

    result = pipeline.crawl_target(db["target_code"], max_items=1)
    assert result["items_failed"] == 1
    assert calls["n"] == 1 + pipeline.MAX_RETRIES
    # exactly one log row for the exhausted-exception path
    exc_rows = [kw for kw in logged if kw.get("strategy") == "exception"]
    assert len(exc_rows) == 1
    assert exc_rows[0]["success"] is False
    msg = exc_rows[0]["error_msg"] or ""
    assert "TimeoutError" in msg
    assert "connect_timeout" in msg


def test_adapter_default_interval_applies_when_target_null(tmp_path, monkeypatch, db):
    """target.interval_sec=NULL + adapter exposes DEFAULT_INTERVAL_SEC →
    HostThrottle receives the adapter value, not None.

    Fixes from review: the old version mutated os.environ["DB_URL"] directly
    (bypassing monkeypatch's restore bookkeeping) and created a tempfile.mkdtemp
    directory that was never removed. Use monkeypatch.setenv + the tmp_path
    fixture instead; create_engine/sessionmaker/Base come from the module-level
    imports rather than being re-imported locally.
    """
    from tests._v2fixtures import make_site, make_target

    # Seed a separate DB with a site whose cms_adapter is set so the
    # adapter-resolution branch fires (yaml_path=None forces the adapter path).
    db_url = "sqlite:///" + str(tmp_path / "t.db")
    monkeypatch.setenv("DB_URL", db_url)
    eng = create_engine(db_url, future=True)
    Base.metadata.create_all(eng)
    SM = sessionmaker(bind=eng, expire_on_commit=False)
    with SM() as s:
        site = make_site(
            s, site_code="gkm", cms_adapter="gkmlpt", yaml_path=None,
            base_url="https://g.example.com",
        )
        target = make_target(s, site=site, column_id="42", entry_url="https://g.example.com/x")
        s.commit()
        tc = target.target_code

    monkeypatch.setattr(pipeline, "get_sessionmaker", lambda: SM)

    # Stub adapter resolution: only the interval attribute matters here.
    class _StubAdapter:
        DEFAULT_INTERVAL_SEC = 7.5

    monkeypatch.setattr(pipeline, "get_adapter", lambda name: _StubAdapter)

    # Spy on HostThrottle.__init__ to capture the interval it receives.
    captured_interval: dict = {}
    original_init = pipeline.HostThrottle.__init__

    def _spy_init(self, interval_s=None, **kw):
        captured_interval["v"] = interval_s
        original_init(self, interval_s=interval_s, **kw)

    monkeypatch.setattr(pipeline.HostThrottle, "__init__", _spy_init)

    def _fake_list(rt):
        return "https://g.example.com/list", [], _ok_fr()

    monkeypatch.setattr(pipeline, "_list_via_yaml", _fake_list)
    monkeypatch.setattr(pipeline, "_list_via_adapter", _fake_list)

    pipeline.crawl_target(tc, max_items=0)
    assert captured_interval["v"] == 7.5


def test_host_throttle_none_falls_back_to_default():
    """HostThrottle(None) must not crash on the 2nd wait() call."""
    from govcrawler.fetcher.throttle import DEFAULT_INTERVAL_S, HostThrottle

    throttle = HostThrottle(interval_s=None)
    assert throttle.interval_s == DEFAULT_INTERVAL_S

    sleeps: list[float] = []
    # wait() calls now() twice per invocation (once to compute sleep_s, once
    # to stamp _last_by_host). Two invocations → need 4 values.
    clock = iter([0.0, 0.1, 1.0, 1.1])
    for _ in range(2):
        # the second pass hits the `last is not None` branch where the bug
        # used to live (None * float → TypeError).
        throttle.wait("https://h/a", sleep=sleeps.append, now=lambda: next(clock))
