
    i"R                        d Z ddlmZ ddlZddlZddlZddlmZmZ ddlm	Z	 ddl
mZ ddlmZ  ee          Z G d d	          ZdS )
u  
入库追踪记录器 —— 为文档入库流程提供结构化事件时间线，用于全链路追踪与问题诊断。
Unified ingest trace recorder — structured event timeline for document ingestion.

Phase A: trace_id = task_id (Celery UUID). No independent artifact index yet.

设计要点:
  - 同一 trace 的多个 recorder 实例通过从 ES 同步 seq 保证事件序号单调递增
  - event_id 使用 UUID 避免跨实例冲突
  - 所有写入失败均被捕获并记录日志，绝不阻塞主入库流程

Usage
-----
    recorder = IngestTraceRecorder(es_client, trace_id=task_id, doc_id=doc_id)
    await recorder.start_trace(source_type="webhook", ...)
    await recorder.record("file_type_detected", "completed", summary="检测文件类型: pdf", details={...})
    await recorder.finish_trace("completed")

Multiple recorder instances for the same trace are safe: each one syncs
``seq`` from the trace summary in ES before its first write, and event IDs
use UUIDs instead of seq-based names to avoid collisions.
    )annotationsN)datetimetimezone)Any)settings)
get_loggerc            	         e Zd ZdZdddPdZdQdZdRdZddddSdZdTdZddddddddUd$Z	ddd%dVd(Z
dWd*Zddd+dddd,dd-dXd9Zddd+d:dYd;Zddd+d:dYd<Zddddd+d=dZd>ZedQd?            Zdddd@dAdBd[dIZd\dLZddMd]dOZdS )^IngestTraceRecorderu  入库追踪记录器，为单次文档入库生成结构化事件时间线。

    Records structured ingest events for a single document ingest trace.

    Each instance tracks one trace (one ingest run for one document).
    Events are written to OpenSearch with monotonically increasing ``seq``.

    Design constraints (Phase A):
    - ``trace_id`` equals the Celery ``task_id``.
    - ``seq`` is synced from the trace summary in ES on first write, so
      multiple short-lived recorder instances (API → worker → pipeline)
      produce a monotonically increasing, collision-free sequence.
    - ``event_id`` uses UUID to guarantee uniqueness across instances.
    - Write failures are logged and swallowed — never crash the main pipeline.
    N)task_id	es_clientr   trace_idstrdoc_idr   
str | NonereturnNonec                   || _         || _        || _        |p|| _        d| _        d| _        d | _        d | _        d | _        d| _	        ddddddd| _
        d S )Nr   Funknown source_type	file_typeoriginal_filenametitlecontent_hashoperator)_esr   r   r   _seq_seq_synced_started_at_current_stage_stage_start_time_has_stage_failure_trace_defaults)selfr   r   r   r   s        8D:\work\zm-rag\backend\app\core\ingest_trace_recorder.py__init__zIngestTraceRecorder.__init__7   s{      *(	 '+*./3"'$!#0
 0
    boolc                6  K   | j         rdS 	 | j        j                            t          j        | j        ddi           d{V }t          |t                    r|n|j	        }|                    di           }t          |                    dd                    | _        |                    d          r| j        s|d         | _        d| _         dS # t          $ r:}t                              d	| j        t!          |          
           Y d}~dS d}~ww xY w)u  Fetch ``latest_seq`` and ``started_at`` from the trace summary.

        Called once before the first ``record()`` in this instance so that
        a newly-created recorder continues the sequence produced by an
        earlier instance (e.g. API-layer recorder → worker recorder).

        仅用于首次初始化，后续 seq 递增通过 _atomic_inc_seq() 原子操作完成。
        T_sourcezlatest_seq,started_atindexidparamsN
