o
    1 i                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	 d dl
Z
d dlmZ d dlmZmZ d dlmZ eeZe	dZ	 eeG dd	 d	ee ZeG d
d dee ZeG dd ded Zdee defddZdS )    N)	dataclass)GenericIterableListOptionalTypeVar)TaskContext)BlockBlockAccessor)DeveloperAPIWriteReturnTypec                   @   s<   e Zd ZU dZeed< eed< ee ed< ed
ddZ	d	S )WriteResultz3Aggregated result of the Datasink write operations.num_rows
size_byteswrite_returnswrsreturnc                 G   sJ   t dd |D }t dd |D }ttjdd |D  }t|||dS )Nc                 s       | ]}|j V  qd S N)r   .0wr r   h/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/datasource/datasink.py	<genexpr>        z&WriteResult.combine.<locals>.<genexpr>c                 s   r   r   )r   r   r   r   r   r   !   r   c                 S   s   g | ]}|j qS r   )r   r   r   r   r   
<listcomp>"   s    z'WriteResult.combine.<locals>.<listcomp>)r   r   r   )sumlist	itertoolschainr   )clsr   r   r   r   r   r   r   combine   s   zWriteResult.combineN)r   r   r   r   )
__name__
__module____qualname____doc__int__annotations__r   r   classmethodr"   r   r   r   r   r      s   
 r   c                   @   s   e Zd ZdZdddZdee dedefdd	Z	d
e
e fddZdeddfddZdefddZedefddZedee fddZdS )DatasinkzInterface for defining write-related logic.

    If you want to write data to something that isn't built-in, subclass this class
    and call :meth:`~ray.data.Dataset.write_datasink`.
    r   Nc                 C      dS )zCallback for when a write job starts.

        Use this method to perform setup for write tasks. For example, creating a
        staging bucket in S3.
        Nr   selfr   r   r   on_write_start3      zDatasink.on_write_startblocksctxc                 C   s   t )a  Write blocks. This is used by a single write task.

        Args:
            blocks: Generator of data blocks.
            ctx: ``TaskContext`` for the write task.

        Returns:
            Result of this write task. When the entire write operator finishes,
            All returned values will be passed as `WriteResult.write_returns`
            to `Datasink.on_write_complete`.
        )NotImplementedError)r-   r0   r1   r   r   r   write;   s   zDatasink.writewrite_resultc                 C   r+   )a  Callback for when a write job completes.

        This can be used to `commit` a write output. This method must
        succeed prior to ``write_datasink()`` returning to the user. If this
        method fails, then ``on_write_failed()`` is called.

        Args:
            write_result: Aggregated result of the
               Write operator, containing write results and stats.
        Nr   r-   r4   r   r   r   on_write_completeM   s   zDatasink.on_write_completeerrorc                 C   r+   )zCallback for when a write job fails.

        This is called on a best-effort basis on write failures.

        Args:
            error: The first error encountered.
        Nr   r-   r7   r   r   r   on_write_failedZ   s   zDatasink.on_write_failedc                 C   sD   t | j}d}|dr|dd }||r |dt|  }|S )zoReturn a human-readable name for this datasink.

        This is used as the names of the write tasks.
        r*   _   N)typer#   
startswithendswithlen)r-   nameZdatasink_suffixr   r   r   get_named   s   


zDatasink.get_namec                 C   r+   )z;If ``False``, only launch write tasks on the driver's node.Tr   r,   r   r   r   supports_distributed_writesq   s   z$Datasink.supports_distributed_writesc                 C   r+   )zThe target number of rows to pass to each :meth:`~ray.data.Datasink.write` call.

        If ``None``, Ray Data passes a system-chosen number of rows.
        Nr   r,   r   r   r   min_rows_per_writev   r/   zDatasink.min_rows_per_write)r   N)r#   r$   r%   r&   r.   r   r	   r   r   r3   r   r6   	Exceptionr9   strrA   propertyboolrB   r   r'   rC   r   r   r   r   r*   +   s"    


r*   c                   @   sV   e Zd ZdZdd Zdee deddfdd	Zd
e	d fddZ
deddfddZdS )DummyOutputDatasinka0  An example implementation of a writable datasource for testing.
    Examples:
        >>> import ray
        >>> from ray.data.datasource import DummyOutputDatasink
        >>> output = DummyOutputDatasink()
        >>> ray.data.range(10).write_datasink(output)
        >>> assert output.num_ok == 1
    c                 C   sH   t jj }t j|jdG dd d}| | _d| _d| _d| _	d S )N)scheduling_strategyc                   @   s.   e Zd Zdd ZdeddfddZdd	 ZdS )
z.DummyOutputDatasink.__init__.<locals>.DataSinkc                 S   s   d| _ d| _d S )Nr   T)rows_writtenenabledr,   r   r   r   __init__   s   
z7DummyOutputDatasink.__init__.<locals>.DataSink.__init__blockr   Nc                 S   s    t |}|  j| 7  _d S r   )r
   Z	for_blockrJ   r   )r-   rM   r   r   r   r3      s   
z4DummyOutputDatasink.__init__.<locals>.DataSink.writec                 S   s   | j S r   )rJ   r,   r   r   r   get_rows_written   s   z?DummyOutputDatasink.__init__.<locals>.DataSink.get_rows_written)r#   r$   r%   rL   r	   r3   rN   r   r   r   r   DataSink   s    rO   r   T)
raydataZDataContextZget_currentremoterI   	data_sinknum_ok
num_failedrK   )r-   r1   rO   r   r   r   rL      s   

zDummyOutputDatasink.__init__r0   r1   r   Nc                 C   s>   g }| j s	td|D ]}|| jj| qt| d S )Ndisabled)rK   
ValueErrorappendrS   r3   rR   rP   get)r-   r0   r1   tasksbr   r   r   r3      s   zDummyOutputDatasink.writer4   c                 C      |  j d7  _ d S Nr;   )rT   r5   r   r   r   r6         z%DummyOutputDatasink.on_write_completer7   c                 C   r\   r]   )rU   r8   r   r   r   r9      r^   z#DummyOutputDatasink.on_write_failed)r#   r$   r%   r&   rL   r   r	   r   r3   r   r6   rD   r9   r   r   r   r   rH      s    	
rH   write_result_blocksr   c                    s`   dd l  t fdd| D sJ tdd | D }tdd | D }dd | D }t|||S )Nr   c                 3   s(    | ]}t | jot|d kV  qdS )r;   N)
isinstanceZ	DataFramer?   )r   rM   pdr   r   r      s
    
z-_gen_datasink_write_result.<locals>.<genexpr>c                 s       | ]	}|d    V  qdS )r   Nr   r   resultr   r   r   r          c                 s   rc   )r   Nrd   re   r   r   r   r      rg   c                 S   s   g | ]}|d  d qS )Zwrite_returnr   r   re   r   r   r   r      s    z._gen_datasink_write_result.<locals>.<listcomp>)Zpandasallr   r   )r_   Ztotal_num_rowsZtotal_size_bytesr   r   ra   r   _gen_datasink_write_result   s   ri   )r   loggingdataclassesr   typingr   r   r   r   r   rP   Z'ray.data._internal.execution.interfacesr   Zray.data.blockr	   r
   Zray.util.annotationsr   	getLoggerr#   loggerr   r   r*   rH   ri   r   r   r   r   <module>   s.    
S4