o
    * i                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlmZ d dlm  m  mZ d dlmZ d dlmZ edZde_G dd dZG d	d
 d
ZG dd dZG dd dZG dd dZG dd dZdJddZ dd Z!dd Z"dd Z#dd Z$dd Z%d d! Z&dKd"d#Z'G d$d% d%Z(da)d&d' Z*	dLd(d)Z+d*d+ Z,d,d- Z-d.d/ Z.d0d1 Z/d2d3 Z0d4d5 Z1d6d7 Z2dMd8d9Z3d:d; Z4d<d= Z5d>d? Z6d@dA Z7G dBdC dCZ8dDdE Z9dFdG Z:dHdI Z;dS )N    N)closing)	framework)	strtoboolrootFc                   @   s   e Zd ZdZdZdZdZdS )DistributeModez\
    There are various mode for fleetrun, each of them is designed for different model.
    r         N)__name__
__module____qualname____doc__Z
COLLECTIVEZPSPS_HETER r   r   q/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/fleet/launch_utils.pyr   &   s
    r   c                   @   s$   e Zd ZdZdZdZdZdZdZdS )
DeviceModez
    Training devices type
    r   r   r   N)	r	   r
   r   r   UNKNOWNCPUGPUZKUNLUNXPUr   r   r   r   r   0   s    r   c                   @   sd   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd ZdS )Clusterc                 C   s   d | _ g | _d | _d | _d S N)
job_serverpodshdfsjob_stage_flag)selfr   r   r   r   __init__=      
zCluster.__init__c                 C   s.   d| j  ddd | jD  d| j d| j S )Nzjob_server:z pods:c                 S      g | ]}t |qS r   str).0podr   r   r   
<listcomp>D       z#Cluster.__str__.<locals>.<listcomp>z job_stage_flag:z hdfs:)r   r   r   r   r   r   r   r   __str__C   s   .zCluster.__str__c                 C   sR   t | jt |jkrdS t| j|jD ]\}}||kr dS q| j|jkr'dS dS NFT)lenr   zipr   )r   clusterabr   r   r   __eq__F   s   zCluster.__eq__c                 C   s   |  | S r   )r.   r   r+   r   r   r   __ne__S      zCluster.__ne__c                 C   s   t  |j| _d S r   )copyr   r/   r   r   r   update_podsV   s   zCluster.update_podsc                 C   s   t |  S r   )r)   trainers_endpointsr&   r   r   r   trainers_nranksY   r1   zCluster.trainers_nranksc                 C   s
   t | jS r   )r)   r   r&   r   r   r   pods_nranks\      
zCluster.pods_nranksc                 C   s,   g }| j D ]}|jD ]}||j q
q|S r   )r   trainersappendendpoint)r   rr#   tr   r   r   r4   _   s   

zCluster.trainers_endpointsc                 C   s:   g }| j D ]}|jD ]}dd |jD }|| q
q|S )Nc                 S   r   r   r    r"   accr   r   r   r$   j   r%   z,Cluster.world_device_ids.<locals>.<listcomp>)r   r8   acceleratorsr9   )r   r;   r#   r<   Zstr_acceleratorsr   r   r   world_device_idsf   s   

zCluster.world_device_idsc                 C   sR   g }| j D ]!}|j d|j }|jd ur|jd us!J | d|| q|S )N:z not a valid endpoint)r   addrportr9   )r   r;   r#   epr   r   r   pods_endpointsn   s   
zCluster.pods_endpointsc                 C   s*   | j D ]}t|t|jkr|  S qd S r   )r   r!   id)r   Zpod_idr#   r   r   r   get_pod_by_idx   s
   
zCluster.get_pod_by_idN)r	   r
   r   r   r'   r.   r0   r3   r5   r6   r4   r@   rE   rG   r   r   r   r   r   <   s    
r   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
	JobServerc                 C   s
   d | _ d S r   r:   r&   r   r   r   r      r7   zJobServer.__init__c                 C   s   | j  S r   rI   r&   r   r   r   r'      s   zJobServer.__str__c                 C   s   | j |j kS r   rI   r   jr   r   r   r.      r1   zJobServer.__eq__c                 C   
   | |k S r   r   rJ   r   r   r   r0      r7   zJobServer.__ne__N)r	   r
   r   r   r'   r.   r0   r   r   r   r   rH      s
    rH   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )Trainerc                 C   s   g | _ d | _d | _d | _d S r   )r?   r:   rankstager&   r   r   r   r      r   zTrainer.__init__c                 C   s   d| j  d| j d| j S )Nzaccelerator:z
 endpoint:z rank:)r?   r:   rN   r&   r   r   r   r'      s   zTrainer.__str__c                 C   s^   t | jt |jkrdS | j|jks| j|jkrdS t| j|jD ]\}}||kr, dS q!dS r(   )r)   r?   r:   rN   r*   )r   r<   r,   r-   r   r   r   r.      s   zTrainer.__eq__c                 C   rL   r   r   )r   r<   r   r   r   r0      r7   zTrainer.__ne__c                 C      | j S r   rN   r&   r   r   r   rN         zTrainer.rankN)r	   r
   r   r   r'   r.   r0   rN   r   r   r   r   rM      s    rM   c                   @   D   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dS )Podc                 C   sF   d | _ d | _d | _d | _g | _g | _g | _g | _g | _g | _	d | _
