o
    1 in                  	   @   s  d Z ddlZddlZddlZddlmZ ddlZddlZddl	m
  mZ ddlmZ ddlmZ ddlmZ eeZzddlmZ d	Zd
aW n eyU   d
Zd	aY nw z
ddlmZ d	ZW n eyk   d
ZY nw z
ddlm Z  d	Z!W n ey   d
Z!Y nw dd Z"dd Z#dd Z$dd Z%G dd de&Z'e' a(dd Z)ej*j+ddfde,de,de-de,fdd Z.ej*j+ddfde,d!ee, de-de,fd"d#Z/dgde-d$dfd%d&Z0dgde-d$e,fd'd(Z1dgde-d$e,fd)d*Z2dej3j4fde-fd+d,Z5dej3j4fd-e6de-fd.d/Z7dgde-fd0d1Z8ddej3j4fd2e,de-fd3d4Z9dddej3j4fd-e6d2e,d5e,de-fd6d7Z:dhd8e,de-fd9d:Z;	did8e,d;e,de-fd<d=Z<dgd-e6de-fd>d?Z=	dgd@e6dAe6de-fdBdCZ>dej3j4fd-e6de-fdDdEZ?dej3j4fde-fdFdGZ@dgd2e,de-fdHdIZA		djd2e,dJe,de-dKe,fdLdMZBdgd8e,de-fdNdOZC		djd8e,dPe,de-dKe,fdQdRZDdSe,fdTdUZEdgde-fdVdWZFdXdY ZGdZej*fd[d\ZHd]d^ ZIde,fd_d`ZJdadb ZKdcdd ZLdedf ZMdS )kz5APIs exposed under the namespace ray.util.collective.    N)List   )types)get_address_and_port)get_master_address_metadata_key)	NCCLGroupTF)TorchGLOOGroup)NixlBackendc                   C   s   t  rtrtd datS )NzqNCCL seems unavailable. Please install Cupy following the guide at: https://docs.cupy.dev/en/stable/install.html.F)rayZget_gpu_ids_LOG_NCCL_WARNINGloggerwarning_NCCL_AVAILABLE r   r   j/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/util/collective/collective.pynccl_available1   s   r   c                   C      t S N_TORCH_DISTRIBUTED_AVAILABLEr   r   r   r   gloo_available=   s   r   c                   C   r   r   r   r   r   r   r   torch_distributed_availableC      r   c                   C   r   r   )_NIXL_AVAILABLEr   r   r   r   nixl_availableG   r   r   c                   @   s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )GroupManagera  Use this class to manage the collective groups we created so far.

    Each process will have an instance of `GroupManager`. Each process
    could belong to multiple collective groups. The membership information
    and other metadata are stored in the global `_group_mgr` object.
    c                 C   s   i | _ i | _d S r   )_name_group_map_group_name_map)selfr   r   r   __init__S   s   
zGroupManager.__init__c                 C   sh  t |}|t jjkrtd|t jjks|t jjkrot|}|dkr4t \}}t	|| d|  n+t

 |r=|d nd }		 t|}
|
durKnt

 |	krYtd| d	t
d
 qAtd| t||||}n6|t jjkrt| td| t|||}n|t jjkrt| td| t }ntd| || j|< || j|< | j| S )zThe entry to create new collective groups in the manager.

        Put the registration and the group information into the manager
        metadata as well.
        zRay does not support MPI.r   :g     @@g      >@TNz:Timed out waiting for GLOO rendezvous metadata for group 'z'.g?z.Creating torch.distributed GLOO group: '{}'...zCreating NCCL group: '{}'...zCreating NIXL Backend: '{}'...zUnexpected backend: )r   BackendZMPIRuntimeErrorGLOO
TORCH_GLOO_get_master_addr_key_get_address_and_port_internal_kvZ_internal_kv_puttimeZ_internal_kv_getTimeoutErrorsleepr   debugformatr   NCCL_check_backend_availabilityr   NIXLr	   r   r   )r   backend
world_sizerank
group_namegloo_timeoutZmetadata_keyaddrportZ
deadline_smetagr   r   r   create_collective_groupW   sJ   








z$GroupManager.create_collective_groupc                 C   s
   || j v S r   )r   r   r3   r   r   r   is_group_exist   s   
