"""Article / Attachment / CrawlLog persistence helpers — v2 schema aligned.

This module is the thin layer between the crawl pipeline and the SQLAlchemy
tables. Adapters produce Pydantic `CrawlItem` (see `govcrawler.adapters.contract`);
this module projects those into rows.

Key change from 1.0:
  * `Article.site_id` is now an **int FK** to `crawl_site.id` (was str).
    Callers pass the site_code (str) and we resolve via the repo.
  * `CrawlLog.column_id` is gone — replaced by `target_id` FK to `crawl_target.id`.
  * `Article.column_id` / `.category` / `.source` → replaced by `target_id`,
    `channel_name`, `content_category`, `source_raw`.
"""
from __future__ import annotations

import logging
import re
from datetime import date, datetime
from typing import Iterable, Sequence

from sqlalchemy import select
from sqlalchemy.orm import Session

from govcrawler.adapters.contract import CrawlItem
from govcrawler.models import Article, ArticleStandardMeta, Attachment, CrawlLog
from govcrawler.repositories import sites


log = logging.getLogger(__name__)


# Column-length limits from the article schema. Mirrors the MySQL DDL —
# keep in sync if ALTER TABLE changes column widths. content_text is TEXT
# (utf8mb4 → ~16K char hard cap before MySQL 1406 errors); we truncate
# more aggressively to leave headroom for chinese chars (4 bytes each).
_VARCHAR_LIMITS = {
    "native_post_id": 64,
    "source_raw": 500,
    "channel_name": 200,
    "channel_path": 1000,
    "content_category": 100,
    "content_subcategory": 100,
    "index_no": 200,
    "doc_no": 200,
    "publisher": 500,
    "topic_words": 500,
    "open_category": 200,
    # TEXT columns — keep some headroom under 64KB / 4 = 16K chars.
    # 14000 leaves ~10% slack for trailing notes we may append.
    "content_text": 14000,
    "title": 4000,
}

# Strip NULL bytes (MySQL 1366 trigger) and unpaired UTF-16 surrogates which
# can sneak in via mojibake'd source HTML and also produce 1366.
_FORBIDDEN_BYTE_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]|[\ud800-\udfff]")


def _sanitize_str(s: str | None, *, max_len: int | None = None,
                  field: str = "?") -> str | None:
    """Defang a string before INSERT:
      • drop NULL bytes / control chars / surrogate halves (MySQL 1366),
      • truncate to max_len (MySQL 1406),
      • emit a single WARN log when either trim happens so ops can see
        exactly which field a row was rejected on.
    """
    if s is None:
        return None
    if not isinstance(s, str):
        return s
    cleaned = _FORBIDDEN_BYTE_RE.sub("", s)
    truncated = False
    if max_len is not None and len(cleaned) > max_len:
        cleaned = cleaned[:max_len]
        truncated = True
    if cleaned != s:
        if truncated:
            log.warning(
                "sanitize: field=%s truncated %d→%d chars (orig head=%r)",
                field, len(s), len(cleaned), s[:60]
            )
        else:
            log.warning(
                "sanitize: field=%s stripped %d forbidden chars",
                field, len(s) - len(cleaned)
            )
    return cleaned


def _sanitize_item_inplace(item: CrawlItem) -> None:
    """Apply _sanitize_str to every string-shaped field on a CrawlItem before
    INSERT. Mutates in place. Pydantic v2 models with .model_copy() would
    be cleaner but adapters already pass items as plain attribute holders,
    and we'd rather not allocate a copy on every successful insert path.
    """
    for field, limit in _VARCHAR_LIMITS.items():
        cur = getattr(item, field, None)
        if cur is None:
            continue
        cleaned = _sanitize_str(cur, max_len=limit, field=field)
        if cleaned != cur:
            try:
                setattr(item, field, cleaned)
            except Exception:
                pass  # frozen pydantic — fall through, kwargs build below
    # url is unbounded (TEXT) but might still carry control chars
    if getattr(item, "url", None):
        cleaned = _sanitize_str(item.url, field="url")
        if cleaned != item.url:
            try:
                setattr(item, "url", cleaned)
            except Exception:
                pass


