o
    )il                     @   s  d dl Z d dlZd dlmZmZmZ d dlmZ d dlmZm	Z	m
Z
 d dlZd dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlm Z m!Z! d dl"m#Z#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6m7Z7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z>m?Z? d dl@mAZAmBZB d dlCmDZD d dlEmFZF d dlGmHZH d dlImJZJmKZK d dlLmMZM d d lNmOZO eePZQG d!d" d"eZRdS )#    N)AsyncGeneratorIterableMapping)copy)AnyOptionalUnion)ModelConfig
VllmConfig)AsyncEngineArgs)EngineClient)VLLM_V1_OUTPUT_PROC_CHUNK_SIZE)
PromptType)InputPreprocessor)init_logger)LoRARequest)MULTIMODAL_REGISTRYMultiModalRegistry)PoolingRequestOutputRequestOutput)PoolingParams)SamplingParams)SupportedTask)(maybe_register_config_serialize_by_value)AnyTokenizer)init_tokenizer_from_configs)UsageContext)Deviceas_listcancel_task_threadsafecdivdeprecate_kwargs)EngineCoreRequest)EngineCoreClient)EngineDeadErrorEngineGenerateError)OutputProcessorRequestOutputCollector)ParentRequest)	Processor)Executor)StatLoggerFactoryStatLoggerManager)shutdown_prometheus)IterationStatsc                   @   sn  e Zd Zejedddddddf	dedee de	d	ed
e
de	de	de	deee  deeeef  dededdfddZeeddddejdddddddf	dede	d	edeee  de	de	deeeef  dedede	dd fddZedejdfdede	d	edeee  dd f
ddZdd  Zd!d" Zdeed#f fd$d%Z						dd&ed'ed(eeef d)ee  d*ee! d+eeee"f  d,ee#eef  d-ed.ee de$fd/d0Z%d1e&d'ee d2ee' d3ed4e$f
d5d6Z(				dd'ed7ed&ed*ee! d,ee#eef  d-ed.ee de)e*df fd8d9Z+d:d; Z,d&eee-e f ddfd<d=Z.				dd'ed>ed&ed*ee! d,ee#eef  d-ed+eeee"f  de)e/df fd?d@Z0defdAdBZ1de2fdCdDZ3dEdF Z4de5fdGdHZ6	dd*ee! de7fdIdJZ8de	fdKdLZ9		d	ddMdNZ:ddOdPZ;ddQdRZ<ddSdTZ=ddUdVZ>	ddWee? ddfdXdYZ@ddZeddfd[d\ZAdd]eee  ddfd^d_ZBde	fd`daZCd*e!de	fdbdcZDddede	fdedfZEdeFe fdgdhZGddede	fdidjZH		k	ddledmee  dnedoee fdpdqZIddsefdtduZJ	rddvedsefdwdxZKeLde	fdydzZMeLde	fd{d|ZNeLde	fd}d~ZOeLdePfddZQdS )AsyncLLMFTN   r   vllm_configexecutor_class	log_statsusage_contextmm_registryuse_cached_outputslog_requestsstart_engine_loopstat_loggersclient_addressesclient_countclient_indexreturnc                 C   s   t jstdt  |j| _|| _|| _|| _| jjrd| _	nt
|j|j|jd| _	t|| j	|d| _t| j	| jd| _tj||| j|
||d| _d| _| jr^t|| jj|	d| _| j  d| _zt  |   W dS  tyv   Y dS w )a  
        Create an AsyncLLM.

        Args:
            vllm_config: global configuration.
            executor_class: an Executor impl, e.g. MultiprocExecutor.
            log_stats: Whether to log stats.
            usage_context: Usage context of the LLM.
            mm_registry: Multi-modal registry.
            use_cached_outputs: Whether to use cached outputs.
            log_requests: Whether to log requests.
            start_engine_loop: Whether to start the engine loop.
            stat_loggers: customized stat loggers for the engine.
                If not provided, default stat loggers will be used.
                PLEASE BE AWARE THAT STAT LOGGER IS NOT STABLE
                IN V1, AND ITS BASE CLASS INTERFACE MIGHT CHANGE.

        Returns:
            None
        Using V1 AsyncLLMEngine, but envs.VLLM_USE_V1=False. This should not happen. As a workaround, try using AsyncLLMEngine.from_vllm_config(...) or explicitly set VLLM_USE_V1=0 or 1 and report this issue on Github.N)model_configscheduler_configlora_config)r1   	tokenizerr5   )r3   )r1   r2   r3   r:   r;   r<   r1   Zengine_idxsZcustom_stat_loggers)envsVLLM_USE_V1
