o
    )i                     @   s6  d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZmZmZmZmZmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZ d dlmZ d dlmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3 d dl4m5Z5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZBmCZCmDZD e/eEZFejGZHG dd deIZJde jKdeeLgdf ddfddZMeL ZNG d d! d!ZOG d"d# d#ZPG d$d% d%e#ZQG d&d' d'e'ZReSd(rejTrd d)lUmVZV eVZRdS dS dS )*    N)partial)AnyAsyncGeneratorCallableDictIterableListMappingOptionalSetTupleTypeUnion)ReferenceType)DecodingConfig
LoRAConfigModelConfigParallelConfigSchedulerConfig
VllmConfig)SchedulerOutputs)AsyncEngineArgs)asyncio_timeout)	LLMEngine)StatLoggerBase)EngineClient)ExecutorBase)
PromptType)InputPreprocessor)init_logger)LoRARequest)SamplerOutput)PoolingRequestOutputRequestOutput)PoolingParams)SamplingParams)ExecuteModelRequest)AnyTokenizer)UsageContext)Devicedeprecate_kwargs	weak_bindc                   @   s   e Zd ZdS )AsyncEngineDeadErrorN)__name__
__module____qualname__ r0   r0   h/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/engine/async_llm_engine.pyr,   '   s    r,   taskerror_callbackreturnc              
   C   sx   d}z|   }td|  tjjy   td Y dS  ty; } z|}tjd|d || t	d|d}~ww )zThis function is only intended for the `engine.run_engine_loop()` task.

    In particular, that task runs a `while True` loop that can only exit if
    there is an exception.
    NzEThe engine background task should never finish without an exception. z#Engine is gracefully shutting down.zEngine background task failed)exc_infozTask finished unexpectedly. This should never happen! Please open an issue on GitHub. See stack trace above for the actual cause.)
resultAssertionErrorasyncio
exceptionsCancelledErrorloggerinfo	Exceptionerrorr,   )r2   r3   	exceptionreturn_valueer0   r0   r1   _log_task_completion+   s*   rB   c                   @   s   e Zd ZdZdedeegdf ddfddZdeee	e
f ddfd	d
Z	ddeeeee f  ddfddZedefddZdeeee	f df fddZedefddZdS )AsyncStreamzA stream of RequestOutputs or PoolingRequestOutputs for a request
    that can be iterated over asynchronously via an async generator.
request_idcancelNr4   c                 C   s    || _ || _t | _d| _d S )NF)rD   _cancelr8   Queue_queue	_finished)selfrD   rE   r0   r0   r1   __init__N   s   

zAsyncStream.__init__itemc                 C   s   | j s| j| d S d S N)rI   rH   
put_nowaitrJ   rL   r0   r0   r1   putT   s   zAsyncStream.putr?   c                 C   s.   | j sd| _ | j| |r|nt d S d S )NT)rI   rH   rN   _is_raisableSTOP_ITERATION)rJ   r?   r0   r0   r1   finishY   s   zAsyncStream.finishc                 C   s   | j S rM   )rI   rJ   r0   r0   r1   finishedb   s   zAsyncStream.finishedc                 C  s\   z	 | j  I d H }| |r|tkrW d S ||V  q ty-   | | j tjd w rM   )	rH   getrQ   rR   GeneratorExitrF   rD   r8   r:   )rJ   r6   r0   r0   r1   	generatorf   s   
zAsyncStream.generatorvaluec                 C   s   t | tpt | tot| tS rM   )
isinstanceBaseExceptiontype
issubclass)rY   r0   r0   r1   rQ   u   s   

zAsyncStream._is_raisablerM   )r-   r.   r/   __doc__strr   rK   r   r#   r"   r=   rP   r
   r[   r   rS   propertyboolrU   r   rX   staticmethodr   rQ   r0   r0   r0   r1   rC   J   s*     

	
rC   c                	   @   s  e Zd ZdZd"ddZdd Zdefdd	Z	d#d
ede	e
 ddfddZdddeeef deddfddZddde
dededdfddZddde
dedefddZdddde
de	eeee f  deddfddZdeee ee
 f fddZdd Zd d! ZdS )$RequestTrackerz.Synchronous abstraction for tracking requests.r4   Nc                 C   s(   i | _ t | _t | _t | _d S rM   )_request_streamsr8   rG   _aborted_requests_new_requestsEventnew_requests_eventrT   r0   r0   r1   rK      s
   
