o
    1 i@                    @   s>  U d dl Z d dlZd dlZd dlZd dlZd dl mZ d dlmZ d dlm	Z	m
Z
 d dlmZmZmZmZmZmZmZmZ d dlmZ d dlZd dlZd dlmZ d dlmZ d d	lmZ d d
lm Z m!Z!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2m3Z3m4Z4m5Z5 d dl6m7Z7 e8e9Z:dZ;dZ<dZ=ee>ee, f Z?de@de>fddZAd2deBdeBde>fddZCG dd dZDG d d! d!ZEejFd d"G d#d$ d$ZGeH ZIejHeJd%< d&d' ZKG d(d) d)ZLeL ZMG d*d+ d+ZNe0e	G d,d- d-ZOe	G d.d/ d/ZPe	G d0d1 d1ZQdS )3    N)defaultdict)contextmanager)	dataclassfields)AnyDictListMappingOptionalSetTupleUnion)uuid4)ActorHandle)	BlockList)DatasetState)NODE_UNKNOWNMetricsGroupMetricsTypeNodeMetricsOpRuntimeMetrics)DatasetMetadataTopologyget_dataset_metadata_exporter)capfirst)
BlockStats)DataContext)DeveloperAPI)CounterGauge	HistogramMetric)NodeAffinitySchedulingStrategyZdatasets_stats_actorZ_dataset_stats_actorunknownsecondsreturnc                 C   sR   | dkrt t| dd S | dkrt t| d dd S t t| d d dd S )N      sgMbP?  msus)strround)r$    r.   d/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/_internal/stats.pyfmt/   s
   r0      lvlspaces_per_indentc                 C   s   d| |  S )zReturns a string of spaces which contains `level` indents,
    each indent containing `spaces_per_indent` spaces. For example:
    >>> leveled_indent(2, 3)
    '      '
     r.   )r2   r3   r.   r.   r/   leveled_indent8      r5   c                   @   sp   e Zd ZdZdd ZedddZdeddfd	d
ZdefddZ	defddZ
defddZdefddZdS )Timerz8Helper class for tracking accumulated time (in seconds).c                 C   s    d| _ td| _d| _d| _d S )Nr   inf)_totalfloat_min_max_total_countselfr.   r.   r/   __init__D   s   

zTimer.__init__r%   Nc              
   c   s>    t  }zd V  W | t  |  d S | t  |  w N)timeperf_counteradd)r?   Z
time_startr.   r.   r/   timerJ   s
   *zTimer.timervaluec                 C   s@   |  j |7  _ || jk r|| _|| jkr|| _|  jd7  _d S Nr&   )r9   r;   r<   r=   )r?   rF   r.   r.   r/   rD   R   s   

z	Timer.addc                 C      | j S rA   )r9   r>   r.   r.   r/   getZ      z	Timer.getc                 C   rH   rA   )r;   r>   r.   r.   r/   min]   rJ   z	Timer.minc                 C   rH   rA   )r<   r>   r.   r.   r/   max`   rJ   z	Timer.maxc                 C   s   | j r	| j| j  S tdS )Nr8   )r=   r9   r:   r>   r.   r.   r/   avgc   s   z	Timer.avgr%   N)__name__
__module____qualname____doc__r@   r   rE   r:   rD   rI   rK   rL   rM   r.   r.   r.   r/   r7   A   s    r7   c                   @   sN   e Zd ZdZdedddee fddZded	dfd
dZ	de
d	dfddZdS )_DatasetStatsBuilderzHelper class for building dataset stats.

    When this class is created, we record the start time. When build() is
    called with the final blocks of the new dataset, the time delta is
    saved as part of the stats.operator_nameparentDatasetStatsoverride_start_timec                 C   s   || _ || _|pt | _d S rA   )rT   rU   rB   rC   
start_time)r?   rT   rU   rW   r.   r.   r/   r@   n   s   z_DatasetStatsBuilder.__init__metadatar%   c                 C   s   i }t | D ].\}\}}t|}t|dkr1|dkr$||| j| < q||| jdd | < q||| j< qt|| j| jd}t	 | j
 |_|S )Nr&   r   z->)rY   rU   	base_name)	enumerateitemsr   lenrT   splitrV   rU   rB   rC   rX   time_total_s)r?   rY   Zop_metadataikvZcapped_kstatsr.   r.   r/   build_multioperatorx   s   z(_DatasetStatsBuilder.build_multioperatorfinal_blocksc                 C   s,   t | j| i| jd}t | j |_|S )N)rY   rU   )rV   rT   get_metadatarU   rB   rC   rX   r`   )r?   rf   rd   r.   r.   r/   build   s   z_DatasetStatsBuilder.buildN)rO   rP   rQ   rR   r,   r
   r:   r@   	StatsDictre   r   rh   r.   r.   r.   r/   rS   g   s    

rS   )Znum_cpusc                   @   s|  e Zd ZdZd3ddZdedeedf deee	f fd	d
Z
deeef fddZdd Zdd Z	d4dedeeeeeef f  dee deeef deeeeeeeef f f  f
ddZd5ddZ		d6ddZdededee d ed!ef
d"d#Zdedeeef fd$d%Zd4dee fd&d'Zd(ed)efd*d+Zd(ed,eeef fd-d.Z		d7ded/ee d0ee fd1d2ZdS )8_StatsActora  Actor holding stats for blocks created by LazyBlockList.

    This actor is shared across all datasets created in the same cluster.
    In order to cap memory usage, we set a max number of stats to keep
    in the actor. When this limit is exceeded, the stats will be garbage
    collected in FIFO order.

    TODO(ekl) we should consider refactoring LazyBlockList so stats can be
    extracted without using an out-of-band actor.r)   c                 C   s  t t| _i | _i | _|| _d| _i | _i | _	t
 | _i | _t  | _d}tdd|d| _tdd|d| _tdd	|d| _td
