o
    pi                     @  sd  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZmZmZmZ d dlmZ ddlmZ er`d dlmZ edZed	Zed
eedZG dd deZg ZG dd deZG dd deZG dd deZG dd deZ G dd deZ!G dd dZ"G dd de"Z#	d&d'd d!Z$G d"d# d#e"Z%G d$d% d%e"Z&dS )(    )annotationsN)TYPE_CHECKINGCallableLiteral	TypedDictTypeVar)core   )logger)	ParamSpec_InputT_RetT_HDFSClientConfig)zfs.default.namezhadoop.job.ugic                   @  s   e Zd ZU ded< ded< dS )	_FileInfostrpathintsizeN)__name__
__module____qualname____annotations__ r   r   h/home/app/PaddleOCR-VL/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/fleet/utils/fs.pyr   ,   s   
 r   c                   @     e Zd ZdS )ExecuteErrorNr   r   r   r   r   r   r   r   4       r   c                   @  r   )FSFileExistsErrorNr   r   r   r   r   r   8   r   r   c                   @  r   )FSFileNotExistsErrorNr   r   r   r   r   r   <   r   r   c                   @  r   )	FSTimeOutNr   r   r   r   r   r    @   r   r    c                   @  r   )FSShellCmdAbortedNr   r   r   r   r   r!   D   r   r!   c                   @  s   e Zd Zejdd Zejdd Zejdd Zejdd Zejd	d
 Z	ejdd Z
ejdd Zejdd Zejdd Zejdd Zejd"ddZejdd Zejdd Zejd#ddZejd$d d!ZdS )%FSc                 C     t NNotImplementedErrorselffs_pathr   r   r   ls_dirI      z	FS.ls_dirc                 C  r#   r$   r%   r'   r   r   r   is_fileM   r+   z
FS.is_filec                 C  r#   r$   r%   r'   r   r   r   is_dirQ   r+   z	FS.is_dirc                 C  r#   r$   r%   r'   r   r   r   is_existU   r+   zFS.is_existc                 C  r#   r$   r%   )r(   
local_pathr)   r   r   r   uploadY   r+   z	FS.uploadc                 C  r#   r$   r%   )r(   r)   r/   r   r   r   download]   r+   zFS.downloadc                 C  r#   r$   r%   r'   r   r   r   mkdirsa   r+   z	FS.mkdirsc                 C  r#   r$   r%   r'   r   r   r   deletee   r+   z	FS.deletec                 C  r#   r$   r%   r(   r   r   r   need_upload_downloadi   r+   zFS.need_upload_downloadc                 C  r#   r$   r%   r(   fs_src_pathfs_dst_pathr   r   r   renamem   r+   z	FS.renameFc                 C  r#   r$   r%   r(   r7   r8   	overwritetest_existsr   r   r   mvq   r+   zFS.mvc                 C  r#   r$   r%   )r(   	local_dirdest_dirr   r   r   
upload_diru   r+   zFS.upload_dirc                 C  r#   r$   r%   r'   r   r   r   	list_dirsy   r+   zFS.list_dirsTc                 C  r#   r$   r%   r(   r)   exist_okr   r   r   touch}   r+   zFS.touchNc                 C  r#   r$   r%   r'   r   r   r   cat   r+   zFS.catFFTr$   )r   r   r   abcabstractmethodr*   r,   r-   r.   r0   r1   r2   r3   r5   r9   r=   r@   rA   rD   rE   r   r   r   r   r"   H   s>    











r"   c                   @  s   e Zd ZdZd.ddZd/d	d
Zd0ddZdd Zdd Zd/ddZ	d1ddZ
d2ddZd2ddZd2ddZd3d4d!d"Z	#	#d5d6d(d)Zd7d+d,Zd-S )8LocalFSa(  
    A tool of local file system.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import LocalFS

            >>> client = LocalFS()
            >>> subdirs, files = client.ls_dir("./")

    r)   r   returntuple[list[str], list[str]]c                 C  s\   |  |s	g g fS g }g }t|D ]}tj|d | r$|| q|| q||fS )a{  
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            Tuple: Return a 2-tuple, the first is a list of all its subdirectories,
            and the second is a list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs, files = client.ls_dir("./")

        /)r.   oslistdirr   isdirappend)r(   r)   dirsfilesfr   r   r   r*      s   
zLocalFS.ls_dirNonec                 C  s,   t j|rJ | dt j|dd dS )a  
        Create a local directory.

        Args:
            fs_path(str): The local directory path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_mkdirs")
                >>> client.delete("test_mkdirs")

        z is already a fileT)rC   N)rN   r   isfilemakedirsr'   r   r   r   r2      s   zLocalFS.mkdirsr7   r8   c                 C  s   t || dS )aI  
        Rename the file.

        Args:
            fs_src_path(str): The actual name of the file or directory
            fs_dst_path(str): The new name of the file or directory.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_rename_src")
                >>> print(client.is_exist("test_rename_src"))
                True
                >>> client.rename("test_rename_src", "test_rename_dst")
                >>> print(client.is_exist("test_rename_src"))
                False
                >>> print(client.is_exist("test_rename_dst"))
                True
                >>> client.delete("test_rename_dst")

        N)rN   r9   r6   r   r   r   r9      s   zLocalFS.renamec                 C     t | d S r$   )shutilrmtreer'   r   r   r   _rmr      zLocalFS._rmrc                 C  rX   r$   )rN   remover'   r   r   r   _rm   r\   zLocalFS._rmc                 C  s.   |  |sdS tj|r| |S | |S )a  
        Delete the local file path, whether it's a file or directory.

        Args:
            fs_path(str): The local file path.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_localFS_mkdirs")
                >>> client.delete("test_localFS_mkdirs")

        N)r.   rN   r   rV   r^   r[   r'   r   r   r   r3      s
   


zLocalFS.deleteLiteral[False]c                 C     dS )NFr   r4   r   r   r   r5        zLocalFS.need_upload_downloadboolc                 C     t j|S )au  
        Whether the local file path is a file.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_is_file")
                >>> print(client.is_file("test_is_file"))
                True
                >>> client.delete("test_is_file")

        )rN   r   rV   r'   r   r   r   r,        zLocalFS.is_filec                 C  rc   )a|  
        Whether the local file path is a directory.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.mkdirs("test_is_dir")
                >>> print(client.is_dir("test_is_dir"))
                True
                >>> client.delete("test_is_dir")

        rN   r   rP   r'   r   r   r   r-   $  rd   zLocalFS.is_dirc                 C  rc   )a  
        Whether the local file path exists.

        Args:
            fs_path(str): The local file path.

        Returns:
            Bool: Whether it's a file or directory, return true if the path exists,
            otherwise return false.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> local_fs = LocalFS()
                >>> ret = local_fs.is_exist("test_is_exist")

        )rN   r   existsr'   r   r   r   r.   =  s   zLocalFS.is_existTrC   c                 C  sF   |  |r|r	dS tt|d	 W d   dS 1 sw   Y  dS )a1  
        Create a local file.

        Args:
            fs_path(str): The local file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_touch")
                >>> client.delete("test_touch")

        Na)r.   r   openrB   r   r   r   rD   T  s   
"zLocalFS.touchFsrc_pathdst_pathr;   r<   c                 C  s@   |  |st|r|  |r| | |  |rt| ||S )a  
        Move a local file or directory from `src_path` to `dst_path` .

        Args:
            src_path(str):  Name of the file or directory, that's needed to be moved.
            dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `dst_path` if that exists. Default is False.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> client.touch("test_mv_src")
                >>> client.mv("test_mv_src", "test_mv_dst")
                >>> client.delete("test_mv_dst")

        )r.   r   r3   r   r9   )r(   ri   rj   r;   r<   r   r   r   r=   o  s   


z
LocalFS.mv	list[str]c                   s*   |   sg S  fddt D }|S )a  
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The local file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import LocalFS

                >>> client = LocalFS()
                >>> subdirs = client.list_dirs("./")

        c                   s$   g | ]}t j d  | r|qS rM   re   ).0rT   r)   r   r   
<listcomp>  s
    z%LocalFS.list_dirs.<locals>.<listcomp>)r.   rN   rO   )r(   r)   rR   r   rn   r   rA     s   

zLocalFS.list_dirsNr)   r   rK   rL   r)   r   rK   rU   )r7   r   r8   r   rK   rU   )rK   r_   r)   r   rK   rb   rG   r)   r   rC   rb   rK   rU   rF   )
ri   r   rj   r   r;   rb   r<   rb   rK   rU   r)   r   rK   rk   )r   r   r   __doc__r*   r2   r9   r[   r^   r3   r5   r,   r-   r.   rD   r=   rA   r   r   r   r   rJ      s"    

"





&rJ   max_time_outfloat | NonerK   >Callable[[Callable[_InputT, _RetT]], Callable[_InputT, _RetT]]c                   s   d fdd}|S )NrT   Callable[_InputT, _RetT]rK   c                   s   t  d	 fdd}|S )
Nargs_InputT.argskwargs_InputT.kwargsrK   r   c               
     s   | d }}|d u rt |jd }n|d }t |jd }t }|}	 z | i |W S  tyY } z!t | |krJtd|  dt |  t| W Y d }~nd }~ww t | dkrttd|  dt |   t }q$)Nr   g     @@Tzargs:z	 timeout:   zhadoop operator timeout:args:)float	_time_out_sleep_intertimer   r    sleepprint)rz   r|   otime_outinterstartZlast_print_timee)rT   rv   r   r   handler  s2   z2_handle_errors.<locals>.decorator.<locals>.handler)rz   r{   r|   r}   rK   r   )	functoolswraps)rT   r   rv   )rT   r   	decorator  s   z!_handle_errors.<locals>.decorator)rT   ry   rK   ry   r   )rv   r   r   r   r   _handle_errors  s   !r   c                   @  s  e Zd ZU dZded< 		d]d^ddZd_ddZd_ddZe d`ddZ	e daddZ