zGroupManager.is_group_existc                 C   s(   |  |std| dS | j| S )z,Get the collective group handle by its name.z"The group '{}' is not initialized.N)r;   r   r   r,   r   r:   r   r   r   get_group_by_name   s   

zGroupManager.get_group_by_namec                 C   sx   |  |std| dS | j| }| j|= | j|= |  d| }zt|}t	| W dS  t
y;   Y dS w )zGroup destructor.zThe group '{}' does not exist.Ninfo_)r;   r   r   r,   r   r   Zdestroy_groupr
   	get_actorkill
ValueError)r   r3   r8   namestorer   r   r   destroy_collective_group   s   


z%GroupManager.destroy_collective_groupN)	__name__
__module____qualname____doc__r   r9   r;   r<   rC   r   r   r   r   r   K   s    4r   c                 C   s
   t | S )zDCheck if the group is initialized in this process by the group name.)
_group_mgrr;   r3   r   r   r   is_group_initialized   s   
rJ   default0u  r1   r2   r3   r4   c                 C   sv   t   t|}t| |std|t|rtd| dks$J |dks*J || k s0J t	|| ||| dS )a=  Initialize a collective group inside an actor process.

    Args:
        world_size: the total number of processes in the group.
        rank: the rank of the current process.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.

    Returns:
        None
    z%group_name '{}' needs to be a string.#Trying to initialize a group twice.r   N)
_check_inside_actorr   r!   r.   r@   r,   rH   r;   r"   r9   )r1   r2   r0   r3   r4   r   r   r   init_collective_group   s   


rO   ranksc           
   	   C   s6  t |}t| d| }z	t| td ty   Y nw t|t| kr4tdt|t| t	|t	t
t|krRtdt|ddd |D |dkr]td	|t|dksgtd
t||k sqtdddlm} d| }dd | D }|j|dd }	t|	j|||||g dS )a  Declare a list of actors as a collective group.

    Note: This function should be called in a driver process.

    Args:
        actors: a list of actors to be set in a collective group.
        world_size: the total number of processes in the group.
        ranks (List[int]): the rank of each actor.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.

    Returns:
        None
    r=   rM   zHEach actor should correspond to one rank. Got '{}' ranks but '{}' actorsz5Ranks must be a permutation from 0 to '{}'. Got '{}'. c                 S   s   g | ]}t |qS r   )str).0rr   r   r   
<listcomp>  s    z+create_collective_group.<locals>.<listcomp>r   z/World size must be greater than zero. Got '{}'.zRanks must be non-negative.z(Ranks cannot be greater than world_size.)Infoc                 S   s   g | ]}|j qS r   )Z_ray_actor_id)rS   ar   r   r   rU     s    Zdetached)rA   ZlifetimeN)r   r!   r.   r
   r>   r"   r@   lenr,   setrangejoinallZray.util.collective.utilrV   optionsremotegetZset_info)
Zactorsr1   rP   r0   r3   r4   rA   rV   Z	actors_idinfor   r   r   r9      sB   

 r9   returnc                 C   s   t   t|  dS )z0Destroy a collective group given its group name.N)rN   rH   rC   rI   r   r   r   rC     s   rC   c                 C   "   t   t| s	dS t| }|jS )a  Return the rank of this process in the given group.

    Args:
        group_name: the name of the group to query

    Returns:
        the rank of this process in the named group,
        -1 if the group does not exist or the process does
        not belong to the group.
    )rN   rJ   rH   r<   r2   r3   r8   r   r   r   get_rank"  s
   
re   c                 C   rb   )a  Return the size of the collective group with the given name.

    Args:
        group_name: the name of the group to query

    Returns:
        The world size of the collective group, -1 if the group does
            not exist or the process does not belong to the group.
    rc   )rN   rJ   rH   r<   r1   rd   r   r   r   get_collective_group_size4  s
   