d S r   )rN   rF   rB   rC   r8   serversworkerscoordinatorsheter_workersr?   device_moder&   r   r   r   r      s   
zPod.__init__c                 C   s   d| j  d| j d| j d| j d| j ddd | jD  d	d
d | jD  ddd | jD  ddd | jD  ddd | j	D  S )Nzrank:z id:z addr:z port:z visible_accelerator:z
 trainers:c                 S   r   r   r    )r"   r<   r   r   r   r$      r%   zPod.__str__.<locals>.<listcomp>z	 servers:c                 S   r   r   r    )r"   sr   r   r   r$      r%   z             workers:c                 S   r   r   r    )r"   wr   r   r   r$      r%   z heter_workers:c                 S   r   r   r    )r"   hr   r   r   r$      r%   z coordinators:c                 S   r   r   r    )r"   cr   r   r   r$      r%   )
rN   rF   rB   rC   r?   r8   rU   rV   rX   rW   r&   r   r   r   r'      s   NzPod.__str__c                 C   s  | j |j ks| j|jks| j|jks| j|jkr%td|  d|  dS t| jt|jkr>td| j d|j  dS tt| jD ] }| j| |j| kretd| j|  d|j|    dS qEt| j	t|j	krtd| j	 d|j	  dS tt| j	D ] }| j	| |j	| krtd| j	|  d|j	|    dS qt| j
t|j
krtd| j
 d|j
  dS tt| j
D ] }| j
| |j
| krtd| j
|  d|j
|    dS qdS )	Nzpod z != Fz	trainers ztrainer zservers zworkers T)rN   rF   rB   rC   loggerdebugr)   r8   rangerU   rV   )r   r#   ir   r   r   r.      s>   """z
Pod.__eq__c                 C   rL   r   r   )r   r#   r   r   r   r0      r7   z
Pod.__ne__c                 C   s   d S r   r   )r   Zres_podsr   r   r   parse_response   s   zPod.parse_responsec                 C   rP   r   rQ   r&   r   r   r   rN      rR   zPod.rankc                 C   sF   d}| j D ]	}|| d7 }q|dksJ d|  d|d d }|S )N ,z	this pod z can't see any acceleratorsr   )r?   )r   r;   gr   r   r   get_visible_accelerators   s   
zPod.get_visible_acceleratorsN)
r	   r
   r   r   r'   r.   r0   rb   rN   rf   r   r   r   r   rT      s    'rT      c                 C   s>   t |}||  t  }t d}|| || |S )Nz>%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s)logging	getLoggersetLevelStreamHandler	FormattersetFormatter
addHandler)	log_levelnamer^   Zlog_handlerZ
log_formatr   r   r   
get_logger   s   



rq   c                 C   sl  t |tu s
J dtd d}d}t| D ]\}}t }	||	_||	_||	_|| }
t|
t|ks5J dt	t|D ]g}t
 }|tjkrpt|| ttfr_|j||  |	j||  n0|j||  |	j||  n|tjkrt|| ttfr|j||  n|j||  |
|  |_||_|d7 }|	j| q;|j|	 q| |}||j| fS )Ntrainer_endpoints must be listr   r   zNcurrent trainer_endpoints size should be greater equal than accelerators size.r   )typelistr   	enumeraterT   rN   rB   rY   r)   r`   rM   r   r   
isinstancetupler?   extendr9   r   r:   r8   r   index)node_ipsnode_iptrainer_endpointsrY   devices_per_procr+   Ztrainer_rank	node_rankipr#   cur_node_endpointsra   trainerpod_rankr   r   r   get_cluster  s>   



r   c                 C   s0  t jdkr4| D ]'}|j d u r.t t |jjtj |j	r$|j	
  td|jj  qtd | D ] }|j d u rV|j  |j	rL|j	
  td|jj  q6td tddD ]*}d}| D ]}|j d u r{t |jjtj d	}qg|std
  d S td qatd td d S )Nntzterminate process group gid:r   zterminate process id:   r   2   FTzterminate all the procszcan't kill all process and exit)osrp   procpollkillpggetpgidpidsignalSIGTERMlog_fncloser^   infotimesleep	terminater_   r`   killSIGKILLfatalsysexit)procspstepaliver   r   r   terminate_local_procs1  s<   







r   c                  C   s*   zt  } t | }| |fW S    Y d S r   )socketgethostnamegethostbyname)Z	host_namehost_ipr   r   r   get_host_name_ipW  s   

