o
    1 i"+                     @   s:  d dl Z d dlZd dlmZmZmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZmZ d dlmZ d dlmZm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl&m'Z' erxd dl(Z(e )e*Z+G dd ded Z,e'G dd de,Z-e'G dd de,Z.dS )    N)TYPE_CHECKINGAnyDictIterableOptional)urlparse)call_with_retry)%add_creatable_buckets_param_if_s3_uri)DelegatingBlockBuilder)TaskContext)WRITE_UUID_KWARG_NAME)SaveMode)RetryingPyFileSystem_is_local_scheme)BlockBlockAccessor)DataContext)DatasinkWriteResult)FilenameProvider_DefaultFilenameProvider)_resolve_paths_and_filesystem)DeveloperAPIc                   @   s   e Zd Zddddddejddeded dedeeee	f  d	ee
 d
ee dee defddZdeddfddZd$ddZdefddZdee deddfddZdededefddZded fd d!Zedefd"d#ZdS )%_FileDatasinkNT)
filesystemtry_create_diropen_stream_argsfilename_providerdataset_uuidfile_formatmodepathr   zpyarrow.fs.FileSystemr   r   r   r   r   r    c          
      C   s   |du ri }|du rt ||d}t | _|| _t||\}	| _tj| j| jj	d| _t
|	dks7J t
|	|	d | _|| _|| _|| _|| _|| _|| _d| _dS )a
  Initialize this datasink.

        Args:
            path: The folder to write files to.
            filesystem: The filesystem to write files to. If not provided, the
                filesystem is inferred from the path.
            try_create_dir: Whether to create the directory to write files to.
            open_stream_args: Arguments to pass to ``filesystem.open_output_stream``.
            filename_provider: A :class:`ray.data.datasource.FilenameProvider` that
                generates filenames for each row or block.
            dataset_uuid: The UUID of the dataset being written. If specified, it's
                included in the filename.
            file_format: The file extension. If specified, files are written with this
                extension.
        N)r   r   )Zretryable_errors   r   F)r   r   Zget_current_data_contextunresolved_pathr   r   r   wrapretried_io_errorslenr!   r   r   r   r   r   r    has_created_dir)
selfr!   r   r   r   r   r   r   r    paths r+   m/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/datasource/file_datasink.py__init__!   s*   



z_FileDatasink.__init__returnpyarrow.NativeFilec                 C   s   | j j|fi | jS N)r   open_output_streamr   )r)   r!   r+   r+   r,   r1   U   s   z _FileDatasink.open_output_streamc                 C   s   ddl m} | j| jj|ju}|rQ| jtj	kr"t
d| j d| jtjkr7td| j d| j  d S | jtjkrQtd| j d| j  | j| j | | j| _d S )Nr   FileTypezPath zO already exists. If this is unexpected, use mode='ignore' to ignore those filesz
[SaveMode=z] Skipping z] Replacing contents )
pyarrow.fsr3   r   get_file_infor!   typeNotFoundr    r   ERROR
ValueErrorZIGNOREloggerwarningZ	OVERWRITEZdelete_dir_contents_create_dirr(   )r)   r3   Z
dir_existsr+   r+   r,   on_write_startX   s   z_FileDatasink.on_write_startc                 C   sj   ddl m} t|}|jdk}|o| jj }| jr3|s3| j|j	|j
u r3t|}| jj|dd dS dS )zsCreate a directory to write files to.

        If ``try_create_dir`` is ``False``, this method is a no-op.
        r   r2   Zs3T)	recursiveF)r4   r3   r   schemer#   Zs3_try_create_dirr   r   r5   r6   r7   r	   Z
create_dir)r)   destr3   Z
parsed_uriZ	is_s3_uriZskip_create_dir_for_s3tmpr+   r+   r,   r<   k   s   

