o
    1 i)                     @   s  d dl Z d dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZ d d	lmZ e eZzd dlZW n eye   dZe d
 Y nw eG dd dZ!eG dd dej"Z#edd Z$edd Z%dS )    N)UnionOptional)ExternalEnv)ExternalMultiAgentEnv)MultiAgentEnv)MultiAgentBatch)OldAPIStack)MultiAgentDictEnvInfoDict
EnvObsTypeEnvActionType)RLlinkzMCouldn't import `requests` library. Be sure to install it on the client side.c                   @   s  e Zd ZdZ			d'dedededeej fd	d
Z		d(dee de
defddZdedeeef deeef fddZdedeeef deeef ddfddZ		d)dededeeef dee ddf
ddZdedeeef ddfddZd*ddZd d! Zd"d# Zd+d%d&ZdS ),PolicyClientz4REST client to interact with an RLlib policy server.local      $@Naddressinference_modeupdate_intervalsessionc                 C   sH   || _ || _d | _|dkrd| _| | d S |dkr d| _d S td)Nr   TremoteFz1inference_mode must be either 'local' or 'remote')r   r   envr   _setup_local_rollout_worker
ValueError)selfr   r   r   r    r   g/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/rllib/env/policy_client.py__init__&   s   
zPolicyClient.__init__T
episode_idtraining_enabledreturnc                 C   s4   | j r|   | j||S | |tj|dd S )N)r   commandr   r   )r   _update_local_policyr   start_episode_sendCommandsZSTART_EPISODE)r   r   r   r   r   r   r"   8   s   zPolicyClient.start_episodeobservationc                    sZ   j r!  t|ttfr fdd|D }|S j| S tj	 |dd S )Nc                    s    i | ]}|j | | qS r   )r   
get_action).0Zeidr%   r   r   r   
<dictcomp>M   s    z+PolicyClient.get_action.<locals>.<dictcomp>r    r%   r   action)
r   r!   
isinstancelisttupler   r&   r#   r$   Z
GET_ACTION)r   r   r%   actionsr   r(   r   r&   G   s    zPolicyClient.get_actionr+   c                 C   s8   | j r|   | j|||S | tj|||d d S )N)r    r%   r+   r   )r   r!   r   
log_actionr#   r$   Z
LOG_ACTION)r   r   r%   r+   r   r   r   r0   ]   s   zPolicyClient.log_actionrewardinfomultiagent_done_dictc                 C   sb   | j r#|   |d urt|tsJ | j||||S | j|||S | tj||||d d S )N)r    r1   r2   r   done)	r   r!   r,   dictr   log_returnsr#   r$   ZLOG_RETURNS)r   r   r1   r2   r3   r   r   r   r6   p   s    zPolicyClient.log_returnsc                 C   s4   | j r|   | j||S | tj||d d S )Nr*   )r   r!   r   end_episoder#   r$   ZEND_EPISODE)r   r   r%   r   r   r   r7      s   zPolicyClient.end_episodec                 C   s   | j dd dS )zGQuery the server for new policy weights, if local inference is enabled.T)forceN)r!   r   r   r   r   update_policy_weights   s   z"PolicyClient.update_policy_weightsc                 C   sn   t |}| jd u rtj| j|d}n	| jj| j|d}|jdkr+td	|j
| |  t |j}|S )N)data   zRequest failed {}: {})pickledumpsr   requestspostr   status_codeloggererrorformattextraise_for_statusloadscontent)r   r;   payloadresponseparsedr   r   r   r#      s   


zPolicyClient._sendc                 C   sL   || _ d| _td | dtjid }t|| j\| _| _	| jj
| _
d S )Nr   z,Querying server for rollout worker settings.r    Zworker_args)r   last_updatedrB   r2   r#   r$   ZGET_WORKER_ARGS_create_embedded_rollout_workerrollout_workerinference_threadr   )r   r   kwargsr   r   r   r      s   
z(PolicyClient._setup_local_rollout_workerFc                 C   s   | j  sJ | jrt | j | jks|rAtd | dtj	i}|d }|d }td
| | j|| t | _d S d S )Nz'Querying server for new policy weights.r    weightsglobal_varsz3Updating rollout worker weights and global vars {}.)rO   is_aliver   timerL   rB   r2   r#   r$   ZGET_WEIGHTSrD   rN   Zset_weights)r   r8   resprQ   rR   r   r   r   r!      s*   
z!PolicyClient._update_local_policy)r   r   NNT)NN)r   N)F)__name__
__module____qualname____doc__strfloatr   r?   Sessionr   boolr"   r   r   r	   r   r&   r0   r
   r6   r7   r:   r#   r   r!   r   r   r   r   r   "   s|    