def _sanitize_article_kwargs_inplace(kwargs: dict) -> None:
    """Apply the same MySQL-safe string cleanup to the low-level insert path.

    YAML/detail-only crawls don't build a CrawlItem, so they call
    insert_article(**kwargs) directly. Keep that path protected too.
    """
    for field, limit in _VARCHAR_LIMITS.items():
        cur = kwargs.get(field)
        if cur is None:
            continue
        cleaned = _sanitize_str(cur, max_len=limit, field=field)
        if cleaned != cur:
            kwargs[field] = cleaned
    if kwargs.get("url"):
        kwargs["url"] = _sanitize_str(kwargs["url"], field="url")


# ---------------------------------------------------------------------------
# read side
# ---------------------------------------------------------------------------
def get_article_by_url_hash(session: Session, url_hash: str) -> Article | None:
    return session.scalar(select(Article).where(Article.url_hash == url_hash))


def get_article_by_native_id(
    session: Session, *, site_pk: int, native_post_id: str
) -> Article | None:
    """Look up by (site_id, native_post_id) — the primary CMS-side dedup key."""
    return session.scalar(
        select(Article).where(
            Article.site_id == site_pk,
            Article.native_post_id == native_post_id,
        )
    )


# ---------------------------------------------------------------------------
# article insert — contract-driven
# ---------------------------------------------------------------------------
# fields that map 1:1 from CrawlItem → Article column
_CONTRACT_FIELDS: tuple[str, ...] = (
    "native_post_id",
    "url",
    "url_hash",
    "title",
    "publish_time",
    "source_raw",
    "publisher",
    "content_text",
    "raw_html_path",
    "text_path",
    "channel_name",
    "channel_path",
    "content_category",
    "content_subcategory",
    "index_no",
    "doc_no",
    "publish_date",
    "effective_date",
    "is_effective",
    "expiry_date",
    "topic_words",
    "open_category",
    "metadata_json",
    "has_attachment",
)


def insert_article_from_contract(
    session: Session, item: CrawlItem, *, site_pk: int | None = None
) -> Article:
    """Project a Pydantic `CrawlItem` into an `Article` row and insert it.

    `site_pk` is the int FK target (crawl_site.id). If omitted we resolve it
    by looking up `item.site_id` (which is the site_code) in crawl_site.
    """
    if site_pk is None:
        site_row = sites.get_by_code(session, item.site_id)
        if site_row is None:
            raise ValueError(
                f"crawl_site not found for site_code={item.site_id!r}; "
                "run yaml-sync or create the row first"
            )
        site_pk = site_row.id

    # Defang the item before kwargs are read out so MySQL 1366 (forbidden
    # chars) / 1406 (data too long) failures get truncated to a recoverable
    # row + a warning instead of a failed crawl_log entry.
    _sanitize_item_inplace(item)

    kwargs: dict = {name: getattr(item, name) for name in _CONTRACT_FIELDS}
    kwargs["site_id"] = site_pk
    kwargs["target_id"] = item.target_id
    kwargs["dept_id"] = item.dept_id
    kwargs["status"] = item.status.value
    kwargs["fetch_strategy"] = item.fetch_strategy.value if item.fetch_strategy else None

    row = Article(**kwargs)
    session.add(row)
    session.flush()
    return row


def insert_article(session: Session, **kwargs) -> Article:
    """Low-level insert — passes kwargs through to `Article(...)`.

    Prefer `insert_article_from_contract` in the pipeline; this stays for
    tests and edge callers that have already built a dict.
    """
    _sanitize_article_kwargs_inplace(kwargs)
    row = Article(**kwargs)
    session.add(row)
    session.flush()
    return row


