o
    * iI"                     @  sn   d dl mZ d dlZd dlmZmZ d dlmZ d dlm	Z	 g Z
dadd ZdddZdddZdddZdS )    )annotationsN)ManagerProcess)core)wait_server_readyc                 C  sf   ddl m} |t| |d}|  d}|dds| s-t| |dds| r|  d S )Nr   )KVServer)size   runningF)	Z*paddle.distributed.fleet.utils.http_serverr   intstartgetZshould_stoptimesleepstop)portZhttp_server_dr   r   Zhttp_serverZwait_seconds r   q/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/parallel_with_gloo.py_start_kv_server   s   
r   rank_idr   rank_numserver_endpointstrreturnNonec                 C  s   |dk du s
J dt  }| }d|d< | dkr8d|i}ttt|dd ||fd	}d
|_d
|d< |  t|g t	
 }| |_||_|dd |_t|dd |_d|_d|_t	|at  | dkrvd|d< |  dS dS )a  
    Initialize parallel environment with gloo for cpu only.

    Args:
        - rank_id (int, required) - the index of current rank;
        - rank_num (int, required) - the number of ranks in this parallel env;
        - server_endpoint (str, required) - endpoint of server to init gloo context in ip:port format;

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set() # type: ignore

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_init(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)

            >>> def test_gloo_init_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_init,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_init_with_multiprocess(2)
       FzSrank_num should greater than or equal to 2 for parallel environment initialization.r
   r   Z_worker:   )targetargsTi  i N)r   dictr   r   r   splitdaemonr   r   r   ZGlooParallelStrategyZrankr   
ip_addressZip_portZinit_secondsZrun_secondsZGlooParallelContext_global_gloo_ctxinitjoin)r   r   r   managerZhttp_server_statusr   Zhttp_server_procZgloo_strategyr   r   r   gloo_init_parallel_env*   s:   9

r(   c                   C  s   t dusJ dt   dS )a  
    Call barrier function with initialized gloo context.

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set() # type: ignore

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_barrier(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)
            ...     paddle.distributed.gloo_barrier()

            >>> def test_gloo_barrier_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_barrier,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_barrier_with_multiprocess(2)
    Nz gloo context is not initialized.)r$   Zbarrierr   r   r   r   gloo_barrier   s   6r)   c                   C  s   t dur
t   dS dS )aN  
    Release the parallel environment initialized by gloo

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set() # type: ignore

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_release(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)
            ...     paddle.distributed.gloo_barrier()
            ...     paddle.distributed.gloo_release()

            >>> def test_gloo_release_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_release,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_release_with_multiprocess(2)
    N)r$   releaser   r   r   r   gloo_release   s   7r+   )r   r   r   r   r   r   r   r   )r   r   )
__future__r   r   multiprocessingr   r   Zpaddle.baser   Z5paddle.distributed.fleet.base.private_helper_functionr   __all__r$   r   r(   r)   r+   r   r   r   r   <module>   s   

c: