o
    ñpiÂ   ã                   @   sf   d Z ddlZddlmZ ddlmZ g ZG dd„ dƒZG dd„ deƒZG d	d
„ d
ƒZ	G dd„ dƒZ
dS )z£
Communicator is used for async distribute training in distribute_transpiler mode.
It's a wrapper of a cpp class Communicator and should be used inside fleet API.
é    N)ÚDistributedMode)Úcorec                   @   s|   e Zd Zddd„Z	ddd„Z			d d	d
„Zdd„ Zdd„ Zdd„ Zdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ Zd!dd„ZdS )"ÚCommunicatorNc                 C   sÖ   |du r|du r
i }n.|t jkrd |d ¡|d< t|d ƒ|d< t|d ƒ|d< t|d ƒ|d< t|d ƒ|d< d}|t jkrCd}n|t jkrKd	}n|t jkrSd
}n|t jkrZd}|| _|| _d| _	d| _
d| _dS )a¼  
        Communicator is used for async distribute training in distribute_transpiler mode.
        It's a wrapper of a cpp class Communicator and should be used inside fleet API.

        Args:
            program(Program): the trainers program after transpile of distribute_transpiler.
            It's used by communicator to extract the information to do communication.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        Nú,Zpserver_endpointsZtrainersÚ
trainer_idZneed_global_stepZbarrier_table_idÚSYNCÚASYNCÚ
HALF_ASYNCÚGEO)r   r   ÚjoinÚstrr   r	   r
   ÚmodeÚenvsÚcommunicator_Ú	send_ctx_Ú	recv_ctx_)Úselfr   Úkwargsr   Zmode_str© r   úf/home/app/PaddleOCR-VL/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/communicator.pyÚ__init__*   s4   €
ÿ




zCommunicator.__init__c              	   C   s>   |d u r	t j ¡ }t | j|||||| j¡| _|| _|| _	d S ©N)
ÚpaddleÚstaticÚglobal_scoper   ZDistCommunicatorr   r   r   r   r   )r   Úsend_ctxZrecv_ctxZ	proto_txtZunit64_hostsÚscoper   r   r   Úinit_with_ctxa   s   
ù	
zCommunicator.init_with_ctxé ¡ é'  é   c                 C   s   | j  |||¡ d S r   )r   Ú"create_client_to_client_connection)r   Zpserver_timeout_msZpserver_connect_timeout_msZ	max_retryr   r   r   r!   r   s   ÿz/Communicator.create_client_to_client_connectionc                 C   s
   | j  ¡ S r   )r   Úget_client_info©r   r   r   r   r"   |   s   
zCommunicator.get_client_infoc                 C   ó   | j  |¡ d S r   )r   Úset_clients)r   Z	host_listr   r   r   r%      ó   zCommunicator.set_clientsc                 C   ó$   | j du rtdƒ dS | j  ¡  dS )a‰  
        Start communicator. Should call before training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        Nz;you must call init_with_ctx first to init comm before start)r   ÚprintÚstartr#   r   r   r   r)   ‚   ó   
zCommunicator.startc                 C   r'   )a‡  
        Stop communicator. Should call after training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        Nú:you must call init_with_ctx first to init comm before stop)r   r(   Ústopr#   r   r   r   r,   ˜   r*   zCommunicator.stopc                 C   r'   )aZ  
        Get communicator is running or stop.

        Returns:
            bool

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.is_running()
        Nr+   )r   r(   Ú
is_runningr#   r   r   r   r-   ®   s   
zCommunicator.is_runningc                 C   ó   | j  ¡  d S r   )r   Úrecvr#   r   r   r   r/   Ã   ó   zCommunicator.recvc                 C   r$   r   )r   Úinit_params©r   Úcontextr   r   r   r1   Æ   r&   zCommunicator.init_paramsc                 C   r$   r   )r   Ú
pull_denser2   r   r   r   r4   É   r&   zCommunicator.pull_denseéÿÿÿÿc                 C   sh   |d u r	t j ¡ }|  ¡ stdƒ‚t|tƒsJ ‚t|tƒsJ ‚|dkr*| j|  	¡ }| j
 |||¡ d S )NzTCommunicator should init first. Using fleet.init_worker() before push_sparse_param()r5   )r   r   r   r-   Ú
ValueErrorÚ
isinstancer   Úintr   Útable_idr   Úpush_sparse_param)r   Úvar_namer9   r   r   r   r   r:   Ì   s   
ÿzCommunicator.push_sparse_param)NNr   )r   r   r    )r5   N)Ú__name__Ú
__module__Ú__qualname__r   r   r!   r"   r%   r)   r,   r-   r/   r1   r4   r:   r   r   r   r   r   )   s"    
8
ÿ
ü
r   c                       s6   e Zd Zd
‡ fdd„	Zdd„ Zdd„ Zdd	„ Z‡  ZS )ÚFLCommunicatorNc                    s8   d }t ƒ  ||¡ i }i }d}d| _|  ||||¡ d S )NÚ ZWITH_COORDINATOR)Úsuperr   r   r   )r   Zps_hostsr   r   r   Z	dense_mapZprototxt©Ú	__class__r   r   r   Û   s   zFLCommunicator.__init__c                 C   s    | j d ur| j  ||¡ d S d S r   )r   Ústart_coordinator)r   Zself_endpointZtrainer_endpointsr   r   r   rD   ä   s
   
ÿÿz FLCommunicator.start_coordinatorc                 C   s"   | j d ur| j  |¡ d S tdƒ‚)Nzself.communicator_ is null)r   Úsave_fl_strategyr6   )r   Úmpr   r   r   rE   ê   s   
zFLCommunicator.save_fl_strategyc                 C   s   i }| j d ur| j  ¡ }|S r   )r   Úquery_fl_clients_info)r   Zinfo_mpr   r   r   rG   ð   s   

z$FLCommunicator.query_fl_clients_infor   )r<   r=   r>   r   rD   rE   rG   Ú__classcell__r   r   rB   r   r?   Ú   s
    	r?   c                   @   s,   e Zd Zdd„ Zdd„ Zdd„ Zdd„ Zd	S )
ÚLargeScaleKVc                 C   s   t  ¡ | _d S r   )r   rI   Úscale_kvr#   r   r   r   r   ø   r0   zLargeScaleKV.__init__c                 C   ó   | j  ||¡ d S r   )rJ   Úsave©r   ÚvarnameÚdirnamer   r   r   rL   û   ó   zLargeScaleKV.savec                 C   rK   r   )rJ   ÚloadrM   r   r   r   rQ   þ   rP   zLargeScaleKV.loadc                 C   s   | j  |¡S r   )rJ   Úsize)r   rN   r   r   r   rR     s   zLargeScaleKV.sizeN)r<   r=   r>   r   rL   rQ   rR   r   r   r   r   rI   ÷   s
    rI   c                   @   s   e Zd Zdd„ Zdd„ ZdS )ÚHeterClientc                 C   s   t  |||¡| _d S r   )r   rS   Úheter_client_)r   ZendpointZprevious_endpointr   r   r   r   r     s   
ÿzHeterClient.__init__c                 C   r.   r   )rT   r,   r#   r   r   r   r,     r0   zHeterClient.stopN)r<   r=   r>   r   r,   r   r   r   r   rS     s    rS   )Ú__doc__r   Z"paddle.distributed.ps.utils.publicr   Zpaddle.frameworkr   Ú__all__r   r?   rI   rS   r   r   r   r   Ú<module>   s    2