o
    * iY`                     @  sl  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZmZmZ d dlm Z  erzd dl!m"Z"m#Z# d dl$m%Z%m&Z& G dd deZ'g Z(G dd dZ)dd Z*dd Z+dd Z,dd Z-dd Z.dd Z/dd Z0d d! Z1G d"d# d#Z2	$	%	&	'd6d7d4d5Z3dS )8    )annotationsN)TYPE_CHECKINGAnyLiteral	TypedDict)core)
get_device)_get_trainers_numget_cluster_and_pod)use_paddlecloud)get_cluster_from_args)
DeviceModeblock_windows_and_macoscheck_backend)_prepare_trainer_env_print_argumentsget_host_name_ip)	set_flags)CallableIterable)NotRequiredUnpackc                   @  s.   e Zd ZU ded< ded< ded< ded< dS )	_SpawnOptionsz3NotRequired[Literal['spawn', 'fork', 'forkserver']]start_methodzNotRequired[str | None]gpusxpuszNotRequired[str]ipsN)__name__
__module____qualname____annotations__ r!   r!   d/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/spawn.pyr   3   s
   
 r   c                   @  s   e Zd Zdd ZdS )ParallelEnvArgsc                 C  s(   d | _ d | _d | _d | _d| _d | _d S )NT)cluster_node_ipsnode_ipr   started_portprint_configselected_devices)selfr!   r!   r"   __init__>   s   
zParallelEnvArgs.__init__N)r   r   r   r*   r!   r!   r!   r"   r#   =   s    r#   c                 C  sT   g d}g d}| D ]}||vr'||v rt d| dt q
td| dq
d S )N)r   r   r   r   r'   backend)r(   r&   r$   r%   r   zThe config option (z|) of `paddle.distributed.spawn` is deprecated. Please use the latest config options stated in the `spawn` API documentation.z1) of `paddle.distributed.spawn` is not supported.)warningswarnDeprecationWarning
ValueError)optionsZsupported_optionsZdeprecated_optionskeyr!   r!   r"   _options_valid_checkV   s   

r2   c                  C  sf   t  } | t v rt| dd S d| v rt S d| v r#t S d| v r+t S t	d|  d)N:r   gpuxpucpuI`paddle.distributed.spawn` does not support parallel training on device `` now.)
r   r   get_available_custom_deviceget_custom_device_countsplitget_cuda_device_countget_xpu_device_countmultiprocessing	cpu_countRuntimeErrorZdevicer!   r!   r"   _get_default_nprocsu   s   
rB   c                  C  sJ   t  } | t v rdS d| v rdS d| v rdS d| v rdS td|  d	)
Nxcclr4   ncclr5   bkclr6   gloor7   r8   )r   r   r9   r@   rA   r!   r!   r"   _get_default_backend   s   
rG   c                 C  s>   d }dd |  dD }t|dkr|d }|S t \}}|S )Nc                 S  s   g | ]}|  qS r!   )strip.0xr!   r!   r"   
<listcomp>   s    z _get_node_ip.<locals>.<listcomp>,   r   )r;   lenr   )r   r%   Znode_ips_r!   r!   r"   _get_node_ip   s   
rQ   c                   s  d|vs
|d dkrt  |d< t|d  t|d  g }t }|dd |_|jd u r;|dd |_|jd u r;d|_|d dkr|dd |_|jd u rT|dd |_td	d }|d u sb|d
krndd t	t
 D  n|d |jd u rt | k rtdt  d|  dd fddt	d| D |_n_|jd}t|| krtdt| d|  d|D ]}| vrtd|d qn0|d dkrb|dd |_|jd u r|dd |_tdd }|d u s|d
krdd t	t
 D  n|d |jd u r1t | k r tdt  d|  dd fddt	d| D |_n|jd}t|| krKtdt| d|  d|D ]}| vr_td|d qMn|d dkrtd  d!|_d |_|j|_|d"d d u sJ d#t|jdd$ksJ d%t d$ksJ d&n]|d d'krd |_t
 d }td(| d)d }|d u s|d
krd*d t	t
|D  n|d t | k rtdt  d|  d+| d,d fd-dt	d| D |_|d.d |_|jd u rt|j|_|d/d |_|d"d |_|jd u r't |_|d dkr?tt	d| }t|t j!|\}	}
nt"|\}	}
|
j#D ]}|$t%|	||d  qH|d0d1|_&|j&rft'| |S )2Nr+   autor   r$   z	127.0.0.1rD   r   r(   ZCUDA_VISIBLE_DEVICES c                 S     g | ]}t |qS r!   strrI   r!   r!   r"   rL          z,_get_subprocess_env_list.<locals>.<listcomp>rM   zthe number of visible devices(z-) is less than the number of spawn processes(z), please ensure that the correct `nprocs` argument is passed or the environment variable `CUDA_VISIBLE_DEVICES` is correctly configured.c                      g | ]}t  | qS r!   rU   rI   Zenv_devices_listr!   r"   rL          r   zThe number of selected devices(z0) is not equal to the number of spawn processes(zK), please ensure that the correct `nprocs` and `gpus` arguments are passed.zCThe selected gpu card {} cannot found in CUDA_VISIBLE_DEVICES ({}).rE   r   ZXPU_VISIBLE_DEVICESc                 S  rT   r!   rU   rI   r!   r!   r"   rL      rW   z), please ensure that the correct `nprocs` argument is passed or the environment variable `XPU_VISIBLE_DEVICES` is correctly configured.c                   rX   r!   rU   rI   rY   r!   r"   rL      rZ   zK), please ensure that the correct `nprocs` and `xpus` arguments are passed.zBThe selected xpu card {} cannot found in XPU_VISIBLE_DEVICES ({}).rF   zYour model will be trained under CPUONLY mode by using GLOO,because CPUPlace is specified manually or your installed PaddlePaddle only support CPU Device.Tr   z.CPUONLY spawn doesn't support use paddle cloudrN   zJCPUONLY spawn only support single trainer, that is len(ips)=1, but got %s.z+CPUONLY spawn doesn't support multi-trainerrC   ZFLAGS_selected_sc                 S  rT   r!   rU   rI   r!   r!   r"   rL   #  s    zj), please ensure that the correct `nprocs` argument is passed or the environment variable `FLAGS_selected_zs` is correctly configured.c                   rX   r!   rU   rI   rY   r!   r"   rL   2  rZ   r%   r&   r'   F)(rG   r   r   r#   getr$   r(   osgetenvranger   r<   r;   rO   r@   joinr/   formatr=   r,   r-   Zpaddle_cpuonlyr   r	   Zget_all_custom_device_typer:   r%   rQ   r&   r   listr   r   ZCPUr
   Ztrainersappendr   r'   r   )nprocsr0   Zprocesses_env_listargsZenv_devicesZselected_device_listZcard_idZcustom_device_nameZdevices_per_procZclusterZpodZtrainerr!   rY   r"   _get_subprocess_env_list   s  









	







rf   c                   C  s    t jdd  t jdd  d S )NZ
http_proxyZhttps_proxy)r]   environpopr!   r!   r!   r"   _remove_risky_envW  s   ri   c                 C  sR   |dkrt d| d i n|dkrt d| d i n	 | D ]	}| | tj|< qd S )NrD   ZFLAGS_selected_gpusrE   ZFLAGS_selected_xpus)r   r]   rg   )env_dictr+   var_namer!   r!   r"   _set_trainer_env^  s   
rl   c                 C  sp   zt   t|| | | }|| W d S  ty   Y d S  ty7   dd l}||  td Y d S w )Nr   rN   )	ri   rl   putKeyboardInterrupt	Exception	traceback
format_excsysexit)funcre   error_queuereturn_queuerj   r+   resultrp   r!   r!   r"   _func_wrapperv  s   
rx   c                   @  s&   e Zd Zdd ZdddZdd ZdS )	MultiprocessContextc                 C  s*   || _ || _|| _dd t|D | _d S )Nc                 S  s   i | ]\}}|j |qS r!   )sentinel)rJ   indexprocessr!   r!   r"   
<dictcomp>  s    z0MultiprocessContext.__init__.<locals>.<dictcomp>)error_queuesreturn_queues	processes	enumerate	sentinels)r)   r   r~   r   r!   r!   r"   r*     s   zMultiprocessContext.__init__Nc                 C  s   t | jdkr	dS tjj| j |d}d }|D ]}| j|}| j| }|  |j	dkr2|} nq|d u r>t | jdkS | jD ]}|
 rK|  |  qA| | d S )Nr   T)timeout)rO   r   r>   
connectionwaitkeysrh   r   r`   exitcodeis_alive	terminate_throw_exception)r)   r   readyerror_indexrz   r{   r|   r!   r!   r"   r`     s*   




zMultiprocessContext.joinc                 C  s   | j |  r.| j| j}|dk r#t| j}td| d| dtd| d| d| j |  }d| d}||7 }t|)Nr   zProcess z terminated with signal .z terminated with exit code z9

