o
    1 i'                     @   s.  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ zd dlZW n ey_   dZY nw erhd d
lmZ ee Z!eddG dd dZ"ej#d dG dd dZ$dd Z%G dd dZ&dd Z'dS )    N)defaultdict)TYPE_CHECKINGAnyListOptional)(_ref_bundles_iterator_to_block_refs_list)cached_remote_fn)BlockAccessor)DataContext)	ObjectRef)	PublicAPI)Datasetalpha)Z	stabilityc                   @   s   e Zd ZdZdddedefddZdd	 Zded
e	e fddZ
dee d
eee  fddZd
efddZdefddZded
efddZdS )RandomAccessDatasetzuA class that provides distributed, random access to a Dataset.

    See: ``Dataset.to_random_access_dataset()``.
    dsr   keynum_workersc                    sj  |j dd}|du st|trtdt }td |}t	t
 | }t|}td t fdd|D }	g _d_g _t|	D ] \}
}|rlj||
  jdu rd|d	 _j|d
  qLtd| t }|jfddt|D _ \__tdj tfddjD  td t | _dS )zConstruct a RandomAccessDataset (internal API).

        The constructor is a private API. Use ``ds.to_random_access_dataset()``
        to construct a RandomAccessDataset.
        T)Zfetch_if_missingNz6RandomAccessDataset only supports Arrow-format blocks.z%[setup] Indexing dataset by sort key.z%[setup] Computing block range bounds.c                    s   g | ]}  |qS  )remote).0b)
get_boundsr   r   j/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/random_access_dataset.py
<listcomp>=   s    z0RandomAccessDataset.__init__.<locals>.<listcomp>r      z*[setup] Creating {} random access workers.c                    s   g | ]}t jd  qS ))scheduling_strategy)_RandomAccessWorkeroptionsr   )r   _)r   r   r   r   r   K   s    z'[setup] Worker to blocks assignment: {}c                    s,   g | ]}|j  fd d j| D qS )c                    s   i | ]}| j | qS r   )_non_empty_blocks)r   iselfr   r   
<dictcomp>\   s    
z;RandomAccessDataset.__init__.<locals>.<listcomp>.<dictcomp>)assign_blocksr   _worker_to_blocks_mapr   wr!   r   r   r   Z   s    
z-[setup] Finished assigning blocks to workers.)schema
isinstancetype
ValueErrortimeperf_counterloggerinfosortr   _get_boundsZiter_internal_ref_bundlesr   raygetr   _lower_bound_upper_bounds	enumerateappendformatr
   Zget_currentr   range_workers$_compute_block_to_worker_assignments_block_to_workers_mapr%   _build_time)r"   r   r   r   r(   startZ	sorted_dsZbundlesblocksZboundsr    r   ctxr   )r   r   r   r"   r   __init__&   sR   




	


zRandomAccessDataset.__init__c                 C   s  t t}t t}t t}tdd | jD }t|D ]\}}|| | j|  qtj| j	}t| j	D ](\}}	||	 }
|
dg }|D ]}|| D ]}|| | || | qLqFq6t| j	D ] \}}	t
|| dkrt| j}|| | || | qd||fS )Nc                 S      g | ]}|j  qS r   )pingr   r&   r   r   r   r   o       zLRandomAccessDataset._compute_block_to_worker_assignments.<locals>.<listcomp>Znode_idsr   )r   listr2   r3   r:   r6   r7   ZexperimentalZget_object_locationsr   lenrandomchoice)r"   Zblock_to_workersZworker_to_blocksZloc_to_workerslocsr    locZ
block_locsZ	block_idxblockZ
block_infoZworkerr   r   r   r;   h   s.   z8RandomAccessDataset._compute_block_to_worker_assignmentsreturnc                 C   s0   |  |}|du rtdS | |j||S )zAsynchronously finds the record for a single key.

        Args:
            key: The key of the record to find.

        Returns:
            ObjectRef containing the record (in pydict form), or None if not found.
        N)_find_ler2   put_worker_forr3   r   )r"   r   block_indexr   r   r   	get_async   s   
	
zRandomAccessDataset.get_asynckeysc                    s   t t}|D ]}|| | | qi }| D ]\}}|du r"q| |j|gt| |}|||< qi  | D ]\}}|| }t	
|}	t||	D ]\}}
|
 |< qNq< fdd|D S )zSynchronously find the records for a list of keys.

        Args:
            keys: List of keys to find the records for.

        Returns:
            List of found records (in pydict form), or None for missing records.
        Nc                    s   g | ]}  |qS r   )r3   )r   kresultsr   r   r      rD   z0RandomAccessDataset.multiget.<locals>.<listcomp>)r   rE   rM   r7   itemsrO   multigetr   rF   r2   r3   zip)r"   rR   ZbatchesrS   futuresindexZkeybatchfutr    valuesvr   rT   r   rW      s&   	