r   c                 K   s6   |t krtn|}|jd|  f|||d d| dS )ad  Add argparse's argument.

    Examples:
        .. code-block:: python

            >>> import argparse
            >>> from paddle.distributed.fleet.launch_utils import add_arguments
            >>> parser = argparse.ArgumentParser()
            >>> add_arguments("name", str, "Jonh", "User name.", parser)
            >>> args = parser.parse_args()

    z--z Default: %(default)s.)defaultrt   helpN)boolr   add_argument)argnamert   r   r   Z	argparserkwargsr   r   r   add_arguments`  s   
r   c                 C   sZ   dd }t  }d}	 | }||vr|| t|| kr|S |d7 }|dkr,td d S q
)Nc               
   S   sj   t ttjtj!} | tjtjtddd | 	d | 
 d W  d    S 1 s.w   Y  d S )Niir   r   )rc   r   )r   r   AF_INETSOCK_STREAM
setsockopt
SOL_SOCKET	SO_LINGERstructpackbindgetsockname)rZ   r   r   r   __free_portx  s   

$z$find_free_ports.<locals>.__free_portr   Tr   i  z@can't find available port and use the specified static port now!)setaddr)   print)numr   Zport_setr   rC   r   r   r   find_free_portsw  s    

r   c                 C   sX   t jdd u rt| }|d urt|}|S tt jd}t|| || |  d}|S )NFLAGS_START_PORTr   )r   environgetr   ru   intr`   )r   offsetports
start_portr   r   r   	get_ports  s   r   c                 C   sN  d}d}d}|   D ]\}}t|t|}q
dd|d| | }dd| d| d	 }|| | }	d
ddg|	  d }
d
ddg|	  d }d}||
d 7 }|ra|||d |d 7 }n||dd7 }||d 7 }|   D ]'\}}t|trt||krd|dd   }n|}|||d| t|7 }qs||
7 }d| d}|S )Nr   (   -   z    z|{{:>{}s}}{}{{:^{}s}}|
 z|{:>zs}{}{:^zs}|
z    +rc   =+-
r   r   zfleetrun Distributed EnvsValuez... i)itemsmaxr)   formatjoinrw   r!   )envsheaderspacingZmax_kZmax_vkvZh_formatZl_formatlengthborderlineZdrawsZstr_v_strr   r   r   pretty_print_envs  s4   
r   c                   @   s   e Zd Zdd ZdS )TrainerProcc                 C   s(   d | _ d | _d | _d | _d | _d | _d S r   )r   r   
log_offsetrN   
local_rankcmdr&   r   r   r   r     s   
zTrainerProc.__init__N)r	   r
   r   r   r   r   r   r   r     s    r   c                  G   sJ   t | dksJ dt |  dt | dkr#t| d tsJ | d atS )Nr   z
len(args) z should <= 1r   )r)   rw   r   _run_with_coverage)argsr   r   r   run_with_coverage  s
    r   c              
   C   s  |d u rt  tj  }nt  |}|dd  |dd  |  }dd |D }g }	t|jD ]\}
}t|jt|j	t| 
 d|  t|
ddd |jD d|d}|dd d urj|d |d< |d	d d urx|d	 |d	< |d
d d ur|d
 |d
< t|jdkr|jtjkrdddd |jD |d< t|jdkrdddd |jD |d< tj rt|jdkrdddd |jD |d< || g }t stjdddkrg d}tjdg|||}td| d|  |
dkr#tdt|jt|d td| d| d d }tjdkr-d ntj }|d urtj!|d d! tj"#| d"rNt$| d" t%| d"d#}|&d$ |&d%|   W d    n	1 sqw   Y  |d
d ur|d&' d'krt%| d(|
 d)}n
t%| d*|
 d)}t(j)|||||d+}nt(j)|||d,}t* }||_+|j|_|
|_,||_-|r|. nd |_/||_0|	1| q0|	S )-N
http_proxyhttps_proxyc                 S   s   g | ]}d  |qS rA   )r   )r"   Zeler   r   r   r$     s    z(start_local_trainers.<locals>.<listcomp>rd   c                 S   r   r   r    r=   r   r   r   r$     r%   )PADDLE_TRAINER_IDZPADDLE_CURRENT_ENDPOINTPADDLE_TRAINERS_NUMPADDLE_TRAINER_ENDPOINTSZPADDLE_RANK_IN_NODEZPADDLE_LOCAL_DEVICE_IDSZPADDLE_WORLD_DEVICE_IDSZPADDLE_CLUSTER_TOPO_PATHPADDLE_RANK_MAPPING_PATHZPADDLE_ENABLE_AUTO_MAPPINGr   z{}c                 S   r   r   r    r"   re   r   r   r   r$     r%   FLAGS_selected_gpusc                 S   r   r   r    r   r   r   r   r$     r%   ZFLAGS_selected_acceleratorsc                 S   r   r   r    r   r   r   r   r$     r%   FLAGS_selected_xpusZWITH_COVERAGEZOFFON)z-mZcoveragerunz--branchz-p-uzstart trainer procz  env:zYLocal start {} processes. First process distributed environment info (Only For Debug): {}zDistributed Envsr   z7details about PADDLE_TRAINER_ENDPOINTS can be found in z7/endpoints.log, and detail running logs maybe found in z/workerlog.0r   Texist_okz/endpoints.logr[   zPADDLE_TRAINER_ENDPOINTS: 
r   ZPADDLE_NEED_RANK_MAPPINGtruez/prelaunchlog.r,   /workerlog.)envstdoutstderr
preexec_fn)r   r   )2r2   r   r   popr@   rv   r8   r!   rN   r:   r5   r   r4   r?   r   r)   rY   r   r   r   r   coreis_compiled_with_xpuupdater   r   
executabler^   r_   r   r   rp   setsidmakedirspathexistsremoveopenwritelower
subprocessPopenr   r   r   r   tellr   r   r9   )r+   r#   training_scripttraining_script_argslog_dirr   current_envidsresr   idxr<   proc_envZcoverage_argsr   fnZpre_fnfr   tpr   r   r   start_local_trainers  s   






r  c              
   C   s   | j rKt| j jd7}|| jd |D ]}ztj| W q ty3   tjd| j j d Y qw |	 | _W d    d S 1 sDw   Y  d S d S )Nr;   r   zOUnicodeEncodeError occurs at this line. Please refer to the original log file "z"
)
r   r  rp   seekr   r   r   r  UnicodeEncodeErrorr  )r  Zfinr   r   r   r   pull_worker_log_  s    "r  c              
   C   s   z@d}g }d}| D ]&}|j r|jdkrt| |j }|d u r#d}q	|dkr/d}||j q	|r>t|  t	d W |S W |S  t
