o
    1 iz2                     @   s   d dl Z d dlZd dlZd dlmZmZmZmZ d dlm	Z	 d dl
mZmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZ e eZd	Zd
ZdZdZdZeeeeegZ eddG dd deZ!dS )    N)AnyDictListOptional)Celery)task_failuretask_unknown)get_replica_context)DEFAULT_CONSUMER_CONCURRENCYSERVE_LOGGER_NAME)CeleryAdapterConfigTaskProcessorAdapterTaskProcessorConfig
TaskResult)	PublicAPIZworker_poolZworker_concurrencyZtask_ignore_resultZtask_acks_lateZtask_reject_on_worker_lostalpha)Z	stabilityc                       s\  e Zd ZU dZeed< eed< dZee	j
 ed< dZee ed< eZeed< def fd	d
ZefdefddZd4ddZ	d5defddZdefddZdd Zd6defddZdd Zdd Zdeeef fdd Zdee fd!d"Z					d7d#ed$ed%ed&ed'ef
d(d)Z 					d7d#ed*ed+ed,ed-ef
d.d/Z!d0ed1ed%e"fd2d3Z#  Z$S )8CeleryTaskProcessorAdapterz
    Celery-based task processor adapter.
    This adapter does NOT support any async operations.
    All operations must be performed synchronously.
    _app_configN_worker_thread_worker_hostname_worker_concurrencyconfigc                    sh   t  j|i | t|jtstd|jjr/t|jj tt	@ }|r/t
dt| d|| _d S )NzMTaskProcessorConfig.adapter_config must be an instance of CeleryAdapterConfigzJThe following configuration keys cannot be changed via app_custom_config: zA. These are managed internally by the CeleryTaskProcessorAdapter.)super__init__
isinstanceadapter_configr   	TypeErrorapp_custom_configsetkeysCELERY_DEFAULT_APP_CONFIG
ValueErrorsortedr   )selfr   argskwargsZconflicting_keys	__class__ d/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/serve/task_processor.pyr   6   s    

z#CeleryTaskProcessorAdapter.__init__consumer_concurrencyc              
   C   s6  t | jj| jjj| jjjd| _tdt|t	dt
dtdi}| jjjr)|| jjj | jj| | jj| jjd| jjdi}| jjrP| jjd| jjd|| jj< | jjrb| jjd| jjd|| jj< | jjj|dd| jjiid	 | jjjd ur| jjj| jjjd
 | jjrt| j | jjrt| j d S d S )N)backendZbrokerthreadsFTdirect)ZexchangeZexchange_typeZrouting_key*queue)Ztask_queuesZtask_routes)broker_transport_options)r   r   
queue_namer   Zbackend_urlZ
broker_urlr   CELERY_WORKER_POOLCELERY_WORKER_CONCURRENCYCELERY_TASK_IGNORE_RESULTCELERY_TASK_ACKS_LATE!CELERY_TASK_REJECT_ON_WORKER_LOSTr   updateconffailed_task_queue_nameunprocessable_task_queue_namer1   r   connect_handle_task_failurer   _handle_unknown_task)r$   r+   Zapp_configurationZqueue_configr)   r)   r*   
initializeN   sX   
z%CeleryTaskProcessorAdapter.initializec                 C   sp   t fd| jjidddd}| jjjr|| jjj |r+| jjdd|i|| d S | jjdi || d S )Nmax_retriesT<   F)Zautoretry_forZretry_kwargsZretry_backoffZretry_backoff_maxZretry_jitternamer)   )	Exceptionr   r@   r   Ztask_custom_configr8   r   task)r$   funcrB   Ztask_optionsr)   r)   r*   register_task_handle   s   

z/CeleryTaskProcessorAdapter.register_task_handlereturnc                 K   s:   | j j|f||| jjd|}t|j|jt |jdS )N)r%   r&   r0   )idstatusZ
created_atresult)	r   	send_taskr   r2   r   rH   rI   timerJ   )r$   	task_namer%   r&   optionsZtask_responser)   r)   r*   enqueue_task_sync   s   z,CeleryTaskProcessorAdapter.enqueue_task_syncc                 C   s    | j |}t|j|j|jdS )N)rH   rJ   rI   )r   ZAsyncResultr   rH   rJ   rI   )r$   task_idZtask_detailsr)   r)   r*   get_task_status_sync   s   z/CeleryTaskProcessorAdapter.get_task_status_syncc                 K   s   | j dur| j  rtd dS t j}| jj d| | _dd| j d| j	j
g}tj| jj|fd| _ | j   td| j  dS )	z Starts the Celery worker thread.Nz(Celery worker thread is already running._Zworkerz--hostname=z-Q)targetr%   z,Celery worker thread started with hostname: )r   is_aliveloggerinfor	   Zreplica_tagr   mainr   r   r2   	threadingThreadZworker_mainstart)r$   r&   	unique_idZworker_argsr)   r)   r*   start_consumer   s$   



