o
    1 idg                     @   s&  d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	m
  mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z- d dl.m/Z/m0Z0 e1e2Z3dZ4dZ5G dd dZ6dS )    N)ThreadPoolExecutor)islice)ListOptional)NodeID)get_or_create_event_loop)chrome_tracing_dump)env_integer)	do_filter)compose_state_message)
RuntimeEnv)RAY_MAX_LIMIT_FROM_API_SERVER
ActorStateActorSummariesJobStateListApiOptionsListApiResponse	NodeStateObjectStateObjectSummariesPlacementGroupStateRuntimeEnvStateStateSummarySummaryApiOptionsSummaryApiResponse	TaskStateTaskSummariesWorkerStateprotobuf_message_to_dictprotobuf_to_task_state_dict)DataSourceUnavailableStateDataSourceClientzFailed to query data from GCS. It is due to (1) GCS is unexpectedly failed. (2) GCS is overloaded. (3) There's an unexpected network issue. Please check the gcs_server.out log to find the root cause.a  Failed to query data from {type}. Queried {total} {type} and {network_failures} {type} failed to reply. It is due to (1) {type} is unexpectedly failed. (2) {type} is overloaded. (3) There's an unexpected network issue. Please check the {log_command} to find the root cause.c                   @   s  e Zd ZdZdedefddZedd Zde	d	e
fd
dZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZded	efddZded	efddZded	efddZd ee d	ee fd!d"Zd#S )$StateAPIManagerzZA class to query states from data source, caches, and post-processes
    the entries.
    state_data_source_clientthread_pool_executorc                 C   s   || _ || _d S N)_client_thread_pool_executor)selfr#   r$    r)   j/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/dashboard/state_aggregator.py__init__B   s   
zStateAPIManager.__init__c                 C   s   | j S r%   )r&   )r(   r)   r)   r*   data_source_clientJ   s   z"StateAPIManager.data_source_clientoptionreturnc                   b   z| j j j jdI dH }W n ty   ttw dtf fdd}t | j	||I dH S )zList all actor information from the cluster.

        Returns:
            {actor_id -> actor_data_in_dict}
            actor_data_in_dict's schema is in ActorState

        timeoutfiltersNr.   c                    s   g }| j D ]}t|g dd}|| qt|| j }t| jt j}t|}|j	dd d t
t| j}t|| j||dS )N)actor_idZowner_idjob_idnode_idplacement_group_idmessagefields_to_decodec                 S      | d S )Nr3   r)   entryr)   r)   r*   <lambda>w       z@StateAPIManager.list_actors.<locals>.transform.<locals>.<lambda>keyresulttotalnum_after_truncationnum_filtered)Zactor_table_datar   appendlenrE   r
   r2   r   detailsortlistr   limitr   rC   replyrB   r8   datarD   rE   r-   r)   r*   	transform]   s$   

z.StateAPIManager.list_actors.<locals>.transform)
r&   Zget_all_actor_infor1   r2   r    GCS_QUERY_FAILURE_WARNINGr   r   run_in_executorr'   r(   r-   rM   rP   r)   rO   r*   list_actorsN   s   #
zStateAPIManager.list_actorsc                   s^   z| j j jdI dH }W n ty   ttw dtf fdd}t | j||I dH S )zList all placement group information from the cluster.

        Returns:
            {pg_id -> pg_data_in_dict}
            pg_data_in_dict's schema is in PlacementGroupState
        r1   Nr.   c                    sz   g }| j D ]}t|g dd}|| qt|}t| jt j}t|}|jdd d t	t
t| j| j||dS )N)r6   Zcreator_job_idr5   r7   c                 S   r:   )Nr6   r)   r;   r)   r)   r*   r=      r>   zJStateAPIManager.list_placement_groups.<locals>.transform.<locals>.<lambda>r?   rA   )Zplacement_group_table_datar   rF   rG   r
   r2   r   rH   rI   r   rJ   r   rK   rC   rL   rO   r)   r*   rP      s&   
