o
    1 io                     @   sx  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlm Z m!Z!m"Z" d d	l#m$Z$ d d
l%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, e-e&Z.e/e0gZ1eG dd dZ2eG dd dZ3eG dd dZ4deee  deee eeef f fddZ5G dd dZ6G dd dZ7dd Z8dd Z9d e:ddfd!d"Z;ed#d$d%Z<ed&Z=ed'Z>G d(d) d)eee<e=e>f Z?G d*d+ d+eee<e=e>f Z@ed,eee= gee> f dee=ge>f fd-d.ZAed/eee= geeeee> f f dee=geeee>f f fd0d.ZAed1e?e<e=e>f dee<e=ge>f fd2d.ZAed3e@e<e=e>f dee<e=geeee>f f fd4d.ZAe		5	6	7dId8e:d9eBd e:d:ed dd;f
d<d.ZAG d=d; d;eZCe,d>d?		5	6	7dId8e:d9eBd e:d@ee def
dAd.ZAdBe jDdCefdDdEZEdBe jDdFefdGdHZFdS )J    N)deque)	dataclass)wraps)isasyncgenfunctioniscoroutinefunction)AnyAsyncGeneratorCallable	CoroutineDictGenericIterableListLiteralOptionalProtocolSetTupleTypeVaroverload)serve)extract_signatureflatten_argsrecover_args)get_or_create_event_loop)SERVE_LOGGER_NAME)extract_self_if_method_call)RayServeException)	PublicAPIc                   @   s8   e Zd ZU eed< ee ed< ejed< ej	j
ed< dS )_SingleRequestself_argflattened_argsfuturerequest_contextN)__name__
__module____qualname__r   __annotations__r   asyncioFuturer   contextZ_RequestContext r+   r+   ^/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/serve/batching.pyr   +   s
   
 
r   c                   @   s    e Zd ZU eed< ejed< dS )_GeneratorResultresultnext_futureN)r$   r%   r&   r   r'   r(   r)   r+   r+   r+   r,   r-   3   s   
 r-   c                   @   sn   e Zd ZU ee ed< edee fddZedee fddZ	edee fddZ
edefd	d
ZdS )_RuntimeSummaryStatisticsstart_timesreturnc                 C      | j rt| j S d S N)r1   minselfr+   r+   r,   min_start_time=      z(_RuntimeSummaryStatistics.min_start_timec                 C   s   | j rt| j t| j  S d S r4   )r1   sumlenr6   r+   r+   r,   mean_start_timeA   s   z)_RuntimeSummaryStatistics.mean_start_timec                 C   r3   r4   )r1   maxr6   r+   r+   r,   max_start_timeG   r9   z(_RuntimeSummaryStatistics.max_start_timec                 C   s
   t | jS r4   )r;   r1   r6   r+   r+   r,   num_requestsK   s   
z&_RuntimeSummaryStatistics.num_requestsN)r$   r%   r&   r   floatr'   propertyr   r8   r<   r>   intr?   r+   r+   r+   r,   r0   9   s   
 r0   list_of_flattened_argsr2   c                    s|   dd | D }t |dksJ d| }g }t|D ]  d dkr-|| d    q| fdd| D  qt|S )	z@Batch a list of flatten args and returns regular args and kwargsc                 S   s   h | ]}t |qS r+   )r;   ).0argsr+   r+   r,   	<setcomp>W       z%_batch_args_kwargs.<locals>.<setcomp>   z=All batch requests should have the same number of parameters.   r   c                    s   g | ]}|  qS r+   r+   rD   itemidxr+   r,   
<listcomp>c   rG   z&_batch_args_kwargs.<locals>.<listcomp>)r;   poprangeappendr   )rC   Zarg_lengthsZ
arg_lengthZbatched_flattened_argsr+   rL   r,   _batch_args_kwargsP   s   rR   c                   @   s4  e Zd Z	d-dedededee ddf
ddZd	d
 ZdeddfddZ	de
