diff --git a/config/sites/gd_wjk.yaml b/config/sites/gd_wjk.yaml
index 19380e2..7085177 100644
--- a/config/sites/gd_wjk.yaml
+++ b/config/sites/gd_wjk.yaml
@@ -34,10 +34,13 @@ default_column:
   publish_time: "meta[name='PubDate']::attr(content)"
   source: "meta[name='ContentSource']::attr(content)"
   content: "div.zw, div.TRS_Editor, div.TRS_UEDITOR, div.content"
+  # attachment_css is applied to the already-narrowed content_root
+  # (div.zw / div.TRS_Editor / ...). DON'T prefix with the parent
+  # selector — the search is relative to the picked content node.
   attachment_css: |
-    div.content a[href$='.pdf'], div.content a[href$='.doc'], div.content a[href$='.docx'],
-    div.content a[href$='.xls'], div.content a[href$='.xlsx'], div.content a[href$='.zip'],
-    div.content a[href$='.wps'], div.content a[href$='.rar']
+    a[href$='.pdf'], a[href$='.doc'], a[href$='.docx'],
+    a[href$='.xls'], a[href$='.xlsx'], a[href$='.zip'],
+    a[href$='.wps'], a[href$='.rar']
 
 columns:
   # 主入口:全部文件(带翻页),覆盖 1999 至今 ~4960 条
diff --git a/govcrawler/api/admin/articles.py b/govcrawler/api/admin/articles.py
index ca49c6e..d556984 100644
--- a/govcrawler/api/admin/articles.py
+++ b/govcrawler/api/admin/articles.py
@@ -7,6 +7,7 @@ from pathlib import Path, PurePosixPath
 from typing import Any
 
 from fastapi import Body, Depends, HTTPException, Query
+from fastapi.responses import FileResponse
 from sqlalchemy import desc, func, or_, select
 from sqlalchemy.orm import Session
 
@@ -275,3 +276,125 @@ def bulk_delete_articles(
         "deleted_count": len(rows),
         "files_removed": files_removed,
     }
+
+
+# ---------------------------------------------------------------------------
+# Article detail proxies for the admin UI.
+# ---------------------------------------------------------------------------
+# These routes mirror the public RAG handlers in govcrawler/api/app.py but
+# live under /admin/* so the dashboard's modal / "查看原始 HTML" / 附件下载
+# links inherit the admin Basic Auth gate. Without these mirrors the modal
+# called the public path /api/articles/{id} directly — that's protected by
+# _RagTokenGate (Bearer scheme), and browser-cached Basic Auth from /admin
+# never auto-flows into a different auth scheme's realm, so the UI got 401
+# even while the operator was logged in. Same logic, just registered under
+# the admin router.
+
+
+def _serve_admin_article_file(rel_path: str | None, media_type: str,
+                              download_name: str) -> FileResponse:
+    if not rel_path:
+        raise HTTPException(404, "path not set on article")
+    data_dir = Path(get_settings().data_dir)
+    abs_path = to_os_path(data_dir, PurePosixPath(rel_path))
+    try:
+        abs_path.resolve().relative_to(data_dir.resolve())
+    except Exception:
+        raise HTTPException(400, "invalid path")
+    if not abs_path.exists():
+        raise HTTPException(404, "file missing on disk")
+    return FileResponse(
+        path=str(abs_path), filename=download_name, media_type=media_type,
+    )
+
+
+@router.get("/api/articles/{article_id}")
+def admin_get_article(article_id: int, s: Session = Depends(_session)) -> dict[str, Any]:
+    a = s.get(Article, article_id)
+    if a is None:
+        raise HTTPException(404, "article not found")
+    _ = a.site, a.target  # eager-load codes
+    site_code = a.site.site_code if a.site_id and a.site else None
+    target_code = a.target.target_code if a.target_id and a.target else None
+    return {
+        "id": a.id,
+        "site_code": site_code,
+        "target_code": target_code,
+        "dept_id": a.dept_id,
+        "url": a.url,
+        "title": a.title,
+        "publish_time": a.publish_time.isoformat() if a.publish_time else None,
+        "publisher": a.publisher,
+        "source_raw": a.source_raw,
+        "doc_no": a.doc_no,
+        "channel_name": a.channel_name,
+        "channel_path": a.channel_path,
+        "content_category": a.content_category,
+        "content_subcategory": a.content_subcategory,
+        "status": a.status,
+        "fetch_strategy": a.fetch_strategy,
+        "fetched_at": a.fetched_at.isoformat() if a.fetched_at else None,
+        "exported_to_rag_at": a.exported_to_rag_at.isoformat() if a.exported_to_rag_at else None,
+        "content_text": a.content_text,
+        "raw_html_path": a.raw_html_path,
+        "text_path": a.text_path,
+        "metadata_json": a.metadata_json,
+        "has_attachment": a.has_attachment,
+        "attachments": [
+            {
+                "id": att.id,
+                "file_name": att.file_name,
+                "file_ext": att.file_ext,
+                "size_bytes": att.size_bytes,
+                "file_hash": att.file_hash,
+            }
+            for att in a.attachments
+        ],
+    }
+
+
+@router.get("/api/articles/{article_id}/raw-html")
+def admin_get_article_raw_html(
+    article_id: int, s: Session = Depends(_session),
+) -> FileResponse:
+    a = s.get(Article, article_id)
+    if a is None:
+        raise HTTPException(404, "article not found")
+    return _serve_admin_article_file(
+        a.raw_html_path, "text/html; charset=utf-8", f"article-{article_id}.html",
+    )
+
+
+@router.get("/api/articles/{article_id}/text")
+def admin_get_article_text(
+    article_id: int, s: Session = Depends(_session),
+) -> FileResponse:
+    a = s.get(Article, article_id)
+    if a is None:
+        raise HTTPException(404, "article not found")
+    return _serve_admin_article_file(
+        a.text_path, "text/plain; charset=utf-8", f"article-{article_id}.txt",
+    )
+
+
+@router.get("/api/articles/{article_id}/attachments/{attachment_id}")
+def admin_download_attachment(
+    article_id: int, attachment_id: int, s: Session = Depends(_session),
+) -> FileResponse:
+    att = s.get(Attachment, attachment_id)
+    if att is None or att.article_id != article_id:
+        raise HTTPException(404, "attachment not found")
+    if not att.file_path:
+        raise HTTPException(404, "attachment file_path empty")
+    data_dir = Path(get_settings().data_dir)
+    abs_path = to_os_path(data_dir, PurePosixPath(att.file_path))
+    try:
+        abs_path.resolve().relative_to(data_dir.resolve())
+    except Exception:
+        raise HTTPException(400, "invalid path")
+    if not abs_path.exists():
+        raise HTTPException(404, "file missing on disk")
+    return FileResponse(
+        path=str(abs_path),
+        filename=att.file_name or abs_path.name,
+        media_type="application/octet-stream",
+    )
diff --git a/govcrawler/api/admin/jobs.py b/govcrawler/api/admin/jobs.py
index c5b3796..6f3e8c1 100644
--- a/govcrawler/api/admin/jobs.py
+++ b/govcrawler/api/admin/jobs.py
@@ -13,7 +13,7 @@ from typing import Any
 from zoneinfo import ZoneInfo
 
 from apscheduler.triggers.cron import CronTrigger
-from fastapi import Depends, HTTPException, Query
+from fastapi import Body, Depends, HTTPException, Query
 from sqlalchemy import select
 from sqlalchemy.orm import Session
 
@@ -49,6 +49,62 @@ async def cancel_job(job_id: str) -> dict[str, Any]:
     return r
 
 
+@router.get("/api/jobs/{job_id}/checkpoint")
+def get_checkpoint(
+    job_id: str, s: Session = Depends(_session),
+) -> dict[str, Any]:
+    """Return the durable crawl_job row's checkpoint state.
+
+    Used by the UI to render "已完成第 N 页 / 从第 M 页恢复" hints next
+    to a job. The track_checkpoint flag is surfaced too, so the UI knows
+    whether to show the editor at all (only meaningful for opt-in
+    targets)."""
+    from govcrawler.models import CrawlJob, CrawlTarget
+    cj = s.get(CrawlJob, job_id)
+    if cj is None:
+        raise HTTPException(404, "job not found in durable store")
+    tc = (
+        s.query(CrawlTarget).filter_by(target_code=cj.target_code).first()
+    )
+    return {
+        "job_id": job_id,
+        "target_code": cj.target_code,
+        "status": cj.status,
+        "attempt_count": cj.attempt_count,
+        "last_completed_page": cj.last_completed_page or 0,
+        "track_checkpoint": bool(tc.track_checkpoint) if tc else False,
+    }
+
+
+@router.patch("/api/jobs/{job_id}/checkpoint")
+async def patch_checkpoint(
+    job_id: str,
+    body: dict = Body(...),
+    s: Session = Depends(_session),
+) -> dict[str, Any]:
+    """Manual override of last_completed_page on a queued / failed /
+    cancelled job. Useful when ops know the target site dropped out
+    mid-page and want to force re-doing that page on resume.
+
+    Body: {"last_completed_page": <int >= 0>}
+
+    Refused on running jobs — wait for the job to finish, or cancel it
+    first, to avoid racing the worker's own checkpoint writes."""
+    from govcrawler.models import CrawlJob
+    page = body.get("last_completed_page")
+    if not isinstance(page, int) or page < 0:
+        raise HTTPException(400, "last_completed_page must be an int >= 0")
+    cj = s.get(CrawlJob, job_id)
+    if cj is None:
+        raise HTTPException(404, "job not found")
+    if cj.status == "running":
+        raise HTTPException(
+            409, "cannot patch checkpoint while job is running — cancel first",
+        )
+    cj.last_completed_page = page
+    s.commit()
+    return {"ok": True, "job_id": job_id, "last_completed_page": page}
+
+
 def _decompose_target_code(site_code: str, target_code: str) -> tuple[str | None, str]:
     """Mirror of pipeline._decompose_target_code — kept private here to
     avoid pulling pipeline (and its heavy imports) into the api hot path."""
diff --git a/govcrawler/api/app.py b/govcrawler/api/app.py
index 2c00532..9a126a7 100644
--- a/govcrawler/api/app.py
+++ b/govcrawler/api/app.py
@@ -148,10 +148,29 @@ class _RagTokenGate(BaseHTTPMiddleware):
             # startup so production deployers don't accidentally ship this.
             return await call_next(request)
         auth = request.headers.get("authorization", "")
+        # Accept EITHER: (a) Bearer token matching RAG_API_TOKEN, or
+        # (b) Basic Auth matching the admin credentials. The admin UI
+        # legitimately calls /api/articles/{id}, /api/articles/.../raw-html,
+        # /api/articles/.../text and /api/articles/.../attachments/... when
+        # the operator views article detail.
+        # The UI is already gated by /admin Basic Auth (browser sends
+        # those creds on every same-origin request), so accepting them
+        # here lets the UI work without exposing a separate cookie /
+        # token plumbing.
         if auth.lower().startswith("bearer "):
             sent = auth.split(" ", 1)[1].strip()
             if _hmac.compare_digest(sent, cfg.rag_api_token):
                 return await call_next(request)
+        elif auth.lower().startswith("basic ") and cfg.admin_user:
+            try:
+                raw = _base64.b64decode(auth.split(" ", 1)[1]).decode(
+                    "utf-8", "replace",
+                )
+                user, _, pw = raw.partition(":")
+                if (_hmac.compare_digest(user, cfg.admin_user)
+                        and _hmac.compare_digest(pw, cfg.admin_password)):
+                    return await call_next(request)
+            except Exception:
+                pass
         return _Resp(
             status_code=401,
             headers={"WWW-Authenticate": 'Bearer realm="GovCrawler RAG"'},
@@ -174,21 +193,26 @@ app.add_middleware(AuditMiddleware)
 
 
 @app.on_event("startup")
 async def _restore_durable_jobs() -> None:
     """Recover jobs from crawl_job DB rows on every api boot:
-    • status='running' rows are orphans (their python process died) →
-      flip to 'failed' with reason 'restart_during_run'
+    • status='running' rows are orphans (their process died mid-flight)
+      → flip to 'queued', bump attempt_count, re-enqueue. After 3
+      consecutive restart-recoveries we give up and mark 'failed' to
+      avoid an infinite reboot-and-resume loop.
     • status='queued' rows get re-pushed into the in-memory FIFO so
-      a fresh worker drains them at the next opportunity
+      a fresh worker drains them at the next opportunity.
 
-    No-op if the table is empty (first boot after migration).
-    Failures here must NOT block startup — log and continue."""
+    No-op if the table is empty. Failures here must NOT block startup —
+    log and continue."""
     import logging
     try:
         from govcrawler.api.task_queue import get_queue
         summary = await get_queue().restore_from_db()
-        if summary["orphaned"] or summary["requeued"]:
+        if any(summary.values()):
             logging.getLogger("govcrawler.api").info(
-                "task_queue restored from DB: orphaned_running→failed=%d, requeued=%d",
-                summary["orphaned"], summary["requeued"],
+                "task_queue restored from DB: running→requeued=%d, "
+                "queued_requeued=%d, permanently_failed=%d",
+                summary["recovered"],
+                summary["requeued"] - summary["recovered"],
+                summary["permanently_failed"],
             )
     except Exception:
         logging.getLogger("govcrawler.api").exception(
diff --git a/govcrawler/api/static/js/pages/articles.js b/govcrawler/api/static/js/pages/articles.js
index 9b1dc35..dcff7d9 100644
--- a/govcrawler/api/static/js/pages/articles.js
+++ b/govcrawler/api/static/js/pages/articles.js
@@ -348,9 +350,11 @@ async function openArticleModal(id) {
         wide: true,
     });
     try {
-        const r = await fetch(`/api/articles/${id}`);
-        if (!r.ok) throw new Error(`${r.status} ${r.statusText}`);
-        const a = await r.json();
+        // Use admin namespace so the request inherits the dashboard's
+        // Basic Auth realm (/admin/*). Hitting the public /api/articles/
+        // path directly would fall under _RagTokenGate's Bearer realm,
+        // which the browser-cached admin Basic creds don't auto-flow into.
+        const a = await api(`/api/articles/${id}`);
         _renderArticleModal(a);
     } catch (e) {
         document.querySelector(".modal-body").innerHTML =
@@ -373,13 +375,13 @@ function _renderArticleModal(a) {
     const statusPill = a.status === "ready" ? pill("ready", "green") : pill(a.status || "—", "slate");
     const ragPill = a.exported_to_rag_at ? pill("已入库 RAG", "green") : pill("未入库", "amber");
     const content = (a.content_text || "").trim();
+    const paragraphs = content ? content.split(/\n+/) : [];
     const contentBlock = !content
         ? `<div …>正文为空(未入库)</div>`
-        : content.split(/\n+/).slice(0, 20).map(p =>
+        : paragraphs.map(p =>
             `<p …>${esc(p)}</p>`
-        ).join("") + (content.split(/\n+/).length > 20
-            ? `<div …>(仅预览前 20 段 · 全文 ${content.length} 字,已落盘 content_text)</div>`
-            : "");
+        ).join("") +
+        `<div …>(共 ${paragraphs.length} 段 · ${content.length} 字)</div>`;
     const atts = (a.attachments || []).map(att => `
         <div …>
@@ -387,7 +389,7 @@ function _renderArticleModal(a) {
             <div …>${esc(att.file_name || "(未命名)")}</div>
             <div …>${esc(att.file_ext || "—")}${att.size_bytes ? " · " + _humanSize(att.size_bytes) : ""}</div>
-            <a href="/api/articles/${a.id}/attachments/${att.id}" …>下载</a>
+            <a href="/admin/api/articles/${a.id}/attachments/${att.id}" …>下载</a>
         </div>
     `).join("");
 
     const header = `
@@ -430,8 +432,8 @@ function _renderArticleModal(a) {
         <div …>
             <span …>正文预览</span>
-            ${a.raw_html_path ? `<a href="/api/articles/${a.id}/raw-html" …>查看原始 HTML</a>` : ""}
-            ${a.text_path ? `<a href="/api/articles/${a.id}/text" …>下载文本缓存</a>` : ""}
+            ${a.raw_html_path ? `<a href="/admin/api/articles/${a.id}/raw-html" …>查看原始 HTML</a>` : ""}
+            ${a.text_path ? `<a href="/admin/api/articles/${a.id}/text" …>下载文本缓存</a>` : ""}
         </div>
         ${contentBlock}
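# A rough usage sketch of the admin-scoped article endpoints the modal above
# now calls, assuming the admin router is mounted under /admin behind HTTP
# Basic Auth; the host, credentials and ids below are placeholders, not values
# from the patch.
import httpx

auth = ("admin", "secret")                # hypothetical operator credentials
base = "http://localhost:8000/admin"      # hypothetical deployment URL

detail = httpx.get(f"{base}/api/articles/123", auth=auth).json()      # JSON detail
raw = httpx.get(f"{base}/api/articles/123/raw-html", auth=auth)       # FileResponse
att = httpx.get(f"{base}/api/articles/123/attachments/7", auth=auth)  # binary download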
diff --git a/govcrawler/api/task_queue.py b/govcrawler/api/task_queue.py
index 5d2c355..548a9d7 100644
--- a/govcrawler/api/task_queue.py
+++ b/govcrawler/api/task_queue.py
@@ -182,9 +182,16 @@ class TaskQueue:
                 log.exception("crawl_job upsert failed job=%s", j.job_id)
 
     async def restore_from_db(self) -> dict[str, int]:
-        """Boot-time recovery: orphaned 'running' jobs → 'failed' (the
-        process they were on died); 'queued' jobs → re-pushed into the
-        in-memory FIFO so they get drained by a fresh worker.
+        """Boot-time recovery: any 'queued' or 'running' row whose process
+        died is re-enqueued into the in-memory FIFO so a fresh worker
+        drains it. The crawl is idempotent (dedup early-stop handles the
+        already-fetched URLs in zero time), so resuming is safer than
+        marking failed — the operator's manual ▶ shouldn't get burned by a
+        deploy that happens to overlap.
+
+        Only jobs that exceed MAX_RESTART_RECOVERY (3) get marked failed
+        permanently — that bounds the loop in case some job keeps killing
+        the api process repeatedly.
 
         Idempotent — safe to call once at api startup. Returns counts so
         the startup logger can show what was restored."""
@@ -192,21 +199,45 @@ class TaskQueue:
         from govcrawler.db import get_sessionmaker
         from govcrawler.models import CrawlJob
 
-        orphaned = 0
+        MAX_RESTART_RECOVERY = 3
+
+        recovered = 0
         requeued = 0
+        permanently_failed = 0
+
         S = get_sessionmaker()
         with S() as s:
-            # 1. Mark stale running jobs as failed.
+            # 1. running rows are orphans — their process is gone. Bump
+            #    attempt_count and recover; if that crosses MAX, give up.
             for row in s.query(CrawlJob).filter_by(status="running").all():
-                row.status = "failed"
-                row.finished_at = datetime.utcnow()
-                row.error_msg = (row.error_msg or "") + (
-                    "" if row.error_msg else "restart_during_run"
+                if (row.attempt_count or 0) + 1 > MAX_RESTART_RECOVERY:
+                    row.status = "failed"
+                    row.finished_at = datetime.utcnow()
+                    row.error_msg = (row.error_msg or "") + (
+                        " | " if row.error_msg else ""
+                    ) + (
+                        f"abandoned after {MAX_RESTART_RECOVERY} restart "
+                        "recoveries (job kept crashing the api?)"
+                    )
+                    permanently_failed += 1
+                    continue
+                row.status = "queued"
+                row.attempt_count = (row.attempt_count or 0) + 1
+                # Clear started_at so the next run starts a fresh stopwatch.
+                row.started_at = None
+                # Annotate (not overwrite) error_msg so the audit trail
+                # preserves prior failures.
+                annotation = f"restart_during_run (recovery attempt {row.attempt_count}/{MAX_RESTART_RECOVERY})"
+                row.error_msg = (
+                    f"{row.error_msg} | {annotation}"
+                    if row.error_msg else annotation
                 )
-                orphaned += 1
+                recovered += 1
             s.commit()
-            # 2. Re-enqueue surviving queued jobs in original FIFO order.
+            # 2. Re-enqueue all queued jobs (including those we just
+            #    converted from running). Original FIFO order.
             queued_rows = (
                 s.query(CrawlJob)
                 .filter_by(status="queued")
@@ -238,10 +269,15 @@ class TaskQueue:
                 requeued += 1
 
         log.info(
-            "task_queue restore: orphaned_running→failed=%d, queued_requeued=%d",
-            orphaned, requeued,
+            "task_queue restore: running→requeued=%d, queued_requeued=%d, "
+            "permanently_failed=%d",
+            recovered, requeued - recovered, permanently_failed,
         )
-        return {"orphaned": orphaned, "requeued": requeued}
+        return {
+            "recovered": recovered,
+            "requeued": requeued,
+            "permanently_failed": permanently_failed,
+        }
 
     # ---------- public API ----------
 
@@ -379,16 +415,43 @@ class TaskQueue:
                  job_id, host, j.site_code, j.target_code,
                  j.started_at - j.enqueued_at)
         try:
-            # Run the (sync) crawl in a thread so we don't block the event
-            # loop. Pipeline import is deferred so tests can patch it.
+            # Run the (sync) crawl in a thread so we don't block the
+            # event loop. Pass a thread-safe stop_check closure that
+            # peeks at j.stop_requested — pipeline polls it between
+            # list pages and between articles, so /cancel actually
+            # halts the in-flight crawl instead of just labelling the
+            # row 'cancelled' after it finishes naturally.
             from govcrawler.pipeline import crawl_target
+
+            # Resume from the saved page checkpoint if the target opted in.
+            # crawl_target reads the track_checkpoint flag itself; we just
+            # supply the last completed page from the durable row.
+            resume_page = 0
+            try:
+                from govcrawler.db import get_sessionmaker as _S
+                from govcrawler.models import CrawlJob as _CJ
+                with _S()() as _s:
+                    _row = _s.get(_CJ, j.job_id)
+                    if _row is not None:
+                        resume_page = int(_row.last_completed_page or 0)
+            except Exception:
+                pass
+
             result = await asyncio.to_thread(
                 crawl_target, j.target_code, stop_on_duplicate=not j.force,
+                stop_check=lambda: j.stop_requested,
+                job_id=j.job_id,
+                resume_from_page=resume_page,
             )
             j.result = result if isinstance(result, dict) else {"ok": True}
-            j.status = "cancelled" if j.stop_requested else "done"
+            if j.stop_requested or (
+                isinstance(result, dict) and result.get("status") == "cancelled"
+            ):
+                j.status = "cancelled"
+            else:
+                j.status = "done"
         except Exception as e:
             log.exception("taskq FAIL job=%s target=%s", job_id, j.target_code)
             j.error_msg = f"{type(e).__name__}: {e}"
diff --git a/govcrawler/models.py b/govcrawler/models.py
index 0dd7ce4..f4f2b60 100644
--- a/govcrawler/models.py
+++ b/govcrawler/models.py
@@ -199,6 +199,14 @@ class CrawlTarget(Base):
     expected_cadence_days: Mapped[int] = mapped_column(Integer, server_default="30")
     interval_sec: Mapped[int | None] = mapped_column(Integer)
     enabled: Mapped[bool] = mapped_column(Boolean, server_default=text("true"))
+    # Opt-in page-level checkpoint. When True, crawl_target() records
+    # progress in crawl_job.last_completed_page after every fully-completed
+    # list page; a restart resumes from page+1 instead of re-walking the
+    # whole list. Default False — only valuable for big WAF-sensitive
+    # targets (gd_wjk__qbwj full backfill) where a list re-walk costs minutes.
+    track_checkpoint: Mapped[bool] = mapped_column(
+        Boolean, server_default=text("false"),
+    )
     last_crawled_at: Mapped[datetime | None] = mapped_column(DateTime)
     last_article_time: Mapped[datetime | None] = mapped_column(DateTime)
     created_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
@@ -415,6 +423,13 @@ class CrawlJob(Base):
         Boolean, server_default=text("false"),
     )
     attempt_count: Mapped[int] = mapped_column(Integer, server_default=text("0"))
+    # Last fully-processed list page (1-based). Only meaningful when the
+    # owning target has track_checkpoint=True. 0 means "nothing completed
+    # yet"; pipeline resumes from page (last_completed_page + 1) on
+    # restart. The operator may PATCH this value via the API to override it.
+    last_completed_page: Mapped[int] = mapped_column(
+        Integer, server_default=text("0"),
+    )
     enqueued_at: Mapped[datetime] = mapped_column(DateTime, server_default=func.now())
     started_at: Mapped[datetime | None] = mapped_column(DateTime)
diff --git a/govcrawler/parser/cleaner.py b/govcrawler/parser/cleaner.py
index 02d6466..7145a17 100644
--- a/govcrawler/parser/cleaner.py
+++ b/govcrawler/parser/cleaner.py
@@ -1,4 +1,7 @@
 from __future__ import annotations
+
+import re
+
 from lxml import html as lxml_html
 
 BLOCK_TAGS = {
@@ -9,9 +12,63 @@ BLOCK_TAGS = {
 
 REMOVE_TAGS = {"script", "style", "noscript", "iframe"}
 
 
+def _cell_text(cell_el) -> str:
+    """Plain text of a <td> / <th> cell with whitespace collapsed.
+
+    Pipe characters inside cell text would break the markdown row format,
+    so we escape them. Newlines inside a cell collapse to a space — Markdown
+    table rows have to be single-line."""
+    txt = cell_el.text_content() or ""
+    # Drop NBSP and full-width spaces that are common in TRS-CMS markup.
+    txt = txt.replace("\xa0", " ").replace("\u3000", " ")
+    txt = re.sub(r"\s+", " ", txt).strip()
+    return txt.replace("|", "\\|")
+
+
+def _render_table_markdown(table_el) -> str:
+    """Render a <table> as a Markdown-style text block:
+
+        | col1 | col2 | col3 |
+        | --- | --- | --- |
+        | a | b | c |
+
+    Empty tables collapse to "" so the placeholder doesn't add noise.
+    The header row is the first <tr> with any <th>; failing that, we
+    fall back to the first row with the most cells (which matches the
+    `tableTab` header pattern used by gd_wjk's row-headers)."""
+    rows = table_el.findall(".//tr")
+    if not rows:
+        return ""
+
+    matrix: list[list[str]] = []
+    for tr in rows:
+        cells = tr.findall("./td") + tr.findall("./th")
+        if not cells:
+            continue
+        matrix.append([_cell_text(c) for c in cells])
+
+    if not matrix:
+        return ""
+
+    width = max(len(row) for row in matrix)
+    # Pad short rows so the markdown grid stays rectangular.
+    matrix = [row + [""] * (width - len(row)) for row in matrix]
+
+    out_lines = []
+    for i, row in enumerate(matrix):
+        out_lines.append("| " + " | ".join(row) + " |")
+        if i == 0:
+            out_lines.append("| " + " | ".join(["---"] * width) + " |")
+    return "\n".join(out_lines)
+
+
 def html_to_text(html_str: str) -> str:
     """Convert HTML fragment to paragraph-preserving plain text.
+    - Remove script/style/noscript/iframe
+    - Render every <table> as a markdown table block (cells were
+      previously concatenated with no delimiter, making relief lists /
+      org charts / fee schedules unreadable)
     - Insert '\\n' after every block-level tag's tail
     - Collapse runs of blank lines, strip per-line whitespace
     """
@@ -28,10 +85,38 @@ def html_to_text(html_str: str) -> str:
         parent = el.getparent()
         if parent is not None:
             parent.remove(el)
+
+    # Replace each <table> with a <p>-like placeholder containing the
+    # markdown rendering. Collect tables in document order, then build
+    # the substitution bottom-up — the lxml `iter("table")` pass below
+    # catches every table including nested ones, and walking the list in
+    # reversed() order renders the innermost tables first, so a nested
+    # table's markdown text gets folded into the outer cell's
+    # _cell_text() naturally.
+    tables = list(doc.iter("table"))
+    for tbl in reversed(tables):
+        md = _render_table_markdown(tbl)
+        parent = tbl.getparent()
+        if parent is None:
+            continue
+        placeholder = lxml_html.Element("p")
+        # Surround with newlines so the table block is visually separated
+        # from the surrounding prose after text_content() flattens it.
+        placeholder.text = ("\n" + md + "\n") if md else ""
+        placeholder.tail = tbl.tail
+        parent.replace(tbl, placeholder)
+
     # Tail-inject newlines on block tags
     for el in doc.iter():
         if el.tag in BLOCK_TAGS:
             el.tail = ("\n" + el.tail) if el.tail else "\n"
     text = doc.text_content()
-    lines = [ln.strip() for ln in text.splitlines()]
-    return "\n".join(ln for ln in lines if ln)
+    # Per-line strip but preserve markdown table lines verbatim.
+    out_lines: list[str] = []
+    for ln in text.splitlines():
+        s = ln.rstrip()
+        if s.lstrip().startswith("|") and s.lstrip().endswith("|"):
+            out_lines.append(s.lstrip())  # markdown row
+        else:
+            out_lines.append(s.strip())
+    return "\n".join(ln for ln in out_lines if ln)
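# A minimal sketch of what the table handling above is expected to produce,
# assuming the module path from the diff header; the sample HTML is made up.
from govcrawler.parser.cleaner import html_to_text

sample = (
    "<div><p>收费标准</p>"
    "<table><tr><th>项目</th><th>金额</th></tr>"
    "<tr><td>挂号费</td><td>10 元</td></tr></table></div>"
)
print(html_to_text(sample))
# Expected output (roughly):
# 收费标准
# | 项目 | 金额 |
# | --- | --- |
# | 挂号费 | 10 元 |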
diff --git a/govcrawler/parser/detail_parser.py b/govcrawler/parser/detail_parser.py
index a05fa66..1d6fafd 100644
--- a/govcrawler/parser/detail_parser.py
+++ b/govcrawler/parser/detail_parser.py
@@ -8,35 +8,56 @@ from govcrawler.parser.extractor import gne_extract, trafilatura_extract
 
 
 def _robust_css(sel: Selector, css: str):
-    """CSS query that falls back to `//`-prefix xpath on libxml2/parsel
-    quirks. On gov.cn /zhengce/content/* the article body sits inside two
-    nested <table> wrappers; for reasons we couldn't pin down the
-    descendant-or-self axis from <body> stops short and `sel.css('div.foo')`
-    returns 0 matches, even though `//div[has-class('foo')]` finds it. We
-    retry each comma sub-selector translated to xpath with prefix='//'.
+    """CSS query with two extra behaviours on top of parsel's defaults:
+
+    1. Priority fallback chain (NOT union). yaml authors write
+       `div.zw, div.TRS_Editor, div.content` meaning "use div.zw if
+       present, else div.TRS_Editor, else div.content". parsel's
+       sel.css(comma-list) instead returns ALL matches in DOM order,
+       which on TRS-CMS pages picks the OUTER wrapper (div.content)
+       before the INNER body (div.zw) — pulling in toolbars + sidebars.
+       We split on commas and return the first sub-selector that hits.
+
+    2. // xpath fallback for the libxml2 axis bug. On gov.cn
+       /zhengce/content/* the body sits inside double-nested tables and
+       descendant-or-self::div[has-class('foo')] from <body> finds
+       nothing, while //div[has-class('foo')] does. Each sub-selector
+       gets the // retry before we move on to the next.
 
     Returns a SelectorList. Empty list if nothing matched either pass.
     """
-    out = sel.css(css)
-    if out:
-        return out
+    from parsel import SelectorList  # for the empty-result returns below
+
+    if not css:
+        return SelectorList()  # nothing to query
     try:
         from cssselect import HTMLTranslator
+        translator = HTMLTranslator()
     except Exception:
-        return out
-    t = HTMLTranslator()
-    for sub in (css or "").split(","):
+        translator = None
+
+    last_empty = None  # remember an empty SelectorList to return on miss
+
+    for sub in css.split(","):
         sub = sub.strip()
         if not sub:
             continue
-        try:
-            xp = t.css_to_xpath(sub, prefix="//")
-        except Exception:
-            continue
-        found = sel.xpath(xp)
-        if found:
-            return found
-    return out
+        # Pass 1: vanilla parsel CSS for this sub-selector alone.
+        out = sel.css(sub)
+        if out:
+            return out
+        last_empty = out
+        # Pass 2: // xpath fallback for the libxml2 axis bug.
+        if translator is not None:
+            try:
+                xp = translator.css_to_xpath(sub, prefix="//")
+            except Exception:
+                continue
+            try:
+                found = sel.xpath(xp)
+            except Exception:
+                found = None
+            if found:
+                return found
+    return last_empty if last_empty is not None else SelectorList()
 
 
 MIN_CONTENT_CHARS = 100   # 主 XPath content_html 短于此值 → 触发 GNE
 MIN_FALLBACK_CHARS = 50   # GNE/trafilatura 正文短于此值 → 认为失败
diff --git a/govcrawler/pipeline.py b/govcrawler/pipeline.py
index c590d52..a4e3f95 100644
--- a/govcrawler/pipeline.py
+++ b/govcrawler/pipeline.py
@@ -27,7 +27,7 @@ import re
 import time as _time
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Any
+from typing import Any, Callable
 
 from sqlalchemy.orm import Session
 
@@ -446,11 +446,16 @@ def fetch_and_store(
 class _ListEntry:
     url: str
     item: CrawlItem | None  # None for legacy CSS path
+    # 1-based page index this URL came from. Used by the page-level
+    # checkpoint feature: pipeline groups entries by page and marks each
+    # page completed only after every entry in it is processed.
+    page_num: int = 1
 
 
 def _list_via_adapter(
     rt: _ResolvedTarget, *, interval_sec: float | None = None, max_items: int | None = None,
+    stop_check: Callable[[], bool] | None = None,
 ) -> tuple[str, list[_ListEntry], FetchResult]:
     """Adapter path: walk pagination via ?page=N, projecting each page's
     CrawlItems and accumulating across pages.
@@ -494,6 +499,13 @@ def _list_via_adapter(
         # the first hit, no prior page to space from).
         if page_num > 1 and list_throttle > 0:
             _time.sleep(list_throttle)
+        # Cancel poll between list pages — so a /cancel against a long
+        # paginated walk (gd_wjk index_2..index_5 with interval_sec=60
+        # = ~5 min) actually halts at the next page boundary instead of
+        # waiting out the entire walk.
+        if stop_check is not None and page_num > 1 and stop_check():
+            log.info("_list_via_adapter cancelled by operator at page %d", page_num)
+            break
 
         if custom_fetch is not None:
             try:
@@ -524,7 +536,7 @@ def _list_via_adapter(
                 if it.url in seen_urls:
                     continue
                 seen_urls.add(it.url)
-                aggregated.append(_ListEntry(url=it.url, item=it))
+                aggregated.append(_ListEntry(url=it.url, item=it, page_num=page_num))
                 new_in_page += 1
             if new_in_page == 0:
                 log.info("adapter pagination stop: page %d had no new urls", page_num)
@@ -590,7 +602,7 @@ def _list_via_adapter(
                 if it.url in seen_urls:
                     continue
                 seen_urls.add(it.url)
-                aggregated.append(_ListEntry(url=it.url, item=it))
+                aggregated.append(_ListEntry(url=it.url, item=it, page_num=page_num))
                 new_in_page += 1
             if new_in_page == 0:
                 log.info("adapter pagination stop: page %d had no new urls (cms loop)",
@@ -644,6 +656,7 @@ def _paginated_yaml_urls(list_url: str, pag) -> list[str]:
 
 def _list_via_yaml(
     rt: _ResolvedTarget, *, interval_sec: float | None = None, max_items: int | None = None,
+    stop_check: Callable[[], bool] | None = None,
 ) -> tuple[str, list[_ListEntry], FetchResult]:
     """Legacy CSS scrape path — reads list selectors from config/sites/*.yaml.
@@ -684,6 +697,13 @@ def _list_via_yaml(
         # in <10s and trips IP-level rate limits on chatty hosts.
         if i > 0 and list_throttle > 0:
             _time.sleep(list_throttle)
+        # Cancel poll between yaml list pages — same purpose as in
+        # _list_via_adapter: long paginated walks (qbwj index_2..index_5
+        # at 60s/page) need to halt at page boundaries instead of waiting
+        # the whole walk out.
+        if stop_check is not None and i > 0 and stop_check():
+            log.info("_list_via_yaml cancelled by operator at page %d", i + 1)
+            break
         fr = fetch_html(page_url)
         if i == 0:
             first_fr = fr  # page 1's FetchResult is the canonical one
@@ -702,7 +722,8 @@ def _list_via_yaml(
             if it.url in seen_urls:
                 continue
             seen_urls.add(it.url)
-            aggregated.append(_ListEntry(url=it.url, item=None))
+            # i is the 0-based page index; page_num in _ListEntry is 1-based.
+            aggregated.append(_ListEntry(url=it.url, item=None, page_num=i + 1))
             new_in_page += 1
         # If a deeper page repeated only known URLs from earlier pages, the
         # column's listing is shorter than max_pages → no point fetching more.
@@ -724,15 +745,61 @@ def crawl_target(
     max_items: int | None = None,
     stop_on_duplicate: bool = True,
     throttle: HostThrottle | None = None,
+    force_disabled: bool = False,
+    stop_check: "Callable[[], bool] | None" = None,
+    job_id: str | None = None,
+    resume_from_page: int = 0,
 ) -> dict:
     """Fetch list page, iterate article URLs, call fetch_and_store per item.
 
     `stop_on_duplicate=True` implements INC-03 early-stop: the list is
     publish-time ordered, so the first already-seen URL ends the pass.
+
+    `stop_check` is a callable invoked at safe break points (between
+    list pages, between articles). Returning True triggers a graceful
+    shutdown — no further fetches; the call returns
+    {'status': 'cancelled', ...} with whatever was ingested so far. Used
+    by the task_queue worker to honor an operator cancel without waiting
+    for natural completion (previously the flag was cosmetic — it was
+    set but the pipeline never polled it).
+
+    DEFENSE-IN-DEPTH enabled check: refuse to run when the site or target
+    is disabled in DB. We learned this gap the hard way — programmatic
+    callers (a python REPL, a misbehaving test, a buggy task_queue path)
+    bypass the api's /run enabled gate AND the scheduler's pre-dispatch
+    gate, but they still hit `crawl_target()`. The only place that
+    catches everything is here. Pass `force_disabled=True` to override
+    (used by the v2.1.3 待修选择器 retry flow where ops explicitly want
+    to re-test a disabled target).
     """
+    # Default to a no-op stop_check so the rest of the function can call
+    # it unconditionally without `if stop_check is not None:` everywhere.
+    if stop_check is None:
+        stop_check = lambda: False  # noqa: E731
     Session = get_sessionmaker()
     with Session() as s:
         rt = _resolve_target(s, target_code)
+        # Hard refuse if the site/target was disabled — even programmatic
+        # callers must respect this. Lets the operator's "disable in UI"
+        # action stop ALL fetch paths, not just the HTTP-triggered ones.
+        if not force_disabled:
+            if not rt.target.enabled:
+                log.info("crawl_target refused: target=%s disabled", target_code)
+                return {
+                    "status": "skipped_disabled",
+                    "reason": "target_disabled",
+                    "target_code": target_code,
+                    "items_seen": 0, "items_new": 0,
+                }
+            if not rt.site.enabled:
+                log.info("crawl_target refused: site=%s disabled", rt.site.site_code)
+                return {
+                    "status": "skipped_disabled",
+                    "reason": "site_disabled",
+                    "target_code": target_code,
+                    "site_code": rt.site.site_code,
+                    "items_seen": 0, "items_new": 0,
+                }
         # snapshot what we need after session closes
         site_pk = rt.site.id
         target_id = rt.target.id
@@ -741,6 +808,24 @@ def crawl_target(
         cms_adapter = rt.site.cms_adapter
         respect_robots = rt.site.respect_robots
         interval_sec = rt.target.interval_sec
+        track_checkpoint = bool(getattr(rt.target, "track_checkpoint", False))
+
+        # Helper: write current progress to crawl_job row (best-effort).
+        # Only fires when the target opted in AND we have a job_id to attach
+        # the checkpoint to. DB error here must NOT kill the running crawl —
+        # log + continue.
+        def _save_checkpoint(page_num: int) -> None:
+            if not (track_checkpoint and job_id):
+                return
+            try:
+                from govcrawler.models import CrawlJob
+                with Session() as s:
+                    cj = s.get(CrawlJob, job_id)
+                    if cj is not None and page_num > (cj.last_completed_page or 0):
+                        cj.last_completed_page = page_num
+                        s.commit()
+            except Exception:
+                log.exception("checkpoint update failed job=%s page=%d", job_id, page_num)
 
     ua = get_settings().user_agent
 
@@ -766,11 +851,13 @@ def crawl_target(
     # worth) but bounds total list-walk time.
     if cms_adapter:
         list_url, entries, list_fr = _list_via_adapter(
-            rt, interval_sec=interval_sec, max_items=max_items
+            rt, interval_sec=interval_sec, max_items=max_items,
+            stop_check=stop_check,
         )
     else:
         list_url, entries, list_fr = _list_via_yaml(
-            rt, interval_sec=interval_sec, max_items=max_items
+            rt, interval_sec=interval_sec, max_items=max_items,
+            stop_check=stop_check,
         )
 
     # COMP-03: reject non-public list URLs
@@ -886,10 +973,46 @@ def crawl_target(
             except Exception:
                 log.exception("failed to persist crawl_log for exception url=%s", url)
 
+    cancelled = False
+    # Page-level checkpoint resume. When track_checkpoint is on and the
+    # operator (or restore_from_db) indicated resume_from_page > 0, drop
+    # all entries from already-completed pages. Pages > resume_from_page
+    # still need re-processing — dedup will skip the already-ingested
+    # articles inside each page in milliseconds.
+    if track_checkpoint and resume_from_page > 0:
+        before = len(entries)
+        entries = [e for e in entries if e.page_num > resume_from_page]
+        log.info(
+            "checkpoint resume: target=%s skipping pages 1..%d "
+            "(%d → %d entries)",
+            target_code_norm, resume_from_page, before, len(entries),
+        )
+
+    # Track which page we're currently on so we can call _save_checkpoint
+    # exactly when the boundary crosses.
+    last_seen_page = resume_from_page
     for i, entry in enumerate(entries):
         if max_items is not None and i >= max_items:
             break
 
+        # Operator-requested cancel. Polled BEFORE each article so the
+        # current page's tail won't be fetched after /cancel was hit. The
+        # already-ingested articles are kept (idempotent inserts) — caller
+        # gets {"status": "cancelled", items_new=N} reflecting what we
+        # managed to do before the stop.
+        if stop_check():
+            log.info("crawl_target cancelled by operator at item %d/%d url=%s",
+                     i, len(entries), entry.url)
+            cancelled = True
+            break
+
+        # We just crossed into a new page → the previous page is fully
+        # done. Persist checkpoint so a restart resumes from page+1.
+        if entry.page_num > last_seen_page:
+            if last_seen_page > 0:
+                _save_checkpoint(last_seen_page)
+            last_seen_page = entry.page_num
+
         # SSRF + path guard. is_safe_to_fetch layers DNS / private-IP /
         # allowlist checks on top of the path blacklist. Without this, a
         # crafted yaml or admin row could trick us into fetching internal
@@ -931,6 +1054,13 @@ def crawl_target(
                 aborted = True
                 break
 
+    # Loop finished naturally (or aborted) — write the final checkpoint for
+    # the last page we processed. Skip when cancelled mid-page so we
+    # don't claim that page as completed (some entries inside it didn't
+    # run; resume should retry it).
+    if not cancelled and last_seen_page > 0:
+        _save_checkpoint(last_seen_page)
+
     # update target.last_crawled_at
     with Session() as s:
         t = targets_repo.get_by_code(s, target_code_norm)
@@ -938,8 +1068,14 @@ def crawl_target(
         t.last_crawled_at = datetime.utcnow()
         s.commit()
 
+    if cancelled:
+        final_status = "cancelled"
+    elif aborted:
+        final_status = "aborted"
+    else:
+        final_status = "ok"
     return {
-        "status": "aborted" if aborted else "ok",
+        "status": final_status,
         "target_code": target_code_norm,
         "site_code": site_code,
         "items_seen": len(entries),
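# A rough sketch of how an operator might read and override the page checkpoint
# through the endpoints added in govcrawler/api/admin/jobs.py, assuming the same
# /admin Basic Auth mount as above; the job id, page numbers and credentials are
# placeholders.
import httpx

auth = ("admin", "secret")
base = "http://localhost:8000/admin"

state = httpx.get(f"{base}/api/jobs/job-123/checkpoint", auth=auth).json()
# e.g. {"status": "failed", "last_completed_page": 37, "track_checkpoint": True, ...}

# Force the next resume to re-do page 37 as well (refused while the job is running):
httpx.patch(f"{base}/api/jobs/job-123/checkpoint",
            json={"last_completed_page": 36}, auth=auth)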