yS   td t|  Y d S  tyj   td| d| d t|      td| d| d t|  Y d S )	NFr   Tr   zKeyboardInterrupt, exitzABORT!!! Out of all z) trainers, the trainer process with rank=z# was aborted. Please check its log.)r   r   r  r   r   r9   rN   r   r   r   KeyboardInterruptr^   warning
SystemExiterror)r   Znranksr  Z
error_rankr   r   retr   r   r   watch_local_trainersn  sH   

r  c                       | d u rt j }dd td|D }|S td}|d u s"|dkr.dd | dD }|S |d | dD ]}| v sIJ d| d	| d
q8 fdd| dD }td|  d| d   |S )Nc                 S   r   r   r    r"   xr   r   r   r$     r%   zget_gpus.<locals>.<listcomp>r   CUDA_VISIBLE_DEVICESrc   c                 S      g | ]}|  qS r   stripr  r   r   r   r$     r%   rd   zCan't find your gpus z in CUDA_VISIBLE_DEVICES[].c                       g | ]	}  | qS r   rz   r"  r  cuda_visible_devices_listr   r   r$         z1Change selected_gpus into relative values. --ips: will change into relative_ips:z( according to your CUDA_VISIBLE_DEVICES:)	r   r   get_cuda_device_countr`   r   getenvsplitr^   r   )gpusgpus_numZres_gpuscuda_visible_devicesr  r   r&  r   get_gpus  8   




r0  c                    r  )Nc                 S   r   r   r    r  r   r   r   r$     r%   zget_xpus.<locals>.<listcomp>r   XPU_VISIBLE_DEVICESrc   c                 S   r   r   r!  r  r   r   r   r$     r%   rd   zCan't find your xpus z in XPU_VISIBLE_DEVICES[r#  c                    r$  r   r%  r  Zxpu_visible_devices_listr   r   r$     r(  z1Change selected_xpus into relative values. --ips:r)  z' according to your XPU_VISIBLE_DEVICES:)	r   r   get_xpu_device_countr`   r   r+  r,  r^   r   )xpusZxpus_numZres_xpusZxpu_visible_devicesr  r   r3  r   get_xpus  r1  r6  c                 C   s   | dkr*t j rt j dkrtd tjS t j r*t j dkr*td tj	S | dkr<t j dkr<td tjS | dkrNt j dkrNtd tj	S | d	krYtd
 tj
S td)Nheterr   z+launch train in heter mode with GPU device.z+launch train in heter mode with XPU device.ncclzlaunch train in GPU mode!bkclzlaunch train in XPU modegloozlaunch train in CPU modezDon't supported devices)r   r   is_compiled_with_cudar*  r   r   r   r   r4  r   r   RuntimeErrorbackendr   r   r   get_device_mode  s*   r?  c                    s  t | j}g }|tjkrTt| j | jd urNt t| j dks/J dt  d| j dtt t| j  fddt	dt D }||fS  }||fS |tj
krt| j| jd urtt| j dks|J dt d| j dttt| j fddt	dtD }||fS }||fS |tjkrt| d	r| jd u rt | _| jd u rdg}||fS tt	d| j}||fS td
| d)Nr   zgpus' number:z mod args.nproc_per_node:z
 must == 0c                    s   g | ]
} ||  qS r   r   r"   ra   )r-  nr   r   r$         z(get_device_proc_info.<locals>.<listcomp>zxpus' number:c                    s   g | ]
}||   qS r   r   r@  )rA  r5  r   r   r$     rB  Zpaddle_cpuonlyzCan't support device_mode:z, support only cpu|gpu|xpu now.)r?  r>  r   r   r0  r-  Znproc_per_noder)   r   r`   r   r6  r5  r   hasattrmultiprocessing	cpu_countru   AssertionError)r   rY   r~   r   )r-  rA  r5  r   get_device_proc_info  sF   



 


 



rG  c                 C   s*   t jd| jg| j}t|}|  d S )Nr   )r   r   r  r  r  r  wait)r   r   r   r   r   r   direct_start  s   
rI  c                 C   sn   | dusJ g }|  dD ]"}| dd }| dd }t|| }|d|t|f qd|}|S )zM
    origin_endpoint: ip:port
    user_define_endpoint: ip:(port+offset)
    Nrd   rA   r   r   )r,  r   r9   r   r!   )Zorigin_endpointsr   Z!paddle_user_define_endpoints_listZip_portr   rC   Znew_portZpaddle_user_define_endpointsr   r   r   get_custom_endpoints+  s   
rJ  c                 C   s   t |tu s
J d|tjksJ dtd d}t| D ]C\}}t }||_||_||_	|| }	|| }
t
|
dks<J tt
|
D ]}t }|	|  |_|
| |_|j| qB|j| q| |}||j| fS )Nrr   ,Only support get mapped cluster for gpu now.rs   r   )rt   ru   r   r   r   rv   rT   rN   rB   rY   r)   r`   rM   r:   r8   r9   r   rz   )r{   r|   r}   rY   
node_ranksr+   r   r   r#   r   ranks_per_nodera   r   r   r   r   r   'get_mapped_cluster_without_rank_mappingc  s*   


rN  c                    s  |t jks	J dtj }d }t| jd}t|}W d    n1 s&w   Y  g }g }t	|d D ]\}}|
|d  |
|g q5t|dkrR|d }	n| jrY| j}	nt \}
}	|	|v smJ d|	 d| d	||	}t|t|ks~J d
td| d|	 d| d||   g }g }|D ]W | }tjdd urttdd}tt||t||  }n&tjdd urttjd}tt||t||  }ntt|| }|
 fdd|D  qt||	|||S )NrK  r;   ZmachinesrB   r   r   Can't find your local ip {} in node_ips: {}+ranks length should be equal to ips length.parsed from args: node_ips:	 node_ip: node_rank: node_ranks:PADDLE_PORTrc   r   c                       g | ]	}  d | qS r   r   r"   rC   r   r   r   r$         zEget_mapped_cluster_from_args_without_rank_mapping.<locals>.<listcomp>)r   r   r   r   r*  r  Zcluster_topo_pathjsonloadrv   r9   r)   hostr   rz   r^   r_   r   r   r   r   r+  ru   r`   r   rN  )r   rY   r.  Zcluster_topo	json_filer{   rL  r  Zcur_cluster_topor|   _r   
free_portsr}   r   r   rZ  r   1get_mapped_cluster_from_args_without_rank_mapping  sf   






