o
    1 i                    @   s  d dl Z d dlmZ d dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZ d dlmZmZ d dlZd dlZd dlmZ d dlmZmZmZ d dlmZmZmZ d d	lmZmZ d d
lmZm Z  d dl!m"Z" d dl#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* e \Z+Z,Z-e  \Z.Z/dZ0ede1fddZ2eG dd de1Z3eG dd dZ4edee' de'fddZ5edee' ddfddZ6ddde&fddZ7ede'de3fd d!Z8dS )"    N)partial)Number)DictIteratorSetUnion)ListOptional)Columns)DeveloperAPIExperimentalAPI	PublicAPI)packunpackis_compressed)
Deprecateddeprecation_warning)try_import_tftry_import_torch)convert_to_torch_tensor)ModuleIDPolicyID
TensorTypeSampleBatchTypeViewRequirementsDict)log_onceZdefault_policytensor_dictc              	   C   s  |  tj}|dur1trt|rt|dr1t|dkr1tr+t|r+t|	 
 S tt	|S |  D ]Q\}}|tjkr?q5t|tsHJ | |tjksW|dsW|drXq5t|ttfrdt|n|g}dd |D }zt|d }|r||W   S W q5 ty   Y q5w dS )a  Attempt to count timesteps based on dimensions of individual elements.

    Returns the first successfully counted number of timesteps.
    We do not attempt to count on INFOS or any state_in_* and state_out_* keys. The
    number of timesteps we count in cases where we are unable to count is zero.

    Args:
        tensor_dict: A SampleBatch or another dict.

    Returns:
        count: The inferred number of timesteps >= 0.
    Nnumpyr   	state_in_Z
state_out_c                 S   s(   g | ]}t |ttfrt|n|qS  )
isinstancer   listnparray).0_vr   r   i/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/rllib/policy/sample_batch.py
<listcomp>U   s    z+attempt_count_timesteps.<locals>.<listcomp>)getSampleBatchSEQ_LENStf	is_tensorhasattrlentorchintsumitemitemsr    strINFOS
startswithdicttupletreeflatten	Exception)r   seq_lenskvZv_list_lenr   r   r&   attempt_count_timesteps"   sF   


r@   c                   @   s  e Zd ZdZejZejZejZejZej	Z	ej
Z
ejZejZejZejZejZejZejZejZejZdZdZdZdZdZdZdZd	Zd
ZdZdZedd Z ede!fddZ"ede!fddZ#ede!fddZ$e%dd Z&e%dd Z'e(de)fddZ*e(de)fddZ+e,ee-ddd d!d" Z.edxd$d%Z/edyd'e)dd fd(d)Z0ede1e2e3e4f  fd*d+Z5ed,e6e3 de6e7 fd-d.Z8edzd/d0Z9ed{d2e:e3 de6d  fd3d4Z;	1d|d5e!d6e!dd fd7d8Z<d9e<dd fd:d;Z=e	1	1	1d}d<e:e! d=e:e! d>e:e! de6d  fd?d@Z>e-dAdd d~dBdCZ?d~dDe!dEe)fdFdAZ@e(	G	&	&	1ddHe3dIe)dJe)dKe:eAdL  fdMdNZBede!fdOdPZCd{dQdRZDed{dSe:eE ddTfdUdVZFed2eAe3e<f de4fdWdXZGeddYdZZHeId[d\ ZJd~d]eAe)d^f fd_d`ZKedadb ZLe%d&eMddcgfdde)deeNe3 dd fdfdgZOe%eMddcgfdeeNe3 dd fdhdiZPe%djdk ZQdldm ZRd9e<dd fdndoZSe-d&dpdqdr ZTe(	sddteUdueAe3e!f dd fdvdwZVd1S )r)   zWrapper around a dictionary with string keys and array-like values.

    For example, {"obs": [1, 2, 3], "reward": [0, -1, 1]} is a batch of three
    samples, each with an "obs" and "reward" attribute.
    Zaction_distZprev_actionsZprev_rewardsZenv_idZagent_indexZ	unroll_idZ
obs_embedsZreturns_to_goZattention_masksZdonesobsc                 O   s  t j|v r	td|dd| _|dd| _|dd| _|dd| _|dd| _t	j
| g|R i | d| _t | _t | _t | _i | _d| _| t j}|du sbt|trjt|d	krj| t jd n&t|tr}tj|tjd
 | t j< }ntrt|strt|r|| t j< | jdu r|durtrt|st|d	krtrt|r|  | _nt|| _| jdu r| dd| _|  D ]\}}t|t tfr|t j!kst|| |< qt"| | _#g | _$dS )aP  Constructs a sample batch (same params as dict constructor).

        Note: All args and those kwargs not listed below will be passed
        as-is to the parent dict constructor.

        Args:
            _time_major: Whether data in this sample batch
                is time-major. This is False by default and only relevant
                if the data contains sequences.
            _max_seq_len: The max sequence chunk length
                if the data contains sequences.
            _zero_padded: Whether the data in this batch
                contains sequences AND these sequences are right-zero-padded
                according to the `_max_seq_len` setting.
            _is_training: Whether this batch is used for
                training. If False, batch may be used for e.g. action
                computations (inference).
        zSampleBatch cannot be constructed anymore with a `DONES` key! Instead, set the new TERMINATEDS and TRUNCATEDS keys. The values under DONES will then be automatically computed using terminated|truncated._time_majorN_max_seq_len_zero_paddedF_is_training_num_grad_updatesr   dtypeis_training)%r)   DONESKeyErrorpop
time_majormax_seq_lenzero_paddedrE   num_grad_updatesr7   __init___slice_seq_lens_in_Bsetaccessed_keys
added_keysdeleted_keysintercepted_valuesget_interceptorr(   r*   r    r!   r.   r"   r#   int32r/   r,   r+   maxr2   r3   r   r5   r@   count
_slice_map)selfargskwargsZ	seq_lens_r=   r>   r   r   r&   rQ      sR   