d|d| _tdd|d| _tdd|d| _tdd|d| _| jtj|d| _| jtj|d| _| jtj|d| _| jtj|d| _ | jtj!|d| _"| jtj#|d| _$| % | _&d}tdd|d| _'tdd|d| _(tdd|d| _)tdd|d| _*tdd|d| _+tdd|d| _,tdd |d| _-td!d"|d| _.td#d$|d| _/td%d&|d| _0td'd(|d| _1td)d*|d| _2td+d,|d| _3td-d.|d| _4td/d0|d| _5td1d2|d| _6td3d4|d| _7d5}td6d7|d| _8td8d9|d| _9td:d;d<:d=d> t;D  d?|d| _<d}td@dA|d| _=tdBdC|d| _>tdDdE|d| _?tdFdGd<:dHd> t;D  d?|d| _@d S )INr   datasetoperatorZdata_spilled_byteszBytes spilled by dataset operators.
                DataContext.enable_get_object_locations_for_metrics
                must be set to True to report this metricdescriptiontag_keysZdata_freed_bytesz Bytes freed by dataset operatorsZdata_current_bytesz9Bytes currently in memory store used by dataset operatorsZdata_cpu_usage_coresz#CPUs allocated to dataset operatorsZdata_gpu_usage_coresz#GPUs allocated to dataset operatorsZdata_output_bytesz$Bytes outputted by dataset operatorsZdata_output_rowsz#Rows outputted by dataset operators)metrics_grouprp   )rl   Z%data_iter_time_to_first_batch_secondszTotal time spent waiting for the first batch after starting iteration. This includes the dataset pipeline warmup time. This metric is accumulated across different epochs.Z data_iter_block_fetching_secondsz>Seconds taken to fetch (with ray.get) blocks by iter_batches()Zdata_iter_batch_shaping_secondszCSeconds taken to shape batch from incoming blocks by iter_batches()Z"data_iter_batch_formatting_secondsz1Seconds taken to format batches by iter_batches()Z!data_iter_batch_collating_secondsz2Seconds taken to collate batches by iter_batches()Z"data_iter_batch_finalizing_secondsZdata_iter_total_blocked_secondsz0Seconds user thread is blocked by iter_batches()Zdata_iter_user_secondszSeconds spent in user codeZdata_iter_initialize_secondsz-Seconds spent in iterator initialization codeZdata_iter_get_secondsz;Seconds spent in ray.get() while resolving block referencesZdata_iter_next_batch_secondsz:Seconds spent getting the next batch from the block bufferZdata_iter_format_batch_secondsz"Seconds spent formatting the batchZdata_iter_collate_batch_secondsz!Seconds spent collating the batchZ data_iter_finalize_batch_secondsz"Seconds spent finalizing the batchZdata_iter_blocks_localz*Number of blocks already on the local nodeZdata_iter_blocks_remotez8Number of blocks that require fetching from another nodeZdata_iter_unknown_locationz,Number of blocks that have unknown locationsrl   job_idrX   #data_dataset_estimated_total_blocksz&Total work units in blocks for dataset!data_dataset_estimated_total_rowsz$Total work units in rows for datasetdata_dataset_statezState of dataset (z, c                 S      g | ]}|j  d |j qS =rF   name.0r(   r.   r.   r/   
<listcomp>      z(_StatsActor.__init__.<locals>.<listcomp>)$data_operator_estimated_total_blocksz'Total work units in blocks for operator"data_operator_estimated_total_rowsz%Total work units in rows for operatordata_operator_queued_blocksz$Number of queued blocks for operatordata_operator_statezState of operator (c                 S   rw   rx   rz   r|   r.   r.   r/   r~     r   )Acollectionsr   dictrY   Z	last_timerX   	max_statsnext_dataset_iddatasets_ray_nodes_cacher   _metadata_exporterdataset_metadatasdequefinished_datasets_queuer   spilled_bytesfreed_bytescurrent_bytescpu_usage_coresgpu_usage_coresoutput_bytesoutput_rows0_create_prometheus_metrics_for_execution_metricsr   ZINPUTSexecution_metrics_inputsZOUTPUTSexecution_metrics_outputsZTASKSexecution_metrics_tasksZOBJECT_STORE_MEMORY"execution_metrics_obj_store_memoryZACTORSexecution_metrics_actorsZMISCexecution_metrics_misc/_create_prometheus_metrics_for_per_node_metricsper_node_metricstime_to_first_batch_siter_block_fetching_siter_batch_shaping_siter_batch_formatting_siter_batch_collating_siter_batch_finalizing_siter_total_blocked_siter_user_siter_initialize_s
iter_get_siter_next_batch_siter_format_batch_siter_collate_batch_siter_finalize_batch_siter_blocks_localiter_blocks_remoteiter_unknown_locationrt   ru   joinr   rv   r   r   r   r   )r?   r   Zop_tags_keysZiter_tag_keysdataset_tagsoperator_tagsr.   r.   r/   r@      s  
						
z_StatsActor.__init__rq   rp   .r%   c                 C   s   i }t  D ]H}|j|ksqd|j }|j}|jtjkr(t|||d||j< q|jtjkr>t|f||d|j	||j< q|jtj
krNt
|||d||j< q|S )Ndata_rn   )r   Zget_metricsrq   r{   ro   Zmetrics_typer   r   r    Zmetrics_argsr   )r?   rq   rp   metricsmetricmetric_nameZmetric_descriptionr.   r.   r/   r     s:   
z<_StatsActor._create_prometheus_metrics_for_execution_metricsc                 C   s8   i }t tD ]}d|j d}t|ddd||j< q|S )Nr   Z	_per_node )rl   node_iprn   )r   r   r{   r   )r?   r   fieldr   r.   r.   r/   r     s   z;_StatsActor._create_prometheus_metrics_for_per_node_metricsc                 C   s   t | j}|  jd7  _|S rG   )r,   r   )r?   
dataset_idr.   r.   r/   get_dataset_id  s   
z_StatsActor.get_dataset_idc                 C   s,   |D ]}| j |  q|D ]}| j|  qd S rA   )update_execution_metricsupdate_iteration_metrics)r?   execution_metricsiteration_metricsr   r.   r.   r/   update_metrics  s
   z_StatsActor.update_metricsNdataset_tag
