
    i0                        d Z ddlmZ ddlZddlmZ ddlmZmZ ddl	m
Z
 ddlmZ  e
e          Zd Z ej        d	d
ddd
d
          	 	 ddd            Z ej        dd
dd          	 dd d            ZdS )!us  Celery tasks for document ingestion and permission updates.

Each task runs the async pipeline inside an event loop managed by
the Celery worker. The pipeline creates its own short-lived async clients
so it is safe to run in a separate process.

文档入库与权限更新的 Celery 异步任务模块。
主要包含两个任务：
1. ingest_document_task —— 执行完整的文档入库流水线（解析→分块→向量化→写入 ES/Neo4j）
2. update_permissions_task —— 更新文档 ACL 并重算所有共享 chunk 的权限
每个任务在独立事件循环中运行异步管线，支持失败自动重试。
    )annotationsN)Any)MaxRetriesExceededErrorRetry)get_task_logger)
celery_appc                    t          j                    }	 |                    |           |                                 S # |                                 w xY w)u   Run an async coroutine in a new event loop (Celery worker compatible).

    Celery worker 进程中没有现成的事件循环，因此每次任务创建一个临时循环来执行异步协程。
    )asyncionew_event_looprun_until_completeclose)coroloops     /D:\work\zm-rag\backend\app\tasks\ingest_task.py
_run_asyncr      sI    
 !##D&&t,,



s	   > Aztasks.ingest_documentT      )namebindmax_retriesdefault_retry_delay	acks_latetrack_started doc_idstr	file_pathmetadatadict[str, Any] | Nonereturndict[str, Any]c                8   	 pi  j         j        	 j         j        dz   t                              d                                dddd            	fd}	 t           |                      }|d	         d
k    r{|                    dd          }t                              d|            j         j         j	        k     r4 
                    t          |          d j         j        dz   z            |S # t          t          f$ r  t          $ ru}t                              d|            j         j         j	        k     r' 
                    |d j         j        dz   z            d
t          |          dcY d}~S d}~ww xY w)a  Celery task: run the full document ingest pipeline.

    This is the main entry point called when OA pushes a new document.
    It creates a fresh IngestPipeline, runs it, and handles retries.

    Args:
        doc_id: Unique document identifier from OA.
        file_path: Filename (not absolute path) of the document file.
            Resolved to full path via settings.file_storage_path.
        metadata: Document metadata dict.
       z2Starting ingest for doc_id=%s, file=%s, attempt=%s
PROCESSINGinitializingr   )r   stepprogress)statemetac            	     B  K   ddl m}  dd l}ddlm} ddlm} ddlm} ddl	m
} ddlm} |j                                         |j                                       |j        gd	d
}|j        r|j        |j        f|d<    | |d%i |          } ||          }	d }
d }	 |	                    ddd dj        j        pddd           d {V  dk    r"|	                    dd d d d d           d {V   |           }|                                st1          |j        |j        z            n}ddlm} |                                }d }|j        rp|j        ri|j         rb	 ddl!m"} |                                 d {V }n@# tF          $ r3}tH          %                    dt1          |                     Y d }~nd }~ww xY w |||          }
|
&                    ||	           d {V }|'                    dd          }|dv r|	(                    |           d {V  nF|	(                    d|'                    d d!          |'                    d"d          #           d {V  ||
|
)                                 d {V  |)                                 d {V  ||)                                 d {V  ||)                                 d {V  S S # tF          $ r1}|	(                    dd$t1          |          #           d {V   d }~ww xY w# |
|
)                                 d {V  |)                                 d {V  ||)                                 d {V  ||)                                 d {V  w w xY w)&Nr   )PathAsyncOpenSearchsettings)create_pipeline)IngestTraceRecorderESClient)trace_idr   task_id<   hoststimeout	http_authtask_started	completedu   Worker 开始处理，第 u
    次尝试r   )attemptworkerr>   )summarydetailsservicer#   running)attempt_countstatus
error_codeerror_messagefinished_atduration_msRedisClient)MySQLClientcelery_mysql_init_failed)error)redis_clientmysql_client)r   r   r   r4   recorderrD   failed)r<   partial_failedrE   INGEST_PIPELINE_FAILEDrM   )rE   rF   INGEST_UNEXPECTED_ERROR )*pathlibr+   	structlogopensearchpyr-   
app.configr/   app.core.ingest_pipeliner0   app.core.ingest_trace_recorderr1   app.infrastructure.es_clientr3   contextvarsclear_contextvarsbind_contextvarses_hostes_usernamees_passwordrecordrequesthostnameupdate_trace_fieldsis_absoluter   file_storage_pathr   app.infrastructure.redis_clientrJ   from_settingsmysql_enabled
mysql_hostmysql_databaseapp.infrastructure.mysql_clientrK   	Exceptionloggerwarningingest_documentgetfinish_tracer   )r+   rW   r-   r/   r0   r1   r3   	es_kwargsrecorder_esrP   pipeline_redis_clientpresolvedrJ   _mysql_clientrK   excresultrD   r=   r   r   r   selfr4   s                       r   _runz"ingest_document_task.<locals>._runN   s           000000''''''<<<<<<FFFFFF999999 	//111..fh 	/ 	
 	
 	
 08/?.@R$P$P	 	R&.&:H<P%QIk"h;;;;<<&&(68
 
 
 Q	,//HWHHH$+t|7L7RPRSS 	 "          {{ 22%,'"&%)#'#'4 4          YAGH}}es85>???\eH DCCCCC'5577M !M% O(*= O(BY OOKKKKKK*5*C*C*E*E$E$E$E$E$E$EMM  O O ONN#=SXXNNNNNNNNNO 'MP]^^^H#33"!!! 4        F ZZ(33F888
 ++F3333333333++%zz,8PQQ"(**Wb"9"9 ,           #nn&&&&&&&&&##%%%%%%%%%(#))+++++++++(#))++++++++++ )!  	 	 	''4!#hh (         
 	 #nn&&&&&&&&&##%%%%%%%%%(#))+++++++++(#))++++++++++ )sQ   *C	K/ 4 F K/ 
