"""pipeline.fetch_and_store tests — v2 schema aligned.

Instead of a fake session, we use a real in-memory SQLite DB and seed a
crawl_site + crawl_target with parser_override_json.detail that matches the
test HTML's CSS. This exercises the same code paths as production but keeps
the test hermetic.
"""
from __future__ import annotations

import hashlib
from contextlib import contextmanager
from unittest.mock import MagicMock

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from govcrawler import pipeline
from govcrawler.fetcher.browser import FetchResult
from govcrawler.models import Article, Attachment, Base, CrawlLog

from tests._v2fixtures import make_site, make_target


_DETAIL_SELECTORS = {
    "title": "h1.article-title::text",
    "publish_time": "span.time::text",
    "source": "",
    "content": "div.article-content",
    "attachment_css": "div.article-content a[href]",
}


@pytest.fixture
def db(tmp_path, monkeypatch):
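    """Per-test SQLite DB seeded with one site + target (detail selectors
    installed via parser_override_json); returns the sessionmaker and
    target_code."""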
    monkeypatch.setenv("DB_URL", "sqlite:///" + str(tmp_path / "pipe.db"))
    monkeypatch.setenv("DATA_DIR", str(tmp_path))
    monkeypatch.setenv("USER_AGENT", "TestBot/1.0")

    engine = create_engine("sqlite:///" + str(tmp_path / "pipe.db"), future=True)
    Base.metadata.create_all(engine)
    SM = sessionmaker(bind=engine, expire_on_commit=False)

    with SM() as s:
        site = make_site(s, site_code="gdqy", yaml_path="gdqy.yaml")
        target = make_target(
            s, site=site, column_id="szfwj",
            target_name="市政府文件",
            entry_url="https://www.gdqy.gov.cn/szfwj/",
        )
        # Install per-target detail selectors so the pipeline needs no YAML config
        target.parser_override_json = {"detail": _DETAIL_SELECTORS}
        s.commit()
        target_code = target.target_code

    # Point pipeline.get_sessionmaker at our SM
    monkeypatch.setattr(pipeline, "get_sessionmaker", lambda: SM)

    return {"SM": SM, "target_code": target_code}


def _ok_fetch(html: str, *, status: int = 200, final_url: str | None = None):
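    """Build a fetch_html stand-in that always succeeds with the given HTML."""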
    return lambda url: FetchResult(
        url=url, final_url=final_url or url, status=status, html=html,
        fetched_at=0.0, duration_ms=100, is_challenge=False,
    )


def test_happy_path_writes_article_and_log(monkeypatch, db):
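    """A 200 detail page produces one ready article and one successful crawl log."""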
    html = (
        '<html><body><h1 class="article-title">测试标题</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<div class="article-content"><p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p></div>'
        "</body></html>"
    )
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html))

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gdqy.gov.cn/x/post_test.html",
    )
    assert r["status"] == "ready"

    with db["SM"]() as s:
        arts = s.query(Article).all()
        logs = s.query(CrawlLog).all()
        assert len(arts) == 1
        assert arts[0].title == "测试标题"
        assert arts[0].status == "ready"
        assert len(logs) == 1
        assert logs[0].success is True


def test_challenge_page_logged_as_failure(monkeypatch, db):
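    """An anti-bot challenge page stores no article and is logged as a failure."""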
    monkeypatch.setattr(
        pipeline, "fetch_html",
        lambda url: FetchResult(
            url=url, final_url=url, status=412,
            html="<html><title>请稍候</title></html>",
            fetched_at=0.0, duration_ms=50, is_challenge=True,
        ),
    )
    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gdqy.gov.cn/x/post_x.html",
    )
    assert r["status"] == "failed"

    with db["SM"]() as s:
        assert s.query(Article).count() == 0
        logs = s.query(CrawlLog).all()
        assert len(logs) == 1
        assert logs[0].success is False
        assert "challenge" in (logs[0].error_msg or "").lower()


def test_gd_gov_detail_fetch_prefers_https(monkeypatch, db):
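    """An http:// gd.gov.cn detail URL is upgraded to https before fetching,
    and the stored article URL is the https form."""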
    html = (
        '<html><body><h1 class="article-title">测试标题</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<div class="article-content"><p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p></div>'
        "</body></html>"
    )
    seen = []

    def _fake_fetch(url):
        seen.append(url)
        return FetchResult(
            url=url, final_url=url, status=200, html=html,
            fetched_at=0.0, duration_ms=100, is_challenge=False,
        )

    monkeypatch.setattr(pipeline, "fetch_html", _fake_fetch)

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="http://www.gd.gov.cn/zwgk/wjk/qbwj/yf/content/post_1.html",
    )

    assert r["status"] == "ready"
    assert seen == ["https://www.gd.gov.cn/zwgk/wjk/qbwj/yf/content/post_1.html"]
    with db["SM"]() as s:
        art = s.query(Article).one()
        assert art.url.startswith("https://www.gd.gov.cn/")


def test_gd_gov_detail_falls_back_to_original_http_when_https_resets(monkeypatch, db):
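    """If the https gd.gov.cn fetch fails with a connection reset, the
    original http URL is retried."""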
    html = (
        '<html><body><h1 class="article-title">测试标题</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<div class="article-content"><p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p></div>'
        "</body></html>"
    )
    seen = []

    def _fake_fetch(url):
        seen.append(url)
        if url.startswith("https://www.gd.gov.cn/"):
            return FetchResult(
                url=url, final_url=url, status=0, html="",
                fetched_at=0.0, duration_ms=100, is_challenge=False,
                error="ConnectError: [Errno 104] Connection reset by peer",
            )
        return FetchResult(
            url=url, final_url=url, status=200, html=html,
            fetched_at=0.0, duration_ms=100, is_challenge=False,
        )

    monkeypatch.setattr(pipeline, "fetch_html", _fake_fetch)

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="http://www.gd.gov.cn/zwgk/wjk/qbwj/yf/content/post_1.html",
    )

    assert r["status"] == "ready"
    assert seen == [
        "https://www.gd.gov.cn/zwgk/wjk/qbwj/yf/content/post_1.html",
        "http://www.gd.gov.cn/zwgk/wjk/qbwj/yf/content/post_1.html",
    ]


def test_gd_gov_legacy_gkml_url_rewrites_to_https_gkmlpt(monkeypatch, db):
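    """Legacy /gkml/ gd.gov.cn URLs are rewritten to the https /gkmlpt/ form
    before fetching."""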
    html = (
        '<html><body><h1 class="article-title">测试标题</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<div class="article-content"><p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p></div>'
        "</body></html>"
    )
    seen = []

    def _fake_fetch(url):
        seen.append(url)
        return FetchResult(
            url=url, final_url=url, status=200, html=html,
            fetched_at=0.0, duration_ms=100, is_challenge=False,
        )

    monkeypatch.setattr(pipeline, "fetch_html", _fake_fetch)

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="http://www.gd.gov.cn/gkml/content/post_147172.html",
    )

    assert r["status"] == "ready"
    assert seen == ["https://www.gd.gov.cn/gkmlpt/content/0/147/post_147172.html"]
    with db["SM"]() as s:
        art = s.query(Article).one()
        assert art.url == "https://www.gd.gov.cn/gkmlpt/content/0/147/post_147172.html"


def test_non_http_final_url_records_failure(monkeypatch, db):
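    """A non-http(s) final URL (e.g. chrome-error://) is recorded as a
    failure and no article is stored."""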
    monkeypatch.setattr(
        pipeline,
        "fetch_html",
        lambda url: FetchResult(
            url=url,
            final_url="chrome-error://chromewebdata/",
            status=200,
            html="<html><body></body></html>",
            fetched_at=0.0,
            duration_ms=100,
            is_challenge=False,
            strategy="playwright",
        ),
    )

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gd.gov.cn/gkml/content/post_147172.html",
    )

    assert r["status"] == "failed"
    assert r["reason"] == "non_http_final_url:chrome-error://chromewebdata/"
    with db["SM"]() as s:
        assert s.query(Article).count() == 0
        log = s.query(CrawlLog).one()
        assert log.success is False
        assert log.error_msg == "non_http_final_url:chrome-error://chromewebdata/"


def test_gd_wjk_public_metadata_written_to_article(monkeypatch, db):
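    """The public-information (.introduce) block is parsed into the article's
    structured metadata columns and into metadata_json["public_meta"]."""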
    html = (
        '<html><body><h1 class="article-title">广东省人民政府关于印发规划纲要的通知</h1>'
        '<span class="time">2026-04-28 10:00</span>'
        '<div class="introduce">'
        '<div><label>索引号：</label><span>006939748/2026-00100</span></div>'
        '<div><label>分类：</label><span>国民经济管理、国有资产监管</span></div>'
        '<div><label>发布机构：</label><span>广东省人民政府</span></div>'
        '<div><label>成文日期：</label><span>2026-03-26</span></div>'
        '<div><label>生效日期：</label><span>2026-07-01</span></div>'
        '<div><label>效力状态：</label><span>现行有效</span></div>'
        '<div><label>失效日期：</label><span>2031-06-30</span></div>'
        '<div><label>文号：</label><span>粤府〔2026〕24号</span></div>'
        '<div><label>发布日期：</label><span>2026-04-28</span></div>'
        '</div>'
        '<div class="article-content"><p>'
        + "正文内容，足够长，足够长，足够长，足够长，足够长，足够长。"
        + "</p></div></body></html>"
    )
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html))

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gd.gov.cn/zwgk/wjk/qbwj/yf/content/post_4890249.html",
    )

    assert r["status"] == "ready"
    with db["SM"]() as s:
        art = s.query(Article).one()
        assert art.index_no == "006939748/2026-00100"
        assert art.publisher == "广东省人民政府"
        assert art.doc_no == "粤府〔2026〕24号"
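        # publish_date is expected to come from 成文日期, not 发布日期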
        assert art.publish_date.isoformat() == "2026-03-26"
        assert art.effective_date.isoformat() == "2026-07-01"
        assert art.is_effective is True
        assert art.expiry_date.isoformat() == "2031-06-30"
        assert art.content_category == "国民经济管理"
        assert art.content_subcategory == "国有资产监管"
        assert art.open_category == "国民经济管理、国有资产监管"
        assert art.metadata_json["public_meta"]["文号"] == "粤府〔2026〕24号"


def test_source_and_publish_date_are_normalized_when_no_public_meta(monkeypatch, db):
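    """Without a public-meta block, publisher and publish_date are normalized
    from the plain source/time selectors."""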
    html = (
        '<html><body><h1 class="article-title">普通新闻</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<span class="source">新华社</span>'
        '<div class="article-content"><p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p></div>'
        "</body></html>"
    )
    selectors = dict(_DETAIL_SELECTORS)
    selectors["source"] = "span.source::text"
    from govcrawler.models import CrawlTarget
    with db["SM"]() as s:
        t = s.query(CrawlTarget).filter_by(target_code=db["target_code"]).one()
        t.parser_override_json = {"detail": selectors}
        s.commit()
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html))

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.news.cn/x/post_source.html",
    )

    assert r["status"] == "ready"
    with db["SM"]() as s:
        art = s.query(Article).one()
        assert art.source_raw == "新华社"
        assert art.publisher == "新华社"
        assert art.publish_date.isoformat() == "2026-04-10"


def test_attachment_downloaded_and_recorded(monkeypatch, db):
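    """An in-content PDF link is streamed to disk and recorded with its
    extension, SHA-256 hash, and size."""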
    from govcrawler.storage import attachments as att_mod

    html = (
        '<html><body><h1 class="article-title">带附件</h1>'
        '<span class="time">2026-04-10 10:00:00</span>'
        '<div class="article-content"><p>'
        + "正文内容足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长。"
        + '</p>'
        '<a href="https://www.gdqy.gov.cn/attach/a.pdf">公告全文.pdf</a></div></body></html>'
    )
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html))

    FAKE_BYTES = b"%PDF-1.4\n<fake pdf bytes for Path B test>\n" * 32
    EXPECTED_SHA = hashlib.sha256(FAKE_BYTES).hexdigest()

    class _FakeResp:
        headers = {"content-disposition": 'attachment; filename="公告全文.pdf"'}

        def raise_for_status(self):
            pass

        def iter_bytes(self, chunk_size=65536):
            mid = len(FAKE_BYTES) // 2
            yield FAKE_BYTES[:mid]
            yield FAKE_BYTES[mid:]

    @contextmanager
    def fake_stream(method, url, **kw):
        yield _FakeResp()

    monkeypatch.setattr(att_mod.httpx, "stream", fake_stream)

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gdqy.gov.cn/x/post_test.html",
    )
    assert r["status"] == "ready"
    assert r["attachments_downloaded"] == 1

    with db["SM"]() as s:
        atts = s.query(Attachment).all()
        assert len(atts) == 1
        att = atts[0]
        assert att.file_ext == "pdf"
        assert att.file_hash == EXPECTED_SHA
        assert att.size_bytes == len(FAKE_BYTES)


def test_gd_gov_attachment_prefers_https_and_failure_keeps_article_ready(monkeypatch, db):
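    """gd.gov.cn attachment URLs are tried over https; a failed download
    leaves the article ready with no attachment rows."""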
    html = (
        '<html><body><h1 class="article-title">带附件</h1>'
        '<span class="time">2026-04-10 10:00:00</span>'
        '<div class="article-content"><p>'
        + "正文内容足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长。"
        + '</p>'
        '<a href="http://www.gd.gov.cn/attachment/0/487/487443/146682.pdf">附件</a>'
        "</div></body></html>"
    )
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html))

    seen: list[str] = []

    def _fail_attachment(url, **_kw):
        seen.append(url)
        raise OSError("connection reset")

    monkeypatch.setattr(pipeline, "download_attachment", _fail_attachment)

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gdqy.gov.cn/x/post_attachment_fail.html",
    )

    assert r["status"] == "ready"
    assert r["attachments_downloaded"] == 0
    assert seen == ["https://www.gd.gov.cn/attachment/0/487/487443/146682.pdf"]
    with db["SM"]() as s:
        art = s.query(Article).one()
        assert art.status == "ready"
        assert s.query(Attachment).count() == 0


def test_attachment_download_uses_short_configured_timeout(monkeypatch, db):
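    """ATTACHMENT_TIMEOUT_S is passed through to httpx.stream as the timeout."""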
    from govcrawler.storage import attachments as att_mod

    html = (
        '<html><body><h1 class="article-title">带附件</h1>'
        '<span class="time">2026-04-10 10:00:00</span>'
        '<div class="article-content"><p>'
        + "正文内容足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长。"
        + '</p>'
        '<a href="https://www.gdqy.gov.cn/attach/a.pdf">附件</a></div></body></html>'
    )
    monkeypatch.setenv("ATTACHMENT_TIMEOUT_S", "7")
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html))
    captured: dict[str, float] = {}

    class _FakeResp:
        headers = {"content-disposition": 'attachment; filename="a.pdf"'}

        def raise_for_status(self):
            pass

        def iter_bytes(self, chunk_size=65536):
            yield b"pdf"

    @contextmanager
    def fake_stream(method, url, **kw):
        captured["timeout"] = kw["timeout"]
        yield _FakeResp()

    monkeypatch.setattr(att_mod.httpx, "stream", fake_stream)

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gdqy.gov.cn/x/post_timeout.html",
    )

    assert r["status"] == "ready"
    assert r["attachments_downloaded"] == 1
    assert captured["timeout"] == 7.0


def test_attachment_throttle_is_capped_independent_of_article_interval(monkeypatch, db):
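    """Attachment waits use ATTACHMENT_THROTTLE_CAP_S even when the target's
    article interval is much longer."""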
    html = (
        '<html><body><h1 class="article-title">带附件</h1>'
        '<span class="time">2026-04-10 10:00:00</span>'
        '<div class="article-content"><p>'
        + "正文内容足够长足够长足够长足够长足够长足够长足够长足够长足够长足够长。"
        + '</p>'
        '<a href="https://www.gdqy.gov.cn/attach/a.pdf">附件一</a>'
        '<a href="https://www.gdqy.gov.cn/attach/b.pdf">附件二</a>'
        "</div></body></html>"
    )
    monkeypatch.setenv("ATTACHMENT_THROTTLE_CAP_S", "5")
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html))
    from govcrawler.models import CrawlTarget
    with db["SM"]() as s:
        t = s.query(CrawlTarget).filter_by(target_code=db["target_code"]).one()
        t.interval_sec = 180
        s.commit()

    waits: list[float | None] = []
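    # Spy on HostThrottle to record the interval applied to attachment URLs.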
    real_init = pipeline.HostThrottle.__init__
    real_wait = pipeline.HostThrottle.wait

    def _spy_init(self, interval_s=None, **kw):
        self._spy_interval = interval_s
        return real_init(self, interval_s=interval_s, **kw)

    def _spy_wait(self, url, **kw):
        if "attach/" in url:
            waits.append(self._spy_interval)
        return real_wait(self, url, **kw)

    monkeypatch.setattr(pipeline.HostThrottle, "__init__", _spy_init)
    monkeypatch.setattr(pipeline.HostThrottle, "wait", _spy_wait)
    def _skip_attachment(url, **kw):
        raise OSError("skip real attachment")

    monkeypatch.setattr(pipeline, "download_attachment", _skip_attachment)

    r = pipeline.fetch_and_store(
        target_code=db["target_code"],
        url="https://www.gdqy.gov.cn/x/post_attachment_throttle.html",
    )

    assert r["status"] == "ready"
    assert waits == [5.0, 5.0]


def test_duplicate_url_hash_skips(monkeypatch, db):
    """An article seeded with the same url_hash makes fetch_and_store skip
    without fetching."""
    from govcrawler.models import CrawlTarget
    from govcrawler.utils.url_norm import url_hash as compute_url_hash

    target_url = "https://www.gdqy.gov.cn/x/post_dup.html"
    dup_hash = compute_url_hash(target_url)

    with db["SM"]() as s:
        site_id = s.query(Article).count()  # 0
        from govcrawler.models import CrawlTarget
        t = s.query(CrawlTarget).filter_by(target_code=db["target_code"]).one()
        s.add(Article(
            site_id=t.site_id, target_id=t.id,
            url=target_url, url_hash=dup_hash,
            title="existing", status="ready", has_attachment=False,
        ))
        s.commit()

    def _boom(url):
        raise AssertionError("fetch_html should not be called on duplicate")

    monkeypatch.setattr(pipeline, "fetch_html", _boom)

    r = pipeline.fetch_and_store(target_code=db["target_code"], url=target_url)
    assert r["status"] == "skipped"
    assert r["article_id"] >= 1


def test_duplicate_final_url_hash_skips_after_redirect(monkeypatch, db):
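    """A redirect that lands on an already-stored final URL is skipped via
    its url_hash; no second article is created."""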
    from govcrawler.models import CrawlTarget
    from govcrawler.utils.url_norm import url_hash as compute_url_hash

    source_url = "https://www.gdqy.gov.cn/redirect?id=123"
    final_url = "https://www.gdqy.gov.cn/x/post_redirected.html"
    final_hash = compute_url_hash(final_url)

    with db["SM"]() as s:
        t = s.query(CrawlTarget).filter_by(target_code=db["target_code"]).one()
        s.add(Article(
            site_id=t.site_id, target_id=t.id,
            url=final_url, url_hash=final_hash,
            title="existing", status="ready", has_attachment=False,
        ))
        s.commit()

    html = (
        '<html><body><h1 class="article-title">重复标题</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<div class="article-content"><p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p></div>'
        "</body></html>"
    )
    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html, final_url=final_url))

    r = pipeline.fetch_and_store(target_code=db["target_code"], url=source_url)

    assert r["status"] == "skipped"
    assert r["reason"] == "duplicate_url_hash"
    assert r["url_hash"] == final_hash
    with db["SM"]() as s:
        assert s.query(Article).count() == 1


def test_transient_fetch_failure_then_success_logs_only_success(monkeypatch, db):
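    """crawl_target retries a transient connect/SSL error and logs only the
    eventual success for the article URL."""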
    target_url = "https://www.gdqy.gov.cn/x/post_transient_ssl.html"
    html = (
        '<html><body><h1 class="article-title">瞬时错误后成功</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<div class="article-content"><p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p></div>'
        "</body></html>"
    )

    monkeypatch.setattr(pipeline.HostThrottle, "wait", lambda self, url, **kw: 0.0)
    monkeypatch.setattr(pipeline._time, "sleep", lambda s: None)
    monkeypatch.setattr(pipeline, "is_allowed", lambda url, ua: True)
    monkeypatch.setattr(
        pipeline,
        "_list_via_yaml",
        lambda rt, **kw: (
            "https://www.gdqy.gov.cn/list/",
            [pipeline._ListEntry(url=target_url, item=None)],
            FetchResult(
                url="https://www.gdqy.gov.cn/list/",
                final_url="https://www.gdqy.gov.cn/list/",
                status=200,
                html="<ok>",
                fetched_at=0.0,
                duration_ms=10,
                is_challenge=False,
                strategy="httpx",
            ),
        ),
    )
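    # Script two fetch outcomes: a transient SSL error, then a 200 success.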
    script = iter([
        FetchResult(
            url=target_url,
            final_url=target_url,
            status=0,
            html="",
            fetched_at=0.0,
            duration_ms=50,
            is_challenge=False,
            error="ConnectError: [SSL: SSLV3_ALERT_BAD_RECORD_MAC]",
            strategy="httpx",
        ),
        FetchResult(
            url=target_url,
            final_url=target_url,
            status=200,
            html=html,
            fetched_at=0.0,
            duration_ms=100,
            is_challenge=False,
            strategy="httpx",
        ),
    ])
    monkeypatch.setattr(pipeline, "fetch_html", lambda url: next(script))

    r = pipeline.crawl_target(db["target_code"])

    assert r["items_new"] == 1
    with db["SM"]() as s:
        logs = s.query(CrawlLog).filter_by(article_url=target_url).all()
        assert len(logs) == 1
        assert logs[0].success is True
        assert logs[0].http_status == 200


def test_redirect_final_url_is_detail_base_for_attachments(monkeypatch, db):
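    """Relative attachment hrefs resolve against the redirect's final URL,
    which is also stored as the article/log URL."""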
    source_url = "https://www.gdqy.gov.cn/redirect?id=456"
    final_url = "https://www.gdqy.gov.cn/final/path/post_with_attachment.html"
    html = (
        '<html><body><h1 class="article-title">附件标题</h1>'
        '<span class="time">2026-04-10 16:34:22</span>'
        '<div class="article-content">'
        '<p>正文内容，足够长，足够长，足够长，足够长，足够长，足够长。</p>'
        '<a href="files/a.pdf">附件</a>'
        "</div></body></html>"
    )
    downloaded = []

    monkeypatch.setattr(pipeline, "fetch_html", _ok_fetch(html, final_url=final_url))

    def _fake_download(url, **kw):
        downloaded.append(url)
        return MagicMock(
            file_name="a.pdf",
            file_ext=".pdf",
            size_bytes=10,
            file_path="attachments/a.pdf",
            file_hash="f" * 64,
        )

    monkeypatch.setattr(pipeline, "download_attachment", _fake_download)

    r = pipeline.fetch_and_store(target_code=db["target_code"], url=source_url)

    assert r["status"] == "ready"
    assert downloaded == ["https://www.gdqy.gov.cn/final/path/files/a.pdf"]
    with db["SM"]() as s:
        art = s.query(Article).one()
        log = s.query(CrawlLog).one()
        assert art.url == final_url
        assert log.article_url == final_url