dd Zdd  Ze dbd"d#Zd$d% Zdbd&d'Ze dbd(d)Z	dcddd-d.Z		dedfd1d2Ze d3d4 Z		dedgd5d6Ze d7d8 Ze dhd9d:Z		;didjd?d@Ze dAdB ZdCdD ZdEdF Ze dhdGdHZdkdldJdKZe dLdM ZdmdOdPZdndodSdTZe dUdV Z dWdX Z!dpd[d\Z"dQS )q
HDFSClienta  
    A tool of HDFS.

    Args:
        hadoop_home(str): Hadoop home.
        configs(dict): Hadoop config. It is a dictionary and needs to contain the
            keys: "fs.default.name" and "hadoop.job.ugi".

    Examples:

        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.distributed.fleet.utils import HDFSClient
            >>> hadoop_home = "/home/client/hadoop-client/hadoop/"

            >>> configs = {
            ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
            ...     "hadoop.job.ugi": "hello,hello123"
            ... }

            >>> client = HDFSClient(hadoop_home, configs)
            >>> client.ls_dir("hdfs:/test_hdfs_client")
            ([], [])

    rk   pre_commands   hadoop_homer   configsr   r   r   sleep_interrK   rU   c           
      C  s   g | _ | d}| j | d}| j | |r/| D ]\}}d| d| }	| j |	 q|| _|| _d| j | _td| _	d S )Nz/bin/hadoopfsz-D= z8\s?responseErrorMsg\s?\:.*, errorCode\:\s?[0-9]+, path\:)