zSampleBatch.__init__returnc                 C      | j S )z2Returns the amount of samples in the sample batch.r[   r]   r   r   r&   __len__     zSampleBatch.__len__c                 C      t | S )zReturns the same as len(self) (number of steps in this batch).

        To make this compatible with `MultiAgentBatch.agent_steps()`.
        r.   rc   r   r   r&   agent_steps     zSampleBatch.agent_stepsc                 C   rf   )zReturns the same as len(self) (number of steps in this batch).

        To make this compatible with `MultiAgentBatch.env_steps()`.
        rg   rc   r   r   r&   	env_steps  ri   zSampleBatch.env_stepsc                 C   
   d| _ d S )NTrR   rc   r   r   r&   enable_slicing_by_batch_id!     
z&SampleBatch.enable_slicing_by_batch_idc                 C   rk   )NFrl   rc   r   r   r&   disable_slicing_by_batch_id%  rn   z'SampleBatch.disable_slicing_by_batch_idc                 C   s&   | t j d pt j| v o| t j d S )zCReturns True if `self` is either terminated or truncated at idx -1.)r)   TERMINATEDS
TRUNCATEDSrc   r   r   r&   is_terminated_or_truncated)  s   z&SampleBatch.is_terminated_or_truncatedc                 C   s:   t | tj dd  otj| vpt | tj dd  S )zReturns True if this SampleBatch only contains one trajectory.

        This is determined by checking all timesteps (except for the last) for being
        not terminated AND (if applicable) not truncated.
        Nrp   )anyr)   rq   rr   rc   r   r   r&   is_single_trajectory0  s   
z SampleBatch.is_single_trajectory/concat_samples() from rllib.policy.sample_batchTnewerrorc                 C      d S Nr   samplesr   r   r&   concat_samples<  s   zSampleBatch.concat_samplesotherc                 C   s   t | |gS )ag  Concatenates `other` to this one and returns a new SampleBatch.

        Args:
            other: The other SampleBatch object to concat to this one.

        Returns:
            The new SampleBatch, resulting from concating `other` to `self`.

        .. testcode::
            :skipif: True

            import numpy as np
            from ray.rllib.policy.sample_batch import SampleBatch
            b1 = SampleBatch({"a": np.array([1, 2])})
            b2 = SampleBatch({"a": np.array([3, 4, 5])})
            print(b1.concat(b2))

        .. testoutput::

            {"a": np.array([1, 2, 3, 4, 5])}
        )r~   )r]   r   r   r   r&   concatB  s   zSampleBatch.concatFshallowc                    s^   t | }t fdd|}t|| j| j| j| jd}|| j	 | j
|_
| j|_| j|_|S )zCreates a deep or shallow copy of this SampleBatch and returns it.

        Args:
            shallow: Whether the copying should be done shallowly.

        Returns:
            A deep or shallow copy of this SampleBatch object.
        c                    s    t | tjrtj|   dS | S )Ncopy)r    r"   ndarrayr#   r>   r   r   r&   <lambda>g  s    z"SampleBatch.copy.<locals>.<lambda>)rB   rD   rC   rF   )r7   r9   map_structurer)   rM   rO   rN   rP   set_get_interceptorrX   rU   rV   rT   )r]   r   Zcopy_datar   r   r&   r   [  s"   

zSampleBatch.copyc                 #   sT      tjddu rdndt }t jD ]}t|f fdd	|V  qdS )a  Returns an iterator over data rows, i.e. dicts with column values.

        Note that if `seq_lens` is set in self, we set it to 1 in the rows.

        Yields:
            The column values of the row in this iteration.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch({
               "a": [1, 2, 3],
               "b": [4, 5, 6],
               "seq_lens": [1, 2]
            })
            for row in batch.rows():
                print(row)

        .. testoutput::

            {"a": 1, "b": 4, "seq_lens": 1}
            {"a": 2, "b": 5, "seq_lens": 1}
            {"a": 3, "b": 6, "seq_lens": 1}
           Nc                    s   | d  j kr|| S S Nr   )r*   )pr>   ir]   r<   r   r&   r         z"SampleBatch.rows.<locals>.<lambda>)r(   r)   r*   r7   ranger[   r9   map_structure_with_path)r]   self_as_dictr   r   r   r&   rowsy  s   zSampleBatch.rowskeysc                 C   s    g }|D ]	}| | |  q|S )a  Returns a list of the batch-data in the specified columns.

        Args:
            keys: List of column names fo which to return the data.

        Returns:
            The list of data items ordered by the order of column
            names in `keys`.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch({"a": [1], "b": [2], "c": [3]})
            print(batch.columns(["a", "b"]))

        .. testoutput::

            [[1], [2]]
        )append)r]   r   outr=   r   r   r&   columns  s   zSampleBatch.columnsc                    s   |  tjdu}|r| jstd|stj| jntjt	| tj t
| }|tjd tfdd|} durK fddD |tj< | | i | _| S )a  Shuffles the rows of this batch in-place.

        Returns:
            This very (now shuffled) SampleBatch.

        Raises:
            ValueError: If self[SampleBatch.SEQ_LENS] is defined.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch({"a": [1, 2, 3, 4]})
            print(batch.shuffle())

        .. testoutput::

            {"a": [4, 1, 3, 2]}
        NzbSampleBatch.shuffle not possible when your data has `seq_lens` defined AND is not zero-padded yet!c                    s   |   S r{   r   r   )permutationr   r&   r     s    z%SampleBatch.shuffle.<locals>.<lambda>c                    s   g | ]} | qS r   r   )r$   r   )infosr   r&   r'         z'SampleBatch.shuffle.<locals>.<listcomp>)r(   r)   r*   rO   
ValueErrorr"   randomr   r[   r.   r7   rL   r
   r5   r9   r   updaterW   )r]   Zhas_time_rankr   Zshuffledr   )r   r   r&   shuffle  s    
	