op_metricsr   stater   c              	   C   s`  	 ddt dttttt f dtttf fdd}t||D ]\}}| ||}	| j	
|dd|	 | j
|dd|	 | j
|d	d|	 | j
|d
d|	 | j
|dd|	 | j
|dd|	 | j
|dd|	 | j D ]\}
}||||
d|	 qy| j D ]\}
}||||
d|	 q| j D ]\}
}||||
d|	 q| j D ]\}
}||||
d|	 q| j D ]\}
}||||
d|	 q| j D ]\}
}||||
d|	 qq|d ur(| D ]1\}}|| jvr|   | j|t}| j||d}	| D ]\}}| j| }||||	 qq| || d S )Nprom_metricrF   tagsc                 S   s   t | tr| || d S t | tr| || d S t | trct |trett|D ]<}|t|d k r9| j	| n| j	d d }|dkrK| j	|d  nd}|| d }t|| D ]}| 
|| qYq*d S d S d S )Nr&   rZ   d   r   r'   )
isinstancer   setr   incr    listranger^   Z
boundariesZobserve)r   rF   r   ra   Zboundary_upper_boundZboundary_lower_boundZbucket_value_r.   r.   r/   _record  s(   




z5_StatsActor.update_execution_metrics.<locals>._recordZobj_store_mem_spilledr   Zobj_store_mem_freedZobj_store_mem_usedZbytes_task_outputs_generatedZrow_outputs_takenZ	cpu_usageZ	gpu_usage)r   node_ip_tagrA   )r!   r   intr:   r   r   r,   zip_create_tagsr   r   rI   r   r   r   r   r   r   r   r]   r   r   r   r   r   r   _rebuild_ray_nodes_cacher   r   update_dataset)r?   r   r   r   r   r   r   rd   operator_tagr   
field_namer   node_idnode_metricsr   r   Zmetric_valuer.   r.   r/   r     sX   



z$_StatsActor.update_execution_metricsc                 C   sH   t  }|D ]}|dd }|dd }|d ur!|d ur!|| j|< qd S )NZNodeIDZNodeName)raynodesrI   r   )r?   Zcurrent_nodesnoder   Z	node_namer.   r.   r/   r   (  s   
z$_StatsActor._rebuild_ray_nodes_cacherd   rV   c                 C   sV  |  |}| j|j | | j|j | | j|j | | j|j | | j|j | | j|j | | j	|j	| | j
|j
| | j|j| | j|j | | j|j | | j|j | | j|j | | j|j | | j|j | | j|j | | j|j | d S rA   )r   r   r   rI   r   r   r   r   r   r   r   r   r   r   r   r   r   r   iter_time_to_first_batch_sr   r   )r?   rd   r   r   r.   r.   r/   r   0  s$   
z$_StatsActor.update_iteration_metricsrs   topologydata_contextc              
   C   sx   t   }|tjjddd|d dd |D d| j|< | jd ur:t|||||d d tjjd| j|< | j| j|  d S d S )Nr   c                 S   s    i | ]}|t jjd d d dqS )r   )r   progresstotalqueued_blocks)r   PENDINGr{   )r}   rm   r.   r.   r/   
<dictcomp>]  s    z0_StatsActor.register_dataset.<locals>.<dictcomp>)rs   r   r   r   
total_rowsrX   end_time	operators)rs   r   r   rX   r   execution_start_timeexecution_end_timer   )	rB   r   r   r{   r   r   r   r   export_dataset_metadata)r?   rs   r   r   r   r   rX   r.   r.   r/   register_datasetL  s6   

z_StatsActor.register_datasetc                 C   s  | j | | | j | }| j | dd}tt| j | dd}|||d}| j|dd| | j|dd| |dtj	j
}t|}| j|j| | || i }|d	i  D ]C\}	}
||	d
}| j|
dd| | j|
dd| | j|
dd| |
dtj	j
}t|}| j|j| |||	< qd| || |d tjj
tjj
hv r| j| t| j | jkr| jr| j }| j |d  | j|d  t| j | jkr| jsd S d S d S d S d S )Nrs   NonerX   r   rr   r   r   r   r   rk   r   )r   updaterI   r,   r   rt   r   ru   r   UNKNOWNr{   Zfrom_stringrv   rF   update_dataset_metadata_stater]   r   r   r   r   'update_dataset_metadata_operator_statesFINISHEDFAILEDr   appendr^   r   popleftpopr   )r?   r   r   rs   rX   r   Zstate_stringZ
state_enumoperator_statesrm   Zop_stater   Ztag_to_evictr.   r.   r/   r   v  s\   




z_StatsActor.update_datasetc                    s"    s| j S  fdd| j  D S )Nc                    s"   i | ]\}}|d   kr||qS rs   r.   r}   rb   rc   r   r.   r/   r     s   " z,_StatsActor.get_datasets.<locals>.<dictcomp>)r   r]   )r?   rs   r.   r   r/   get_datasets  s   z_StatsActor.get_datasetsr   	new_statec                 C   s   || j vrd S t }| j | }|j|krd S t|}||_|tjjkr)||_n!|tj	jtj
jfv rJ||_|jjD ]}|jtjjkrI||_||_q:|| j |< | j| d S rA   )r   rB   r   copydeepcopyr   RUNNINGr{   r   r   r   r   r   r   r   r   )r?   r   r   update_timedataset_metadataupdated_dataset_metadatarm   r.   r.   r/   r     s&   




z)_StatsActor.update_dataset_metadata_stater   c           	      C   s   || j vrd S | j | }d}|jjD ]}|j|v r%|j||j kr%d} nq|s*d S t|}t }|jjD ]2}|j|v ri||j }|j|krIq7||_|tj	j
krV||_q7|tjj
tjj
fv ri||_|jsi||_q7|| j |< | j| d S )NFT)r   r   r   idr   r   r   rB   r   r   r{   r   r   r   r   r   r   )	r?   r   r   r   Zupdate_neededrm   r   r   r   r.   r.   r/   r     s@   







z3_StatsActor.update_dataset_metadata_operator_statesr   r   c                 C   s,   d|i}|d ur||d< |d ur||d< |S )Nrl   rm   r   r.   )r?   r   r   r   r   r.   r.   r/   r     s   z_StatsActor._create_tags)r)   rA   rN   )rd   rV   )NN) rO   rP   rQ   rR   r@   r   r   r,   r   r!   r   r   r   r   r   r   r   r   r:   r   r
   r   r   r   r   r   r   r   r   r   r   r   r.   r.   r.   r/   rj      st    

 |





Y

*9

-rj   _stats_actor_lockc                  C   sp   t  } | j}tjjj stt 	 dd}t
 tjttdd|d W  d    S 1 s1w   Y  d S )NF)ZsoftTZdetached)r{   	namespaceZget_if_existsZlifetimescheduling_strategy)r   get_currentr  r   utilclientZis_connectedr"   get_runtime_contextZget_node_idr   rj   optionsSTATS_ACTOR_NAMESTATS_ACTOR_NAMESPACEremote)ctxr  r.   r.   r/   _get_or_create_stats_actor  s"   
$r  c                   @   s   e Zd ZdZdZdZdd Z	d$dedee	 fdd	Z
d
d Zdee deeeeeeeef f f  fddZ	d$dedee dee deeef def
ddZdefddZdddefddZdefddZdedee dedefdd Zdefd!d"Zd#S )%_StatsManageraf  A Class containing util functions that manage remote calls to _StatsActor.

    This class collects stats from execution and iteration codepaths and keeps
    track of the latest snapshot.

    An instance of this class runs a single background thread that periodically
    forwards the latest execution/iteration stats to the _StatsActor.

    This thread will terminate itself after being inactive (meaning that there are
    no active executors or iterators) for STATS_ACTOR_UPDATE_THREAD_INACTIVITY_LIMIT
    iterations. After terminating, a new thread will start if more calls are made
    to this class.
       c                 C   s6   d | _ d | _i | _i | _t | _d | _t | _d S rA   )	_stats_actor_handle_stats_actor_cluster_id_last_execution_stats_last_iteration_stats	threadingLock_stats_lock_update_thread_update_thread_lockr>   r.   r.   r/   r@   1  s   
z_StatsManager.__init__F
skip_cacher%   c                 C   s   t jjjd u rtdt jjjj}| jd u s| j|ks|rCzt jt	t
d| _|| _W | jS  tyB   t | _t jjjj| _Y | jS w | jS )NzEGlobal node is not initialized. Driver might be not connected to Ray.)r{   r  )r   Z_privateZworkerZ_global_nodeRuntimeErrorZ
cluster_idr  r  Z	get_actorr  r	  
ValueErrorr  )r?   r  Zcurrent_cluster_idr.   r.   r/   r  C  s,   


z(_StatsManager._get_or_create_stats_actorc                    sv    j .  jd u s j s) fdd}tj|dd _ j  W d    d S W d    d S 1 s4w   Y  d S )Nc            
         s  d} 	  j s	 jrmzR  }|d u rW qg } j(  j D ]\}}}}}dd |D }|||||f}	||	 qW d    n1 sDw   Y  |jjt|t j  d d} W n! t	yl   t
jddd Y d S w | d7 } | tjkr}t
d	 d S ttj q)
Nr   Tc                 S      g | ]}|j d dqS T)Zreset_histogram_metricsas_dictr}   r   r.   r.   r/   r~   {  s    
zX_StatsManager._start_thread_if_not_running.<locals>._run_update_loop.<locals>.<listcomp>)r   r   z1Error occurred during remote call to _StatsActor.)exc_infor&   z2Terminating StatsManager thread due to inactivity.)r  r  r  r  valuesr   r   r
  r   	Exceptionloggerdebugr  UPDATE_THREAD_INACTIVITY_LIMITrB   sleepStatsManager#STATS_ACTOR_UPDATE_INTERVAL_SECONDS)
Ziter_stats_inactivitystats_actorZformatted_execution_statsr   r   r   r   r   op_metrics_dictsargsr>   r.   r/   _run_update_loope  sh   zD_StatsManager._start_thread_if_not_running.<locals>._run_update_loopT)targetdaemon)r  r  is_aliver  Threadstart)r?   r,  r.   r>   r/   _start_thread_if_not_running`  s   <"z*_StatsManager._start_thread_if_not_runningr   c              	   C   sl   t  jsdS tdd }|D ]$}|j D ]\}}|| }ttD ]}||j  t	||j7  < q"qq|S )aR  
        Aggregate per-node metrics from a list of OpRuntimeMetrics objects.

        If per-node metrics are disabled in the current DataContext, returns None.
        Otherwise, it sums up all NodeMetrics fields across the provided metrics and
        returns a nested dictionary mapping each node ID to a dict of field values.
        Nc                   S   s   t tS rA   )r   r   r.   r.   r.   r/   <lambda>  s    z;_StatsManager._aggregate_per_node_metrics.<locals>.<lambda>)