r   rQ   itemsr   r   join	_base_cmdrecompile
_bd_err_re)
r(   r   r   r   r   Z
hadoop_bindfskvZconfig_commandr   r   r   __init__  s   

zHDFSClient.__init__F   c           	      C  s|   | j  d| }d}d }d}t|d D ]}t|dd|\}}t|}|dkr* nt| q|dkr8t||| fS )Nz -r      r	      )	r   ranger   Zshell_execute_cmdr   r   r   r!   
splitlines)	r(   cmdredirect_stderrretry_timesexe_cmdretoutputretry_sleep_secondxr   r   r   _run_cmd  s   zHDFSClient._run_cmdc                 C  s   | j g| }d}d}d}t|d D ]I}ztj|dtj|r"tjntjdd}	|	j}W  n0 tjyK }
 z|
j	}|
j
}t| W Y d }
~
qd }
~
w ty] }
 zW Y d }
~
 nd }
~
ww |dkrft|d S )Nr    r   r	   T)checkstdoutstderrtextr   )r   splitr   
subprocessrunPIPESTDOUTr   CalledProcessError
returncoder   r   r   	Exceptionr!   )r(   r   r   r   r   r   r   r   r   processr   r   r   r   _run_safe_cmd!  s:   zHDFSClient._run_safe_cmdr)   c                 C      |  |sg S | |\}}|S )a*  
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        r.   _ls_dirr(   r)   rR   rS   r   r   r   rA   ?  s   
zHDFSClient.list_dirsrL   c                 C     |  |s	g g fS | |S )a  
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
            and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        r   r'   r   r   r   r*   a  s   