zSampleBatch.shuffleNkeyc                    s  |du s|t jt jfv sJ d| d fdd} fdd}t j|t j|i}t jt jg}d}|durM|t jkrG| vrGt  d| d	||  }n#|D ]}|t jksZ| v ra||  } nqO|du rpt  d
| dtdd |D  jksJ d  d| d j d |S )a  Splits by `eps_id` column and returns list of new batches.
        If `eps_id` is not present, splits by `dones` instead.

        Args:
            key: If specified, overwrite default and use key to split.

        Returns:
            List of batches, one per distinct episode.

        Raises:
            KeyError: If the `eps_id` AND `dones` columns are not present.

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            # "eps_id" is present
            batch = SampleBatch(
                {"a": [1, 2, 3], "eps_id": [0, 0, 1]})
            print(batch.split_by_episode())

            # "eps_id" not present, split by "dones" instead
            batch = SampleBatch(
                {"a": [1, 2, 3, 4, 5], "dones": [0, 0, 1, 0, 1]})
            print(batch.split_by_episode())

            # The last episode is appended even if it does not end with done
            batch = SampleBatch(
                {"a": [1, 2, 3, 4, 5], "dones": [0, 0, 1, 0, 0]})
            print(batch.split_by_episode())

            batch = SampleBatch(
                {"a": [1, 2, 3, 4, 5], "dones": [0, 0, 0, 0, 0]})
            print(batch.split_by_episode())


        .. testoutput::

            [{"a": [1, 2], "eps_id": [0, 0]}, {"a": [3], "eps_id": [1]}]
            [{"a": [1, 2, 3], "dones": [0, 0, 1]}, {"a": [4, 5], "dones": [0, 1]}]
            [{"a": [1, 2, 3], "dones": [0, 0, 1]}, {"a": [4, 5], "dones": [0, 0]}]
            [{"a": [1, 2, 3, 4, 5], "dones": [0, 0, 0, 0, 0]}]


        Nz"`SampleBatch.split_by_episode(key=z,)` invalid! Must be [None|'dones'|'eps_id'].c                     sn   g }  t j d }d}t jD ]} t j | }||kr*|  ||  |}|}q|  | j  | S r   )r)   EPS_IDr   r[   r   )slicesZ
cur_eps_idoffsetr   Znext_eps_idrc   r   r&   slice_by_eps_id'  s   z5SampleBatch.split_by_episode.<locals>.slice_by_eps_idc                     s|   g } d}t  jD ]$} tj | stj v r- tj | r-|  ||d   |d }q	| jkr<|  |d   | S )Nr   r   )r   r[   r)   rq   rr   r   )r   r   r   rc   r   r&   "slice_by_terminateds_or_truncateds6  s   
zHSampleBatch.split_by_episode.<locals>.slice_by_terminateds_or_truncatedsz does not have key `z`!z does not have keys !c                 s   s    | ]}|j V  qd S r{   rb   r$   sr   r   r&   	<genexpr>_  s    z/SampleBatch.split_by_episode.<locals>.<genexpr>zCalling split_by_episode on z	 returns zwhich should in total have z timesteps!)r)   r   rJ   rK   r1   r[   )r]   r   r   r   Zkey_to_methodZkey_resolve_orderr   r   rc   r&   split_by_episode  s4   0

zSampleBatch.split_by_episodestartendc                    s|  |  tjdur*t| tj dkr*dk r$ fdd|  D }n fdd|  D }|dur|dus:J d}d|}|| v r\| | || ||< |d7 }d|}|| v sEt| tj || }t|tt| }	t	||	krt	||	ksJ |	t	|dd  |d< nd}
d}d}t
| tj D ]\}}|
|7 }
|
 krd}d|}|du r|}|| v r| | ||d  ||< |d7 }d|}|| v st| tj || ||
   g }dk r|d   7  < t	|   }|dkr|d  |8  < t	|  ksJ  n|du r|
kr|}qt||| j| j| jd	S tt fd
d| | j| j| jdS )a  Returns a slice of the row data of this batch (w/o copying).

        Args:
            start: Starting index. If < 0, will left-zero-pad.
            end: Ending index.

        Returns:
            A new SampleBatch, which has a slice of this batch's data.
        Nr   c              
      sZ   i | ])\}}|t jkr|d s|ttj f|jdd  |jd|d  gqS )r   r   N)shaperH   r   )r)   r*   r6   r"   concatenatezerosr   rH   r$   r=   r>   r   r   r   r&   
<dictcomp>u  s     
z%SampleBatch.slice.<locals>.<dictcomp>c                    s<   i | ]\}}|t jkr|d s|t fdd|qS )r   c                       |   S r{   r   r   r   r   r&   r         z.SampleBatch.slice.<locals>.<dictcomp>.<lambda>)r)   r*   r6   r9   r   r   r   r   r&   r     s
    zstate_in_{}r   rp   )r<   rE   rB   rF   c                    r   r{   r   valuer   r   r&   r     r   z#SampleBatch.slice.<locals>.<lambda>rE   rB   rF   )r(   r)   r*   r.   r3   formatr!   nextiterr1   	enumeraterI   rM   rP   r9   r   )r]   r   r   Zstate_startZ	state_endr   Z	state_idxZ	state_keyr<   data_lenr[   r   seq_lendiffr   r   r&   sliced  s   






zSampleBatch.sliceslice_c                    s   |j pd |jpt| tj t| krt|  dkr&dkr&|jdv s(J | tjd}t	 fdd| }|durft
t| tj d  }t
t| tj   }||| |tj< || tj< t|| j| j| jdS )2  Helper method to handle SampleBatch slicing using a slice object.

        The returned SampleBatch uses the same underlying data object as
        `self`, so changing the slice will also change `self`.

        Note that only zero or positive bounds are allowed for both start
        and stop values. The slice step must be 1 (or None, which is the
        same).

        Args:
            slice_: The python slice object to slice by.

        Returns:
            A new SampleBatch, however "linking" into the same data
            (sliced) as self.
        r   )r   NNc                       |   S r{   r   r   r   stopr   r&   r     r   z*SampleBatch._batch_slice.<locals>.<lambda>r   )r   r   r.   r)   r*   steprL   r5   r9   r   r0   r1   r
   rI   rM   rP   )r]   r   r   r   Zinfo_slice_startZinfo_slice_stopr   r   r&   _batch_slice  s$   

