o
    )iZ                     @   s   d Z ddlZddlmZ ddlmZ ddlmZmZ ddl	Z	ddl
mZmZ ddlmZ ddlmZmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ eeZeG dd dZdeeef de e!e!f de!fddZ"dS )a  
Expert parallelism load balancer (EPLB) metrics and states.

# Glossary

- **Logical Expert**: An expert that is part of the model's logical structure.
  It holds a set of weights and is replicated across multiple physical
  experts.
- **Redundant Expert**: To achieve load balancing, for some popular logical
  experts, we create additional copies of the expert weights. During inference,
  each of these copies can be routed to by the same set of tokens.
- **Physical Expert**: An expert that is instantiated on a specific device.
  It is a replica of a logical expert and can be rearranged across devices.
  I.e., one logical expert may have multiple sets of weights initialized on
  different devices, and each of these sets is a physical expert.
- **Local Physical Expert**: A physical expert that is instantiated on the
  current device.

For example: DeepSeek-R1 has 256 logical experts, so each MoE layer
has 256 sets of linear layer weights in the model parameters. If we add 32
redundant experts, DeepSeek-R1 will have 256 + 32 = 288 physical experts in
total. And when deploying, we'll have 288 sets of linear layer weights for each
MoE layer. If we have 32 EP ranks, then each GPU will hold 288 / 32 = 9 local
physical experts.
    N)Sequence)	dataclass)OptionalUnion)ProcessGroup
all_reduce)ParallelConfig)get_ep_groupget_node_countin_the_same_node_as)StatelessProcessGroup)init_logger)MixtureOfExperts   )rebalance_experts) rearrange_expert_weights_inplacec                   @   sr  e Zd ZU dZejed< 	 ejed< 	 ejed< 	 ejed< 	 ejed< 	 dZeed< 	 dZ	eed	< 	 dZ
eed
< 	 dZeed< 	 edededee fddZe			d&dedejdedeej deej deeeef  dd fddZ			d'dededededdf
ddZ		 		d(deded!edeej deeeef  ddfd"d#Zedeejejf fd$d%ZdS ))	EplbStatezEPLB metrics.physical_to_logical_maplogical_to_physical_maplogical_replica_countexpert_load_passexpert_load_windowr   expert_load_window_stepexpert_load_window_sizeexpert_rearrangement_step"expert_rearrangement_step_intervalnum_routed_expertsnum_redundant_expertsreturnc                    s*   t t }| fddt|D 7 }|S )aj  
        Build an initial expert arrangement using the following structure:
        [original routed experts, redundant experts]

        Returns:
            physical_to_logical_map (Sequence[int]): A list of integers,
                where each integer is the index of the logical expert
                that the corresponding physical expert maps to.
        c                    s   g | ]}|  qS  r   ).0ir   r   l/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/distributed/eplb/eplb_state.py
<listcomp>   s    zJEplbState.build_initial_global_physical_to_logical_map.<locals>.<listcomp>)listrange)r   r   Zglobal_physical_to_logical_mapr   r"   r#   ,build_initial_global_physical_to_logical_map   s
   z6EplbState.build_initial_global_physical_to_logical_mapNmodeldeviceparallel_configglobal_expert_loadold_global_expert_indicesrank_mappingc              
   C   s~  |  |j|j}tj||d}d}	|j|	ks!J d|j d|	 |	d }
tj|j|
fd|d}tj|jf|tjd}t	|j
D ]}|| }||||| f< ||  d7  < q@|d|jd }|d|jdd }|d|jd }tj|j|j
ftj|d	}|j}tj||j|j
ftj|d	}|j}td||d
  }|durt j}|j|j|jfksJ |jtjksJ |j
}|j}t }| }|| dkrd}td|d| t|||||\}}}|jd }||jd ksJ tjjj |d|jd | fdd}|!|}|"| |"| |#||| |dur3t$|||j%|d| d}| ||||||||dS )z/
        Build the initial EPLB state.
        )r)   i  znum_redundant_experts z must be less than or equal to r   )r)   dtyper   r/   r)      NTnum_gpus % num_nodes != 0, not using hierarchical rearrangement algorithm.
num_gpus=, num_nodes=valueF)r   r   r   )&r'   r   r   torchtensorfullnum_logical_expertszeroslongr&   num_physical_experts	unsqueezeexpandnum_moe_layers
contiguousint32Zeplb_window_sizeeplb_step_intervalmaxr	   device_groupshaper/   int64num_expert_groupsr
   sizeloggerwarning_oncer   nn
functionalpadtocopy_Zset_eplb_stater   expert_weights)clsr(   r)   r*   r+   r,   r-   Zphysical_to_logical_map_listr   ZMAX_EXPERT_REDUNDANCYZmax_slots_per_logical_expertr   r   r!   Zlogical_idxr   r   r   rB   r   ep_groupnum_replicas
num_groups	num_nodesnum_gpusnew_physical_to_logical_mapnew_logical_to_physical_mapnew_logical_replica_countmax_physical_slotsr   r   r#   build   s  








	



