o
    1 it                     @   s   d dl mZmZ d dlmZ d dlmZmZ d dlm	Z	m
Z
mZmZ d dlmZ dee dedee fd	d
Zdedee dedeeeef fddZdee deee  dedeee ee ef fddZdedee dee fddZdS )    )ListTuple)	RefBundle)_calculate_blocks_rows_split_at_indices)BlockBlockMetadataBlockPartition_take_first_non_empty_schema)	ObjectRefper_split_bundlesowned_by_consumerreturnc                 C   s0  t | dkr| S dd | D }dd |D }tdd |D }|t | }t|||\}}}t||D ]\}	}
tdd |	D }||ksFJ ||
 |ksNJ q3tdd | D }t|||d	}t||}t|D ]\}}|| | td
d || D }||ksJ qhg }|D ]}|	t|||d	 q|S )zEqualize split ref bundles into equal number of rows.

    Args:
        per_split_bundles: ref bundles to equalize.
    Returns:
        the equalized ref bundles.
    r   c                 S   s   g | ]}|j qS  )blocks.0Zbundler   r   g/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/_internal/equalize.py
<listcomp>   s    z_equalize.<locals>.<listcomp>c                 S      g | ]}t |qS r   )r   )r   splitr   r   r   r      s    c                 S   r   r   )sum)r   Zblocks_rowsr   r   r   r      s    c                 S      g | ]\}}|j qS r   Znum_rowsr   _metar   r   r   r   *       c                 s   s    | ]}|j V  qd S )N)schemar   r   r   r   	<genexpr>0   s    z_equalize.<locals>.<genexpr>)owns_blocksr   c                 S   r   r   r   r   r   r   r   r   9   r   )
lenr   _shave_all_splitszipr
   r   _split_leftovers	enumerateextendappend)r   r   Zper_split_blocks_with_metadataper_split_num_rowsZ
total_rowsZtarget_split_sizeshaved_splitsper_split_needed_rows	leftoversZshaved_splitZsplit_needed_rowZnum_shaved_rowsr   Zleftover_bundleZleftover_splitsiZleftover_splitZequalized_ref_bundlesr   r   r   r   	_equalize   s8   

r-   r   num_rows_per_blocktarget_sizec           	      C   s\   g }g }d}t | |D ]\}}|| |kr|| ||7 }q|| q|| }|||fS )a  Shave a block list to the target size.

    Args:
        split: the block list to shave.
        num_rows_per_block: num rows for each block in the list.
        target_size: the upper bound target size of the shaved list.
    Returns:
        A tuple of:
            - shaved block list.
            - num of rows needed for the block list to meet the target size.
            - leftover blocks.

    r   )r#   r'   )	r   r.   r/   shavedr+   Zshaved_rowsZblock_with_metaZ
block_rowsnum_rows_neededr   r   r   _shave_one_splitE   s   


r2   input_splitsr(   c                 C   sZ   g }g }g }t | |D ]\}}t|||\}}	}
|| ||	 ||
 q|||fS )a  Shave all block list to the target size.

    Args:
        input_splits: all block list to shave.
        input_splits: num rows (per block) for each block list.
        target_size: the upper bound target size of the shaved lists.
    Returns:
        A tuple of:
            - all shaved block list.
            - num of rows needed for the block list to meet the target size.
            - leftover blocks.
    )r#   r2   r'   r&   )r3   r(   r/   r)   r*   r+   r   r.   r0   r1   Z
_leftoversr   r   r   r"   c   s   



r"   r+   r*   c                 C   sb   t |}g }d}t|D ]\}}|||  || }qt| j|| j}dd t| D d| S )z0Split leftover blocks by the num of rows needed.r   c                 S   s   g | ]\}}t t||qS r   )listr#   )r   Z
block_refsr   r   r   r   r      s    z$_split_leftovers.<locals>.<listcomp>N)r!   r%   r'   r   r   r    r#   )r+   r*   Z
num_splitsZsplit_indicesprevr,   r1   Zsplit_resultr   r   r   r$      s   
r$   N)typingr   r   Z'ray.data._internal.execution.interfacesr   Zray.data._internal.splitr   r   Zray.data.blockr   r   r	   r
   Z	ray.typesr   boolr-   intr2   r"   r$   r   r   r   r   <module>   sJ    
7


 