o
    )iH                     @   s2  U d Z ddlZddlZddlZddlZddlmZ ddlmZmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZ ddlmZ ddlZddlZdd	lmZmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$m%Z%m&Z& e
G dd dZ'edg dZ(de)e*eej+ef f de,e-e,e*ef  e-ej+ f fddZ.i Z/e)e*e0f e1d< de*de*fddZ2i Z3e)e*eg ed f f e1d< dddZ4d ej+d!e*dej+fd"d#Z5d ej+d!e*dej+fd$d%Z6d ej+d&e0d'e0d!e*dej+f
d(d)Z7d ej+d&e0d'e0d!e*dej+f
d*d+Z8d ej+d&e0d'e0d!e*dej+f
d,d-Z9d ej+d&e0d'e0d!e*dej+f
d.d/Z:e& rVdd0l;m<Z< e#d#e5g e6e<j=d1 e#d)e7g e8e<j=d1 e#d-e9g e:e<j=d1 G d2d dZ>da?ee> e1d3< da@ee0 e1d4< de>fd5d6ZAd7e-e0 d8e0d9e*de>fd:d;ZB	<	dd=e-e-e0  d8e0d9e*d>eCd!ee* de>fd?d@ZDdaEee> e1dA< de>fdBdCZFedDdEdF ZGdaHee> e1dG< daIee> e1dH< de>fdIdJZJdaKee> e1dK< de>fdLdMZLde>fdNdOZMedPdQdR ZNedSejOfdTdUZPe!eQZRdVaSdWeCfdXdYZT	Z	Z	[	Z	\dd'e0d]e0d^e*d8e0d9e*f
d_d`ZU	a	a	ddbe0dce0d9ee* ddfdddeZV	ddbe0dce0d9ee* ddfdfdgZWdhejXjYfdidjZZdkdl Z[d<a\edme>fdndoZ]dpdq Z^drds Z_de0fdtduZ`dvdw Zadxdy ZbddzeCfd{d|Zc	dd}eeef d~e0de-eC fddZddeCfddZed}eeef de0fddZfdS )a  vLLM distributed state.
It takes over the control of the distributed environment from PyTorch.
The typical workflow is:

- call `init_distributed_environment` to initialize the distributed environment.
- call `initialize_model_parallel` or `ensure_model_parallel_initialized` to
 initialize the model parallel groups.

- any code dealing with the distributed stuff

- call `destroy_model_parallel` to destroy the model parallel groups.
- call `destroy_distributed_environment` to destroy the distributed environment.

If you only need to use the distributed environment without model/pipeline
 parallelism, you can skip the model parallel initialization and destruction
 steps.
    N)
namedtuple)contextmanagernullcontext)	dataclass)shared_memory)AnyCallableOptionalUnion)patch)BackendProcessGroup)
deprecated)DeviceCommunicatorBase)StatelessProcessGroup)init_logger)direct_register_custom_opget_distributed_init_methodresolve_obj_by_qualnamesupports_custom_opc                   @   s   e Zd ZU ejjed< dS )GraphCaptureContextstreamN)__name__
__module____qualname__torchcudaStream__annotations__ r   r   k/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/distributed/parallel_state.pyr   2   s   
 r   TensorMetadata)devicedtypesizetensor_dictreturnc              	   C   sl   g }g }|   D ])\}}t|tjr*|jj}||t||j|	 f || q|||f q||fS )zSplit the tensor dictionary into two parts:
    1. A list of (key, value) pairs. If the value is a tensor, it is replaced
         by its metadata.
    2. A list of tensors.
    )
items
isinstancer   Tensorr"   typeappendr!   r#   r$   )r%   metadata_listtensor_listkeyvaluer"   r   r   r    _split_tensor_dict:   s   r0   _group_name_counternamec                 C   s6   | t vrdt | < |  dt |   }t |   d7  < |S )z|Get a unique name for the group.
    Example:
    _get_unique_name("tp") -> "tp:0"
    _get_unique_name("tp") -> "tp:1"
    r   :   )r1   )r2   Znewnamer   r   r    _get_unique_nameV   s
   r5   GroupCoordinator_groupsgroupc                 C   s   t | t| j< d S N)weakrefrefr7   unique_namer8   r   r   r    _register_groupf   s   r>   tensor
