"""Notebook API endpoints — CRUD, sources, chat (SSE), outputs.

Notebook 模块的 API 路由，提供笔记本管理、来源管理、范围隔离对话、输出文档生成。
"""

from __future__ import annotations

import json
import shutil
from pathlib import Path
from typing import Annotated, Any, AsyncIterator

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import StreamingResponse

from app.api.deps import (
    UserContext,
    get_current_user,
    get_embedding_client,
    get_es_client,
    get_llm_client,
    get_mysql_client,
    get_neo4j_client,
    get_permission_context,
    get_redis_client,
)
from app.api.schemas.notebook import (
    AddSourceByPasteRequest,
    AddSourceBySearchRequest,
    NotebookChatConfigRequest,
    NotebookChatHistoryResponse,
    NotebookChatMessage,
    NotebookChatRequest,
    NotebookCreateRequest,
    NotebookDetail,
    NotebookListResponse,
    NotebookOutputDetail,
    NotebookOutputInfo,
    NotebookSourceInfo,
    NotebookSummary,
    NotebookUpdateRequest,
    OutputGenerateRequest,
    OutputListResponse,
    SourceListResponse,
    SourceUpdateRequest,
    SuggestionsResponse,
)
from app.api.schemas.notebook import NotebookConfig
from app.api.schemas.research import ResearchChunk
from app.config import settings
from app.core.embedding import EmbeddingService
from app.core.graph_query_service import GraphQueryService
from app.core.notebook_service import NotebookService
from app.core.permission import PermissionContext, PermissionService
from app.infrastructure.embedding_client import EmbeddingClient
from app.infrastructure.es_client import ESClient
from app.infrastructure.llm_client import LLMClient
from app.infrastructure.mysql_client import MySQLClient
from app.infrastructure.neo4j_client import Neo4jClient
from app.infrastructure.notebook_store import NotebookStore
from app.infrastructure.redis_client import RedisClient
from app.infrastructure.session_store import build_research_session_store
from app.utils.logger import get_logger

logger = get_logger(__name__)

router = APIRouter(prefix="/notebook", tags=["notebook"])


# ---------------------------------------------------------------------------
# Helper: build NotebookService from DI clients
# ---------------------------------------------------------------------------

def _build_service(
    es_client: ESClient,
    neo4j_client: Neo4jClient,
    redis_client: RedisClient,
    mysql_client: MySQLClient | None,
    embedding_client: EmbeddingClient,
    llm_client: LLMClient,
) -> NotebookService:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL is required for Notebook module")

    store = NotebookStore(mysql_client)
    embedding_svc = EmbeddingService(embedding_client)
    graph_svc = GraphQueryService(neo4j_client)
    session_store = build_research_session_store(
        redis_client=redis_client, mysql_client=mysql_client,
    )

    return NotebookService(
        store=store,
        es_client=es_client,
        embedding_service=embedding_svc,
        graph_service=graph_svc,
        llm_client=llm_client,
        session_store=session_store,
    )


async def _resolve_acl_tokens(
    user: UserContext, redis_client: RedisClient,
) -> list[str]:
    """Resolve user ACL tokens for chat scoping."""
    perm_service = PermissionService(redis_client=redis_client)
    perm = await perm_service.resolve(user)
    return perm.acl_tokens


async def _sse_stream(gen: AsyncIterator[ResearchChunk]) -> AsyncIterator[str]:
    async for chunk in gen:
        payload = json.dumps(chunk.model_dump(exclude_none=True), ensure_ascii=False)
        yield f"data: {payload}\n\n"


# ===================================================================
# Notebook CRUD
# ===================================================================

@router.post("", summary="新建 Notebook")
async def create_notebook(
    body: NotebookCreateRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, Any]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    return await svc.create_notebook(user.user_id, body.title, body.description)


@router.get("", summary="Notebook 列表", response_model=NotebookListResponse)
async def list_notebooks(
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
    q: str | None = None,
    status: str | None = None,
    limit: int = 50,
    offset: int = 0,
) -> NotebookListResponse:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    items, total = await svc.list_notebooks(user.user_id, q=q, status=status, limit=limit, offset=offset)
    return NotebookListResponse(
        items=[NotebookSummary(**nb) for nb in items],
        total=total,
    )