ValueErrorr   r?   r1   r7   r3   Zskip_tokenizer_initrB   r   r@   rA   r)   	processorr&   output_processorr#   Zmake_async_mp_clientengine_corelogger_managerr,   Zengine_ranks_managedZlog_engine_initializedoutput_handlerasyncioget_running_loop_run_output_handlerRuntimeError)selfr1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<    rQ   d/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/v1/engine/async_llm.py__init__1   s^   #

zAsyncLLM.__init__disable_log_requestszEThis argument will have no effect. Use `enable_log_requests` instead.)Zadditional_messageenable_log_requestsdisable_log_statsc                 C   s2   t jstd| |t||||| ||||	d
S )Nr>   )
r1   r2   r8   r9   r7   r3   r4   r:   r;   r<   )rD   rE   rF   r*   	get_class)clsr1   r8   r4   r9   rU   rV   r:   r;   r<   rT   rQ   rQ   rR   from_vllm_config   s    zAsyncLLM.from_vllm_configengine_argsc              	   C   s0   | |}t|}| |||j|j |||dS )z'Create an AsyncLLM from the EngineArgs.)r1   r2   r7   r3   r8   r4   r9   )Zcreate_engine_configr*   rW   rU   rV   )rX   rZ   r8   r4   r9   r1   r2   rQ   rQ   rR   from_engine_args   s   

zAsyncLLM.from_engine_argsc                 C   s   |    d S N)shutdownrP   rQ   rQ   rR   __del__   s   zAsyncLLM.__del__c                 C   s2   t   t| dd }r|  tt| dd dS )z2Shutdown, cleaning up the background proc and IPC.rI   NrK   )r-   getattrr]   r   )rP   rI   rQ   rQ   rR   r]      s   zAsyncLLM.shutdown.c                       | j  I d H S r\   )rI   Zget_supported_tasks_asyncr^   rQ   rQ   rR   get_supported_tasks      zAsyncLLM.get_supported_tasks
request_idpromptparamsarrival_timelora_requesttokenization_kwargstrace_headersprioritydata_parallel_rankc
                    s   | j rt t|t}
t|jd}| j|||||||||		\}}|
s)|jdkr7| 	||dd|I dH  |S t
||}t|jD ](}||\}}||jd krS|nt|}||_||_| 	|||||I dH  qA|S )z Add new request to the AsyncLLM.)output_kindr0   Nr   )erroredr$   
isinstancer   r'   rm   rG   Zprocess_inputsn_add_requestr(   rangeZget_child_infor   rd   sampling_params)rP   rd   re   rf   rg   rh   ri   rj   rk   rl   Z
is_poolingqueueZ
prompt_strrequestZparent_requestidxZchild_requestrQ   rQ   rR   add_request   s,   



zAsyncLLM.add_requestru   
parent_reqindexrt   c                    sD   | j ||||| | j|I d H  | jr td|j d S d S )NzAdded request %s.)rH   rw   rI   Zadd_request_asyncr7   loggerinford   )rP   ru   re   rx   ry   rt   rQ   rQ   rR   rq     s   zAsyncLLM._add_requestrs   c              
   C  s  z0|    | j|||||||dI dH }d}	|	s/| p#| I dH }