group_namec                 C   sD   |t v sJ d| dt |  }|d u rtd| d|| S NzGroup z is not found.z is destroyed.)r7   
ValueError_all_reduce_out_place)r?   r@   r8   r   r   r    
all_reducej   s
   

rD   c                 C   s
   t | S r9   )r   Z
empty_like)r?   r@   r   r   r    all_reduce_faker   s   
rE   dim
world_sizec                 C   F   |t v sJ d| dt |  }|d u rtd| d|| |S rA   )r7   rB   _reduce_scatter_out_placer?   rF   rG   r@   r8   r   r   r    reduce_scatterv   
   
rK   c                 C   s0   t | j}| j| | ||< tj|| j| jdS Nr#   r"   listshaper   emptyr#   r"   r?   rF   rG   r@   Z	new_shaper   r   r    reduce_scatter_fake      
rT   c                 C   rH   rA   )r7   rB   _all_gather_out_placerJ   r   r   r    
all_gather   rL   rW   c                 C   s0   t | j}| j| | ||< tj|| j| jdS rM   rO   rS   r   r   r    all_gather_fake   rU   rX   current_platform)Zop_nameZop_funcZmutates_argsZ	fake_impldispatch_keyc                   @   s$  e Zd ZU dZeed< ee ed< eed< eed< eed< eed< eed< ee	 ed	< ee
 ed
< 		dgdeee  dedeeef dededee fddZedd Zedd Zedd Zedd Zedd Zedd Ze	dhd ee fd!d"Zd#ejd$ejfd%d&Zd#ejd$ejfd'd(Zdid#ejd*ed$ejfd+d,Zd#ejd*ed$ejfd-d.Z	/	djd#eejeej f d*ed0eee  fd1d2Z 	)did#ejd*ed$ejfd3d4Z!	)	dkd#ejd*ed0eee  d$ejfd5d6Z"d#ejd*ed$ejfd7d8Z#	/	)dld#ejd9ed*ed$eej fd:d;Z$dmd#ejd<efd=d>Z%dnd?ee
 d<efd@dAZ&	/	djdBee
 d<edCee fdDdEZ'd?e
