o
    1 i7a                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlZd dlm	Z	m
Z
 d dlmZ d dlmZmZ d dlmZmZ d dlmZmZ d dlmZmZ d d	lmZ d d
lmZmZmZm Z m!Z!m"Z"m#Z#m$Z$ e %e&Z'dZ(dZ)G dd dZ*G dd de*eZ+dS )    N)Thread)usage_constants	usage_lib)subprocess_output_util)cf
cli_logger)AUTOSCALER_NODE_START_WAIT_SProcessRunnerError)LABELS_ENVIRONMENT_VARIABLERESOURCES_ENVIRONMENT_VARIABLE)CreateClusterEventglobal_event_system)LogTimer)STATUS_SETTING_UPSTATUS_SYNCING_FILESSTATUS_UP_TO_DATESTATUS_UPDATE_FAILEDSTATUS_WAITING_FOR_SSHTAG_RAY_FILE_MOUNTS_CONTENTSTAG_RAY_NODE_STATUSTAG_RAY_RUNTIME_CONFIG      c                	   @   sb   e Zd ZdZddddeddddf	ddZdd Zdd	d
Zdd Zdd Z	dddZ
dddZdS )NodeUpdatera  A process for syncing files and running init commands on a node.

    Arguments:
        node_id: the Node ID
        provider_config: Provider section of autoscaler yaml
        provider: NodeProvider Class
        auth_config: Auth section of autoscaler yaml
        cluster_name: the name of the cluster.
        file_mounts: Map of remote to local paths
        initialization_commands: Commands run before container launch
        setup_commands: Commands run before ray starts
        ray_start_commands: Commands to start ray
        runtime_hash: Used to check for config changes
        file_mounts_contents_hash: Used to check for changes to file mounts
        is_head_node: Whether to use head start/setup commands
        rsync_options: Extra options related to the rsync command.
        process_runner: the module to use to run the commands
            in the CommandRunner. E.g., subprocess.
        use_internal_ip: Wwhether the node_id belongs to an internal ip
            or external ip.
        docker_config: Docker section of autoscaler yaml
        restart_only: Whether to skip setup commands & just restart ray
        for_recovery: True if updater is for a recovering node. Only used for
            metric tracking.
    NFc              	   C   s   d || _|p|ddo|o|dd }|| j||||||| _d| _|| _|d| _|| _|p6i }dd |	 D | _
|| _|| _|	| _|| _|| _|
| _|| _|pYg }d	d
 |D | _|pei | _|| _|| _|| _|| _d | _|| _d S )NzNodeUpdater: {}: Zuse_internal_ipsFZuse_external_head_ipTtypec                 S   s   i | ]\}}|t j|qS  ospath
expanduser).0remotelocalr   r   k/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/autoscaler/_private/updater.py
<dictcomp>x   s    z(NodeUpdater.__init__.<locals>.<dictcomp>c                 S   s   g | ]}t j|qS r   r   )r    r   r   r   r#   
<listcomp>   s    z(NodeUpdater.__init__.<locals>.<listcomp>)format
log_prefixgetZget_command_runner
cmd_runnerdaemonnode_idprovider_typeprovideritemsfile_mountsinitialization_commandssetup_commandsray_start_commandsnode_resourcesnode_labelsruntime_hashfile_mounts_contents_hashcluster_synced_filesrsync_optionsauth_configis_head_nodedocker_configrestart_onlyupdate_timefor_recovery)selfr+   provider_configr-   r9   cluster_namer/   r0   r1   r2   r5   r6   r:   r3   r4   r7   r8   Zprocess_runnerZuse_internal_ipr;   r<   r>   r   r   r#   __init__C   sR   


zNodeUpdater.__init__c              
   C   s  t   }t rt rd}t| z!t| jd| j	  | 
  W d    n1 s.w   Y  W np ty } zd| j| jtti tdtt td t|drmt|dd}tdt|j|j| ntd	tt| t }td
| tdt| td t  t|tj rW Y d }~d S  d }~ww tt!t"| j	i}| j#d ur| j#|t$< | j| j| t%dt! t   | | _&d| _'d S )NzOutput was redirected for an interactive command. Either do not pass `--redirect-command-output` or also pass in `--use-normal-shells`.zApplied config {}zNew status: {}z!!!cmdstderrzNo stderr availablez7Setup command `{}` failed with exit code {}. stderr: {}zException details: {}zFull traceback: {}zError message: {}
New statusr   )(timecmd_output_utilZdoes_allow_interactiveis_output_redirectedr   abortr   r'   r&   r5   	do_update	Exceptionr-   set_node_tagsr+   r   r   errorr   boldhasattrgetattrrC   
returncodeZverbose_errorstrvars	traceback
format_excnewline
isinstanceclickClickExceptionr   r   r6   r   labeled_valuer=   exitcode)r?   Zupdate_start_timemsgeZstderr_outputZfull_tracebackZtags_to_setr   r   r#   run   sb   









