o
    1 i                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZmZmZmZmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZ d d	l m!Z! d d
l"m#Z# d dl$m%Z% d dl&m'Z(m)Z*m+Z, d dl-m.Z.m/Z/m0Z0m1Z2 d dl3Z3d dl4m5Z5 d dl6m7Z7 d dl8m9Z9m:Z: d dl;m<Z< d dl=m1Z1 d dl>m?Z? e@eAZBdZCdZDeEdZFG dd de#ZGe	dg dZHde1fddZIG dd dZJG d d! d!ZKG d"d# d#ZLG d$d% d%ZMG d&d' d'ejNZOdS )(    N)defaultdict
namedtuple)AnyDictListSetTupleUnion)MetricDescriptorType)ValueDouble)aggregationmeasure)CountAggregationDataDistributionAggregationDataLastValueAggregationDataSumAggregationData)StatsExporter)StatsRecorder)View)ViewManager)tag_keytag_map	tag_value)CounterMetricFamilyGaugeMetricFamilyHistogramMetricFamilyMetricbuild_address)env_bool)WORKER_ID_TAG_KEYMetricCardinality)	GcsClient)r   )_is_invalid_metric_nameRAY_WORKER_TIMEOUT_SZCOREz[^a-zA-Z0-9]c                   @   sR   e Zd ZdZdee fddZedd Zedd Z	ed	d
 Z
edd ZdS )GaugezGauge representation of opencensus view.

    This class is used to collect process metrics from the reporter agent.
    Cpp metrics should be collected in a different way.
    tagsc                 C   sX   t |rtd| dt|||| _|| _dd |D }t|||| jt	 | _
d S )NzInvalid metric name: z. Metric will be discarded and data will not be collected or published. Metric names can only contain letters, numbers, _, and :. Metric names cannot start with numbers.c                 S   s   g | ]}t |qS  )tag_key_moduleTagKey).0tagr'   r'   f/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/_private/metrics_agent.py
<listcomp>J   s    z"Gauge.__init__.<locals>.<listcomp>)r#   
ValueErrormeasure_moduleZ
MeasureInt_measure_descriptionr   r   r   ZLastValueAggregation_view)selfnamedescriptionunitr&   r'   r'   r,   __init__@   s   

zGauge.__init__c                 C      | j S N)r0   r3   r'   r'   r,   r   O      zGauge.measurec                 C   r8   r9   )r2   r:   r'   r'   r,   viewS   r;   z
Gauge.viewc                 C   s   | j jS r9   )r   r4   r:   r'   r'   r,   r4   W   s   z
Gauge.namec                 C   r8   r9   )r1   r:   r'   r'   r,   r5   [   r;   zGauge.descriptionN)__name__
__module____qualname____doc__r   strr7   propertyr   r<   r4   r5   r'   r'   r'   r,   r%   9   s    


r%   Record)gaugevaluer&   metricc                 C   s|   | j jds	dS td| j j| j _| jD ]%}|jD ]}|dr:|j}|j	j
j}t|dkr:|d dkr:d|d< qqdS )a  
    Fix the inbound `opencensus.proto.metrics.v1.Metric` protos to make it acceptable
    by opencensus.stats.DistributionAggregationData.

    - metric name: gRPC OpenCensus metrics have names with slashes and dots, e.g.
    `grpc.io/client/server_latency`[1]. However Prometheus metric names only take
    alphanums,underscores and colons[2]. We santinize the name by replacing non-alphanum
    chars to underscore, like the official opencensus prometheus exporter[3].
    - distribution bucket bounds: The Metric proto asks distribution bucket bounds to
    be > 0 [4]. However, gRPC OpenCensus metrics have their first bucket bound == 0 [1].
    This makes the `DistributionAggregationData` constructor to raise Exceptions. This
    applies to all bytes and milliseconds (latencies). The fix: we update the initial 0
    bounds to be 0.000_000_1. This will not affect the precision of the metrics, since
    we don't expect any less-than-1 bytes, or less-than-1-nanosecond times.

    [1] https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md#units  # noqa: E501
    [2] https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
    [3] https://github.com/census-instrumentation/opencensus-cpp/blob/50eb5de762e5f87e206c011a4f930adb1a1775b1/opencensus/exporters/stats/prometheus/internal/prometheus_utils.cc#L39 # noqa: E501
    [4] https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto#L218 # noqa: E501
    zgrpc.io/N_distribution_valuer   gHz>)metric_descriptorr4   
