o
    1 i-                     @   s<  d dl Z d dlZd dlmZ d dlZd dlZd dlmZm	Z	m
Z
mZ d dlZd dlZd dlmZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZ d d	lmZ e
r^d d
lm Z  dZ!e "e#Z$dede%fddZ&dee% de%fddZ'e	 dddde(de	ej)j*eej)j* f fddZ+eG dd deZ,dS )    N)Path)ListTupleTYPE_CHECKINGOptional)InputReader)	IOContext)from_json_datapostprocess_actions)concat_samplesSampleBatchDEFAULT_POLICY_ID)override	PublicAPI)SampleBatchType)AlgorithmConfigg      ?fpathextract_pathc                 C   s@   t t| d}|| W d    d S 1 sw   Y  d S )Nr)zipfileZipFilestr
extractall)r   r   Zzip_ref r   l/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/rllib/offline/dataset_reader.py_unzip_this_path   s   "r   pathsformatc                 C   s,  g }| D ]}t dt|rbt|drtdd}z	tt|| W n% tyI   ztttj	j	| | W n tyF   td| w Y nw tt|
 t|j d|  }|| qt|dro|| qt| stttj	j	| }t| std| |}|| q|S )zLIf a path in paths is a zip file, unzip it and use path of the unzipped filez\.zip$zs3://z?unzip_if_needed currently does not support remote paths from s3z./zFile not found: .)researchr   
startswith
ValueErrorr   FileNotFoundErrorr   __file__parentabsolutestemappendexists)r   r   Z	ret_pathspathr   Zunzipped_pathZrelative_pathr   r   r   _unzip_if_needed   s>   r+   configr   num_workersreturnc                 C   s  | j dksJ d| j  | j}|d}ddg}|dur+||vr+td| d| |d	}|d
}|r?|s;|r?td|rC|sI|sItd|durpt|trV|g}nt|trgt|d tsfJ dntdt||}|d|pvd}|dt}|r| }	n%|dkrt	j
j||d|id}	n|dkrt	j
j||d|id}	ntd||dkr|	|	gfS |	j|dd|}
|	dg|
 fS )a  Returns a dataset and a list of shards.

    This function uses algorithm configs to create a dataset and a list of shards.
    The following config keys are used to create the dataset:
        input: The input type should be "dataset".
        input_config: A dict containing the following key and values:
            `format`: str, speciifies the format of the input data. This will be the
            format that ray dataset supports. See ray.data.Dataset for
            supported formats. Only "parquet" or "json" are supported for now.
            `paths`: str, a single string or a list of strings. Each string is a path
            to a file or a directory holding the dataset. It can be either a local path
            or a remote path (e.g. to an s3 bucket).
            `loader_fn`: Callable[None, ray.data.Dataset], Instead of
            specifying paths and format, you can specify a function to load the dataset.
            `parallelism`: int, The number of tasks to use for loading the dataset.
            If not specified, it will be set to the number of workers.
            `num_cpus_per_read_task`: float, The number of CPUs to use for each read
            task. If not specified, it will be set to 0.5.

    Args:
        config: The config dict for the algorithm.
        num_workers: The number of shards to create for remote workers.

    Returns:
        dataset: The dataset object.
        shards: A list of dataset shards. For num_workers > 0 the first returned
        shared would be a dummy None shard for local_worker.
    datasetzQMust specify config.input_ as 'dataset' if calling `get_dataset_and_shards`. Got r   jsonZparquetNzUnsupported format z. Supported formats are r   	loader_fnzBWhen using a `loader_fn`, you cannot specify a `format` or `path`.zMMust specify either a `loader_fn` or a `format` and `path` in `input_config`.r   z%Paths must be a list of path strings.z6Paths must be a path string or a list of path strings.parallelism   Znum_cpus_per_read_taskZnum_cpus)r2   Zray_remote_argsz!Un-supported Ray dataset format: F)Z
