o
    1 i                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ ededee fd	d
ZedededefddZG dd dZG dd deZG dd deZG dd dZdee fddZdS )    N)deque)AnyCallableDequeDictListOptionalUnion)Dataset)AggregateFnV2)DeveloperAPIvaluereturnc                 C   s
   |  dS )z*Tokenize a string using a split on spaces. )split)r    r   h/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/preprocessors/utils.pysimple_split_tokenizer
   s   
r   num_featuresc                 C   s,   t |  }t|}t| d}|| S )z6Deterministically hash a value into the integer space.   )strencodehashlibsha1int	hexdigest)r   r   Zencoded_valueZhashed_valueZhashed_value_intr   r   r   simple_hash   s   
r   c                	   @   sB   e Zd ZdZdd ddeeef dedeegef fdd	Zd
S )BaseStatSpeczEEncapsulates a statistical computation with optional post-processing.c                 C      | S Nr   xr   r   r   <lambda>        zBaseStatSpec.<lambda>post_process_fnstat_fnr%   post_key_fnc                C   s   || _ || _|| _d S r   r&   r%   r'   )selfr&   r%   r'   r   r   r   __init__   s   
zBaseStatSpec.__init__N)	__name__
__module____qualname____doc__r	   r   r   r   r*   r   r   r   r   r      s    
r   c                
       s^   e Zd ZdZdd dddeeeegef f dedeegef d	ee f fd