rb  c                 C   s  t |tu s
J d|tjksJ ddd }td d}t| D ]]\}}	t }
||
_|	|
_||
_	|| }|| }|| }t
t|D ]4}t }|d t||  }t|dks[J d|j||d	  ||  |_|| |_|
j| qB|j|
 q | |}||j| fS )
Nrr   rK  c                 S   sV   t d}|d u s|dkr| S |d}|t| }td|  d| d|  |S )Nr  rc   rd   zChange gpu id from z to z based on CUDA_VISIBLE_DEVICES )r   r+  r,  rz   r!   r^   r   )Zgpu_idr/  r'  Zrelative_idr   r   r   get_relative_gpu_id  s   

zAget_mapped_cluster_with_rank_mapping.<locals>.get_relative_gpu_idrs   ranksr   z.Only support one process to one device mappingr   )rt   ru   r   r   r   rv   rT   rN   rB   rY   r`   r)   rM   r!   r?   r9   r:   r8   r   rz   )r{   r|   r}   rY   rL  node_rank_mappingsrc  r+   r   r   r#   r   rM  Zcur_node_rank_mappingra   r   Zlocal_device_idsr   r   r   r   $get_mapped_cluster_with_rank_mapping  s>   




rf  c                    sD  |t jks	J dtj }| jptd}d }t|d}t	
|}W d    n1 s-w   Y  dtjd< g }g }g }|D ]$}	||	d  dd t|	d  D }
|
  ||
 ||	 q?t|d	kro|d
 }n| jrv| j}nt \}}||v sJ d| d| d||}t|| |ksJ dt|t|ksJ dtd| d| d| d||   g }g }|D ]X | }tjdd urttdd}tt||t||  }n'tjdd urttjd}tt||t||  }ntt|| }| fdd|D  qt||||||S )NrK  r   r;   rc   rB   c                 S   r   r   r   r@  r   r   r   r$     s    zBget_mapped_cluster_from_args_with_rank_mapping.<locals>.<listcomp>rd  r   r   rO  rP  rQ  zHnumber of ranks mapped to one node should not exceed the available ones.rR  rS  rT  rU  rV  rW  r   c                    rX  r   r   rY  rZ  r   r   r$   =  r[  )r   r   r   r   r*  rank_mapping_pathr   r+  r  r\  r]  r   r9   ru   keyssortr)   r^  r   rz   r^   r_   r   r   r`   r   rf  )r   rY   r.  rh  Zrank_mappingr_  r{   rL  re  Zcur_rank_mappingZcur_node_rank_listr|   r`  r   ra  r}   r   r   rZ  r   .get_mapped_cluster_from_args_with_rank_mapping  s   








rk  c                   @   rS   )ParameterServerLauncherc                 C   s   || _ || _d| _d| _d| _d| _d| _d| _g | _g | _	d| _
g | _g | _d| _g | _g | _d| _g | _g | _d| _d| _g | _i | _g | _i | _d| _| | d S )NFr   rc   T)r   distribute_modewith_coordinator
server_num
worker_numheter_worker_numcoordinator_numserver_endpointsserver_endpoints_ipsserver_endpoints_portworker_endpointsworker_endpoints_ipsworker_endpoints_portheter_worker_endpointsheter_worker_endpoints_ipsheter_worker_endpoints_portcoordinator_endpointscoordinator_endpoints_ipscoordinator_endpoints_portis_localcurrent_node_ipstage_trainer_numstage_heter_map
stage_liststage_device_map	stage_numget_role_endpoints)r   r   rm  r   r   r   r   J  s6   z ParameterServerLauncher.__init__c              
   C   s2	  |j r;|j | _ |jr)t|jd| j ks$J dt|jd| j |j| _n(t| j d}ddd |D | _n|jdksDJ d|j| _t| jd| _ |jr|j| _|j	rzt|j	d| jksuJ dt|j	d| j|j	| _
nqt| j| j }dd	d |D | _
n^|j	dksJ d
dd |j	dD }t|| _dd |j	dD }d|v rd}t|| j  || j  | j d}g }t| jD ]}|d|| t|| f qd|| _
n|j	| _
|jr/d| _|j| _|jrt|jd| jksJ dt|jd| j|j| _nt| jd}ddd |D | _td | jtjkrZ|jdks@J dd| jd< |jd}	tt|	D ]}|	| | j|d < qQ| j
| jd< |jr|jd| _dd | jD | _|jrGt|jdt| jksJ dt|jdt| j|jd}
d| _tt| jD ]}| jdkr|  jd7  _|
| d}t|| j| ksJ d| ddd |D }dd |D }d|v rtt|| j| j  | j }g }tt|D ]}|d|| t|| f qd|}nd|}|| j|d < | j|d gt|d  |  j| j| 7  _|  j|7  _qntt| jD ]P}| j| }t|| j | j | j }ddd |D }|| j|d < | j|d gt|d  |  j|7  _| jdkr|  jd7  _|  j|7  _qNn|jdksJ dg | _|jd}
d| _tt|
D ]}|
| d}| jt| d d |D }d!d |D }d|v rtt|| j| j  | j }g }tt|D ]}|d|| t|| f qd|}nd|}|| j|d < | j|d gt|d  |  j| jd" 7  _| jdkrC|  jd7  _|  j|7  _q| jg| j| _t| j| _ |j!rc|j!g}ntd| j | j | j }| jdd dd }|d t|d  | _!d#d | jdD | _"d$d | j
dD | _#| jrd%d | jdD | _$d&d | jdD | _%d'd | jdD | _&d(d | j
dD | _'g | _(| j"D ]}|| j(vr| j(| q| j#D ]}|| j(vr| j(| q| jtjkr.d)d | jdD | _)d*d | jdD | _*| j)D ]}|| j(vr,| j(| qtt+| j(dkrBd| _,| j(d | _-n3d+| _,t./d,d }|d u rWt0 \}| _-n|| _-| jtjksu| j-| j(v suJ d-| j- d.| j( d/| j-| j(v r| j(1| j-| _2t34d0| j( d1| j- d2| j2  d S d S )3Nrd   zThe server_num and servers doesn't match. Expect servers endpoints num equal to server_num, but received servers endpoint num: {} and server_num {}r   c                 S      g | ]}d t | qS z
127.0.0.1:r    r  r   r   r   r$   {      z>ParameterServerLauncher.get_role_endpoints.<locals>.<listcomp>rc   z?The setting of Parameter-Server must has server_num or servers.zThe worker_num and workers doesn't match. Expect workers endpoints num equal to worker_num, but received workers endpoint num: {} and worker_num {}c                 S   r  r  r    r  r   r   r   r$     r  z?The setting of Parameter-Server must has worker_num or workers.c                 S      g | ]}|  d d qS rA   r   r"  r,  r  r   r   r   r$         c                 S      g | ]}t | d qS r   r)   r"  r,  r  r   r   r   r$     r  r   i  rA   TzThe coordinator_num and coordinators doesn't match. Expect coordinators endpoints num equal to coordinator_num, but received coordinator endpoint num: {} and coordinator_num {}c                 S   r  r  r    r  r   r   r   r$     r  z2>>> use default coordinator addr(only one process)zBThe setting of Parameter-Server heter mode must has heter_devices.cpu;r   c                 S   r   r   rg  )r"   Ztrainer_numr   r   r   r$     s    zThe stage_num and heter_workers doesn't match. Expect heter_workers endpoints stage num equal to heter_worker_num stage, but received heter_workers endpoint stage num: {} and heter_worker_num stage {}zThe heter trainer num in stage z= is not equal in args.heter_worker_num and args.heter_workersc                 S   r  r  r  r  r   r   r   r$         c                 S   r  r   r  r  r   r   r   r$     r  c                 S   r  r  r    r  r   r   r   r$   &  r  zVThe setting of Parameter-Server heter mode must has heter_worker_num or heter_workers.c                 S   r  r  r  r  r   r   r   r$   >  r  c                 S   r  r   r  r  r   r   r   r$   A  r  r   c                 S   r  r  r  r  r   r   r   r$   w  r  c                 S   r  r  r  r  r   r   r   r$   z  r  c                 S   r  r  r  r  r   r   r   r$     r  c                 S   r  rA   r   r  r  r   r   r   r$     r  c                 S   r  r  r  r  r   r   r   r$     r  c                 S   r  r  r  r  r   r   r   r$     r  c                 S   r  r  r  r  r   r   r   r$     r  c                 S   r  r  r  r  r   r   r   r$     r  FPOD_IPrO  z)} in args.servers and args.workers ips: {rQ  rS  z current_node_ip:rU  )5ro  rU   r)   r,  r   rs  r   r   rp  rV   rv  r`   r9   r!   rr  rn  rW   r|  r   rm  r   r   Zheter_devicesr  r  rq  Zstage_heter_trainer_numrX   ry  r  ry   r  r  	http_portrt  rw  r}  r~  ru  rx  r{   rz  r{  r   r  r  r   r+  r   rz   r   r^   r_   )r   r   r   rw  Zworker_endpoints_lenr   rx  rv  ra   Zheter_devices_listZheter_worker_endpoints_listry  rz  Zheter_worker_endpoints_lenr{  Znew_heter_worker_endpointsrK   Zip_port_listZheter_trainer_numr  Zhttp_ipr   Zpod_ipr`  r   r   r   r  n  s:  












4
















z*ParameterServerLauncher.get_role_endpointsc                 C   s  | j | jvrd S td d}d}d}d}d}t| jD ]\}}t }||_||_tt| j	D ]$}	|| j	|	 krRt
 }
| d| j|	  |
_||
_|d7 }|j|
 q.tt| jD ]'}|| j| krt
 }| d| j|  |_||_d|_|d7 }|j| qZtt| jD ]'}|| j| krt
 }| d| j|  |_||_d|_|d7 }|j| qtt| jD ]*}|| j| krt
 }| d| j|  |_||_| j| |_|d7 }|j| q|j| q|j| j }t | _g g g g d| _g g g g d| _ g g g g d| _!| "| j#| | $| j#| | j%r&| &| j#| | j't(j)kr4| *| j#| t+,d| j#j- d| j#j- d| j#j- d	| j#j- d
	 t| jd dkrt| jd D ]"\}	}| jd |	 j./  t| j!d dkr| j!d |	 0  q_t+,d t| jd dkrt| jd D ]\}	}| j!d |	 0  | jd |	 j.1  qt+,d t| jd dkrt| jd D ]\}	}| j!d |	 0  | jd |	 j.1  qt+,d t| jd dkrt| jd D ]\}	}| j!d |	 0  | jd |	 j.1  qt+,d nBt| jd dkr6t| jd D ]\}	}| jd |	 j./  q&t| jd dkrWt| jd D ]\}	}| jd |	 j./  qGt2j34| jrgt56| j d S d S )Nrs   r   rA   r   )workercoordinatorserverheter_workerzDPlease check servers, workers, coordinator and heter_worker logs in z/workerlog.*, z/serverlog.* , z/coordinatorlog.*, and z/heterlog.*r  zDall workers exit, going to finish parameter server and heter_worker.r  zall heter_worker are killedr  zall parameter server are killedr  zall coordinators are killed)7r  r{   r   rv   rT   rN   rB   r`   r)   rt  rM   ru  r:   rU   r9   rw  rx  rO   rV   r}  r~  rW   rz  r{  r  rX   r   r   tempfilemkdtempgloo_rendezvous_dirr   cmdslog_fnsstart_pod_serverr   start_pod_workerrn  start_pod_coordinatorrm  r   r   start_pod_heter_workerr^   r   r	  r   rH  r   r   r   r   r   shutilrmtree)r   r+   Zserver_rankZworker_rankZheter_worker_rankZcoordinator_rankr   r   r#   ra   r  rK   r  mr  r   r  r   r   r   r   start_ps  s   