zRandomAccessDataset.multigetc              	   C   s   t dd | jD }tdd |D }dd |D }dd |D }d}|dt| jd	7 }|d
t|7 }|dt|t	|t
t|t| 7 }|dt|t	|t
t|t| 7 }|dt
|dt|  d 7 }|S )z6Returns a string containing access timing information.c                 S   rB   r   )statsr   r&   r   r   r   r      rD   z-RandomAccessDataset.stats.<locals>.<listcomp>c                 s   s    | ]}|d  V  qdS )
total_timeNr   r   sr   r   r   	<genexpr>   s    z,RandomAccessDataset.stats.<locals>.<genexpr>c                 S      g | ]}|d  qS )num_accessesr   r`   r   r   r   r          c                 S   rc   )
num_blocksr   r`   r   r   r   r      re   zRandomAccessDataset:
z- Build time: {}s
   z- Num workers: {}
z-- Blocks per worker: {} min, {} max, {} mean
z/- Accesses per worker: {} min, {} max, {} mean
z- Mean access time: {}us
r   g    .A)r2   r3   r:   sumr8   roundr=   rF   minmaxint)r"   r^   r_   Zaccessesr?   msgr   r   r   r^      s"   zRandomAccessDataset.statsrP   c                 C   s   t | j| S N)rG   rH   r<   )r"   rP   r   r   r   rO      s   zRandomAccessDataset._worker_forxc                 C   s.   t | j|}|t| jks|| jk rd S |S rn   )bisectbisect_leftr5   rF   r4   )r"   ro   r    r   r   r   rM      s   zRandomAccessDataset._find_leN)__name__
__module____qualname____doc__strrl   rA   r;   r   r   rQ   r   r   rW   r^   rO   rM   r   r   r   r   r      s    
Br   )Znum_cpusc                   @   sJ   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdefddZ	dd Z
dS )r   c                 C   s   d | _ || _d| _d| _d S )Nr   )r?   	key_fieldrd   r_   )r"   rw   r   r   r   rA      s   
z_RandomAccessWorker.__init__c                 C   s   dd |  D | _d S )Nc                 S   s   i | ]
\}}|t |qS r   )r2   r3   )r   rS   refr   r   r   r#          z5_RandomAccessWorker.assign_blocks.<locals>.<dictcomp>)rV   r?   )r"   Zblock_ref_dictr   r   r   r$      s   z!_RandomAccessWorker.assign_blocksc                 C   s<   t  }| ||}|  jt  | 7  _|  jd7  _|S )Nr   )r,   r-   _getr_   rd   )r"   rP   r   r>   resultr   r   r   r3      s
   z_RandomAccessWorker.getc                    s   t  }j|d  }tt|dkrFtj|d  tjrFj|d  }|j }t	
||}t|  fddt||||D }nfddt||D } jt  | 7  _ jd7  _|S )Nr   r   c                    s,   g | ]\}}}|  |kr |nd qS rn   )as_py_get_row)r   r    Zk1Zk2)accr   r   r      s    z0_RandomAccessWorker.multiget.<locals>.<listcomp>c                    s   g | ]
\}}  ||qS r   )rz   )r   r    rS   r!   r   r   r      ry   )r,   r-   r?   rF   setr)   paTablerw   npZsearchsortedr	   	for_blockrX   Ztaker_   rd   )r"   Zblock_indicesrR   r>   rK   colindicesr{   r   )r~   r"   r   rW      s    


z_RandomAccessWorker.multigetc                 C   s   t   S rn   )r2   Zget_runtime_contextZget_node_idr!   r   r   r   rC      s   z_RandomAccessWorker.pingrL   c                 C   s   t | j| j| jdS )N)rf   rd   r_   )rF   r?   rd   r_   r!   r   r   r   r^      s   z_RandomAccessWorker.statsc                 C   s^   |d u rd S | j | }|| j }t|tjrt|}t||}|d u r%d S t|}|	|S rn   )
r?   rw   r)   r   r   _ArrowListWrapper_binary_search_findr	   r   r}   )r"   rP   r   rK   columnr    r~   r   r   r   rz     s   




z_RandomAccessWorker._getN)rr   rs   rt   rA   r$   r3   rW   rC   dictr^   rz   r   r   r   r   r      s    r   c                 C   s,   t | |}|t| kr| | |kr|S d S rn   )rp   rq   rF   )r   ro   r    r   r   r   r     s   r   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )r   c                 C   s
   || _ d S rn   )	arrow_col)r"   r   r   r   r   rA        
z_ArrowListWrapper.__init__c                 C   s   | j |  S rn   )r   r|   )r"   r    r   r   r   __getitem__  s   z_ArrowListWrapper.__getitem__c                 C   s
   t | jS rn   )rF   r   r!   r   r   r   __len__  r   z_ArrowListWrapper.__len__N)rr   rs   rt   rA   r   r   r   r   r   r   r     s    r   c                 C   sX   t | dkrd S | | d | | t | d  f}t| tjr*|d  |d  f}|S )Nr   r   )rF   r)   r   r   r|   )rK   r   r   r   r   r   r1   !  s    r1   )(rp   loggingrG   r,   collectionsr   typingr   r   r   r   numpyr   r2   Z2ray.data._internal.execution.interfaces.ref_bundler   Zray.data._internal.remote_fnr   Zray.data.blockr	   Zray.data.contextr
   Z	ray.typesr   Zray.util.annotationsr   Zpyarrowr   ImportErrorZray.data.datasetr   	getLoggerrr   r.   r   r   r   r   r   r1   r   r   r   r   <module>   s<    
 
0?