----------------------------------------------
Process zV terminated with the following error:
----------------------------------------------

)	r~   emptyr   r   signalSignalsnamero   r\   )r)   r   r   r   Zoriginal_tracemsgr!   r!   r"   r     s"   z$MultiprocessContext._throw_exception)N)r   r   r   r*   r`   r   r!   r!   r!   r"   ry     s    
ry   r!   TFrt   Callable[..., None]re   Iterable[Any]rd   intr`   booldaemonr0   Unpack[_SpawnOptions]returnc              
   K  s   t | |dkrt }t||}|dd}|du rd}t|}g }	g }
g }t|D ]1}| }| }|jt	| ||||| |d fd}||_
|  |	| |
| || q+t||	|
}|sg|S | sp	 | rk|S )a6  
    Start multiple processes with ``spawn`` method for parallel training.

    .. note::
        ``spawn`` now only supports GPU or XPU collective mode. The collective mode
        of GPU and XPU cannot be started at the same time, so the option `gpus` and
        `xpus` cannot be configured at the same time.

    Args:
        func (function): The target function is called by spawned process.
            This function need to be able to pickled, so it must be defined
            at the top level of a module.
        args (list|tuple, optional): Arguments passed to ``func``.
        nprocs (int, optional): Number of processed to start. Default: -1.
            when nprocs is -1, the available device will be obtained from
            the environment variable when the model is executed: If use GPU,
            the currently available device ID is obtained from the environment
            variable CUDA_VISIBLE_DEVICES; If use XPU, the currently available
            device ID is obtained from the environment variable XPU_VISIBLE_DEVICES.
        join (bool, optional): Perform a blocking join on all spawned processes.
            Default: True.
        daemon (bool, optional): The spawned processes' daemon flag. Default: False.
        **options(dict, optional): Other initial parallel execution environment
            configuration options. The following options are currently supported:
            (1) start_method (string): the way to start a process.
            The start method can be ``spawn`` , ``fork`` , ``forkserver`` .
            Because the CUDA runtime does not support the ``fork`` start method,
            when use CUDA in subprocesses, we should start process by ``spawn``
            or ``forkserver`` method. Default: "spawn" ;
            (2) gpus (string): The training process will run on the
            selected gpus, such as "0,1,2,3". Default: None;
            (3) xpus (string): The training process will run on the
            selected xpus, such as "0,1,2,3". Default: None;
            (5) ips (string): Paddle cluster nodes ips, such as
            "192.168.0.16,192.168.0.17". Default: "127.0.0.1" .

    Returns:
        ``MultiprocessContext`` object, it hold the spawned processes.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle
            >>> import paddle.nn as nn
            >>> import paddle.optimizer as opt
            >>> import paddle.distributed as dist

            >>> class LinearNet(nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self._linear1 = nn.Linear(10, 10)
            ...         self._linear2 = nn.Linear(10, 1)
            ...     def forward(self, x):
            ...         return self._linear2(self._linear1(x))

            >>> def train(print_result=False):
            ...     # 1. initialize parallel environment
            ...     group = dist.init_parallel_env()
            ...     process_group = group.process_group if group else None
            ...     # 2. create data parallel layer & optimizer
            ...     layer = LinearNet()
            ...     dp_layer = paddle.DataParallel(layer, group = process_group)  # type: ignore[arg-type]
            ...     loss_fn = nn.MSELoss()
            ...     adam = opt.Adam(
            ...         learning_rate=0.001, parameters=dp_layer.parameters())
            ...     # 3. run layer
            ...     inputs = paddle.randn([10, 10], 'float32')
            ...     outputs = dp_layer(inputs)
            ...     labels = paddle.randn([10, 1], 'float32')
            ...     loss = loss_fn(outputs, labels)
            ...     if print_result is True:
            ...         print("loss:", loss.numpy())
            ...     loss.backward()
            ...     adam.step()
            ...     adam.clear_grad()

            >>> # Usage 1: only pass function.
            >>> # If your training method no need any argument, and
            >>> # use all visible devices for parallel training.
            >>> if __name__ == '__main__':
            ...     dist.spawn(train)

            >>> # Usage 2: pass function and arguments.
            >>> # If your training method need some arguments, and
            >>> # use all visible devices for parallel training.
            >>> if __name__ == '__main__':
            ...     dist.spawn(train, args=(True,))

            >>> # Usage 3: pass function, arguments and nprocs.
            >>> # If your training method need some arguments, and
            >>> # only use part of visible devices for parallel training.
            >>> # If your machine hold 8 cards {0,1,2,3,4,5,6,7},
            >>> # this case will use cards {0,1}; If you set
            >>> # CUDA_VISIBLE_DEVICES=4,5,6,7, this case will use
            >>> # cards {4,5}
            >>> if __name__ == '__main__':
            ...     dist.spawn(train, args=(True,), nprocs=2)

            >>> # Usage 4: pass function, arguments, nprocs and gpus.
            >>> # If your training method need some arguments, and
            >>> # only use part of visible devices for parallel training,
            >>> # but you can't set your machine's environment variable
            >>> # CUDA_VISIBLE_DEVICES, such as it is None or all cards
            >>> # {0,1,2,3,4,5,6,7}, you can pass `gpus` to
            >>> # select the GPU cards you want to use. For example,
            >>> # this case will use cards {4,5} if your machine hold 8 cards.
            >>> if __name__ == '__main__':
            ...     dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')

    r   r   Nspawnr+   )targetre   )r2   rB   rf   r\   r>   Zget_contextr_   SimpleQueueProcessrx   r   startrc   ry   r`   )rt   re   rd   r`   r   r0   Zprocs_env_listr   mpr~   r   r   iru   rv   r|   contextr!   r!   r"   r     sH   y



r   )r!   r   TF)rt   r   re   r   rd   r   r`   r   r   r   r0   r   r   ry   )4
__future__r   r>   r]   r   rr   r,   typingr   r   r   r   Zpaddle.baser   Zpaddle.devicer   Zpaddle.distributed.cloud_utilsr	   r
   Z$paddle.distributed.fleet.cloud_utilsr   Zpaddle.distributed.fleet.launchr   Z%paddle.distributed.fleet.launch_utilsr   r   r   Z%paddle.distributed.utils.launch_utilsr   r   r   Zpaddle.frameworkr   collections.abcr   r   Ztyping_extensionsr   r   r   __all__r#   r2   rB   rG   rQ   rf   ri   rl   rx   ry   r   r!   r!   r!   r"   <module>   sH   
 9B