o
    1 i                  	   @   s   d dl Z d dlZd dlmZ d dlmZmZ d dlmZ d dl	m
Z
mZ d dlmZ d dlmZmZ d dlmZ eeZe
fd	ed
edefddZeddd	edefddZeddd	efddZedd	ddddee dee defddZdS )    N)wraps)CallableOptional)import_attr)DEFAULT_CONSUMER_CONCURRENCYSERVE_LOGGER_NAMETaskConsumerWrapper)TaskProcessorAdapterTaskProcessorConfig)	PublicAPItask_processor_configconsumer_concurrencyreturnc              
   C   s   | j }t|trt|}nt|r|}ntdt|j d| z|| }W n ty> } zt	d|j d| d }~ww t|t
sQt|j dt|j z|| W |S  typ } zt	d|j d| d }~ww )Nz>Adapter must be either a string path or a callable class, got z: zFailed to instantiate z- must inherit from TaskProcessorAdapter, got zFailed to initialize )adapter
isinstancestrr   callable	TypeErrortype__name__	ExceptionRuntimeErrorr
   Z
initialize)r   r   r   adapter_classZadapter_instancee r   c/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/serve/task_consumer.py_instantiate_adapter   s2   


r   alpha)Z	stabilityc                 C   s   t | S )a  
    Create a TaskProcessorAdapter instance from the provided configuration and call .initialize(). This function supports two ways to specify an adapter:

    1. String path: A fully qualified module path to an adapter class
       Example: "ray.serve.task_processor.CeleryTaskProcessorAdapter"

    2. Class reference: A direct reference to an adapter class
       Example: CeleryTaskProcessorAdapter

    Args:
        task_processor_config: Configuration object containing adapter specification.
    Returns:
        An initialized TaskProcessorAdapter instance ready for use.

    Raises:
        ValueError: If the adapter string path is malformed or cannot be imported.
        TypeError: If the adapter is not a string or callable class.

    Example:
        .. code-block:: python

            config = TaskProcessorConfig(
                adapter="my.module.CustomAdapter",
                adapter_config={"param": "value"},
                queue_name="my_queue"
            )
            adapter = instantiate_adapter_from_config(config)
    )r   r   r   r   r   instantiate_adapter_from_config9   s   !r    c                    s    fdd}|S )a  
    Decorator to mark a class as a TaskConsumer.

    Args:
        task_processor_config: Configuration for the task processor (required)

    Note:
        This decorator must be used with parentheses:
        @task_consumer(task_processor_config=config)

    Returns:
        A wrapper class that inherits from the target class and implements the task consumer functionality.

    Example:
        .. code-block:: python

            from ray import serve
            from ray.serve.task_consumer import task_consumer, task_handler

            @serve.deployment
            @task_consumer(task_processor_config=config)
            class MyTaskConsumer:

                @task_handler(name="my_task")
                def my_task(self, *args, **kwargs):
                    pass

    c                    s   G  fddd t }|S )Nc                       sB   e Zd ZU eed<  fddZdef fddZ fddZd	S )
z>task_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper_adapterc                    s    j | g|R i | d S N)__init__)selfargskwargs
target_clsr   r   r#      s   zGtask_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper.__init__r   c              
      s   t || _tj tjdD ]\}}t|ddr*t|d|}t| |}| j|| qz| j  t	d W d S  t
yM } z	td|   d }~ww )N)	predicate_is_task_handlerF
_task_namez"task consumer started successfullyzFailed to start task consumer: )r   r!   inspect
getmembers
isfunctiongetattrZregister_task_handleZstart_consumerloggerinfor   error)r$   r   namemethodZ	task_nameZbound_methodr   r(   r   r   r   initialize_callable   s&   

zRtask_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper.initialize_callablec                    s0   | j   | j   t dr |  d S d S )N__del__)r!   Zstop_consumershutdownhasattrr7   )r$   r'   r   r   r7      s
   


zFtask_consumer.<locals>.decorator.<locals>._TaskConsumerWrapper.__del__N)	r   
__module____qualname__r
   __annotations__r#   intr6   r7   r   r5   r   r   _TaskConsumerWrapper}   s
   
 r>   r   )r(   r>   r   r'   r   	decorator|   s   $z task_consumer.<locals>.decoratorr   )r   r?   r   r   r   task_consumer]   s   'r@   r3   _funcr3   c                   sH    durt  tr  std   fdd}| dur"|| S |S )a  
    Decorator to mark a method as a task handler.
    Optionally specify a task name. Default is the method name.

    Arguments:
        _func: The function to decorate.
        name: The name of the task. Default is the method name.

    Returns:
        A wrapper function that is marked as a task handler.

    Example:
        .. code-block:: python

            from ray import serve
            from ray.serve.task_consumer import task_consumer, task_handler

            @serve.deployment
            @task_consumer(task_processor_config=config)
            class MyTaskConsumer:

                @task_handler(name="my_task")
                def my_task(self, *args, **kwargs):
                    pass

    Nz*Task name must be a non-empty string, got c                    s<   t  st  fdd}d|_p j|_|S td)Nc                     s    | i |S r"   r   )r%   r&   fr   r   wrapper   s   z0task_handler.<locals>.decorator.<locals>.wrapperTz)Async task handlers are not supported yet)r,   iscoroutinefunctionr   r*   r   r+   NotImplementedError)rD   rE   rA   rC   r   r?      s   
ztask_handler.<locals>.decorator)r   r   strip
ValueError)rB   r3   r?   r   rA   r   task_handler   s    rJ   r"   )r,   logging	functoolsr   typingr   r   Zray._common.utilsr   Zray.serve._private.constantsr   r   Z ray.serve._private.task_consumerr	   Zray.serve.schemar
   r   Zray.util.annotationsr   	getLoggerr0   r=   r   r    r@   r   rJ   r   r   r   r   <module>   sJ    

$#H