o
    1 iXo                     @   sz  d dl Z d dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lm Z  d d
l!m"Z" d dl#m$Z$m%Z% d dl&m'Z' ervd dl(m)Z) ej*ej*ej+ej+ej,ej,ej-ej-ej.ej.ej/ej/ej0ej0ej1ej1ej2ej2ej3ej3ej4ej4ddddddiZ5e6e7Z8e'ddG dd dZ9dS )    N)AnyDictListOptionalUnionSetTupleTYPE_CHECKING)Columns)MultiRLModuleSpecMultiRLModule)SingleAgentEpisode)flatten_dict)OverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommended)unpack_if_needed)ReplayBuffer)from_jsonable_if_needed)EpisodeTypeModuleID)	PublicAPI)AlgorithmConfigagent_indexdonesZ	unroll_idalpha)Z	stabilityc                   @   s  e Zd ZdZedddddddeeejejf  dee	 dee
eef  d	e
eef f
d
dZede
eejf de
eejf fddZedefddZede
eef fddZdee dee fddZd%defddZeeedddddfdede
eee ejf f de
eef dedeee  dee dejd ejd	e
eef de
eee! f fd!d"Z"eeeddfdede
eee ejf f de
eef dedeee  de
eee! f fd#d$Z#dS )&OfflinePreLearnera?  Class that coordinates data transformation from dataset to learner.

    This class is an essential part of the new `Offline RL API` of `RLlib`.
    It is a callable class that is run in `ray.data.Dataset.map_batches`
    when iterating over batches for training. It's basic function is to
    convert data in batch from rows to episodes (`SingleAGentEpisode`s
    for now) and to then run the learner connector pipeline to convert
    further to trainable batches. These batches are used directly in the
    `Learner`'s `update` method.

    The main reason to run these transformations inside of `map_batches`
    is for better performance. Batches can be pre-fetched in `ray.data`
    and therefore batch trransformation can be run highly parallelized to
    the `Learner''s `update`.

    This class can be overridden to implement custom logic for transforming
    batches and make them 'Learner'-ready. When deriving from this class
    the `__call__` method and `_map_to_episodes` can be overridden to induce
    custom logic for the complete transformation pipeline (`__call__`) or
    for converting to episodes only ('_map_to_episodes`).

    Custom `OfflinePreLearner` classes can be passed into
    `AlgorithmConfig.offline`'s `prelearner_class`. The `OfflineData` class
    will then use the custom class in its data pipeline.
    N)spacesmodule_specmodule_stateconfigr   r   r   r   kwargsc                K   s   || _ | j j| _| j j| _| | _| j| |pd\| _| _| j j| j| jd| _	| j j
| _|j| _d| _| jsB| j sB| jrZ| j jpH| j}| j| j jB }|di || _d S d S )N)NN)Zinput_observation_spaceZinput_action_spacer    )r   input_read_episodesinput_read_sample_batchesbuild_moduleZ	set_stateobservation_spaceaction_spaceZbuild_learner_connector_learner_connectorZpolicies_to_train_policies_to_trainis_multi_agent_is_multi_agentZiter_since_last_module_updateis_statefulprelearner_buffer_classdefault_prelearner_buffer_class default_prelearner_buffer_kwargsprelearner_buffer_kwargsepisode_buffer)selfr   r   r   r   r    r-   r0   r!   r!   p/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/rllib/offline/offline_prelearner.py__init__R   s<   



zOfflinePreLearner.__init__batchreturnc              	      sv  | j rDddlddl  fdd|d D }| |}| j| | jj| jj| j	
 r5| jjddnd| jddp>dd	d	d
}nZ| jrtj| j|d	t| jjB | jjdd }| |}| j| | jj| jj| j	
 rx| jjddnd| jddpdd	d	d
}n| j| j|t| jjB d| jj| j| jdd }| j| j	i |i dd}|D ]}| ||s||= qt|S )aq  Prepares plain data batches for training with `Learner`'s.

        Args:
            batch: A dictionary of numpy arrays containing either column data
                with `self.config.input_read_schema`, `EpisodeType` data, or
                `BatchType` data.

        Returns:
            A `MultiAgentBatch` that can be passed to `Learner.update` methods.
        r   Nc                    s"   g | ]}t j| jd qS ))object_hook)r   Z