latest_seqr   
started_atsync_seq_failedr   errorF)r   r   rawgetr   es_trace_indexr   
isinstancedictbodyintr   r    	Exceptionloggerdebugr   )r%   respr5   srcexcs        r&   	_sync_seqzIngestTraceRecorder._sync_seqT   s@       	4	))-=!#:; *        D
 %T400?$$diC'')R((CCGGL!4455DIww|$$ 5T-= 5#&|#4 #D4 	 	 	 LL*T]#c((LSSS55555		s   CC 
D/DDr;   c                "  K   	 | j         j                            t          j        | j        ddddiddd           d	{V }t          |t                    r|n|j        }|	                    d
i           	                    d          }|rd|v rt          |d                   S | j         j        	                    t          j        | j        ddi           d	{V }t          |t                    r|n|j        }t          |	                    di           	                    dd                    S # t          $ rP}t                              d| j        t          |                     | xj        dz  c_        | j        cY d	}~S d	}~ww xY w)uN  通过 Painless 脚本原子递增 trace summary 的 latest_seq，解决多 recorder 并发 seq 冲突。

        Atomically increment latest_seq on the trace summary using a Painless
        script.  Returns the new seq value.  Uses retry_on_conflict=3 to handle
        concurrent updates from multiple recorder instances.
        scriptpainlessz:ctx._source.latest_seq = (ctx._source.latest_seq ?: 0) + 1)langsourcer0      )r+   retry_on_conflict)r-   r.   r:   r/   Nr6   r+   r,      atomic_inc_seq_failedr3   )r   r5   updater   r7   r   r8   r9   r:   r6   r;   r<   r=   warningr   r   )r%   resultr5   
get_sourcer?   fallback_rawrA   s          r&   _atomic_inc_seqz#IngestTraceRecorder._atomic_inc_seqr   s     %	8<..-= *"^  $0aHH / 
 
 
 
 
 
 
 
F 'vt44E&&&+C ++//	::J 5lj88:l3444 ))-=!<0 *        D
 $.dD#9#9H44tyL|''	266::<KKLLL 		 		 		 NN'#hh    
 IINII9		s&   B!D4 &BD4 4
F>AF	F	Frunningupload_receivedstatuscurrent_stager1   rU   rV   dict[str, Any]c               6   i d| j         d| j        d| j        d| j        d         d| j        d         d|d|d| j        d         d	| j        d	         d
| j        d
         p| j        d	         d| j        d         ddddd|dddddddd||dS )z<Build a full trace summary document for create/upsert paths.r   r   r   r   r   rU   rV   r   r   r   r   attempt_countrJ   r0   r   r1   finished_atNduration_ms
error_code)error_messageartifact_count
created_at
updated_at)r   r   r   r$   )r%   r1   rU   rV   s       r&   _build_trace_bodyz%IngestTraceRecorder._build_trace_body   s6   

dk
 t|
 D0@	

 4/>
 f
 ]
 -k:
  !56I!J
 T)'2_d6JK^6_
 ,Z8
 Q
 !
 *
 4
