"""Research records API — CRUD, plan, run, subscribe, archive, delete.

研究记录接口模块。
提供研究记录的列表、创建、详情、计划生成、后台执行（Celery）、
SSE 进度订阅、章节重跑、归档和删除。
"""

from __future__ import annotations

import asyncio
import json
import uuid
from pathlib import Path
from typing import Annotated, Any, AsyncIterator

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import FileResponse, StreamingResponse

from app.api.deps import (
    UserContext,
    get_current_user,
    get_embedding_client,
    get_es_client,
    get_llm_client,
    get_mysql_client,
    get_neo4j_client,
    get_redis_client,
)
from app.api.schemas.research import (
    ResearchChunk,
    ResearchRecordCreateRequest,
    ResearchRecordDetail,
    ResearchRecordListResponse,
    ResearchRecordPlanRequest,
    ResearchRecordReportResponse,
    ResearchRecordRunResponse,
    ResearchRecordSectionRerunRequest,
    ResearchRecordSummary,
)
from app.core.embedding import EmbeddingService
from app.core.graph_query_service import GraphQueryService
from app.core.permission import PermissionService
from app.core.research_engine import ResearchEngine
from app.core.research_record_service import ResearchRecordService
from app.infrastructure.embedding_client import EmbeddingClient
from app.infrastructure.es_client import ESClient
from app.infrastructure.llm_client import LLMClient
from app.infrastructure.mysql_client import MySQLClient
from app.infrastructure.neo4j_client import Neo4jClient
from app.infrastructure.redis_client import RedisClient
from app.infrastructure.research_record_store import (
    ResearchRecordRunStore,
    ResearchRecordStore,
)
from app.infrastructure.session_store import build_research_session_store
from app.utils.logger import get_logger

logger = get_logger(__name__)

router = APIRouter(prefix="/research/records", tags=["research-records"])


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _require_mysql(mysql_client: MySQLClient | None) -> MySQLClient:
    if mysql_client is None:
        raise HTTPException(503, "MySQL is not available")
    return mysql_client


def _build_service(
    mysql: MySQLClient,
    redis: RedisClient,
    es: ESClient,
    neo4j: Neo4jClient | None,
    embedding: EmbeddingClient,
    llm: LLMClient,
) -> ResearchRecordService:
    """Assemble a ``ResearchRecordService`` from the raw infrastructure clients.

    The record and run stores are backed by ``mysql``. A graph query service
    is only created when a Neo4j client is supplied; otherwise the engine is
    given ``None`` in its place.
    """
    records = ResearchRecordStore(mysql)
    runs = ResearchRecordRunStore(mysql)
    research_engine = ResearchEngine(
        es_client=es,
        embedding_service=EmbeddingService(embedding),
        graph_service=GraphQueryService(neo4j) if neo4j else None,
        llm_client=llm,
        session_store=build_research_session_store(redis_client=redis, mysql_client=mysql),
    )
    return ResearchRecordService(records, runs, redis, research_engine)


async def _sse_stream(gen: AsyncIterator[ResearchChunk]) -> AsyncIterator[str]:
    try:
        async for chunk in gen:
            payload = json.dumps(chunk.model_dump(exclude_none=True), ensure_ascii=False)
            yield f"data: {payload}\n\n"
    except Exception as exc:
        logger.error("sse_stream_error", error=str(exc), exc_info=exc)
        error_chunk = ResearchChunk(type="error", content=str(exc))
        yield f"data: {json.dumps(error_chunk.model_dump(exclude_none=True), ensure_ascii=False)}\n\n"


async def _sse_dict_stream(gen: AsyncIterator[dict[str, Any]]) -> AsyncIterator[str]:
    """SSE helper for dict-based streams (clarify, chat — direct mode)."""
    try:
        async for chunk in gen:
            payload = json.dumps(chunk, ensure_ascii=False)
            yield f"data: {payload}\n\n"
    except Exception as exc:
        logger.error("sse_dict_stream_error", error=str(exc), exc_info=exc)
        yield f"data: {json.dumps({'type': 'error', 'content': str(exc)}, ensure_ascii=False)}\n\n"


def _to_str(val: Any) -> str | None:
    if val is None:
        return None
    return str(val)


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------


