import logging
import os
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

import torch
import torch.distributed as dist
from packaging.version import Version

import ray
from ray._common.network_utils import build_address
from ray._private import ray_constants
from ray.air._internal.device_manager import register_custom_torch_dist_backend
from ray.exceptions import GetTimeoutError
from ray.train._internal.base_worker_group import BaseWorkerGroup
from ray.train._internal.utils import get_address_and_port
from ray.train.backend import Backend, BackendConfig
from ray.train.constants import (
    DEFAULT_TORCH_PROCESS_GROUP_SHUTDOWN_TIMEOUT_S,
    TORCH_PROCESS_GROUP_SHUTDOWN_TIMEOUT_S,
)
from ray.util import PublicAPI

logger = logging.getLogger(__name__)


class TorchConfigContextManager:
    def __enter__(self):
        # Set the default CUDA device to the GPU assigned to this worker, if any.
        if torch.cuda.is_available():
            device = ray.train.torch.get_device()
            if device.type == "cuda":
                torch.cuda.set_device(device)

    def __exit__(self, exc_type, exc_value, traceback):
        # Do not suppress exceptions raised inside the context.
        return False


@PublicAPI(stability="stable")
@dataclass
class TorchConfig(BackendConfig):
    """Configuration for torch process group setup.

    See https://pytorch.org/docs/stable/distributed.html for more info.

    Args:
        backend: The backend to use for training.
            See ``torch.distributed.init_process_group`` for more info and
            valid values.
            If set to None, nccl will be used if GPUs are requested, else gloo
            will be used.
        init_method: The initialization method to use. Either "env"
            for environment variable initialization or "tcp" for TCP
            initialization. Defaults to "env".
        timeout_s: Seconds for process group operations to timeout.
    """

    backend: Optional[str] = None
    init_method: str = "env"
    timeout_s: int = 1800

    @property
    def backend_cls(self):
        return _TorchBackend

    @property
    def train_func_context(self):
        return TorchConfigContextManager


def _setup_torch_process_group(
    backend: str,
    world_rank: int,
    world_size: int,
    init_method: str,
    timeout_s: int = 1800,
):
    """Connects the distributed PyTorch backend.

    Args:
        backend: The backend (nccl, gloo, etc.) to use for training.
        world_rank: Rank of the current worker.
        world_size: Number of workers participating in the job.
        init_method: URL specifying how to initialize the process group.
        timeout_s: Seconds for process group operations to timeout.
    """
    message = (
        f"Setting up process group for: {init_method} "
        f"[rank={world_rank}, world_size={world_size}]"
    )
    if world_rank == 0:
        logger.info(message)
    else:
        logger.debug(message)
    logger.debug(f"using {backend}")

    if backend == "nccl":
        # The NCCL error-handling environment variables were renamed in torch 2.2.0.
        if Version(torch.__version__) < Version("2.2.0"):
            async_error_handling_var = "NCCL_ASYNC_ERROR_HANDLING"
            blocking_wait_var = "NCCL_BLOCKING_WAIT"
        else:
            async_error_handling_var = "TORCH_NCCL_ASYNC_ERROR_HANDLING"
            blocking_wait_var = "TORCH_NCCL_BLOCKING_WAIT"
        if (
            async_error_handling_var not in os.environ
            and blocking_wait_var not in os.environ
        ):
            logger.debug(
                f"Setting {async_error_handling_var}=1 to fail if NCCL collective "
                "communication operations are timing out. To override this behavior, "
                f"you can set {async_error_handling_var}=0."
            )
            os.environ[async_error_handling_var] = "1"
    elif backend == "hccl":
        register_custom_torch_dist_backend(backend)

    dist.init_process_group(
        backend=backend,
        init_method=init_method,
        rank=world_rank,
        world_size=world_size,
        timeout=timedelta(seconds=timeout_s),
    )


def _shutdown_torch(destroy_process_group=False):
    from ray.air._internal.torch_utils import get_devices

    devices = get_devices()
    if destroy_process_group:
        dist.destroy_process_group()
    # Release cached GPU memory on every device this worker was assigned.
    if torch.cuda.is_available():
        for device in devices:
            with torch.cuda.device(device):
                torch.cuda.empty_cache()


def _set_torch_distributed_env_vars():
    # Environment variables that torch (and HF Accelerate) expect on each worker.
    from ray.train.torch import get_device

    context = ray.train.get_context()
    os.environ["LOCAL_RANK"] = str(context.get_local_rank())
    os.environ["RANK"] = str(context.get_world_rank())
    os.environ["LOCAL_WORLD_SIZE"] = str(context.get_local_world_size())
    os.environ["WORLD_SIZE"] = str(context.get_world_size())
    os.environ["NODE_RANK"] = str(context.get_node_rank())

    device = get_device()
    os.environ["ACCELERATE_TORCH_DEVICE"] = str(device)


class _TorchBackend(Backend):
    share_cuda_visible_devices: bool = True

    def on_start(self, worker_group: BaseWorkerGroup, backend_config: TorchConfig):
        if not dist.is_available():
            raise RuntimeError("Distributed torch is not available.")

        # Default to NCCL when GPUs are requested per worker, otherwise Gloo.
        if backend_config.backend is None:
            resources_per_worker = worker_group.get_resources_per_worker()
            num_gpus_per_worker = resources_per_worker.get("GPU", 0)
            backend = "nccl" if num_gpus_per_worker > 0 else "gloo"
        else:
            backend = backend_config.backend

        master_addr, master_port = worker_group.execute_single(
            0, get_address_and_port
        )
        if backend_config.init_method == "env":

            def set_env_vars(addr, port):
                os.environ["MASTER_ADDR"] = addr
                os.environ["MASTER_PORT"] = str(port)

            worker_group.execute(set_env_vars, addr=master_addr, port=master_port)
            url = "env://"
        elif backend_config.init_method == "tcp":
            url = f"tcp://{build_address(master_addr, master_port)}"
        else:
            raise ValueError(
                f"The provided init_method ({backend_config.init_method}) is not "
                "supported. Must be either 'env' or 'tcp'."
            )

        setup_futures = []
        for i in range(len(worker_group)):
            setup_futures.append(
                worker_group.execute_single_async(
                    i,
                    _setup_torch_process_group,
                    backend=backend,
                    world_rank=i,
                    world_size=len(worker_group),
                    init_method=url,
                    timeout_s=backend_config.timeout_s,
                )
            )
        ray.get(setup_futures)

    def on_shutdown(self, worker_group: BaseWorkerGroup, backend_config: TorchConfig):
        futures = worker_group.execute_async(
            _shutdown_torch,
            destroy_process_group=len(worker_group) > 1,
        )
        timeout_s = ray_constants.env_integer(
            TORCH_PROCESS_GROUP_SHUTDOWN_TIMEOUT_S,
            DEFAULT_TORCH_PROCESS_GROUP_SHUTDOWN_TIMEOUT_S,
        )
        try:
            ray.get(futures, timeout=timeout_s)
        except GetTimeoutError:
            logger.warning(
                f"Torch process group shutdown timed out after {timeout_s} seconds"
            )

    def on_training_start(
        self, worker_group: BaseWorkerGroup, backend_config: BackendConfig
    ):
        worker_group.execute(_set_torch_distributed_env_vars)
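# Usage sketch (not part of this module; the training function below is a
# placeholder, and the exact defaults are assumptions): ``TorchConfig`` is
# normally passed to ``ray.train.torch.TorchTrainer`` via its ``torch_config``
# argument to control how the torch process group is created, e.g.
#
#     from ray.train import ScalingConfig
#     from ray.train.torch import TorchConfig, TorchTrainer
#
#     def train_loop_per_worker():
#         ...  # standard PyTorch training code; the process group is already set up
#
#     trainer = TorchTrainer(
#         train_loop_per_worker,
#         torch_config=TorchConfig(backend="gloo", timeout_s=600),
#         scaling_config=ScalingConfig(num_workers=2),
#     )
#     trainer.fit()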