eejf ddfddZdee fddZdee deddfddZdedeej deddfddZdejdeej defddZdeddfd d!Zded"ee ddfd#d$Zd%ejddfd&d'Zed(ee ddfd)d*Zd+d, ZdS )._BatchQueueNmax_batch_sizebatch_wait_timeout_smax_concurrent_batcheshandle_batch_funcr2   c                 C   sv   t  | _|| _|| _|| _t || _t  | _	t
 | _i | _d| _t | _|dur5| j| || _|   dS )aO  Async queue that accepts individual items and returns batches.

        Respects max_batch_size and batch_wait_timeout_s; a batch will be returned when
        max_batch_size elements are available or the timeout has passed since
        the previous get.

        If handle_batch_func is passed in, a background coroutine will run to
        poll from the queue and call handle_batch_func on the results.

        Cannot be pickled.

        Arguments:
            max_batch_size: max number of elements to return in a batch.
            batch_wait_timeout_s: time to wait before returning an incomplete
                batch.
            max_concurrent_batches: max number of batches to run concurrently.
            handle_batch_func(Optional[Callable]): callback to run in the
                background to handle batches if provided.
        N)r(   QueuequeuerT   rU   rV   	Semaphore	semaphoreEventrequests_available_eventsettaskscurr_iteration_start_times_handle_batch_taskr   _loopcreate_task_process_batches4_warn_if_max_batch_size_exceeds_max_ongoing_requestsr7   rT   rU   rV   rW   r+   r+   r,   __init__j   s   

z_BatchQueue.__init__c              	   C   sF   t  jj}|| j| j k r!td| j d| j d| d dS dS )zHelper to check whether the max_batch_size is bounded.

        Log a warning to configure `max_ongoing_requests` if it's bounded.
        z`max_batch_size` (z) * `max_concurrent_batches` (z)) is larger than `max_ongoing_requests` (z). This means the replica will never achieve the configured `max_batch_size` concurrently. Please update `max_ongoing_requests` to be >= `max_batch_size` * `max_concurrent_batches`.N)r   Zget_replica_contextZ_deployment_configmax_ongoing_requestsrT   rV   loggerwarning)r7   rh   r+   r+   r,   re      s   

z@_BatchQueue._warn_if_max_batch_size_exceeds_max_ongoing_requestsnew_max_batch_sizec                 C   s   || _ |   dS zUpdates queue's max_batch_size.N)rT   re   r7   rk   r+   r+   r,   set_max_batch_size   s   z_BatchQueue.set_max_batch_sizerequestc                 C   s   | j | | j  d S r4   )rY   
put_nowaitr]   r^   )r7   ro   r+   r+   r,   put   s   z_BatchQueue.putc                    s   g }| | j I dH  | j}| j}t }	 t|t |  d}zt| j	
 |I dH  W n
 tjy<   Y nw t||k r[| j s[| | j  t||k r[| j rH| j re| j	  t | |ksst||krv	 |S q)a  Wait for batch respecting self.max_batch_size and self.timeout_s.

        Returns a batch of up to self.max_batch_size items. Waits for up to
        to self.timeout_s after receiving the first request that will be in
        the next batch. After the timeout, returns as many items as are ready.

        Always returns a batch with at least one item - will block
        indefinitely until an item comes in.
        NTr   )rQ   rY   getrT   rU   timer=   r(   wait_forr]   waitTimeoutErrorr;   empty
get_nowaitclear)r7   batchrT   rU   Zbatch_start_timeZremaining_batch_time_sr+   r+   r,   wait_for_batch   s6   

	
z_BatchQueue.wait_for_batchresultsinput_batch_lengthc                 C   s*   t ||krtd| dt | dd S )NzHBatched function doesn't preserve batch size. The input list has length z" but the returned list has length .)r;   r   )r7   r|   r}   r+   r+   r,   _validate_results   s   z_BatchQueue._validate_resultsfunc_generatorinitial_futuresc              
      s"  d}zlt |}t||ksJ |2 zK3 dH W }| || t|D ]:}|| |d }}	|	|u r7|| n!|tv rFt|	t || nt 	 }
t
|	t||
 ||
 |  q"q6 |D ]}	|	|urlt|	t qaW dS  ty } z|D ]}	|	|urt|	| qyW Y d}~dS d}~ww )zConsumes batch function generator.

        This function only runs if the function decorated with @serve.batch
        is a generator.
        Nr   )r   r;   r   rP   rQ   USER_CODE_STREAMING_SENTINELS_set_exception_if_not_doneStopAsyncIterationr   create_future_set_result_if_not_doner-   popleft	Exception)r7   r   r   r}   ZFINISHED_TOKENfuturesr|   rM   r.   r"   r/   er+   r+   r,   _consume_func_generator   sF   






z#_BatchQueue._consume_func_generatorfunc_futurer   c              
      sx   z|I dH }|  || t||D ]	\}}t|| qW dS  ty; } z|D ]}t|| q(W Y d}~dS d}~ww )z.Assigns func's results to the list of futures.N)r   zipr   r   r   )r7   r   r   r}   r|   r.   r"   r   r+   r+   r,   _assign_func_results#  s   