z8StateAPIManager.list_placement_groups.<locals>.transform)	r&   Zget_all_placement_group_infor1   r    rQ   r   r   rR   r'   rS   r)   rO   r*   list_placement_groups   s   
z%StateAPIManager.list_placement_groupsc                   r/   )zList all node information from the cluster.

        Returns:
            {node_id -> node_data_in_dict}
            node_data_in_dict's schema is in NodeState
        r0   Nr.   c                    s   g }| j D ]9}t|dgd}|d |d< t|d |d< t|d |d< |di }t|dd |d	d |d
< || qt|| j }t| j	t
 j}t|}|jdd d tt| j}t|| j||dS )Nr5   r7   node_manager_addressZnode_ipstart_time_msend_time_ms
death_inforeasonZreason_messageZstate_messagec                 S   r:   )Nr5   r)   r;   r)   r)   r*   r=      r>   z?StateAPIManager.list_nodes.<locals>.transform.<locals>.<lambda>r?   rA   )node_info_listr   intgetr   rF   rG   rE   r
   r2   r   rH   rI   rJ   r   rK   r   rC   )rM   rB   r8   rN   rZ   rD   rE   rO   r)   r*   rP      s2   


z-StateAPIManager.list_nodes.<locals>.transform)
r&   get_all_node_infor1   r2   r    rQ   r   r   rR   r'   rS   r)   rO   r*   
list_nodes   s   
zStateAPIManager.list_nodesc                   r/   )zList all worker information from the cluster.

        Returns:
            {worker_id -> worker_data_in_dict}
            worker_data_in_dict's schema is in WorkerState
        r0   Nr.   c                    s   g }| j D ]G}t|ddgd}|d d |d< |d d |d< |d d |d< t|d |d< t|d |d< t|d	 |d	< t|d
 |d
< || qt|| j }t| jt j	}t|}|j
dd d tt| j}t|| j||dS )N	worker_idr5   r7   Zworker_address
ip_addressiprX   rY   Zworker_launch_time_msZworker_launched_time_msc                 S   r:   )Nra   r)   r;   r)   r)   r*   r=     r>   zAStateAPIManager.list_workers.<locals>.transform.<locals>.<lambda>r?   rA   )Zworker_table_datar   r]   rF   rG   rE   r
   r2   r   rH   rI   rJ   r   rK   r   rC   rL   rO   r)   r*   rP      s0   