|
j}	|
V  |	rW dS W dS  tjtfyM   | |I dH  | j	rLt
d|   ty]   | j	r\t
d|   tym   | j	rlt
d|   ty } z| |I dH  | j	rt
d| t |d}~ww )aj  
        Main function called by the API server to kick off a request
            * 1) Making an AsyncStream corresponding to the Request.
            * 2) Processing the Input.
            * 3) Adding the Request to the Detokenizer.
            * 4) Adding the Request to the EngineCore (separate process).

        A separate output_handler loop runs in a background AsyncIO task,
        pulling outputs from EngineCore and putting them into the
        per-request AsyncStream.

        The caller of generate() iterates the returned AsyncGenerator,
        returning the RequestOutput back to the caller.
        )rh   rj   rk   rl   NFRequest %s aborted. Request %s failed (engine dead). Request %s failed (bad request).Request %s failed.)rN   rw   
get_nowaitgetfinishedrL   CancelledErrorGeneratorExitabortr7   rz   r{   r$   rF   	Exceptionr%   )rP   re   rs   rd   rh   rj   rk   rl   qr   outerQ   rQ   rR   generate"  sL   zAsyncLLM.generatec                    sJ   | j durdS | j | j| j| j fdd}t| | _ dS )zBBackground loop: pulls from EngineCore and pushes to AsyncStreams.Nc               
      s  ze	    I d H } t| j}r|rt nd }|tkr!| jf}n
t| jt|t}t|D ]*\}}	|| j
|}|jr@J |d t|k rPtdI d H   |jI d H  q/rfj| j| j|d q ty } ztd | W Y d }~d S d }~ww )NTr0   r   )Z
engine_idxscheduler_statsiteration_statszAsyncLLM output_handler failed.)Zget_output_asynclenoutputsr.   r   npZarray_splitr    	enumerateZprocess_outputs	timestampZrequest_outputsrL   sleepabort_requests_asyncZreqs_to_abortrecordZengine_indexr   r   rz   	exceptionZpropagate_error)r   Znum_outputsr   ZslicesiZoutputs_sliceZprocessed_outputsr   rI   r3   rJ   rH   rQ   rR   rK     sN   


*
z4AsyncLLM._run_output_handler.<locals>.output_handler)rK   rI   rH   r3   rJ   rL   create_task)rP   rK   rQ   r   rR   rN   t  s   
0zAsyncLLM._run_output_handlerc                    sX   t |tr	|fnt|}| j|}| j|I dH  | jr*t	dd
| dS dS )z2Abort RequestId in OutputProcessor and EngineCore.NzAborted request(s) %s.,)ro   strr   rH   Zabort_requestsrI   r   r7   rz   r{   join)rP   rd   Zrequest_idsZall_request_idsrQ   rQ   rR   r     s   
zAsyncLLM.abortpooling_paramsc              
   C  s&  z7|    | j|||||||dI dH }d}	|	s6| p#| I dH }
t|
ts+J |
j}	|
V  |	rW dS W dS  tjyR   | 	|I dH  | j
rQtd|   tyb   | j
ratd|   tyr   | j
rqtd|   ty } z| 	|I dH  | j
rtd| t |d}~ww )a2  
        Main function called by the API server to kick off a request
            * 1) Making an AsyncStream corresponding to the Request.
            * 2) Processing the Input.
            * 3) Adding the Request to the EngineCore (separate process).

        A separate output_handler loop runs in a background AsyncIO task,
        pulling outputs from EngineCore and putting them into the
        per-request AsyncStream.

        The caller of generate() iterates the returned AsyncGenerator,
        returning the RequestOutput back to the caller.
        )rh   rj   rk   ri   NFr|   r}   r~   r   )rN   rw   r   r   ro   r   r   rL   r   r   r7   rz   r{   r$   rF   r   r%   )rP   re   r   rd   rh   rj   rk   ri   r   r   r   r   rQ   rQ   rR   encode  sN   zAsyncLLM.encodec                       | j S r\   )r1   r^   rQ   rQ   rR   get_vllm_config     zAsyncLLM.get_vllm_configc                    r   r\   )r?   r^   rQ   rQ   rR   get_model_config  r   zAsyncLLM.get_model_configc                    s
   t d)NzNot Supported on V1 yet.)rF   r^   rQ   rQ   rR   get_decoding_config     zAsyncLLM.get_decoding_configc                    s
   | j jS r\   )rG   Zinput_preprocessorr^   rQ   rQ   rR   get_input_preprocessor  r   zAsyncLLM.get_input_preprocessorc                    s    | j d u r