@router.get("/{notebook_id}", summary="Notebook 详情")
async def get_notebook(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, Any]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    nb = await svc.get_notebook(notebook_id, user.user_id)
    if nb is None:
        raise HTTPException(status_code=404, detail="Notebook not found")
    return nb


@router.put("/{notebook_id}", summary="更新 Notebook")
async def update_notebook(
    notebook_id: str,
    body: NotebookUpdateRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, str]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    await svc.update_notebook(
        notebook_id, user.user_id,
        title=body.title, description=body.description, config=body.config,
    )
    return {"status": "ok"}


@router.delete("/{notebook_id}", summary="删除 Notebook")
async def delete_notebook(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, str]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    affected = await svc.delete_notebook(notebook_id, user.user_id)
    if affected == 0:
        raise HTTPException(status_code=404, detail="Notebook not found")
    return {"status": "deleted"}


@router.post("/{notebook_id}/archive", summary="归档 Notebook")
async def archive_notebook(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, str]:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    await store.archive_notebook(notebook_id, user.user_id)
    return {"status": "archived"}


# ===================================================================
# Sources
# ===================================================================

@router.get("/{notebook_id}/sources", summary="来源列表", response_model=SourceListResponse)
async def list_sources(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> SourceListResponse:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    sources = await store.list_sources(notebook_id, user.user_id)
    return SourceListResponse(
        items=[NotebookSourceInfo(**s) for s in sources],
        total=len(sources),
    )


@router.post("/{notebook_id}/sources/search", summary="从检索添加来源")
async def add_source_by_search(
    notebook_id: str,
    body: AddSourceBySearchRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, Any]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    try:
        return await svc.add_source_by_search(notebook_id, user.user_id, body.doc_id, body.title)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))


@router.post("/{notebook_id}/sources/upload", summary="上传文件作为来源")
async def add_source_by_upload(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
    file: UploadFile = File(...),
) -> dict[str, Any]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)

    # Save uploaded file
    upload_dir = Path(settings.file_storage_path) / "notebook_uploads"
    upload_dir.mkdir(parents=True, exist_ok=True)

    import uuid
    safe_name = f"{uuid.uuid4().hex[:8]}_{file.filename or 'upload'}"
    dest_path = upload_dir / safe_name
    with open(dest_path, "wb") as f:
        content = await file.read()
        f.write(content)

    try:
        result = await svc.add_source_by_upload(
            notebook_id, user.user_id,
            str(dest_path), file.filename or safe_name,
        )
    except ValueError as exc:
        dest_path.unlink(missing_ok=True)
        raise HTTPException(status_code=400, detail=str(exc))

    # Dispatch Celery task — pass POSIX relative path for Docker portability
    from app.tasks.notebook_task import notebook_ingest_source_task
    try:
        relative_path = dest_path.relative_to(settings.file_storage_path).as_posix()
    except ValueError:
        relative_path = str(dest_path)
    logger.info("notebook_upload_dispatch", dest_path=str(dest_path), storage=str(settings.file_storage_path), relative_path=relative_path)
    task = notebook_ingest_source_task.delay(
        source_id=result["id"],
        notebook_id=notebook_id,
        user_id=user.user_id,
        doc_id=result["doc_id"],
        file_path=relative_path,
        title=result["title"],
    )
    # Update task ID
    if mysql_client:
        store = NotebookStore(mysql_client)
        await store.update_source(
            result["id"], notebook_id, user.user_id, ingest_task_id=task.id,
        )
    result["task_id"] = task.id
    return result