d9ed$dfdFdGZ(d<ed$e
fdHdIZ)		/		dodJee*eeeje
f f  d<edCee dKee d$ee*eeeje
f f  f
dLdMZ+		dpdJe*eeeje
f f d9ee dNed  d$ee*eeeje
f f  fdOdPZ,		dpd<ee dNed  d$ee*eeeje
f f  fdQdRZ-dSdT Z.dhdUejd9ee d$dfdVdWZ/	dhdXej0dYej1d<ee d$ejfdZd[Z2d\d] Z3d^ej4j5fd_d`Z6daejdbejd$e7ejejf fdcddZ8d$ejfdedfZ9dS )qr6   aR  
    PyTorch ProcessGroup wrapper for a group of processes.
    PyTorch ProcessGroup is bound to one specific communication backend,
        e.g. NCCL, Gloo, MPI, etc.
    GroupCoordinator takes charge of all the communication operations among
        the processes in the group. It manages both CPU and device
        communication.
    rankranksrG   
local_rankrank_in_group	cpu_groupdevice_groupdevice_communicatormq_broadcasterFNgroup_rankstorch_distributed_backenduse_device_communicatoruse_message_queue_broadcasterr@   c                 C   s  |pd}t || _t|  tj | _|| _d }d }|D ]*}	tjj|	|d}
tjj|	dd}| j|	v rF|	| _	t
|	| _|	| j| _|
}|}q|d usMJ |d usSJ || _|| _ddlm} | rmtd| | _n%| r{td| | _n| rt|j d| | _ntd	| _|| _d | _|r| jd
krt| }|| j| j| j| jd| _ddlm} d | _|r| jd
kr|| jdd| _ddlm} | p| | _ |! ot"tj#j$d| _%d S )NZ	anonymous)backendgloor   rY   zcuda:zxpu:r3   cpur4   )r`   r"   ra   r<   )MessageQueuei  @    Zinit_shm_manager)&r5   r<   r>   r   distributedget_rankr\   r^   Z	new_groupr]   lenrG   indexr_   r`   ra   vllm.platformsrZ   Zis_cuda_aliker"   Zis_xpuZis_out_of_treeZdevice_namerf   rb   r   Zget_device_communicator_clsZ3vllm.distributed.device_communicators.shm_broadcastrk   rc   Zcreate_from_process_groupZis_tpuuse_custom_op_callis_cpuhasattrops_Cuse_cpu_custom_send_recv)selfrd   r^   re   rf   rg   r@   Zself_device_groupZself_cpu_groupr]   ra   r`   rZ   Zdevice_comm_clsrk   r   r   r    __init__   st   	




zGroupCoordinator.__init__c                 C   
   | j d S )z8Return the global rank of the first process in the groupr   r]   rx   r   r   r    
first_rank     
zGroupCoordinator.first_rankc                 C   rz   )z7Return the global rank of the last process in the groupr{   r|   r   r   r    	last_rank  r~   zGroupCoordinator.last_rankc                 C      | j | jkS )z;Return whether the caller is the first process in the group)r\   r}   r|   r   r   r    is_first_rank$     zGroupCoordinator.is_first_rankc                 C   r   )z:Return whether the caller is the last process in the group)r\   r   r|   r   r   r    is_last_rank)  r   zGroupCoordinator.is_last_rankc                 C   s   | j }| j}| j|d |  S )z=Return the global rank of the process that follows the callerr4   r_   rG   r]   rx   r_   rG   r   r   r    	next_rank.     zGroupCoordinator.next_rankc                 C   s   | j }| j}| j|d |  S )z>Return the global rank of the process that precedes the callerr4   r   r   r   r   r    	prev_rank5  r   zGroupCoordinator.prev_rankgraph_capture_contextc              	   c   s    |d u rt j }t|}n|j}t }ddlm} | jd ur4t	| j|s(J | jj
}|d ur4| }t j }||krB|| t j|& | |V  W d    n1 sYw   Y  W d    d S W d    d S 1 sqw   Y  d S )Nr   )CudaCommunicator)r   r   r   r   r   r   Z7vllm.distributed.device_communicators.cuda_communicatorr   rb   r(   ca_commcaptureZcurrent_streamZwait_stream)rx   r   r   Zmaybe_ca_contextr   r   Zcurr_streamr   r   r    graph_capture<  s$   




PzGroupCoordinator.graph_captureinput_r&   c                 C   s2   | j dkr|S | jrtjjj|| jdS | |S )a^  
        User-facing all-reduce function before we actually call the
        all-reduce operation.

        We need this because Dynamo does not support passing an arbitrary
        object (`self` in this case) to a custom op. We need to pass the
         group name as a string, and then look up the group coordinator from
         the group name, dispatch the all-reduce operation to the group
         coordinator.

        In addition, PyTorch custom ops do not support mutation or returning
        a new tensor in the same op. So we always make the all-reduce operation
        out-of-place.
        r4   r@   )rG   rr   r   ru   vllmrD   r<   rC   rx   r   r   r   r    rD   Y  s   


zGroupCoordinator.all_reducec                 C   s   | j d u r	td| j |S NNo device communicator found)rb   rB   rD   r   r   r   r    rC   r  s   
z&GroupCoordinator._all_reduce_out_placer   rF   c                 C   t   | j }|dkr	|S |  |  kr| k s%n J d| d|  | jr4tjjj|||| jdS | 	||S Nr4   zInvalid dim (z) for input tensor with shape r   )
rG   rF   r$   rr   r   ru   r   rW   r<   rV   rx   r   rF   rG   r   r   r    rW   w  s   "
zGroupCoordinator.all_gatherc                 C       | j d u r	td| j ||S r   )rb   rB   rW   rx   r   rF   r   r   r    rV        
z&GroupCoordinator._all_gather_out_placer   sizesc                 C   "   | j d u r	td| j |||S r   )rb   rB   all_gathervrx   r   rF   r   r   r   r    r        
zGroupCoordinator.all_gathervc                 C   r   r   )
rG   rF   r$   rr   r   ru   r   rK   r<   rI   r   r   r   r    rK     s   "
zGroupCoordinator.reduce_scatterc                 C   r   r   )rb   rB   reduce_scattervr   r   r   r    r     r   z GroupCoordinator.reduce_scattervc                 C   r   r   )rb   rB   rK   r   r   r   r    rI     r   z*GroupCoordinator._reduce_scatter_out_placedstc                 C   s4   | j }|dkr	|S | jdu rtd| j|||S )z
        NOTE: We assume that the input tensor is on the same device across
        all the ranks.
        NOTE: `dst` is the local rank of the destination rank.
        r4   Nr   )rG   rb   rB   gather)rx   r   r   rF   rG   r   r   r    r     s   	
zGroupCoordinator.gathersrcc                 C   F   || j k sJ d| d| j dkr|S tjj|| j| | jd |S )z^Broadcast the input tensor.
        NOTE: `src` is the local rank of the source rank.
        Invalid src rank ()r4   r   r8   )rG   r   rm   	broadcastr]   ra   )rx   r   r   r   r   r    r     s   
zGroupCoordinator.broadcastobjc                 C   s   || j k sJ d| d| j dkr|S | jdur'|dks!J d| j|S | j|kr<tjj|g| j| | jd |S dg}tjj|| j| | jd |d S )z^Broadcast the input object.
        NOTE: `src` is the local rank of the source rank.
        r   r   r4   Nr   z-Message queue broadcaster only supports src=0r   )	rG   rc   broadcast_objectr_   r   rm   broadcast_object_listr]   r`   )rx   r   r   recvr   r   r    r     s$   