startswithRE_NON_ALPHANUMSsub
timeseriespointsZHasFieldrH   bucket_optionsexplicitboundslen)rF   seriespoint
dist_valuebucket_boundsr'   r'   r,   fix_grpc_metricc   s   



rW   c                	   @   s   e Zd Zdedededee fddZedd Zed	d
 Zedd Z	edd Z
edd Zdd ZdedefddZdefddZdS )OpencensusProxyMetricr4   descr6   
label_keysc                 C   s"   || _ || _|| _|| _i | _dS )z>Represents the OpenCensus metrics that will be proxy exported.N)_name_desc_unit_label_keys_data)r3   r4   rY   r6   rZ   r'   r'   r,   r7      s
   
zOpencensusProxyMetric.__init__c                 C   r8   r9   )r[   r:   r'   r'   r,   r4      r;   zOpencensusProxyMetric.namec                 C   r8   r9   )r\   r:   r'   r'   r,   rY      r;   zOpencensusProxyMetric.descc                 C   r8   r9   )r]   r:   r'   r'   r,   r6      r;   zOpencensusProxyMetric.unitc                 C   r8   r9   )r^   r:   r'   r'   r,   rZ      r;   z OpencensusProxyMetric.label_keysc                 C   r8   r9   r_   r:   r'   r'   r,   data   r;   zOpencensusProxyMetric.datac                 C   s&   t | jdkottt| j tS )z8Check if the metric is a distribution aggreation metric.r   )rR   r_   
isinstancenextitervaluesr   r:   r'   r'   r,    is_distribution_aggregation_data   s   z6OpencensusProxyMetric.is_distribution_aggregation_datalabel_valuesra   c                 C   s   || j |< dS )zAdd the data to the metric.

        Args:
            label_values: The label values of the metric.
            data: The data to be added.
        Nr`   )r3   rg   ra   r'   r'   r,   add_data   s   zOpencensusProxyMetric.add_datarF   c           
      C   s   |j }t|dkrdS |D ]j}tdd |jD }|jD ]Z}|jjtjkr+t	|j
}nF|jjtjkr9tt|j}n8|jjtjkrGtt|j}n*|jjtjkrm|j}dd |jD }|jjj}	t|j|j |j|j||	}ntd|| j|< qqdS )zzParse the Opencensus Protobuf and store the data.

        The data can be accessed via `data` API once recorded.
        r   Nc                 s   s    | ]}|j V  qd S r9   rE   )r*   valr'   r'   r,   	<genexpr>   s    z/OpencensusProxyMetric.record.<locals>.<genexpr>c                 S      g | ]}|j qS r'   )count)r*   bucketr'   r'   r,   r-          z0OpencensusProxyMetric.record.<locals>.<listcomp>zSummary is not supported)rM   rR   tuplerg   rN   rI   typer
   ZCUMULATIVE_INT64r   Zint64_valueZCUMULATIVE_DOUBLEr   r   Zdouble_valueZGAUGE_DOUBLEr   ZCUMULATIVE_DISTRIBUTIONrH   bucketsrO   rP   rQ   r   sumrm   Zsum_of_squared_deviationr.   r_   )
r3   rF   rM   rS   labelsrT   ra   rU   counts_per_bucketrV   r'   r'   r,   record   sB   


zOpencensusProxyMetric.recordN)r=   r>   r?   rA   r   r7   rB   r4   rY   r6   rZ   ra   rf   r   r   rh   r   rv   r'   r'   r'   r,   rX      s    




	rX   c                   @   sR   e Zd ZdefddZedeeef fddZedd Z	d	e
e fd
dZdS )	Componentidc                 C   s   || _ t | _i | _dS )zyRepresent a component that requests to proxy export metrics

        Args:
            id: Id of this component.
        N)rx   time	monotonic_last_reported_time_metrics)r3   rx   r'   r'   r,   r7      s   

zComponent.__init__returnc                 C   r8   )zAReturn the metrics requested to proxy export from this component.)r|   r:   r'   r'   r,   metrics   s   zComponent.metricsc                 C   r8   r9   )r{   r:   r'   r'   r,   last_reported_time   r;   zComponent.last_reported_timer~   c                 C   sn   t  | _|D ]-}t| |j}|j}dd |jD }|| jvr,t||j	|j
|| j|< | j| | qdS )zParse the Opencensus protobuf and store metrics.

        Metrics can be accessed via `metrics` API for proxy export.

        Args:
            metrics: A list of Opencensus protobuf for proxy export.
        c                 S   rl   r'   )key)r*   Z	label_keyr'   r'   r,   r-     ro   z$Component.record.<locals>.<listcomp>N)ry   rz   r{   rW   rI   r4   rZ   r|   rX   r5   r6   rv   )r3   r~   rF   Z
descriptorr4   rZ   r'   r'   r,   rv      s   


zComponent.recordN)r=   r>   r?   rA   r7   rB   r   rX   r~   r   r   r   rv   r'   r'   r'   r,   rw      s    
rw   c                   @   s   e Zd ZddedefddZd dee defd	d
Zdd Z	dededee dede
ej dedeeee f ddfddZdeeeeef  deeeef fddZdee dee fddZdd ZdS )!OpenCensusProxyCollector<   	namespacecomponent_timeout_sc                 C   s,   t  | _|| _|| _i | _tdd| _dS )a)  Prometheus collector implementation for opencensus proxy export.

        Prometheus collector requires to implement `collect` which is
        invoked whenever Prometheus queries the endpoint.

        The class is thread-safe.

        Args:
            namespace: Prometheus namespace.
        ZRAY_EXPORT_COUNTER_AS_GAUGETN)	threadingLock_components_lock_component_timeout_s
_namespace_componentsr   _export_counter_as_gauge)r3   r   r   r'   r'   r,   r7     s
   
z!OpenCensusProxyCollector.__init__Nr~   worker_id_hexc                 C   s`   |st n|}| j || jvrt|| j|< | j| | W d   dS 1 s)w   Y  dS )a.  Record the metrics reported from the component that reports it.

        Args:
            metrics: A list of opencensus protobuf to proxy export metrics.
            worker_id_hex: A worker id that reports these metrics.
                If None, it means they are reported from Raylet or GCS.
        N)GLOBAL_COMPONENT_KEYr   r   rw   rv   )r3   r~   r   r   r'   r'   r,   rv   5  s   