td| j |S )Nz;Unable to get tokenizer because skip_tokenizer_init is True)rB   rF   Zget_lora_tokenizerrP   rh   rQ   rQ   rR   get_tokenizer  s   
zAsyncLLM.get_tokenizerc                    s   dS NFrQ   r^   rQ   rQ   rR   is_tracing_enabled$  s   zAsyncLLM.is_tracing_enabledc                    s   | j r| j   d S d S r\   )rJ   log)rP   Zscheduler_outputsZmodel_outputrQ   rQ   rR   do_log_stats'  s   zAsyncLLM.do_log_statsc                    s   t d | jr| jd S )NzCalled check_health.)rz   debugrn   
dead_errorr^   rQ   rQ   rR   check_health/  s
   
zAsyncLLM.check_healthc                       | j dI d H  d S )NTrI   Zprofile_asyncr^   rQ   rQ   rR   start_profile4     zAsyncLLM.start_profilec                    r   r   r   r^   rQ   rQ   rR   stop_profile7  r   zAsyncLLM.stop_profilec                    s2   | j j| j | j j  | j I d H  d S r\   )rG   r5   Zreset_processor_cacher?   Zmm_input_cache_clientresetrI   Zreset_mm_cache_asyncr^   rQ   rQ   rR   reset_mm_cache:  s   zAsyncLLM.reset_mm_cachedevicec                    s(   |t jkr
td| j I d H  d S )NzNot supported on CPU.)r   ZCPUrF   rI   Zreset_prefix_cache_async)rP   r   rQ   rQ   rR   reset_prefix_cache?  s   
zAsyncLLM.reset_prefix_cachelevelc                    s&   |   I d H  | j|I d H  d S r\   )r   rI   Zsleep_async)rP   r   rQ   rQ   rR   r   E  s   zAsyncLLM.sleeptagsc                    s   | j |I d H  d S r\   )rI   Zwake_up_async)rP   r   rQ   rQ   rR   wake_upI  r   zAsyncLLM.wake_upc                    ra   r\   )rI   Zis_sleeping_asyncr^   rQ   rQ   rR   is_sleepingL  rc   zAsyncLLM.is_sleepingc                       | j |I dH S )z<Load a new LoRA adapter into the engine for future requests.N)rI   Zadd_lora_asyncr   rQ   rQ   rR   add_loraO     zAsyncLLM.add_loralora_idc                    r   )z&Remove an already loaded LoRA adapter.N)rI   Zremove_lora_asyncrP   r   rQ   rQ   rR   remove_loraS  r   zAsyncLLM.remove_lorac                    s   | j  I dH S )zList all registered adapters.N)rI   Zlist_loras_asyncr^   rQ   rQ   rR   
list_lorasW  s   zAsyncLLM.list_lorasc                    r   )z&Prevent an adapter from being evicted.N)rI   Zpin_lora_asyncr   rQ   rQ   rR   pin_lora[  r   zAsyncLLM.pin_lorarQ   methodtimeoutargskwargsc                    s   | j ||||I dH S )zB
        Perform a collective RPC call to the given path.
        N)rI   Zcollective_rpc_async)rP   r   r   r   r   rQ   rQ   rR   collective_rpc_  s   