z!GroupCoordinator.broadcast_objectobj_listr8   c                 C   r   )zcBroadcast the input object list.
        NOTE: `src` is the local rank of the source rank.
        r   r   r4   r   )rG   r   rm   r   r]   ra   )rx   r   r   r8   r   r   r    r     s   
z&GroupCoordinator.broadcast_object_listc                 C   s   	 || j k sJ d| d|| jksJ dtjt|tjd}tj| gtj	dd}tj
j|| j| | jd tj
j|| j| | jd dS )	z3Send the input object list to the destination rank.Invalid dst rank (r   zKInvalid destination rank. Destination rank is the same as the current rank.r#   rj   rN   r   r8   N)rG   r_   r   Z
frombufferpickledumpsuint8r?   numellongrm   sendr]   r`   )rx   r   r   object_tensorsize_tensorr   r   r    send_object  s&   zGroupCoordinator.send_objectc                 C   s   	 || j k sJ d| d|| jksJ dtjdtjdd}tjj|| j| | jd}tj|	 tj
dd}tjj|| j| | jd}||ksMJ dt|  }|S )	z3Receive the input object list from the source rank.r   r   zAInvalid source rank. Source rank is the same as the current rank.r4   rj   rN   r   z@Received object sender rank does not match the size sender rank.)rG   r_   r   rR   r   rm   r   r]   r`   itemr   r   loadsnumpytobytes)rx   r   r   Z	rank_sizer   Zrank_objectr   r   r   r    recv_object  s0   
zGroupCoordinator.recv_objectr%   metadata_groupc                 C   s  t j r
| jdkr|S | j}| j}|| jk sJ d| d| j}||krg }t|ts6J dt	| t
|\}}| j||d g }|D ],}	|	 dkrPqG|	jrat jj|	| j| |dd}
nt jj|	| j| |dd}
||
 qG|D ]}|  qv|S | jd	|d}i }g }|D ]K\}}t|trt j|j|j|jd
}	|	 dkr|	||< q|	jrt jj|	| j| |dd}
nt jj|	| j| |dd}
||
 |	||< q|||< q|D ]}|  q|S )ziBroadcast the input tensor dictionary.
        NOTE: `src` is the local rank of the source rank.
        r4   r   r   Expecting a dictionary, got r   r   T)r   r8   Zasync_opNrN   )r   rm   is_initializedrG   ra   r`   r_   r(   dictr*   r0   r   r   rs   r   r]   r+   waitr!   rR   r$   r#   r"   )rx   r%   r   r8   r   r_   r,   r-   Zasync_handlesr?   handleZasync_handler.   r/   r   r   r    broadcast_tensor_dict?  s   
#




z&GroupCoordinator.broadcast_tensor_dictall_gather_groupc                 C   sZ  t j r
| jdkr|S |du rdn|j}|du rdn|j}| j}| j}|du r0| jd | j }|| jk s=J d| d| jrR| jdu rIt	d| j
|| dS g }t|tsbJ dt| t|\}}	| j||d |	D ]9}
|
 dkrzqq|dur|
 | dkr|
|d	| }
|
jrt jj|
| j| |d
 qqt jj|
| j| |d
 qqdS )zdSend the input tensor dictionary.
        NOTE: `dst` is the local rank of the source rank.
        r4   Nr   r   r   r   r   )r   r   r   )r   rm   r   rG   r_   ra   r`   rw   rb   rB   send_tensor_dictr(   r   r*   r0   r   r   reshapers   r   r]   )rx   r%   r   r   all_gather_sizeall_gather_rankr8   r   r,   r-   r?   r   r   r    r     sZ   

