o
    piv.                     @   sP   d dl Z d dlZddlmZ ddlmZmZ G dd deZG dd	 d	eZdS )
    N   )
DeviceType   )
ControllerControllerModec                       sJ   e Zd Z fddZedd Zdd Zdd Zd	d
 ZdddZ	  Z
S )CollectiveControllerc                    s   d | _ t | d S )N)_tuner_run_modesuper__init__)selfctx	__class__ w/home/app/PaddleOCR-VL/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/launch/controllers/collective.pyr
      s   zCollectiveController.__init__c                 C   s*   |r|j | j d tj|j_dS dS )N enabledTF)loggerdebug__name__r   
COLLECTIVEargsrun_modeclsr   r   r   r   enable   s
   
zCollectiveController.enablec                 C   sV   |   }|rd S | jjjd u r| jjjr| jjjr|  S | jjjd u r&d}| |S )NT)	_build_pod_with_tunerr   r   master
start_portips_build_pod_with_argsauto_parallel_config_build_pod_with_master)r   Zskip_runr   r   r   	build_pod%   s   
zCollectiveController.build_podc              
   C   s  | j jj}|d urtj|s| j jd |ds"| j jd t	|d}t
| }|dd| _W d    n1 s@w   Y  | j jd| j  d| j j  }|  }| jd	v r| j jjd
|d|dd|t| j jj  | d	}d}| j||dd | jdkrdS dS )Nzauto_parallel_conf not exists!z.jsonz2auto_parallel_config should be a json format file!rZtuner_run_modetuner_and_runztuner_run_mode is: z
127.0.0.1:)
tuner_onlyr$   10Ztuner)	PADDLE_AUTO_PARALLEL_CONFIGPADDLE_TRAINERS_NUMPADDLE_TRAINER_ENDPOINTSPADDLE_TRAINER_IDPADDLE_CURRENT_ENDPOINTZFLAGS_selected_gpusPADDLE_AUTO_PARALLEL_STAGEPADDLE_GLOBAL_SIZEPADDLE_LOCAL_SIZEz	tuner.logT)envslog_fileZis_initr%   F)r   r   r    ospathexistsr   warningendswithopenjsonloadsreadgetr   infonodeget_free_portpod_replicasintZnnodesadd_container)r   r    ZrobjZauto_parallel_dataZendpointr?   er1   r   r   r   r   5   sB   



z*CollectiveController._build_pod_with_tunerc           	         s      j_t jjj jjjd} fdd|D } jj	
d|   jj	d|d  d |d  jj_ jjj|v rP| jjj jj nd} |  jjj } jjj jjj}t jjD ]} jjjt|  jj ||  | t| |||  ||  t| t|t jjjd}|d	d|i  jd ur| jjjd
d t|dkr jjjjtjkr| jjj    jjdkr||d|i n|||| i n|ddi d| } j!||d qodS )N,c                    s0   g | ]}t  jjD ]}| d |  q
qS :)rangepodreplicas).0hpr   r   r   r   
<listcomp>a   s    
z=CollectiveController._build_pod_with_args.<locals>.<listcomp>zjob endpoints: z1master is set by args, it will be overwritten by r   .ZPADDLE_MASTERr.   r/   ZPADDLE_GLOBAL_RANKZPADDLE_LOCAL_RANKZPADDLE_NNODESr,   r+   r)   ZPADDLE_RANK_IN_NODEZPADDLE_AUTO_CLUSTERr*   runr(   r-   r   PADDLE_DISTRI_BACKENDgloo
workerlog.r0   r1   T)"r?   rG   rH   r@   r   r   r   r   splitr   r   r5   r   r=   ipindexsave_pod_logdeviceget_selected_device_keyget_selected_devicesdevicesrF   lenstrauto_cluster_configupdatejoinr   r    dtyper   CUSTOM_DEVICEget_custom_device_envsrA   )	r   r   job_endpointsrank_offsetselected_dev_keyselected_dev_listirB   r1   r   rL   r   r   [   sd   