r   r  Zenable_per_node_metricsr   Z_per_node_metricsr]   r   r   r{   getattr)r?   r   Zaggregated_by_noder   r   r   Zagg_node_metricsfr.   r.   r/   _aggregate_per_node_metrics  s   

z)_StatsManager._aggregate_per_node_metricsr   r   r   force_updatec           	      C   s   |  |}|rdd |D }|||||f}|  jj|  d S |||||f}| j || j|< W d    n1 s9w   Y  |   d S )Nc                 S   r  r  r  r  r.   r.   r/   r~     s    z:_StatsManager.update_execution_metrics.<locals>.<listcomp>)r6  r  r   r
  r  r  r2  )	r?   r   r   r   r   r7  r   r*  r+  r.   r.   r/   r     s"   
z&_StatsManager.update_execution_metricsc                 C   N   | j  || jv r| j|= W d    d S W d    d S 1 s w   Y  d S rA   )r  r  r?   r   r.   r.   r/   clear_last_execution_stats  s   

"z(_StatsManager.clear_last_execution_statsrd   rV   c                 C   s@   | j  ||f| j|< W d    n1 sw   Y  |   d S rA   )r  r  r2  )r?   rd   r   r.   r.   r/   r     s   z&_StatsManager.update_iteration_metricsc                 C   r8  rA   )r  r  r9  r.   r.   r/   clear_iteration_metrics  s   

"z%_StatsManager.clear_iteration_metricsr   r   c                 C   s,   | j dd}|jt  |||| dS )a1  Register a dataset with the stats actor.

        Args:
            dataset_tag: Tag for the dataset
            operator_tags: List of operator tags
            topology: Optional Topology representing the DAG structure to export
            data_context: The DataContext attached to the dataset
        Tr  N)r  r   r
  r   r  Z
get_job_id)r?   r   r   r   r   r)  r.   r.   r/   register_dataset_to_stats_actor  s   
z-_StatsManager.register_dataset_to_stats_actorc                 C   s:   z| j dd}t|j W S  ty   t j Y S w )NTr<  )r  r   rI   r   r
  r"  r   hex)r?   r)  r.   r.   r/   get_dataset_id_from_stats_actor  s   z-_StatsManager.get_dataset_id_from_stats_actorN)F)rO   rP   rQ   rR   r(  r%  r@   boolr
   r   r  r2  r   r   r	   r,   r   r   r:   r6  r   r   r   r:  r   r;  r   r   r=  r?  r.   r.   r.   r/   r    sT    
