"""Article search + delete."""
from __future__ import annotations

import logging
from datetime import date, datetime
from pathlib import Path, PurePosixPath
from typing import Any

from fastapi import BackgroundTasks, Body, Depends, HTTPException, Query
from fastapi.responses import FileResponse, Response
from sqlalchemy import desc, func, or_, select
from sqlalchemy.orm import Session

from govcrawler import openstd_download
from govcrawler.models import (
    Article,
    ArticleStandardMeta,
    Attachment,
    CrawlSite,
    CrawlTarget,
    LocalDepartment,
)
from govcrawler.settings import get_settings
from govcrawler.storage.paths import to_os_path

from ._common import _session, router

log = logging.getLogger(__name__)


def _standard_meta_payload(m: ArticleStandardMeta | None) -> dict[str, Any] | None:
    if m is None:
        return None
    return {
        "standard_no": m.standard_no,
        "chinese_title": m.chinese_title,
        "english_title": m.english_title,
        "standard_status": m.standard_status,
        "ccs_codes": m.ccs_codes or [],
        "ics_codes": m.ics_codes or [],
        "publish_date": m.publish_date.isoformat() if m.publish_date else None,
        "implementation_date": (
            m.implementation_date.isoformat() if m.implementation_date else None
        ),
        "competent_department": m.competent_department,
        "technical_committee": m.technical_committee,
        "issuing_body": m.issuing_body,
        "standard_type": m.standard_type,
        "raw_meta_json": m.raw_meta_json,
    }


def _articles_base_stmt():
    return (
        select(
            Article,
            CrawlSite.site_code.label("site_code"),
            CrawlTarget.target_code.label("target_code"),
            LocalDepartment.dept_id.label("dept_id"),
            LocalDepartment.dept_name.label("dept_name"),
            LocalDepartment.short_name.label("dept_short_name"),
        )
        .select_from(Article)
        .join(CrawlSite, CrawlSite.id == Article.site_id, isouter=True)
        .join(CrawlTarget, CrawlTarget.id == Article.target_id, isouter=True)
        .join(LocalDepartment, LocalDepartment.dept_id == Article.dept_id, isouter=True)
    )


def _article_payload(
    a: Article, row: Any | None = None, *, include_heavy: bool = False,
) -> dict[str, Any]:
    payload = {
        "id": a.id,
        "site_id": a.site_id,
        "site_code": getattr(row, "site_code", None),
        "target_id": a.target_id,
        "target_code": getattr(row, "target_code", None),
        "dept_id": getattr(row, "dept_id", a.dept_id),
        "dept_name": getattr(row, "dept_name", None),
        "dept_short_name": getattr(row, "dept_short_name", None),
        "native_post_id": a.native_post_id,
        "url": a.url,
        "url_hash": a.url_hash,
        "title": a.title,
        "publish_time": a.publish_time.isoformat() if a.publish_time else None,
        "source_raw": a.source_raw,
        "channel_name": a.channel_name,
        "channel_path": a.channel_path,
        "content_category": a.content_category,
        "content_subcategory": a.content_subcategory,
        "index_no": a.index_no,
        "doc_no": a.doc_no,
        "publisher": a.publisher,
        "publish_date": a.publish_date.isoformat() if a.publish_date else None,
        "effective_date": a.effective_date.isoformat() if a.effective_date else None,
        "is_effective": a.is_effective,
        "expiry_date": a.expiry_date.isoformat() if a.expiry_date else None,
        "topic_words": a.topic_words,
        "open_category": a.open_category,
        "content_text_len": len(a.content_text or ""),
        "raw_html_path": a.raw_html_path,
        "text_path": a.text_path,
        "has_attachment": a.has_attachment,
        "status": a.status,
        "fetch_strategy": a.fetch_strategy,
        "fetched_at": a.fetched_at.isoformat() if a.fetched_at else None,
        "exported_to_rag_at": a.exported_to_rag_at.isoformat()
        if a.exported_to_rag_at
        else None,
        "rag_export_status": a.rag_export_status,
        "rag_export_started_at": a.rag_export_started_at.isoformat()
        if a.rag_export_started_at
        else None,
        "rag_export_finished_at": a.rag_export_finished_at.isoformat()
        if a.rag_export_finished_at
        else None,
        "rag_export_error": a.rag_export_error,
        "rag_export_task_ids": a.rag_export_task_ids or [],
        "created_at": a.created_at.isoformat() if a.created_at else None,
        "updated_at": a.updated_at.isoformat() if a.updated_at else None,
    }
    if include_heavy:
        payload["metadata_json"] = a.metadata_json
        payload["content_text"] = a.content_text
        payload["standard_meta"] = _standard_meta_payload(a.standard_meta)
    return payload