z!GroupCoordinator.send_tensor_dictc                 C   s  t j r
| jdkrdS |du rdn|j}|du rdn|j}| j}| j}|du r0| jd | j }|| jk s=J d| d| jrO| jdu rIt	d| j
|S | j|d}i }|D ]k\}	}
t|
trt j|
j|
j|
jd}| dkrx|||	< qY|duo| | dk}|r|j}||d	| }|jrt jj|| j| |d
 nt jj|| j| |d
 |r|j|dd}||}|||	< qY|
||	< qY|S )zdRecv the input tensor dictionary.
        NOTE: `src` is the local rank of the source rank.
        r4   Nr   r   r   r   r   rN   r   r   )rF   )r   rm   r   rG   r_   ra   r`   rw   rb   rB   recv_tensor_dictr   r(   r!   rR   r$   r#   r"   r   rQ   r   rs   r   r]   rW   )rx   r   r   r   r   r8   r   Zrecv_metadata_listr%   r.   r/   r?   Zuse_all_gatherZ
orig_shaper   r   r    r     sr   	




z!GroupCoordinator.recv_tensor_dictc                 C   s   t jj| jd dS )a+  Barrier synchronization among the group.
        NOTE: don't use `device_group` here! `barrier` in NCCL is
        terrible because it is internally a broadcast operation with
        secretly created GPU tensors. It is easy to mess up the current
        device. Use the CPU group instead.
        r=   N)r   rm   barrierr`   r|   r   r   r    r     s   zGroupCoordinator.barrierr?   c                 C   s&   	 | j du r
td| j || dS )z8Sends a tensor to the destination rank in a blocking wayNr   )rb   rB   r   )rx   r?   r   r   r   r    r   $  s   
zGroupCoordinator.sendr$   r#   c                 C   s$   	 | j du r
td| j |||S )z'Receives a tensor from the source rank.Nr   )rb   rB   r   )rx   r$   r#   r   r   r   r    r   +  s   
zGroupCoordinator.recvc                 C   sd   t | drtj| j | `t | drtj| j | `| jd ur&| j  | jd ur0d | _d S d S )Nra   r`   )	rt   r   rm   destroy_process_groupra   r`   rb   destroyrc   r|   r   r   r    r   5  s   





zGroupCoordinator.destroymodelc                 C   s   | j d ur| j | d S d S r9   )rb   &prepare_communication_buffer_for_model)rx   r   r   r   r    r   A  s
   
z7GroupCoordinator.prepare_communication_buffer_for_modelhidden_statesrouter_logitsc                 C   s    | j d ur| j ||S ||fS r9   )rb   dispatch)rx   r   r   r   r   r    r   F  s
   
zGroupCoordinator.dispatchc                 C   s   | j d ur| j |S |S r9   )rb   combine)rx   r   r   r   r    r   O  s   
zGroupCoordinator.combineFNr9   )r   )r   N)r   N)r   r   r   )Nr   )Nr   NN)NN):r   r   r   __doc__intr   rP   r   r	   r   r   r
   strr   boolry   propertyr}   r   r   r   r   r   r   r   r   r   r)   rD   rC   rW   rV   r   rK   r   rI   r   r   r   r   r   r   r   r   r   r   r   r   Sizer#   r   r   nnModuler   tupler   r   r   r   r   r    r6      sJ  
 



L