zHDFSClient.ls_dirc           
      C  s   d| }|  |\}}|dkrt|g }g }|D ](}| }t|dkr'qtj|d }	|d d dkr=||	 q||	 q||fS )Nls r         d)r   r   r   lenrN   r   basenamerQ   )
r(   r)   r   r   linesrR   rS   linearrpr   r   r   r     s   
zHDFSClient._ls_dirc                 C  s*   |D ]}| j |}|d ur|  S qd S r$   )r   match)r(   r   lmr   r   r   _test_match  s   zHDFSClient._test_matchrb   c                 C     |  |sdS | |S )a.  
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr.   _is_dirr'   r   r   r   r-     s   

zHDFSClient.is_dirc                 C  sR   d| }| j |ddd\}}|r'| |r%td td| t|dS dS )Nztest -d Tr	   r   r   zraise exception: 
F)r   r   r   r   r   )r(   r)   r   r   r   r   r   r   r     s   

zHDFSClient._is_dirc                 C     |  |sdS | | S )a$  
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr   r'   r   r   r   r,     s   
zHDFSClient.is_filec                 C  s0   d| d}| j |ddd\}}|dkrdS dS )aA  
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The hdfs file path.

        Returns:
            Bool: Whether it's is file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        ztest -e r   Tr	   r   r   F)r   )r(   r)   r   r   outr   r   r   r.     s
   zHDFSClient.is_existr>   r?   r;   c                 C  sl   | d}| d}tj|}| |d | r$|r$| |d |  | |s.| | | || dS z
        upload dir to hdfs
        Args:
            local_dir(str): local dir
            dest_dir(str): hdfs dest dir
            overwrite(bool): is overwrite
        Returns:
            return code
        rM   N)rstriprN   r   r   r.   r3   r2   _try_uploadr(   r>   r?   r;   Zlocal_basenamer   r   r   r@     s   



zHDFSClient.upload_dirr/   multi_processesc                   s    fdd}dd }t  }||st| d||}|s%td dS  |r6|r6 |  | g }	t|D ]}
 ||