rf   c                 C   s.   t |  t|}tj}||_|| g| dS )a   Collective allreduce the tensor across the group.

    Args:
        tensor: the tensor to be all-reduced on this process.
        group_name: the collective group name to perform allreduce.
        op: The reduce operation.

    Returns:
        None
    N)_check_single_tensor_inputget_group_handler   AllReduceOptionsreduceOp	allreduce)tensorr3   opr8   optsr   r   r   rk   E  s
   rk   tensor_listc                 C   s<   t  stdt|  t|}t j}||_|| | dS )a  Collective allreduce a list of tensors across the group.

    Args:
        tensor_list (List[tensor]): list of tensors to be allreduced,
            each on a GPU.
        group_name: the collective group name to perform allreduce.

    Returns:
        None
    &Multigpu calls requires NCCL and Cupy.N)r   cupy_availabler"   _check_tensor_list_inputrh   ri   rj   rk   )ro   r3   rm   r8   rn   r   r   r   allreduce_multigpuW  s   rs   c                 C   s   t | }|  dS )zBarrier all processes in the collective group.

    Args:
        group_name: the name of the group to barrier.

    Returns:
        None
    N)rh   barrierrd   r   r   r   rt   m  s   	rt   dst_rankc                 C   sF   t |  t|}t|| t }||_||_d|_|| g| dS )a:  Reduce the tensor across the group to the destination rank.

    Args:
        tensor: the tensor to be reduced on this process.
        dst_rank: the rank of the destination process.
        group_name: the collective group name to perform reduce.
        op: The reduce operation.

    Returns:
        None
    r   N)	rg   rh   _check_rank_validr   ReduceOptionsrj   	root_rankroot_tensorreduce)rl   ru   r3   rm   r8   rn   r   r   r   rz   z  s   
rz   
dst_tensorc                 C   sb   t  stdt|  t|}t|| tt| | t  }||_	||_
||_|| | dS )a  Reduce the tensor across the group to the destination rank
    and destination tensor.

    Args:
        tensor_list: the list of tensors to be reduced on this process;
            each tensor located on a GPU.
        dst_rank: the rank of the destination process.
        dst_tensor: the index of GPU at the destination.
        group_name: the collective group name to perform reduce.
        op: The reduce operation.

    Returns:
        None
    rp   N)r   rq   r"   rr   rh   rv   _check_root_tensor_validrX   rw   rj   rx   ry   rz   )ro   ru   r{   r3   rm   r8   rn   r   r   r   reduce_multigpu  s   
r}   src_rankc                 C   s@   t |  t|}t|| t }||_d|_|| g| dS )a(  Broadcast the tensor from a source process to all others.

    Args:
        tensor: the tensor to be broadcasted (src) or received (destination).
        src_rank: the rank of the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    r   N)rg   rh   rv   r   BroadcastOptionsrx   ry   	broadcastrl   r~   r3   r8   rn   r   r   r   r     s   
r   
src_tensorc                 C   s\   t  stdt|  t|}t|| tt| | t  }||_	||_
|| | dS )ag  Broadcast the tensor from a source GPU to all other GPUs.

    Args:
        tensor_list: the tensors to broadcast (src) or receive (dst).
        src_rank: the rank of the source process.
        src_tensor: the index of the source GPU on the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    rp   N)r   rq   r"   rr   rh   rv   r|   rX   r   rx   ry   r   )ro   r~   r   r3   r8   rn   r   r   r   broadcast_multigpu  s   
r   c                 C   sL   t | t|  t|}t| |jkrtdt }|| g|g| dS )a   Allgather tensors from each process of the group into a list.

    Args:
        tensor_list: the results, stored as a list of tensors.
        tensor: the tensor (to be gathered) in the current process
        group_name: the name of the collective group.

    Returns:
        None
    zPThe length of the tensor list operands to allgather must be equal to world_size.N)	rg   rr   rh   rX   r1   r"   r   AllGatherOptions	allgather)ro   rl   r3   r8   rn   r   r   r   r     s   r   output_tensor_listsinput_tensor_listc                 C   sB   t  stdt|  t| t|}t  }|| || dS )a  Allgather tensors from each gpus of the group into lists.

    Args:
        output_tensor_lists (List[List[tensor]]): gathered results, with shape
            must be num_gpus * world_size * shape(tensor).
        input_tensor_list: (List[tensor]): a list of tensors, with shape
            num_gpus * shape(tensor).
        group_name: the name of the collective group.

    Returns:
        None
    rp   N)r   rq   r"   _check_tensor_lists_inputrr   rh   r   r   )r   r   r3   r8   rn   r   r   r   allgather_multigpu  s   r   c                 C   sR   t |  t| t|}t }||_t||jkrtd|	| g|g| dS )a  Reducescatter a list of tensors across the group.

    Reduce the list of the tensors across each process in the group, then
    scatter the reduced list of tensors -- one tensor for each process.

    Args:
        tensor: the resulted tensor on this process.
        tensor_list: The list of tensors to be reduced and scattered.
        group_name: the name of the collective group.
        op: The reduce operation.

    Returns:
        None
    zXThe length of the tensor list operands to reducescatter must not be equal to world_size.N)