zEplbState.buildFis_dummy
is_profile	log_statsc                 C   sP  |r| j |dd dS |r| j  |ro| j }t j}t||d ||jd |	 dj
dd }|jddj
dd}|jddjj
dd}	t||	g }
|
\}}|dkr_|| nd}| dkrotd	||| |s| j | j| j< |  jd
7  _| j| jkrd| _| j  |  jd
7  _| j| jkrd| _|  | dS dS )aD  
        Step the EPLB state.

        Args:
            model (MixtureOfExperts): The MoE model.
            is_dummy (bool): If `True`, this is a dummy step and the load
              metrics recorded in this forward pass will not count. Defaults
              to `False`.
            is_profile (bool): If `True`, perform a dummy rearrangement
              with maximum communication cost. This is used in `profile_run`
              to reserve enough memory for the communication buffer.
            log_stats (bool): If `True`, log the expert load metrics.

        # Stats
            The metrics are all summed up across layers.
            - `avg_tokens`: The average load across ranks.
            - `max_tokens`: The maximum load across ranks.
            - `balancedness`: The ratio of average load to maximum load.
        T)r]   Ngroupr   r.   dimg        z<EPLB step: avg_tokens=%.2f, max_tokens=%d, balancedness=%.4fr   )	rearranger   Zzero_cloner	   rD   r   ZreshaperE   rH   sumfloatmeanrC   valuesr6   stacktolistrankrI   infor   r   r   r   r   )selfr(   r\   r]   r^   Ztotal_expert_load_passrR   Znum_tokens_per_rankZavg_tokens_tensorZmax_tokens_tensorZtokens_tensorsZ
avg_tokensZ
max_tokensZbalancednessr   r   r#   stepI  s^   



zEplbState.stepTexecute_shufflec                 C   s  t  j}| }d}|dk}	|	r#tj  t }t	d|r dnd |du rtj
| j|j|j| jj| jjd}
|
jd| jd| j | jd |sjtj|j|j| jjd	 gtjd
d}tjj|t  jdd |
jdd}t||d |s| j}tjj||dd |S n|sJ |}|j}|j}|durt ||! krt  j}t"||}tdd |# D }||!  | }nt$ }|! }|| dkrd	| _%t&d|d| t'|||||\}}}t(| j||j)||| |s9| jjd	 |jd	 kr|*| jj| _n| j+| |jd }|| j,jd ksJ tj-j.j/|d| j,jd | fdd}| j,+| | j0+| |	r\|dusCJ tj  t }t	d|rTdnd||  dS dS )zF
        Rearrange the experts according to the current load.
        Nr   zRearranging experts %s...z	(profile) r0   r.   )rb   indexsrcr   cpur`   Z	group_srcra   r_   c                 s   s    | ]}|d kV  qdS )r.   Nr   )r    Znew_rankr   r   r#   	<genexpr>  s    z&EplbState.rearrange.<locals>.<genexpr>r2   r3   r4   z$Rearranged experts%sin %.2f seconds.z (profile)  )1r	   rD   rk   r6   cudaZsynchronizetimeperf_counterrI   rl   r:   r   r?   r9   r   r/   r)   Zscatter_add_r   r=   Z	expand_asr;   r7   rE   rA   distributed	broadcast	cpu_groupre   r   r<   rG   lenrH   _node_count_with_rank_mappingrh   r
   rU   rJ   r   r   rP   rN   rO   r   rK   rL   rM   r   )rm   r(   r]   ro   r+   r-   rR   Zep_rankZ
time_startZis_main_rankZlogical_expert_load_windowmetadataZglobal_expert_load_windowr,   rS   rT   r|   rU   rV   rW   rX   rY   rZ   Ztime_endr   r   r#   rc     s   







	

zEplbState.rearrangec                  C   s   t  } tjdtjdd}tjj|| jdd | \}}}tj||ftj	| j
d}t|| jd tj||ftj	| j
d}tjj|| jdd ||fS )zQ
        Receive the expert load and old placement from the master rank.
           rs   r0   r   rt   r_   )r	   r6   emptyrA   rz   r{   r|   rj   r:   rF   r)   r   rD   )rR   r   r?   r9   Znum_old_physical_expertsr+   r,   r   r   r#   
recv_state(  s0   zEplbState.recv_state)NNN)FFF)FTNN)__name__
__module____qualname____doc__r6   ZTensor__annotations__r   intr   r   r   staticmethodr   r'   classmethodr   r)   r   r   dictr[   boolrn   rc   tupler   r   r   r   r#   r   2   s   
 




	 
U
  r   pgr-   r   c           	      C   s   t | trtjj| d}n| j}|dkrdS dg| }d}t|D ]8}|| dkr*q!||v s0J || dkr7q!|d7 }|||< t| |}t|D ]\}}|rX|| dkrX|||< qHq!|S )Nr_   r   r   r.   )	
isinstancer   r6   rz   Zget_world_size
world_sizer&   r   	enumerate)	r   r-   r   Znode_assignmentZnext_node_idZcurrent_rankZsame_node_flagsZ
other_rankZis_same_noder   r   r#   r~   F  s,   


r~   )#r   rx   collections.abcr   dataclassesr   typingr   r   r6   Ztorch.distributedr   r   Zvllm.configr   Zvllm.distributed.parallel_stater	   r
   r   Zvllm.distributed.utilsr   Zvllm.loggerr   Z%vllm.model_executor.models.interfacesr   Zrebalance_algor   Zrebalance_executer   r   rI   r   r   r   r~   r   r   r   r#   <module>   s8       

