# 关联文件（Related Documents）功能重构方案（终版）


参考：
 
[research-session-split-prd.md](research-session-split-prd.md)

[research-session-split-design.md](research-session-split-design.md)


## Context

初版实现存在以下问题（code review 指出）：
1. 双向关系同步逻辑散落在 `document.py` 端点和 `ingest_pipeline.py`，无统一服务
2. PUT 端点只追加反向关系、不清理旧反向关系 → 脏数据
3. 上传流只写单向关系，不触发双向同步
4. 前端搜索用裸 axios 不走认证

二审补充：
5. sync 执行顺序必须"先校验产出 valid_related，再写当前文档"
6. 返回契约必须含 `warnings`，不能只返裸 dict
7. `partial_failed` 状态也要触发同步（meta 已落库就该同步）
8. 关联同步属于"后处理 best-effort"，不改主任务状态，不触发重试
9. 前端搜索应放 `search.ts` 而非 `document.ts`

---

## 一、新建 RelatedDocsService

**新建**: `backend/app/core/related_docs_service.py`

```python
class RelatedDocsService:
    REVERSE_TYPE = {"正文": "附件", "附件": "正文"}

    def __init__(self, es_client: ESClient):
        self._es = es_client

    async def sync(self, doc_id: str, new_related: list[dict]) -> dict:
        """全量声明式同步。返回 dict 包含 related_docs, affected_doc_ids, warnings。

        返回普通 dict（不是 dataclass），便于端点层直接 ** 展开构造 Pydantic Response。
        """
```

> **实现细节：service 返回 dict 而非 dataclass**
>
> 避免 `**dataclass_instance` 不可用的问题。sync() 返回：
> ```python
> return {
>     "doc_id": doc_id,
>     "related_docs": valid_related,
>     "affected_doc_ids": affected_ids,
>     "warnings": warnings,
> }
> ```
> 端点层直接 `RelatedDocsSyncResponse(**result)` 构造响应。

### sync() 方法签名

```python
async def sync(
    self,
    doc_id: str,
    new_related: list[dict],
    current_title: str = "",          # 当前文档标题（用于写反向引用）
    known_old_related: list[dict] | None = None,  # 已知旧状态（跳过回读）
) -> dict:
```

> **近实时可见性处理**：`known_old_related` 参数解决上传流时序问题。
> 上传流中 meta 刚写入可能未刷新，sync 不依赖回读当前文档：
> - **PUT 端点调用**：`known_old_related=None`，从 ES 正常回读（meta 已稳定存在）
> - **上传后调用**：`known_old_related=[]`（新文档无旧关系），跳过回读，避免近实时窗口问题

### sync() 执行顺序（严格）

```
1. 过滤自关联 → doc_id == 自身的记录移入 warnings
2. 按 doc_id 去重（保留最后一条），重复项进 warnings
3. 校验目标文档是否存在 → 不存在的移入 warnings
4. 得到 valid_related 和 warnings
5. 获取当前文档旧状态：
   - 如果 known_old_related is not None → 直接使用（跳过 ES 回读）
   - 否则 → 从 ES 读取当前文档的 related_docs
6. 计算差量：
   old_map = {r["doc_id"]: r for r in old_related}
   new_map = {r["doc_id"]: r for r in valid_related}
   added   = new_map.keys() - old_map.keys()
   removed = old_map.keys() - new_map.keys()
   kept    = new_map.keys() & old_map.keys()
   changed = {id for id in kept if type changed}
7. 写当前文档 related_docs = valid_related
8. best-effort 更新受影响目标文档：
   - added: 追加反向关系
   - removed: 删除反向关系
   - changed: 更新反向 relation_type
   - 目标更新失败只进 warnings，不回滚主文档
```

### 返回契约（普通 dict）

sync() 返回普通 dict，端点层直接 `RelatedDocsSyncResponse(**result)` 构造响应：

```python
# sync() 的返回值结构
{
    "doc_id": "A",
    "related_docs": [{"doc_id": "B", "title": "...", "relation_type": "附件"}],
    "affected_doc_ids": ["B"],   # 仅包含 ES 写入成功的目标文档 ID
    "warnings": [{"doc_id": "C", "code": "TARGET_NOT_FOUND", "reason": "目标文档不存在"}],
}
```