def _append_contains_filter(filters: list[Any], column: Any, value: str | None) -> None:
    if value:
        filters.append(column.ilike(f"%{value}%"))


def _article_filters(
    *,
    q: str | None = None,
    article_id: int | None = None,
    site: str | None = None,
    target: str | None = None,
    dept: int | None = None,
    status: str | None = None,
    has_attachment: bool | None = None,
    is_effective: bool | None = None,
    native_post_id: str | None = None,
    url: str | None = None,
    url_hash: str | None = None,
    title: str | None = None,
    source_raw: str | None = None,
    channel_name: str | None = None,
    channel_path: str | None = None,
    content_category: str | None = None,
    content_subcategory: str | None = None,
    index_no: str | None = None,
    doc_no: str | None = None,
    publisher: str | None = None,
    topic_words: str | None = None,
    open_category: str | None = None,
    fetch_strategy: str | None = None,
    rag_export_status: str | None = None,
    publish_date_from: date | None = None,
    publish_date_to: date | None = None,
    effective_date_from: date | None = None,
    effective_date_to: date | None = None,
    expiry_date_from: date | None = None,
    expiry_date_to: date | None = None,
    fetched_from: datetime | None = None,
    fetched_to: datetime | None = None,
    created_from: datetime | None = None,
    created_to: datetime | None = None,
    since: datetime | None = None,
) -> list[Any]:
    filters: list[Any] = []
    if q:
        like = f"%{q}%"
        filters.append(or_(Article.title.ilike(like), Article.content_text.ilike(like)))
    if article_id is not None:
        filters.append(Article.id == article_id)
    if site:
        filters.append(CrawlSite.site_code == site)
    if target:
        filters.append(CrawlTarget.target_code == target)
    if dept is not None:
        filters.append(Article.dept_id == dept)
    if status:
        filters.append(Article.status == status)
    if has_attachment is not None:
        filters.append(Article.has_attachment.is_(has_attachment))
    if is_effective is not None:
        filters.append(Article.is_effective.is_(is_effective))
    if url_hash:
        filters.append(Article.url_hash == url_hash)
    if fetch_strategy:
        filters.append(Article.fetch_strategy == fetch_strategy)
    if rag_export_status:
        filters.append(Article.rag_export_status == rag_export_status)

    _append_contains_filter(filters, Article.native_post_id, native_post_id)
    _append_contains_filter(filters, Article.url, url)
    _append_contains_filter(filters, Article.title, title)
    _append_contains_filter(filters, Article.source_raw, source_raw)
    _append_contains_filter(filters, Article.channel_name, channel_name)
    _append_contains_filter(filters, Article.channel_path, channel_path)
    _append_contains_filter(filters, Article.content_category, content_category)
    _append_contains_filter(filters, Article.content_subcategory, content_subcategory)
    _append_contains_filter(filters, Article.index_no, index_no)
    _append_contains_filter(filters, Article.doc_no, doc_no)
    _append_contains_filter(filters, Article.publisher, publisher)
    _append_contains_filter(filters, Article.topic_words, topic_words)
    _append_contains_filter(filters, Article.open_category, open_category)

    if publish_date_from:
        filters.append(Article.publish_date >= publish_date_from)
    if publish_date_to:
        filters.append(Article.publish_date <= publish_date_to)
    if effective_date_from:
        filters.append(Article.effective_date >= effective_date_from)
    if effective_date_to:
        filters.append(Article.effective_date <= effective_date_to)
    if expiry_date_from:
        filters.append(Article.expiry_date >= expiry_date_from)
    if expiry_date_to:
        filters.append(Article.expiry_date <= expiry_date_to)
    if since:
        filters.append(Article.fetched_at >= since)
    if fetched_from:
        filters.append(Article.fetched_at >= fetched_from)
    if fetched_to:
        filters.append(Article.fetched_at <= fetched_to)
    if created_from:
        filters.append(Article.created_at >= created_from)
    if created_to:
        filters.append(Article.created_at <= created_to)
    return filters


