"""Regression tests for govcrawler's cooldown handling: host-cooldown
detection, automatic retry of cooldown aborts, crawl-interval policies,
and gating of RAG export after crawl."""

from __future__ import annotations


def test_rag_export_after_crawl_defaults_disabled(monkeypatch):
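    """rag_export_after_crawl_enabled must default to False when the
    RAG_EXPORT_AFTER_CRAWL_ENABLED environment variable is unset and no
    env file is loaded.
    """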
    from govcrawler.settings import Settings

    monkeypatch.delenv("RAG_EXPORT_AFTER_CRAWL_ENABLED", raising=False)

    settings = Settings(_env_file=None, db_url="sqlite:///settings.db")

    assert settings.rag_export_after_crawl_enabled is False


def test_task_queue_skips_rag_export_after_crawl_when_disabled(monkeypatch):
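    """With the runtime flag off, _submit_rag_export_after_crawl must not
    schedule anything: the stubbed asyncio.create_task records no calls.
    (The module-level flag in runtime_config is reset first so the stubbed
    get_settings is actually consulted.)
    """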
    from govcrawler.api import task_queue
    from govcrawler import runtime_config

    scheduled = []

    monkeypatch.setattr(runtime_config, "_rag_export_after_crawl_enabled", None)
    monkeypatch.setattr(
        runtime_config,
        "get_settings",
        lambda: type("Settings", (), {"rag_export_after_crawl_enabled": False})(),
    )
    monkeypatch.setattr(
        task_queue.asyncio,
        "create_task",
        lambda *args, **kwargs: scheduled.append((args, kwargs)),
    )

    queue = task_queue.TaskQueue()
    job = task_queue.JobInfo(
        job_id="job-1",
        site_code="demo",
        target_code="demo__news",
        source="manual",
        status="done",
    )

    queue._submit_rag_export_after_crawl(job)

    assert scheduled == []


def test_task_queue_detects_nested_host_cooldown():
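    """A host_cooldown reason nested inside per-item results is detected;
    an unrelated top-level reason ("timeout") is not.
    """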
    from govcrawler.api import task_queue

    result = {
        "status": "ok",
        "results": [
            {"status": "ready"},
            {
                "status": "failed",
                "reason": "host_cooldown: www.gd.gov.cn blocked for 100s more (WAF protection)",
            },
        ],
    }

    assert task_queue._result_has_host_cooldown(result) is True
    assert task_queue._result_has_host_cooldown({"reason": "timeout"}) is False


def test_task_queue_detects_retryable_cooldown_abort():
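    """An abort whose reason is "cooldown" (with a nested host_cooldown
    failure) is recognised as a cooldown abort and exposes its checkpoint
    page; an abort for any other reason is not.
    """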
    from govcrawler.api import task_queue

    result = {
        "status": "aborted",
        "reason": "cooldown",
        "checkpoint_page": 72,
        "results": [
            {"status": "failed", "reason": "host_cooldown: www.gd.gov.cn blocked"},
        ],
    }

    assert task_queue._result_is_cooldown_abort(result) is True
    assert task_queue._checkpoint_from_result(result) == 72
    assert task_queue._result_is_cooldown_abort({"status": "aborted", "reason": "blocked"}) is False


def test_task_queue_auto_retries_cooldown_abort(monkeypatch):
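    """A retryable cooldown abort spawns exactly one automatic retry job.

    The stubbed crawl_target aborts once with a cooldown at checkpoint
    page 72, then succeeds. The original job must end up failed with an
    auto_retry record, and the retry job (source="retry") must resume
    from page 72 and finish as done.
    """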
    import asyncio

    from govcrawler import pipeline
    from govcrawler.api import task_queue

    calls = []
    results = [
        {
            "status": "aborted",
            "reason": "cooldown",
            "retryable": True,
            "checkpoint_page": 72,
            "results": [
                {"status": "failed", "reason": "host_cooldown: www.gd.gov.cn blocked"},
            ],
        },
        {"status": "ok", "items_seen": 1, "items_new": 0, "results": []},
    ]

    def fake_crawl_target(target_code, **kwargs):
        calls.append((target_code, kwargs.get("job_id"), kwargs.get("resume_from_page")))
        return results.pop(0)

    async def scenario():
        # Route the site onto the shared gd_gkmlpt host queue; silence DB
        # writes and interval-growth side effects for this test.
        monkeypatch.setattr(task_queue, "_host_for_site", lambda site_code: "gd_gkmlpt_shared")
        monkeypatch.setattr(task_queue.TaskQueue, "_db_upsert_job", staticmethod(lambda *_args, **_kw: None))
        monkeypatch.setattr(task_queue, "_grow_target_interval_after_cooldown", lambda **_kw: None)
        # Retry without delay and cap automatic cooldown retries at two.
        monkeypatch.setattr(task_queue, "HOST_COOLDOWN_DELAY_S", 0)
        monkeypatch.setattr(task_queue, "COOLDOWN_AUTO_RETRY_MAX", 2)
        monkeypatch.setattr(pipeline, "crawl_target", fake_crawl_target)
        monkeypatch.setattr(task_queue.TaskQueue, "_submit_rag_export_after_crawl", lambda *_args: None)
        # Clear any host pause left over from other tests.
        task_queue._HOST_PAUSE_UNTIL.clear()

        queue = task_queue.TaskQueue()
        job_id = await queue.submit(
            site_code="gd_amr",
            target_code="gd_amr__2963",
            source="manual",
            force=True,
        )
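        # Let the original job and its auto-retry drain, then stop the workers.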
        await asyncio.wait_for(queue._host_queues["gd_gkmlpt_shared"].join(), timeout=5)
        for worker in queue._workers.values():
            worker.cancel()
        await asyncio.gather(*queue._workers.values(), return_exceptions=True)
        return queue, job_id

    queue, job_id = asyncio.run(scenario())

    original = queue.job(job_id)
    assert original.status == "failed"
    assert original.result["auto_retry"]["status"] == "queued"
    retry_job = queue.job(original.result["auto_retry"]["job_id"])
    assert retry_job.status == "done"
    assert retry_job.source == "retry"
    assert calls[0][2] == 0
    assert calls[1][2] == 72


def test_fetcher_host_cooldown_is_at_least_ten_minutes():
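    """The fetcher's per-host cooldown must be at least ten minutes (600s)."""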
    from govcrawler.fetcher import chain

    assert chain._COOLDOWN_SECONDS >= 600


def test_gd_worktime_interval_policy_only_applies_on_weekday_worktime():
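    """The GD worktime policy scales interval and jitter by the 1.2
    multiplier during weekday working hours (08:00-18:00) and leaves
    them untouched on a weekend at the same hour.
    """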
    from datetime import datetime
    from types import SimpleNamespace

    from govcrawler import pipeline

    settings = SimpleNamespace(
        gd_worktime_interval_multiplier=1.2,
        gd_worktime_start_hour=8,
        gd_worktime_end_hour=18,
    )
    interval, jitter, policy = pipeline._effective_target_intervals(
        site_code="gd_amr",
        target_code="gd_amr__2963",
        interval_sec=70,
        interval_jitter_sec=20,
        settings=settings,
        now=datetime(2026, 5, 15, 10, 0, 0),  # Friday
    )
    assert interval == 84
    assert jitter == 24
    assert policy["policy"] == "gd_worktime_slowdown"

    interval2, jitter2, policy2 = pipeline._effective_target_intervals(
        site_code="gd_amr",
        target_code="gd_amr__2963",
        interval_sec=70,
        interval_jitter_sec=20,
        settings=settings,
        now=datetime(2026, 5, 16, 10, 0, 0),  # Saturday
    )
    assert (interval2, jitter2, policy2) == (70, 20, None)


def test_cooldown_grows_gd_target_interval_in_db(tmp_path, monkeypatch):
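    """Each cooldown grows the persisted target interval by the configured
    factor (1.10 here), rounded to whole seconds: 70 -> 77 -> 85.
    """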
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    from govcrawler.api import task_queue
    from govcrawler.models import Base, CrawlSite, CrawlTarget

    engine = create_engine("sqlite:///" + str(tmp_path / "cooldown.db"), future=True)
    Base.metadata.create_all(engine)
    SM = sessionmaker(bind=engine, expire_on_commit=False)
    monkeypatch.setattr(task_queue, "get_sessionmaker", lambda: SM)
    monkeypatch.setattr(
        "govcrawler.api.task_queue.get_settings",
        lambda: type("Settings", (), {
            "cooldown_interval_growth_factor": 1.10,
            "cooldown_interval_max_sec": 600,
        })(),
        raising=False,
    )

    # Seed one GD site with a single target at a 70s base interval.
    with SM() as s:
        site = CrawlSite(
            site_code="gd_amr",
            site_name="广东省市场监督管理局",  # Guangdong Administration for Market Regulation
            base_url="https://amr.gd.gov.cn",
            cms_adapter="gkmlpt",
            managed_by="ui",
            enabled=True,
        )
        s.add(site)
        s.flush()
        s.add(CrawlTarget(
            site_id=site.id,
            target_code="gd_amr__2963",
            target_name="政策文件",  # "policy documents"
            entry_url="https://amr.gd.gov.cn/",
            interval_sec=70,
            interval_jitter_sec=20,
            enabled=True,
        ))
        s.commit()

    first = task_queue._grow_target_interval_after_cooldown(
        site_code="gd_amr",
        target_code="gd_amr__2963",
    )
    second = task_queue._grow_target_interval_after_cooldown(
        site_code="gd_amr",
        target_code="gd_amr__2963",
    )

    assert first["old_interval_sec"] == 70
    assert first["new_interval_sec"] == 77
    assert second["old_interval_sec"] == 77
    assert second["new_interval_sec"] == 85