,


z ParameterServerLauncher.start_psc                 C   s  t j }t|}|dd  |dd  t|jD ]\}}| jtjkrP| j	| j
| j| j|jdd dt| j|jdd tt ddd	| j| jd
}n(| j	| j
| j|jdd dt| j|jdd tt ddd	| j| jd}|| tjd|jg|j}| jd | |dkrtdt|jt|d |j d urt j!|j dd t"|j  d| d}	| j#d |	 t$j%|||	|	d}
nt$j%||d}
t& }|
|_'|j(|_(||_)|	|_*|	r|	+ nd |_,||_-| j.d | qd S )Nr   r   rA   r   ZPSERVERr   PADDLE_WITH_GLOO03)PADDLE_PSERVERS_IP_PORT_LISTr   PADDLE_COORDINATOR_ENDPOINTS%PADDLE_ALL_HETER_TRAINER_IP_PORT_LISTrW  TRAINING_ROLEr   r  r  PADDLE_GLOO_RENDEZVOUSPADDLE_GLOO_FS_PATHPADDLE_GLOO_HTTP_ENDPOINT)r  r   r  rW  r  r   r  r  r  r  r  r   r  z`Local server start {} processes. First process distributed environment info (Only For Debug): {}r   Tr   z/serverlog.r[   r   r   r   r   )/r   r   r2   r   rv   rU   rm  r   r   rs  rv  r|  ry  r:   r,  r!   rp  r+  r  r  r   r   r   r  r  r  r9   r^   r   r   r)   r   r	  r   r  r  r  r  r   r   rN   r   r   r  r   r   r   )r   r   r#   default_envr
  r  Z