|}tj	|||fd}|	
| |  q<|	D ]}|  qZdS )	a  
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int|1): the upload data process at the same time, default=5
            overwrite(bool|False): will overwrite file on HDFS or not

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        c                   s   |D ]}  ||  qd S r$   )r   )Zhdfs_path_singledatasdatar4   r   r   Z__subprocess_uploadL  s   z.HDFSClient.upload.<locals>.__subprocess_uploadc                 S  sZ   g }t j| s
|S t j| r&t | D ]}t j| |}|| q|S ||  |S )z
            get local files
            Args:
                path(str): local path
            Returns:
                list of local files
            )rN   r   rf   rP   rO   r   rQ   )r   Zrlistfiletr   r   r   get_local_filesP  s   
z*HDFSClient.upload.<locals>.get_local_files not existsz/there are nothing need to upload, function exitNtargetrz   )rJ   r.   r   r   r3   r2   r   _split_filesmultiprocessingProcessrQ   r   r   )r(   r/   r)   r   r;   Z_HDFSClient__subprocess_uploadr   local	all_filesprocsiprocess_datasr   procr   r4   r   r0   *  s.   "





zHDFSClient.uploadc              
   C  s`   d| d| }d}z|  |\}}|dkrt|W d S  ty/ } z| | |d }~ww )Nzput r   r   )r   r   r   r3   )r(   r/   r)   r   r   _r   r   r   r   r     s   
zHDFSClient._try_uploadc                   s   fdd}  st  d r |S  \}} fdd|D }| fdd|D  g }	t|D ]}
||
|}tj	|||fd}|	
| |  q?|	D ]}|  q]dS )	al  
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int|1): the download data process at the same time, default=1
            overwrite(bool): is overwrite

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.download("hdfs:/test_hdfs_client", "./")

        c                   s   |D ]}  ||  qdS )z
            download file from HDFS
            Args:
                local_path(str): the local file path
                datas(str): the hdfs file path list
            N)_try_download)r/   r   r   r4   r   r   Z__subprocess_download  s   z2HDFSClient.download.<locals>.__subprocess_download
 not exitsc                      g | ]} d  | qS rl   r   rm   r   rn   r   r   ro         z'HDFSClient.download.<locals>.<listcomp>c                   r   rl   r   r   rn   r   r   ro     r   r   N)r.   r   r,   r   r*   extendr   r   r   r   rQ   r   r   )r(   r)   r/   r   r;   Z _HDFSClient__subprocess_downloadrR   all_filenamesr   r   r   r   r   r   r   )r)   r(   r   r1     s&   "





zHDFSClient.downloadc              
   C  sf   d| d| }d}z|  |\}}|dkrt|W d S  ty2 } z
t }|| |d }~ww )Nzget r   r   )r   r   r   rJ   r3   )r(   r)   r/   r   r   r   r   Zlocal_fsr   r   r   r     s   
zHDFSClient._try_downloadc                 C  s   |  |rdS d}d| d}| j|dd\}}|dkr/|D ]
}d|v r(d} nq|s/t||rJ|  |sLd	| }| |\}}|dkrNt|dS dS dS )
a  
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        NFzmkdir r   T)r   r   zNo such file or directoryz	mkdir -p )r.   r   r   )r(   r)   Zout_hdfsr   r   r   r   r   r   r   r   r2     s(   

zHDFSClient.mkdirsTr7   r8   r<   c                 C  sX   |r|  |r| | |r&|  |st| d|  |r&t| d| ||S )a  
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str):  Name of the file or directory, that's needed to be moved.
            fs_dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Exception.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

         is not exists exists already)r.   r3   r   r   _try_mvr:   r   r   r   r=   	  s   !


zHDFSClient.mvc              
   C  s|   d| d| }d}z| j |dd\}}|dkrt|W d S  ty= } z| |s7| |r7W Y d }~d S |d }~ww )Nzmv r   r   r	   r   )r   r   r   r.   )r(   r7   r8   r   r   r   r   r   r   r   r   6  s   zHDFSClient._try_mvc                 C  ,   d| }|  |\}}|dkrt|d S )Nzrmr r   r   r   r(   r)   r   r   r   r   r   r   r[   C  
   
zHDFSClient._rmrc                 C  r   )Nzrm r   r   r   r   r   r   r^   I  r   zHDFSClient._rmc                 C  s0   |  |sdS | |}|r| |S | |S )a  
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.delete("hdfs:/test_hdfs_client")

        N)r.   r   r[   r^   )r(   r)   r-   r   r   r   r3   O  s   



zHDFSClient.deleterC   c                 C  s    |  |r|r	dS t| |S )a6  
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.touch("hdfs:/test_hdfs_client")

        N)r.   r   _touchzrB   r   r   r   rD   q  s
   

zHDFSClient.touchc                 C  r   )Nztouchz r   r   r   r   r   r   r    s
   
zHDFSClient._touchzLiteral[True]c                 C  r`   NTr   r4   r   r   r   r5     ra   zHDFSClient.need_upload_downloadN
str | Nonec                 C  s"   |  |r| |}d|S dS )a  
        Cat a remote HDFS file.

        Args:
            fs_path(str|None): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> from paddle.distributed.fleet.utils import HDFSClient

                >>> hadoop_home = "/home/client/hadoop-client/hadoop/"
                >>> configs = {
                ...     "fs.default.name": "hdfs://xxx.hadoop.com:54310",
                ...     "hadoop.job.ugi": "hello,hello123"
                ... }

                >>> client = HDFSClient(hadoop_home, configs)
                >>> client.cat("hdfs:/test_hdfs_client")
                ''

        r   r   )r,   _try_catr   )r(   r)   r   r   r   r   rE     s   