"zOpenCensusProxyCollector.recordc                 C   s   | j @ g }g }| j D ]\}}t |j }|| jkr+|| t	d
|| q|D ]}|| j| q.|W  d   S 1 sFw   Y  dS )zClean up stale components.

        Stale means the component is dead or unresponsive.

        Stale components won't be reported to Prometheus anymore.
        zSMetrics from a worker ({}) is cleaned up due to timeout. Time since last report {}sN)r   r   itemsry   rz   r   r   appendloggerinfoformatpop)r3   Zstale_componentsZstale_component_idsrx   	componentelapsedr'   r'   r,   clean_stale_componentsC  s    

$z/OpenCensusProxyCollector.clean_stale_componentsmetric_namemetric_descriptionrZ   metric_unitsrg   agg_datametrics_mapr}   c                 C   sh  | j  sJ | j d| }t|t|ksJ ||fdd |D }t|trK||}|s?t||||d}	|	g}|||< |d j||j	d dS t|t
r||}|set|||d}	|	g}|||< |d j||jd | jsu	 dS |d	r}	 dS t|d
krt|d| d| |d}	||	 t|dksJ |d
 j||jd dS t|tr|jt|jksJ g }
d}t|jD ]\}}||j| 7 }t||g}|
| q|
d|j	g ||}|st|||d}	|	g}|||< |d j||
|jd dS t|tr+||}|st|||d}	|	g}|||< |d j||jd dS tdt| )a  to_metric translate the data that OpenCensus create
        to Prometheus format, using Prometheus Metric object.

        This method is from Opencensus Prometheus Exporter.

        Args:
            metric_name: Name of the metric.
            metric_description: Description of the metric.
            label_keys: The fixed label keys of the metric.
            metric_units: Units of the metric.
            label_values: The values of `label_keys`.
            agg_data: `opencensus.stats.aggregation_data.AggregationData` object.
                Aggregated data that needs to be converted as Prometheus samples
            metrics_map: The converted metric is added to this map.

        rG   c                 S   s   g | ]}|r|nd qS ) r'   )r*   tvr'   r'   r,   r-   z  s    zBOpenCensusProxyCollector.to_prometheus_metrics.<locals>.<listcomp>)r4   documentationr6   rt   r   )rt   rE   N)r4   r   rt   Z_total   z(DEPRECATED, use z_total metric instead)    z+Inf)rt   rr   Z	sum_valuezunsupported aggregation type )r   lockedr   rR   rb   r   getr   Z