@router.get("/api/articles/search")
def search_articles(
    q: str | None = Query(None, description="substring of title or content_text"),
    article_id: int | None = Query(None, description="article.id"),
    site: str | None = Query(None, description="crawl_site.site_code"),
    target: str | None = Query(None, description="crawl_target.target_code"),
    dept: int | None = Query(None, description="local_department.dept_id (article.dept_id)"),
    status: str | None = Query(None, description="ready | failed | raw"),
    has_attachment: bool | None = Query(None),
    is_effective: bool | None = Query(None),
    native_post_id: str | None = Query(None),
    url: str | None = Query(None),
    url_hash: str | None = Query(None),
    title: str | None = Query(None),
    source_raw: str | None = Query(None),
    channel_name: str | None = Query(None),
    channel_path: str | None = Query(None),
    content_category: str | None = Query(None),
    content_subcategory: str | None = Query(None),
    index_no: str | None = Query(None),
    doc_no: str | None = Query(None),
    publisher: str | None = Query(None),
    topic_words: str | None = Query(None),
    open_category: str | None = Query(None),
    fetch_strategy: str | None = Query(None),
    rag_export_status: str | None = Query(None),
    publish_date_from: date | None = None,
    publish_date_to: date | None = None,
    effective_date_from: date | None = None,
    effective_date_to: date | None = None,
    expiry_date_from: date | None = None,
    expiry_date_to: date | None = None,
    fetched_from: datetime | None = None,
    fetched_to: datetime | None = None,
    created_from: datetime | None = None,
    created_to: datetime | None = None,
    since: datetime | None = None,
    limit: int = Query(20, ge=1, le=200),
    offset: int = Query(0, ge=0),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Server-side pagination. UI keeps `limit` small (default 20 = one page)
    and pages by varying `offset`. Returns `total` so the front-end can
    render correct pagination controls.

    Historical context: the UI previously asked for limit=5000 and did
    client-side slicing. Once total articles crossed 5000 every record
    past the cap silently disappeared from the UI. limit is now capped
    at 200 to enforce real pagination."""
    # Build the WHERE clause once, apply to both COUNT and SELECT.
    filters = _article_filters(
        q=q, article_id=article_id, site=site, target=target, dept=dept,
        status=status, has_attachment=has_attachment, is_effective=is_effective,
        native_post_id=native_post_id, url=url, url_hash=url_hash, title=title,
        source_raw=source_raw, channel_name=channel_name, channel_path=channel_path,
        content_category=content_category, content_subcategory=content_subcategory,
        index_no=index_no, doc_no=doc_no, publisher=publisher,
        topic_words=topic_words, open_category=open_category,
        fetch_strategy=fetch_strategy, rag_export_status=rag_export_status,
        publish_date_from=publish_date_from, publish_date_to=publish_date_to,
        effective_date_from=effective_date_from, effective_date_to=effective_date_to,
        expiry_date_from=expiry_date_from, expiry_date_to=expiry_date_to,
        fetched_from=fetched_from, fetched_to=fetched_to,
        created_from=created_from, created_to=created_to, since=since,
    )

    # Total count over the filtered set. Joins are needed when filters
    # touch CrawlSite/CrawlTarget; the LEFT OUTER joins are safe (no
    # duplication) and Article.id is unique.
    count_stmt = (
        select(func.count(Article.id))
        .select_from(Article)
        .join(CrawlSite, CrawlSite.id == Article.site_id, isouter=True)
        .join(CrawlTarget, CrawlTarget.id == Article.target_id, isouter=True)
    )
    for f in filters:
        count_stmt = count_stmt.where(f)
    total = s.execute(count_stmt).scalar() or 0

    stmt = _articles_base_stmt().order_by(desc(Article.fetched_at)).offset(offset).limit(limit)
    for f in filters:
        stmt = stmt.where(f)
    rows = s.execute(stmt).all()
    return {
        "count": len(rows),
        "total": total,
        "limit": limit,
        "offset": offset,
        "items": [
            {
                **_article_payload(r.Article, r),
            }
            for r in rows
        ],
    }


@router.get("/api/articles/search/ids")
def search_articles_ids(
    q: str | None = Query(None),
    article_id: int | None = Query(None),
    site: str | None = Query(None),
    target: str | None = Query(None),
    dept: int | None = Query(None),
    status: str | None = Query(None),
    has_attachment: bool | None = Query(None),
    is_effective: bool | None = Query(None),
    native_post_id: str | None = Query(None),
    url: str | None = Query(None),
    url_hash: str | None = Query(None),
    title: str | None = Query(None),
    source_raw: str | None = Query(None),
    channel_name: str | None = Query(None),
    channel_path: str | None = Query(None),
    content_category: str | None = Query(None),
    content_subcategory: str | None = Query(None),
    index_no: str | None = Query(None),
    doc_no: str | None = Query(None),
    publisher: str | None = Query(None),
    topic_words: str | None = Query(None),
    open_category: str | None = Query(None),
    fetch_strategy: str | None = Query(None),
    rag_export_status: str | None = Query(None),
    publish_date_from: date | None = None,
    publish_date_to: date | None = None,
    effective_date_from: date | None = None,
    effective_date_to: date | None = None,
    expiry_date_from: date | None = None,
    expiry_date_to: date | None = None,
    fetched_from: datetime | None = None,
    fetched_to: datetime | None = None,
    created_from: datetime | None = None,
    created_to: datetime | None = None,
    since: datetime | None = None,
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Lightweight IDs-only counterpart to /api/articles/search. Used by the
    UI's "select all matching filter" button — paginated rendering means
    the row set only has the current page, but bulk-delete should be able
    to act on every record matching the filter regardless of page."""
    stmt = (
        select(Article.id)
        .select_from(Article)
        .join(CrawlSite, CrawlSite.id == Article.site_id, isouter=True)
        .join(CrawlTarget, CrawlTarget.id == Article.target_id, isouter=True)
        .order_by(desc(Article.fetched_at))
    )
    for f in _article_filters(
        q=q, article_id=article_id, site=site, target=target, dept=dept,
        status=status, has_attachment=has_attachment, is_effective=is_effective,
        native_post_id=native_post_id, url=url, url_hash=url_hash, title=title,
        source_raw=source_raw, channel_name=channel_name, channel_path=channel_path,
        content_category=content_category, content_subcategory=content_subcategory,
        index_no=index_no, doc_no=doc_no, publisher=publisher,
        topic_words=topic_words, open_category=open_category,
        fetch_strategy=fetch_strategy, rag_export_status=rag_export_status,
        publish_date_from=publish_date_from, publish_date_to=publish_date_to,
        effective_date_from=effective_date_from, effective_date_to=effective_date_to,
        expiry_date_from=expiry_date_from, expiry_date_to=expiry_date_to,
        fetched_from=fetched_from, fetched_to=fetched_to,
        created_from=created_from, created_to=created_to, since=since,
    ):
        stmt = stmt.where(f)
    ids = [row[0] for row in s.execute(stmt).all()]
    return {"ids": ids, "total": len(ids)}


def retry_failed_articles_to_rag(article_ids: list[int]) -> None:
    """Run selected RAG import jobs sequentially in a FastAPI background task."""
    from govcrawler.rag.exporter import RagExporter

    exporter = RagExporter()
    try:
        for article_id in article_ids:
            exporter.export_pending(article_id=article_id)
    finally:
        exporter.close()


def import_all_pending_articles_to_rag(article_ids: list[int]) -> None:
    """Run one full-table RAG import snapshot in the background."""
    from govcrawler.rag.exporter import RagExporter

    exporter = RagExporter()
    try:
        for article_id in article_ids:
            exporter.export_pending(article_id=article_id)
    finally:
        exporter.close()


@router.post("/api/articles/rag-import-all")
def import_all_rag_articles(
    background: BackgroundTasks,
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Queue all RAG-pending or RAG-failed ready articles for ingestion.

    This is intentionally not selection-based: the Articles page's "全部入库"
    button means the whole article table, not the checked rows or current page.
    """
    eligible_ids = [
        article_id
        for (article_id,) in (
            s.query(Article.id)
            .filter(
                Article.status == "ready",
                Article.exported_to_rag_at.is_(None),
                or_(
                    Article.rag_export_status.is_(None),
                    Article.rag_export_status.in_(("pending", "failed")),
                ),
            )
            .order_by(Article.fetched_at.asc(), Article.id.asc())
            .all()
        )
    ]
    eligible_count = len(eligible_ids)
    if eligible_ids:
        background.add_task(import_all_pending_articles_to_rag, eligible_ids)
    return {
        "queued": bool(eligible_ids),
        "eligible_count": eligible_count,
    }


@router.post("/api/articles/rag-retry")
def retry_failed_rag_articles(
    background: BackgroundTasks,
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Queue selected unexported articles for RAG ingestion.

    Only ready articles that are not already exported and whose RAG export
    state is empty/pending/failed are eligible. Other selected ids are reported
    as skipped so the UI can send the full current selection safely.
    """
    article_ids = payload.get("article_ids") or []
    if not isinstance(article_ids, list) or not article_ids:
        raise HTTPException(400, "article_ids must be a non-empty array")
    if len(article_ids) > 200:
        raise HTTPException(400, "article_ids must be <= 200")

    normalized_ids: list[int] = []
    for raw_id in article_ids:
        try:
            normalized_ids.append(int(raw_id))
        except (TypeError, ValueError):
            raise HTTPException(400, "article_ids must contain integers")
    normalized_ids = list(dict.fromkeys(normalized_ids))

    rows = (
        s.query(Article)
        .filter(Article.id.in_(normalized_ids))
        .all()
    )
    by_id = {row.id: row for row in rows}
    eligible: list[int] = []
    skipped: list[dict[str, Any]] = []
    for article_id in normalized_ids:
        row = by_id.get(article_id)
        if row is None:
            skipped.append({"article_id": article_id, "reason": "not_found"})
            continue
        if row.status != "ready":
            skipped.append({"article_id": article_id, "reason": "article_not_ready"})
            continue
        if row.exported_to_rag_at is not None:
            skipped.append({"article_id": article_id, "reason": "already_exported"})
            continue
        if row.rag_export_status not in {None, "failed", "pending"}:
            skipped.append({"article_id": article_id, "reason": "rag_status_not_importable"})
            continue
        eligible.append(article_id)
        row.rag_export_status = "pending"
        row.rag_export_started_at = None
        row.rag_export_finished_at = None
        row.rag_export_error = None
        row.rag_export_task_ids = []

    if eligible:
        s.commit()
        background.add_task(retry_failed_articles_to_rag, eligible)
    else:
        s.rollback()

    return {
        "queued_count": len(eligible),
        "skipped_count": len(skipped),
        "article_ids": eligible,
        "skipped": skipped,
    }


def _safe_delete_file(rel_path: str | None, *, data_dir: Path) -> bool:
    """Delete a file under data_dir if it sits inside it. Path-traversal safe."""
    if not rel_path:
        return False
    abs_path = to_os_path(data_dir, PurePosixPath(rel_path))
    try:
        abs_path.resolve().relative_to(data_dir.resolve())
    except Exception:
        return False
    if not abs_path.exists():
        return False
    try:
        abs_path.unlink()
        return True
    except OSError as e:
        log.warning("delete file failed path=%s err=%s", abs_path, e)
        return False


@router.delete("/api/articles/{article_id}")
def delete_article(article_id: int, s: Session = Depends(_session)) -> dict[str, Any]:
    """Hard-delete an article + all its attachments (DB rows + files on disk).

    The DB cascades: attachment.article_id has ondelete=CASCADE, so the
    attachment rows go automatically. We additionally remove the on-disk
    files (raw_html, articles_text, attachment payloads) so a re-crawl of
    the same URL doesn't trip the url_hash UNIQUE check while leaving
    orphan files lying around.
    """
    a = s.get(Article, article_id)
    if a is None:
        raise HTTPException(404, "article not found")
    data_dir = Path(get_settings().data_dir)

    files_removed: list[str] = []
    # Attachment files first (still readable via the relationship)
    for att in a.attachments:
        if _safe_delete_file(att.file_path, data_dir=data_dir):
            files_removed.append(att.file_path)
    # Article-level files
    if _safe_delete_file(a.raw_html_path, data_dir=data_dir):
        files_removed.append(a.raw_html_path)
    if _safe_delete_file(a.text_path, data_dir=data_dir):
        files_removed.append(a.text_path)

    s.delete(a)  # cascades to attachment rows via ondelete=CASCADE
    s.commit()
    return {
        "deleted": True,
        "article_id": article_id,
        "attachments_removed": len(files_removed),
        "files_removed": files_removed,
    }


@router.post("/api/articles/bulk-delete")
def bulk_delete_articles(
    payload: dict[str, Any] = Body(...),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    """Hard-delete a batch of articles (and their attachments + on-disk
    files). Used by the 文章列表 multi-select action. Cap 1000 / call.

    Body shape (one of):
      {"article_ids": [1,2,3,...]}     # explicit ids
      {"target_code": "..."}           # delete every article tagged with
                                        # this crawl_target (handy for
                                        # 'wipe and re-run with --force')
      {"site_code": "..."}             # delete every article from a site
    """
    article_ids = payload.get("article_ids") or []
    target_code = (payload.get("target_code") or "").strip() or None
    site_code = (payload.get("site_code") or "").strip() or None

    if not article_ids and not target_code and not site_code:
        raise HTTPException(400, "provide article_ids, target_code, or site_code")
    if article_ids and not isinstance(article_ids, list):
        raise HTTPException(400, "article_ids must be an array")

    q = s.query(Article)
    if article_ids:
        if len(article_ids) > 1000:
            raise HTTPException(400, "article_ids must be <= 1000")
        q = q.filter(Article.id.in_(article_ids))
    if target_code:
        from govcrawler.models import CrawlTarget
        t = s.query(CrawlTarget).filter_by(target_code=target_code).first()
        if t is None:
            raise HTTPException(404, f"target not found: {target_code}")
        q = q.filter(Article.target_id == t.id)
    if site_code:
        from govcrawler.models import CrawlSite
        site = s.query(CrawlSite).filter_by(site_code=site_code).first()
        if site is None:
            raise HTTPException(404, f"site not found: {site_code}")
        q = q.filter(Article.site_id == site.id)

    rows = q.all()
    if len(rows) > 1000:
        raise HTTPException(400, f"refusing to delete {len(rows)} articles in one call (>1000) — narrow the filter")

    data_dir = Path(get_settings().data_dir)
    files_removed = 0
    for a in rows:
        for att in a.attachments:
            if _safe_delete_file(att.file_path, data_dir=data_dir):
                files_removed += 1
        if _safe_delete_file(a.raw_html_path, data_dir=data_dir):
            files_removed += 1
        if _safe_delete_file(a.text_path, data_dir=data_dir):
            files_removed += 1
        s.delete(a)
    s.commit()
    return {
        "deleted_count": len(rows),
        "files_removed": files_removed,
    }


# ---------------------------------------------------------------------------
# Article detail proxies for the admin UI.
# ---------------------------------------------------------------------------
# These routes mirror the public RAG handlers in govcrawler/api/app.py but
# live under /admin/* so the dashboard's modal / "查看原始 HTML" / 附件下载
# links inherit the admin Basic Auth gate. Without these mirrors the modal
# called the public path /api/articles/<id> directly — that's protected by
# _RagTokenGate (Bearer scheme), and browser-cached Basic Auth from /admin
# never auto-flows to a different scheme realm, so the UI got 401 even
# while the operator was logged in. Same logic, just registered under the
# admin router.

def _serve_admin_article_file(rel_path: str | None, media_type: str,
                              download_name: str) -> FileResponse:
    if not rel_path:
        raise HTTPException(404, "path not set on article")
    data_dir = Path(get_settings().data_dir)
    abs_path = to_os_path(data_dir, PurePosixPath(rel_path))
    try:
        abs_path.resolve().relative_to(data_dir.resolve())
    except Exception:
        raise HTTPException(400, "invalid path")
    if not abs_path.exists():
        raise HTTPException(404, "file missing on disk")
    return FileResponse(
        path=str(abs_path), filename=download_name, media_type=media_type,
    )


@router.get("/api/articles/{article_id}")
def admin_get_article(article_id: int, s: Session = Depends(_session)) -> dict[str, Any]:
    a = s.get(Article, article_id)
    if a is None:
        raise HTTPException(404, "article not found")
    _ = a.site, a.target  # eager-load codes
    site_code = a.site.site_code if a.site_id and a.site else None
    target_code = a.target.target_code if a.target_id and a.target else None
    payload = _article_payload(a, include_heavy=True)
    payload.update({"site_code": site_code, "target_code": target_code})
    payload["attachments"] = [
            {
                "id": att.id,
                "file_name": att.file_name,
                "file_ext": att.file_ext,
                "size_bytes": att.size_bytes,
                "file_hash": att.file_hash,
            }
            for att in a.attachments
        ]
    return payload


def _openstd_hcno(a: Article) -> str | None:
    metadata = a.metadata_json if isinstance(a.metadata_json, dict) else {}
    public_meta = metadata.get("public_meta") if isinstance(metadata, dict) else {}
    candidates = [
        public_meta.get("openstd_hcno") if isinstance(public_meta, dict) else None,
        metadata.get("openstd_hcno") if isinstance(metadata, dict) else None,
        a.native_post_id,
    ]
    for value in candidates:
        if isinstance(value, str) and value.strip():
            return value.strip().upper()
    return None


def _openstd_fallback_filename(a: Article) -> str:
    stem = " ".join(x for x in [a.doc_no, a.title] if x).strip()
    return f"{stem or _openstd_hcno(a) or 'openstd-standard'}.pdf"


@router.post("/api/articles/{article_id}/openstd-download/start")
def admin_openstd_download_start(
    article_id: int, s: Session = Depends(_session),
) -> dict[str, Any]:
    a = s.get(Article, article_id)
    if a is None:
        raise HTTPException(404, "article not found")
    site_code = a.site.site_code if a.site_id and a.site else None
    if site_code != "openstd_samr":
        raise HTTPException(400, "openstd download is only supported for openstd_samr")
    hcno = _openstd_hcno(a)
    if not hcno:
        raise HTTPException(400, "openstd hcno missing")
    try:
        sess = openstd_download.start_session(hcno, article_id=article_id)
    except ValueError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(502, f"failed to start openstd download session: {e}")
    return {
        "session_id": sess.session_id,
        "captcha_url": f"/admin/api/openstd-download-sessions/{sess.session_id}/captcha",
        "expires_at": datetime.fromtimestamp(sess.expires_at).isoformat(),
    }


@router.get("/api/openstd-download-sessions/{session_id}/captcha")
def admin_openstd_download_captcha(session_id: str) -> Response:
    try:
        sess = openstd_download.refresh_captcha(session_id)
    except KeyError:
        raise HTTPException(404, "openstd download session expired")
    except Exception as e:
        raise HTTPException(502, f"failed to refresh openstd captcha: {e}")
    return Response(content=sess.captcha_bytes, media_type=sess.captcha_content_type)


@router.post("/api/openstd-download-sessions/{session_id}/submit")
def admin_openstd_download_submit(
    session_id: str,
    payload: dict[str, Any] = Body(default_factory=dict),
    s: Session = Depends(_session),
) -> dict[str, Any]:
    captcha = str(payload.get("captcha") or "").strip()
    try:
        sess = openstd_download.get_session(session_id)
    except KeyError:
        raise HTTPException(404, "openstd download session expired")
    if sess.article_id is None:
        raise HTTPException(400, "openstd download session is missing article binding")
    a = s.get(Article, sess.article_id)
    if a is None:
        openstd_download.close_session(session_id)
        raise HTTPException(404, "article not found")
    site_code = a.site.site_code if a.site_id and a.site else None
    target_code = a.target.target_code if a.target_id and a.target else None
    if site_code != "openstd_samr":
        openstd_download.close_session(session_id)
        raise HTTPException(400, "openstd download is only supported for openstd_samr")
    when = a.publish_time or a.fetched_at or datetime.utcnow()
    try:
        d = openstd_download.submit_captcha_and_download(
            session_id,
            captcha,
            site=site_code,
            column=target_code or "openstd_samr",
            when=when,
            article_key=f"a_{a.id}",
            fallback_name=_openstd_fallback_filename(a),
        )
    except openstd_download.OpenStdCaptchaInvalid:
        raise HTTPException(
            409,
            {
                "reason": "captcha_invalid",
                "captcha_url": f"/admin/api/openstd-download-sessions/{session_id}/captcha",
            },
        )
    except KeyError:
        raise HTTPException(404, "openstd download session expired")
    except Exception as e:
        raise HTTPException(502, f"openstd download failed: {e}")

    existing = s.scalar(
        select(Attachment).where(
            Attachment.article_id == a.id,
            Attachment.file_hash == d.file_hash,
        )
    )
    if existing is not None:
        att = existing
        try:
            new_path = to_os_path(Path(get_settings().data_dir), PurePosixPath(d.file_path))
            if str(new_path) != str(to_os_path(Path(get_settings().data_dir), PurePosixPath(att.file_path))):
                new_path.unlink(missing_ok=True)
        except Exception:
            pass
    else:
        att = Attachment(
            article_id=a.id,
            file_name=d.file_name,
            file_ext=d.file_ext,
            size_bytes=d.size_bytes,
            file_path=str(d.file_path),
            file_hash=d.file_hash,
        )
        s.add(att)
    a.has_attachment = True
    s.commit()
    return {
        "status": "ok",
        "article_id": a.id,
        "attachment": {
            "id": att.id,
            "file_name": att.file_name,
            "file_ext": att.file_ext,
            "size_bytes": att.size_bytes,
            "file_hash": att.file_hash,
        },
    }


@router.get("/api/articles/{article_id}/raw-html")
def admin_get_article_raw_html(
    article_id: int, s: Session = Depends(_session),
) -> FileResponse:
    a = s.get(Article, article_id)
    if a is None:
        raise HTTPException(404, "article not found")
    return _serve_admin_article_file(
        a.raw_html_path, "text/html; charset=utf-8", f"article-{article_id}.html",
    )


@router.get("/api/articles/{article_id}/text")
def admin_get_article_text(
    article_id: int, s: Session = Depends(_session),
) -> FileResponse:
    a = s.get(Article, article_id)
    if a is None:
        raise HTTPException(404, "article not found")
    return _serve_admin_article_file(
        a.text_path, "text/plain; charset=utf-8", f"article-{article_id}.txt",
    )


@router.get("/api/articles/{article_id}/attachments/{attachment_id}")
def admin_download_attachment(
    article_id: int, attachment_id: int, s: Session = Depends(_session),
) -> FileResponse:
    att = s.get(Attachment, attachment_id)
    if att is None or att.article_id != article_id:
        raise HTTPException(404, "attachment not found")
    if not att.file_path:
        raise HTTPException(404, "attachment file_path empty")
    data_dir = Path(get_settings().data_dir)
    abs_path = to_os_path(data_dir, PurePosixPath(att.file_path))
    try:
        abs_path.resolve().relative_to(data_dir.resolve())
    except Exception:
        raise HTTPException(400, "invalid path")
    if not abs_path.exists():
        raise HTTPException(404, "file missing on disk")
    return FileResponse(
        path=str(abs_path),
        filename=att.file_name or abs_path.name,
        media_type="application/octet-stream",
    )