z)CollectiveController._build_pod_with_argsTc                    s      j_t jjj j_ jj } fdd jj	 jj jjD }t
 jj jj jj jjjj jjj d| d|d} jd jj d jj| jj jj\}}| j_t|dk rmd	S d
d |D } jjd|   | tdd |D }tdd |d | D }	 |d d }	|	dd  }
|
tjd< dd |D }|r j   jjj } jjj  jjj!}t" jjD ]}|	|  jj ||  |  jj || ||  | t#|t# jjj$d}|%dd|i  j&d ur|% jjj'dd t|dkrN jjjjt(j)kr2|% jjj*   jjdkrD|%|d|i n|%||| i n|%ddi d| } j+||d qdS )Nc                    s    g | ]} j jj d | qS rD   )r   r=   rW   )rI   rK   r   r   r   rM      s    z?CollectiveController._build_pod_with_master.<locals>.<listcomp>rE   rC   )namerankrH   rc   	candidate	endpoints/z/infor   Fc                 S   s   g | ]}t |qS r   )r8   r9   rI   rj   r   r   r   rM      s    zsync peers done c                 S      g | ]}|d  qS rH   r   rq   r   r   r   rM          c                 S   rr   rs   r   rq   r   r   r   rM      rt   r   rn   ZCOLLECTIVE_MASTER_IPc                 S   rr   )ro   r   rq   r   r   r   rM      rt   rO   r*   rP   rQ   rR   rS   rT   rU   T),r?   rG   rH   r@   r   r   rm   r=   r>   Zget_free_portsr8   dumpsrl   rZ   rc   rW   rb   r   Z
sync_peersjobidr^   r   r   rY   sumrV   stripr2   environresetr[   r\   r]   rF   r_   r`   ra   r   r    r   rd   re   rA   )r   Z	reset_podportro   dataZ	peer_listrm   Zglobal_sizerg   Zcollective_masterZcollective_master_iprf   rh   ri   rj   rB   r1   r   rk   r   r!      s   






z+CollectiveController._build_pod_with_master)T)r   
__module____qualname__r
   classmethodr   r"   r   r   r!   __classcell__r   r   r   r   r      s    
	&Hr   c                   @   s(   e Zd Zedd Zdd Zdd ZdS )CollectiveElasticControllerc                 C   s<   |j jr|j jdr|j| j d tj|j _dS dS )Nzetcd://r   TF)	r   r   
startswithr   r   r   r   r   r   r   r   r   r   r     s
   
z"CollectiveElasticController.enablec                 C   s4   | j jdkr| jjd | j| j j| jj d S )Ndefaultz?Using default job name may cause conflict, add --job_id in args)	rv   rw   r   r   r5   r   Zregister_heartbeatrG   rl   rk   r   r   r   register  s
   z$CollectiveElasticController.registerc                 C   s.  t | jjj}| jjr|n|d }|   | jj| jjj	kr| 
  | jjd | j| jj| jj|\}}|r>|| j_n!| jjd| j  | j r^| jjd dd l}|d n+| jjd| j  |  soq| j| jjj |   |  rn	| jj| jjj	ks| jjd| j  d S )	N
   zWaiting peer ready...zpeer not ready z&Failed to start peer, auto tuner exit.r   zRun z	Job done )r@   r   r   Zelastic_timeoutrv   Zelasticr   rG   ZrestartZmax_restartZ	build_jobr   r<   r   Zwait_peer_readyZreplicas_minZreplicas_maxrH   r5   Zis_auto_tuner_modesysexitr   r"   Z
set_statusstatusRUNNINGZ
deploy_podwatch)r   timeoutokrH   r   r   r   r   rP     s8   


!zCollectiveElasticController.runN)r   r~   r   r   r   r   rP   r   r   r   r   r     s
    
r   )	r8   r2   Zcontext.devicer   
controllerr   r   r   r   r   r   r   r   <module>   s    v