add_metric
count_datar   sum_datar   endswithr   r   r   rQ   sorted	enumerateru   rA   r   rs   r   rE   r.   rq   )r3   r   r   rZ   r   rg   r   r   r~   rF   rr   Z	cum_countiiboundrn   r'   r'   r,   to_prometheus_metricsZ  s   







z.OpenCensusProxyCollector.to_prometheus_metricsdatasc                 C   s   t |dksJ |d }t|trtttdd |D S t|tr-ttdd |D S t|tr>tttdd |D S tdt| dt dt dt d	| d
)Nr   c                 S   rl   r'   ri   r*   ra   r'   r'   r,   r-     ro   zCOpenCensusProxyCollector._aggregate_metric_data.<locals>.<listcomp>c                 S   rl   r'   )r   r   r'   r'   r,   r-     ro   c                 S   rl   r'   )r   r   r'   r'   r,   r-     ro   zUnsupported aggregation type z. Supported types are z, z.Got .)	rR   rb   r   r   rs   r   r   r.   rq   )r3   r   sampler'   r'   r,   _aggregate_metric_data  s.   


z/OpenCensusProxyCollector._aggregate_metric_dataper_worker_metricsc           	      C   s   t t|d}|rt|jvr|S |jt}tt}|D ]}|j D ]\}}||d| ||d d   	| q#qt
|j|j|j|jd| |j|d d  d}| D ]\}}||| | qY|gS )a  Collect per-worker metrics, aggregate them into per-node metrics and convert
        them to Prometheus format.

        Args:
            per_worker_metrics: A list of per-worker metrics for the same metric name.
        Returns:
            A list of per-node metrics for the same metric name, with the high
            cardinality labels removed and the values aggregated.
        Nr   )r4   rY   r6   rZ   )rc   rd   r    rZ   indexr   listra   r   r   rX   r4   rY   r6   rh   r   )	r3   r   rF   Zworker_id_label_indexZlabel_value_to_datarg   ra   Zaggregated_metricr   r'   r'   r,   '_aggregate_with_recommended_cardinality  s:   


z@OpenCensusProxyCollector._aggregate_with_recommended_cardinalityc                 c   s
   | j h g }tt}t }| j D ]!}|j D ]}|tjkr/|	 s/||j
 | q|| qq| D ]
}|| | q:i }|D ]}|j D ]\}}	| |j
|j|j|j||	| qPqIW d   n1 sow   Y  | D ]
}
|
D ]}|V  q|qxdS )a,  Collect fetches the statistics from OpenCensus
        and delivers them as Prometheus Metrics.
        Collect is invoked every time a prometheus.Gatherer is run
        for example when the HTTP endpoint is invoked by Prometheus.

        This method is required as a Prometheus Collector.
        N)r   r   r   r!   Zget_cardinality_levelr   re   r~   ZRECOMMENDEDrf   r4   r   extendr   ra   r   r   rY   rZ   r6   )r3   Zopen_cencus_metricsZto_lower_cardinalityZcardinality_levelr   rF   r   Zprometheus_metrics_maprg   ra   r~   r'   r'   r,   collect9  sR   