zSampleBatch._batch_slicesize
num_slicesr=   c           	      C   s   |du r|du rt dd |dusJ |}|du rIt|ts J g }t| }d}|rG||t|  }|| }|| ||  ||8 }|}|s*|S t|tsPJ g }t| }d}|ro|| }|| ||  ||8 }|}|sZ|S )a(  Returns SampleBatches, each one representing a k-slice of this one.

        Will start from timestep 0 and produce slices of size=k.

        Args:
            size: The size (in timesteps) of each returned SampleBatch.
            num_slices: The number of slices to produce.
            k: Deprecated: Use size or num_slices instead. The size
                (in timesteps) of each returned SampleBatch.

        Returns:
            The list of `num_slices` (new) SampleBatches or n (new)
            SampleBatches each one of size `size`.
        Nr=   zsize or num_slicesr   )r   r    r0   r.   r   )	r]   r   r   r=   r   leftr   len_r   r   r   r&   
timeslices  s8   
zSampleBatch.timesliceszSampleBatch.right_zero_padc                 C   rz   r{   r   )r]   rN   exclude_statesr   r   r&   zero_pad%  s   zSampleBatch.zero_padrN   r   c                    sd    tj}|du rtd t|  fdd}t}t|| d__	S )a*  Right (adding zeros at end) zero-pads this SampleBatch in-place.

        This will set the `self.zero_padded` flag to True and
        `self.max_seq_len` to the given `max_seq_len` value.

        Args:
            max_seq_len: The max (total) length to zero pad to.
            exclude_states: If False, also right-zero-pad all
                `state_in_x` data. If True, leave `state_in_x` keys
                as-is.

        Returns:
            This very (now right-zero-padded) SampleBatch.

        Raises:
            ValueError: If self[SampleBatch.SEQ_LENS] is None (not defined).

        .. testcode::
            :skipif: True

            from ray.rllib.policy.sample_batch import SampleBatch
            batch = SampleBatch(
                {"a": [1, 2, 3], "seq_lens": [1, 2]})
            print(batch.right_zero_pad(max_seq_len=4))

            batch = SampleBatch({"a": [1, 2, 3],
                                 "state_in_0": [1.0, 3.0],
                                 "seq_lens": [1, 2]})
            print(batch.right_zero_pad(max_seq_len=5))

        .. testoutput::

            {"a": [1, 0, 0, 0, 2, 3, 0, 0], "seq_lens": [1, 2]}
            {"a": [1, 0, 0, 0, 0, 2, 3, 0, 0, 0],
             "state_in_0": [1.0, 3.0],  # <- all state-ins remain as-is
             "seq_lens": [1, 2]}

        NzNCannot right-zero-pad SampleBatch if no `seq_lens` field present! SampleBatch=c           	         s   du r| d  ds| d tjkrd S |jtks |jjtju r&d g }ntjft	|dd   |jd}d }}tj D ]}||||  |||| < |7 }||7 }qA|t
|ksdJ |}t| D ]\}}|t
| d krz|||< || }qjd S )NTr   r   r   rG   )r6   r)   r*   rH   objecttyper"   Zstr_r   r   r.   r   )	pathr   Zf_padZ
f_pad_baseZf_baser   currr   r   r   lengthrN   r]   r   r&   _zero_pad_in_placeY  s*   $

z6SampleBatch.right_zero_pad.<locals>._zero_pad_in_placeT)
r(   r)   r*   r   r.   r7   r9   r   rO   rN   )r]   rN   r   r<   r   r   r   r   r&   right_zero_pad)  s   'r/   	framework
pin_memory
use_streamstreamtorch.cuda.Streamr   c                 C   sD   |dkr t dus
J |  D ]\}}t|||||d| |< q| S t)9TODO: transfer batch to given device as framework tensor.r/   N)r   r   r   )r/   r3   r   NotImplementedError)r]   devicer   r   r   r   r=   r>   r   r   r&   	to_device}  s   
	zSampleBatch.to_devicec                 C   s   t dd t| D S )a  Returns sum over number of bytes of all data buffers.

        For numpy arrays, we use ``.nbytes``. For all other value types, we use
        sys.getsizeof(...).

        Returns:
            The overall size in bytes of the data buffer (all columns).
        c                 s   s,    | ]}t |tjr|jnt|V  qd S r{   )r    r"   r   nbytessys	getsizeof)r$   r>   r   r   r&   r     s
    