@router.get("", summary="研究记录列表")
async def list_records(
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    q: str | None = None,
    status: str | None = None,
    mode: str | None = None,
    output_template: str | None = None,
    created_after: str | None = None,
    created_before: str | None = None,
    archived: bool = False,
    sort_by: str = "updated_at",
    sort_order: str = "desc",
    limit: int = 50,
    offset: int = 0,
) -> ResearchRecordListResponse:
    """List the current user's research records.

    All filter and sort parameters are forwarded unchanged to
    ``ResearchRecordStore.list_records``. Pagination is normalized here:
    ``limit`` is clamped to the range 0..100 and ``offset`` to >= 0.

    Raises:
        HTTPException: 503 when MySQL is not available.
    """
    mysql = _require_mysql(mysql_client)
    store = ResearchRecordStore(mysql)

    # Negative paging values are never meaningful; clamp them instead of
    # handing them to the store, where they would presumably end up in the
    # query's LIMIT/OFFSET.
    page_size = max(0, min(limit, 100))
    page_offset = max(0, offset)

    rows, total = await store.list_records(
        user.user_id, q=q, status=status, mode=mode,
        output_template=output_template,
        created_after=created_after, created_before=created_before,
        archived=archived,
        sort_by=sort_by, sort_order=sort_order,
        limit=page_size, offset=page_offset,
    )
    records = [
        ResearchRecordSummary(
            id=r["id"],
            title=r["title"],
            mode=r.get("mode", "deep"),
            status=r["status"],
            output_template=r.get("output_template", "comprehensive"),
            summary=r.get("summary"),
            archived=bool(r.get("archived", 0)),
            version_no=r.get("version_no", 1),
            created_at=_to_str(r["created_at"]),
            updated_at=_to_str(r["updated_at"]),
        )
        for r in rows
    ]
    return ResearchRecordListResponse(records=records, total=total)


@router.post("", summary="创建研究记录")
async def create_record(
    body: ResearchRecordCreateRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, str]:
    """Create a research record for the current user and return its id.

    Raises:
        HTTPException: 503 when MySQL is not available.
    """
    service = _build_service(
        _require_mysql(mysql_client),
        redis_client,
        es_client,
        neo4j_client,
        embedding_client,
        llm_client,
    )
    new_id = await service.create_record(
        user_id=user.user_id,
        title=body.title,
        mode=body.mode,
        output_template=body.output_template,
        task=body.task.model_dump(),
        imported_items=body.imported_items,
    )
    return {"record_id": new_id}