G)GK/ GB-K/ /
L*9,L%%L**L- -A1NrD   rQ   rM   zUnknown errorzIngest failed for doc_id=%s: %sr   )r|   	countdownz,Unexpected error in ingest for doc_id=%s: %s)r   rD   rM   N)rd   idretriesrp   infoupdate_stater   rs   rM   r   retryRuntimeErrorr   r   ro   r   )
r~   r   r   r   r   r}   	error_msgr|   r=   r4   s
   ````    @@r   ingest_document_taskr   (   s   2 ~2H|Hl"Q&G
KKDfiY`aaa 	AFF    
u, u, u, u, u, u, u, u, u, u,n
DDFF##(x''

7O<<ILL:FINNN |#d&666jj$Y// DL$81$<= !   
 *+    
 
 
CVSQQQ<$"222**dl6JQ6N0O*PPPXX
 
 	
 	
 	
 	
 	
 	
	
s    )BD	 	F$A*FFFztasks.update_permissions
   )r   r   r   r   acl_idslist[str] | Nonec                0   t                               dt          pg                      dddS fd}	 t           |                      S # t          $ r7}t                               d|           |                     |          d}~ww xY w)	a  Celery task: update a document's ACL and recompute chunk permissions.

    Called when OA pushes a permission change.  Strategy:
      1. Update the meta record's acl_ids (source of truth).
      2. Recompute the chunks' acl_ids from all metas sharing the content_hash.
    z0Updating permissions for doc_id=%s, acl_count=%sNskippedzacl_ids missing)r   rD   reasonc            	     .  K   ddl m}  ddlm}m} ddlm} ddlm} ddlm	} |
                                }|j        gdd}|j        r|j        |j        f|d	<    | di |} |||
          }		 |                    |j        d|                    |j                                                  did           d {V  |                    |j        dg           d {V }
t)          |
t*                    r|
n|
j        }|                    di                               dd          }|s:ddd|                                 d {V  |                                 d {V  S |	                    |           d {V  |	                               d {V }d||d|                                 d {V  |                                 d {V  S # |                                 d {V  |                                 d {V  w xY w)Nr   r,   )datetimetimezoner.   r2   rI   r6   r7   r:   )rN   doc)r   
updated_atT)indexr   bodyrefreshcontent_hash)r   r   _sourcer   r   r<   zno content_hash)r   rD   note)r   rD   r   guide_updatesrU   )rX   r-   r   r   rY   r/   r\   r3   ri   rJ   rj   r`   ra   rb   updatees_meta_indexnowutc	isoformatrs   
isinstancedictr   r   recompute_chunk_aclsync_service_guide_acl)r-   r   r   r/   r3   rJ   rN   ru   raw_es	es_client	meta_resprawr   r   r   r   s                 r   r   z%update_permissions_task.<locals>._run   s:     000000////////''''''999999?????? #0022/7/?.@R$P$P	 	R&.&:H<P%QIk" --9-- HV,???	(	'--,#*&.ll8<&@&@&J&J&L&L     
 
 
 
 
 
 
 
 
 %jj,'( )        I
  *)T::N))	C779b1155nbIIL \"(KIZ[[ ,,..       $$&&&&&&&&&& //========="+"B"B67"S"SSSSSSSM !% ,!.	  ,,..       $$&&&&&&&&&& ,,..       $$&&&&&&&&&&s   1CG ,=G 6Hz*Permission update failed for doc_id=%s: %s)r|   )rp   r   lenr   ro   rM   r   )r~   r   r   r   r|   s    ``  r   update_permissions_taskr      s    " KKBFCPWP][]L^L^___ IARSSS<' <' <' <' <' <'|"$$&&!!! " " "A63OOOjjSj!!!"s   A 
B2BB)r   N)r   r   r   r   r   r   r    r!   )N)r   r   r   r   r    r!   )__doc__
__future__r   r
   typingr   celery.exceptionsr   r   celery.utils.logr   app.tasks.celery_appr   __name__rp   r   taskr   r   rU       r   <module>r      sR    # " " " " "        = < < < < < < < , , , , , , + + + + + +		"	"	 	 	 	 	   &*	p
 p
 p
 p
 p
f 	#		   !%R" R" R" R" R" R" R"r   