H


	
r  c                   @   s~   e Zd ZdZdddedeed  ed  f defddZ	e
d	d
 Z	ddedee defddZdddZdefddZdS )rV   zHolds the execution times for a given Dataset.

    This object contains a reference to the parent Dataset's stats as well,
    but not the Dataset object itself, to allow its blocks to be dropped from
    memory.N)r[   rY   rU   r[   c                C   s   || _ |durt|ts|g}|pg | _| jsdntdd | jD d | _|| _d| _d| _t	 | _
t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _t	 | _i | _d| _d| _d| _d| _d| _d| _t	 | _dS )a  Create dataset stats.

        Args:
            metadata: Dict of operators used to create this Dataset from the
                previous one. Typically one entry, e.g., {"map": [...]}.
            parent: Reference to parent Dataset's stats, or a list of parents
                if there are multiple.
            base_name: The name of the base operation for a multi-operator operation.
        Nr   c                 s       | ]}|j V  qd S rA   )numberr}   pr.   r.   r/   	<genexpr>@      z(DatasetStats.__init__.<locals>.<genexpr>r&   Zunknown_uuid)rY   r   r   parentsrL   rB  r[   dataset_uuidr`   r7   streaming_exec_schedule_siter_wait_sr   r   r   r   r   r   r   r   r   iter_total_sextra_metricsr   r   r   global_bytes_spilledglobal_bytes_restoreddataset_bytes_spilledstreaming_split_coordinator_s)r?   rY   rU   r[   r.   r.   r/   r@   *  s:   
 zDatasetStats.__init__c                 C   s   t  S rA   )r  r>   r.   r.   r/   r)  k  s   zDatasetStats.stats_actorr{   rW   r%   c                 C   s   t || |S )z>Start recording stats for an op of the given name (e.g., map).)rS   )r?   r{   rW   r.   r.   r/   child_buildero  s   zDatasetStats.child_builderDatasetStatsSummaryc                    s  g }t | jdk t| j| j| j| j| j| j| j	| j
| j| j| j| j| j| j| j}g }| jdur9dd | jD }d}t|D ]1\}}|jrp|jd }t|jtrX|jddnd}td|d  d	|j d
| d ||7 }q? fdd| j D }	t|	D ]-\}}
 r|dkr||
_n|	|d  }|jrd|jv r|jd nd|
_n||
_||
 q| jr| j nd}t |||| j!| j"| j#| j$| j%| j&| j'| j(|S )zGenerate a `DatasetStatsSummary` object from the given `DatasetStats`
        object, which can be used to generate a summary string.r&   Nc                 S      g | ]}|  qS r.   )
to_summaryrC  r.   r.   r/   r~         z+DatasetStats.to_summary.<locals>.<listcomp>r   rZ   sumzParent z (operator: z) contributes z rows to inputc                    s    g | ]\}}t j|| d qS )is_sub_operator)OperatorStatsSummaryfrom_block_metadata)r}   r{   rd   rW  r.   r/   r~     s    ))r^   rY   IterStatsSummaryrJ  r   r   r   r   r   r   r   r   r   rK  rP  r   r   r   rG  r\   operators_statsr   output_num_rowsr   rI   r#  r$  rT   r]   total_input_num_rowsr   rI  rR  rB  rH  r`   r[   rL  rM  rN  rO  )r?   r\  
iter_statsZstats_summary_parentsZparent_total_outputra   Zparent_summaryZlast_parent_opZ	op_outputZop_statsZop_statZprev_oprI  r.   rW  r/   rT  u  s   





	zDatasetStats.to_summaryc                 C   s   |    S )ax  Generate a string representing the runtime metrics of a Dataset. This is
        a high level summary of the time spent in Ray Data code broken down by operator.
        It also includes the time spent in the scheduler. Times are shown as the total
        time for each operator and percentages of time are shown as a fraction of the
        total time for the whole dataset.)rT  runtime_metricsr>   r.   r.   r/   r`    r6   zDatasetStats.runtime_metricsrA   )r%   rR  )rO   rP   rQ   rR   ri   r   r
   r   r,   r@   propertyr)  r:   rS   rQ  rT  r`  r.   r.   r.   r/   rV   #  s,    
A


]rV   c                   @   s2  e Zd ZU ed ed< ded< ed  ed< eed< eed< eed< eed	< eee	f ed
< eed< eed< eed< eed< 			d)de
ee  dedefddZedd ded  fddZedd deeef fddZdefddZd*defdd Zdefd!d"Zdefd#d$Zdefd%d&Zdefd'd(ZdS )+rR  rY  r\  r[  r_  rG  rB  rH  r`   r[   rL  rM  rN  rO  rI  NTalready_printedinclude_parentr%   c                 C   s  |du rt  }d}| jr%|r%| jD ]}|j|dd}|r$||7 }|d7 }qd}t| jdkrY| jd }|j}| j| }	|d| j|7 }|	|v rM|d	7 }n\|	|	 |t
|7 }nPt| jdkrt| jd
}
|
dkrld}
|d| j| j|
7 }t| jD ],\}}|j}| j| }	|d7 }|d||7 }|	|v r|d7 }q||	|	 |t
|7 }q|t j}|r| jr|r|jrdnd}||7 }|dt
| j d 7 }|t
| j7 }t| jdkrS|rSt| jd }t| jd }|s|r|d7 }|d|7 }|d|7 }t| jd }|r|d7 }|d|7 }| jd j}|r$|d nd}|  }|  }|rS|rS|rS|d7 }|d7 }|d||  d7 }|d||  d7 }|ra|ra|d|   7 }|S )a  Return a human-readable summary of this Dataset's stats.

        Args:
            already_printed: Set of operator IDs that have already had its stats printed
               out.
            include_parent: If true, also include parent stats summary; otherwise, only
               log stats of the latest operator.
            add_global_stats: If true, includes global stats to this summary.
        Returns:
            String with summary statistics for executing the Dataset.
        Nr   F)add_global_stats
r&   r   zOperator {} {}: z[execution cached]
r'   z Operator {} {}: executed in {}s
z	Suboperator {} {}: z	[execution cached]
	z* Extra metrics:     .Az