zRequestTracker.__init__c                 C   s
   || j v S rM   )rd   rO   r0   r0   r1   __contains__      
zRequestTracker.__contains__c                 C   s
   t | jS rM   )lenrd   rT   r0   r0   r1   __len__   rj   zRequestTracker.__len__excrD   c                 C   s@   |dur| j ||d dS t| j D ]	}| j ||d qdS )zNPropagate an exception to request streams
        (all if request_id is None).Nr?   )abort_requesttuplerd   keys)rJ   rm   rD   Zridr0   r0   r1   propagate_exception   s
   z"RequestTracker.propagate_exceptionFverboserequest_outputrt   c                C   sj   |j }|j}|r| j|d}n| j|}|dur%|| |r%|  |r1|r3td| dS dS dS )z)Process a request output from the engine.NFinished request %s.)	rD   rU   rd   poprV   rP   rS   r;   r<   )rJ   ru   rt   rD   rU   streamr0   r0   r1   process_request_output   s   
z%RequestTracker.process_request_outputr?   c                C   s"   |rt d| | j||d dS )z'Propagate an exception from the engine.rv   rn   N)r;   r<   ro   )rJ   rD   r?   rt   r0   r0   r1   process_exception   s   z RequestTracker.process_exceptionc                K   sh   || j v rtd| dt| j|d}t||}| j|d|i|f | j  |r2t	
d| |S )zUAdd a request to be sent to the engine on the next background
        loop iteration.zRequest z already exists.rs   rD   zAdded request %s.)rd   KeyErrorr   ro   rC   rf   rN   rh   setr;   r<   )rJ   rD   rt   Zengine_add_request_kwargsro   rx   r0   r0   r1   add_request   s   


zRequestTracker.add_requestr?   rt   c                C   sF   |rt d| | j| | j|d}|dur!|j|d dS dS )z6Abort a request during next background loop iteration.zAborted request %s.Nrn   )r;   r<   re   rN   rd   rw   rS   )rJ   rD   r?   rt   rx   r0   r0   r1   ro      s   zRequestTracker.abort_requestc                 C   s   g }t  }| j s| j }|| | j r
| j sG| j \}}|j}||v r8|tj	 |
| n
|| j|< || | j r||fS )zLGet the new requests and finished requests to be
        sent to the engine.)r|   re   empty
get_nowaitaddrf   rD   rS   r8   r:   discardrd   append)rJ   new_requestsZfinished_requestsrD   rx   new_requestr0   r0   r1   get_new_and_aborted_requests   s    







z+RequestTracker.get_new_and_aborted_requestsc                    s(   |   s| j I d H  | j  d S rM   )has_new_requestsrh   waitclearrT   r0   r0   r1   wait_for_new_requests   s   z$RequestTracker.wait_for_new_requestsc                 C   s   | j   S rM   )rf   r   rT   r0   r0   r1   r      s   zRequestTracker.has_new_requestsr4   NrM   )r-   r.   r/   r^   rK   ri   intrl   r=   r
   r_   rr   r   r#   r"   ra   ry   r[   rz   rC   r}   r   ro   r   r   r   r   r   r   r   r0   r0   r0   r1   rc   |   sr    





rc   c                       s  e Zd ZdZ fddZdedeeee	f  fddZ
d"d	d
Z	d#dee defddZ						d$dededeeef dee dee deeeef  dedee deeeef  ddfddZd"ddZ			d%dedee dedee fd d!Z  ZS )&_AsyncLLMEnginez,Extension of LLMEngine to add async methods.c                    s   t  j|i | d S rM   )superrK   )rJ   argskwargs	__class__r0   r1   rK     s   z_AsyncLLMEngine.__init__virtual_enginer4   c                    s  | j | }|j}|j}|j}| j| }|j  | |sI| j| 	 \}}}||_||_|
 s9| j|  }|sHt|jdkrH| j|d nt }|dusRJ |dusXJ |
 s| |}t||j|j|j||j|j||d	}	|r{| j| |	_| j|	I dH }
