"""Backfill gd_gkmlpt article standard metadata.

New gkmlpt rows carry normalized values from the list API. Older rows may not
have `metadata_json["raw"]`, so this script can also read the saved detail HTML
table from `raw_html_path` and recover:

  索引号、分类、发布机构、成文日期、文号、发布日期、主题词

Run inside the api container:

    docker exec docker-api-1 python /app/scripts/backfill_gd_gkmlpt_metadata.py --dry-run
    docker exec docker-api-1 python /app/scripts/backfill_gd_gkmlpt_metadata.py
"""
from __future__ import annotations

import argparse
import re
import sys
from collections import Counter
from dataclasses import dataclass
from datetime import date, datetime, timezone
from pathlib import Path, PurePosixPath
from typing import Any

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from parsel import Selector  # noqa: E402
from sqlalchemy import select  # noqa: E402

from govcrawler.adapters.gkmlpt import (  # noqa: E402
    _coerce_content_category,
    _coerce_content_subcategory,
    _coerce_open_category,
    _coerce_publish_date,
    _coerce_publish_time,
    _coerce_topic_words,
)
from govcrawler.db import get_sessionmaker  # noqa: E402
from govcrawler.models import Article, CrawlSite  # noqa: E402
from govcrawler.parser.detail_parser import _canonical_public_label  # noqa: E402
from govcrawler.parser.detail_parser import _clean_source_text  # noqa: E402
from govcrawler.settings import get_settings  # noqa: E402
from govcrawler.storage.paths import to_os_path  # noqa: E402


@dataclass(frozen=True)
class BackfillStats:
    scanned: int
    changed: int
    skipped_no_source: int
    field_counts: Counter[str]


def _clean_text(v: str | None) -> str | None:
    if v is None:
        return None
    cleaned = re.sub(r"\s+", " ", v).strip()
    return cleaned or None


def _coerce_raw(meta: Any) -> dict[str, Any] | None:
    if not isinstance(meta, dict):
        return None
    raw = meta.get("raw")
    return raw if isinstance(raw, dict) else None


def _date_from_text(v: str | None) -> date | None:
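    """Extract the first date found in free text.

    Accepts "-", "/", "." and the Chinese 年/月 separators:

    >>> _date_from_text("2024年3月5日")
    datetime.date(2024, 3, 5)
    """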
    cleaned = _clean_text(v)
    if not cleaned:
        return None
    m = re.search(r"(\d{4})[-/.年](\d{1,2})[-/.月](\d{1,2})", cleaned)
    if not m:
        return None
    try:
        return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
    except ValueError:
        # The regex admits out-of-range parts such as "2024-13-40"; treat
        # them as unparseable rather than crashing the whole backfill.
        return None


def _datetime_from_date_text(v: str | None) -> datetime | None:
    d = _date_from_text(v)
    if d is None:
        return None
    return datetime(d.year, d.month, d.day, tzinfo=timezone.utc)


def _split_category_text(v: str | None) -> tuple[str | None, str | None, str | None]:
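    """Split a 分类 cell into (content_category, content_subcategory, open_category).

    The last 、-separated segment becomes the subcategory:

    >>> _split_category_text("财政、预决算")
    ('财政', '预决算', '财政、预决算')
    >>> _split_category_text("财政")
    ('财政', None, '财政')
    """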
    cleaned = _clean_text(v)
    if not cleaned:
        return None, None, None
    parts = [p.strip() for p in cleaned.split("、") if p.strip()]
    if len(parts) <= 1:
        return cleaned, None, cleaned
    return "、".join(parts[:-1]), parts[-1], cleaned


def values_from_raw(raw: dict[str, Any]) -> dict[str, Any]:
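    """Map a stored list-API payload onto Article columns.

    Reuses the gkmlpt adapter's coercion helpers so backfilled rows line up
    with freshly crawled ones.
    """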
    return {
        "native_post_id": str(raw.get("id")) if raw.get("id") is not None else None,
        "index_no": raw.get("identifier") or None,
        "publisher": raw.get("publisher") or None,
        "source_raw": raw.get("publisher") or None,
        "doc_no": raw.get("document_number") or None,
        "publish_time": _coerce_publish_time(raw),
        "publish_date": _coerce_publish_date(raw),
        "topic_words": _coerce_topic_words(raw),
        "open_category": _coerce_open_category(raw),
        "content_category": _coerce_content_category(raw),
        "content_subcategory": _coerce_content_subcategory(raw),
        "metadata_json": {"raw": raw},
    }


def values_from_detail_html(html: str) -> dict[str, Any]:
    sel = Selector(text=html)
    pairs: dict[str, str] = {}
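    # The detail page renders metadata as a table whose cells alternate
    # label/value, sometimes several pairs per row. Flatten each row's cell
    # text and pair adjacent non-empty cells; drop a pair when the would-be
    # value is itself a recognized label, which guards against mis-pairing
    # after an empty value cell was filtered out.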
    for tr in sel.xpath("//tr"):
        cells = [_clean_text(" ".join(td.xpath(".//text()").getall())) for td in tr.xpath("./td")]
        cells = [c for c in cells if c]
        for idx in range(0, len(cells) - 1, 2):
            label = cells[idx].rstrip(":：").strip()
            value = cells[idx + 1]
            if label and _canonical_public_label(value) is None:
                pairs[label] = value

    top_category, sub_category, open_category = _split_category_text(pairs.get("分类"))
    publisher = _clean_source_text(pairs.get("发布机构"))
    return {
        "index_no": pairs.get("索引号"),
        "publisher": publisher,
        "source_raw": publisher,
        "doc_no": pairs.get("文号"),
        "publish_time": _datetime_from_date_text(pairs.get("发布日期")),
        "publish_date": _date_from_text(pairs.get("成文日期")),
        "topic_words": pairs.get("主题词"),
        "open_category": open_category,
        "content_category": top_category,
        "content_subcategory": sub_category,
    }


def _is_empty(v: Any) -> bool:
    return v is None or v == ""


def _same_value(left: Any, right: Any) -> bool:
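    """Compare values, ignoring sub-second noise on datetimes.

    The datetime branch must come before the date branch because datetime
    subclasses date.
    """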
    if isinstance(left, datetime) and isinstance(right, datetime):
        return left.replace(microsecond=0) == right.replace(microsecond=0)
    if isinstance(left, date) and isinstance(right, date):
        return left == right
    return left == right


def _merge_values(primary: dict[str, Any], fallback: dict[str, Any]) -> dict[str, Any]:
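    """Return fallback overlaid with every non-empty value from primary."""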
    merged = dict(fallback)
    for k, v in primary.items():
        if not _is_empty(v):
            merged[k] = v
    return merged


def apply_values(article: Article, values: dict[str, Any], *, only_missing: bool) -> list[str]:
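    """Write non-empty values onto the article and return the changed field names.

    When a write does happen, metadata_json is merged key-by-key rather than
    replaced; with only_missing=True, fields that already hold a value are
    skipped entirely.
    """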
    changed_fields: list[str] = []
    for field, new_value in values.items():
        if _is_empty(new_value):
            continue
        old_value = getattr(article, field)
        if field == "metadata_json":
            if not isinstance(old_value, dict):
                old_value = {}
            new_value = {**old_value, **new_value}
        if only_missing and not _is_empty(old_value):
            continue
        if _same_value(old_value, new_value):
            continue
        setattr(article, field, new_value)
        changed_fields.append(field)
    return changed_fields


def _html_values_for_article(article: Article, *, data_dir: Path) -> dict[str, Any]:
    if not article.raw_html_path:
        return {}
    try:
        path = to_os_path(data_dir, PurePosixPath(article.raw_html_path))
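        # Path-traversal guard: relative_to raises ValueError (caught below)
        # if the stored path resolves outside data_dir.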
        path.resolve().relative_to(data_dir.resolve())
        html = path.read_text(encoding="utf-8", errors="ignore")
    except Exception:
        return {}
    return values_from_detail_html(html)


def run_backfill(
    *,
    site_code: str,
    dry_run: bool,
    only_missing: bool,
    limit: int | None,
) -> BackfillStats:
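    """Backfill one site's articles.

    List-API raw values take precedence; the saved detail HTML fills in
    whatever raw does not supply. Dry runs roll back instead of committing.
    """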
    Session = get_sessionmaker()
    data_dir = Path(get_settings().data_dir)
    field_counts: Counter[str] = Counter()
    scanned = 0
    changed = 0
    skipped_no_source = 0

    with Session() as s:
        site = s.scalar(select(CrawlSite).where(CrawlSite.site_code == site_code))
        if site is None:
            raise SystemExit(f"site not found: {site_code}")

        stmt = select(Article).where(Article.site_id == site.id).order_by(Article.id.asc())
        if limit is not None:
            stmt = stmt.limit(limit)

        for article in s.scalars(stmt):
            scanned += 1
            raw = _coerce_raw(article.metadata_json)
            raw_values = values_from_raw(raw) if raw is not None else {}
            html_values = _html_values_for_article(article, data_dir=data_dir)
            values = _merge_values(raw_values, html_values)
            if not values:
                skipped_no_source += 1
                continue
            fields = apply_values(article, values, only_missing=only_missing)
            if fields:
                changed += 1
                field_counts.update(fields)

        if dry_run:
            s.rollback()
        else:
            s.commit()

    return BackfillStats(scanned, changed, skipped_no_source, field_counts)


def _build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("--site-code", default="gd_gkmlpt")
    p.add_argument("--dry-run", action="store_true")
    p.add_argument("--only-missing", action="store_true")
    p.add_argument("--limit", type=int, default=None)
    return p


def main(argv: list[str] | None = None) -> int:
    args = _build_parser().parse_args(argv)
    stats = run_backfill(
        site_code=args.site_code,
        dry_run=args.dry_run,
        only_missing=args.only_missing,
        limit=args.limit,
    )
    mode = "dry_run" if args.dry_run else "committed"
    print(
        f"mode={mode} site={args.site_code} scanned={stats.scanned} "
        f"changed={stats.changed} skipped_no_source={stats.skipped_no_source}"
    )
    if stats.field_counts:
        print("field_counts=" + ",".join(
            f"{k}:{v}" for k, v in sorted(stats.field_counts.items())
        ))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