$
X
A
H	



	_WORLD_NODE_COUNTc                   C      t d usJ dt S )Nzworld group is not initialized)r   r   r   r   r    get_world_groupZ     r   r]   r^   rh   c                 C   s   t | g||dddS )NFZworld)rd   r^   re   rf   r@   r6   )r]   r^   rh   r   r   r    init_world_group_  s   r   Frd   rg   c                 C   s   t | ||d||dS )NT)rd   r^   re   rf   rg   r@   r   )rd   r^   rh   rg   r@   r   r   r    init_model_parallel_groupj  s   r   _TPc                   C   r   )Nz.tensor model parallel group is not initialized)r   r   r   r   r    get_tp_group  r   r   z`get_tensor_model_parallel_group` has been replaced with `get_tp_group` and may be removed after v0.12. Please use `get_tp_group` instead.c                   C      t  S r9   )r   r   r   r   r    get_tensor_model_parallel_group     r   _PP_DPc                   C   r   )Nz&data parallel group is not initialized)r   r   r   r   r    get_dp_group  r   r   _EPc                   C   r   )Nz(expert parallel group is not initialized)r   r   r   r   r    get_ep_group  r   r   c                   C   r   )Nz0pipeline model parallel group is not initialized)r   r   r   r   r    get_pp_group  s   
r   z`get_pipeline_model_parallel_group` has been replaced with `get_pp_group` and may be removed in v0.12. Please use `get_pp_group` instead.c                   C   r   r9   )r   r   r   r   r    !get_pipeline_model_parallel_group  r   r   r"   c              	   c   s    t tjj| d}t |* t | |V  W d   n1 s%w   Y  W d   dS W d   dS 1 s=w   Y  dS )aA  
    `graph_capture` is a context manager which should surround the code that
    is capturing the CUDA graph. Its main purpose is to ensure that the
    some operations will be run after the graph is captured, before the graph
    is replayed. It returns a `GraphCaptureContext` object which contains the
    necessary data for the graph capture. Currently, it only contains the
    stream that the graph capture is running on. This stream is set to the
    current CUDA stream when the context manager is entered and reset to the
    default stream when the context manager is exited. This is to ensure that
    the graph capture is running on a separate stream from the default stream,
    in order to explicitly distinguish the kernels to capture
    from other kernels possibly launched on background in the default stream.
    )r"   N)r   r   r   r   r   r   r   )r"   contextr   r   r    r     s   Pr   Tenablec                 C   s   | a d S r9   )_ENABLE_CUSTOM_ALL_REDUCE)r   r   r   r    set_custom_all_reduce     r   r   env://ncclr\   distributed_init_methodc                 C   sP  t d| |||| ddlm} | }|d ur>|jjdkr>|j}|j|  | }|j} |j}|	 }	t
||	}t d| || tj sl|d usKJ dtj|sbt d| tj s`J dd	}tjj||| |d
 |dkrz|dkrxtj}n|}td u rtttj }
t|
||attjat dt d S tjtj ksJ dd S )NzIworld_size=%d rank=%d local_rank=%d distributed_init_method=%s backend=%sr   get_current_vllm_configr4   zAAdjusting world_size=%d rank=%d distributed_init_method=%s for DPzRdistributed_init_method must be provided when initializing distributed environmentz>Distributed backend %s is not available; falling back to gloo.z'Fallback Gloo backend is not available.ri   )rh   Zinit_methodrG   r\   r   r   z0Detected %d nodes in the distributed environmentz;world group already initialized with a different world size)loggerdebugvllm.configr   parallel_configdata_parallel_sizeZdata_parallel_rankZworld_size_across_dpZdata_parallel_master_ipZget_next_dp_init_portr   infor   rm   r   Zis_backend_availablewarningZis_gloo_availableZinit_process_groupenvsZ
LOCAL_RANKr   rP   rangeget_world_sizer   _node_countr`   r   rG   )rG   r\   r   r^   rh   r   configr  ipportr]   r   r   r    init_distributed_environment  sf   



r  r4   tensor_model_parallel_sizepipeline_model_parallel_sizec           
   	   C   s  t j sJ t j }t j }|pt jt j}d}ddlm	} | }|dur.|j
j}t |d||| }tdu sAJ d|d| d}	dd |	D }	t|	t j|d	d
datdu sdJ d|ddd|d}	dd |	D }	t|	t j|ddatdu sJ d|ddd|d}	dd |	D }	t|	t j|ddatdu sJ d|ddd||  d}	dd |	D }	t|	t j|ddatd||tjtjtjtj dS )a  
    Initialize model parallel groups.

    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model
            parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model
            parallelism.
        backend: name of torch distributed communication backend.

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 4 tensor model-parallel groups and 2 pipeline model-parallel groups:
        4 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 pipeline model-parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    r4   r   r   Nr   z2tensor model parallel group is already initializedc                 S      g | ]}|  qS r   tolist.0xr   r   r    