> **affected_doc_ids 语义**：仅包含实际成功完成 ES 写入的目标文档 ID。
> - 新增关联：B 的反向记录写入成功 → B 进入 affected_doc_ids
> - 删除关联：B 的反向记录删除成功 → B 进入 affected_doc_ids
> - 类型切换：B 的反向类型更新成功 → B 进入 affected_doc_ids
> - 目标不存在/写入失败 → 不进 affected_doc_ids，进 warnings

---

## 二、新增 Schema

**文件**: `backend/app/api/schemas/document.py`

```python
class RelatedDocsWarning(BaseModel):
    doc_id: str = ""
    code: str = ""        # SELF_REFERENCE | DUPLICATE | TARGET_NOT_FOUND | TARGET_UPDATE_FAILED | SYNC_EXCEPTION
    reason: str = ""

class RelatedDocsSyncResponse(BaseModel):
    doc_id: str
    related_docs: list[RelatedDoc] = Field(default_factory=list)
    affected_doc_ids: list[str] = Field(default_factory=list)
    warnings: list[RelatedDocsWarning] = Field(default_factory=list)
```

---

## 三、重构 PUT 端点

**文件**: `backend/app/api/v1/document.py` (~line 417-486)

改为纯编排层：

```python
@router.put("/{doc_id}/related-docs", response_model=RelatedDocsSyncResponse)
async def update_related_docs(doc_id, body, user, request):
    # 1. 验证当前文档存在 + 权限
    # 2. service = RelatedDocsService(es_client)
    # 3. result = await service.sync(doc_id, [r.model_dump() for r in body.related_docs])
    # 4. return RelatedDocsSyncResponse(**result)
```

**删除**: 现有手写的双向同步循环（~line 458-484），全部由服务层负责。

---

## 四、入库后 best-effort 双向同步

**文件**: `backend/app/tasks/ingest_task.py` (~line 162-177)

> **ES client 来源**：复用 `recorder_es`（line 101），它是同一 ES 连接配置的 `ESClient`，
> 在 `_run()` 作用域内始终可用，在 `finally` 统一关闭。
> **不用** `pipeline._es`（私有属性）或重新创建新连接。

在异步内部执行块里（`_run()` 内）分两阶段：

**阶段 A（`pipeline.ingest_document()` 之前）**：读取旧关联状态

```python
# 关键：区分"字段不存在"（不参与同步）和"字段存在但为空"（清空关联）
# metadata.get("related_docs") 返回 None → 字段不存在 → 跳过同步
# metadata.get("related_docs") 返回 []   → 显式清空 → 必须同步
_has_related_docs_field = "related_docs" in metadata
related_docs_from_meta = metadata.get("related_docs", [])
old_related_docs: list[dict] = []
if _has_related_docs_field:
    try:
        old_resp = await recorder_es.raw.get(
            index=settings.es_meta_index, id=doc_id,
        )
        old_raw = old_resp if isinstance(old_resp, dict) else old_resp.body
        old_related_docs = old_raw.get("_source", {}).get("related_docs", [])
    except Exception:
        old_related_docs = []  # 新文档不存在，等价于空
```

**阶段 B（`pipeline.ingest_document()` 之后、`finish_trace()` 之前）**：best-effort 同步

```python
status = result.get("status", "failed")
if status in ("completed", "partial_failed"):
    if _has_related_docs_field:  # 空数组也参与同步（清空场景）
        try:
            from app.core.related_docs_service import RelatedDocsService
            service = RelatedDocsService(recorder_es)  # 复用已有 ESClient
            # resolved_title: 与 _write_doc_meta() 一致的标题 fallback 逻辑
            resolved_title = metadata.get("title", "")
            if not resolved_title:
                from pathlib import Path as _P
                resolved_title = _P(metadata.get("original_filename", "")).stem
            sync_result = await service.sync(
                doc_id, related_docs_from_meta,
                current_title=resolved_title,
                known_old_related=old_related_docs,  # 入库前快照，避免近实时窗口
            )
            if sync_result.get("warnings"):
                # 标准 logging 格式，与文件内现有 logger 风格一致
                logger.warning(
                    "Related docs sync warnings for doc_id=%s: %s",
                    doc_id, sync_result["warnings"],
                )
                result.setdefault("warnings", []).extend(
                    [{"type": "related_docs", **w} for w in sync_result["warnings"]]
                )
        except Exception as e:
            logger.warning("Related docs sync failed for doc_id=%s: %s", doc_id, str(e))
            result.setdefault("warnings", []).append(
                {"type": "related_docs", "doc_id": "", "code": "SYNC_EXCEPTION", "reason": f"关联同步异常: {e}"}
            )
    # result.status 保持 completed 或 partial_failed，不因同步失败改变
```

