o
    1 i+                     @   s*  d dl Z d dlZd dlmZmZmZmZ d dlZd dlm	Z	 d dl
mZ d dlmZmZmZmZmZ d dlmZ eeZdedee fd	d
Zdee dee dee fddZdee dee deee  fddZdedededee deeeeee f ef df f
ddZdee dedee fddZdeeee ef  deee  dedeeee ef  fddZdeeee ef  d ee deeeee   eee  f fd!d"Z	#	d*deeee ef  d$ee ded%ee deeeee   eee  f f
d&d'Z dedefd(d)Z!dS )+    N)IterableListTupleUnion)trace_deallocation)cached_remote_fn)BlockBlockAccessorBlockExecStatsBlockMetadataBlockPartition)	ObjectRefblocks_with_metadatareturnc                 C   sP   t t}g }| D ]\}}|jdu rt||}||_n|j}|| q|S )z@Calculate the number of rows for a list of blocks with metadata.N)r   _get_num_rowsnum_rowsraygetremoteappend)r   Zget_num_rows
block_rowsblockmetadatar    r   d/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/_internal/split.py_calculate_blocks_rows   s   
r   num_rows_per_blocksplit_indicesc                    s   t |   fdd|D S )zTGenerate valid split indices by apply min(index, total_num_rows)
    to every index.c                    s   g | ]}t | qS r   )min).0indexZ
total_rowsr   r   
<listcomp>,   s    z+_generate_valid_indices.<locals>.<listcomp>)sum)r   r   r   r!   r   _generate_valid_indices%   s   r$   c           	      C   s   g }d}g }d}d}|t |k rA|| }| | }|| |kr*|||  |d7 }q
|| g }|| | 7 }|d7 }|t |k st |t | k rX|| g }t |t | k sI|S )a:  Given num rows per block and valid split indices, generate per block split indices.

    Args:
        num_rows_per_block: num of rows per block.
        split_indices: The (global) indices at which to split the blocks.
    Returns:
        Per block split indices indicates each input block's split point(s).
    r      )lenr   )	r   r   per_block_split_indicesZcurrent_input_block_idZcurrent_block_split_indicesZcurrent_block_global_offsetZcurrent_index_idZsplit_indexZcurrent_block_rowr   r   r   !_generate_per_block_split_indices/   s0   

r(   block_idr   meta.c                 C   s   g }g }t |}d}||j |D ]7}td| d|  t }	|||}
t |
}t	| |
 |j|	 d}|| ||
 |}q| |fg}|| t|S )aP  Split the provided block at the given indices.

    Args:
        block_id: the id of this block in the block list.
        block: block to be split.
        meta: metadata of the block, we expect meta.num is valid.
        split_indices: the indices where the block should be split.
    Returns:
        returns block_id, split blocks metadata, and a list of blocks
        in the following form. We return blocks in this way
        so that the owner of blocks could be the caller(driver)
        instead of worker itself.
        Tuple(block_id, split_blocks_meta), block0, block1 ...
    r   zslicing block :)r   
size_bytesinput_filesZ
exec_stats)r	   	for_blockr   r   loggerdebugr
   builderslicer   r,   r-   buildextendtuple)r)   r   r*   r   Z
split_metaZsplit_blocksZblock_accessor
prev_indexr    statsZsplit_blockaccessor_metaresultsr   r   r   _split_single_block[   s,   





r;   block_split_indicesr   c                 C   s@   d}g }| D ]}|dks||krq||krq| | |}q|S )zdrop split indices that creates empty block split. This could happen when there
    are duplicated indices, or index equal to 0 (start of the block) or num_block_rows
    (end of the block).
    r   )r   )r<   r   r6   Zoptimized_indicesr    r   r   r   _drop_empty_block_split   s   
r>   r'   owned_by_consumerc                 C   s>  t t}dgt|  }g }g }g }t|D ]F\}}	| | \}
}|j}t|	|}	t|	dkr5|
|fg||< q|jddt|	 d||
||	}||d  ||dd  ||
 q|rt	
|}t||D ]\\}}}t|t|ksxJ t||||< qh|r|D ]}t|d qn|D ]	}t|ddd	 qtj|S )
z5Split all the input blocks based on the split indicesNr   ZSPREAD   )Zscheduling_strategyZnum_returnsr%   zsplit._split_all_blocksF)free)r   r;   r&   	enumerater   r>   optionsr   r   r   r   zipr   	itertoolschainfrom_iterable)r   r'   r?   Zsplit_single_blockall_blocks_split_resultsZ per_block_split_metadata_futuresZper_block_split_block_refsZblocks_splittedr)   r<   	block_refr*   Z	block_rowZobject_refsZper_block_split_metadataZ
block_refsbr   r   r   _split_all_blocks   sL   

rK   rH   global_split_sizesc           
      C   s   g }g }g }g }d}d}|t |k rP||| kr5||| ks J || || g }g }d}|d7 }nt| \}}	|| ||	 ||	j7 }|t |k s||fS )z<Reassemble per block's split result into final split result.r   r%   )r&   r   nextr   )
rH   rL   Zresult_blocksZresult_metasZcurrent_blocksZcurrent_metaZcurrent_split_sizeZcurrent_split_idrI   r*   r   r   r   _generate_global_split_results   s*   





rN   Tindicesr   c                    s   t | } t| dkrg gt|d  g gt|d  fS |du r$t| }t||}t||}t| ||}dg| t|g   fddtdt D }t||S )a!  Split blocks at the provided indices.

    Args:
        blocks_with_metadata: Block futures to split, including the associated metadata.
        indices: The (global) indices at which to split the blocks.
        owned_by_consumer: Whether the provided blocks are owned by the consumer.
        block_rows: The number of rows for each block, in case it has already been
            computed.

    Returns:
        The block split futures and their metadata. If an index split is empty, the
        corresponding block split will be empty .
    r   r%   Nc                    s    g | ]} |  |d    qS )r%   r   )r   ihelperr   r   r"   !  s     z%_split_at_indices.<locals>.<listcomp>)	listr&   r   r$   r(   rK   r#   rangerN   )r   rO   r?   r   Zvalid_indicesr'   rH   Zsplit_sizesr   rQ   r   _split_at_indices   s    $
	
rU   c                 C   s   t |  S )z7Get the number of rows contained in the provided block.)r	   r.   r   )r   r   r   r   r   &  s   r   )TN)"rE   loggingtypingr   r   r   r   r   Z!ray.data._internal.memory_tracingr   Zray.data._internal.remote_fnr   Zray.data.blockr   r	   r
   r   r   Z	ray.typesr   	getLogger__name__r/   intr   r$   r(   r;   r>   boolrK   rN   rU   r   r   r   r   r   <module>   s    





,
.

<
#
0