<listcomp>D      z-initialize_model_parallel.<locals>.<listcomp>Ttp)rg   r@   z4pipeline model parallel group is already initialized      c                 S   r  r   r  r  r   r   r    r  S  r  ppr   z*data parallel group is already initializedc                 S   r  r   r  r  r   r   r    r  ^  r  Zdpz,expert parallel group is already initializedc                 S   r  r   r  r  r   r   r    r  h  r  epzVrank %s in world size %s is assigned as DP rank %s, PP rank %s, TP rank %s, EP rank %s)r   rm   r   r  rn   get_backendr   ra   r  r   r  r  Zaranger   r   viewZunbindr   r^   r   Z	transposer   r   r   r  r_   )
r  r  rh   rG   r\   r  r   r
  Z	all_ranksrd   r   r   r    initialize_model_parallel
  s   




r  c                 C   sr   |p	t jt j}t st| || dS t | ks%J dt d| t j	}||ks7J d|d|dS )zHelper to initialize model parallel groups if they are not initialized,
    or ensure tensor-parallel and pipeline-parallel sizes are equal to expected
    values if the model parallel groups are initialized.
    Nzotensor parallel group already initialized, but of unexpected size. got: get_tensor_model_parallel_world_size()=z( vs. wanted: tensor_model_parallel_size=zXpipeline parallel group already initialized, but of unexpected size. got: pp_world_size=z* vs. wanted: pipeline_model_parallel_size=)
r   rm   r  r   ra   model_parallel_is_initializedr  $get_tensor_model_parallel_world_sizer   rG   )r  r  rh   Zpp_world_sizer   r   r    !ensure_model_parallel_initializedu  s.   
	
r"  r   c                 C   sP   t dur	t |  tdurt|  tdurt|  tdur&t|  dS dS )a1  Prepare the communication buffer for the model.
    Traditional communication libraries like NCCL are almost
    model agnostic. However, emerging new communication libraries like
    MoE all2all (DeepEP) usually allocate the communication buffer
    based on the model shape for optimal performance.
    N)r   r   r   r   r   )r   r   r   r    r     s   


r   c                   C   s   t duotduS )z=Check if tensor and pipeline parallel groups are initialized.N)r   r   r   r   r   r    r     s   r   tp_groupc                 c   s<    t rJ dda t }| az
dV  W da |adS da |aw )a  Patch the tp group temporarily until this function ends.

    This method is for draft workers of speculative decoding to run draft model
    with different tp degree from that of target model workers.

    Args:
        tp_group (GroupCoordinator): the tp group coordinator
    z)Should not call when it's already patchedTNF)_TP_STATE_PATCHEDr   r   )r#  Zold_tp_groupr   r   r    patch_tensor_parallel_group  s   r%  c                   C      t  jS )z6Return world size for the tensor model parallel group.)r   rG   r   r   r   r    r!    r   r!  c                   C   r&  )z3Return my rank for the tensor model parallel group.)r   r_   r   r   r   r    get_tensor_model_parallel_rank  r   r'  c                   C   s   t dusJ dt S )zAReturn the total number of nodes in the distributed environment. Nz*distributed environment is not initialized)r   r   r   r   r    get_node_count  s   
r(  c                   C   sD   t rt   da trt  datrt  datrt  dadS )z(Set the groups to none and destroy them.N)r   r   r   r   r   r   r   r   r    destroy_model_parallel  s   r)  c                   C   s0   t rt   d a d atj rtj  d S d S r9   )r   r   r   r   rm   r   r   r   r   r   r    destroy_distributed_environment  s   
r*  shutdown_rayc                 C   s   t   t  | rdd l}|  t  ddlm} |j}|d ur$|  z|	 s1t
j  W d S W d S  tyB   td Y d S w )Nr   rY   z;torch._C._host_emptyCache() only available in Pytorch >=2.5)r)  r*  rayshutdowngcZcollectrq   rZ   empty_cachers   r   rv   Z_host_emptyCacheAttributeErrorr   r  )r+  r,  rZ   r/  r   r   r    cleanup_dist_env_and_memory  s&   
r1  pgsource_rankc              
   C   s  t | tr)tj| tjjjksJ dtjj| d}tjj| d}tj	| }n| j
}| j}tt|}tjdg| tjd}d}d}zztt ||krtjddd	}||jdt|< t | trstjj|jg|| | d
 n| j|j|d d||< nKt | trdg}tjj||| | d
 |d }	n| jd|d}	tddd  tj|	d}W d   n1 sw   Y  |jdt| |krd||< W d   n1 sw   Y  W n ty }
 ztd|
 W Y d}
