o
    pi3V                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZmZ d dlmZ ddlmZ eddZd	Zd
ZdZdZG dd dZG dd dZG dd dZG dd dZdS )    )annotationsN)cloud_utilslaunch_utils)
get_logger   )getenv_or_backupINFOELASTICe   f   x   <   c                   @  s   e Zd ZdZdZdS )ElasticLevel      N)__name__
__module____qualname__FAULT_TOLERANCEr	    r   r   o/home/app/PaddleOCR-VL/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/fleet/elastic/manager.pyr   ,   s    r   c                   @  s    e Zd ZdZdZdZdZdZdS )ElasticStatus	completederrorZholdZrestartexitN)r   r   r   	COMPLETEDERRORHOLDRESTARTEXITr   r   r   r   r   1   s    r   c                   @  s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )LauncherInterfacec                 C  s   || _ g | _d S N)argsprocs)selfr"   r   r   r   __init__:   s   
zLauncherInterface.__init__c                 C  s  t jdkr5| jD ]'}|j d u r/t t |jjtj	 |j
r%|j
  td|jj  qtd | jD ] }|j d u rX|j  |j
rN|j
  td|jj  q8tddD ]+}d}| jD ]}|j d u ryt |jjtj d}qe|std	  dS td q^dS )
Nntzterminate process group gid:r   zterminate process id:r   2   FTzterminated all the procs)osnamer#   procpollkillpggetpgidpidsignalSIGTERMZlog_fncloseloggerinfotimesleep	terminaterangekillSIGKILL)r$   pstepaliver   r   r   _terminate_procs>   s8   








z"LauncherInterface._terminate_procsc                 C  s   d}d }| j D ]3}|j }|d u rd}q|dkr:|tkr&td |  S td td|j d| d |}q|sC|d u rCdS |S )	NFTr   z+return form elastic auto parallel re-launchzABORT!!! ABORT!!! ABORT!!!zERROR rank z error with exit code z, check log for detail.)r#   r*   r+   ELASTIC_AUTO_PARALLEL_EXIT_CODEr2   r3   r   rank)r$   r<   resultr:   retr   r   r   _check_procs^   s&   



zLauncherInterface._check_procsc                 C     t r!   NotImplementedErrorr$   r   r   r   launchs      zLauncherInterface.launchc                 C  rC   r!   rD   rF   r   r   r   stopv   rH   zLauncherInterface.stopc                 C  rC   r!   rD   rF   r   r   r   watchy   rH   zLauncherInterface.watchN)	r   r   r   r%   r=   rB   rG   rI   rJ   r   r   r   r   r    9   s     r    c                   @  s   e Zd Zdd Z	d0d1ddZd2ddZdd Zd3ddZdd Zdd Z	d4d5ddZ
dd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ ZdS )6ElasticManagerc                   s$  |_ |jp
td}|jptd}|j\__|j	p$td}|j
p/ttdd}|jp7td}|r<|n _	t|\__ttdt_ttdt d _t rtd	d
_tjd_ttdd_tdd
_tdd
}|d_n2|jptd	d
_jd}	t|	_ttdd_|	jj_fdd|	D _j	 dj _ t!"dj  t!"dj dj  ttdt#j$_%jjksjdkrjdkrt#j$_%t!"d jdkrjjkrt#j&_%t!"d |s6tdr6tdr6d'tdtd}t!(d| d|  g _)d_*d_+d_,d _-|r_d|vs_|r_jsst!"d | d!| d"j  d_.d S d#_.|_/d$| _0j0d% _1j0d& _2j0d' _3d
4d(d) t5d*D }
j1 d+|
 t66  _7	 j/8j0d, fd-d.}j/9j1|}j/:  fd/d0}t;j<d1|d#d2}|=  j/j8j7j >d3d4 j/8j3j d5j >d3 fd6d7}j/?j3|}||g_@d _Ad S )8NZPADDLE_ELASTIC_SERVERZPADDLE_ELASTIC_JOB_IDZPOD_IPZPADDLE_ELASTIC_SCALEr   ZPADDLE_ELASTIC_FORCEZPADDLE_ELASTIC_TIMEOUTZPADDLE_ELASTIC_TTLPADDLE_TRAINERS ,ZPADDLE_PORTZ6170DISTRIBUTED_TRAINER_ENDPOINTSPADDLE_TRAINER_ENDPOINTSZFLAGS_START_PORTc                   s   g | ]
}| d  j  qS :)
start_port).0iprF   r   r   
<listcomp>   s    z+ElasticManager.__init__.<locals>.<listcomp>rR   zstart job with np=z	trainers=z, trainer_endpoints_list=Z#PADDLE_ELASTIC_FAULT_TOLERANC_LEVELz+start job with ElasticLevel.FAULT_TOLERANCEz#start job with ElasticLevel.ELASTICZ PADDLE_ELASTIC_ETCD_SERVICE_HOSTZ PADDLE_ELASTIC_ETCD_SERVICE_PORTz{}:{}zinit with server z host Fz#Elastic is not enabled with server z name z and np Tz/paddle/z/nodesz/npz
/endpointsc                 s  s    | ]}t d V  qdS )abcdefghijklmnopqrstuvwxyzN)randomchoice)rT   _r   r   r   	<genexpr>   s    