nt|jdkr| j|d g }
| |s|sdn|d jjdk}|j|
|||d|d |
r|rt|
dksJ d	| |
d ||j |s| j|d |  ||
 | !| n|jS | " st|jdkr| j|d t|jdksJ |jS )
a  Performs one decoding iteration and returns newly generated results.
        The workers are ran asynchronously if possible.

        This function performs one decoding iteration of the engine. It first
        schedules the sequences to be executed in the next iteration and the
        token blocks to be swapped in/out/copy. Then, it executes the model
        and updates the scheduler with the model outputs. Finally, it decodes
        the sequences and returns the newly generated results.
        r   )ctxN)	seq_group_metadata_listblocks_to_swap_inblocks_to_swap_outblocks_to_copyr   num_lookahead_slotsrunning_queue_sizefinished_requests_idslast_sampled_token_idsF   T)outputsr   scheduler_outputsis_asyncZis_last_stepis_first_step_outputz4Async postprocessor expects only a single output set)#Zcached_scheduler_outputsr   r   allow_async_output_procZscheduler_contextsrequest_outputsr   Z_has_remaining_stepsZ	schedulerZscheduleZis_emptyZ#get_and_reset_finished_requests_idsrk   Zoutput_queueZ_process_model_outputslistZ_get_last_sampled_token_idsr&   r   r   r   r   r   Zasync_callbacksZasync_callbackmodel_executorZexecute_model_asyncstateZ	num_stepsZappend_outputZ_advance_to_next_stepZscheduled_seq_groupsdo_log_statsZ
do_tracinghas_unfinished_requests)rJ   r   Zcached_outputsr   r   r   r   r   r   Zexecute_model_reqr   r   r0   r0   r1   
step_async  s   





z_AsyncLLMEngine.step_asyncNc                    s   | j  I dH  dS )z&Stop the remote worker execution loop.N)r   'stop_remote_worker_execution_loop_asyncrT   r0   r0   r1   r     s   z7_AsyncLLMEngine.stop_remote_worker_execution_loop_asynclora_requestc                    s   |   |I d H S rM   )Zget_tokenizer_groupZget_lora_tokenizer_asyncrJ   r   r0   r0   r1   get_tokenizer_async  s   z#_AsyncLLMEngine.get_tokenizer_asyncr   rD   promptparamsarrival_timetrace_headersprioritydata_parallel_ranktokenization_kwargsc
              	      s   |dur| j std| d|dkr"| jjdks"td| d|du r*t }|dur2tdt|trQ|d	ddurQ|d
dsQdg|d	 jd  |d
< | j	j
|||	dI dH }
| j||
|||||d dS )zi
        Async version of
        [`add_request`][vllm.engine.llm_engine.LLMEngine.add_request].
        NzGot lora_request z but LoRA is not enabled!r   r   Got priority ( but Priority scheduling is not enabled.z9Targeting data_parallel_rank only supported in v1 client.Zprompt_embedsZprompt_token_ids)r   r   )rD   processed_inputsr   r   r   r   r   )Zlora_config
ValueErrorscheduler_configpolicytimerZ   dictrV   shapeinput_preprocessorZpreprocess_asyncZ_add_processed_request)rJ   rD   r   r   r   r   r   r   r   r   r   r0   r0   r1   add_request_async  s<   


z!_AsyncLLMEngine.add_request_asyncc                       | j   d S rM   )r   check_healthrT   r0   r0   r1   check_health_async     z"_AsyncLLMEngine.check_health_asyncr0   methodtimeoutr   r   c                    s   t rM   )NotImplementedErrorrJ   r   r   r   r   r0   r0   r1   collective_rpc_async  s   z$_AsyncLLMEngine.collective_rpc_asyncr   rM   NNNr   NNNr0   N)r-   r.   r/   r^   rK   r   r   r   r#   r"   r   r   r
   r    r'   r   r_   r   r%   r$   floatr	   r   r   r   r   rp   r   __classcell__r0   r0   r   r1   r     sp    
 


	