def _parse_date(value: object) -> date | None:
    if isinstance(value, date):
        return value
    s = str(value or "").strip()
    m = re.search(r"\d{4}-\d{1,2}-\d{1,2}", s)
    if not m:
        return None
    try:
        return datetime.strptime(m.group(0), "%Y-%m-%d").date()
    except ValueError:
        return None


def _split_standard_codes(value: object) -> list[str] | None:
    s = str(value or "").strip()
    if not s:
        return None
    parts = [p.strip() for p in re.split(r"[;；,，、]+", s) if p.strip()]
    return parts or None


def _standard_meta_payload(article: Article) -> dict | None:
    """Extract standards-domain metadata from an Article.

    The canonical article table stores common document fields. Standards have
    their own useful facets (ICS/CCS, implementation date, technical committee),
    so keep them in a side table rather than widening `article` for one domain.
    """
    metadata = article.metadata_json or {}
    public_meta = metadata.get("public_meta") or {}
    if not isinstance(public_meta, dict):
        return None
    if not (public_meta.get("std_no") or public_meta.get("openstd_hcno")):
        return None
    standard_no = str(public_meta.get("std_no") or article.doc_no or "").strip() or None
    return {
        "article_id": article.id,
        "standard_no": standard_no,
        "chinese_title": str(public_meta.get("title") or article.title or "").strip() or None,
        "english_title": str(public_meta.get("english_title") or "").strip() or None,
        "standard_status": (
            str(public_meta.get("standard_status") or article.open_category or "").strip()
            or None
        ),
        "ccs_codes": _split_standard_codes(public_meta.get("ccs")),
        "ics_codes": _split_standard_codes(public_meta.get("ics") or article.topic_words),
        "publish_date": _parse_date(public_meta.get("publish_date") or article.publish_date),
        "implementation_date": _parse_date(
            public_meta.get("effective_date") or article.effective_date
        ),
        "competent_department": str(public_meta.get("主管部门") or "").strip() or None,
        "technical_committee": str(public_meta.get("归口部门") or "").strip() or None,
        "issuing_body": str(public_meta.get("发布单位") or article.publisher or "").strip() or None,
        "standard_type": str(
            metadata.get("std_type")
            or article.content_subcategory
            or article.channel_name
            or ""
        ).strip() or None,
        "raw_meta_json": public_meta,
    }


def upsert_article_standard_meta(session: Session, article: Article) -> ArticleStandardMeta | None:
    payload = _standard_meta_payload(article)
    if payload is None:
        return None
    row = session.scalar(
        select(ArticleStandardMeta).where(ArticleStandardMeta.article_id == article.id)
    )
    if row is None:
        row = ArticleStandardMeta(**payload)
        session.add(row)
    else:
        for key, value in payload.items():
            if key != "article_id":
                setattr(row, key, value)
    session.flush()
    return row


# ---------------------------------------------------------------------------
# attachments
# ---------------------------------------------------------------------------
def insert_attachments(
    session: Session, article_id: int, records: Sequence[dict]
) -> list[Attachment]:
    out: list[Attachment] = []
    for r in records:
        att = Attachment(article_id=article_id, **r)
        session.add(att)
        out.append(att)
    session.flush()
    return out


# ---------------------------------------------------------------------------
# crawl_log — now keyed by target_id, not (site_id, column_id)
# ---------------------------------------------------------------------------
def insert_crawl_log(
    session: Session,
    *,
    site_pk: int | None,
    target_id: int | None,
    article_url: str | None,
    strategy: str,
    http_status: int | None,
    duration_ms: int,
    success: bool,
    error_msg: str | None = None,
) -> CrawlLog:
    row = CrawlLog(
        site_id=site_pk,
        target_id=target_id,
        article_url=article_url,
        strategy=strategy,
        http_status=http_status,
        duration_ms=duration_ms,
        success=success,
        error_msg=error_msg,
    )
    session.add(row)
    session.flush()
    return row
