o
    )i:                     @   s  d Z ddlmZmZmZ ddlmZ ddlmZ ddl	Z	ddl
mZmZmZmZmZ deded	ed
efddZdeded	ed
efddZdeded
efddZdededee dee d
eee ee f f
ddZded	edee dee dee	j dee	j ded
dfddZ		d)de	jde	jdeee	j  ded ed!eeeef  d
dfd"d#Zde	jd!eeef d$ed
e	jfd%d&Zde	jd!eeef d
e	jfd'd(Zd#gZdS )*zh
The actual execution of the rearrangement.

This involves the exchange of expert weights between GPUs.
    )IterableMutableSequenceSequence)partial)OptionalN)P2POpProcessGroup
all_gatherbatch_isend_irecvget_global_rank	local_idx	local_cntep_rankreturnc                 C   s   || |  S )z@
    Convert a local expert index to a global expert index.
     )r   r   r   r   r   s/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/distributed/eplb/rebalance_execute.pyidx_local_to_global      r   
global_idxc                 C   s   | ||  S )z@
    Convert a global expert index to a local expert index.
    r   )r   r   r   r   r   r   idx_global_to_local   r   r   c                 C   s   | | S )z8
    Convert a global expert index to a rank index.
    r   )r   r   r   r   r   global_idx_to_rank(   s   r   idxnum_local_expertsold_indicesnew_indicesc                    s   t t|d}g }g }t|D ]\}}|| kr'||}	|r"|d |	kr'||	 qt|D ]\}}|| krE||}	|r@|d |	krE||	 q,t|  fdd|D }
||
fS )a  
    Get the ranks of the experts that need to be exchanged.

    Args:
        idx: The index of the expert.
        num_local_experts: The number of local experts.
        old_indices: The old indices of the experts.
        new_indices: The new indices of the experts.

    Returns:
        A tuple of two lists:
        - The ranks of the experts that need to be sent.
        - The ranks of the experts that need to be received.
    )r   c                    s   g | ]}| vr|qS r   r   ).0rankZranks_to_send_setr   r   
<listcomp>\   s    z,get_ep_ranks_with_expert.<locals>.<listcomp>)r   r   	enumerateappendset)r   r   r   r   Zglobal2rankranks_to_sendranks_to_recvier   Zranks_to_recv_actualr   r   r   get_ep_ranks_with_expert2   s.   


r'   expert_weightsexpert_weights_bufferep_groupc                    sV  t t| |dfddt| D }|dd }t| D ]Bt| D ]7  |  r3q( dks? dkr@q(  kr_d| < t||D ]\}	}
|
  |	  qQq(qg }i }t| D ] }|dkrvqi||v r{qi||< qit| D ]U\}t|| \}}t|t| }|	|}|| }|| }||| }t|| }|| }|t|k r|
||  |D ] t| |fdd|D 7 }qqi }t| D ] |  rq  }|dkrq||v rq ||< qt| D ]D\} t|| \}}t|t| }|	|}t|| }||k r0|||  n|||  t|| fdd|D 7 }q|rZt|}|D ]}|  qRt| D ]J |  rgq^|  rt||D ]\}	}
|	  |
   qqq^  }|dkrq^|| t||D ]\}	}
|	  |
  qq^dS )	z<
    Perform expert weights rearrangement of one layer.
    )r   r   c                    s$   g | ]} |  | kqS r   r   )r   r%   )local2globalr   r   r   r   r   v   s    z!shuffle_layer.<locals>.<listcomp>Nr   Tc                    s    g | ]}t tjj|  qS r   )r   torchdistributedZisendr   weight)
dst_globalsrcr   r   r          c                    s    g | ]}t tjj|  qS r   )r   r,   r-   Zirecvr.   )dst
src_globalr   r   r      r2   )r   r   rangezipZcopy_sorteditemsr'   lenindexr!   r   r
   wait)r   r   r   r   r(   r)   r*   Zis_unchangedZis_received_locallyr/   bufferZp2p_opsZexperts_send_locZexpertr#   r$   Znum_dst_per_senderZ