$ 4%
& $'
( "$$/
 
 
 	
r(   c                  K   |                                   d{V rdS | j        p/t          j        t          j                                                  }|| _        |                     |d|          }|                     t          j
        | j        |t          j        t          j                                                  d|           d{V  |                                   d{V  dS )zEnsure the trace summary exists before recording events.

        This recovers from API-side trace creation failures by lazily upserting
        a minimal summary document from the worker-side recorder instance.
        NrR   rT   )rV   r`   upsert)rB   r    r   nowr   utc	isoformatra   _safe_updater   r7   r   )r%   rV   now_isoupsert_bodys       r&   _ensure_trace_summaryz)IngestTraceRecorder._ensure_trace_summary   s       !!!!!!!! 	F"Lhl8<&@&@&J&J&L&L",,' - 
 

 #M!.&l8<88BBDD     
 
 	
 	
 	
 	
 	
 	
 	
 nnr(   webhookr   r   r   r   r   r   r   r   c          	     F  K   t          j        t          j                                                  }|| _        | j                            ||||||d           |                     |          }| 	                    t          j        | j        |           d{V | _        dS )z*Create or update the trace summary record.r   N)r   re   r   rf   rg   r    r$   rL   ra   _safe_indexr   r7   r   r   )	r%   r   r   r   r   r   r   ri   r:   s	            r&   start_tracezIngestTraceRecorder.start_trace   s       ,x|,,6688"##*&%6 ,$ 		
 		
 		
 %%g..!%!1!1#M"
 "
 
 
 
 
 
 
r(   )r\   r]   r\   r]   c               :  K   |                      | j        p|           d{V  t          j        t          j                                                  }d}| j        rn	 t          j        | j                  }t          t          j        t          j                  |z
  
                                dz            }n# t          $ r Y nw xY w|||| j        |d}|||d<   ||dd         |d<   |                     t          j        | j        |           d{V  dS )zUpdate the trace summary with final status.

        ``started_at`` is read from ES if this recorder instance was not
        the one that called ``start_trace()``.
        Nr     )rU   rZ   r[   r0   r`   r\     r]   )rk   r!   r   re   r   rf   rg   r    fromisoformatr;   total_secondsr<   r   rh   r   r7   r   )r%   rU   r\   r]   ri   r[   startedupdate_fieldss           r&   finish_tracez IngestTraceRecorder.finish_trace   sk      (()<)FGGGGGGGGG,x|,,6688 	"01ABB!\(,//'9HHJJTQ      "&)!)
 )
 !*4M,'$-:5D5-AM/*#M
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
s   AB< <
C	C	fieldsc                   K   t          j        t          j                                                  |d<   |                     t          j        | j        |           d{V  dS )zCPartial update on the trace summary (e.g. content_hash, file_type).r`   N)	r   re   r   rf   rg   rh   r   r7   r   )r%   rx   s     r&   update_trace_fieldsz'IngestTraceRecorder.update_trace_fields,  s]      '|HL99CCEE| 7OOOOOOOOOOOr(   workerinfo)summarydetailsservicer\   r]   r[   severityartifact_refsstage
event_typer}   r~   dict[str, Any] | Noner   r[   
int | Noner   r   list[str] | Nonec                 K   |                      |           d{V  |                                  d{V }|| _        t          j        t
          j                                                  }|dk    r || _        t          j
                    | _        nJ|dv rF|D| j        =| j        |k    r2t          t          j
                    | j        z
  dz            }d| _        |s|rd}	|dk    rd| _        dt          j                    j         }i d	|d
| j        d| j        d| j        d|d|d|d| j        ddd|ddd|d|pdd|	d|d|r
|dd         ndd|
pg |pi ||d}|                     t*          j        ||           d{V  | j        |d}|d v r||d!<   |r
||d<   ||d<   |                     t*          j        | j        |           d{V  dS )"u   向时间线追加一条结构化事件，自动递增 seq 并同步更新 trace 摘要。

        Append a structured event to the timeline.Nru   )	completedfailedrq   r4   r   Tevt_event_idr   r   r   r   r   rU   seqattemptrJ   r   r   r   r}   r[   r   r   r\   r]   rr   r   )r~   	timestampr_   )r0   r`   )ru   r   rV   )rk   rQ   r   r   re   r   rf   rg   r!   time	monotonicr"   r;   r#   uuiduuid4hexr   r   r   rn   r   es_event_indexrh   r7   )r%   r   r   r}   r~   r   r\   r]   r[   r   r   new_seqri   r   r:   trace_updates                   r&   recordzIngestTraceRecorder.record3  s     $ ((///////// ,,........	,x|,,6688"""'D%)^%5%5D""222{7J%1d6IU6R6R!^%%(>>$F  *.& 	 	H!!&*D# -$*,,*,, 
 
 
 dk 
 t|	 

 U 
 * 
 j 
 49 
 q 
 w 
  
 w 
 ;+! 
  
 * 
  ]L]5D511! 
" ]0b# 
$ }" !) 
  
  
. x6$GGGGGGGGG )!(
 (
 111,1L) 	:)3L&,9L) 7UUUUUUUUUUUr(   r}   r~   r   c               J   K   |                      |d|||           d{V  dS )zRecord a stage started event.ru   r   Nr   r%   r   r}   r~   r   s        r&   record_stage_startz&IngestTraceRecorder.record_stage_start  sV       kk9gw  
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
r(   c               J   K   |                      |d|||           d{V  dS )zRecord a stage completed event.r   r   Nr   r   s        r&   record_stage_completez)IngestTraceRecorder.record_stage_complete  sV       kk;'  
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
r(   )r}   r~   r\   r]   r   c          
     P   K   |                      |d|||||d           d{V  dS )zRecord a stage failure event.r   r4   )r}   r~   r\   r]   r   r   Nr   )r%   r   r}   r~   r\   r]   r   s          r&   record_stage_failedz'IngestTraceRecorder.record_stage_failed  sc       kk!'  	
 	
 		
 		
 		
 		
 		
 		
 		
 		
 		