5r   c                   @   s  e Zd ZU dZeZee ed< ddddededdfd	d
Z	dd Z
ededee fddZeeddddejddddfdedededeeeef  dedededd fddZedejdfdedededeeeef  dd f
ddZedefddZedefd d!Zedefd"d#Zedefd$d%Zd&e ddfd'd(Z!d&e ddfd)d*Z"de#fd+d,Z$	dd-ee% de&fd.d/Z'dd0d1Z(dd2d3Z)d4e*defd5d6Z+defd7d8Z,d9e-e fd:d;Z.e/d<e0fd=d>Z1				?		dd@edAe2dBe3e4e5f dCee6 d-ee% dDee7eef  dEe*dFee* dGeeee8f  de9e3e:e;f df fdHdIZ<			?	ddAe2dJe4d@ed-ee% dDee7eef  dEe*dFee* de9e:df fdKdLZ=			?	ddAe2dMe5d@ed-ee% dDee7eef  dEe*dGeeee8f  de9e;df fdNdOZ>d@e3ee-e f ddfdPdQZ?d@eddfdRdSZ@defdTdUZAdeBfdVdWZCdeDfdXdYZEdeFfdZd[ZGdeHfd\d]ZIdeJfd^d_ZK		dd`eeL daeeMeN  ddfdbdcZOddddeZPdefdfdgZQdhedieddfdjdkZRdheddfdldmZSddndoZTddpdqZUddrdsZV	ddteeW ddfdudvZXddxe*ddfdydzZYdd{eeZe  ddfd|d}Z[defd~dZ\d-e%ddfddZ]			ddedee6 de^dee fddZ_dS )AsyncLLMEnginea  An asynchronous wrapper for [`LLMEngine`][vllm.LLMEngine].

    This class is used to wrap the [`LLMEngine`][vllm.LLMEngine] class to
    make it asynchronous. It uses asyncio to create a background loop that keeps
    processing incoming requests. The [`LLMEngine`][vllm.LLMEngine] is kicked
    by the generate method when there are requests in the waiting queue. The
    generate method yields the outputs from the [`LLMEngine`][vllm.LLMEngine]
    to the caller.

    Args:
        log_requests: Whether to log the requests.
        start_engine_loop: If True, the background task to run the engine
            will be automatically started in the generate call.
        *args: Arguments for [`LLMEngine`][vllm.LLMEngine].
        **kwargs: Arguments for [`LLMEngine`][vllm.LLMEngine].
    _engine_classT)log_requestsstart_engine_loopr   r   r4   Nc                O   sf   t jrtd|| _| j|i || _| jjj| _| jr#t	| j
| j_d | _d | _|| _d | _|  d S )NzUsing V0 AsyncLLMEngine, but envs.VLLM_USE_V1=True. This should not happen. As a workaround, try using AsyncLLMEngine.from_vllm_config(...) or explicitly set VLLM_USE_V1=0 or 1 and report this issue on Github.)envsVLLM_USE_V1r   r   r   engineZmodel_configZuse_async_output_proc$use_process_request_outputs_callbackr+   process_request_outputsZ process_request_outputs_callbackbackground_loop_background_loop_unshieldedr   _errored_with)rJ   r   r   r   r   r0   r0   r1   rK     s    zAsyncLLMEngine.__init__c                 C   s"   t | dd  }r|j  d S d S )Nrequest_tracker)getattrrh   r|   )rJ   rtr0   r0   r1   __del__  s   zAsyncLLMEngine.__del__engine_configc                 C   s
   t |S rM   )r   _get_executor_cls)clsr   r0   r0   r1   r     s   
z AsyncLLMEngine._get_executor_clsdisable_log_requestszEThis argument will have no effect. Use `enable_log_requests` instead.)Zadditional_messageFvllm_configusage_contextstat_loggersenable_log_requestsdisable_log_statsc              	   C   s   | ||  |||| ||dS )z-Create an AsyncLLMEngine from the EngineArgs.)r   Zexecutor_classr   r   Z	log_statsr   r   )r   )r   r   r   r   r   r   r   r   r0   r0   r1   from_vllm_config  s   zAsyncLLMEngine.from_vllm_configengine_argsc                 C   s>   | |}| }tjrddlm} |}|j|||||j|jdS )z6Creates an async LLM engine from the engine arguments.r   AsyncLLM)r   r   r   r   r   r   )Zcreate_engine_configr   r   vllm.v1.engine.async_llmr   r   r   r   )r   r   r   r   r   r   Zasync_engine_clsZV1AsyncLLMEnginer0   r0   r1   from_engine_args2  s   