z*ElasticManager.__init__.<locals>.<genexpr>   /   0c                   s`   dd  j  jD  _ jrtt jn j _td j d j  d _	d  _
d S )Nc                 S     g | ]}|d    qS r   decoderT   ir   r   r   rV          zCElasticManager.__init__.<locals>.host_call_back.<locals>.<listcomp>zhost_call_back curr_host=z, hosts:T)etcd
get_prefixnode_prefixhostslistsetr2   r3   	curr_host	need_syncelastic_startup_time)eventrF   r   r   host_call_back   s   
z/ElasticManager.__init__.<locals>.host_call_backc               
     s   	 zD   dd jjD } | rtt| n| } tdj d|   j| vrDtdj  jj	j
jdd W n  tye } ztd	| d
t   W Y d }~d S d }~ww t d  q)NTc                 S  r_   r`   ra   rc   r   r   r   rV     s    
zDElasticManager.__init__.<locals>.lease_heartbeat.<locals>.<listcomp>z[lease_heartbeat] curr_host=, hosts=z [lease_heartbeat] register host=latin-1leasez![lease_heartbeat] internal error: r   )refreshrf   rg   rh   rj   rk   r2   r3   rl   put	host_pathencode	Exceptionr   	traceback
format_excr4   r5   )ri   eZelastic_ttlZ
host_leaser$   r   r   lease_heartbeat   s:   


z0ElasticManager.__init__.<locals>.lease_heartbeatr   )r)   targetdaemonrr   rs   |c                   sn    j sd S  j jd }|d ur| nd}|d\ _  _td j  d td j d d S )Nr   rM   r   z"set DISTRIBUTED_TRAINER_ENDPOINTS ru   zset PADDLE_TRAINERS )	dist_endpointsrf   getendpoints_pathrb   splittrainersr2   r3   )ro   valueZedpsrF   r   r   endpoints_call_back*  s   z4ElasticManager.__init__.<locals>.endpoints_call_back)Br"   Zelastic_serverr(   getenvZjob_id	_parse_npnpmin_npmax_nphostscaleintforce	_get_hostr   Zget_device_proc_infoZdevice_modedevices_per_procELASTIC_TIMEOUTelastic_timeoutELASTIC_TTLrS   r   Zuse_paddlecloudr   lenr   r   r   trainer_endpoints_listips_host_to_endpointsrl   r2   r3   r   r   elastic_levelr	   formatdebugri   stoppedsigintrm   rn   enablerf   prefixrh   Znp_pathr   joinr7   r4   rx   rw   Zadd_watch_prefix_callbackrt   	threadingThreadstartry   Zadd_watch_callbackwatcheslauncher)r$   r"   Zetcd_clientserverr)   r   r   r   Ztrainer_endpointsZnode_ipsZnode_tagrp   Z
host_watchr   Zkeepalived_threadr   Zendpoints_watchr   r~   r   r%   ~   s   




$