@router.get("/{record_id}", summary="研究记录详情")
async def get_record_detail(
    record_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> ResearchRecordDetail:
    """Return the full detail of one record, enriched with its latest run.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the record
            is not found for this user.
    """
    mysql = _require_mysql(mysql_client)
    records = ResearchRecordStore(mysql)
    runs = ResearchRecordRunStore(mysql)

    row = await records.get_record(record_id, user.user_id)
    if row is None:
        raise HTTPException(404, "Record not found")

    # Latest run metadata is attached to the detail payload (C7).
    newest_run = await runs.get_latest_run(record_id, user.user_id)

    detail_fields: dict[str, Any] = {
        "id": row["id"],
        "title": row["title"],
        "mode": row.get("mode", "deep"),
        "status": row["status"],
        "output_template": row.get("output_template", "comprehensive"),
        "summary": row.get("summary"),
        "archived": bool(row.get("archived", 0)),
        "session_id": row.get("session_id"),
        "task": row.get("task_json"),
        "plan": row.get("plan_json"),
        "references": row.get("references_json"),
        "imported_items": row.get("imported_items_json"),
        "clarification_messages": row.get("clarification_messages_json"),
        "chat_messages": row.get("chat_messages_json"),
        "notes": row.get("notes"),
        "parent_record_id": row.get("parent_record_id"),
        "root_record_id": row.get("root_record_id"),
        "version_no": row.get("version_no", 1),
        "last_error": row.get("last_error"),
        "created_at": _to_str(row["created_at"]),
        "updated_at": _to_str(row["updated_at"]),
        "completed_at": _to_str(row.get("completed_at")),
    }

    if newest_run:
        detail_fields["latest_run_id"] = newest_run["run_id"]
        detail_fields["latest_run_status"] = newest_run["status"]
        detail_fields["latest_run_type"] = newest_run.get("run_type")
        detail_fields["latest_run_started_at"] = _to_str(newest_run.get("started_at"))
    else:
        # No run yet: the run fields are still passed explicitly, as None.
        detail_fields["latest_run_id"] = None
        detail_fields["latest_run_status"] = None
        detail_fields["latest_run_type"] = None
        detail_fields["latest_run_started_at"] = None

    return ResearchRecordDetail(**detail_fields)


@router.get("/{record_id}/report", summary="获取完整研究报告")
async def get_record_report(
    record_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> ResearchRecordReportResponse:
    """Return the stored report and final document for one record.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when no report
            row is found for this record and user.
    """
    reports = ResearchRecordStore(_require_mysql(mysql_client))
    report_row = await reports.get_report(record_id, user.user_id)
    if report_row is None:
        raise HTTPException(404, "Report not found")

    report_body = report_row.get("report_json")
    document_md = report_row.get("final_document_md")
    last_updated = _to_str(report_row.get("updated_at"))
    return ResearchRecordReportResponse(
        record_id=record_id,
        report=report_body,
        final_document_md=document_md,
        updated_at=last_updated,
    )


@router.post("/{record_id}/plan", summary="生成研究计划")
async def generate_plan(
    record_id: str,
    _body: ResearchRecordPlanRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, Any]:
    """Generate a research plan for the record via the record service.

    The request body is accepted but not read by this handler.

    Raises:
        HTTPException: 503 when MySQL is not available, 400 when the service
            rejects the request with ``ValueError``.
    """
    service = _build_service(
        _require_mysql(mysql_client),
        redis_client,
        es_client,
        neo4j_client,
        embedding_client,
        llm_client,
    )
    try:
        generated = await service.generate_plan(record_id, user.user_id)
    except ValueError as exc:
        raise HTTPException(400, str(exc)) from exc
    else:
        return {"record_id": record_id, "plan": generated}


@router.put("/{record_id}/plan", summary="保存用户修改的研究计划")
async def update_plan(
    record_id: str,
    body: dict[str, Any],
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, Any]:
    """Validate a user-edited plan and store it as the record's ``plan_json``.

    The edit is only accepted while the record status is ``planned``.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the record
            is not found, 400 when the status is not ``planned``, 422 when
            the body does not validate as a research plan.
    """
    from app.api.schemas.research import ResearchPlan as ResearchPlanSchema

    records = ResearchRecordStore(_require_mysql(mysql_client))
    current = await records.get_record(record_id, user.user_id)
    if current is None:
        raise HTTPException(404, "Record not found")

    is_planned = current["status"] == "planned"
    if not is_planned:
        raise HTTPException(400, f"只有 planned 状态的记录才能修改计划（当前: {current['status']}）")

    try:
        validated = ResearchPlanSchema(**body)
    except Exception as exc:
        raise HTTPException(422, f"计划格式无效: {exc}") from exc

    await records.update_record(record_id, user.user_id, plan_json=validated.model_dump())
    response: dict[str, Any] = {"record_id": record_id}
    response["plan"] = validated.model_dump()
    return response


@router.post("/{record_id}/run", summary="启动研究执行")
async def start_run(
    record_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> ResearchRecordRunResponse:
    """Start a research run for the record and return the new run id.

    The caller's ACL tokens are resolved up front and handed to the service
    together with the record id.

    Raises:
        HTTPException: 503 when MySQL is not available or the service fails
            unexpectedly while starting the run, 400 when the service rejects
            the request with ``ValueError``.
    """
    mysql = _require_mysql(mysql_client)
    svc = _build_service(mysql, redis_client, es_client, neo4j_client, embedding_client, llm_client)

    # Resolve acl_tokens for Celery task (C11)
    perm_svc = PermissionService(redis_client=redis_client)
    perm = await perm_svc.resolve(user)

    try:
        run_id = await svc.start_run(record_id, user.user_id, perm.acl_tokens)
    except ValueError as exc:
        raise HTTPException(400, str(exc)) from exc
    except Exception as exc:
        # The client only gets a generic 503; keep the real cause in the log
        # and on the exception chain so the failure can be diagnosed.
        logger.error(
            "start_run_dispatch_failed",
            record_id=record_id,
            error=str(exc),
            exc_info=exc,
        )
        raise HTTPException(503, "任务投递失败，请稍后重试") from exc

    return ResearchRecordRunResponse(
        run_id=run_id, record_id=record_id, status="pending",
    )


@router.post("/{record_id}/sections/rerun", summary="启动章节重跑")
async def start_section_rerun(
    record_id: str,
    body: ResearchRecordSectionRerunRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> ResearchRecordRunResponse:
    """Start a rerun of a single report section and return the new run id.

    The section title, summary and source document ids come from the request
    body; the caller's ACL tokens are resolved and passed along.

    Raises:
        HTTPException: 503 when MySQL is not available or the service fails
            unexpectedly while starting the rerun, 400 when the service
            rejects the request with ``ValueError``.
    """
    mysql = _require_mysql(mysql_client)
    svc = _build_service(mysql, redis_client, es_client, neo4j_client, embedding_client, llm_client)

    perm_svc = PermissionService(redis_client=redis_client)
    perm = await perm_svc.resolve(user)

    try:
        run_id = await svc.start_section_rerun(
            record_id, user.user_id, perm.acl_tokens,
            section_title=body.section_title,
            section_summary=body.section_summary,
            source_doc_ids=body.source_doc_ids,
        )
    except ValueError as exc:
        raise HTTPException(400, str(exc)) from exc
    except Exception as exc:
        # The client only gets a generic 503; keep the real cause in the log
        # and on the exception chain so the failure can be diagnosed.
        logger.error(
            "start_section_rerun_dispatch_failed",
            record_id=record_id,
            error=str(exc),
            exc_info=exc,
        )
        raise HTTPException(503, "章节重跑任务投递失败，请稍后重试") from exc

    return ResearchRecordRunResponse(
        run_id=run_id, record_id=record_id, status="pending",
    )


@router.post("/{record_id}/runs/{run_id}/events", summary="SSE 订阅执行进度")
async def subscribe_run_events(
    record_id: str,
    run_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> StreamingResponse:
    """Stream the progress events of one run as Server-Sent Events.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the run is
            not found for this record and user.
    """
    mysql = _require_mysql(mysql_client)
    service = _build_service(
        mysql, redis_client, es_client, neo4j_client, embedding_client, llm_client,
    )

    # H3: ownership has to be checked here, before the async generator is
    # created; a ValueError raised inside the generator body would surface
    # only after the response has started and could not become a 404.
    owned_run = await ResearchRecordRunStore(mysql).get_run_by_record(
        run_id, record_id, user.user_id,
    )
    if owned_run is None:
        raise HTTPException(404, f"Run {run_id} not found or not owned by user")

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    events = service.subscribe_run(record_id, run_id, user.user_id)
    return StreamingResponse(
        _sse_stream(events),
        media_type="text/event-stream",
        headers=sse_headers,
    )


@router.post("/{record_id}/clarify", summary="计划前澄清对话 (F1)")
async def clarify_record(
    record_id: str,
    body: dict[str, Any],
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> StreamingResponse:
    """Run the pre-plan clarification dialogue as a direct SSE stream (no Celery).

    Reads ``messages`` (default ``[]``) and ``enable_kb_search`` (default
    ``True``) from the body. ACL tokens are resolved only when knowledge-base
    search is enabled; otherwise an empty token list is passed.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the record
            is not found, 400 when its status is neither ``draft`` nor
            ``clarifying``.
    """
    history = body.get("messages", [])
    kb_search_on = body.get("enable_kb_search", True)

    mysql = _require_mysql(mysql_client)
    service = _build_service(
        mysql, redis_client, es_client, neo4j_client, embedding_client, llm_client,
    )

    # Validate up front, before the async generator exists (same pattern as H3).
    current = await ResearchRecordStore(mysql).get_record(record_id, user.user_id)
    if current is None:
        raise HTTPException(404, "Record not found")
    if current["status"] not in ("draft", "clarifying"):
        raise HTTPException(400, f"Cannot clarify record in status {current['status']}")

    # ACL tokens are only needed for knowledge-base search.
    tokens: list[str] = []
    if kb_search_on:
        resolved = await PermissionService(redis_client=redis_client).resolve(user)
        tokens = resolved.acl_tokens

    chunks = service.clarify(
        record_id,
        user.user_id,
        history,
        enable_kb_search=kb_search_on,
        acl_tokens=tokens,
    )
    return StreamingResponse(
        _sse_dict_stream(chunks),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@router.post("/{record_id}/upload-material", summary="上传文件并提取文本作为材料")
async def upload_material(
    record_id: str,
    file: Annotated[UploadFile, File(description="待提取文本的文件")],
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, Any]:
    """Upload a file, extract its text, and append it to ``imported_items_json``.

    Routing:
    - txt/md/markdown → decoded directly as UTF-8 plain text
    - pdf/docx/pptx/xlsx/images → DoclingProcessor (CPU-bound, run in a thread)
    - doc/xls/ppt/wps/et/ofd → require a Java converter; rejected here

    No vectorization, graph building, or ES indexing is performed. The
    original file is kept on disk under the record's upload directory, and a
    file-attachment message is appended to the clarification history.

    Returns:
        A dict with ``file_id``, ``original_filename``, ``file_type``,
        ``extracted_text_preview`` (first 200 characters) and ``char_count``.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the record
            is not found, 400 when its status does not allow uploads, 413
            when the file exceeds the configured size, 415 for unsupported
            types, 422 when Docling fails to parse the file, 500 for any
            other unexpected error.
    """
    from app.config import settings
    from app.core.docling_processor import DoclingProcessor
    from app.utils.file_type import DOCLING_FORMATS, PLAIN_TEXT_FORMATS, detect_file_type

    # Formats that need Java pre-conversion — not available in this lightweight endpoint
    _JAVA_FORMATS = {"doc", "xls", "ppt", "wps", "et", "ofd"}

    mysql = _require_mysql(mysql_client)
    store = ResearchRecordStore(mysql)
    record = await store.get_record(record_id, user.user_id)
    if record is None:
        raise HTTPException(404, "Record not found")
    if record["status"] not in ("draft", "clarifying", "planned", "completed"):
        raise HTTPException(400, f"Cannot upload material for record in status {record['status']}")

    # Read file content
    content = await file.read()
    max_bytes = settings.max_file_size_mb * 1024 * 1024
    if len(content) > max_bytes:
        raise HTTPException(413, f"File too large (max {settings.max_file_size_mb}MB)")

    # Save to permanent path (kept for later download)
    original_filename = file.filename or "unknown"
    ext = Path(original_filename).suffix.lower() or ".bin"
    file_id = uuid.uuid4().hex[:12]
    record_dir = settings.file_storage_path / "_research_uploads" / record_id
    record_dir.mkdir(parents=True, exist_ok=True)
    stored_path = record_dir / f"{file_id}{ext}"

    def _discard_stored_file() -> None:
        """Best-effort removal of the stored file after a failed upload."""
        try:
            stored_path.unlink(missing_ok=True)
        except OSError as cleanup_exc:
            # Cleanup must never mask the original error, but a leftover
            # file should at least be visible in the logs.
            logger.warning(
                "upload_material_cleanup_failed",
                error=str(cleanup_exc),
                path=str(stored_path),
            )

    try:
        # Writing the upload is blocking disk I/O; keep it off the event loop.
        await asyncio.to_thread(stored_path.write_bytes, content)

        # Detect and validate file type
        detected_type = detect_file_type(stored_path)
        if detected_type not in settings.supported_file_types:
            raise HTTPException(
                415,
                f"不支持的文件类型: {detected_type}，"
                f"支持: {', '.join(settings.supported_file_types)}",
            )
        if detected_type in _JAVA_FORMATS:
            raise HTTPException(
                415,
                f".{detected_type} 格式需要转换器支持，请先另存为 .docx / .xlsx / .pdf 后再上传",
            )

        # Extract text by routing on file type
        if detected_type in PLAIN_TEXT_FORMATS or detected_type in {"md", "markdown"}:
            # txt / markdown: decode directly as UTF-8, replacing invalid bytes
            full_text = content.decode("utf-8", errors="replace")
        elif detected_type in DOCLING_FORMATS:
            # pdf/docx/pptx/xlsx/images: use Docling (CPU-bound → thread)
            try:
                processor = DoclingProcessor()
                doc = await asyncio.to_thread(processor.process, stored_path)
                full_text = doc.full_text or ""
            except Exception as exc:
                logger.error("upload_material_docling_failed", error=str(exc), file_type=detected_type)
                raise HTTPException(422, f"文件解析失败: {exc}") from exc
        else:
            raise HTTPException(415, f"无法处理的文件类型: {detected_type}")

        # Append to imported_items_json
        existing_items = record.get("imported_items_json") or []
        new_item = {
            "item_type": "uploaded_file",
            "file_id": file_id,
            "title": original_filename,
            "doc_id": file_id,
            "extracted_text": full_text,
            "file_type": detected_type,
            "file_ext": ext,
            "char_count": len(full_text),
        }
        updated_items = existing_items + [new_item]

        # Also record a file-attachment message in clarification history so the
        # conversation timeline can be reconstructed in the detail view.
        file_msg = {
            "role": "user",
            "content": "",
            "attachment_type": "file",
            "file_id": file_id,
            "file_name": original_filename,
            "file_type": detected_type,
            "char_count": len(full_text),
        }
        existing_clarify = record.get("clarification_messages_json") or []
        await store.update_record(
            record_id, user.user_id,
            imported_items_json=updated_items,
            clarification_messages_json=existing_clarify + [file_msg],
        )

        return {
            "file_id": file_id,
            "original_filename": original_filename,
            "file_type": detected_type,
            "extracted_text_preview": full_text[:200],
            "char_count": len(full_text),
        }
    except HTTPException:
        # Validation/processing error: drop the stored file, keep the status.
        _discard_stored_file()
        raise
    except Exception as exc:
        logger.error("upload_material_unexpected_error", error=str(exc), exc_info=exc)
        _discard_stored_file()
        raise HTTPException(500, "服务器内部错误，请稍后重试") from exc


_FILE_TYPE_MIME: dict[str, str] = {
    "pdf": "application/pdf",
    "doc": "application/msword",
    "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "xls": "application/vnd.ms-excel",
    "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "ppt": "application/vnd.ms-powerpoint",
    "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "png": "image/png",
    "jpg": "image/jpeg",
    "jpeg": "image/jpeg",
    "tiff": "image/tiff",
    "bmp": "image/bmp",
    "txt": "text/plain",
    "md": "text/markdown",
    "markdown": "text/markdown",
}


@router.get("/{record_id}/uploaded-files/{file_id}", summary="下载已上传文件")
async def download_uploaded_file(
    record_id: str,
    file_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> FileResponse:
    """Download the original uploaded file identified by ``file_id``.

    The file is looked up in the record's ``imported_items_json`` and served
    from the record's upload directory. The item metadata is treated as
    untrusted: the resolved path must lie directly inside that directory.

    Raises:
        HTTPException: 400 when either id is not lowercase hex, 503 when
            MySQL is not available, 404 when the record, the item, or the
            file on disk is not found.
    """
    import re
    from app.config import settings

    # Path traversal prevention
    if not re.fullmatch(r'[a-f0-9]+', record_id) or not re.fullmatch(r'[a-f0-9]+', file_id):
        raise HTTPException(400, "Invalid record_id or file_id format")

    mysql = _require_mysql(mysql_client)
    store = ResearchRecordStore(mysql)
    record = await store.get_record(record_id, user.user_id)
    if record is None:
        raise HTTPException(404, "Record not found")

    # Find file metadata in imported_items_json. Non-dict entries are skipped
    # rather than crashing the lookup.
    items = record.get("imported_items_json") or []
    file_meta = next(
        (
            i for i in items
            if isinstance(i, dict)
            and i.get("file_id") == file_id
            and i.get("item_type") == "uploaded_file"
        ),
        None,
    )
    if file_meta is None:
        raise HTTPException(404, "File not found in record")

    file_ext = str(file_meta.get("file_ext") or f".{file_meta.get('file_type', 'bin')}")
    if not file_ext.startswith("."):
        file_ext = f".{file_ext}"

    # file_ext comes from stored metadata, so validating the two ids is not
    # enough: resolve the final path and require it to sit directly inside
    # this record's upload directory before touching the file.
    upload_dir = (settings.file_storage_path / "_research_uploads" / record_id).resolve()
    file_path = (upload_dir / f"{file_id}{file_ext}").resolve()
    if file_path.parent != upload_dir or not file_path.is_file():
        raise HTTPException(404, "File not found on disk")

    title = str(file_meta.get("title") or file_id)
    # Sanitize filename
    title = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', title)
    file_type = str(file_meta.get("file_type", "bin"))
    media_type = _FILE_TYPE_MIME.get(file_type, "application/octet-stream")

    return FileResponse(
        path=str(file_path),
        media_type=media_type,
        filename=title,
    )


@router.post("/{record_id}/regenerate-document", summary="重新生成交付物文档")
async def regenerate_document(
    record_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
    body: dict[str, Any] | None = None,
) -> StreamingResponse:
    """Regenerate the final deliverable document using all available context.

    Accepts optional JSON body: ``{"output_template": "policy_brief"}``.
    Without it, the record's own template is used, falling back to
    ``comprehensive``.

    The response is an SSE stream: one ``progress`` event, then either a
    ``final_document`` event with the full markdown or an ``error`` event,
    and always a closing ``done`` event. The LLM output is collected in full
    and written to ``research_record_reports.final_document_md`` before the
    ``final_document`` event is sent.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the record
            or its report is not found, 400 when the record is not
            ``completed``.
    """
    from app.config import settings
    from app.core.research_formatter import _build_final_document_messages

    output_template = (body or {}).get("output_template")

    mysql = _require_mysql(mysql_client)
    store = ResearchRecordStore(mysql)
    record = await store.get_record(record_id, user.user_id)
    if record is None:
        raise HTTPException(404, "Record not found")
    if record["status"] != "completed":
        raise HTTPException(400, "Document regeneration requires completed status")

    # Load report
    report_row = await store.get_report(record_id, user.user_id)
    if report_row is None:
        raise HTTPException(404, "Report not found")
    report_json = report_row.get("report_json") or {}

    task_json = record.get("task_json") or {}
    plan_json = record.get("plan_json") or {}
    refs = record.get("references_json") or []
    chat_msgs = record.get("chat_messages_json") or []
    imported_items = record.get("imported_items_json") or []

    # Use provided template or fall back to record's original template
    template = output_template or record.get("output_template") or "comprehensive"

    # Filter uploaded files as extra materials
    extra_materials = [i for i in imported_items if i.get("item_type") == "uploaded_file"]

    messages = _build_final_document_messages(
        task_json, plan_json, report_json, refs,
        output_template=template,
        chat_history=chat_msgs if chat_msgs else None,
        extra_materials=extra_materials if extra_materials else None,
    )

    def _event(event_type: str, content: str) -> str:
        """Encode one SSE ``data:`` frame with the given type and content."""
        payload = json.dumps({'type': event_type, 'content': content}, ensure_ascii=False)
        return f"data: {payload}\n\n"

    async def _stream() -> AsyncIterator[str]:
        yield _event('progress', '正在重新生成交付物文档...')

        doc_parts: list[str] = []
        try:
            async for chunk in llm_client.chat(
                messages,
                temperature=0.3,
                max_tokens=settings.llm_max_tokens,
            ):
                doc_parts.append(chunk)

            final_md = "".join(doc_parts)

            # Persist to database
            await mysql.execute(
                """
                UPDATE research_record_reports
                SET final_document_md = %s, updated_at = CURRENT_TIMESTAMP
                WHERE record_id = %s
                """,
                (final_md, record_id),
            )

            yield _event('final_document', final_md)
            yield _event('done', '')
        except Exception as exc:
            # The client only sees a generic message, so log the cause with
            # its traceback, in the structured style used across this module.
            logger.error(
                "regenerate_document_failed",
                record_id=record_id,
                error=str(exc),
                exc_info=exc,
            )
            yield _event('error', '文档生成失败，请稍后重试')
            yield _event('done', '')

    return StreamingResponse(
        _stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@router.post("/{record_id}/chat", summary="研究后继续对话 (F2)")
async def chat_record(
    record_id: str,
    body: dict[str, Any],
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> StreamingResponse:
    """Continue the conversation on a completed record as a direct SSE stream.

    No Celery task is involved; chunks from the service are streamed as-is.

    Raises:
        HTTPException: 400 when ``message`` is missing or empty, or when the
            record is not ``completed``; 503 when MySQL is not available;
            404 when the record is not found.
    """
    question = body.get("message", "")
    if not question:
        raise HTTPException(400, "message is required")

    mysql = _require_mysql(mysql_client)

    # Validate up front, before the async generator exists (same pattern as H3).
    current = await ResearchRecordStore(mysql).get_record(record_id, user.user_id)
    if current is None:
        raise HTTPException(404, "Record not found")
    if not current["status"] == "completed":
        raise HTTPException(400, "Chat is only available for completed records")

    service = _build_service(
        mysql, redis_client, es_client, neo4j_client, embedding_client, llm_client,
    )
    replies = service.chat(record_id, user.user_id, question)
    return StreamingResponse(
        _sse_dict_stream(replies),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@router.post("/{record_id}/regenerate", summary="重新生成（创建新版本）")
async def regenerate_record(
    record_id: str,
    body: dict[str, Any],
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient | None, Depends(get_neo4j_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, Any]:
    """Create a new version of this record from an updated task.

    The service's result is returned to the client unchanged.

    Raises:
        HTTPException: 400 when ``task`` is missing, empty, or not a dict, or
            when the service rejects the request with ``ValueError``; 503
            when MySQL is not available.
    """
    new_task = body.get("task")
    task_is_usable = bool(new_task) and isinstance(new_task, dict)
    if not task_is_usable:
        raise HTTPException(400, "task is required and must be a dict")

    service = _build_service(
        _require_mysql(mysql_client),
        redis_client,
        es_client,
        neo4j_client,
        embedding_client,
        llm_client,
    )
    try:
        return await service.regenerate_record(record_id, user.user_id, new_task)
    except ValueError as exc:
        raise HTTPException(400, str(exc)) from exc


@router.get("/{record_id}/versions", summary="列出该记录的所有版本")
async def list_versions(
    record_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> list[dict[str, Any]]:
    """Return every version that shares this record's ``root_record_id``.

    When the record has no root id, the record's own id is used as the root.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the record
            is not found.
    """
    records = ResearchRecordStore(_require_mysql(mysql_client))
    current = await records.get_record(record_id, user.user_id)
    if current is None:
        raise HTTPException(404, "Record not found")

    def _as_summary(row: dict[str, Any]) -> dict[str, Any]:
        summary = ResearchRecordSummary(
            id=row["id"],
            title=row["title"],
            mode=row.get("mode", "deep"),
            status=row["status"],
            output_template=row.get("output_template", "comprehensive"),
            summary=row.get("summary"),
            archived=bool(row.get("archived")),
            version_no=row.get("version_no", 1),
            created_at=str(row.get("created_at", "")),
            updated_at=str(row.get("updated_at", "")),
        )
        return summary.model_dump()

    root = current.get("root_record_id") or record_id
    siblings = await records.list_versions(root, user.user_id)
    return [_as_summary(row) for row in siblings]


@router.post("/{record_id}/import-items", summary="导入资料到已有记录 (F6)")
async def import_items(
    record_id: str,
    body: dict[str, Any],
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, Any]:
    """Append items to an existing record's ``imported_items_json``.

    Only allowed for draft and planned records. Every item must be a JSON
    object, because other endpoints in this module call ``.get()`` on each
    stored item.

    Returns:
        ``status``, the new ``total_items`` count, and ``needs_replan``
        (true when the record was already ``planned``).

    Raises:
        HTTPException: 400 when ``items`` is missing, empty, not a list, or
            contains a non-object entry, or when the record status does not
            allow imports; 503 when MySQL is not available; 404 when the
            record is not found.
    """
    items = body.get("items")
    if not items or not isinstance(items, list):
        raise HTTPException(400, "items is required and must be a list")
    # Reject malformed entries up front instead of storing data that breaks
    # later reads of imported_items_json.
    if not all(isinstance(item, dict) for item in items):
        raise HTTPException(400, "each item must be an object")
    mysql = _require_mysql(mysql_client)
    store = ResearchRecordStore(mysql)
    record = await store.get_record(record_id, user.user_id)
    if record is None:
        raise HTTPException(404, "Record not found")
    if record["status"] not in ("draft", "planned"):
        raise HTTPException(400, f"Cannot import items to record in status {record['status']}")
    existing = record.get("imported_items_json") or []
    merged = existing + items
    await store.update_record(record_id, user.user_id, imported_items_json=merged)
    needs_replan = record["status"] == "planned"
    return {"status": "ok", "total_items": len(merged), "needs_replan": needs_replan}


@router.post("/{record_id}/archive", summary="归档研究记录")
async def archive_record(
    record_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, str]:
    """Archive one of the current user's records.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the store
            reports that no row was affected.
    """
    records = ResearchRecordStore(_require_mysql(mysql_client))
    rows_changed = await records.archive_record(record_id, user.user_id)
    if rows_changed != 0:
        return {"status": "archived"}
    raise HTTPException(404, "Record not found")


@router.delete("/{record_id}", summary="删除研究记录")
async def delete_record(
    record_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, str]:
    """Delete one of the current user's records.

    Raises:
        HTTPException: 503 when MySQL is not available, 404 when the store
            reports that no row was affected.
    """
    records = ResearchRecordStore(_require_mysql(mysql_client))
    rows_changed = await records.delete_record(record_id, user.user_id)
    if rows_changed != 0:
        return {"status": "deleted"}
    raise HTTPException(404, "Record not found or not in deletable state")