rg   rr   rh   r   ReduceScatterOptionsrj   rX   r1   r"   reducescatter)rl   ro   r3   rm   r8   rn   r   r   r   r     s   r   c                 C   sH   t  stdt| t|  t|}t  }||_|| || dS )a  Reducescatter a list of tensors across all GPUs.

    Args:
        output_tensor_list: the resulted list of tensors, with
            shape: num_gpus * shape(tensor).
        input_tensor_lists: the original tensors, with shape:
            num_gpus * world_size * shape(tensor).
        group_name: the name of the collective group.
        op: The reduce operation.

    Returns:
        None.
    rp   N)	r   rq   r"   r   rr   rh   r   rj   r   )Zoutput_tensor_listZinput_tensor_listsr3   rm   r8   rn   r   r   r   reducescatter_multigpu9  s   r   c                 C   R   t |  t|}t|| ||jkrtd|t }||_|	| g| dS )zSend a tensor to a remote process synchronously.

    Args:
        tensor: the tensor to send.
        dst_rank: the rank of the destination process.
        group_name: the name of the collective group.

    Returns:
        None
    "The destination rank '{}' is self.N)
rg   rh   rv   r2   r"   r,   r   SendOptionsru   send)rl   ru   r3   r8   rn   r   r   r   r   V     

r   dst_gpu_index
n_elementsc                 C      t  stdt|  t|}t|| ||jkr!td||dk r,td|t  }||_	||_
||_|| g| dS )a  Send a tensor to a remote GPU synchronously.

    The function assumes each process owns >1 GPUs, and the sender
    process and receiver process has equal number of GPUs.

    Args:
        tensor: the tensor to send, located on a GPU.
        dst_rank: the rank of the destination process.
        dst_gpu_index: the destination gpu index.
        group_name: the name of the collective group.
        n_elements: if specified, send the next n elements
            from the starting address of tensor.

    Returns:
        None
    z!send_multigpu call requires NCCL.GThe dst_rank '{}' is self. Considering doing GPU to GPU memcpy instead?r   z The n_elements '{}' should >= 0.N)r   rq   r"   rg   rh   rv   r2   r,   r   ru   r   r   r   )rl   ru   r   r3   r   r8   rn   r   r   r   send_multigpuk  s"   

r   c                 C   r   )zReceive a tensor from a remote process synchronously.

    Args:
        tensor: the received tensor.
        src_rank: the rank of the source process.
        group_name: the name of the collective group.

    Returns:
        None
    r   N)
rg   rh   rv   r2   r"   r,   r   RecvOptionsr~   recvr   r   r   r   r     r   r   src_gpu_indexc                 C   r   )a  Receive a tensor from a remote GPU synchronously.

    The function asssume each process owns >1 GPUs, and the sender
    process and receiver process has equal nubmer of GPUs.

    Args:
        tensor: The received tensor, located on a GPU.
        src_rank: The rank of the source process.
        src_gpu_index: The index of the source GPU on the src process.
        group_name: The name of the collective group.

    Returns:
        None
    z!recv_multigpu call requires NCCL.r   r   z#The n_elements '{}' should be >= 0.N)r   rq   r"   rg   rh   rv   r2   r,   r   r~   r   r   r   )rl   r~   r   r3   r   r8   rn   r   r   r   recv_multigpu  s"   

r   gpu_idc                 C   s,   t  stdddl}|j|   dS )zSynchronize the current process to a give device.

    Args:
        gpu_id: the GPU device id to synchronize.

    Returns:
        None
    z(synchronize call requires CUDA and NCCL.r   N)r   rq   r"   ZcupycudaZDevicesynchronize)r   cpr   r   r   r     s   	r   c              
   C   sB  | t jkrt  t| szD| t jkrtt jjdd| d n1d|  }tj	|d}t