z)SampleBatch.size_bytes.<locals>.<genexpr>)r1   r9   r:   rc   r   r   r&   
size_bytes  s   
zSampleBatch.size_bytesc                 C   s$   z|  |W S  ty   | Y S w )z=Returns one column (by key) from the data or a default value.)__getitem__rK   )r]   r   defaultr   r   r&   r(     s
   zSampleBatch.get	module_idMultiAgentBatchc                 C   s   t |pt| i| jS )ak  Returns the respective MultiAgentBatch

        Note, if `module_id` is not provided uses `DEFAULT_POLICY`_ID`.

        Args;
            module_id: An optional module ID. If `None` the `DEFAULT_POLICY_ID`
                is used.

        Returns:
            The MultiAgentBatch (using DEFAULT_POLICY_ID) corresponding
            to this SampleBatch.
        )r   DEFAULT_POLICY_IDr[   )r]   r   r   r   r&   as_multi_agent  s   zSampleBatch.as_multi_agentc                 C   s   t |tr
| |S |tjkr| tj S |dkr&tdr#tdddd | jS t	| |s5|| v r5| j
| t| |}| jdurR|| jvrM| || j|< | j| }|S )a'  Returns one column (by key) from the data or a sliced new batch.

        Args:
            key: The key (column name) to return or
                a slice object for slicing this SampleBatch.

        Returns:
            The data under the given key or a sliced version of this batch.
        rI   SampleBatch['is_training']SampleBatch.is_trainingFoldrx   ry   N)r    r   _slicer)   rJ   rq   r   r   rI   r-   rT   addr7   r   rX   rW   )r]   r   r   r   r   r&   r     s(   






zSampleBatch.__getitem__c                 C   s   |t jkr	tdt| dst| || dS |dkr+tdr&tdddd || _dS || vr5| j	
| t| || || jv rH|| j|< dS dS )	zInserts (overrides) an entire column (by key) in the data buffer.

        Args:
            key: The column name to set a value for.
            item: The data to insert.
        zCannot set `DONES` anymore in a SampleBatch! Instead, set the new TERMINATEDS and TRUNCATEDS keys. The values under DONES will then be automatically computed using terminated|truncated.rU   NrI   r   r   Fr   )r)   rJ   rK   r-   r7   __setitem__r   r   rE   rU   r   rW   )r]   r   r2   r   r   r&   r     s,   
	

zSampleBatch.__setitem__c                 C   sB   | j d urt| jtrd| jvr|  | j| jd< | jd S | jS )NrE   )rX   r    rE   boolrW   rc   r   r   r&   rI     s   


r   trainingztf1.placeholderc                 C   s   || _ | jdd dS )z1Sets the `is_training` flag for this SampleBatch.rE   N)rE   rW   rL   )r]   r   r   r   r&   set_training  s   zSampleBatch.set_trainingc                 C   s   | j | t| | d S r{   )rV   r   r7   __delitem__r]   r   r   r   r&   r     s   zSampleBatch.__delitem__new_obsbulkr   c                    s     fdd}t | S )a  Compresses the data buffers (by column) in place.

        Args:
            bulk: Whether to compress across the batch dimension (0)
                as well. If False will compress n separate list items, where n
                is the batch size.
            columns: The columns to compress. Default: Only
                compress the obs and new_obs columns.

        Returns:
            This very (now compressed) SampleBatch.
        c                    sl   | d vrd S }t | D ]%\}}|t| d kr/ r#t|||< ntdd |D ||< || }qd S )Nr   r   c                 S      g | ]}t |qS r   )r   r$   or   r   r&   r'   6  r   zDSampleBatch.compress.<locals>._compress_in_place.<locals>.<listcomp>)r   r.   r   r"   r#   )r   r   r   r   r   r   r   r]   r   r&   _compress_in_place-  s   
z0SampleBatch.compress.<locals>._compress_in_placer9   r   )r]   r   r   r   r   r   r&   compress  s   zSampleBatch.compressc                    s    fdd}t | S )a  Decompresses data buffers (per column if not compressed) in place.

        Args:
            columns: The columns to decompress. Default: Only
                decompress the obs and new_obs columns.

        Returns:
            This very (now uncompressed) SampleBatch.
        c                    s   | d  vrd S }| d d D ]}|| }qt |r%t||| d < d S t|dkrAt |d rCtdd |D || d < d S d S d S )Nr   rp   c                 S   r   r   )r   r   r   r   r&   r'   V  r   zRSampleBatch.decompress_if_needed.<locals>._decompress_in_place.<locals>.<listcomp>)r   r   r.   r"   r#   )r   r   r   r   r   r]   r   r&   _decompress_in_placeK  s   
 z>SampleBatch.decompress_if_needed.<locals>._decompress_in_placer   )r]   r   r  r   r   r&   decompress_if_needed=  s   z SampleBatch.decompress_if_neededc                 C   s   || j uri | _|| _ dS )z.Sets a function to be called on every getitem.N)rX   rW   )r]   fnr   r   r&   r   \  s   

zSampleBatch.set_get_interceptorc                 C   s^   t |  }| tjd u rd| j d| dS |tj d| j dt| d  d| dS )NzSampleBatch(z: )z (seqs=r<   z): )r!   r   r(   r)   r*   r[   remover.   )r]   r   r   r   r&   __repr__d  s    zSampleBatch.__repr__c           
         s  | j r| |S |jpd |jpt| t| krt| | tjdurt| tj dkr| js_d}t	t
t| tj D ]\}}| j||fg|  || }q>| jt| tj |f | j  \}| j \}||| jr~| j | j fdd}| tjd}t|| }	|durt|ttjfr|| tj< ||| |	tj< t|	| j| j| j| jr| jnd| jdS | tjd}t fdd| }	|durt|ttjfr|| tj< |  |	tj< t|	| j| j| jdS )	r   r   Nc                    s4   | d t jkr| d ds|  S | S )Nr   r   )r)   r*   r6   )r   r   )start_paddedstart_seq_lenstop_paddedstop_seq_lenr   r&   map_  s
   z SampleBatch._slice.<locals>.map_)rE   rB   rD   rC   rF   c                    r   r{   r   r   r   r   r&   r     r   z$SampleBatch._slice.<locals>.<lambda>r   )rR   r   r   r   r.   r(   r)   r*   r\   r   mapr0   extendr   rO   rN   rL   r5   r9   r   r    r!   r"   r   rI   rM   rP   r   )
r]   r   Zsum_r   lZstart_unpaddedZstop_unpaddedr  r   r   r   )r   r  r  r   r	  r
  r&   r   n  s\   





	
zSampleBatch._slice)ry   c                 C   st  g }g }|  tjd urt| tj dkrt| tj |k s#J dd}d}d}d}d}|t| tj k r| tj | }	||	7 }|| jsG|	n| j7 }||kr|d }
| jsv|||| f ||7 }||kru|| }||	| 8 }|d8 }n	|||f |}|||
f d}|d }|d7 }|t| tj k s6||fS d}|| j	k r|||| f ||7 }|| j	k s||fS )Nr   zFERROR: `slice_size` must be larger than the max. seq-len in the batch!r   )
r(   r)   r*   r.   r"   allrO   rN   r   r[   )r]   Z
slice_sizeZdata_slicesZdata_slices_states	start_posZcurrent_slize_sizeZactual_slice_idxZ	start_idxidxr   Zend_idxZoverheadr   r   r   r&   _get_slice_indices  sT    

zSampleBatch._get_slice_indiceslastview_requirementsindexc              	   C   sN  t jt jt jt jt jt ji}i }| D ]\}}|jdu rq|j	p"|}|dkr|
||}|jdurz| | d }t| | }	|	|j }
|t jt jfv rMdnd}|j| }|j| d }|dkrad}tt|| | |
 d g|| g||< qtdd | | ||< q| | ||dkr|d nd ||< qt |tjdgtjd	d
S )a  Creates single ts SampleBatch at given index from `self`.

        For usage as input-dict for model (action or value function) calls.

        Args:
            view_requirements: A view requirements dict from the model for
                which to produce the input_dict.
            index: An integer index value indicating the
                position in the trajectory for which to generate the
                compute_actions input dict. Set to "last" to generate the dict
                at the very end of the trajectory (e.g. for value estimation).
                Note that "last" is different from -1, as "last" will use the
                final NEXT_OBS as observation input.

        Returns:
            The (single-timestep) input dict for ModelV2 calls.
        Fr  Nrp   r   r   c                 S   s   | dd  S )Nrp   r   r   r   r   r&   r   9  r   z8SampleBatch.get_single_step_input_dict.<locals>.<lambda>rG   )r<   )r)   OBSNEXT_OBSPREV_ACTIONSACTIONSPREV_REWARDSREWARDSr3   Zused_for_compute_actionsdata_colr(   Z
shift_fromr.   Zbatch_repeat_valueZshift_tor"   r#   r   r9   r   rY   )r]   r  r  Zlast_mappingsZ
input_dictZview_colZview_reqr  r   Ztraj_lenZmissing_at_endZ	obs_shiftZfrom_Zto_r   r   r&   get_single_step_input_dict  sF   





	

z&SampleBatch.get_single_step_input_dict)r   r)   r`   r)   )F)r`   r)   r{   )NN)NNN)Tr/   FFN)r`   N)r  )W__name__
__module____qualname____doc__r
   r  r  r  rq   rr   r5   r*   TZACTION_DIST_INPUTSZACTION_PROBZACTION_LOGPZVF_PREDSZVALUES_BOOTSTRAPPEDr   r  ZACTION_DISTr  r  ZENV_IDAGENT_INDEXZ	UNROLL_IDZ