z/StateAPIManager.list_workers.<locals>.transform)
r&   Zget_all_worker_infor1   r2   r    rQ   r   r   rR   r'   rS   r)   rO   r*   list_workers   s   
zStateAPIManager.list_workersc                   s^   z| j j jdI d H }W n ty   ttw dtf fdd}t | j||I d H S )NrU   r.   c                    s`   dd | D }t |}t| jt j}t |}|jdd d tt| j}t	||||dS )Nc                 S   s   g | ]}|  qS r)   )dict).0Zjobr)   r)   r*   
<listcomp>  s    z@StateAPIManager.list_jobs.<locals>.transform.<locals>.<listcomp>c                 S   s   | d pdS )Nr4    r)   r;   r)   r)   r*   r=     s    z>StateAPIManager.list_jobs.<locals>.transform.<locals>.<lambda>r?   rA   )
rG   r
   r2   r   rH   rI   rJ   r   rK   r   )rM   rB   rC   rE   rO   r)   r*   rP     s   z,StateAPIManager.list_jobs.<locals>.transform)	r&   Zget_job_infor1   r    rQ   r   r   rR   r'   rS   r)   rO   r*   	list_jobs  s   
zStateAPIManager.list_jobsc                   s   z| j j j j jdI dH }W n ty   ttw dtf fdd}|jj	dkr9tg ddd|jj
gdS t | j||I dH S )zList all task information from the cluster.

        Returns:
            {task_id -> task_data_in_dict}
            task_data_in_dict's schema is in TaskState
        )r1   r2   exclude_driverNr.   c                    sp   dd | j D }t|}t|| j }t| jt j}t|}|jdd d tt	| j
}t||||dS )z
            Transforms from proto to dict, applies filters, sorts, and truncates.
            This function is executed in a separate thread.
            c                 S   s   g | ]}t |qS r)   )r   )rf   r8   r)   r)   r*   rg   ?  s    zAStateAPIManager.list_tasks.<locals>.transform.<locals>.<listcomp>c                 S   r:   )Ntask_idr)   r;   r)   r)   r*   r=   M  r>   z?StateAPIManager.list_tasks.<locals>.transform.<locals>.<lambda>r?   rA   )Zevents_by_taskrG   Znum_status_task_events_droppedr
   r2   r   rH   rI   rJ   r   rK   r   )rM   rB   rD   Z	num_totalrE   rO   r)   r*   rP   :  s   z-StateAPIManager.list_tasks.<locals>.transformr   )rB   rC   rD   rE   warnings)r&   Zget_all_task_infor1   r2   rj   r    rQ   r   statuscoder8   r   rR   r'   rS   r)   rO   r*   
list_tasks*  s,   
zStateAPIManager.list_tasksc                   sv   j j jddgdI dH } fdd|jD tjddiI dH }dtf fd	d
}t j	||I dH S )zList all object information from the cluster.

        Returns:
            {object_id -> object_data_in_dict}
            object_data_in_dict's schema is in ObjectState
        Nstate=ZALIVEr1   rK   r2   c                    $   g | ]}j j|j|j jd qS rU   )r&   Zget_object_inforW   Znode_manager_portr1   rf   	node_infor-   r(   r)   r*   rg   r      z0StateAPIManager.list_objects.<locals>.<listcomp>return_exceptionsTr.   c              	      s  d}g }d}| D ])}t |tr|d7 }qt |tr|||j7 }|jD ]}|t|dgdd q#qd }tdkrX|dkrXtj	dt|dd}|tkrSt|d	| }g }t
|}	|	jD ]3}
|
 }|d
 |d< |d
= |d |d< |d= |d  |d< |d dkrdn|d |d< || qbg }tdd}|s|d t|}t| jt j}t|}|jdd d tt| j}t||||||dS )Nr      	object_idF)r8   r9   Zpreserving_proto_field_nameZrayletz
raylet.outtyperC   Znetwork_failuresZlog_command1The returned data may contain incomplete result. Z
object_refZnode_ip_addressrc   r~   Ztask_status-ZNILZRAY_record_ref_creation_siteszCallsite is not being recorded. To record callsite information for each ObjectRef created, set env variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init`.c                 S   r:   )Nr|   r)   r;   r)   r)   r*   r=     r>   zAStateAPIManager.list_objects.<locals>.transform.<locals>.<lambda>r?   )rB   partial_failure_warningrC   rD   rE   rl   )
isinstancer    	ExceptionrC   Zcore_workers_statsrF   r   rG   NODE_QUERY_FAILURE_WARNINGformatmemory_utilsZconstruct_memory_tabletableas_dictupperr	   r
   r2   r   rH   rI   rJ   r   rK   r   )repliesunresponsive_nodesZworker_statsZtotal_objectsrM   Zcore_worker_statr   warning_msgrB   Zmemory_tabler<   rN   Zcallsite_warningZcallsite_enabledrD   rE   )r-   tasksr)   r*   rP     s|   






z/StateAPIManager.list_objects.<locals>.transform
r&   r_   r1   r\   asynciogatherr   r   rR   r'   )r(   r-   Zall_node_info_replyr   rP   r)   )r-   r(   r   r*   list_objectsf  s$   	R
zStateAPIManager.list_objectsc                   s   j jjddgdI dH }dd |jD  fdd D tjddiI dH }d	tf fd
d}t j	||I dH S )ao  List all runtime env information from the cluster.

        Returns:
            A list of runtime env information in the cluster.
            The schema of returned "dict" is equivalent to the
            `RuntimeEnvState` protobuf message.
            We don't have id -> data mapping like other API because runtime env
            doesn't have unique ids.
        Nrp   rs   c                 S   s   g | ]	}|j d ur|qS r%   )runtime_env_agent_portrv   r)   r)   r*   rg     s
    
z5StateAPIManager.list_runtime_envs.<locals>.<listcomp>c                    rt   ru   )r&   Zget_runtime_envs_inforW   r   r1   rv   rx   r)   r*   rg     ry   rz   Tr.   c                    s@  g }d}d}t  | D ]A\}}t|tr|d7 }qt|tr |||j7 }|j}|D ]!}t|g d}t|d 	 |d< t
|j |d< || q*qd }	tdkrs|dkrstjdt|dd}
|tkrnt|
d	|
 }	t|}t|jtj}t|}d
