import logging
import os

import paddle
import paddle.distributed.transpiler.distribute_transpiler as dist_transpiler
from paddle import base
from paddle.base.compiler import CompiledProgram
from paddle.base.executor import Executor
from paddle.base.framework import Program
from paddle.base.incubate.checkpoint.checkpoint_saver import (
    CheckpointSaver,
    PaddleModel,
)
from paddle.distributed.fleet.meta_optimizers import RawProgramOptimizer
from paddle.incubate.distributed.fleet.base import (
    DistributedOptimizer,
    Fleet,
    Mode,
)
from paddle.static import io


class Collective(Fleet):
    def __init__(self):
        super().__init__(Mode.COLLECTIVE)
        self._local_ip = 0

        self.startup_program = None
        self._origin_program = None
        self._transpiled_program = None
        self.main_program = None
        self._checkpoint_prefix = "__paddle_fleet_checkpoint__"
        self._param_file_name = "_paddle_fleet_param__"

    def init_worker(self):
        logging.warning("You should not call 'init_worker' method for collective mode.")

    def run_worker(self, main_programs=None, scopes=None):
        logging.warning("You should not call 'run_worker' method for collective mode.")

    def init_server(self, model_dir=None):
        logging.warning("You should not call 'init_server' method for collective mode.")

    def run_server(self):
        logging.warning("You should not call 'run_server' method for collective mode.")

    def stop_worker(self):
        logging.warning("You should not call 'stop_worker' method for collective mode.")

    def distributed_optimizer(self, optimizer, strategy=None):
        self._optimizer = CollectiveOptimizer(optimizer, strategy)
        return self._optimizer

    def save_inference_model(
        self,
        executor,
        path_prefix,
        feeded_vars=None,
        fetch_vars=None,
        main_program=None,
        legacy_format=False,
    ):
        """
        Prune the given `main_program` to build a new program especially for
        inference, and then save it and all related parameters to the given
        `path_prefix` by the `executor`.
        """
        assert isinstance(
            executor, Executor
        ), "In fleet.save_inference_model() function, executor must be as Executor type."

        if main_program is None:
            main_program = self._origin_program
        assert isinstance(
            main_program, Program
        ), "In fleet.save_inference_model() function, main_program must be as Program type."

        io.save_inference_model(
            path_prefix,
            feeded_vars,
            fetch_vars,
            executor,
            program=main_program,
            legacy_format=legacy_format,
        )

    def save_persistables(
        self, executor, dirname, main_program=None, filename=None
    ):
        """
        This function filters out all variables with `persistable==True` from
        the given `main_program` and then saves these variables to the folder
        `dirname` or file `filename`.

        The `dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set `filename` None; if you would like to save all variables in a
        single file, use `filename` to specify the file name.
        """
        assert isinstance(
            executor, Executor
        ), "In fleet.save_inference_model() function, executor must be as Executor type."

        if main_program is None:
            main_program = self._origin_program
        assert isinstance(
            main_program, Program
        ), "In fleet.save_inference_model() function, main_program must be as Program type."

        paddle.distributed.io.save_persistables(
            executor, dirname, main_program, filename=filename
        )

    def save_checkpoint(
        self,
        executor,
        path,
        trainer_id,
        train_status,
        fs,
        main_program=None,
        local_cache_path=".cache",
        remain_all_checkpoint=True,
    ):
        """
        This function saves persistables and the current epoch num to path.
        """
        if main_program is None:
            main_program = self._origin_program

        m = PaddleModel(executor, main_program)
        t = train_status
        c = CheckpointSaver(fs)
        real_path, checkpoint_no = c.save_checkpoint(
            path=path,
            slists=[m, t],
            trainer_id=trainer_id,
            local_cache_path=local_cache_path,
        )

        if not remain_all_checkpoint:
            c.clean_redundant_checkpoints(path)

        return real_path, checkpoint_no

    def load_checkpoint(
        self,
        executor,
        path,
        trainer_id,
        train_status,
        fs,
        main_program=None,
        local_cache_path=".cache",
        ignore_empty=True,
    ):
        """
        This function loads persistables and the current epoch num from path.
        """
        if main_program is None:
            main_program = self._origin_program

        m = PaddleModel(executor, main_program)
        c = CheckpointSaver(fs)
        return c.load_checkpoint(
            path,
            [m, train_status],
            trainer_id=trainer_id,
            ignore_empty=ignore_empty,
            local_cache_path=local_cache_path,
        )


fleet = Collective()
class DistributedStrategy(base.BuildStrategy):
    """
    Init function of DistributedStrategy
    """

    def __init__(self):
        super().__init__()
        self.use_local_sgd = False
        self.use_dist_fc = False

        self.dist_fc_config = None
        self.mode = "nccl2"
        self.collective_mode = None
        self.nccl_comm_num = 1
        self.forward_recompute = False
        self.recompute_checkpoints = []
        self.use_amp = False
        self.amp_loss_scaling = 2**15
        self._ut4grad_allreduce = False
class CollectiveOpBasedOptimizer(DistributedOptimizer):
    """
    Collective Operator Base Class For Distributed Optimizer
    The class is invisible to a user
    """

    def __init__(self, optimizer, strategy=None):
        assert isinstance(
            strategy, DistributedStrategy
        ), "strategy must be DistributedStrategy"
        super().__init__(optimizer, strategy)

    def backward(
        self,
        loss,
        startup_program=None,
        parameter_list=None,
        no_grad_set=None,
        callbacks=None,
    ):
        return self._optimizer.backward(
            loss, startup_program, parameter_list, no_grad_set, callbacks
        )

    def apply_gradients(self, params_grads):
        return self._optimizer.apply_gradients(params_grads)
class CollectiveOptimizer(CollectiveOpBasedOptimizer):
    """
    DistributedOptimizer is a wrapper for paddle.base.optimizer
    A user should pass a paddle.base.optimizer to DistributedOptimizer
    minimize() function is implemented.
    DistributedOptimizer is the starting point for a user who wants to
    run distributed training. The optimized information will be stored in
    Fleet() instance who holds the global information about current distributed
    training.
    """

    def __init__(self, optimizer, strategy=DistributedStrategy()):
        if strategy is None:
            strategy = DistributedStrategy()
        super().__init__(optimizer, strategy)
        self._forward_recompute = strategy.forward_recompute
        if not isinstance(strategy.recompute_checkpoints, list):
            raise ValueError("DistStrategy.recompute_checkpoints should be a List")
        self._recompute_checkpoints = strategy.recompute_checkpoints
        self._use_amp = strategy.use_amp
        self._amp_loss_scaling = strategy.amp_loss_scaling
        self.print_config = False

    def backward(
        self,
        loss,
        startup_program=None,
        parameter_list=None,
        no_grad_set=None,
        callbacks=None,
    ):
        return self._optimizer.backward(
            loss, startup_program, parameter_list, no_grad_set, callbacks
        )

    def apply_gradients(self, params_grads):
        return self._optimizer.apply_gradients(params_grads)

    def _check_condition(self, name, **kwargs):
        for k, v in kwargs.items():
            if v is True:
                raise AssertionError(f"you can't use {k} and {name} together")

    def _check_collective_mode(self, main_program, optimizer, strategy):
        """
        Check the conflict conditions.
        """
        if strategy.use_local_sgd:
            strategy.mode = "collective"
            strategy.collective_mode = "local_sgd"
            self._check_condition(
                "use_local_sgd",
                use_dgc=main_program._enable_dgc,
                use_dist_fc=strategy.use_dist_fc,
                use_lamb=main_program._use_lamb,
            )

        if strategy.use_dist_fc:
            self._check_condition(
                "use_dist_fc",
                use_dgc=main_program._enable_dgc,
                use_local_sgd=strategy.use_local_sgd,
                use_lamb=main_program._use_lamb,
            )
            assert (
                strategy.dist_fc_config is not None
            ), "DistributedStrategy.dist_fc_config should be set"

        if strategy._ut4grad_allreduce:
            strategy.mode = "collective"
            strategy.collective_mode = "grad_allreduce"
            self._check_condition(
                "_ut4grad_allreduce",
                use_dgc=main_program._enable_dgc,
                use_lamb=main_program._use_lamb,
            )

        if (
            self._strategy.collective_mode == "local_sgd"
            or self._strategy.collective_mode == "grad_allreduce"
        ):
            assert (
                self._strategy.mode == "collective"
            ), "local_sgd and grad_allreduce can be used under collective mode"

    def _transpile(self, startup_program, main_program):
        """
        Transpile the programs to distributed programs. And add the variables.
        """
        worker_endpoints = fleet.worker_endpoints()
        trainer_id = fleet.worker_index()
        current_endpoint = fleet.worker_endpoints()[trainer_id]
        worker_endpoints_env = ','.join(worker_endpoints)
        trainers_num = fleet.worker_num()

        if self.print_config:
            print(
                f"worker_endpoints:{worker_endpoints} trainers_num:{trainers_num} "
                f"current_endpoint:{current_endpoint} trainer_id:{trainer_id}"
            )

        config = dist_transpiler.DistributeTranspilerConfig()
        config.mode = self._strategy.mode
        config.collective_mode = self._strategy.collective_mode

        config.nccl_comm_num = self._strategy.nccl_comm_num
        config.use_hierarchical_allreduce = (
            self._strategy.use_hierarchical_allreduce
        )
        config.hierarchical_allreduce_inter_nranks = (
            self._strategy.hierarchical_allreduce_inter_nranks
        )

        t = dist_transpiler.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            trainers=worker_endpoints_env,
            startup_program=startup_program,
            program=main_program,
            current_endpoint=current_endpoint,
        )

    def _get_node_ips_from_endpoints(self, endpoints):
        ss = set()
        ips = []
        for ep in endpoints:
            ip = ep.split(":")[0].strip()
            if ip not in ss:
                ss.add(ip)
                ips.append(ip)
            else:
                continue

        return ips

    def _node_num(self):
        worker_endpoints = fleet.worker_endpoints()
        current_endpoint = fleet.worker_endpoints()[fleet.worker_index()]
        worker_endpoints_env = ','.join(worker_endpoints)

        node_ips = self._get_node_ips_from_endpoints(worker_endpoints)
        node_ip = current_endpoint.split(":")[0].strip()

        node_num = len(node_ips)

        return node_num

    def _try_to_compile(self, startup_program, main_program):
        node_num = self._node_num()
        assert node_num >= 1, f"nccl2 node_num must >= 1, now:{node_num}"

        if node_num <= 1:
            # Single-node job: multiple nccl comms and hierarchical allreduce
            # bring no benefit, so force the simple settings.
            if self._strategy.nccl_comm_num > 1:
                logging.warning("set nccl_comm_num=1 since you only have 1 node.")
            self._strategy.nccl_comm_num = 1

            if self._strategy.use_hierarchical_allreduce:
                logging.warning(
                    "set use_hierarchical_allreduce=False since you only have 1 node."
                )
            self._strategy.use_hierarchical_allreduce = False

        sync_allreduce = os.getenv("FLAGS_sync_nccl_allreduce")
        sync_batch_norm = self._strategy.sync_batch_norm
        if sync_batch_norm is not None and sync_batch_norm is True:
            self._strategy.nccl_comm_num = 1
            self._strategy.use_hierarchical_allreduce = False
            logging.warning(
                "use sync_batch_norm will hang when set num_threads > 1, so "
                "set num_threads=1, nccl_comm_num=1, use_hierarchical_allreduce=False."
            )

        if self.print_config:
            print(
                f"node_num:{node_num} "
                f"use_hierarchical_allreduce:{self._strategy.use_hierarchical_allreduce} "
                f"nccl_comm_num:{self._strategy.nccl_comm_num} "
                f"FLAGS_sync_nccl_allreduce:{sync_allreduce}"
            )

        self._transpile(startup_program, main_program)

        if self._strategy.mode == "collective":
            return main_program

        self._strategy.num_trainers = fleet.worker_num()
        self._strategy.trainer_id = fleet.worker_index()
        self._strategy.trainers_endpoints = fleet.worker_endpoints()
        self._strategy.enable_backward_optimizer_op_deps = True

        # Insert the collective all-reduce ops directly into the program via the
        # fleet meta optimizer instead of relying on data-parallel execution.
        comm_opt = RawProgramOptimizer(self._optimizer)
        comm_opt.fuse_all_reduce_ops = True
        comm_opt.fuse_grad_size_in_num = 8  # gradient-fusion group size (assumed value)
        comm_opt.endpoints = self._strategy.trainers_endpoints
        comm_opt.current_endpoint = comm_opt.endpoints[fleet.worker_index()]
        comm_opt.rank = fleet.worker_index()
        comm_opt.nranks = fleet.worker_num()
        comm_opt.main_program = main_program
        if comm_opt.nranks > 1:
            comm_opt._transpile_main_program(self._loss)

        self._compiled_program = CompiledProgram(
            main_program, build_strategy=self._strategy
        )

        return self._compiled_program

    def raiseOptimizeError(self, strategy_name, optimize_name):
        raise ValueError(
            f"can not use {optimize_name} when you set DistStrategy.{strategy_name} as True"
        )

    def minimize(
        self, loss, startup_program=None, parameter_list=None, no_grad_set=None
    ):
        """
        minimize a program through loss
        Args:
            loss (Variable|Variable List): loss variable or loss variable list to run optimization.
            startup_program (Program): startup_program for initializing parameters
                in `parameter_list`.
            parameter_list (list): list of Variables to update.
            no_grad_set (set|None): set of Variables that should be ignored.
        Returns:
            tuple: (optimize_ops, params_grads), which are the list of operators
            appended and the list of (param, grad) Variable pairs for optimization.
        Note that in parameter server mode, a worker will not get anything about optimize_ops
        because optimizer algorithms run on the pserver side. We will make this usable in pserver
        process, but currently the optimization part is written into Fleet(). A user does not
        need to care about how to start up a pserver node.
        """
        if self._forward_recompute:
            if self._recompute_checkpoints == []:
                raise ValueError(
                    "please set strategy.recompute_checkpoints "
                    "when set strategy.forward_recompute as True"
                )
            if self._optimizer.__class__.__name__ in [
                "RecomputeOptimizer",
                "OptimizerWithMixedPrecision",
            ]:
                self.raiseOptimizeError(
                    "forward_recompute", self._optimizer.__class__.__name__
                )

            self._optimizer = paddle.incubate.optimizer.RecomputeOptimizer(
                self._optimizer
            )
            self._optimizer._set_checkpoints(self._recompute_checkpoints)

        if self._use_amp:
            if self._optimizer.__class__.__name__ in [
                "OptimizerWithMixedPrecision",
                "DGCMomentumOptimizer",
            ]:
                self.raiseOptimizeError(
                    "mixed_precision", self._optimizer.__class__.__name__
                )
            self._optimizer = paddle.static.amp.decorate(
                self._optimizer,
                init_loss_scaling=self._amp_loss_scaling,
                use_dynamic_loss_scaling=True,
            )

        main_program = loss.block.program
        if startup_program is None:
            startup_program = base.default_startup_program()
        fleet.startup_program = startup_program

        self._loss = loss

        self._check_collective_mode(main_program, self._optimizer, self._strategy)

        optimize_ops, param_grads = self._optimizer.minimize(
            loss, startup_program, parameter_list, no_grad_set=no_grad_set
        )

        fleet._origin_program = main_program.clone(for_test=False)
        fleet._transpiled_program = main_program
        fleet.main_program = self._try_to_compile(startup_program, main_program)

        return optimize_ops, param_grads
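# Illustrative sketch (not part of the original module): the intended end-to-end
# flow for static-graph collective training with this API. `build_model` is a
# hypothetical user network; the job is assumed to be launched with
# paddle.distributed.launch after the fleet role has been initialized, so that
# worker endpoints are populated.
#
#   paddle.enable_static()
#   avg_cost = build_model()
#   strategy = DistributedStrategy()
#   optimizer = paddle.optimizer.SGD(learning_rate=0.01)
#   optimizer = fleet.distributed_optimizer(optimizer, strategy)
#   optimizer.minimize(avg_cost)          # fills fleet.main_program
#
#   exe = Executor(paddle.CUDAPlace(int(os.getenv("FLAGS_selected_gpus", "0"))))
#   exe.run(fleet.startup_program)
#   exe.run(fleet.main_program, fetch_list=[avg_cost.name])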