zHDFSClient.catc                 C  s0   d| }| j |dd\}}|dkrt||S )Nzcat r	   r   r   r   )r(   r)   r   r   r   r   r   r   r    s
   
zHDFSClient._try_catc           
      C     t || }t || }|g| }t|D ]
}||  d7  < qg g| }d}	t|D ]}||	|	||   ||< |	|| 7 }	q+|| S z
        split file list
        Args:
            files(list): file list
            trainer_id(int): trainer mpi rank id
            trainers(int): all trainers num
        Returns:
            filelist(list): file list of current trainer
        r	   r   r   r   
r(   rS   Z
trainer_idZtrainers	remainder	blocksizeblocksr   Ztrainer_filesbeginr   r   r   r        


zHDFSClient._split_files	path_listlist[_FileInfo]c                 C  s   t |dkrg S g }d}|D ]}||d 7 }qd| d }| |\}}t |dkr5td| d g S |D ] }|d}	t |	dk rEq7|	d	 }
t|	d }||
|d
 q7|S )z
        list_files return file path and size
        Args:
            path_list(list): file list
        Returns:
            filelist(list): file list with file path and size
        r   r   r   r   z) | awk '{if ($8 != "") {print $5" "$8 }}'zlist_files empty, path[]   r	   )r   r   )r   r   r
   warningr   r   rQ   )r(   r  	file_listZ
str_concatr   r   r   r   r   r   	file_path	file_sizer   r   r   list_files_info  s(   

zHDFSClient.list_files_infor   r   )
r   r   r   r   r   r   r   r   rK   rU   )Fr   rt   rp   rr   F)r>   r   r?   r   r;   rb   rK   rU   )r   F)
r/   r   r)   r   r   r   r;   rb   rK   rU   )
r)   r   r/   r   r   r   r;   rb   rK   rU   rq   FT)
r7   r   r8   r   r;   rb   r<   rb   rK   rU   rG   rs   )rK   r  r$   )r)   r  rK   r   )r  rk   rK   r  )#r   r   r   ru   r   r   r   r   r   rA   r*   r   r   r-   r   r,   r.   r@   r0   r   r1   r   r2   r=   r   r[   r^   r3   rD   r  r5   rE   r  r   r  r   r   r   r   r     sh   
 

!! 
 $U
B
1-
!!