z _BatchQueue._assign_func_resultsfuncc                    sr   t j  | j s7|  I dH }| ||}t|}| j	
| t | j|< || j | j rdS dS )z6Loops infinitely and processes queued request batches.N)r   r*   Z_unset_request_contextrb   	is_closedr{   _process_batchr(   rc   r_   addrs   r`   add_done_callback_handle_completed_task)r7   r   rz   Zpromisetaskr+   r+   r,   rd   4  s   


z_BatchQueue._process_batchesrz   c                    s  | j 4 I dH  dd |D }t|dkr"	 W d  I dH  dS dd |D }z[|d j}tdd |D \}}|durJ||g|R i |}n||i |}tjdd |D  t|ro|}| ||t|I dH  n|}	| 	|	|t|I dH  tjg  W n! t
y }
 ztd |D ]}t||
 qW Y d}
~
nd}
~
ww W d  I dH  dS W d  I dH  dS 1 I dH sw   Y  dS )	zProcesses queued request batch.Nc                 S   s   g | ]	}|j  s|qS r+   )r"   	cancelledrD   reqr+   r+   r,   rN   H  s    z._BatchQueue._process_batch.<locals>.<listcomp>r   c                 S      g | ]}|j qS r+   )r"   rJ   r+   r+   r,   rN   L      c                 S   r   r+   )r!   rJ   r+   r+   r,   rN   T  r   c                 S   r   r+   )r#   r   r+   r+   r,   rN   `  r   z0_process_batch ran into an unexpected exception.)r[   r;   r    rR   r   r*   Z_set_batch_request_contextr   r   r   r   ri   	exceptionr   )r7   r   rz   r   r    rE   kwargsZfunc_future_or_generatorr   r   r   r"   r+   r+   r,   r   A  sJ   


-.z_BatchQueue._process_batchr   c                 C   s&   | j | | j|= | |  d S r4   )r_   remover`   _log_if_exceptionr   )r7   r   r+   r+   r,   r   t  s   z"_BatchQueue._handle_completed_taskexception_maybec                 C   s4   | d urt | tjrtd d S td d S d S )NzTask was cancelledzTask failed unexpectedly)
isinstancer(   CancelledErrorri   debugr   )r   r+   r+   r,   r   y  s
   z_BatchQueue._log_if_exceptionc                 C   s&   | j d u s
t  sd S | j   d S r4   )ra   r   
is_runningcancelr6   r+   r+   r,   __del__  s
   
z_BatchQueue.__del__r4   )r$   r%   r&   rB   r@   r   r	   rg   re   rn   r   r   r(   r)   rq   r   r{   r   r   r   r   r   r   rd   r   Taskr   staticmethodBaseExceptionr   r   r+   r+   r+   r,   rS   i   s\    
-5


3
3rS   c                
   @   s   e Zd ZdZ				ddededed	ee fd
dZe	de
fddZdeddfddZdeddfddZdefddZdefddZdefddZdefddZdee fddZdS ) _LazyBatchQueueWrapperzStores a _BatchQueue and updates its settings.

    _BatchQueue cannot be pickled, you must construct it lazily
    at runtime inside a replica. This class initializes a queue only upon
    first access.
    
           rH   NrT   rU   rV   rW   c                 C   s"   d | _ || _|| _|| _|| _d S r4   )_queuerT   rU   rV   rW   rf   r+   r+   r,   rg     s
   
z_LazyBatchQueueWrapper.__init__r2   c                 C   s(   | j du rt| j| j| j| j| _ | j S )zXReturns _BatchQueue.

        Initializes queue when called for the first time.
        N)r   rS   rT   rU   rV   rW   r6   r+   r+   r,   rY     s   
z_LazyBatchQueueWrapper.queuerk   c                 C   s$   || _ | jdur| j| dS dS rl   )rT   r   rn   rm   r+   r+   r,   rn     s   
z)_LazyBatchQueueWrapper.set_max_batch_sizenew_batch_wait_timeout_sc                 C   s    || _ | jd ur|| j_ d S d S r4   )rU   r   )r7   r   r+   r+   r,   set_batch_wait_timeout_s  s   
z/_LazyBatchQueueWrapper.set_batch_wait_timeout_sc                 C      | j S r4   rT   r6   r+   r+   r,   get_max_batch_size     z)_LazyBatchQueueWrapper.get_max_batch_sizec                 C   r   r4   rU   r6   r+   r+   r,   get_batch_wait_timeout_s  r   z/_LazyBatchQueueWrapper.get_batch_wait_timeout_sc                 C   s   t t| jj S )z;Gets summary statistics of current iteration's start times.)r0   listrY   r`   valuesr6   r+   r+   r,   _get_curr_iteration_start_times  s   z6_LazyBatchQueueWrapper._get_curr_iteration_start_timesc                    s    t | jdr| jj  S dS )zGets whether default _BatchQueue's background task is alive.

        Returns False if the batch handler doesn't use a default _BatchQueue.
        ra   F)hasattrrY   ra   doner6   r+   r+   r,   _is_batching_task_alive  s   z._LazyBatchQueueWrapper._is_batching_task_alivec                    s2   t | jdrt }| jjj|d | S dS )zGets the stack for the default _BatchQueue's background task.

        Returns empty string if the batch handler doesn't use a default _BatchQueue.
        ra   )fileN)r   rY   ioStringIOra   print_stackgetvalue)r7   Z