sender_posZ
recv_beginZrecv_endZ
recv_ranksZremainder_startZ
recver_posZexperts_recv_locreqsreqr   )r3   r0   r+   r   r   r1   r4   r   shuffle_layerc   s   







	


r?   Fold_global_expert_indicesnew_global_expert_indices
is_profilerank_mappingc              	      sR  |durt || krt||}nt| || } | jd |jd ks&J | j\}}t ||ks3J tt|d jd }|j||fksGJ | }	| }
||
| ksWJ dd |d D }|rt|d |D ]\}  fddt	|
D }t
j  t|||d qidS t	|D ]}t
j  t||	| |  ||  || || qdS )a  
    Rearranges the expert weights in place according to the new expert indices.

    The value of the indices arguments are logical indices of the experts,
    while keys are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        ep_group: The device process group for expert parallelism.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.
        rank_mapping: A dictionary mapping old rank to new rank.
    N   r   c                 S   s   g | ]}t |qS r   )r,   Z
empty_like)r   wr   r   r   r   1  s    z4rearrange_expert_weights_inplace.<locals>.<listcomp>c                    s   g | ]} qS r   r   )r   _r<   r   r   r   8  s    )group)r9   size)_map_new_expert_indices_with_rank_mapping)_map_old_expert_indices_with_rank_mappingshapenextiterr   r6   r5   r,   r-   Zbarrierr	   cudaZsynchronizer?   tolist)r@   rA   r(   r*   rB   rC   Znum_moe_layersZnum_physical_expertsnum_local_physical_expertsr   Zep_sizer)   r/   Zdummy_recv_bufferlayerr   rG   r    rearrange_expert_weights_inplace   sd   





rS   new_ep_sizec                 C   s   | j \}}|sJ dt|}|| }|| }tj||fd| j| jd}t|D ]9}	||	}
|
dura|
dkra|
|k ra|	| }|	d | }|
| }|
d | }| dd||f |dd||f< q(|S )a  
    Map the old global expert indices to the new global expert indices.
    
    Args:
        old_global_expert_indices:
            Shape (num_layers, old_ep_size * num_local_physical_experts).
        rank_mapping: Mapping from old rank to new rank.
        new_ep_size: New expert parallelism size.
    
    Returns:
        Mapped expert indices with shape
        (num_layers, new_ep_size * num_local_physical_experts).
    Rank mapping is requiredr   Z
fill_valuedtypedeviceNr   rD   )rL   r9   r,   fullrW   rX   r5   get)r@   rC   rT   
num_layersold_num_physical_expertsold_ep_sizerQ   new_num_physical_expertsmapped_expert_indicesold_ranknew_rankold_start_idxold_end_idxnew_start_idxnew_end_idxr   r   r   rK   R  s,   

rK   c                 C   s   | j \}}|sJ dt|}tdd | D }|| }|| }tj||fd| j| jd}t|D ]4}	||	 }
|
dkrg|
|k rg|	| }|	d | }|
| }|
d | }| d d ||f |d d ||f< q3|S )NrU   c                 s   s    | ]}|d kV  qdS )r   Nr   )r   ra   r   r   r   	<genexpr>  s    z<_map_new_expert_indices_with_rank_mapping.<locals>.<genexpr>r   rV   r   rD   )	rL   r9   sumvaluesr,   rY   rW   rX   r5   )rA   rC   r[   r^   r]   rT   rQ   r\   r_   r`   ra   rb   rc   rd   re   r   r   r   rJ     s.   
rJ   )FN)__doc__collections.abcr   r   r   	functoolsr   typingr   r,   Ztorch.distributedr   r   r	   r
   r   intr   r   r   tupler'   ZTensorr?   booldictrS   rK   rJ   __all__r   r   r   r   <module>   s   




1
 
^

4


"