r   c                       $   e Zd Z fddZdd Z  ZS )_LocalInferenceThreadc                    s    t    d| _|| _|| _d S rV   )superr   daemonrN   send_fn)r   rN   rc   	__class__r   r   r      s   

z_LocalInferenceThread.__init__c              
   C   s   z7	 t d | j }| j }t|tr$t d| |	  n	t d|j
 | tj||d q tyP } zt d| W Y d }~d S d }~ww )NTz$Generating new batch of experiences.z9Sending batch of {} env steps ({} agent steps) to server.z)Sending batch of {} steps back to server.)r    samplesmetricsz$Error: inference worker thread died!)rB   r2   rN   sampleZget_metricsr,   r   rD   Z	env_stepsZagent_stepscountrc   r$   ZREPORT_SAMPLES	ExceptionrC   )r   rf   rg   er   r   r   run   s4   



z_LocalInferenceThread.runrW   rX   rY   r   rl   __classcell__r   r   rd   r   r`      s    r`   c                    s    fdd}|S )Nc                    sP    | }t |ttfs&td t |trt}nt}G dd d|}||S |S )NzzThe env you specified is not a supported (sub-)type of ExternalEnv. Attempting to convert it automatically to ExternalEnv.c                       r_   )zI_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapperc                    s   t  j|j|jd d S )N)observation_spaceaction_space)ra   r   ro   rp   )r   real_envrd   r   r   r     s   
zR_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapper.__init__c                 S   s   t d d S )Ni?B )rT   sleepr9   r   r   r   rl     s   zM_auto_wrap_external.<locals>.wrapped_creator.<locals>._ExternalEnvWrapper.runrm   r   r   rd   r   _ExternalEnvWrapper  s    rs   )r,   r   r   rB   r2   r   )
env_configrq   Zexternal_clsrs   real_env_creatorr   r   wrapped_creator   s   
z,_auto_wrap_external.<locals>.wrapped_creatorr   )rv   rw   r   ru   r   _auto_wrap_external   s   rx   c                    s   |   } | d j dd| d< | d }d |_d|_i |_|jd u r@ddlm m |j|j	d|j
t fdd	| d
< n
| d
 }t|| d
< td|  ddlm} |di | }t||}|  ||fS )NconfigF)Zcopy_frozenZsamplerr   )	RandomEnvRandomMultiAgentEnv)rp   ro   c                    s   rS  S )Nr   )_rz   r{   rt   Zis_mar   r   <lambda>1  s    z1_create_embedded_rollout_worker.<locals>.<lambda>Zenv_creatorz&Creating rollout worker with kwargs={})RolloutWorkerr   )copyoutputZinput_Zinput_configr   Z*ray.rllib.examples.envs.classes.random_envrz   r{   rp   ro   Zis_multi_agentrx   rB   r2   rD   Z#ray.rllib.evaluation.rollout_workerr   r`   start)rP   rc   ry   rv   r   rN   rO   r   r}   r   rM     s.   


rM   )&logging	threadingrT   typingr   r   Zray.cloudpickleZcloudpickler=   Zray.rllib.env.external_envr   Z&ray.rllib.env.external_multi_agent_envr   Zray.rllib.env.multi_agent_envr   Zray.rllib.policy.sample_batchr   Zray.rllib.utils.annotationsr   Zray.rllib.utils.typingr	   r
   r   r   Zray.rllib.env.external.rllinkr   r$   	getLoggerrW   rB   r?   ImportErrorwarningr   Threadr`   rx   rM   r   r   r   r   <module>   s<    
 /#
!