cur_serverr  r   r  r   r  r   r   r   r  7  s   




z(ParameterServerLauncher.start_pod_serverc              	   C   s0  t j }t|}|dd  |dd  d}g }tj r)t|j}t	|}ntj
 r=tj }dd td|D }t|jD ]R\}}|dkrMdnt|||  }	| jtjkri d| jd| jd	t| jd
| jdt| jdddt| jddd| jd d| jd| jd ddd|jdd d|jdd dt|jdtt dddd| j dd|	|	| j!d}
nOi d| jd| jd	t| jddd
| jd|jdd d|jdd dt|jdtt ddddd | j d!dd"dd#|	d$|	d%| j!}
|"|
 t#j$d&|j%g|j&}| j'd' (| |dkr>t)*d(+t	|jt,|
d) |j-d urit j.|j-d*d+ t/|j- d,| d-}| j0d' (| t1j2||||d.}nt1j2||d/}t3 }||_4|j|_||_5||_6|r|7 nd |_8||_9| j:d' (| qBd S )0Nr   r   r   c                 S   r   r   r    r  r   r   r   r$     r%   z<ParameterServerLauncher.start_pod_worker.<locals>.<listcomp>r  r  r   r   r  PADDLE_STAGE_TRAINERS_NUMSTAGE_ID1	STAGE_NUM*PADDLE_PREVIOUS_HETER_TRAINER_IP_PORT_LISTrc   &PADDLE_NEXT_HETER_TRAINER_IP_PORT_LISTr   r  HETER_DEVICE_TYPEr   r  ZTRAINERr  rA   rW  r   r  r  r  )r  r   r   r  r2  r  r  r   r   r  r2  r  r   r  z`Local worker start {} processes. First process distributed environment info (Only For Debug): {}r   Tr   r   r[   r  r  );r   r   r2   r   r   r   r;  r0  r-  r)   r   r4  r`   rv   rV   r!   rm  r   r   rs  rv  rp  r|  r  r  r  ry  r  r:   r,  rN   r+  r  r  r   r   r   r  r  r  r9   r^   r   r   r   r	  r   r  r  r  r  r   r   r   r   r  r   r   r   )r   r   r#   r  r
  heter_device_numdevice_listr  Z