Cluster memory:
z* Spilled to disk: {}MB
z* Restored from disk: {}MB
z
Dataset memory:
rZ   rV  zDataset throughput:
	* Ray Data throughput:  rows/s
%	* Estimated single node throughput: )r   rG  	to_stringr^   r\  rT   rH  formatrB  rD   r,   r-   r`   r[   r\   r   r  verbose_stats_logsrL  rX  r_  rM  rN  rO  r]  get_total_wall_timeget_total_time_all_blocksr`  )r?   rb  rc  rd  outrD  
parent_sumZoperators_stats_summaryrT   Zoperator_uuidrounded_totalnrm  indentZ
mb_spilledZmb_restoredZdataset_mb_spilledr]  total_num_out_rows	wall_timeZtotal_time_all_blocksr.   r.   r/   rk    s   












zDatasetStatsSummary.to_stringcurrc                 C   s4   g }| j D ]}|r|j r|t| q|| g S rA   )rG  extendrR   _collect_dataset_stats_summaries)rw  ZsummsrD  r.   r.   r/   ry  _  s   


z4DatasetStatsSummary._collect_dataset_stats_summariessummc                 C   s0   t dd | jD }tdd | jD }||fS )Nc                 s   rA  rA   )earliest_start_timer}   opsr.   r.   r/   rE  l  rF  z:DatasetStatsSummary._find_start_and_end.<locals>.<genexpr>c                 s   rA  rA   )latest_end_timer|  r.   r.   r/   rE  m  rF  )rK   r\  rL   )rz  earliest_start
latest_endr.   r.   r/   _find_start_and_endj  s   z'DatasetStatsSummary._find_start_and_endc                    s   |    dtdtdtf fdd}t| }d}|D ]}t|jdkr6t|\}}|| }|||j|7 }q||d| j	7 }||d	 7 }|S )
Nr{   rB   r%   c                    s6    dkr|  nd}d|  dt | d|d ddS )Nr   z* : z (r   z.3fz%)
r0   )r{   rB   fractionZtotal_wall_timer.   r/   fmt_lines  s   "z5DatasetStatsSummary.runtime_metrics.<locals>.fmt_linezRuntime Metrics:
r   Z
SchedulingZTotal)
rn  r,   r:   rR  ry  r^   r\  r  r[   rI  )r?   r  	summariesrp  rz  r  r  Zop_total_timer.   r  r/   r`  p  s   
z#DatasetStatsSummary.runtime_metricsr   c                    s  t  }d fdd| jD }d fdd| jD }d fdd| j D }|r8d| d| dnd	}|rEd| d| dnd	}|rRd| d| dnd	}d	g | d
| d| j d| d| j d| d| j d| d| d| d| d| d| j	
 d  d| d| jd  d| d| jd  d| d| jd  d| d| d| dS )Nre  c                       g | ]	}|  d  qS r'   __repr__r}   sslevelr.   r/   r~         z0DatasetStatsSummary.__repr__.<locals>.<listcomp>c                    r  r  r  )r}   Zpsr  r.   r/   r~     r  c                 3   s0    | ]\}}t  d   | d| dV  qdS )r'   r  ,N)r5   r   r  r.   r/   rE    s
    
z/DatasetStatsSummary.__repr__.<locals>.<genexpr>,
z   r   zDatasetStatsSummary(
z   dataset_uuid=z   base_name=z
   number=z   extra_metrics={z},
z   operators_stats=[z],
z   iter_stats=r&   z   global_bytes_spilled=rg  zMB,
z   global_bytes_restored=z   dataset_bytes_spilled=z   parents=[r   )r5   r   r\  rG  rL  r]   rH  r[   rB  r_  r  rM  rN  rO  )r?   r  rt  r\  Zparent_statsrL  r.   r  r/   r    sp   		


zDatasetStatsSummary.__repr__c                 C   sP   dd t | D }t|dkrdS tdd |D }tdd |D }|| S )zCalculate the total wall time for the dataset, this is done by finding
        the earliest start time and latest end time for any block in any operator.
        The wall time is the difference of these two times.
        c                 S   s$   g | ]}t |jd krt|qS r   )r^   r\  rR  r  r}   rz  r.   r.   r/   r~     s
    z;DatasetStatsSummary.get_total_wall_time.<locals>.<listcomp>r   c                 s       | ]}|d  V  qdS )r   Nr.   r}   Z	start_endr.   r.   r/   rE        z:DatasetStatsSummary.get_total_wall_time.<locals>.<genexpr>c                 s   r  )r&   Nr.   r  r.   r.   r/   rE    r  )rR  ry  r^   rK   rL   )r?   Z
start_endsr  r  r.   r.   r/   rn    s   z'DatasetStatsSummary.get_total_wall_timec                 C   s   t | }tdd |D S )zGCalculate the sum of the wall times across all blocks of all operators.c                 s   s$    | ]}t d d |jD V  qdS )c                 s   s(    | ]}|j r|j d dndV  qdS rV  r   N)rv  rI   r|  r.   r.   r/   rE    s
    
zJDatasetStatsSummary.get_total_time_all_blocks.<locals>.<genexpr>.<genexpr>N)rV  r\  r  r.   r.   r/   rE    s    
z@DatasetStatsSummary.get_total_time_all_blocks.<locals>.<genexpr>)rR  ry  rV  )r?   r  r.   r.   r/   ro    s   
z-DatasetStatsSummary.get_total_time_all_blocksc                 C   s,   t dd | jD }|t dd | jD  S )Nc                 s   s    | ]}|  V  qd S rA   )get_total_cpu_timerC  r.   r.   r/   rE    r  z9DatasetStatsSummary.get_total_cpu_time.<locals>.<genexpr>c                 s   s    | ]
}|j d dV  qdS r  )cpu_timerI   r  r.   r.   r/   rE    s    
)rV  rG  r\  )r?   rq  r.   r.   r/   r    s   

z&DatasetStatsSummary.get_total_cpu_timec                 C   sF   dd | j D }|rt|nd}| js|S t|gdd | jD R  S )Nc                 S   rS  r.   )get_max_heap_memoryrC  r.   r.   r/   r~     rU  z;DatasetStatsSummary.get_max_heap_memory.<locals>.<listcomp>r   c                 S   s   g | ]	}|j d dqS )rL   r   )memoryrI   r  r.   r.   r/   r~     r  )rG  rL   r\  )r?   Zparent_memoryZ
parent_maxr.   r.   r/   r    s   z'DatasetStatsSummary.get_max_heap_memory)NTTr  )rO   rP   rQ   r   __annotations__r   r,   r:   r   r   r
   r   r@  rk  staticmethodry  r   r  r`  r  rn  ro  r  r  r.   r.   r.   r/   rR    sJ   
 