zNodeUpdater.runr      c           	         s"  |\}}g  t jdkrddg d fdd	}t jdd|d	 |fd
 j D ]	\}}||| q*|d	7 }W d    n1 sBw   Y  jrt jdd|d	 |fd
# t dtj jD ]	}|||dd qc|d	7 }W d    d S 1 s|w   Y  d S t jdd|d	 |fd
 d S )Nr   z~/ray_bootstrap_key.pemz~/ray_bootstrap_config.yamlFc                    s  |rt j|std| d S t j|sJ |t j|r2|ds)|d7 }| ds2| d7 } tjd	||  D j
oGj
d dk}|sYjjd	t j| dd || d	d
 |  vrytdt| t| W d    d S W d    d S 1 sw   Y  d S )Nz"sync: {} does not exist. Skipping./zSynced {} to {}Zcontainer_name zmkdir -p {}hostrun_envT)docker_mount_if_possiblez
{} from {})r   r   existsr   printisdirendswithr   r'   r&   r;   r)   r^   dirnamer   rN   )remote_path
local_pathallow_non_existing_pathsZ	is_dockerZnolog_pathsr?   sync_cmdr   r#   do_sync   s6   

"z-NodeUpdater.sync_file_mounts.<locals>.do_synczProcessing file mounts[]   Z	_numberedzProcessing worker file mountszsynced files: {}T)rn   zNo worker file mounts to syncF)r   	verbositygroupr/   r.   r7   rh   rR   )	r?   rp   step_numbersZprevious_stepsZtotal_stepsrq   rl   rm   r   r   ro   r#   sync_file_mounts   s4   
%


"	
zNodeUpdater.sync_file_mountsc                 C   s  t jdddtfd t| jd  t dtd d }	 t |kr)t	d	| j
| jr4t	d
z| jjdddd t d W W d    W d    dS  typ } ztj||td}tt W Y d }~nfd }~w t	y } zVdt| d }t|drt|jtr|j}nt|jtrd|j}ntdt|j d t|j}d|j|}t dt |ttt tt W Y d }~nd }~ww q1 sw   Y  W d    d S 1 sw   Y  d S )Nz#Waiting for SSH to become availablerr   rs   rt   zGot remote shellzRunning `{}` as a test.ZuptimeTzwait_ready timeout exceeded.z8wait_ready aborting because node detected as terminated.
   rc   )timeoutre   zSuccess.)Zretry_interval()rC    ze.cmd type (z) not list or str.z(Exit Status {}): {}z3SSH still not available {}, retrying in {} seconds.)!r   rw   NUM_SETUP_STEPSr   r'   rh   r   rN   rF   rK   r-   Zis_terminatedr+   r)   r^   successr	   rG   Zhandle_ssh_failsREADY_CHECK_INTERVALsleeprR   rO   rW   rC   listjoinloggerdebugr   r&   rQ   Zdimmed)r?   deadlineZfirst_conn_refused_timer]   Z	retry_strZcmd_r   r   r#   
wait_ready  sh   