zAsyncLLMEngine.from_engine_argsc                 C   s    | j d uo| jd uo| j  S rM   )r   r   donerT   r0   r0   r1   
is_runningL  s
   

zAsyncLLMEngine.is_runningc                 C   s$   | j p| jd uo| jd uo| j S rM   )erroredr   r   r   rT   r0   r0   r1   
is_stoppedR  s
   zAsyncLLMEngine.is_stoppedc                 C   s
   | j d uS rM   r   rT   r0   r0   r1   r   X  s   
zAsyncLLMEngine.erroredc                 C   s   t dS )NBackground loop is not running. If it was running, inspect the output to find the stacktrace of the error that caused the background loop to stop (AsyncEngineDeadError).)r,   rT   r0   r0   r1   
dead_error\  s   zAsyncLLMEngine.dead_errorrm   c                 C   s
   || _ d S rM   r   rJ   rm   r0   r0   r1   set_erroredd  rj   zAsyncLLMEngine.set_erroredc                 C   s   |  | | j| d S rM   )r   _request_trackerrr   r   r0   r0   r1   _error_callbackg  s   
zAsyncLLMEngine._error_callbackc                    s
   | j jS rM   )r   r   rT   r0   r0   r1   get_input_preprocessork  s   z%AsyncLLMEngine.get_input_preprocessorr   c                    s   | j |I d H S rM   )r   r   r   r0   r0   r1   get_tokenizern  s   zAsyncLLMEngine.get_tokenizerc                 C   sl   | j r	td| j| jrtdt | _t 	| 
t| | _| jtt| jd t| j| _dS )zStart the background loop.z$Background loop has errored already.z#Background loop is already running.)r3   N)r   r,   r   r   RuntimeErrorrc   r   r8   get_event_loopcreate_taskrun_engine_loopweakrefrefr   add_done_callbackr   rB   r   shieldr   rT   r0   r0   r1   start_background_loopt  s    z$AsyncLLMEngine.start_background_loopc                 C   s$   | j dur| j   d| _ d| _dS )a  
        Shut down the background loop.

        This method needs to be called during cleanup to remove
        references to `self` and properly GC the resources held
        by the async LLM engine (e.g., the executors as well as
        their resources).
        N)r   rE   r   rT   r0   r0   r1   shutdown_background_loop  s   
	

z'AsyncLLMEngine.shutdown_background_loopr   c                    s   | j  \}}|D ].}z| jjdi |I dH  W q
 ty8 } z| j j|d || jd W Y d}~q
d}~ww |rC| |I dH  | j|I dH }| j	sW| 
|}| S tdd |D }| S )ziKick the engine to process the waiting requests.

        Returns True if there are in-progress requests.NrD   rs   c                 s   s    | ]}|j V  qd S rM   )rU   ).0ru   r0   r0   r1   	<genexpr>  s    z-AsyncLLMEngine.engine_step.<locals>.<genexpr>r0   )r   r   r   r   r   rz   r   _engine_abortr   r   r   all)rJ   r   r   Zaborted_requestsr   rA   r   all_finishedr0   r0   r1   engine_step  s0   
zAsyncLLMEngine.engine_stepc                 C   s.   d}|D ]}| j j|| jd |o|j}q|S )NTrs   )r   ry   r   rU   )rJ   r   r  ru   r0   r0   r1   r     s   z&AsyncLLMEngine.process_request_outputsrequest_idsc                       | j | d S rM   )r   ro   )rJ   r
  r0   r0   r1   r       zAsyncLLMEngine._engine_abort
