o
    )iC                  
   @   s  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ erbd dlmZ d dlm Z  ee!Z"dZ#z7d dl$Z$d dl%m&Z& d dl'm(Z( zd dl)m*Z* W n e+y   d dl)m,Z- e-j.Z*Y nw G dd deZ/dZ0W n e+y Z1 zdZ$e2e1Z0dZ/W Y dZ1[1ndZ1[1ww de3fddZ4dd Z5dddede2fddZ6d+d d!Z7d+d"d#Z8	d,ded$ee2 fd%d&Z9de:fd'd(Z;de:fd)d*Z<dS )-    N)defaultdict)TYPE_CHECKINGDictListOptionalTupleUnion)ParallelConfig)decode_hookencode_hook)init_loggercurrent_platform)ExecuteModelRequestIntermediateTensorsget_ip)WorkerWrapperBase)SchedulerOutput)ModelRunnerOutputi  )placement_group_table)PlacementGroup)available_resources_per_node)statec                       s   e Zd ZdZd fddZdefddZdeeee	 f fdd	Z
d
eeeeee f f defddZdd Zdeded f deded f fddZdeeef fddZ  ZS )RayWorkerWrapperzyRay wrapper for vllm.worker.Worker, allowing Worker to be
        lazily initialized after Ray sets CUDA_VISIBLE_DEVICES.returnNc                    s>   t  j|i | d| _tjjttd| _tjj	t
d| _d S )NF)Zdec_hook)Zenc_hook)super__init__compiled_dag_cuda_device_setmsgspecmsgpackZDecoderr   r
   input_decoderZEncoderr   output_encoder)selfargskwargs	__class__ c/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/executor/ray_utils.pyr   *   s   zRayWorkerWrapper.__init__c                 C   s   t  S Nr   r#   r(   r(   r)   get_node_ip6   s   zRayWorkerWrapper.get_node_ipc                 C   sB   t   }tjjj}|stdtjjjt  	 | }||fS )Nz)current platform %s does not support ray.)
rayget_runtime_contextget_node_idvllm	platformsr   ray_device_keyRuntimeErrordevice_nameZget_accelerator_ids)r#   node_idZ
device_keyZgpu_idsr(   r(   r)   get_node_and_gpu_ids9   s   

z%RayWorkerWrapper.get_node_and_gpu_idsreq_or_tuplec                 C   sx   t |tr|d}}n|\}}| j|}| js"t| jj d| _| j	||}t |t
r4||f}|S | j|}|S )a  Execute model in SPMD fashion: used only when SPMD worker and
            compiled DAG are both enabled.

            Args:
                req_or_tuple: A request or a tuple containing the
                    request and intermediate tensors. Intermediate tensors are
                    None unless if it is provided because it is > 0 pipeline
                    stage. The request is serialized by msgspec.
            NT)
isinstancebytesr!   decoder   r   
set_deviceworkerdeviceZ_execute_model_spmdr   r"   encode)r#   r7   Zserialized_reqintermediate_tensorsZexecute_model_reqoutputr(   r(   r)   execute_model_spmdC   s   

z#RayWorkerWrapper.execute_model_spmdc                 C   s>   | j d us	J d| jst rnt| j j d| _d S d S )NWorker is not initializedT)r<   r   r   Zis_tpur;   r=   r+   r(   r(   r)   setup_device_if_necessaryi   s   
z*RayWorkerWrapper.setup_device_if_necessaryscheduler_outputr   )r   r   r   c                 C   s^   |    | jd usJ dt|tr|\}}n|d }}| jj||}t|tr-||f}|S )NrB   )rC   r<   r8   tupleZmodel_runnerZexecute_modelr   )r#   rD   r?   r@   r(   r(   r)   execute_model_rayx   s   	



z"RayWorkerWrapper.execute_model_rayvarsc                 C   s   t j| d S r*   )osenvironupdate)r#   rG   r(   r(   r)   override_env_vars   s   z"RayWorkerWrapper.override_env_vars)r   N)__name__
__module____qualname____doc__r   strr,   r   r   intr6   r   r9   r   r   rA   rC   rF   r   rK   __classcell__r(   r(   r&   r)   r   &   s.    

&
r   r   c                   C   s   t duS )z!Returns True if Ray is available.N)r-   r(   r(   r(   r)   ray_is_available   s   rS   c                   C   s   t du rtdt ddS )z+Raise an exception if Ray is not available.NzFailed to import Ray: z+.Please install Ray with `pip install ray`.)r-   
ValueErrorray_import_errr(   r(   r(   r)   assert_ray_available   s   rV   placement_groupr   parallel_config
device_strc           
   
   C   s   t  sJ dt| }|d }|d }tt}| D ]\}}|| ||  qt   }	|	|vrCt	d|	 d| j
 d| d| D ]\}}t||jk ratd|j|t||||j qGd	S )
zVerify a given placement group has bundles located in the right place.

    There are 2 rules.
    - Warn if all tensor parallel workers cannot fit in a single node.
    - Fail if driver node is not included in a placement group.
    zDRay is not initialized although distributed-executor-backend is ray.bundles_to_node_idbundleszdriver node id z& is not included in a placement group z. Node id -> bundles z. You don't have enough GPUs available in a current node. Check `ray status` and `ray list nodes` to see if you have available GPUs in a node `{driver_node_id}` before starting an vLLM engine.aC  tensor_parallel_size=%d is bigger than a reserved number of %ss (%d %ss) in a node %s. Tensor parallel workers can be spread out to 2+ nodes which can degrade the performance unless you have fast interconnect across nodes, like Infiniband. To resolve this issue, make sure you have more than %d GPUs available at each node.N)r-   is_initializedr   r   listitemsappendr.   r/   r3   idlenZtensor_parallel_sizeloggerwarning)