d }|j|dd tt|j}t||	|||dS )Nr   r{   r7   Zruntime_envr5   Zagentzdashboard_agent.logr}   r   c                 S   s0   d| vrt dS | d d u rt dS t | d S )NZcreation_time_msinf)floatr;   r)   r)   r*   	sort_func   s
   zGStateAPIManager.list_runtime_envs.<locals>.transform.<locals>.sort_funcT)r@   reverse)rB   r   rC   rD   rE   )zipr   r    r   rC   Zruntime_env_statesr   r   Zdeserializeto_dictr   r5   hexrF   rG   r   r   r
   r2   r   rH   rI   rJ   r   rK   r   )r   rB   r   Ztotal_runtime_envsrw   rM   Zstatesrq   rN   r   r   rD   rE   r   )
node_infosr-   r   r)   r*   rP     s\   


	z4StateAPIManager.list_runtime_envs.<locals>.transformr   )r(   r-   Zlive_node_info_replyr   rP   r)   )r   r-   r(   r   r*   list_runtime_envs  s*   
	=
z!StateAPIManager.list_runtime_envsc                    s   |j pd}|dvrtd| jt|jt|j|dkddI d H }|dkr-tj|j	d}n| j
t|jtdd	dI d H }tj|j	|j	d
}td|id}|j}|j|j |j |jk rd|p^g }|d t|j||j||j|jdS )N	func_name)r   lineagez3summary_by must be one of "func_name" or "lineage".r   )r1   rK   r2   rH   rO   )r   T)r1   rK   rH   )r   actorsclusterZnode_id_to_summaryzfThere is missing data in this aggregation. Possibly due to task data being evicted to preserve memory.rC   rB   r   rl   rD   rE   )
summary_by
ValueErrorro   r   r1   r   r2   r   Zto_summary_by_func_namerB   rT   Zto_summary_by_lineager   rl   Ztotal_actor_scheduledZtotal_actor_tasksZtotal_tasksrE   rF   r   rC   r   rD   )r(   r-   r   rB   Zsummary_resultsr   summaryrl   r)   r)   r*   summarize_tasks9  sZ   
	zStateAPIManager.summarize_tasksc                    X   | j t|jt|jddI d H }tdtj|jdid}t	|j
||j|j|j|jdS )Nrs   rO   r   )r   r   r   )rT   r   r1   r   r2   r   r   
to_summaryrB   r   rC   r   rl   rD   rE   r(   r-   rB   r   r)   r)   r*   summarize_actorsl  (   z StateAPIManager.summarize_actorsc                    r   )Nrs   rO   r   )objectsr   r   )r   r   r1   r   r2   r   r   r   rB   r   rC   r   rl   rD   rE   r   r)   r)   r*   summarize_objects  r   z!StateAPIManager.summarize_objectsr4   c                    s<   |r	dd|fgnd }| j td|dddI d H }t|jS )Nr4   rr   Ti'  )rH   r2   rK   rO   )ro   r   r   rB   )r(   r4   r2   rB   r)   r)   r*   generate_task_timeline  s   
z&StateAPIManager.generate_task_timelineN)__name__
__module____qualname____doc__r!   r   r+   propertyr,   r   r   rT   rV   r`   rd   ri   ro   r   r   r   r   r   r   r   r   strr   re   r   r)   r)   r)   r*   r"   =   s*    

6-10<pc3r"   )7r   loggingconcurrent.futuresr   	itertoolsr   typingr   r   Zray.dashboard.memory_utilsZ	dashboardr   Zrayr   Zray._common.utilsr   Zray._private.profilingr   Zray._private.ray_constantsr	   Zray.dashboard.state_api_utilsr
   Zray.dashboard.utilsr   Zray.runtime_envr   Zray.util.state.commonr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   Zray.util.state.state_managerr    r!   	getLoggerr   loggerrQ   r   r"   r)   r)   r)   r*   <module>   s*    T