t
 rR  c                	   @   sN  e Zd ZU eed< eed< eed< eed< eed< eed< dZee	eef  ed< dZ
ee	eef  ed	< dZee	eef  ed
< dZee	eef  ed< dZee ed< dZee	eef  ed< dZee	eef  ed< dZee	eef  ed< dZee	eef  ed< ededee dedd fddZdefddZddefddZdS )rY  rT   rX  r`   r{  r~  block_execution_summary_strNrv  r  udf_timer  r^  r]  output_size_bytes
node_count	task_rowsblock_statsr%   c                 C   s\  dd |D }d}d}d\}}|r't dd |D }tdd |D }|| }|r1dt|}	n|rGt|d	}|dkr>d}d
t||}	nd}	|	d7 }	tt}
|D ]}|jdurk|j	durk|
|j	j
  |j7  < qTd}t|
dkrt |
 t|
 ttt|
 t|
d}dt|
|	}	d\}}}}|r!t dd |D tdd |D tdd |D tdd |D d}t dd |D tdd |D tdd |D tdd |D d}dd |D }t |t|tt|d}t dd |D tdd |D tdd |D tdd |D d}d}d d |D }|r?t |t|tt|t|d}d}d!d |D }|r]t |t|tt|t|d}d}|rtt}|D ]}||j |j
 qid"d# | D }t | t| ttt| t|d}d}t||||||	|||||||||d$S )%a  Calculate the stats for a operator from a given list of blocks,
        and generates a `OperatorStatsSummary` object with the results.

        Args:
            block_stats: List of `BlockStats` to calculate stats of
            operator_name: Name of operator associated with `blocks`
            is_sub_operator: Whether this set of blocks belongs to a sub operator.
        Returns:
            A `OperatorStatsSummary` object initialized with the calculated statistics
        c                 S      g | ]
}|j d ur|j qS rA   )
exec_statsr}   mr.   r.   r/   r~         z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>r   )r   r   c                 s   rA  rA   )Zstart_time_sr|   r.   r.   r/   rE    rF  z;OperatorStatsSummary.from_block_metadata.<locals>.<genexpr>c                 s   rA  rA   )Z
end_time_sr|   r.   r.   r/   rE    rF  z{} blocks produced
r'   z{} blocks produced in {}sr   re  N)rK   rL   meancountz{} tasks executed, {})NNNNc                 S      g | ]}|j qS r.   Zwall_time_sr}   er.   r.   r/   r~   3      c                 S   r  r.   r  r  r.   r.   r/   r~   4  r  c                 S   r  r.   r  r  r.   r.   r/   r~   5  r  c                 S   r  r.   r  r  r.   r.   r/   r~   6  r  )rK   rL   r  rV  c                 S   r  r.   Z
cpu_time_sr  r.   r.   r/   r~   9  r  c                 S   r  r.   r  r  r.   r.   r/   r~   :  r  c                 S   r  r.   r  r  r.   r.   r/   r~   ;  r  c                 S   r  r.   r  r  r.   r.   r/   r~   <  r  c                 S   s    g | ]}t |jp	d d dqS )r   i   r'   )r-   Zmax_uss_bytesr  r.   r.   r/   r~   ?  s    )rK   rL   r  c                 S   r  r.   Z
udf_time_sr  r.   r.   r/   r~   I  r  c                 S   r  r.   r  r  r.   r.   r/   r~   J  r  c                 S   r  r.   r  r  r.   r.   r/   r~   K  r  c                 S   r  r.   r  r  r.   r.   r/   r~   L  r  c                 S   r  rA   )num_rowsr  r.   r.   r/   r~   P  r  c                 S   r  rA   )Z
size_bytesr  r.   r.   r/   r~   Z  s    c                 S      i | ]	\}}|t |qS r.   )r^   )r}   r   tasksr.   r.   r/   r   k  r  z<OperatorStatsSummary.from_block_metadata.<locals>.<dictcomp>)rT   rX  r`   r{  r~  r  rv  r  r  r  r^  r]  r  r  r  )rK   rL   rl  r^   r-   r   r   r   r  r  Ztask_idxr!  npr  r   rV  r   r   rD   r]   rY  )clsrT   r  rX  r  rr  r`   r{  r~  Zexec_summary_strr  metaZtask_rows_statswall_time_stats	cpu_statsmemory_stats	udf_statsZmemory_stats_mboutput_num_rows_statsr]  output_size_bytes_statsr  Znode_counts_statsZ
node_tasksr(   Znode_countsr^  r.   r.   r/   rZ    s   






z(OperatorStatsSummary.from_block_metadatac              	   C   s  | j rdnd}| j}| j}|r-||7 }|dt|d t|d t|d t|d 7 }| j}|rP||7 }|dt|d t|d t|d t|d 7 }| j}|rs||7 }|d	t|d t|d t|d t|d 7 }| j}|r||7 }|d
|d |d |d 7 }| j}|r||7 }|d|d |d |d |d 7 }| j	}|r||7 }|d|d |d |d |d 7 }| j
}	|	r||7 }|d|	d |	d |	d |	d 7 }| j}
|
r||7 }|d|
d |
d |
d |
d 7 }|rE| jrE|rE| jr| jnd}|d }||7 }|d7 }||d| d 7 }||d| d 7 }||d|| j  d 7 }||d||d   d 7 }|S )E  For a given (pre-calculated) `OperatorStatsSummary` object (e.g. generated from
        `OperatorStatsSummary.from_block_metadata()`), returns a human-friendly string
        that summarizes operator execution statistics.

        Returns:
            String with summary statistics for executing the given operator.
        rf  r   z6* Remote wall time: {} min, {} max, {} mean, {} total
rK   rL   r  rV  z5* Remote cpu time: {} min, {} max, {} mean, {} total
z.* UDF time: {} min, {} max, {} mean, {} total
z8* Peak heap memory usage (MiB): {} min, {} max, {} mean
z?* Output num rows per block: {} min, {} max, {} mean, {} total
zA* Output size bytes per block: {} min, {} max, {} mean, {} total
z?* Output rows per task: {} min, {} max, {} mean, {} tasks used
r  z9* Tasks per node: {} min, {} max, {} mean; {} nodes used
r   z* Operator throughput:
z	* Total input num rows: z rows
z	* Total output num rows: rh  ri  rj  )rX  r  rv  rl  r0   r  r  r  r]  r  r  r  r`   r^  )r?   rt  rp  r  r  r  r  r  r  r  Znode_count_statsZtotal_num_in_rowsru  r.   r.   r/   __str__  s   











			
zOperatorStatsSummary.__str__r   c           
      C   s  t |}|| jrt dnd7 }dd | jpi  D }dd | jp"i  D }dd | jp.i  D }dd | jp:i  D }dd | jpFi  D }d	d | jpRi  D }d	g | d