engine_refc              
      s  |    sdS  j jj}dg| }	 t|sZtd  j  I dH   j} t	dI dH  |  du r7dS |
 I dH  |    sEdS td  fddt|D }dg| }zgtt4 I dH ' tj|tjd	I dH \}}t|D ]
}t	dI dH  quW d  I dH  n1 I dH sw   Y  |D ](}| }||}	 j |	}
|s|
rt |	||	< d||	< qd||	< qW n tjy } ztd
  |  d}~ww t	dI dH  q)zsWe use a weakref to the engine so that the running loop
        doesn't prevent the engine being garbage collected.NFTzWaiting for new requests...r   zGot new requests!c                    s   g | ]
}t  |qS r0   )r8   r   r	  )r  ver   r0   r1   
<listcomp>  s    z2AsyncLLMEngine.run_engine_loop.<locals>.<listcomp>)return_whenz5Engine iteration timed out. This should never happen!)r   Zparallel_configpipeline_parallel_sizeanyr;   debugr   r   r8   sleepr   ranger   ENGINE_ITERATION_TIMEOUT_Sr   FIRST_COMPLETEDr6   indexZ*has_unfinished_requests_for_virtual_enginer   r	  TimeoutErrorr>   r   )r  r  Zhas_requests_in_progressr   Zrequests_in_progressr   _r2   r6   r   r   rm   r0   r  r1   r     sz   





(



zAsyncLLMEngine.run_engine_loopr   rD   r   r   r   r   r   r   r   c
                    sx   | j s| jr|   ntd|dkr#| jjjdks#td| d| jj	|| j
|||p0t |||||	d
}
|
 S )Nr   r   r   r   r   )	rt   r   r   r   r   r   r   r   r   )r   r   r  r,   r   r   r   r   r   r}   r   r   rX   )rJ   rD   r   r   r   r   r   r   r   r   rx   r0   r0   r1   r}   
  s.   

zAsyncLLMEngine.add_requestsampling_paramsc           	   	   C  h   z!| j |||||||dI dH 2 z3 dH W }t|tV  q6 W dS  tjy3   | |I dH   w )a  Generate outputs for a request.

        Generate outputs for a request. This method is a coroutine. It adds the
        request into the waiting queue of the LLMEngine and streams the outputs
        from the LLMEngine to the caller.

        Args:
            prompt: The prompt to the LLM. See
                [`PromptType`][vllm.inputs.PromptType] for more details about
                the format of each input.
            sampling_params: The sampling parameters of the request.
            request_id: The unique id of the request.
            lora_request: LoRA request to use for generation, if any.
            trace_headers: OpenTelemetry trace headers.
            priority: The priority of the request.
                Only applicable with priority scheduling.
            data_parallel_rank: The (global) data parallel rank that must
                handle this request. Only applicable if DP is enabled.
        Yields:
            The output `RequestOutput` objects from the LLMEngine
            for the request.

        Details:
            - If the engine is not running, start the background loop,
              which iteratively invokes
              [`engine_step`][vllm.engine.async_llm_engine.AsyncLLMEngine.engine_step]
              to process the waiting requests.
            - Add the request to the engine's `RequestTracker`.
              On the next background loop, this request will be sent to
              the underlying engine.
              Also, a corresponding `AsyncStream` will be created.
            - Wait for the request outputs from `AsyncStream` and yield them.

        Example:
            >>> # Please refer to entrypoints/api_server.py for
            >>> # the complete example.
            >>>
            >>> # initialize the engine and the example input
            >>> # note that engine_args here is AsyncEngineArgs instance
            >>> engine = AsyncLLMEngine.from_engine_args(engine_args)
            >>> example_input = {
            >>>     "prompt": "What is LLM?",
            >>>     "stream": False, # assume the non-streaming case
            >>>     "temperature": 0.0,
            >>>     "request_id": 0,
            >>> }
            >>>
            >>> # start the generation
            >>> results_generator = engine.generate(
            >>>    example_input["prompt"],
            >>>    SamplingParams(temperature=example_input["temperature"]),
            >>>    example_input["request_id"])
            >>>
            >>> # get the results
            >>> final_output = None
            >>> async for request_output in results_generator:
            >>>     if await request.is_disconnected():
            >>>         # Abort the request if the client disconnects.
            >>>         await engine.abort(request_id)
            >>>         # Return or raise an error
            >>>         ...
            >>>     final_output = request_output
            >>>
            >>> # Process and return the final output
            >>> ...
        )r   r   r   r   N)r}   r   validate_outputr#   r8   r:   abort)	rJ   r   r  rD   r   r   r   r   outputr0   r0   r1   generate4  s"   L	