from_stateunpackbdecode).0stateZmnpmsgpackr!   r3   
<listcomp>   s    z.OfflinePreLearner.__call__.<locals>.<listcomp>itemZmax_seq_lenn_step   T)	num_itemsZbatch_length_Tr@   Zsample_episodesto_numpy)rC   schemainput_compress_columnsepisodesF)rD   rC   rE   r&   r'   )Z	rl_moduler5   rF   Zshared_dataZmetrics)r"   r=   Zmsgpack_numpy_validate_episodesr1   addsampler   train_batch_size_per_learnerr%   r,   Zmodel_configgetr#   r   _map_sample_batch_to_episoder+   SCHEMAZinput_read_schemarE   _map_to_episodesr&   r'   r(   _should_module_be_updatedr   )r2   r5   rF   	module_idr!   r<   r3   __call__   s|   



&zOfflinePreLearner.__call__c                 C   s   ddl m} |S )zSets the default replay buffer.r   )EpisodeReplayBuffer)Z4ray.rllib.utils.replay_buffers.episode_replay_bufferrR   )r2   rR   r!   r!   r3   r.     s   z1OfflinePreLearner.default_prelearner_buffer_classc                 C   s   | j jd | j jdS )zSets the default arguments for the replay buffer.

        Note, the `capacity` might vary with the size of the episodes or
        sample batches in the offline dataset.
        
   )capacityZbatch_size_B)r   rJ   )r2   r!   r!   r3   r/     s   
z2OfflinePreLearner.default_prelearner_buffer_kwargsrF   c                    s8   t dd |D stdt  fdd|D }|S )a  Validate episodes sampled from the dataset.

        Note, our episode buffers cannot handle either duplicates nor
        non-ordered fragmentations, i.e. fragments from episodes that do
        not arrive in timestep order.

        Args:
            episodes: A list of `SingleAgentEpisode` instances sampled
                from a dataset.

        Returns:
            A set of `SingleAgentEpisode` instances.

        Raises:
            ValueError: If not all episodes are `done`.
        c                 s   s    | ]}|j V  qd S N)Zis_doner:   Zepsr!   r!   r3   	<genexpr>7  s    z7OfflinePreLearner._validate_episodes.<locals>.<genexpr>zWhen sampling from episodes (`input_read_episodes=True`) all recorded episodes must be done (i.e. either `terminated=True`) or `truncated=True`).c                    s8   h | ]}|j vr|j s|j  jj vr|qS r!   )id_rH   r1   Zepisode_id_to_indexkeysrV   r2   Zunique_episode_idsr!   r3   	<setcomp>@  s    

z7OfflinePreLearner._validate_episodes.<locals>.<setcomp>)all
ValueErrorset)r2   rF   r!   rZ   r3   rG   #  s   z$OfflinePreLearner._validate_episodesc                 C   s.   | j sdS t| j s|t| j v S |  ||S )z:Checks which modules in a MultiRLModule should be updated.T)r)   callabler^   )r2   rP   Zmulti_agent_batchr!   r!   r3   rO   I  s
   
z+OfflinePreLearner._should_module_be_updatedFr*   rD   rC   rE   ignore_final_observationr&   r'   c                    s6  pg |r|rt }	ndd }	g }
t|tj  D ]\ }| r?tj|v r.|tj    nd |v r<|d    nd}nd}| rDqtjv rP|	t||n|	||}|rctdd t	|}ntjv rv|	t|tj
    |n|	|tj
    |}ttj |v rt|tj    nt j|||gi tj |v r|tj    ni gtjv r|	t|tj    |n|	|tj    |g|tj    g|tj |v rtj nd   tj |v r|tj    nd fd	d