z_FileDatasink._create_dirblocksctxc                 C   s`   t  }|D ]}|| q| }t|}| dkr'td| j  d S | 	|d| d S )Nr   zSkipped writing empty block to )
r
   Z	add_blockbuildr   Z	for_blocknum_rowsr:   r;   r!   write_block)r)   rB   rC   builderblockZblock_accessorr+   r+   r,   write   s   
z_FileDatasink.writerH   block_indexc                 C      t r0   NotImplementedError)r)   rH   rJ   rC   r+   r+   r,   rF      s   z_FileDatasink.write_blockwrite_resultc                 C   s*   | j r|jdkr| j| j d S d S d S )Nr   )r(   rE   r   Z
delete_dirr!   )r)   rN   r+   r+   r,   on_write_complete   s   z_FileDatasink.on_write_completec                 C   s   t | j S r0   )r   r$   r)   r+   r+   r,   supports_distributed_writes   s   z)_FileDatasink.supports_distributed_writes)r.   N)__name__
__module____qualname__r   APPENDstrr   boolr   r   r   r-   r1   r=   r<   r   r   r   rI   r   intrF   r   rO   propertyrQ   r+   r+   r+   r,   r       sN    	

4
 
r   c                   @   s@   e Zd ZdZdeeef ddfddZdede	d	e
fd
dZdS )RowBasedFileDatasinka  A datasink that writes one row to each file.

    Subclasses must implement ``write_row_to_file`` and call the superclass constructor.

    Examples:
        .. testcode::

            import io
            from typing import Any, Dict

            import pyarrow
            from PIL import Image

            from ray.data.datasource import RowBasedFileDatasink

            class ImageDatasink(RowBasedFileDatasink):
                def __init__(self, path: str, *, column: str, file_format: str = "png"):
                    super().__init__(path, file_format=file_format)
                    self._file_format = file_format
                    self._column = column

                def write_row_to_file(self, row: Dict[str, Any], file: "pyarrow.NativeFile"):
                    image = Image.fromarray(row[self._column])
                    buffer = io.BytesIO()
                    image.save(buffer, format=self._file_format)
                    file.write(buffer.getvalue())
    rowfiler/   c                 C   rK   )zWrite a row to a file.

        Args:
            row: The row to write.
            file: The file to write the row to.
        rL   )r)   r[   r\   r+   r+   r,   write_row_to_file      z&RowBasedFileDatasink.write_row_to_filerH   rJ   rC   c                    s   t |jddD ]7\} j |jt |j||}tj	|t
d d  fdd}t|d djjd	 qd S )
NF)Zpublic_row_formatWriting  file.c                     <    }  |  W d    d S 1 sw   Y  d S r0   )r1   r]   r\   r[   r)   
write_pathr+   r,   write_row_to_path      "z;RowBasedFileDatasink.write_block.<locals>.write_row_to_pathwrite ''descriptionmatch)	enumerateZ	iter_rowsr   Zget_filename_for_rowkwargsr   task_idx	posixpathjoinr!   r:   debugr   r#   r&   )r)   rH   rJ   rC   Z	row_indexfilenamere   r+   rc   r,   rF      s"   
z RowBasedFileDatasink.write_blockN)rR   rS   rT   __doc__r   rV   r   r]   r   rX   r   rF   r+   r+   r+   r,   rZ      s    	rZ   c                       sn   e Zd ZdZdddee f fddZdedd	fd
dZdedede	fddZ
edee fddZ  ZS )BlockBasedFileDatasinka)  A datasink that writes multiple rows to each file.

    Subclasses must implement ``write_block_to_file`` and call the superclass
    constructor.

    Examples:
        .. testcode::

            class CSVDatasink(BlockBasedFileDatasink):
                def __init__(self, path: str):
                    super().__init__(path, file_format="csv")

                def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
                    from pyarrow import csv
                    csv.write_csv(block.to_arrow(), file)
    N)min_rows_per_fileru   c                   s   t  j|fi | || _d S r0   )superr-   _min_rows_per_file)r)   r!   ru   Zfile_datasink_kwargs	__class__r+   r,   r-      s   
zBlockBasedFileDatasink.__init__rH   r\   r/   c                 C   rK   )zWrite a block of data to a file.

        Args:
            block: The block to write.
            file: The file to write the block to.
        rL   )r)   rH   r\   r+   r+   r,   write_block_to_file  r^   z*BlockBasedFileDatasink.write_block_to_filerJ   rC   c                    sh   j  |jt |j|}tj| fdd}t	d d t
|d djjd d S )Nc                     ra   r0   )r1   rz   rb   rH   r)   rd   r+   r,   write_block_to_path  rf   z?BlockBasedFileDatasink.write_block.<locals>.write_block_to_pathr_   r`   rg   rh   ri   )r   Zget_filename_for_blockrm   r   rn   ro   rp   r!   r:   rq   r   r#   r&   )r)   rH   rJ   rC   rr   r|   r+   r{   r,   rF   
  s   

z"BlockBasedFileDatasink.write_blockr.   c                 C   s   | j S r0   )rw   rP   r+   r+   r,   min_rows_per_write  s   z)BlockBasedFileDatasink.min_rows_per_write)rR   rS   rT   rs   r   rX   r-   r   rz   r   rF   rY   r}   __classcell__r+   r+   rx   r,   rt      s    	rt   )/loggingro   typingr   r   r   r   r   urllib.parser   Zray._common.retryr   Zray._private.arrow_utilsr	   Z+ray.data._internal.delegating_block_builderr
   Z'ray.data._internal.execution.interfacesr   Z(ray.data._internal.planner.plan_write_opr   Zray.data._internal.savemoder   Zray.data._internal.utilr   r   Zray.data.blockr   r   Zray.data.contextr   Zray.data.datasource.datasinkr   r   Z%ray.data.datasource.filename_providerr   r   Zray.data.datasource.path_utilr   Zray.util.annotationsr   Zpyarrow	getLoggerrR   r:   r   rZ   rt   r+   r+   r+   r,   <module>   s4    
 
=