zAsyncLLMEngine.generatepooling_paramsc           	   	   C  r  )a	  Generate outputs for a request from a pooling model.

        Generate outputs for a request. This method is a coroutine. It adds the
        request into the waiting queue of the LLMEngine and streams the outputs
        from the LLMEngine to the caller.

        Args:
            prompt: The prompt to the LLM. See
                [`PromptType`][vllm.inputs.PromptType] for more details about
                the format of each input.
            pooling_params: The pooling parameters of the request.
            request_id: The unique id of the request.
            lora_request: LoRA request to use for generation, if any.
            trace_headers: OpenTelemetry trace headers.
            priority: The priority of the request.
                Only applicable with priority scheduling.

        Yields:
            The output `PoolingRequestOutput` objects from the LLMEngine
            for the request.

        Details:
            - If the engine is not running, start the background loop,
                which iteratively invokes
                [`vllm.engine.async_llm_engine.AsyncLLMEngine.engine_step`][]
                to process the waiting requests.
            - Add the request to the engine's `RequestTracker`.
                On the next background loop, this request will be sent to
                the underlying engine.
                Also, a corresponding `AsyncStream` will be created.
            - Wait for the request outputs from `AsyncStream` and yield them.

        Example:
        ```
        # Please refer to entrypoints/api_server.py for
        # the complete example.

        # initialize the engine and the example input
        # note that engine_args here is AsyncEngineArgs instance
        engine = AsyncLLMEngine.from_engine_args(engine_args)
        example_input = {
            "input": "What is LLM?",
            "request_id": 0,
        }

        # start the generation
        results_generator = engine.encode(
        example_input["input"],
        PoolingParams(),
        example_input["request_id"])

        # get the results
        final_output = None
        async for request_output in results_generator:
            if await request.is_disconnected():
                # Abort the request if the client disconnects.
                await engine.abort(request_id)
                # Return or raise an error
                ...
            final_output = request_output

        # Process and return the final output
        ...
        ```
        )r   r   r   r   N)r}   r   r  r"   r8   r:   r  )	rJ   r   r"  rD   r   r   r   r   r   r0   r0   r1   encode  s"   K	
zAsyncLLMEngine.encodec                    s,   t |ts
td| jstd| |S )Abort a request.

        Abort a submitted request. If the request is finished or not found,
        this method will be a no-op.

        Args:
            request_id: The unique id of the request.
        z4Only single-request abort supported in deprecated V0r   )rZ   r_   r   r   r,   _abortrJ   rD   r0   r0   r1   r    s   
	