| D dd
}|r|  |
| qd|
iS )z!Maps a batch of data to episodes.c                 S   s   | S rU   r!   )rI   spacer!   r!   r3   convertl  s   z3OfflinePreLearner._map_to_episodes.<locals>.convertr   Nc                 S   s   d|  S )Nr   r!   )xr!   r!   r3   <lambda>  s    z4OfflinePreLearner._map_to_episodes.<locals>.<lambda>r   Fc                    sL   i | ]"\}}|vr$|  vr$|d vr||v rt|  n|  gqS )r   r   typevaluesr   r:   kvirE   rD   r!   r3   
<dictcomp>  s    	z6OfflinePreLearner._map_to_episodes.<locals>.<dictcomp>r   
rX   agent_idZobservationsinfosactionsZrewards
terminated	truncatedZextra_model_outputsZlen_lookback_bufferrF   )r   	enumerater
   OBSAGENT_IDr   treeZmap_structurecopydeepcopyNEXT_OBSr   EPS_IDstruuiduuid4hexINFOSACTIONSREWARDSTERMINATEDS
TRUNCATEDSitemsrC   append)r*   r5   rD   rC   rE   r`   r&   r'   r    rb   rF   obsrp   Zunpacked_obsZunpacked_next_obsepisoder!   rl   r3   rN   S  s   


	

	;z"OfflinePreLearner._map_to_episodesc           
         s  pg g }t |tj  D ]l\ | r)d |v r&|d    d nd}nd}| r.qttrFtfddtjd D n"ttj	r^tj v rYt
 n
 n
tdt dtj |v rtj v rt|tj    d n
|tj    d  nd  tj |v rtj |v r|tj    d }|tj    d }nLtj |v rڈtj |vr|tj    d }d	}n0tj |vrtj |v r|tj    d }d	}nd
|v r|d
   d }d	}nd}d	}ttj |v r t|tj    d nt j|tj |v r7|tj    ni gt tjv rNt|tj    n|tj    |tj    || fdd| D dd
}	|rw|	  ||	 qd|iS )z6Maps an old stack `SampleBatch` to new stack episodes.r   r   Nc                    s   g | ]} |d f qS ).r!   )r:   rm   )r   r!   r3   r>     s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<listcomp>zUnknown observation type: z. When mapping from old recorded `SampleBatches` batched observations should be either of type `np.array` or - if the column is compressed - of `str` type.FdoneTc                    sJ   i | ]!\}}|vr#|  vr#|d vr||v rt|  n|  qS re   rg   ri   rl   r!   r3   rn   m  s    zBOfflinePreLearner._map_sample_batch_to_episode.<locals>.<dictcomp>ro   rF   )ru   r
   rv   
isinstancer}   r   rangeshapenpndarraytolist	TypeErrorrf   r{   r   r   r   r   r|   r~   r   r   r   lenr   r   r   rC   )
r*   r5   rD   rC   rE   rF   rp   rt   rs   r   r!   )rm   rE   r   rD   r3   rL     s   	

	.z.OfflinePreLearner._map_sample_batch_to_episoderU   )$__name__
__module____qualname____doc__r   r   r   gymZSpacer   r   r   r   r}   r4   r   r   r   rQ   propertyr   r.   r/   r   r   r   rG   boolrO   staticmethodrM   r   listr   rN   rL   r!   r!   r!   r3   r   6   s    
:(	
&



	
 

r   ):ry   Z	gymnasiumr   loggingnumpyr   rx   r~   typingr   r   r   r   r   r   r   r	   Zray.rllib.core.columnsr
   Z(ray.rllib.core.rl_module.multi_rl_moduler   r   Z"ray.rllib.env.single_agent_episoder   Zray.rllib.utilsr   Zray.rllib.utils.annotationsr   r   Zray.rllib.utils.compressionr   Z,ray.rllib.utils.replay_buffers.replay_bufferr   Z"ray.rllib.utils.spaces.space_utilsr   Zray.rllib.utils.typingr   r   Zray.util.annotationsr   Z%ray.rllib.algorithms.algorithm_configr   r|   rw   Z	MODULE_IDrv   r   r   r   r{   r   r   TrM   	getLoggerr   loggerr   r!   r!   r!   r3   <module>   sJ    (	