zElasticManager.__init__  ip_port_listrj   r   rS   r   returnstrc           
        s~   g }|D ]3}| d}t|dkr|d  t|d }n| |}tt||t| }| fdd|D  qd|}	|	S )NrR   r   r   r   c                   s   g | ]	}  d | qS rQ   r   )rT   portrU   r   r   rV   J      z5ElasticManager._host_to_endpoints.<locals>.<listcomp>rN   )r   r   r   rj   r7   extendr   )
r$   r   r   rS   Zendpoint_listip_port	endpointsr   Zportsr   r   r   r   r   <  s   

z!ElasticManager._host_to_endpointsFc                 C  s   t d|  | jr| j  | jsd S |r| j| jd | jD ]}| j	| q"| j
| j t| j| j}t|dkrJ| j| j d S d S )Nzmanager exist completed    1r   )r2   r3   r   rI   r   rf   rw   r   r   Zcancel_watchdeleterx   rj   rg   rh   r   Zdelete_prefix)r$   r   rJ   ri   r   r   r   r   O  s   

zElasticManager.exitc                 C  s   | j jstd d S td ttj }tj| j j|tj	tj	dd
 \}}|r3td d S td|d   d S )Nzskip pre_hookzexecute pre_hook...T)envstdoutstderrshellzpre_hook exec failedzpre_hook exec result: zutf-8)r"   Zelastic_pre_hookr2   r3   copyr(   environ
subprocessPopenPIPEcommunicatewarningrb   strip)r$   Zcurrent_envouterrr   r   r   pre_hookc  s"   

zElasticManager.pre_hookr   c                 C  s   |pt dd}|d}d }}t|dkr+t|d }|dkr#dn|}d}||fS t|dkrNt|d }t|d }|dkrCdn|}t||}||fS td| d)	z1
        np format is "MIN" or "MIN:MAX"
        ZPADDLE_ELASTIC_NP0rR   r   r   r   zthe np=z) needs to be in "MIN" or "MIN:MAX" format)r(   r   r   r   r   max
ValueError)r$   r   Znp_strZnp_dictr   r   r   r   r   r   u  s"   


zElasticManager._parse_npc                 C  s$   zt t t  W S    Y dS )Nz	127.0.0.1)socketgethostbynamegetfqdngethostnamerF   r   r   r   r     s   zElasticManager._get_hostc                 C  s$   | j sdS t| j| jd dkS )NTr   r   )r   r   rf   r   r   rF   r   r   r   
_completed  s   zElasticManager._completedN	host_listlist | Nonec              
   C  s  |r|| _ ndd | j| jD | _ | j rtt| j n| j | _ | jtjkr3t	| j | j