rW   rX   rY   Zpg_dataZbundle_to_node_idsr[   Znode_id_to_bundleZ
bundle_idxr5   Zdriver_node_idr(   r(   r)   _verify_bundles   s6   
	rd   current_placement_groupc              	   C   s   | j }t }|  }d}t | tk r?tj|g|d\}}t|dkr&n|d9 }tdt	t | | t | tk sz
tj
|dd W d	S  tjjy^   td|dt dd	w )
zWait until a placement group is ready.

    It prints the informative log messages if the placement group is
    not created within time.

    
   )timeoutr      a6  Waiting for creating a placement group of specs for %d seconds. specs=%s. Check `ray status` and `ray list nodes` to see if you have enough resources, and make sure the IP addresses used by ray cluster are the same as VLLM_HOST_IP environment variable specified in each node if you are running on a multi-node.z:Cannot provide a placement group of placement_group_specs=z within z^ seconds. See `ray status` and `ray list nodes` to make sure the cluster has enough resources.N)bundle_specstimereadyPG_WAIT_TIMEOUTr-   waitra   rb   inforQ   get
exceptionsZGetTimeoutErrorrT   )re   placement_group_specssZpg_ready_refwait_intervalrk   _r(   r(   r)   _wait_until_pg_ready   s6   
ru   c                 C   s   t j|  t }d}t | tk r>t j }|d u rd S |d9 }tdtt |  t	| t | tk sd S d S )Nrf   rh   z?Waiting for removing a placement group of specs for %d seconds.)
r-   utilZremove_placement_grouprj   rl   get_current_placement_grouprb   rn   rQ   sleep)re   rr   rs   pgr(   r(   r)   _wait_until_pg_removed   s   

rz   ray_addressc                    s.  t   ddlm} t rtd n0| s| r;zt	d W n  t
y:   td tj	|| j| jd Y n	w tj	|| jd |j  sQtd|j d	| jrX| j}ntj }|rtd
 |j}d}|D ]}| d}|dkrtd  d|r|d7 }qk| j|krtd  d  d| j d| d	ngtd t  d}| j|krtd    fddt| jD }	t }
t  }t | }| ddk rtd  d|d  d  d|d|
dd|	d d|
 < tjj|	dd}t| |d usJ t||   || _d S )!a  Initialize the distributed cluster with Ray.

    it will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        parallel_config: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    r   r   z8Ray is already initialized. Skipping Ray initialization.autoz_No existing RAY instance detected. A new instance will be launched with current node resources.)addressZnum_gpusruntime_env)r}   r~   zcurrent platform z does not support ray.z"Using the existing placement group   z/Placement group bundle cannot have more than 1 .zThe number of required z(s exceeds the total number of available z6s in the placement group. Required number of devices: z. Total number of devices: zANo current placement group found. Creating a new placement group.z\The number of required %ss exceeds the total number of available %ss in the placement group.c                    s   g | ]} d iqS )g      ?r(   ).0rt   rY   r(   r)   
<listcomp>_  s    z*initialize_ray_cluster.<locals>.<listcomp>zCurrent node has no z" available. current_node_resource=z#. vLLM engine cannot start without z . Make sure you have at least 1 z% available in a node current_node_id=z current_ip=gMbP?znode:ZPACK)ZstrategyN)rV   vllm.platformsr   r-   r\   rb   rn   Zis_rocmZis_xpuinitConnectionErrorrc   Z
world_sizeZray_runtime_envr2   rT   r4   rW   rv   rw   ri   ro   cluster_resourcesranger   r.   r/   r   ru   rd   )rX   r{   r   re   r[   Zdevice_bundlesZbundleZbundle_devicesZnum_devices_in_clusterrq   Z
current_ipZcurrent_node_idZcurrent_node_resourcer(   r   r)   initialize_ray_cluster  s   








r   c                  C   s@   ddl m}  t }t|d }|  }|| dksJ || S )Nr   )TPUAcceleratorManagerZTPU)Zray._private.acceleratorsr   r-   r   rQ   Z!get_current_node_num_accelerators)r   r   Z
total_tpusZtpus_per_noder(   r(   r)   get_num_tpu_nodes~  s   r   c                  C   sn   t j } t j }d}|r5t }|  D ]\}}||j kr0|d  D ]	\}}|| q&qt	|}|S )Nr   rZ   )
r-   rv   r   rw   setr^   r`   hexaddra   )Zpg_tableZ
current_pgZ	num_nodesZnodes_in_pgZpg_keyry   rt   noder(   r(   r)    get_num_nodes_in_placement_group  s   

r   )re   r   r*   )=rH   rj   collectionsr   typingr   r   r   r   r   r   r   r   r0   Zvllm.configr	   Zvllm.executor.msgspec_utilsr
   r   Zvllm.loggerr   r   Zvllm.sequencer   r   Z
vllm.utilsr   Zvllm.worker.worker_baser   Zvllm.v1.core.sched.outputr   Zvllm.v1.outputsr   rL   rb   rl   r-   Zray.utilr   Zray.util.placement_groupr   Zray._private.stater   ImportErrorr   _stateZ_available_resources_per_noder   rU   erP   boolrS   rV   rd   ru   rz   r   rQ   r   r   r(   r(   r(   r)   <module>   sj    
j

.
)
p	