dZ	  Z
S )AggregateStatSpecz5Represents an AggregateFnV2 spec for a single column.c                 C   r   r   r   r    r   r   r   r"   /   r#   zAggregateStatSpec.<lambda>N)r%   columnaggregator_fnr%   r'   r0   c                   s   t  j|||d || _d S Nr(   )superr*   r0   )r)   r1   r%   r'   r0   	__class__r   r   r*   +   s   
zAggregateStatSpec.__init__)r+   r,   r-   r.   r	   r   r   r   r   r*   __classcell__r   r   r4   r   r/   (   s    r/   c                       s`   e Zd ZdZdd ddedeeegef  deeegef  ded	ee f
 fd
dZ  Z	S )CallableStatSpeczPRepresents a user-defined stat function that operates outside Dataset.aggregate.c                 C   r   r   r   r    r   r   r   r"   D   r#   zCallableStatSpec.<lambda>r$   r&   stat_key_fnr'   r%   columnsc                   s"   t  j|||d || _|| _d S r2   )r3   r*   r9   r8   )r)   r&   r8   r'   r%   r9   r4   r   r   r*   >   s
   	
zCallableStatSpec.__init__)
r+   r,   r-   r.   r   r   r   r   r*   r6   r   r   r4   r   r7   ;   s    r7   c                   @   s  e Zd ZdZdd Zdd Zdd dd	d
eegef dede	eegef  de
e ddf
ddZdd dd	deg ef dedeegef de	eegef  de
e ddfddZdedeeef fddZde
e fddZde
e fddZde
e fddZdd  ZdS )!StatComputationPlana\  
    Encapsulates a set of aggregators (AggregateFnV2) and legacy stat functions
    to compute statistics over a Ray dataset.

    Supports two types of aggregations:
    1. AggregateFnV2-based aggregators, which are batch-executed using `Dataset.aggregate(...)`.
    2. Callable-based stat functions, executed sequentially (legacy use case).
    c                 C   s   t  | _d S r   )r   _aggregatorsr)   r   r   r   r*   X   s   zStatComputationPlan.__init__c                 C   s   | j   d S r   )r;   clearr<   r   r   r   reset[   s   zStatComputationPlan.resetc                 C   r   r   r   r    r   r   r   r"   b   r#   zStatComputationPlan.<lambda>N)r%   r'   r1   r%   r'   r9   r   c             	   C   s.   |D ]}||}| j t||||d qdS )a  
        Registers an AggregateFnV2 factory for one or more columns.

        Args:
            aggregator_fn: A callable (typically a lambda or class) that accepts a column name and returns an instance of AggregateFnV2.
            post_process_fn: Function to post-process the aggregated result.
            post_key_fn: Optional key generator to use to save aggregation results after post-processing.
            columns: List of column names to aggregate.
        )r1   r%   r'   r0   N)r;   appendr/   )r)   r1   r%   r'   r9   r0   Zagg_instancer   r   r   add_aggregator^   s   z"StatComputationPlan.add_aggregatorc                 C   r   r   r   r    r   r   r   r"   ~   r#   r&   r8   c             	   C   s"   | j t|||||p|d dS )a  
        Registers a custom stat function to be run sequentially.

        This supports legacy use cases where arbitrary callables are needed
        and cannot be run via Dataset.aggregate().

        :param post_key_fn:
        :param stat_fn: A zero-argument callable that returns the stat.
        :param post_process_fn: Function to apply to the result.
        :param columns:
        :param stat_key_fn:
        )r&   r%   r9   r8   r'   N)r;   r?   r7   )r)   r&   r%   r8   r'   r9   r   r   r   add_callable_statz   s   z%StatComputationPlan.add_callable_statdatasetc           
      C   s   i }|   }|r.|j| }|  D ]}|jj}|jdur"||jn|}||| ||< q|  D ]!}||j	}|j
D ]}	|	|	}||	}||| ||< q=q2|S )aq  
        Executes all registered aggregators and stat functions.

        AggregateFnV2-based aggregators are batched and executed via Dataset.aggregate().
        Callable-based stat functions are run sequentially.

        Args:
            dataset: The Ray Dataset to compute statistics on.

        Returns:
            A dictionary of computed statistics.
        N)_get_aggregate_fn_listZ	aggregate_get_aggregate_specsr&   namer'   r0   r%   _get_custom_stat_fn_specsr8   r9   )
r)   rB   statsZaggregatorsZ
raw_resultspecZstat_keyZpost_keyresultcolr   r   r   compute   s&   




zStatComputationPlan.computec                 C      dd | j D S )Nc                 S   s   g | ]
}t |tr|jqS r   )
isinstancer/   r&   .0rH   r   r   r   
<listcomp>   s    z>StatComputationPlan._get_aggregate_fn_list.<locals>.<listcomp>r;   r<   r   r   r   rC      s   z*StatComputationPlan._get_aggregate_fn_listc                 C   rL   )Nc                 S      g | ]	}t |tr|qS r   )rM   r/   rN   r   r   r   rP      
    
z<StatComputationPlan._get_aggregate_specs.<locals>.<listcomp>rQ   r<   r   r   r   rD         z(StatComputationPlan._get_aggregate_specsc                 C   rL   )Nc                 S   rR   r   )rM   r7   rN   r   r   r   rP      rS   zAStatComputationPlan._get_custom_stat_fn_specs.<locals>.<listcomp>rQ   r<   r   r   r   rF      rT   z-StatComputationPlan._get_custom_stat_fn_specsc                 C   s   t |  S )z4
        Iterates over all AggregatorSpecs.
        )iterrD   r<   r   r   r   __iter__   s   zStatComputationPlan.__iter__)r+   r,   r-   r.   r*   r>   r   r   r   r   r   r@   r   rA   r
   r   rK   rC   r/   rD   r7   rF   rV   r   r   r   r   r:   N   sJ    	
 

%r:   	callbacksc                    s    fdd}|S )z
    Wraps a base post-processing function with a sequence of callback functions.
    Useful when multiple post-processing steps need to be applied in order.
    c                    s    | }D ]}||}q|S r   r   )rI   	processedcbbase_fnrW   r   r   wrapper   s   
z$make_post_processor.<locals>.wrapperr   )r[   rW   r\   r   rZ   r   make_post_processor   s   r]   )r   collectionsr   typingr   r   r   r   r   r   r	   Zray.datar
   Zray.data.aggregater   Zray.util.annotationsr   r   r   objectr   r   r   r/   r7   r:   r]   r   r   r   r   <module>   s     $ 	