zAsyncLLM.collective_rpc,  drain_timeoutc                    sl   t   }t   | |k r.| j std dS td tdI dH  t   | |k std| d)z$Wait for all requests to be drained.z,Engines are idle, requests have been drainedNz;Engines are still running, waiting for requests to drain...r0   zTimeout reached after z' seconds waiting for requests to drain.)timerI   Zdp_engines_runningrz   r{   rL   r   TimeoutError)rP   r   
start_timerQ   rQ   rR   wait_for_requests_to_drainj  s   

	z#AsyncLLM.wait_for_requests_to_drainnew_data_parallel_sizec                    s   | j jj}||krtd| dS td| | |I dH  td| | j|I dH  || j j_||krJ| jrLt	| j t
t|dd| _dS dS dS )a  
        Scale up or down the data parallel size by adding or removing
        engine cores.
        Args:
            new_data_parallel_size: The new number of data parallel workers
            drain_timeout:
                Maximum time to wait for requests to drain (seconds)
        z0Data parallel size is already %s, skipping scaleNz@Waiting for requests to drain before scaling up to %s engines...z?Requests have been drained, proceeding with scale to %s enginesrC   )r1   Zparallel_configZdata_parallel_sizerz   r{   r   rI   scale_elastic_epr3   r,   listrr   rJ   )rP   r   r   Zold_data_parallel_sizerQ   rQ   rR   r   y  s6   
zAsyncLLM.scale_elastic_epc                 C   s   | j d u p
| j   S r\   )rK   doner^   rQ   rQ   rR   
is_running  s   zAsyncLLM.is_runningc                 C   s   | j S r\   )rn   r^   rQ   rQ   rR   
is_stopped     zAsyncLLM.is_stoppedc                 C   s   | j jjp| j S r\   )rI   	resourcesZengine_deadr   r^   rQ   rQ   rR   rn     s   zAsyncLLM.erroredc                 C   s   t  S r\   )r$   r^   rQ   rQ   rR   r     r   zAsyncLLM.dead_error)NNNNr   N)NNr   Nr\   )NN)r=   N)r0   )NrQ   N)r   )R__name__
__module____qualname__r   ZENGINE_CONTEXTr   r
   typer*   boolr   r   r   r+   dictr   intrS   classmethodr!   rY   r   r[   r_   r]   tupler   rb   r   r   r   r   floatr   r   r   r'   rw   r"   r(   rq   r   r   r   rN   r   r   r   r   r   r	   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   setr   r   r   r   r   propertyr   r   rn   BaseExceptionr   rQ   rQ   rQ   rR   r/   /   s   	


b
	
"


	

*

	
R?
	
P









(r/   )SrL   r   collections.abcr   r   r   r   typingr   r   r   numpyr   Z	vllm.envsrD   Zvllm.configr	   r
   Zvllm.engine.arg_utilsr   Zvllm.engine.protocolr   r   Zvllm.inputsr   Zvllm.inputs.preprocessr   Zvllm.loggerr   Zvllm.lora.requestr   Zvllm.multimodalr   r   Zvllm.outputsr   r   Zvllm.pooling_paramsr   Zvllm.sampling_paramsr   Z
vllm.tasksr   Zvllm.transformers_utils.configr   Z!vllm.transformers_utils.tokenizerr   Z'vllm.transformers_utils.tokenizer_groupr   Zvllm.usage.usage_libr   Z
vllm.utilsr   r   r   r    r!   Zvllm.v1.enginer"   Zvllm.v1.engine.core_clientr#   Zvllm.v1.engine.exceptionsr$   r%   Zvllm.v1.engine.output_processorr&   r'   Z vllm.v1.engine.parallel_samplingr(   Zvllm.v1.engine.processorr)   Zvllm.v1.executor.abstractr*   Zvllm.v1.metrics.loggersr+   r,   Zvllm.v1.metrics.prometheusr-   Zvllm.v1.metrics.statsr.   r   rz   r/   rQ   rQ   rQ   rR   <module>   sJ   