@router.post("/{notebook_id}/sources/paste", summary="粘贴文字作为来源")
async def add_source_by_paste(
    notebook_id: str,
    body: AddSourceByPasteRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, Any]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    try:
        result = await svc.add_source_by_paste(notebook_id, user.user_id, body.title, body.content)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    # Dispatch Celery task — pass POSIX relative path for Docker portability
    from app.tasks.notebook_task import notebook_ingest_source_task
    try:
        _paste_rel = Path(result["file_path"]).relative_to(settings.file_storage_path).as_posix()
    except ValueError:
        _paste_rel = result["file_path"]
    task = notebook_ingest_source_task.delay(
        source_id=result["id"],
        notebook_id=notebook_id,
        user_id=user.user_id,
        doc_id=result["doc_id"],
        file_path=_paste_rel,
        title=result["title"],
    )
    if mysql_client:
        store = NotebookStore(mysql_client)
        await store.update_source(
            result["id"], notebook_id, user.user_id, ingest_task_id=task.id,
        )
    result["task_id"] = task.id
    return result


@router.put("/{notebook_id}/sources/{source_id}", summary="更新来源")
async def update_source(
    notebook_id: str,
    source_id: str,
    body: SourceUpdateRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, str]:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    fields: dict[str, Any] = {}
    if body.selected is not None:
        fields["selected"] = int(body.selected)
    if body.title is not None:
        fields["title"] = body.title
    await store.update_source(source_id, notebook_id, user.user_id, **fields)
    return {"status": "ok"}


@router.delete("/{notebook_id}/sources/{source_id}", summary="删除来源")
async def delete_source(
    notebook_id: str,
    source_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, str]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    affected = await svc.delete_source(source_id, notebook_id, user.user_id)
    if affected == 0:
        raise HTTPException(status_code=404, detail="Source not found")
    return {"status": "deleted"}