OBS_EMBEDSZRETURNS_TO_GOZATTENTION_MASKSrJ   ZCUR_OBSr   rQ   r0   rd   rh   rj   r   rm   ro   r   r   rs   ru   staticmethodr   r~   r   r   r   r   r4   r   r   r   rt   r   r   r	   r   r   r   r   r   r   r   r   r   r(   r   r   r   r   propertyrI   r   r   	frozensetr   r   r  r   r  r   r  r   r  r   r   r   r&   r)   d   s    
m


%5r
[/
6T

%&
	
 


W
2
r)   c                
   @   s  e Zd ZdZedeeef defddZ	edefddZ
edefd	d
ZedefddZededed  fddZeedeeef dedeed f fddZeeedddded  dd fddZed6ddZe				d7dededeed   fd!d"Zedefd#d$Zeded%d&gfd'ed(ee ddfd)d*Zeed%d&gfd(ee dd fd+d,Zed6d-d.Z d/edefd0d1Z!d2d3 Z"d4d5 Z#dS )8r   a  A batch of experiences from multiple agents in the environment.

    Attributes:
        policy_batches (Dict[PolicyID, SampleBatch]): Dict mapping policy IDs to
            SampleBatches of experiences.
        count: The number of env steps in this batch.
    policy_batchesrj   c                 C   s,   |  D ]	}t|tsJ q|| _|| _dS )au  Initialize a MultiAgentBatch instance.

        Args:
            policy_batches: Dict mapping policy IDs to SampleBatches of experiences.
            env_steps: The number of environment steps in the environment
                this batch contains. This will be less than the number of
                transitions this batch contains across all policies in total.
        N)valuesr    r)   r(  r[   )r]   r(  rj   r>   r   r   r&   rQ   O  s   
zMultiAgentBatch.__init__r`   c                 C   ra   )zThe number of env steps (there are >= 1 agent steps per env step).

        Returns:
            The number of environment steps contained in this batch.
        rb   rc   r   r   r&   rj   b  s   zMultiAgentBatch.env_stepsc                 C   ra   )zSame as `self.env_steps()`.rb   rc   r   r   r&   rd   k  re   zMultiAgentBatch.__len__c                 C   s"   d}| j  D ]}||j7 }q|S )zThe number of agent steps (there are >= 1 agent steps per env step).

        Returns:
            The number of agent steps total in this batch.
        r   )r(  r)  r[   )r]   ctbatchr   r   r&   rh   p  s   zMultiAgentBatch.agent_stepsr=   c           
   	      s  ddl m} g }| j D ]\}}| D ]}||tj |tj |tj	 ||f qq|
  g t| d fdd}t|dd D ])\}}	|	D ]\}}}}} | jd	i | qOd7 |krr|  dksrJ qIdkrz|  tdksJ S )
a  Returns k-step batches holding data for each agent at those steps.

        For examples, suppose we have agent1 observations [a1t1, a1t2, a1t3],
        for agent2, [a2t1, a2t3], and for agent3, [a3t3] only.

        Calling timeslices(1) would return three MultiAgentBatches containing
        [a1t1, a2t1], [a1t2], and [a1t3, a2t3, a3t3].

        Calling timeslices(2) would return two MultiAgentBatches containing
        [a1t1, a1t2, a2t1], and [a1t3, a2t3, a3t3].

        This method is used to implement "lockstep" replay mode. Note that this
        method does not guarantee each batch contains only data from a single
        unroll. Batches might contain data from multiple different envs.
        r   )SampleBatchBuilderc                     s>   dksJ t dd   D } d   |  d S )Nr   c                 S      i | ]	\}}||  qS r   )Zbuild_and_resetr   r   r   r&   r     r   zDMultiAgentBatch.timeslices.<locals>.finish_slice.<locals>.<dictcomp>)r   r3   clearr   )r+  Z	cur_sliceZcur_slice_sizeZfinished_slicesr   r&   finish_slice  s   z0MultiAgentBatch.timeslices.<locals>.finish_slicec                 S   s   | d d S )N   r   )xr   r   r&   r     r   z,MultiAgentBatch.timeslices.<locals>.<lambda>r   Nr   )Z)ray.rllib.evaluation.sample_batch_builderr,  r(  r3   r   r   r)   r   r#  r$  sortcollectionsdefaultdict	itertoolsgroupbyZ
add_valuesr.   )
r]   r=   r,  ZstepsZ	policy_idr+  rowr0  _groupr   r/  r&   r   |  s<   

zMultiAgentBatch.timeslicesc                 C   s(   t | dkrt| v r| t S t| |dS )a  Returns SampleBatch or MultiAgentBatch, depending on given policies.
        If policy_batches is empty (i.e. {}) it returns an empty MultiAgentBatch.

        Args:
            policy_batches: Mapping from policy ids to SampleBatch.
            env_steps: Number of env steps in the batch.

        Returns:
            The single default policy's SampleBatch or a MultiAgentBatch
            (more than one policy).
        r   r(  rj   )r.   r   r   r;  r   r   r&   wrap_as_needed  s   zMultiAgentBatch.wrap_as_neededrv   Trw   r}   c                 C   rf   r{   )concat_samples_into_ma_batchr|   r   r   r&   r~     s   zMultiAgentBatch.concat_samplesc                 C   s   t dd | j D | jS )z{Deep-copies self into a new MultiAgentBatch.

        Returns:
            The copy of self with deep-copied data.
        c                 S   r-  r   r   r   r   r   r&   r     r   z(MultiAgentBatch.copy.<locals>.<dictcomp>)r   r(  r3   r[   rc   r   r   r&   r     s   zMultiAgentBatch.copyr/   FNr   r   r   r   c                 C   sJ   |dkr#t dus
J | j D ]\}}|j|||||d| j|< q| S t)r   r/   N)r   r   r   r   )r/   r(  r3   r   r   )r]   r   r   r   r   r   pidZpolicy_batchr   r   r&   r     s   
	zMultiAgentBatch.to_devicec                 C   s   t dd | j D S )ze
        Returns:
            The overall size in bytes of all policy batches (all columns).
        c                 s   s    | ]}|  V  qd S r{   )r   )r$   br   r   r&   r      s    z-MultiAgentBatch.size_bytes.<locals>.<genexpr>)r1   r(  r)  rc   r   r   r&   r     s   zMultiAgentBatch.size_bytesrA   r   r   r   c                 C   s"   | j  D ]	}|j||d qdS )a8  Compresses each policy batch (per column) in place.

        Args:
            bulk: Whether to compress across the batch dimension (0)
                as well. If False will compress n separate list items, where n
                is the batch size.
            columns: Set of column names to compress.
        )r   r   N)r(  r)  r   )r]   r   r   r+  r   r   r&   r     s   zMultiAgentBatch.compressc                 C   s   | j  D ]}|| q| S )zDecompresses each policy batch (per column), if already compressed.

        Args:
            columns: Set of column names to decompress.

        Returns:
            Self.
        )r(  r)  r  )r]   r   r+  r   r   r&   r    s   z$MultiAgentBatch.decompress_if_neededc                 C   s   | S )zSimply returns `self` (already a MultiAgentBatch).

        Returns:
            This very instance of MultiAgentBatch.
        r   rc   r   r   r&   r   !  s   zMultiAgentBatch.as_multi_agentr   c                 C   s
   | j | S )z0Returns the SampleBatch for the given policy id.)r(  r   r   r   r&   r   *  rn   zMultiAgentBatch.__getitem__c                 C      d t| j| jS Nz!MultiAgentBatch({}, env_steps={})r   r4   r(  r[   rc   r   r   r&   __str__.     zMultiAgentBatch.__str__c                 C   r@  rA  rB  rc   r   r   r&   r  3  rD  zMultiAgentBatch.__repr__)r`   r   r  )$r  r   r!  r"  r   r   r   r)   r0   rQ   rj   rd   rh   r   r   r%  r   r<  r   r~   r   r   r   r	   r   r   r   r'  r   r4   r   r  r   r   rC  r  r   r   r   r&   r   E  s|    @