"zNodeUpdater.wait_readyc                 C   s  | j | jtti tdt t t }| 	| t
tj | j | j}tdt| | jdkrP| j jrPddlm} || j j| j| j j| j |t| jkrp| jj| j| jdd}|rp|t  d7  < d| _ | j rvg | _!|t| jkr| j"r|t#| j"krtj$d	d
dt%fd ntj$dt&| jdd | j | jtt'i tdt' | j(| j)dt%fd |t| jkr<| j | jtt*i tdt* | j+rTtj,dd
dt%fdi t
tj- t.| j/d ddI | j+D ]=}t
tj-d|i z| jj0|| j1ddd W q t2y2 } z|j3dkr(t4d t4d t56dd d }~ww W d    n	1 s>w   Y  W d    n	1 sNw   Y  n
tj$dd
dt%fd tj,d d
d!t%fd | jj| j| jdd W d    n	1 sw   Y  | j!r2tj,d"d
d#t%fd t
tj7 t.| j/d$ ddv t8| j!}t9| j!D ]c\}}t
tj7d|i tj:dkrt8|d%krt;<|d d% d& }	nt;<|}	tj$d'|	d(||fd z| jj0|d)d* W q t2y } z|j3dkrt4d t4d t56d+d }~ww W d    n	1 sw   Y  W d    n	1 s,w   Y  n
tj$d,d
d#t%fd tj,d-d
d.t%fd t
tj= t.| j/d/ ddx | j>D ]l}i }
| jrrt?@ rmd|
tAjB< nd|
tAjB< | jd0kr| jCr| jC|
tD< | jEr| jE|
tF< ztGH }tGId | jj0||
d)d1 tGI| W qZ t2y } z|j3dkrt4d t4d t56d2d }~ww W d    n	1 sw   Y  t
tjJ W d    d S 1 sw   Y  d S )3NrE   zNode tags: {}Zawsr   )CloudwatchHelperF)Zas_headr/   Zsync_run_yetz-invalidatezYConfiguration already up to date, skipping file mounts, initalization and setup commands.rr   z2-6rt   zUpdating cluster configuration.)hash)Z_tagsrs   )rx   zRunning initialization commands   zInitialization commandsT)Zshow_statuscommandZssh_private_keyrc   )Zssh_options_override_ssh_keyre   Zssh_command_failedzFailed.zSee above for stderr.zInitialization command failed.z"No initialization commands to run.zInitializing command runnerr   zRunning setup commands   zSetup commands   z...z{}z()autord   zSetup command failed.zNo setup commands to run.zStarting the Ray runtimer   zRay start commandsr"   )Zenvironment_variablesre   zStart command failed.)Kr-   rL   r+   r   r   r   rZ   rF   r   r   r   Zexecute_callbackr   Zssh_control_acquired	node_tagsr   r   r&   rR   r,   r@   Z8ray.autoscaler._private.aws.cloudwatch.cloudwatch_helperr   rA   Zupdate_from_configr:   r(   r   r5   r)   Zrun_initr/   r<   r1   r6   r   rh   r   dictr   ry   rsync_upr   r0   rw   Zrun_initialization_cmdr   r'   r^   r9   r	   msg_typerM   rX   rY   Zrun_setup_cmdlen	enumeraterv   r   rN   Zstart_ray_runtimer2   r   Zusage_stats_enabledr   ZUSAGE_STATS_ENABLED_ENV_VARr3   r   r4   r
   rG   rH   Zset_output_redirectedZstart_ray_runtime_completed)r?   r   r   r   Zinit_requiredrC   r]   totaliZcmd_to_printZenv_varsZold_redirectedr   r   r#   rJ   O  sn  









&







#








"$zNodeUpdater.do_updatec                 C   \   i }||d< | j d|d< | j d|d< | jj|||d tdt|t| d S )Nrf   rsync_excludersync_filteroptionsz#`rsync`ed {} (local) to {} (remote))r8   r(   r)   Zrun_rsync_upr   verboser   rN   r?   sourcetargetrf   r   r   r   r#   r   !     zNodeUpdater.rsync_upc                 C   r   )Nrf   r   r   r   z#`rsync`ed {} (remote) to {} (local))r8   r(   r)   Zrun_rsync_downr   r   r   rN   r   r   r   r#   
rsync_down+  r   zNodeUpdater.rsync_down)r_   ru   )__name__
__module____qualname____doc__
subprocessrB   r^   ry   r   rJ   r   r   r   r   r   r#   r   (   s&    (
P
>C; 
S
r   c                   @   s   e Zd Zdd ZdS )NodeUpdaterThreadc                 O   s,   t |  tj| g|R i | d| _d S )N)r   rB   r   r[   )r?   argskwargsr   r   r#   rB   7  s   

zNodeUpdaterThread.__init__N)r   r   r   rB   r   r   r   r#   r   6  s    r   ),loggingr   r   rF   rT   	threadingr   rX   Zray._common.usager   r   Zray.autoscaler._privater   rG   Z"ray.autoscaler._private.cli_loggerr   r   Z&ray.autoscaler._private.command_runnerr   r	   Z!ray.autoscaler._private.constantsr
   r   Z$ray.autoscaler._private.event_systemr   r   Z!ray.autoscaler._private.log_timerr   Zray.autoscaler.tagsr   r   r   r   r   r   r   r   	getLoggerr   r   r   r   r   r   r   r   r   r#   <module>   s0    (
    