cur_worker	device_idr  r   r  r   r  r   r   r   r    s  









	




	



z(ParameterServerLauncher.start_pod_workerc              	   C   s  t d tj }t|}|dd  |dd  t|jD ]\}}d}i d| jd| jdt	| j
d| jd	t	| jd
dd|jdd d|jdd dt	|jdt	tddddd| jddddd|d|d| j}|| tjd|jg|j}	| jd |	 |dkrtdt|jt|d |jd urtj |jdd  t!|j d!| d"}
| j"d |
 t#j$|	||
|
d#}nt#j$|	|d$}t% }||_&|j|_||_'|
|_(|
r|
) nd |_*|	|_+| j,d | qd S )%Nz">>> entering start_pod_coordinatorr   r   r  r  r   r   r  ZPADDLE_COORDINATOR_NUMr  ZCOORDINATORr  rA   r   rW  r   r   r  r  r  r  r   r   r  r2  r  r   r  zeLocal coordinator start {} processes. First process distributed environment info (Only For Debug): {}r   Tr   z/coordinator.r[   r  r  )-r   r   r   r2   r   rv   rW   rs  rv  r!   rp  r|  rr  r:   r,  rN   r+  r  r  r   r   r   r  r  r  r9   r^   r   r   r)   r   r	  r   r  r  r  r  r   r   r   r   r  r   r   r   )r   r   r#   r  r
  r  Zcur_coordinatorr  r  r   r  r   r  r   r   r   r    s   




	



z-ParameterServerLauncher.start_pod_coordinatorc              	   C   s  t j }t|}|dd  |dd  d}g }tj r)t|j}t	|}ntj
 r=tj }dd td|D }t|jD ]
\}}|dkrMdnt|||  }	|j}
i d| jd| jd	|
| jd
 krp| j|
d
  ndd| j|
d
  d| jd| j|
 dt|
dt| jd|jdd
 dddt| jdt| jd|jdd dtt ddddd| jddd|	|	| jd}|| tj d|j!g|j"}| j#d $| |dkrt%&d 't	|jt(|d! |j)d ur!t j*|j)d"d# t+|j) d$| d%}| j,d $| t-j.||||d&}nt-j.||d'}t/ }||_0|j1|_1||_2||_3|r?|4 nd |_5||_6| j7d $| qBd S )(Nr   r   r   c                 S   r   r   r    r  r   r   r   r$   B  r%   zBParameterServerLauncher.start_pod_heter_worker.<locals>.<listcomp>r  r  r   r  r   rc   r  r  r  r  r  rW  rA   r  ZHETER_TRAINERr   r  r  r  r  r  r  r   )r   r  r2  r  r   r  zfLocal heter_worker start {} processes. First process distributed environment info (Only For Debug): {}r   Tr   z
/heterlog.r[   r  r  )8r   r   r2   r   r   r   r;  r0  r-  r)   r   r4  r`   rv   rX   r!   rO   rs  rv  r  r  ry  r  r:   r,  rp  r  r+  r  r  r   r   r   r  r  r  r9   r^   r   r   r   r	  r   r  r  r  r  r   r   rN   r   r   r  r   r   r   )r   r   r#   r  r
  r  r  r  Zcur_heter_workerr  Zstage_idr  r   r  r   r  r   r   r   r  5  s   












z.ParameterServerLauncher.start_pod_heter_workerN)
r	   r
   r   r   r  r  r  r  r  r  r   r   r   r   rl  I  s    $  K LmErl  c                 C   sl   | dvrt d|  | dkrtj st d| dkr%tj s%t d| dkr2tj s4t dd S d S )	N)r8  r:  r9  autor7  Zxcclflagcxzpaddle.distributed initialize error, backend argument can only be one of 'nccl', 'gloo', 'bkcl', 'auto', 'heter', 'xccl' but got r8  zlpaddle.distributed initialize error, your paddle is not compiled with cuda but you assign 'nccl' as backend.r9  zkpaddle.distributed initialize error, your paddle is not compiled with xpu but you assign 'bkcl' as backend.r  zppaddle.distributed initialize error, your paddle is not compiled with flagcx but you assign 'flagcx' as backend.)
ValueErrorr   r   r;  r   Zis_compiled_with_flagcxr=  r   r   r   check_backend  s&   	r  c                 C   s2   | dkrd S t jdrtdt jrtdd S )Nr:  darwinzDYou are going to using gloo on macos, but currently is not supportedzFYou are going to using gloo on windows, but currently is not supported)utilsZOS_NAME
startswithr  Z
IS_WINDOWSr=  r   r   r   block_windows_and_macos  s   r  c                   C   s    t j rdS t j rdS dS )Nr8  r9  r:  )r   r   r;  r   r   r   r   r   get_backend_by_compile_flag  s
   

r  )rg   r   r   )NN)r   )<r2   r\  rh   rD  r   r  r   r   r   r  r   r  r   
contextlibr   Z*paddle.utils.cpp_extension.extension_utilsr  Zcpp_extensionZextension_utilsZpaddler   Zpaddle.utilsr   ri   r^   	propagater   r   r   rH   rM   rT   rq   r   r   r   r   r   r   r   r   r   r   r  r  r  r0  r6  r?  rG  rI  rJ  rN  rb  rf  rk  rl  r  r  r  r   r   r   r   <module>   sx   

D
M*&	
)


),
8?8R      O$