| d| j
 d| d| j d| dt| j d| d| j | d|pd d| d|pd d| d|pd d| d|pd d| d|pd d| d|pd d| d}	|	S )r  r&   r   c                 S   r  r.   r  r   r.   r.   r/   r     r  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>c                 S   r  r.   r  r   r.   r.   r/   r     r  c                 S   r  r.   r  r   r.   r.   r/   r     r  c                 S   r  r.   r  r   r.   r.   r/   r         c                 S   r  r.   r  r   r.   r.   r/   r     r  c                 S   r  r.   r  r   r.   r.   r/   r     r  zOperatorStatsSummary(
z   operator_name='z',
z   is_suboperator=r  z   time_total_s=z   block_execution_summary_str=z   wall_time=Nz   cpu_time=z
   memory=z   output_num_rows=z   output_size_bytes=z   node_count=r   )r5   rX  rv  r]   r  r  r]  r  r  r   rT   r0   r`   r  )
r?   r  rt  r  r  r  r  r  Znode_conut_statsrp  r.   r.   r/   r    sr   		


zOperatorStatsSummary.__repr__r  )rO   rP   rQ   r,   r  r@  r:   rv  r
   r   r  r  r  r^  r   r]  r  r  r  classmethodr   r   rZ  r  r  r.   r.   r.   r/   rY    s<   
   rY  c                   @   s   e Zd ZU eed< eed< eed< eed< eed< eed< eed< eed< eed	< eed
< eed< eed< eed< eed< eed< defddZdefddZddefddZ	dS )r[  	wait_timeget_time	next_timeformat_timecollate_timefinalize_batch_timetime_to_first_batch
block_time	user_timeinitialize_time
total_timestreaming_split_coord_timer   r   r   r%   c                 C   s   |   S rA   )rk  r>   r.   r.   r/   r  R  s   zIterStatsSummary.__str__c              	   C   s  d}| j  s+| j s+| j s+| j s+| j s+| j s+| j s+| j ry|d7 }| j r@|d	t
| j 7 }| j rQ|d	t
| j 7 }| j  rb|d	t
| j  7 }| j rs|d	t
| j 7 }| j r|d	t
| j 7 }|d7 }| j r|d		t
| j t
| j t
| j t
| j 7 }| j rd
}||	t
| j t
| j t
| j t
| j 7 }| j rd}||	t
| j t
| j t
| j t
| j 7 }| j r|d	t
| j t
| j t
| j t
| j 7 }| j r?d}||	t
| j t
| j t
| j t
| j 7 }t jra|d7 }|d	| j7 }|d	| j7 }|d	| j7 }| j dkry|d7 }|t
| j  d7 }|S )Nr   z"
Dataset iterator time breakdown:
z* Total time overall: {}
z>    * Total time in Ray Data iterator initialization code: {}
zE    * Total time user thread is blocked by Ray Data iter_batches: {}
zP    * Total time spent waiting for the first batch after starting iteration: {}
z/    * Total execution time for user thread: {}
zC* Batch iteration time breakdown (summed across prefetch threads):
z5    * In ray.get(): {} min, {} max, {} avg, {} total
z:    * In batch creation: {} min, {} max, {} avg, {} total
z<    * In batch formatting: {} min, {} max, {} avg, {} total
z6    * In collate_fn: {} min, {} max, {} avg, {} total
zA    * In host->device transfer: {} min, {} max, {} avg, {} total
zBlock locations:
z    * Num blocks local: {}
z    * Num blocks remote: {}
z&    * Num blocks unknown location: {}
r   z+Streaming split coordinator overhead time: re  )r  rI   r  r  r  r  r  r  r  rl  r0   r  r  rK   rL   rM   r   r  Z'enable_get_object_locations_for_metricsr   r   r   r  )r?   rp  Zbatch_creation_str
format_strr.   r.   r/   rk  U  s   








zIterStatsSummary.to_stringr   c                 C   s:  t |}dg d| dt| j pd  d| dt| j p'd  d| d| jp4d  d| d| jpAd  d| d| jpNd  d| d	t| j	 p_d  d| d
t| j
 ppd  d| dt| j pd  d| dt| j pd  d| dS )Nr   zIterStatsSummary(
z   wait_time=r  z   get_time=z   iter_blocks_local=z   iter_blocks_remote=z   iter_unknown_location=z   next_time=z   format_time=z   user_time=z   total_time=r   )r5   r   r0   r  rI   r  r   r   r   r  r  r  r  )r?   r  rt  r.   r.   r/   r    sR   
		

zIterStatsSummary.__repr__Nr  )
rO   rP   rQ   r7   r  r   r,   r  rk  r  r.   r.   r.   r/   r[  2  s&   
 ^r[  )r   r1   )Rr   r   loggingr  rB   r   
contextlibr   dataclassesr   r   typingr   r   r   r	   r
   r   r   r   uuidr   numpyr  r   Z	ray.actorr   Zray.data._internal.block_listr   Z*ray.data._internal.execution.dataset_stater   Z:ray.data._internal.execution.interfaces.op_runtime_metricsr   r   r   r   r   Z$ray.data._internal.metadata_exporterr   r   r   Zray.data._internal.utilr   Zray.data.blockr   Zray.data.contextr   Zray.util.annotationsr   Zray.util.metricsr   r   r    r!   Zray.util.scheduling_strategiesr"   	getLoggerrO   r#  r  r	  r   r,   ri   r:   r0   r   r5   r7   rS   r
  rj   RLockr   r  r  r  r'  rV   rR  rY  r[  r.   r.   r.   r/   <module>   sp   
 (
		&
-    s   9 x  _