"
r   c                   @  s   e Zd ZdZd.ddZdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zd/ddZd0ddZd0ddZdd Zd1d!d"Zd#d$ Zd2d%d&Zd'd( Zd3d*d+Zd,d- Zd)S )4	AFSClienta:  
    A tool of AFS. Use AfsWrapper.
    When WITH_PSLIB=ON, you can use this class directly.
    When WITH_PSCORE=ON, you should export LD_LIBRARY_PATH='YOUR_AFSAPISO_PATH' before using this class.

    Examples:

        .. code-block:: python

            >>> # doctest: +SKIP('depend on external file')
            >>> from paddle.distributed.fleet.utils.fs import AFSClient

            >>> client = AFSClient()
            >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
            >>> client.ls_dir("hdfs:/test_hdfs_client")

    r   r   c                 C  s   t  | _|| _d S r$   )r   Z
AfsWrapper_fsr   )r(   r   r   r   r   r   r     s   

zAFSClient.__init__c                 C  s   | j |||| d S r$   )r  init)r(   Zfs_nameZfs_userZ	fs_passwdZfs_confr   r   r   r    s   zAFSClient.initc                 C  r   )a{  
        Only list directories under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            List: A list of all its subdirectories, e.g. [subdirname1, subdirname1, ...].

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs = client.list_dirs("hdfs:/test_hdfs_client")

        r   r   r   r   r   rA     s   
zAFSClient.list_dirsc                 C  r   )a  
        List directories and files under `fs_path` .

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Tuple: Return a 2-tuple, the first element is the list of all its subdirectories,
            and the second one is the list of all its subfiles, e.g. ([subdirname1, subdirname1, ...], [filename1, filename2, ...]).

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> subdirs, files = client.ls_dir("hdfs:/test_hdfs_client")

        r   r'   r   r   r   r*   8  s   

zAFSClient.ls_dirc                 C  s   | j |}|g}||fS r$   )r  list)r(   r)   rS   rR   r   r   r   r   T  s   zAFSClient._ls_dirc                 C  r   )a~  
        Whether the remote HDFS path is a directory.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a directory, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_dir("hdfs:/test_hdfs_client")

        Fr   r'   r   r   r   r-   Y  s   

zAFSClient.is_dirc                 C  s    | j |}t|dkrdS dS )Nr   TF)r  r  r   )r(   r)   	list_pathr   r   r   r   t  s   zAFSClient._is_dirc                 C  r   )au  
        Whether the remote HDFS path is a file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            Bool: Return true if the path exists and it's a file, otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_file("hdfs:/test_hdfs_client")

        Fr   r'   r   r   r   r,   {  s   
zAFSClient.is_filec                 C  s   | j |S )a  
        Whether the remote HDFS path exists.

        Args:
            fs_path(str): The hdfs file path.

        Returns:
            Bool: Whether it's is file or directory, return true if the path exists,
            otherwise return false.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> ret = client.is_exist("hdfs:/test_hdfs_client")

        )r  existr'   r   r   r   r.     rd   zAFSClient.is_existFc                 C  sn   | d}| d}tj|}| |d | r$|r$| |d |  | |s.| | | j|| dS r   )	r   rN   r   r   r.   r3   r2   r  r0   r   r   r   r   r@     s   




zAFSClient.upload_dirr	   c                 C  s0   t  }||st| d| j|| dS )a  
        Upload the local path to remote HDFS.

        Args:
            local_path(str): The local path.
            fs_path(str): The HDFS path.
            multi_processes(int|1): the upload data process at the same time, default=5
            overwrite(bool|False): will overwrite file on HDFS or not

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.upload("test_hdfs_client", "hdfs:/test_hdfs_client")

        r   N)rJ   r.   r   r  r0   )r(   r/   r)   r   r;   r   r   r   r   r0     s   