kr1dS dS | jtjkrt	| j }|| j
krEdS | jsMt | _|| jkrWd | _dS || jkr|| jk rt | j }|| jkrtd| d| j d| d| j  dS dS d | _dS dS )	Nc                 S  r_   r`   ra   rc   r   r   r   rV     re   z)ElasticManager._match.<locals>.<listcomp>TFzawait for timeout, you can set value by PADDLE_ELASTIC_TIMEOUT,                         hosts_num=z	, min_np=z(,                         interval_time=z, elastic_timeout=)ri   rf   rg   rh   rj   rk   r   r   r   r   r   r	   rn   r4   r   r   r   r2   r3   )r$   r   Z	hosts_numZinterval_timer   r   r   _match  sL   




zElasticManager._matchc                 C  s$   | j | j| d| d d S )Nr   rr   )rf   rw   r   ry   )r$   r   ri   r   r   r   _update_endpoint  s   zElasticManager._update_endpointc                 C  s   t tdd}td| j d| j  | j| jv r=| jtjd< | jtjd< t	d| j d t	d	| j d d S | j
| j}|d
krW| j
| | j
|< | j| j
|< n| tjd< ddd | j
D }|| j_|tjd< d S )NPADDLE_TRAINER_IDzself.curr_host=z, self.dist_endpoints=rO   rL   z)update env DISTRIBUTED_TRAINER_ENDPOINTS ru   zupdate env PADDLE_TRAINERS r   rN   c                 S     g | ]	}| d d qS rR   r   r   rT   	host_portr   r   r   rV     r   z:ElasticManager._update_fault_tolerance.<locals>.<listcomp>)r   r(   r   r2   r   rl   r   r   r   r3   ri   indexr   r"   r   )r$   r?   idxri   r   r   r   _update_fault_tolerance  s(   z&ElasticManager._update_fault_tolerancec              
   C  s   t | j}tdt| j d| j d| j d|  | jD ]}||vr*|| qt	|
| jtjd< ddd |D }|| j_|tjd	< t|| _d|tjd
< | jtjd< || _d S )Nzelastic scale out, from  to rq   , host_endpoints=r   rN   c                 S  r   r   r   r   r   r   r   rV     r   z<ElasticManager._update_elastic_scale_out.<locals>.<listcomp>rL   rP   rO   )r   deepcopyr   r2   r3   r   ri   r   appendr   r   rl   r(   r   r   r"   r   r   )r$   host_endpointsZcurr_host_portri   r   r   r   _update_elastic_scale_out  s(   $






z(ElasticManager._update_elastic_scale_outc              
   C  s|  t | j}td| j dt| j d| j d|  i }g }t| jD ]!\}}|	|}|t| jd krA|
|sA|||< q%|| q%d}g }tt| jD ]}|
|sit|dkri|| ||< |d7 }||
| qRtd|  || _dd	 |D }	d
|	}
| || j}|
| j_t|	| jtjd< |
tjd< t|| _d
|tjd< |tjd< | ||
 d S )Nzelastic scale in, from r   rq   r   r   r   z#elastic scale in, sorted_endpoints=c                 S  r   r   r   )rT   r   r   r   r   rV     r   z;ElasticManager._update_elastic_scale_in.<locals>.<listcomp>rN   r   rL   rP   rO   )r   r   r   r2   r3   r   r   ri   	enumerater   r   r   r7   r   r   r   r"   r   r   rl   r(   r   r   )r$   r   Zendpoints_dictZunsorted_endpointsidr   r   Z
idle_indexZsorted_endpointsZip_listri   Znew_endpointsr   r   r   _update_elastic_scale_in  sD   $








z'ElasticManager._update_elastic_scale_inc                 C  s   t | jdksJ d| jtjkr|   d S t | j| jkr.td| j  |   d S t | j| jkr<| 	  d S | 
  d S )Nr   zhosts emptyzelastic startup, hosts=)r   ri   r   r   r   r   r   r2   r3   r   r   rF   r   r   r   _update_hosts,  s   zElasticManager._update_hostsc                 C  sp   | j sd S d}| js6|  rtd| j  |   d S td| j d| j  |d7 }t	d | jr
d S )Nr   zready with hosts znot ready for np z with hosts r   )
r   r   r   r2   r3   ri   r   r   r4   r5   )r$   r   r   r   r   wait=  s   
zElasticManager.waitc                 C  s$   | j rd S || j| _| j  d S r!   )r   r"   r   rG   )r$   r   r   r   r   runL  s   zElasticManager.runc                 C  s   | j rd| _ | jsm| j }td|  |d urRtd|  |tkr3td | j  t	j
S |dkr9dnd}| j|d |rFt	jS | jtjkrOt	jS t	jS |  se|  r]| j re| j  t	j
S td | jr	| jru| j  t	jS )	NFzlauncher.watch():zjob exit with code zjob re-launch for auto parallelr   T)r   r   )rm   r   r   rJ   r2   r   r3   r>   rI   r   r   r   r   r   r   r   r   r   r   r   r4   r5   r   )r$   rA   r   r   r   r   rJ   S  s4   





zElasticManager.watchc                 C  s   | j r|   || _d| _d S )NT)r   r   r   r   )r$   r   framer   r   r   signal_handlerw  s   
zElasticManager.signal_handler)r   )r   rj   r   rj   rS   r   r   r   )F)r   r   r!   )r   r   )r   r   r   r%   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rJ   r   r   r   r   r   rK   }   s(     @

)1$rK   )
__future__r   r   r(   rX   r/   r   r   r   r4   r{   Zpaddle.distributed.fleetr   r   Z"paddle.distributed.utils.log_utilsr   Z
backup_envr   r2   ZELASTIC_EXIT_CODEr>   r   r   r   r   r    rK   r   r   r   r   <module>   s,   
D