num_blocksshuffle)Zinput_input_configgetr"   
isinstancer   listr+   DEFAULT_NUM_CPUS_PER_TASKraydata	read_jsonZread_parquetZrepartitionsplit)r,   r-   r5   r   Zsupported_fmtsr   r1   r2   Zcpus_per_taskr/   Zremote_shardsr   r   r   get_dataset_and_shardsE   sh   !









r>   c                   @   sj   e Zd ZdZeddejjdee	 fddZ
eedefdd	Zd
edefddZd
edefddZdS )DatasetReadera  Reader object that loads data from Ray Dataset.

    Examples:
        config = {
            "input": "dataset",
            "input_config": {
                "format": "json",
                # A single data file, a directory, or anything
                # that ray.data.dataset recognizes.
                "paths": "/tmp/sample_batches/",
                # By default, parallelism=num_workers.
                "parallelism": 3,
                # Dataset allocates 0.5 CPU for each reader by default.
                # Adjust this value based on the size of your offline dataset.
                "num_cpus_per_read_task": 0.5,
            }
        }
    Ndsioctxc                    s(  |pt  _d __d_|_jsdnj _dtjj	
 _jjdd_jjdd}jjdd |rKttj| d_|rjjdursjjj_jt_jjddspjjjtnd_td	jj d
|  d  fdd}| _dS d_dS )ziInitializes a DatasetReader instance.

        Args:
            ds: Ray dataset to sample from.
        NFZtrain_batch_sizer3   Znum_env_runnersr   seedZ_disable_preprocessorszDatasetReader z has z
, samples.c                  3   s"    	 j j d} |  E d H  q)NT)rB   )_datasetZrandom_shuffleZ	iter_rows)r@   rB   selfr   r   iterator   s
   z(DatasetReader.__init__.<locals>.iterator)r   _ioctx_default_policyZ
policy_mappreprocessorrC   countr:   r;   ZDataContextZget_currentZenable_progress_barsr,   r6   
batch_sizemaxmathceilworkerZ_policy_mapr   ZpreprocessorsprintZworker_index_iter)rE   r@   rA   r-   rF   r   rD   r   __init__   s2   
zDatasetReader.__init__r.   c                 C   s   | j d usJ g }d}|| jk r;t| j }t|| jj}||j7 }| |}t|| j}| 	|}|
| || jk st|}|S )Nr   )rQ   rK   nextr	   rG   rO   rJ   _preprocess_if_neededr
   _postprocess_if_neededr(   r   )rE   retrJ   dr   r   r   rS      s   






	zDatasetReader.nextbatchc                    sD    j r tjtjfD ]}||v rt fdd|| D ||< q	|S )Nc                    s   g | ]} j |qS r   )rI   Z	transform).0srE   r   r   
<listcomp>  s    z7DatasetReader._preprocess_if_needed.<locals>.<listcomp>)rI   r   ZCUR_OBSZNEXT_OBSnpstack)rE   rX   keyr   r[   r   rT     s   z#DatasetReader._preprocess_if_neededc                 C   sf   | j jds	|S t|tr/g }| D ]}| jd ur%|| j| q|| qt	|S t
d)NZpostprocess_inputsz7Postprocessing of multi-agent data not implemented yet.)rG   r,   r6   r7   r   Zsplit_by_episoderH   r(   Zpostprocess_trajectoryr   NotImplementedError)rE   rX   outZ	sub_batchr   r   r   rU     s   

z$DatasetReader._postprocess_if_needed)N)__name__
__module____qualname____doc__r   r:   r;   Datasetr   r   rR   r   r   r   rS   rT   rU   r   r   r   r   r?      s    ,
r?   )r   )-loggingrM   pathlibr   r   numpyr]   typingr   r   r   r   r   Zray.datar:   Zray.rllib.offline.input_readerr   Zray.rllib.offline.io_contextr   Zray.rllib.offline.json_readerr	   r
   Zray.rllib.policy.sample_batchr   r   r   Zray.rllib.utils.annotationsr   r   Zray.rllib.utils.typingr   Z%ray.rllib.algorithms.algorithm_configr   r9   	getLoggerrb   loggerr   r   r+   intr;   rf   r>   r?   r   r   r   r   <module>   s>    
'l