zAFSClient.uploadc           	      C  st   |  |st| d| |r| j||S | |\}}|D ]}tj|tj	|d }| j|| q!dS )a  
        Download remote HDFS path to the local.

        Args:
            fs_path(str):  The HDFS path.
            local_path(str): The local path.
            multi_processes(int|1): the download data process at the same time, default=1
            overwrite(bool): is overwrite

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.download("hdfs:/test_hdfs_client", "./")

        r   r	   N)
r.   r   r,   r  r1   r*   rN   r   r   r   )	r(   r)   r/   r   r;   r   r   	file_nameZlocal_file_namer   r   r   r1     s   

zAFSClient.downloadc                 C  s   |  |rdS | j| dS )a  
        Create a remote HDFS directory.

        Args:
            fs_path(str): The HDFS directory path.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mkdirs("hdfs:/test_hdfs_client")

        N)r.   r  mkdirr'   r   r   r   r2     s   
zAFSClient.mkdirsTc                 C  s^   |r|  |r| | |r&|  |st| d|  |r&t| d| j|| dS )a  
        Move a remote HDFS file or directory from `fs_src_path` to `fs_dst_path` .

        Args:
            fs_src_path(str):  Name of the file or directory, that's needed to be moved.
            fs_dst_path(str):  Name of the file or directory to which to move to.
            overwrite(bool): Whether to re-write `fs_dst_path` if that exists. Default is False.
            test_exists(bool): Check the existence of `fs_src_path` and `fs_dst_path` . When `test_exists` is set true, if `fs_src_path` doesn't exist or `fs_dst_path` exists, program will throw an Exception.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.mv("hdfs:/test_hdfs_client", "hdfs:/test_hdfs_client2")

        r   r   N)r.   r3   r   r   r  r=   r:   r   r   r   r=     s   


zAFSClient.mvc                 C  s   |  |sdS | j| dS )a  
        Delete a remote HDFS path, whether it's a file or directory.

        Args:
            fs_path(str): The HDFS file path.

        Examples:

            .. code-block:: python


                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.delete("hdfs:/test_hdfs_client")

        N)r.   r  r]   r'   r   r   r   r3   >  s   
zAFSClient.deletec                 C  s"   |  |r|r	dS t| j|S )a  
        Create a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.
            exist_ok(bool): When `fs_path` exists, if `exist_ok` is set false,
            program will throw an Exception. Default is true.

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.touch("hdfs:/test_hdfs_client")

        N)r.   r   r  ZtouchzrB   r   r   r   rD   V  s
   
zAFSClient.touchc                 C  r`   r  r   r4   r   r   r   r5   r  ra   zAFSClient.need_upload_downloadNc                 C  s   |  |r| j|S dS )a  
        Cat a remote HDFS file.

        Args:
            fs_path(str): The HDFS file path.

        Returns:
            file content

        Examples:

            .. code-block:: python

                >>> # doctest: +SKIP('depend on external file')
                >>> from paddle.distributed.fleet.utils.fs import AFSClient

                >>> client = AFSClient()
                >>> client.init("hdfs://xxx.hadoop.com:54310", "hello", "hello123", "./fs_conf")
                >>> client.cat("hdfs:/test_hdfs_client")

        r   )r,   r  rE   r'   r   r   r   rE   u  s   
zAFSClient.catc           
      C  r  r  r  r	  r   r   r   r     r  zAFSClient._split_filesr  r  )r	   Fr  rG   r$   )r   r   r   ru   r   r  rA   r*   r   r-   r   r,   r.   r@   r0   r1   r2   r=   r3   rD   r5   rE   r   r   r   r   r   r    s*    



%
"

r  r$   )rv   rw   rK   rx   )'
__future__r   rH   r   r   rN   r   rY   r   r   typingr   r   r   r   r   Zpaddle.baser   Zlog_utilr
   Ztyping_extensionsr   r   r   r   r   r   __all__r   r   r   r   r    r!   r"   rJ   r   r   r  r   r   r   r   <module>   sP   
>  0'      .