r   r}   r`   c              	      s,  t dd | D rt| S g }ddg}g }d } }}| D ]y}|jdkr%q|du r2|j}|j}|j}|j|ks<|j|kr@td|jdu sI|du rR|j|krRtd|r]|j|kr]td|durgt||j}|t	j
durw||t	j
  |jdur|d  |j7  < |d	  |j|j 7  < || qt|dkrt	 S i }|d  D ]2  t	jkrt fd
d|D d|i| < q fdd|D }	tt|d}
tj|
g|	R  | < q|g krtrt|d rt|}n|g krtrt|d rt|}t	||||||d	 |d pd dS )aX  Concatenates a list of  SampleBatches or MultiAgentBatches.

    If all items in the list are or SampleBatch typ4, the output will be
    a SampleBatch type. Otherwise, the output will be a MultiAgentBatch type.
    If input is a mixture of SampleBatch and MultiAgentBatch types, it will treat
    SampleBatch objects as MultiAgentBatch types with 'default_policy' key and
    concatenate it with th rest of MultiAgentBatch objects.
    Empty samples are simply ignored.

    Args:
        samples: List of SampleBatches or MultiAgentBatches to be
            concatenated.

    Returns:
        A new (concatenated) SampleBatch or MultiAgentBatch.

    .. testcode::
        :skipif: True

        import numpy as np
        from ray.rllib.policy.sample_batch import SampleBatch
        b1 = SampleBatch({"a": np.array([1, 2]),
                          "b": np.array([10, 11])})
        b2 = SampleBatch({"a": np.array([3]),
                          "b": np.array([12])})
        print(concat_samples([b1, b2]))


        c1 = MultiAgentBatch({'default_policy': {
                                        "a": np.array([1, 2]),
                                        "b": np.array([10, 11])
                                        }}, env_steps=2)
        c2 = SampleBatch({"a": np.array([3]),
                          "b": np.array([12])})
        print(concat_samples([b1, b2]))

    .. testoutput::

        {"a": np.array([1, 2, 3]), "b": np.array([10, 11, 12])}
        MultiAgentBatch = {'default_policy': {"a": np.array([1, 2, 3]),
                                              "b": np.array([10, 11, 12])}}

    c                 s   s    | ]}t |tV  qd S r{   )r    r   r   r   r   r&   r   g  s    z!concat_samples.<locals>.<genexpr>r   g        NzNAll SampleBatches' `zero_padded` and `time_major` settings must be consistent!z?Samples must consistently either provide or omit `max_seq_len`!zPFor `zero_padded` SampleBatches, the values of `max_seq_len` must be consistent!r   c                       g | ]}|  qS r   r   r   r=   r   r&   r'     r   z"concat_samples.<locals>.<listcomp>rM   c                    rE  r   r   )r$   crF  r   r&   r'     r   rM   g      ?)r<   rB   rD   rC   rF   )rt   r=  r[   rO   rN   rM   r   rZ   r(   r)   r*   r  rP   r   r.   r   r5   _concat_valuesr   r9   r   r/   r,   ZTensorr+   Zconvert_to_tensor)r}   Zconcatd_seq_lensZconcatd_num_grad_updatesZconcated_samplesrO   rN   rM   r   Zconcatd_dataZvalues_to_concatZ_concat_values_w_timer   rF  r&   r~   9  s~   .



 
r~   c                 C   s   t t}d}| D ]9}t|trt|dkrq	| }nt|ts+td	t
|j|j D ]\}}|| | q0|| 7 }q	i }| D ]
\}}t|||< qIt||S )a  Concatenates a list of SampleBatchTypes to a single MultiAgentBatch type.

    This function, as opposed to concat_samples() forces the output to always be
    MultiAgentBatch which is more generic than SampleBatch.

    Args:
        samples: List of SampleBatches or MultiAgentBatches to be
            concatenated.

    Returns:
        A new (concatenated) MultiAgentBatch.

    .. testcode::
        :skipif: True

        import numpy as np
        from ray.rllib.policy.sample_batch import SampleBatch
        b1 = MultiAgentBatch({'default_policy': {
                                        "a": np.array([1, 2]),
                                        "b": np.array([10, 11])
                                        }}, env_steps=2)
        b2 = SampleBatch({"a": np.array([3]),
                          "b": np.array([12])})
        print(concat_samples([b1, b2]))

    .. testoutput::

        {'default_policy': {"a": np.array([1, 2, 3]),
                            "b": np.array([10, 11, 12])}}

    r   z[`concat_samples_into_ma_batch` can only concat SampleBatch|MultiAgentBatch objects, not {}!)r4  r5  r!   r    r)   r.   r   r   r   r   r   r  r(  r3   r   rj   r~   )r}   r(  rj   r   r   r+  r   Zbatchesr   r   r&   r=    s&   
"



r=  rH  c                 G   s   t rt |d rt j|| rddS ddS t|d tjr+tj|| r'ddS ddS trAt|d rAtj|| r=ddS ddS t|d t	rVg }|D ]}|
| qL|S tdt|d  d|d  )zConcatenates a list of values.

    Args:
        values: The values to concatenate.
        time_major: Whether to concatenate along the first axis
            (time_major=False) or the second axis (time_major=True).
    r   r   )dim)Zaxisz$Unsupported type for concatenation: z first element: )r/   r,   catr    r"   r   r   r+   r   r!   r  r   r   )rM   r)  Zconcatenated_listZsublistr   r   r&   rI    s    rI  r+  c                 C   sB   t | tr| j }t|dkrt|v r| jt } | S td| S )a:  Converts a MultiAgentBatch to a SampleBatch if neccessary.

    Args:
        batch: The SampleBatchType to convert.

    Returns:
        batch: the converted SampleBatch

    Raises:
        ValueError if the MultiAgentBatch has more than one policy_id
        or if the policy_id is not `DEFAULT_POLICY_ID`
    r   a  RLlib tried to convert a multi agent-batch with data from more than one policy to a single-agent batch. This is not supported and may be due to a number of issues. Here are two possible ones:1) Off-Policy Estimation is not implemented for multi-agent batches. You can set `off_policy_estimation_methods: {}` to resolve this.2) Loading multi-agent data for offline training is not implemented.Load single-agent data instead to resolve this.)r    r   r(  r   r.   r   r   )r+  Zpolicy_keysr   r   r&    convert_ma_batch_to_sample_batch  s   



rL  )9r4  	functoolsr   r6  r   numbersr   typingr   r   r   r   r   r	   r   r"   r9   Zray.rllib.core.columnsr
   Zray.rllib.utils.annotationsr   r   r   Zray.rllib.utils.compressionr   r   r   Zray._common.deprecationr   r   Zray.rllib.utils.frameworkr   r   Zray.rllib.utils.torch_utilsr   Zray.rllib.utils.typingr   r   r   r   r   Zray.utilr   Ztf1r+   Ztfvr/   r9  r   r7   r@   r)   r   r~   r=  rI  rL  r   r   r   r&   <module>   sX    
A         i t >