~
nd}
~
ww W |r|  n	|r|  w w t | trtjj| d n|   tt ||kr)|r)|   W d   n	1 s4w   Y  t | trJtjj!|| d |}nt"|}t|D ]}| j||d}||7 }qSdd |# D S )z
    This is a collective operation that returns if each rank is in the same node
    as the source rank. It tests if processes are attached to the same
    memory system (shared access to shared memory).
    z;in_the_same_node_as should be tested with a non-NCCL group.r=   r   r   s   magic_messageNT   )creater$   r   r   r4   z)multiprocessing.resource_tracker.registerc                  _   s   d S r9   r   )argskwargsr   r   r    <lambda>>  s    z%in_the_same_node_as.<locals>.<lambda>)r2   z(Error ignored in is_in_the_same_node: %sc                 S   s   g | ]}|d kqS )r4   r   r  r   r   r    r  [  r  z'in_the_same_node_as.<locals>.<listcomp>)$r(   r   r   rm   r  r   ZNCCLrn   r  Zget_process_group_ranksr\   rG   rP   r  r?   Zint32
contextlibsuppressOSErrorr   ZSharedMemorybufro   r   r2   Zbroadcast_objr   	Exceptionr   errorcloser   unlinkrD   Z
zeros_liker  )r2  r3  r\   rG   r]   Zis_in_the_same_nodeZmagic_messageZshmr   r2   eZaggregated_dataiZ	rank_datar   r   r    in_the_same_node_as	  s   







rC  c                   C   sF   zt dur	t jW S tj sW dS tj dkW S  ty"   Y dS w )a  
    Check if the current process is the first rank globally across all
    parallelism strategies (PP, TP, DP, EP, etc.).

    Unlike group-specific checks like `get_tensor_model_parallel_rank() == 0`
    or `get_pp_group().is_first_rank`, this function checks the global rank
    across all parallelism dimensions.

    Returns:
        bool: True if this is the global first rank (rank 0), False otherwise.
              Returns True if distributed is not initialized (single process).
    NTr   )r   r   r   rm   r   rn   r=  r   r   r   r    is_global_first_rank^  s   
rD  c                 C   s   t | trtjj| d}n| j}|dkrdS dg| }d}t|D ]+}|| dkr*q!|d7 }|||< t| |}t|D ]\}}|rK|| dkrK|||< q;q!|S )z
    Returns the total number of nodes in the process group.

    Args:
        pg: The process group to analyze

    Returns:
        int: The total number of nodes
    r=   r4   r   )	r(   r   r   rm   r  rG   r  rC  	enumerate)r2  rG   Znode_assignmentZnext_node_idZcurrent_rankZsame_node_flagsZ
other_rankZis_same_noder   r   r    r	  }  s&   



r	  )r8   r6   r&   Nr   )r   r   r   r   r   )r4   r4   Nr9   )Fr   )gr   r9  r.  r   r:   collectionsr   r   r   dataclassesr   multiprocessingr   typingr   r   r	   r
   Zunittest.mockr   r   Ztorch.distributedr   r   Ztyping_extensionsr   Z	vllm.envsr  Z>vllm.distributed.device_communicators.base_device_communicatorr   Zvllm.distributed.utilsr   Zvllm.loggerr   Z
vllm.utilsr   r   r   r   r   r!   r   r   r)   r   rP   r0   r1   r   r   r5   r7   r>   rD   rE   rK   rT   rW   rX   rq   rZ   r[   r6   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r"   r   r   r   r   r   r  r  r"  r   r   r   r   r$  r%  r!  r'  r(  r)  r*  r1  rC  rD  r	  r   r   r   r    <module>   sh  
 

	

	
	     *





B
n


U