.z OpenCensusProxyCollector.collect)r   r9   )r=   r>   r?   rA   intr7   r   r   rv   r   r   tag_value_moduleTagValuer   r   PrometheusMetricr   r	   r   r   r   r   rX   r   r   r'   r'   r'   r,   r     sF    	
 

7r   c                   @   s   e Zd Z	ddededefddZddee fdd	Z	d
e
dedefddZddee defddZddee defddZdd ZdS )MetricsAgentNview_managerstats_recorderstats_exporterc                 C   sl   t  | _|| _|| _|| _d| _| jdu rd| _n| j| t| jj	j
tttdd| _t | _dS )a+  A class to record and export metrics.

        The class exports metrics in 2 different ways.
        - Directly record and export metrics using OpenCensus.
        - Proxy metrics from other core components
            (e.g., raylet, GCS, core workers).

        This class is thread-safe.
        Nx   )r   )r   r   _lockr   r   r   proxy_exporter_collectorZregister_exporterr   optionsr   r   osgetenvr$   set_registered_views)r3   r   r   r   r'   r'   r,   r7   u  s   

zMetricsAgent.__init__recordsc                 C   s   |pi }| j W | js	 W d   dS |D ]?}|j}|j}|j}z| ||i || W q tyU } ztd|j	 d| d|d|d|
 W Y d}~qd}~ww W d   dS 1 saw   Y  dS )z7Directly record and export stats from the same process.NzFailed to record metric z with value z with tags z and global tags 	 due to: )
r   r   rD   rE   r&   _record_gauge	Exceptionr   errorr4   )r3   r   Zglobal_tagsrv   rD   rE   r&   er'   r'   r,   record_and_export  s&   ""zMetricsAgent.record_and_exportrD   rE   r&   c                 C   s  |j | jvr| j|j | j|j  | j }t	 }|
 D ]W\}}zt|}W n tyI }	 ztd| d|j  d|	 |	d }	~	ww zt|}
W n  tyq }	 ztd| d| d|j  d|	 |	d }	~	ww |||
 q!||j| || d S )NzFailed to create tag key z for metric r   zFailed to create tag value z	 for key )r4   r   r   Zregister_viewr<   addr   Znew_measurement_maptag_map_moduleZTagMapr   r(   r)   r.   r   r   r   r   insertZmeasure_float_putr   rv   )r3   rD   rE   r&   Zmeasurement_mapr   r   Ztag_valr   r   r   r'   r'   r,   r     s6   
zMetricsAgent._record_gauger~   r   c                 C   sN   | j  | js	 W d   dS W d   n1 sw   Y  | || dS )a{  Proxy export metrics specified by a Opencensus Protobuf.

        This API is used to export metrics emitted from
        core components.

        Args:
            metrics: A list of protobuf Metric defined from OpenCensus.
            worker_id_hex: The worker ID it proxies metrics export. None
                if the metric is not from a worker (i.e., raylet, GCS).
        N)r   r   _proxy_export_metricsr3   r~   r   r'   r'   r,   proxy_export_metrics  s   z!MetricsAgent.proxy_export_metricsc                 C   s   | j || d S r9   )r   rv   r   r'   r'   r,   r     s   z"MetricsAgent._proxy_export_metricsc                 C   sL   | j  | js	 W d   dS W d   n1 sw   Y  | j  dS )zClean dead worker's metrics.

        Worker metrics are cleaned up and won't be exported once
        it is considered as dead.

        This method has to be periodically called by a caller.
        N)r   r   r   r   r:   r'   r'   r,   clean_all_dead_worker_metrics  s   z*MetricsAgent.clean_all_dead_worker_metricsr9   )r=   r>   r?   r   r   r   r7   r   rC   r   r%   floatdictr   r   rA   r   r   r   r'   r'   r'   r,   r   t  s    
1r   c                       sP   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
  ZS ) PrometheusServiceDiscoveryWriteraF  A class to support Prometheus service discovery.

    It supports file-based service discovery. Checkout
    https://prometheus.io/docs/guides/file-sd/ for more details.

    Args:
        gcs_address: Gcs address for this cluster.
        temp_dir: Temporary directory used by
            Ray to store logs and metadata.
    c                    sV   t jjj|d ddd}|| _t jjj| || _d| _	g | _