z)CeleryTaskProcessorAdapter.start_consumer      $@timeoutc                 C   s   | j du s
| j  std dS td | jjjdd| j gd | j j|d | j  r:t	d| d	 ntd
 d| _ dS )zESignals the Celery worker to shut down and waits for it to terminate.Nz$Celery worker thread is not running.z+Sending shutdown signal to Celery worker...shutdownzcelery@)destination)r^   z&Worker thread did not terminate after z	 seconds.z!Celery worker thread has stopped.)
r   rT   rU   rV   r   control	broadcastr   joinwarning)r$   r^   r)   r)   r*   stop_consumer   s   




z(CeleryTaskProcessorAdapter.stop_consumerc                 C   s$   t d | jj  t d d S )NzShutting down Celery worker...z"Celery worker shutdown complete...)rU   rV   r   ra   r_   r$   r)   r)   r*   r_      s   
z#CeleryTaskProcessorAdapter.shutdownc                 C   s   | j j| dS )z
        Cancels a task synchronously. Only supported for Redis and RabbitMQ brokers by Celery.
        More details can be found here: https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
        N)r   ra   Zrevoke)r$   rP   r)   r)   r*   cancel_task_sync   s   z+CeleryTaskProcessorAdapter.cancel_task_syncc                 C   s   | j j  S )z
        Returns the metrics of the Celery worker synchronously.
        More details can be found here: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html#celery.app.control.Inspect.stats
        )r   ra   inspectstatsrf   r)   r)   r*   get_metrics_sync   s   z+CeleryTaskProcessorAdapter.get_metrics_syncc                 C   s   | j j S )a  
        Checks the health of the Celery worker synchronously.
        Returns a list of dictionaries, each containing the worker name and a dictionary with the health status.
        Example: [{'celery@192.168.1.100': {'ok': 'pong'}}]
        More details can be found here: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html#celery.app.control.Control.ping
        )r   ra   Zpingrf   r)   r)   r*   health_check_sync   s   z,CeleryTaskProcessorAdapter.health_check_syncsenderrP   r%   r&   einfoc              	   K   s   t d| dt|  |t|jt|t|t|g}| jjr>| | jj|j| t d| d| d| jj d dS dS )ar  Handle task failures and route them to appropriate dead letter queues.

        This method is called when a task fails after all retry attempts have been
        exhausted. It logs the failure and moves the task to failed_task_queue

        Args:
            sender: The task object that failed
            task_id: Unique identifier of the failed task
            args: Positional arguments passed to the task
            kwargs: Keyword arguments passed to the task
            einfo: Exception info object containing exception details and traceback
            **kw: Additional keyword arguments passed by Celery
        z#Task failure detected for task_id: z	, einfo: zTask z& failed after max retries. Exception: z. Moved it to the z queue.N)	rU   rV   str	exceptionr   r:   _move_task_to_queuerB   error)r$   rl   rP   r%   r&   rm   kwZdlq_argsr)   r)   r*   r=      s&   z/CeleryTaskProcessorAdapter._handle_task_failurerB   rH   messageexcc              
   K   sX   t d| d| dt|  | jjr*| | jj|||t|t|t|g dS dS )a  Handle unknown or unregistered tasks received by Celery.

        This method is called when Celery receives a task that it doesn't recognize
        (i.e., a task that hasn't been registered with the Celery app). These tasks
        are moved to the unprocessable task queue if configured.

        Args:
            sender: The Celery app or worker that detected the unknown task
            name: Name of the unknown task
            id: Task ID of the unknown task
            message: The raw message received for the unknown task
            exc: The exception raised when trying to process the unknown task
            **kwargs: Additional context information from Celery
        z'Unknown task detected by Celery. Name: z, ID: z, Exc: N)rU   rV   rn   r   r;   rp   )r$   rl   rB   rH   rs   rt   r&   r)   r)   r*   r>   -  s   z/CeleryTaskProcessorAdapter._handle_unknown_taskr2   rM   c                 C   sn   zt d| d| d|  | jj|||d W dS  ty6 } zt d| d| d|  |d}~ww )z4Helper function to move a task to a specified queue.zMoving task: z to queue: z, args: )rB   r0   r%   zFailed to move task: z	, error: N)rU   rV   r   rK   rC   rq   )r$   r2   rM   r%   er)   r)   r*   rp   U  s    z.CeleryTaskProcessorAdapter._move_task_to_queue)N)NN)r]   )NNNNN)%__name__
__module____qualname____doc__r   __annotations__r   r   r   rX   rY   r   rn   r
   r   intr   r?   rF   r   rO   rQ   r\   floatre   r_   rg   r   r   rj   r   rk   r=   r>   listrp   __classcell__r)   r)   r'   r*   r   (   sl   
 
=

/
(r   )"loggingrX   rL   typingr   r   r   r   Zceleryr   Zcelery.signalsr   r   Z	ray.server	   Zray.serve._private.constantsr
   r   Zray.serve.schemar   r   r   r   Zray.util.annotationsr   	getLoggerrU   r3   r4   r5   r6   r7   r!   r   r)   r)   r)   r*   <module>   s0    
	