> **日志格式**：使用 `%s` 占位符（标准 logging），与 `ingest_task.py` 现有风格一致。
> 不使用 structlog keyword 风格（如 `logger.warning("key", field=val)`）。

**保持**: `ingest_pipeline.py:1335` 的 `"related_docs": metadata.get("related_docs", [])` 单向写入（合理兜底）。

---

## 五、前端搜索迁到认证实例

### 5.1 新增到 search.ts

**文件**: `frontend/src/api/search.ts`（或新建如果不存在）

```typescript
import type { SearchResponse } from '@/types/search'

/** 搜索已入库文档（关联文档选择器专用，最小字段集） */
export function searchDocsForBinding(query: string): Promise<SearchResponse> {
  return request
    .post<SearchResponse>('/api/ai/v1/search', { query, page_size: 10 })
    .then((res) => res.data)
}
```

### 5.2 删除 mock.ts 中的裸 axios searchDocs

**文件**: `frontend/src/api/mock.ts` — 删除 `searchDocs` 方法

### 5.3 MockOA.vue 改引用

**文件**: `frontend/src/views/MockOA.vue`

```diff
- import { mockApi } from '@/api/mock'
+ import { searchDocsForBinding } from '@/api/search'
  // ...
- const res = await mockApi.searchDocs(query)
+ const res = await searchDocsForBinding(query)
```

---

## 六、已完成项（本次不动）

| 项目 | 文件 | 状态 |
|------|------|------|
| ES mapping `related_docs` nested 字段 | `es_client.py` | ✅ |
| `RelatedDoc` / `RelatedDocsUpdateRequest` Schema | `document.py` schemas | ✅ |
| 入库流透传 `related_docs` 到 meta | `ingest_pipeline.py` | ✅ |
| 文档详情接口返回 `related_docs` | `document.py` | ✅ |
| 前端类型 `RelatedDoc` | `document.d.ts` | ✅ |
| 前端 `updateRelatedDocs` API | `document.ts` | ✅ |
| 上传页关联文档 UI | `MockOA.vue` | ✅ |
| 详情页关联文件 tab | `DocDetailView.vue` | ✅ |
| PRD 和 API 文档 | `docs/` | ✅ |

---

## 七、实施步骤

| # | 任务 | 文件 |
|---|------|------|
| 1 | 新增 Schema (Warning + Response) | `backend/app/api/schemas/document.py` |
| 2 | 新建 RelatedDocsService | `backend/app/core/related_docs_service.py` |
| 3 | 重构 PUT 端点 | `backend/app/api/v1/document.py` |
| 4 | 入库后 best-effort 同步 | `backend/app/tasks/ingest_task.py` |
| 5 | 前端搜索迁到 search.ts | `search.ts` + `mock.ts` + `MockOA.vue` |
| 6 | 启动验证 | 重启后端，跑测试场景 |

---

## 八、验证方案

| 场景 | 操作 | 预期 |
|------|------|------|
| 新增关联 | A 关联 B | A、B 双向可见，HTTP 200，warnings=[] |
| 删除关联 | A 取消 B | 两边消失，affected_doc_ids 含 B |
| 类型切换 | A 把 B 从附件→正文 | B 反向从正文→附件 |
| 上传建链 | 上传带 related_docs | 入库 completed，双向正确 |
| 自循环 | A 关联自己 | warnings 含 "不允许关联自身" |
| 不存在 | A 关联不存在的 C | related_docs 不含 C，warnings 含 "目标文档不存在" |
| 去重 | 同一 B 出现两次 | 只保留一条，warnings 含重复提示 |
| partial_failed 同步 | 入库 partial_failed + related_docs | 仍触发同步，主状态不变 |
| 同步异常 | 目标文档 ES 写入失败 | 主文档已写入，warnings 记录失败，不回滚 |