str_bufferr+   r+   r,   _get_handling_task_stack  s   z/_LazyBatchQueueWrapper._get_handling_task_stack)r   r   rH   N)r$   r%   r&   __doc__rB   r@   r   r	   rg   rA   rS   rY   rn   r   r   r   r0   r   boolr   strr   r+   r+   r+   r,   r     s0    	
r   c                 C   sN   t | tst | tr|  rt| } ntd|  | dk r%td|  d S )Nz)max_batch_size must be integer >= 1, got rH   z,max_batch_size must be an integer >= 1, got )r   rB   r@   
is_integer	TypeError
ValueErrorr   r+   r+   r,   _validate_max_batch_size  s   

r   c                 C   s6   t | ttfstd|  | dk rtd|  d S )Nz/batch_wait_timeout_s must be a float >= 0, got r   )r   r@   rB   r   r   r   r+   r+   r,   _validate_batch_wait_timeout_s  s   r   rV   c                 C   s$   t | tr	| dk rtd|  d S )NrH   z4max_concurrent_batches must be an integer >= 1, got )r   rB   r   )rV   r+   r+   r,    _validate_max_concurrent_batches   s
   r   SelfTypeT)contravariantTRc                   @   *   e Zd Zdedee dee fddZdS )_SyncBatchingMethodself__SyncBatchingMethod__batchr2   c                C      d S r4   r+   )r7   r   r   r+   r+   r,   __call__  s   z_SyncBatchingMethod.__call__Nr$   r%   r&   r   r   r   r   r   r+   r+   r+   r,   r         "r   c                   @   r   )_AsyncBatchingMethodr   _AsyncBatchingMethod__batchr2   c                   s   d S r4   r+   )r7   r   r   r+   r+   r,   r     s   z_AsyncBatchingMethod.__call__Nr   r+   r+   r+   r,   r     r   r   
_sync_funcc                C   r   r4   r+   )r   r+   r+   r,   rz        rz   _async_funcc                C   r   r4   r+   )r   r+   r+   r,   rz        
_sync_methc                C   r   r4   r+   )r   r+   r+   r,   rz   "  r   _async_methc                C   r   r4   r+   )r   r+   r+   r,   rz   )  r   r   {Gz?rH   rT   rU   __BatchDecoratorc                C   r   r4   r+   )r   rT   rU   rV   r+   r+   r,   rz   0  s   c                
   @   s   e Zd ZdZedeee gee f deegef fddZ	edeee ge