t | _t   d S )NTF)Zallow_cluster_id_nilZfetch_cluster_id_if_nil   )rayZ_rayletZGcsClientOptionscreategcs_address_privatestateZ_initialize_global_statetemp_dir&default_service_discovery_flush_period latest_service_discovery_contentr   RLock_content_locksuperr7   )r3   r   r   Zgcs_client_options	__class__r'   r,   r7     s   
z)PrometheusServiceDiscoveryWriter.__init__c                 C   s0   | j  | jW  d   S 1 sw   Y  dS )z3Return the latest stored service discovery content.N)r   r   r:   r'   r'   r,   $get_latest_service_discovery_content  s   $zEPrometheusServiceDiscoveryWriter.get_latest_service_discovery_contentc                 C   s   t  }dd |D }t| jd}|dd}|r!||d |dd}|r1||d dd	i|d
g}| j || _W d   n1 sJw   Y  t	
|S )z4Return the content for Prometheus service discovery.c                 S   s*   g | ]}|d  du rt |d |d qS )aliveTZNodeManagerAddressZMetricsExportPortr   )r*   noder'   r'   r,   r-     s
    zOPrometheusServiceDiscoveryWriter.get_file_discovery_content.<locals>.<listcomp>)addresss   AutoscalerMetricsAddressNzutf-8s   DashboardMetricsAddressZjobr   )rt   targets)r   nodesr"   r   Zinternal_kv_getr   decoder   r   jsondumps)r3   r   Zmetrics_export_addressesZ
gcs_clientZautoscaler_addrZdashboard_addrcontentr'   r'   r,   get_file_discovery_content  s    
z;PrometheusServiceDiscoveryWriter.get_file_discovery_contentc                 C   sT   |   }t|d}||   W d    n1 sw   Y  t||   d S )Nw)get_temp_file_nameopenwriter   r   replaceget_target_file_name)r3   Ztemp_file_nameZ	json_filer'   r'   r,   r   *  s
   z&PrometheusServiceDiscoveryWriter.writec                 C   s   t j| jtjjjS r9   )r   pathjoinr   r   r   ray_constants!PROMETHEUS_SERVICE_DISCOVERY_FILEr:   r'   r'   r,   r   7  s   z5PrometheusServiceDiscoveryWriter.get_target_file_namec                 C   s   t j| jddtjjjS )Nz{}_{}tmp)	r   r   r   r   r   r   r   r   r   r:   r'   r'   r,   r   <  s   
z3PrometheusServiceDiscoveryWriter.get_temp_file_namec              
   C   st   	 z|    W n+ ty2 } ztd|   tt  td|  W Y d }~nd }~ww t	| j
 q)NTz,Writing a service discovery file, {},failed.zError message: )r   r   r   warningr   r   	traceback
format_excry   sleepr   )r3   r   r'   r'   r,   runD  s   
z$PrometheusServiceDiscoveryWriter.run)r=   r>   r?   r@   r7   r   r   r   r   r   r   __classcell__r'   r'   r   r,   r     s    r   )Pr   loggingr   rer   ry   r   collectionsr   r   typingr   r   r   r   r   r	   Z+opencensus.metrics.export.metric_descriptorr
   Zopencensus.metrics.export.valuer   Zopencensus.statsr   r   r/   Z!opencensus.stats.aggregation_datar   r   r   r   Zopencensus.stats.base_exporterr   Zopencensus.stats.stats_recorderr   Zopencensus.stats.viewr   Zopencensus.stats.view_managerr   Zopencensus.tagsr   r(   r   r   r   r   Zprometheus_client.corer   r   r   r   r   r   Zray._common.network_utilsr   Zray._private.ray_constantsr   Z)ray._private.telemetry.metric_cardinalityr    r!   Zray._rayletr"   Zray.core.generated.metrics_pb2Zray.util.metricsr#   	getLoggerr=   r   r$   r   compilerK   r%   rC   rW   rX   rw   r   r   Threadr   r'   r'   r'   r,   <module>   sR     

'&^.  a 