zAsyncLLMEngine.abortc                 C   s   | j j|tj| jd dS )r$  r~   N)r   ro   r8   r:   r   r&  r0   r0   r1   r%    s   	
zAsyncLLMEngine._abortc                       | j  S )z.Get the vllm configuration of the vLLM engine.)r   get_vllm_configrT   r0   r0   r1   r(       
zAsyncLLMEngine.get_vllm_configc                    r'  )z/Get the model configuration of the vLLM engine.)r   get_model_configrT   r0   r0   r1   r*    r)  zAsyncLLMEngine.get_model_configc                    r'  )z2Get the parallel configuration of the vLLM engine.)r   get_parallel_configrT   r0   r0   r1   r+    r)  z"AsyncLLMEngine.get_parallel_configc                    r'  )z2Get the decoding configuration of the vLLM engine.)r   get_decoding_configrT   r0   r0   r1   r,    r)  z"AsyncLLMEngine.get_decoding_configc                    r'  )z4Get the scheduling configuration of the vLLM engine.)r   get_scheduler_configrT   r0   r0   r1   r-    r)  z#AsyncLLMEngine.get_scheduler_configc                    r'  )z.Get the lora configuration of the vLLM engine.)r   get_lora_configrT   r0   r0   r1   r.    r)  zAsyncLLMEngine.get_lora_configr   model_outputc                    r   rM   )r   r   )rJ   r   r/  r0   r0   r1   r   #  s   zAsyncLLMEngine.do_log_statsc                    sJ   t  }td | jrtd| j I dH  tdt  |  dS )z'Raises an error if engine is unhealthy.zStarting health check...zBackground loop is stopped.NzHealth check took %fs)r   perf_counterr;   r  r   r,   r   r   )rJ   tr0   r0   r1   r   )  s   
zAsyncLLMEngine.check_healthc                    r'  rM   )r   is_tracing_enabledrT   r0   r0   r1   r2  3     
z!AsyncLLMEngine.is_tracing_enabledlogger_namer;   c                 C   s   | j j||d d S )N)r4  r;   )r   
add_logger)rJ   r4  r;   r0   r0   r1   r5  6  s   zAsyncLLMEngine.add_loggerc                 C   s   | j j|d d S )N)r4  )r   remove_logger)rJ   r4  r0   r0   r1   r6  9  s   zAsyncLLMEngine.remove_loggerc                    r   rM   )r   start_profilerT   r0   r0   r1   r7  <  r   zAsyncLLMEngine.start_profilec                    r   rM   )r   stop_profilerT   r0   r0   r1   r8  ?  r   zAsyncLLMEngine.stop_profilec                    r   rM   )r   reset_mm_cacherT   r0   r0   r1   r9  B  r   zAsyncLLMEngine.reset_mm_cachedevicec                    r  rM   )r   reset_prefix_cache)rJ   r:  r0   r0   r1   r;  E  s   z!AsyncLLMEngine.reset_prefix_cacher   levelc                    s    |   I d H  | j| d S rM   )r;  r   r  )rJ   r<  r0   r0   r1   r  I  s   zAsyncLLMEngine.sleeptagsc                    r  rM   )r   wake_up)rJ   r=  r0   r0   r1   r>  M  r  zAsyncLLMEngine.wake_upc                    r'  rM   )r   is_sleepingrT   r0   r0   r1   r?  P  r3  zAsyncLLMEngine.is_sleepingc                    r  rM   )r   add_lorar   r0   r0   r1   r@  S  r  zAsyncLLMEngine.add_lorar0   r   r   r   r   c                    s   | j ||||I dH S )zB
        Perform a collective RPC call to the given path.
        N)r   r   r   r0   r0   r1   collective_rpcV  s   
zAsyncLLMEngine.collective_rpcrM   r   r   )NNr   N)NN)r   r   )`r-   r.   r/   r^   r   r   r   __annotations__ra   rK   r   classmethodr   r   r   r*   r(   ZENGINE_CONTEXTr
   r   r_   r   r   r   r   r   r`   r   r   r   r[   r   r=   r   r   r   r   r    r'   r   r  r  r   r	  r   r   r  rb   r   r   r   r   r%   r$   r   r	   r   r   r#   r"   r}   r!  r#  r  r%  r(  r   r*  r   r+  r   r,  r   r-  r   r.  r   r   r!   r   r   r2  r5  r6  r7  r8  r9  r)   r;  r  r   r>  r?  r@  rp   rA  r0   r0   r0   r1   r     s  
 
$	


&
I
	

/
	
`
	
Z







r   r   r   )Wr8   r   r   	functoolsr   typingr   r   r   r   r   r   r	   r
   r   r   r   r   r   Z	vllm.envsr   Zvllm.configr   r   r   r   r   r   Zvllm.core.schedulerr   Zvllm.engine.arg_utilsr   Zvllm.engine.async_timeoutr   Zvllm.engine.llm_enginer   Zvllm.engine.metrics_typesr   Zvllm.engine.protocolr   Zvllm.executor.executor_baser   Zvllm.inputsr   Zvllm.inputs.preprocessr   Zvllm.loggerr   Zvllm.lora.requestr    Z"vllm.model_executor.layers.samplerr!   Zvllm.outputsr"   r#   Zvllm.pooling_paramsr$   Zvllm.sampling_paramsr%   Zvllm.sequencer&   Z!vllm.transformers_utils.tokenizerr'   Zvllm.usage.usage_libr(   Z
vllm.utilsr)   r*   r+   r-   r;   ZVLLM_ENGINE_ITERATION_TIMEOUT_Sr  r   r,   Taskr=   rB   rR   rC   rc   r   r   is_setr   r   r   r0   r0   r0   r1   <module>   sf   8 
2  S     