|j \}}}}}tjjj}|j }	|||	 }
t|||
| | W nI ty } z=dtjv rtjd | krttjd }ttjd }tjd }tdd	}t|||| | ntd
| |W Y d}~nd}~ww t| }|S )zCheck if the group is initialized and return the group handle.

    Args:
        group_name: the name of the collective group.

    Returns:
        The collective group handle.
    Nr=   )rA   Zcollective_group_nameZcollective_rankZcollective_world_sizeZcollective_backendZcollective_gloo_timeoutrL   z<The collective group '{}' is not initialized in the process.)r   ZNIXL_GROUP_NAMErN   rJ   rH   r9   r!   r/   r
   r>   r_   get_infor^   _privateworkerglobal_workerZcore_workerZget_actor_idindexr@   osenvironintgetenvr"   r,   r<   )r3   rA   Zmgridsr1   r2   r0   r4   r   Zid_rT   excr8   r   r   r   rh     sR   
	







rh   c                 C   sV   t | tjrdS t rt | tjjrdS t r"t | tjjr"dS t	d
t| )z-Check if the tensor is with a supported type.Nz[Unrecognized tensor type '{}'. Supported types are: np.ndarray, torch.Tensor, cupy.ndarray.)
isinstancenpZndarrayr   rq   r   Ztorch_availablethZTensorr"   r,   type)rl   r   r   r   rg     s   
rg   r0   c                 C   s|   | t jjkrt stddS | t jjkrt stddS | t jjkr-t s+tddS | t jjkr:t	 s<tddS dS )z'Check whether the backend is available.z#torch.distributed is not available.zNCCL is not available.zNIXL is not available.N)
r   r!   r#   r   r"   r-   r   r$   r/   r   )r0   r   r   r   r.   )  s"   r.   c                  C   s"   t jjj} | jt jkrdS td)z1Check if currently it is inside a Ray actor/task.NzBThe collective APIs shall be only used inside a Ray actor or task.)r
   r   r   r   modeZWORKER_MODEr"   )r   r   r   r   rN   :  s   
rN   c                 C   s6   |dk rt d||| jkrt d|| jdS )z'Check the rank: 0 <= rank < world_size.r   zrank '{}' is negative.z+rank '{}' must be less than world size '{}'N)r@   r,   r1   )r8   r2   r   r   r   rv   E  s   
rv   c                 C   s>   t | tstdt| | std| D ]}t| qdS )z7Check if the input is a list of supported tensor types.z.The input must be a list of tensors. Got '{}'.zGot an empty list of tensors.N)r   listr"   r,   r   rg   )ro   tr   r   r   rr   O  s   


rr   c                 C   sD   t | tstdt| | std|  | D ]}t| qdS )z@Check if the input is a list of lists of supported tensor types.z7The input must be a list of lists of tensors. Got '{}'.zDid not receive tensors. Got: N)r   r   r"   r,   r   rr   )Ztensor_listsr   r   r   r   r   \  s   


r   c                 C   s2   |dk rt d||| krt d|| dS )z9Check the root_tensor device is 0 <= root_tensor < lengthr   zroot_tensor '{}' is negative.z9root_tensor '{}' is greater than the number of GPUs: '{}'N)r@   r,   )lengthry   r   r   r   r|   i  s   r|   )rK   )r   rK   )r   r   rK   )rK   r   )NrG   loggingr   r(   typingr   numpyr   r
   Zray.experimental.internal_kvZexperimentalZinternal_kvr'   rQ   r   Z ray.experimental.collective.utilr   r&   Z@ray.util.collective.collective_group.torch_gloo_collective_groupr   r%   	getLoggerrD   r   Z:ray.util.collective.collective_group.nccl_collective_groupr   r   r   ImportErrorr   r   Z1ray.util.collective.collective_group.nixl_backendr	   r   r   r   r   r   objectr   rH   rJ   r!   r-   r   rR   rO   r9   rC   re   rf   ZReduceOpZSUMrk   r   rs   rt   rz   r}   r   r   r   r   r   r   r   r   r   r   r   rh   rg   r.   rN   rv   rr   r   r|   r   r   r   r   <module>   s6   
a
)
B



$


!

*
(7