@router.post("/{notebook_id}/sources/{source_id}/retry", summary="重试入库")
async def retry_source_ingest(
    notebook_id: str,
    source_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> dict[str, Any]:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    source = await svc.retry_source_ingest(source_id, notebook_id, user.user_id)
    if source is None:
        raise HTTPException(status_code=400, detail="Source not retryable")

    # Re-dispatch Celery task — pass POSIX relative path for Docker portability
    from app.tasks.notebook_task import notebook_ingest_source_task
    _retry_path = source.get("file_path", "")
    if _retry_path:
        try:
            _retry_path = Path(_retry_path).relative_to(settings.file_storage_path).as_posix()
        except ValueError:
            pass  # already relative or different root
    task = notebook_ingest_source_task.delay(
        source_id=source_id,
        notebook_id=notebook_id,
        user_id=user.user_id,
        doc_id=source["doc_id"],
        file_path=_retry_path,
        title=source.get("title", ""),
    )
    if mysql_client:
        store = NotebookStore(mysql_client)
        await store.update_source(
            source_id, notebook_id, user.user_id, ingest_task_id=task.id,
        )
    return {"status": "retrying", "task_id": task.id}


@router.get("/{notebook_id}/sources/{source_id}/chunks", summary="获取来源内容片段")
async def get_source_chunks(
    notebook_id: str,
    source_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    limit: int = 20,
) -> dict[str, Any]:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    source = await store.get_source(source_id, notebook_id, user.user_id)
    if source is None or not source.get("doc_id"):
        raise HTTPException(status_code=404, detail="Source not found")

    doc_id = source["doc_id"]
    resp = await es_client.raw.search(
        index=settings.es_chunk_index,
        body={
            "size": limit,
            "query": {"term": {"doc_ids": doc_id}},
            "sort": [{"chunk_index": {"order": "asc", "unmapped_type": "integer"}}],
            "_source": ["content", "heading_hierarchy", "page_number", "page_numbers", "element_type", "chunk_index"],
        },
    )
    raw = resp if isinstance(resp, dict) else resp.body
    chunks = []
    for hit in raw.get("hits", {}).get("hits", []):
        src = hit.get("_source", {})
        chunks.append({
            "content": src.get("content", ""),
            "heading_hierarchy": src.get("heading_hierarchy", []),
            "page_number": src.get("page_number"),
            "page_numbers": src.get("page_numbers", []),
            "element_type": src.get("element_type", ""),
            "chunk_index": src.get("chunk_index"),
        })
    return {"doc_id": doc_id, "chunks": chunks, "total": len(chunks)}


# ===================================================================
# Chat
# ===================================================================

@router.post("/{notebook_id}/chat", summary="Notebook 对话 (SSE)")
async def notebook_chat(
    notebook_id: str,
    body: NotebookChatRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> StreamingResponse:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    acl_tokens = await _resolve_acl_tokens(user, redis_client)

    gen = svc.chat(
        notebook_id, user.user_id, body.question,
        session_id=body.session_id, acl_tokens=acl_tokens,
    )

    return StreamingResponse(
        _sse_stream(gen),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


@router.get("/{notebook_id}/chat/history", summary="对话历史", response_model=NotebookChatHistoryResponse)
async def get_chat_history(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    session_id: str = "default",
    limit: int = 100,
    offset: int = 0,
) -> NotebookChatHistoryResponse:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    messages = await store.list_messages(
        notebook_id, user.user_id, session_id=session_id, limit=limit, offset=offset,
    )
    return NotebookChatHistoryResponse(
        notebook_id=notebook_id,
        session_id=session_id,
        messages=[NotebookChatMessage(
            id=m["id"],
            role=m["role"],
            content=m.get("content"),
            references=m.get("references_json"),
            suggestions=m.get("suggestions_json"),
            created_at=m.get("created_at"),
        ) for m in messages],
        total=len(messages),
    )


@router.post("/{notebook_id}/chat/suggest", summary="生成建议问题", response_model=SuggestionsResponse)
async def suggest_questions(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> SuggestionsResponse:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    suggestions = await svc.generate_suggestions(notebook_id, user.user_id)
    return SuggestionsResponse(suggestions=suggestions)


@router.put("/{notebook_id}/chat/config", summary="更新对话配置")
async def update_chat_config(
    notebook_id: str,
    body: NotebookChatConfigRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, str]:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    await store.update_notebook(
        notebook_id, user.user_id,
        config_json=body.config.model_dump(),
    )
    return {"status": "ok"}


# ===================================================================
# Outputs
# ===================================================================

@router.post("/{notebook_id}/outputs", summary="生成输出文档 (SSE)")
async def generate_output(
    notebook_id: str,
    body: OutputGenerateRequest,
    user: Annotated[UserContext, Depends(get_current_user)],
    es_client: Annotated[ESClient, Depends(get_es_client)],
    neo4j_client: Annotated[Neo4jClient, Depends(get_neo4j_client)],
    redis_client: Annotated[RedisClient, Depends(get_redis_client)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
    embedding_client: Annotated[EmbeddingClient, Depends(get_embedding_client)],
    llm_client: Annotated[LLMClient, Depends(get_llm_client)],
) -> StreamingResponse:
    svc = _build_service(es_client, neo4j_client, redis_client, mysql_client, embedding_client, llm_client)
    gen = svc.generate_output(
        notebook_id, user.user_id,
        body.output_type, body.title, body.custom_instructions,
    )
    return StreamingResponse(
        _sse_stream(gen),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


@router.get("/{notebook_id}/outputs", summary="输出文档列表", response_model=OutputListResponse)
async def list_outputs(
    notebook_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> OutputListResponse:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    outputs = await store.list_outputs(notebook_id, user.user_id)
    return OutputListResponse(
        items=[NotebookOutputInfo(**o) for o in outputs],
        total=len(outputs),
    )


@router.get("/{notebook_id}/outputs/{output_id}", summary="输出文档详情")
async def get_output(
    notebook_id: str,
    output_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, Any]:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    output = await store.get_output(output_id, notebook_id, user.user_id)
    if output is None:
        raise HTTPException(status_code=404, detail="Output not found")
    return output


@router.delete("/{notebook_id}/outputs/{output_id}", summary="删除输出文档")
async def delete_output(
    notebook_id: str,
    output_id: str,
    user: Annotated[UserContext, Depends(get_current_user)],
    mysql_client: Annotated[MySQLClient | None, Depends(get_mysql_client)],
) -> dict[str, str]:
    if mysql_client is None:
        raise HTTPException(status_code=503, detail="MySQL required")
    store = NotebookStore(mysql_client)
    affected = await store.delete_output(output_id, notebook_id, user.user_id)
    if affected == 0:
        raise HTTPException(status_code=404, detail="Output not found")
    return {"status": "deleted"}
