o
    1 i2                     @   s   d dl Z d dlmZ d dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZmZ d dlmZ erdd dlm Z  e !e"Z#eddG dd dZ$dS )    N)Path)AnyDictTYPE_CHECKING)COMPONENT_RL_MODULE)INPUT_ENV_SPACES)OfflinePreLearner)unflatten_dict)MultiAgentBatchSampleBatch)
force_list)OverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommended)	PublicAPI)AlgorithmConfigalpha)Z	stabilityc                   @   s`   e Zd ZedddZe			dded	ed
edee	e
f fddZedd Zedd ZdS )OfflineDataconfigr   c              
   C   s.  || _ | j j| _t|jtr| j jnt|j| _| j j| _| j j	| _
| j j| _|j| _|j| _d| _| j j| _| j j| _d | _| jdkrSdd l}|jdi | j| _n@| jdkrdtjjdi | j| _n/| jdkrxdd l}|jdi | j| _nt| jtjjr| j| _n| jd urtd| j d| jr| j
d| ji z7t  }t!t"j#| j| jfi | j
| _#| jr| j#$ | _#t  }t%&d	||  d
 t%'d(| j W n t)y } zt%*| W Y d }~nd }~ww d | _+| j,| j j-B | _-| j.| j j/B | _/d| _0| j j1p
t2| _1d | _3d | _4d | _5d S )NFZgcsr   Zs3absz"Unknown `config.input_filesystem` z! Filesystems can be None for local, any instance of `pyarrow.fs.FileSystem`, 'gcs' for GCS, 's3' for S3, or 'abs' for adlfs.AzureBlobFileSystem.
filesystemz/===> [OfflineData] - Time for loading dataset: zs.zReading data from {} )6r   Zis_multi_agent
isinstanceZinput_listr   pathZinput_read_methodZdata_read_methodZinput_read_method_kwargsZdata_read_method_kwargsZinput_read_batch_sizedata_read_batch_sizeZmaterialize_datamaterialize_mapped_datadata_is_mappedZinput_filesystemr   Zinput_filesystem_kwargsZfilesystem_kwargsZfilesystem_objectgcsfsZGCSFileSystempyarrowfsZS3FileSystemadlfsZAzureBlobFileSystemZ
FileSystem
ValueErrorupdatetimeperf_countergetattrraydatamaterializeloggerdebuginfoformat	Exceptionerrorbatch_iteratorsdefault_map_batches_kwargsmap_batches_kwargsdefault_iter_batches_kwargsiter_batches_kwargsZreturned_streaming_splitprelearner_classr   locality_hintslearner_handlesmodule_spec)selfr   r   r    
start_time	stop_timeer   r   j/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/rllib/offline/offline_data.py__init__   s   













zOfflineData.__init__F   Nnum_samplesreturn_iterator
num_shardsmodule_statec                 C   s  | j sO|s$|dkrt| jd jjtdt }n| jd jtdt }| j| jt	 | j
|d}| jj| jf|| jp;|d| j| _d| _ | jrO| j | _| jr[|rt| jtjr|dkrptd | jj|d| jd| _n+|ry| j | _n"d	tttjf d
tfdd}| jj d||d| j!| _t"| j| _|rt#| jS zt$| jW S  t%y   td d | _| j&|||d Y S w )Nr>   r   )	component)r   spacesr7   rB   )fn_constructor_kwargs
batch_sizeTz/===> [OfflineData]: Return streaming_split ... )nequalr5   _batchreturnc                 S   s4   t | } tdd |  D tdd |  D dS )Nc                 S   s   i | ]	\}}|t |qS r   )r   ).0Z	module_idmodule_datar   r   r<   
<dictcomp>   s    z;OfflineData.sample.<locals>._collate_fn.<locals>.<dictcomp>c                 s   s$    | ]}t tt| V  qd S )N)lennextitervalues)rK   rL   r   r   r<   	<genexpr>   s
    
z:OfflineData.sample.<locals>._collate_fn.<locals>.<genexpr>)Z	env_steps)r	   r
   itemssumrQ   )rI   r   r   r<   _collate_fn   s   z'OfflineData.sample.<locals>._collate_fn)rF   rU   z>===> [OfflineData]: Batch iterator exhausted. Reinitiating ...)r?   r@   rA   r   )'r   r&   getr6   Z	get_stateremoter   r   rD   r   r7   r'   Zmap_batchesr4   r   r1   r   r(   r/   r   typesGeneratorTyper)   r*   Zstreaming_splitr5   iteratorr   strnpZndarrayr
   Ziter_batchesr3   rP   r   rO   StopIterationsample)r8   r?   r@   rA   rB   rE   rU   r   r   r<   r^   }   s   





zOfflineData.samplec                 C   s   t d| jjddS )N   T)ZconcurrencyZzero_copy_batch)maxr   Znum_learnersr8   r   r   r<   r0     s   z&OfflineData.default_map_batches_kwargsc                 C   s   ddiS )NZprefetch_batchesr_   r   ra   r   r   r<   r2     s   z'OfflineData.default_iter_batches_kwargs)r   r   )Fr>   N)__name__
__module____qualname__r   r=   r   intboolr   r[   r   r^   propertyr0   r2   r   r   r   r<   r      s*    ]
 
r   )%loggingpathlibr   Z
pyarrow.fsr   numpyr\   r&   r#   rX   typingr   r   r   Zray.rllib.corer   Zray.rllib.envr   Z$ray.rllib.offline.offline_prelearnerr   Zray.rllib.utilsr	   Zray.rllib.policy.sample_batchr
   r   r   Zray.rllib.utils.annotationsr   r   Zray.util.annotationsr   Z%ray.rllib.algorithms.algorithm_configr   	getLoggerrb   r)   r   r   r   r   r<   <module>   s*    