eeee f f deege
eeef f fddZ	edeeeef deeegef fd	dZ	ed
eeeef deeege
eeef f fddZ	dS )r   zJDescibes behaviour of decorator produced by calling `batch` with argumentsr   r2   c                C   r   r4   r+   )r7   r   r+   r+   r,   r   >  r   z_BatchDecorator.__call__r   c                C   r   r4   r+   )r7   r   r+   r+   r,   r   B  r   r   c                C   r   r4   r+   )r7   r   r+   r+   r,   r   H  r   r   c                C   r   r4   r+   )r7   r   r+   r+   r,   r   N  r   N)r$   r%   r&   r   r   r	   r   r   r   r   r
   r   r   r   r   r+   r+   r+   r,   r   ;  s,    0Zstable)Z	stability_funcc                   sd   | durt | stdt| stdt t  t  fdd}t | r0|| S |S )a  Converts a function to asynchronously handle batches.

    The function can be a standalone function or a class method. In both
    cases, the function must be `async def` and take a list of objects as
    its sole argument and return a list of the same length as a result.

    When invoked, the caller passes a single object. These will be batched
    and executed asynchronously once there is a batch of `max_batch_size`
    or `batch_wait_timeout_s` has elapsed, whichever occurs first.

    `max_batch_size` and `batch_wait_timeout_s` can be updated using setter
    methods from the batch_handler (`set_max_batch_size` and
    `set_batch_wait_timeout_s`).

    Example:

    .. code-block:: python

            from ray import serve
            from starlette.requests import Request

            @serve.deployment
            class BatchedDeployment:
                @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
                async def batch_handler(self, requests: List[Request]) -> List[str]:
                    response_batch = []
                    for r in requests:
                        name = (await requests.json())["name"]
                        response_batch.append(f"Hello {name}!")

                    return response_batch

                def update_batch_params(self, max_batch_size, batch_wait_timeout_s):
                    self.batch_handler.set_max_batch_size(max_batch_size)
                    self.batch_handler.set_batch_wait_timeout_s(batch_wait_timeout_s)

                async def __call__(self, request: Request):
                    return await self.batch_handler(request)

            app = BatchedDeployment.bind()

    Arguments:
        max_batch_size: the maximum batch size that will be executed in
            one call to the underlying function.
        batch_wait_timeout_s: the maximum duration to wait for
            `max_batch_size` elements before running the current batch.
        max_concurrent_batches: the maximum number of batches that can be
            executed concurrently. If the number of concurrent batches exceeds
            this limit, the batch handler will wait for a batch to complete
            before sending the next batch to the underlying function.
    Nz?@serve.batch can only be used to decorate functions or methods.z9Functions decorated with @serve.batch must be 'async def'c                    s   t  dtjdtfdddtjf fddt fdd}t fd	d
}t r8|}n|}j|_j|_	j
|_
j|_j|_j|_j|_|S )Nfirst_futurer2   c                 S  s<   | }	 z|I dH }|j }|jV  W n
 ty   Y dS w q)z1Generator that handles generator batch functions.TN)r/   r.   r   )r   r"   Zasync_responser+   r+   r,   batch_handler_generator  s   
z@batch.<locals>._batch_decorator.<locals>.batch_handler_generatorc                    s`   t t | |}t|  }|d ur|dd  }j}t  }tj }|	t
|||| |S )NrI   )r   r   r   rY   r   r   r   r*   Z_get_serve_request_contextrq   r   )rE   r   r!   r7   Zbatch_queuer"   r#   )r   lazy_batch_queue_wrapperr+   r,   enqueue_request  s   


z8batch.<locals>._batch_decorator.<locals>.enqueue_requestc                     s   | |} |S r4   r+   )rE   r   r   )r   r   r+   r,   generator_batch_wrapper  s   
z@batch.<locals>._batch_decorator.<locals>.generator_batch_wrapperc                     s    | |I d H S r4   r+   )rE   r   )r   r+   r,   batch_wrapper  s   z6batch.<locals>._batch_decorator.<locals>.batch_wrapper)r   r(   r)   r   r   r   r   Z_get_max_batch_sizer   Z_get_batch_wait_timeout_srn   r   r   r   r   )r   r   r   wrapperrU   rT   rV   )r   r   r   r   r,   _batch_decorator  s@   
zbatch.<locals>._batch_decorator)callabler   r   r   r   r   )r   rT   rU   rV   r   r+   r   r,   rz   U  s   =Ur"   r.   c                 C      |   s| | dS dS )z3Sets the future's result if the future is not done.N)r   
set_result)r"   r.   r+   r+   r,   r        r   r   c                 C   r   )z6Sets the future's exception if the future is not done.N)r   set_exception)r"   r   r+   r+   r,   r     r   r   )Nr   r   rH   )Gr(   r   loggingrs   collectionsr   dataclassesr   	functoolsr   inspectr   r   typingr   r   r	   r
   r   r   r   r   r   r   r   r   r   r   r   Zrayr   Zray._common.signaturer   r   r   Zray._common.utilsr   Zray.serve._private.constantsr   Zray.serve._private.utilsr   Zray.serve.exceptionsr   Zray.util.annotationsr   	getLoggerri   StopIterationr   r   r   r-   r0   rR   rS   r   r   r   rB   r   r   r   r   r   r   rz   r@   r   r)   r   r   r+   r+   r+   r,   <module>   s    D


  'W0
 "