o
    81 i                     @   sF  d dl mZ d dlZd dlmZ d dlmZ deejvr#ejjej_	deejvr0ejj
ej_d$ded	ed
efddZd$ded	ed
efddZd$ded	ed
efddZG dd dejjZejZG dd dejjZejZG dd dejjZejZdejjd	efddZdejjd	efddZd%dededed ed!ef
d"d#ZdS )&    )OptionalN)Tensor)ProcessGroupall_gather_into_tensorreduce_scatter_tensorFinput_process_groupasync_opc                 C   s^   t j|}t j|| jd  g| jdd  R | j| jd}t jj||  ||d}||fS Nr      )dtypedevicegroupr	   )	torchdistributedget_world_sizeemptyshaper   r   r   
contiguousr   r   r	   
world_sizeoutputhandle r   h/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/flash_attn/utils/distributed.pyall_gather_raw   s   r   c                 C   st   t j|}| jd | dksJ t j| jd | g| jdd  R | j| jd}t jj||  ||d}||fS r
   )	r   r   r   r   r   r   r   r   r   r   r   r   r   reduce_scatter_raw   s   r   c                 C   s"   |   } tjj| ||d}| |fS )Nr   )r   r   r   
all_reduce)r   r   r	   r   r   r   r   all_reduce_raw+   s   r   c                   @   <   e Zd ZdZedededefddZedefdd	Zd
S )AllGatherFunc?Gather the input from sequence parallel region and concatenate.r   r   returnc                 C      || _ t||\}}|S N)r   r   ctxr   r   r   _r   r   r   forward4      zAllGatherFunc.forwardgrad_outputc                 C      t || j\}}|d fS r%   )r   r   r'   r+   Z
grad_inputr(   r   r   r   backward:      zAllGatherFunc.backwardN	__name__
__module____qualname____doc__staticmethodr   r   r)   r.   r   r   r   r   r!   1       r!   c                   @   r    )ReduceScatterFunczKReduce scatter the input from the sequence parallel region and concatenate.r   r   r#   c                 C   r$   r%   )r   r   r&   r   r   r   r)   G   r*   zReduceScatterFunc.forwardr+   c                 C   r,   r%   )r   r   r-   r   r   r   r.   M   r/   zReduceScatterFunc.backwardNr0   r   r   r   r   r7   D   r6   r7   c                   @   r    )AllReduceFuncr"   r   r   r#   c                 C   r$   r%   )r   r   r&   r   r   r   r)   Z   r*   zAllReduceFunc.forwardr+   c                 C   s   |d fS r%   r   )r'   r+   r   r   r   r.   `   s   zAllReduceFunc.backwardNr0   r   r   r   r   r8   W   r6   r8   modelc              	   C   sp   dd |   D }t| D ]&\}}t  tjj|tj|d|d W d    n1 s0w   Y  qd S )Nc                 S   "   i | ]\}}t |d dr||qS )Z_shared_paramsFgetattr.0namepr   r   r   
<dictcomp>l   
    z&sync_shared_params.<locals>.<dictcomp>r   )srcr   )named_parameterssorteditemsr   no_gradr   	broadcastZget_global_rank)r9   r   Zpamams_sharedr(   r@   r   r   r   sync_shared_paramsi   s   
rI   c                 C   s   dd |   D }dd t| D }|rOt + tj|}tjj||d t	|tj
||D ]	\}}|| q3W d    d S 1 sHw   Y  d S d S )Nc                 S   r:   )Z_sequence_parallelFr;   r=   r   r   r   rA   {   rB   z4allreduce_sequence_parallel_grad.<locals>.<dictcomp>c                 S   s   g | ]\}}|j qS r   )Zgrad)r>   r(   r@   r   r   r   
<listcomp>~   s    z4allreduce_sequence_parallel_grad.<locals>.<listcomp>)r   )rD   rE   rF   r   rG   _utilsZ_flatten_dense_tensorsr   r   zipZ_unflatten_dense_tensorsZcopy_)r9   r   Zparams_seqparallelZgradsZ	coalescedbufZsyncedr   r   r    allreduce_sequence_parallel_gradx   s   
"rN   r   dimr   
local_rankmultiple_ofr#   c                 C   s0   | | }|| }|| }|t ||k  }|| S )zGet the dim for the local rank derived from splitting dim on world_size processes.

    The split may not be even across the world_size processes.
    )int)rO   r   rP   rQ   ZmultipledivmodZlocal_multipler   r   r   get_dim_for_local_rank   s
   rU   )F)r   )typingr   r   r   Ztorch.distributedr   dirr   Z_all_gather_baser   Z_reduce_scatter_baser   boolr   r   r   ZautogradFunctionr!   applyZ
all_gatherr7   Zreduce_scatterr8   r   nnModulerI   rN   rR   rU   r   r   r   r   <module>   s(    $