r(   c                    | j         S )zCWhether any non-blocking stage recorded a failure in this instance.)r#   )r%   s    r&   has_stage_failurez%IngestTraceRecorder.has_stage_failure  s     &&r(   standardF)payload_jsonpayload_textpreview_textretention_levelis_redactedartifact_typer   r   r   r   r   c               n  K   dt          j                    j        dd          }t          j        t
          j                                                  }	d}
|r6t          t          j
        |d                                                    }
n#|r!t          |                                          }
i d|d| j        d	dd
| j        d|d|d|d|d|rdndd|r
|dd         ndd|d|r
|dd         ndddddd|
ddd|	}|                     t          j        ||           d{V  |S )u   写入一条产物记录（如解析结果、LLM 输出等），返回产物 ID，用于事件关联。

        Write an artifact record and return its ID.art_N   r   F)ensure_asciiartifact_idr   r   r   r   r   r   r   content_encodingjson_inlinetext_inliner   i  r   r   r   i   storage_backend
opensearchstorage_pathcontent_bytes
expires_atr_   )r   r   r   r   re   r   rf   rg   lenjsondumpsencoder   r   rn   r   es_artifact_index)r%   r   r   r   r   r   r   r   r   ri   r   r:   s               r&   record_artifactz#IngestTraceRecorder.record_artifact  s      5TZ\\-crc244,x|,,6688 	7
<e L L L S S U UVVMM 	7 3 3 5 566M 
; 
 
  
 dk	 

 U 
 ] 
  
 ; 
  P= 
 ,FL#..B 
 L 
 LJL%00d 
 | 
 D 
 ] 
  $! 
" '# 
( x9;MMMMMMMMMr(   r-   r:   c           	        K   	 | j         j                            |||           d{V  dS # t          $ r6}t                              d||t          |                     Y d}~dS d}~ww xY w)z@Index a document, swallowing errors to never block the pipeline.r-   r.   r:   NTtrace_write_failedr-   r   r4   F)r   r5   r-   r<   r=   rM   r   )r%   r-   r   r:   rA   s        r&   rn   zIngestTraceRecorder._safe_index  s      
	(,$$5V$$GGGGGGGGG4 	 	 	NN$#hh	     55555	s   (. 
A.+A))A.rc   rd   c          	        K   	 d|i}|||d<   | j         j                            |||           d{V  dS # t          $ r6}t                              d||t          |                     Y d}~dS d}~ww xY w)	z"Partial update, swallowing errors.docNrd   r   Ttrace_update_failedr   F)r   r5   rL   r<   r=   rM   r   )r%   r-   r   rx   rd   r:   rA   s          r&   rh   z IngestTraceRecorder._safe_update  s      	$)6?D!!'X(,%% &         
 4 	 	 	NN%#hh	     55555	s   39 
A9+A44A9)
r   r   r   r   r   r   r   r   r   r   )r   r)   )r   r;   )r1   r   rU   r   rV   r   r   rW   )rV   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   )rU   r   r\   r   r]   r   r   r   )rx   rW   r   r   )r   r   r   r   r}   r   r~   r   r   r   r\   r   r]   r   r[   r   r   r   r   r   r   r   )
r   r   r}   r   r~   r   r   r   r   r   )r   r   r}   r   r~   r   r\   r   r]   r   r   r   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r)   r   r   )r-   r   r   r   r:   rW   r   r)   )
r-   r   r   r   rx   rW   rd   r   r   r)   )__name__
__module____qualname____doc__r'   rB   rQ   ra   rk   ro   rw   rz   r   r   r   r   propertyr   r   rn   rh    r(   r&   r
   r
   &   se        , #
 
 
 
 
 
:   <, , , ,d  . 
  
  
  
  
  
D   @ %!#
 
 
 
 
 
D "&$(-
 -
 -
 -
 -
 -
^P P P P )-!%$("&*.PV PV PV PV PV PVp )-
 
 
 
 
 
" )-
 
 
 
 
 
" )-
 
 
 
 
 
, ' ' ' X' /3#')!, , , , , ,`   , )-       r(   r
   )r   
__future__r   r   r   r   r   r   typingr   
app.configr   app.utils.loggerr   r   r=   r
   r   r(   r&   <module>r      s    . # " " " " "    ' ' ' ' ' ' ' '             ' ' ' ' ' '	H		r r r r r r r r r rr(   