o
    i]                     @   s  U d Z ddlmZ ddlmZmZmZmZ ddlmZm	Z	m
Z
mZmZ ddlmZmZ ddlmZ ddlmZ ddlmZmZ erSdd	lmZ dd
lmZ ddlmZ daee ed< dZdZ edgddd							dkde!de"dee! dee# dee! dee$ dee% dee# dee# ddfddZ&dd de"ddfd!d"Z'dld#d$Z(d%ed  ddfd&d'Z)d(e!ddfd)d*Z*d(e!de"ddfd+d,Z+		dmd-ee d.ee$ ddfd/d0Z,d1e!d2e!d3e%ddfd4d5Z-d(e!ddfd6d7Z.							8dndee! dee# d9ee! d:ee# d.ee$ dee# d;e%ddfd<d=Z/		dmd>ed?ee! d@ee% ddfdAdBZ0edCgdDdd			dodEe"dFee! dGee! dCee! ddf
dHdIZ1edCgdDdd		dmdGee! dCee! ddfdJdKZ2de!de#d9e!d:e#d2e!ddfdLdMZ3	dpdNee
 fdOdPZ4dldQdRZ5	dpdSed(ee! ddfdTdUZ6	dpdVe#dWee	 ddfdXdYZ7dZe#ddfd[d\Z8d]d^d_d^dWeddfd`daZ9dee fdbdcZ:dedd fdedfZ;dldgdhZ<de%fdidjZ=dS )qaT  
Simple, clean API for recording observability metrics.

This module provides a straightforward interface for Redis core code to record
metrics without needing to know about OpenTelemetry internals.

Usage in Redis core code:
    from redis.observability.recorder import record_operation_duration

    start_time = time.monotonic()
    # ... execute Redis command ...
    record_operation_duration(
        command_name='SET',
        duration_seconds=time.monotonic() - start_time,
        server_address='localhost',
        server_port=6379,
        db_namespace='0',
        error=None
    )
    )datetime)TYPE_CHECKINGCallableListOptional)AttributeBuilder	CSCReason	CSCResultGeoFailoverReasonPubSubDirection)CloseReasonRedisMetricsCollector)get_observability_instance)!get_observables_registry_instance)deprecated_argsstr_if_bytes)ConnectionPoolInterface)SyncDatabase)
OTelConfigN_metrics_collectorZconnection_countZ	csc_items
batch_sizezXThe batch_size argument is no longer used and will be removed in the next major version.z7.2.1)Zargs_to_warnreasonversioncommand_nameduration_secondsserver_addressserver_portdb_namespaceerroris_blockingretry_attemptsreturnc	           	      C   sT   t du rt a t du rdS zt j| |||||||||d
 W dS  ty)   Y dS w )a  
    Record a Redis command execution duration.

    This is a simple, clean API that Redis core code can call directly.
    If observability is not enabled, this returns immediately with zero overhead.

    Args:
        command_name: Redis command name (e.g., 'GET', 'SET')
        duration_seconds: Command execution time in seconds
        server_address: Redis server address
        server_port: Redis server port
        db_namespace: Redis database index
        error: Exception if command failed, None if successful
        is_blocking: Whether the operation is a blocking command
        batch_size: Number of commands in batch (for pipelines/transactions)
        retry_attempts: Number of retry attempts made

    Example:
        >>> start = time.monotonic()
        >>> # ... execute command ...
        >>> record_operation_duration('SET', time.monotonic() - start, 'localhost', 6379, '0')
    N)
r   r   r   r   r   
error_typenetwork_peer_addressnetwork_peer_portr   r    )r   _get_or_create_collectorrecord_operation_duration	Exception)	r   r   r   r   r   r   r   r   r     r(   S/home/app/Keep/.python/lib/python3.10/site-packages/redis/observability/recorder.pyr&   1   s(   )r&   connection_poolr   c                 C   D   t du rt a t du rdS z
t j| |d W dS  ty!   Y dS w )as  
    Record connection creation time.

    Args:
        connection_pool: Connection pool implementation
        duration_seconds: Time taken to create connection in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... create connection ...
        >>> record_connection_create_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    Nr*   r   )r   r%   record_connection_create_timer'   r,   r(   r(   r)   r-   s   s   r-   c                  C   J   t du rt a t du rdS dd } z	t j| d W dS  ty$   Y dS w )zB
    Initialize observable gauge for connection count metric.
    Nc                 S   .   t  }|t}g }|D ]}||  q|S N)r   getCONNECTION_COUNT_REGISTRY_KEYextend__observables_registry	callbacksobservationscallbackr(   r(   r)   observable_callback      
z2init_connection_count.<locals>.observable_callbackr9   )r   r%   init_connection_countr'   r:   r(   r(   r)   r=         
r=   connection_poolsc                    sb   t du rt a t du rdS zddlm   fdd}t }|t| W dS  ty0   Y dS w )zG
    Add connection pools to connection count observable registry.
    Nr   Observationc                     s6   g } D ]}|  D ]\}}|  ||d q
q| S )N
attributes)Zget_connection_countappend)r8   r*   countrD   rB   r@   r(   r)   connection_count_callback   s   zBregister_pools_connection_count.<locals>.connection_count_callback)r   r%   opentelemetry.metricsrB   r   registerr2   r'   )r@   rH   r6   r(   rG   r)   register_pools_connection_count   s   
rK   	pool_namec                 C   B   t du rt a t du rdS z	t j| d W dS  ty    Y dS w )z
    Record a connection timeout event.

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_timeout('ConnectionPool<localhost:6379>')
    NrL   )r   r%   record_connection_timeoutr'   rN   r(   r(   r)   rO         rO   c                 C   r+   )at  
    Record time taken to obtain a connection from the pool.

    Args:
        pool_name: Connection pool identifier
        duration_seconds: Wait time in seconds

    Example:
        >>> start = time.monotonic()
        >>> # ... wait for connection from pool ...
        >>> record_connection_wait_time('ConnectionPool<localhost:6379>', time.monotonic() - start)
    NrL   r   )r   r%   record_connection_wait_timer'   rQ   r(   r(   r)   rR      s   rR   close_reasonr"   c                 C   r+   )a  
    Record a connection closed event.

    Args:
        close_reason: Reason for closing (e.g. 'error', 'application_close')
        error_type: Error type if closed due to error

    Example:
        >>> record_connection_closed('ConnectionPool<localhost:6379>', 'idle_timeout')
    NrS   r"   )r   r%   record_connection_closedr'   rT   r(   r(   r)   rU     s   rU   connection_namemaint_notificationrelaxedc                 C   F   t du rt a t du rdS zt j| ||d W dS  ty"   Y dS w )ab  
    Record a connection timeout relaxation event.

    Args:
        connection_name: Connection identifier
        maint_notification: Maintenance notification type
        relaxed: True to count up (relaxed), False to count down (unrelaxed)

    Example:
        >>> record_connection_relaxed_timeout('Connection<localhost:6379>', 'MOVING', True)
    NrV   rW   rX   )r   r%   !record_connection_relaxed_timeoutr'   rZ   r(   r(   r)   r[   *  s   r[   c                 C   rM   )z
    Record a connection handoff event (e.g., after MOVING notification).

    Args:
        pool_name: Connection pool identifier

    Example:
        >>> record_connection_handoff('ConnectionPool<localhost:6379>')
    NrN   )r   r%   record_connection_handoffr'   rN   r(   r(   r)   r\   K  rP   r\   Tr#   r$   is_internalc              	   C   sN   t du rt a t du rdS zt j| ||||||d W dS  ty&   Y dS w )a  
    Record error count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        error_type: Error type (Exception)
        retry_attempts: Retry attempts
        is_internal: Whether the error is internal (e.g., timeout, network error)

    Example:
        >>> record_error_count('localhost', 6379, 'localhost', 6379, ConnectionError(), 3)
    Nr   r   r#   r$   r"   r    r]   )r   r%   record_error_countr'   r^   r(   r(   r)   r_   f  s"   	r_   	directionchannelshardedc                 C   sj   t du rt a t du rdS |}|durt }|dur|jrd}zt j| ||d W dS  ty4   Y dS w )a5  
    Record a PubSub message (published or received).

    Args:
        direction: Message direction ('publish' or 'receive')
        channel: Pub/Sub channel name
        sharded: True if sharded Pub/Sub channel

    Example:
        >>> record_pubsub_message(PubSubDirection.PUBLISH, 'channel', False)
    N)r`   ra   rb   )r   r%   _get_configZhide_pubsub_channel_namesrecord_pubsub_messager'   )r`   ra   rb   Zeffective_channelconfigr(   r(   r)   rd     s$   rd   consumer_namez[The consumer_name argument is no longer used and will be removed in the next major version.lag_secondsstream_nameconsumer_groupc                 C   sj   t du rt a t du rdS |}|durt }|dur|jrd}zt j| ||d W dS  ty4   Y dS w )z
    Record the lag of a streaming message.

    Args:
        lag_seconds: Lag in seconds
        stream_name: Stream name
        consumer_group: Consumer group name
        consumer_name: Consumer name
    Nrg   rh   ri   )r   r%   rc   hide_stream_namesrecord_streaming_lagr'   )rg   rh   ri   rf   effective_stream_namere   r(   r(   r)   rl     s$   rl   c                 C   s\  t du rt a t du rdS | sdS zt  }t }|duo!|j}t| trg| 	 D ]8\}}|r3dnt
|}|D ])}	|	D ]$}
|
\}}t
|}|d\}}td|t|d  }t j|||d q=q9q+W dS | D ]7}t
|d }|rudn|}|d D ]$}
|
\}}t
|}|d\}}td|t|d  }t j|||d q{qiW dS  ty   Y dS w )aQ  
    Record streaming lag from XREAD/XREADGROUP response.

    Parses the response and calculates lag for each message based on message ID timestamp.

    Args:
        response: Response from XREAD/XREADGROUP command
        consumer_group: Consumer group name (for XREADGROUP)
        consumer_name: Consumer name (for XREADGROUP)
    N-g        i  rj   r      )r   r%   r   now	timestamprc   rk   
isinstancedictitemsr   splitmaxintrl   r'   )responseri   rf   rp   re   rk   rh   Zstream_messagesrm   messagesmessageZ
message_id_rq   rg   Zstream_entryr(   r(   r)   "record_streaming_lag_from_response  s\   
r|   c                 C   sJ   t du rt a t du rdS zt j| ||||d W dS  ty$   Y dS w )a  
    Record a maintenance notification count.

    Args:
        server_address: Server address
        server_port: Server port
        network_peer_address: Network peer address
        network_peer_port: Network peer port
        maint_notification: Maintenance notification type (e.g., 'MOVING', 'MIGRATING')

    Example:
        >>> record_maint_notification_count('localhost', 6379, 'localhost', 6379, 'MOVING')
    Nr   r   r#   r$   rW   )r   r%   record_maint_notification_countr'   r}   r(   r(   r)   r~   6  s   r~   resultc                 C   rM   )zm
    Record a Client Side Caching (CSC) request.

    Args:
        result: CSC result ('hit' or 'miss')
    Nr   )r   r%   record_csc_requestr'   r   r(   r(   r)   r   ]     r   c                  C   r.   )z;
    Initialize observable gauge for CSC items metric.
    Nc                 S   r/   r0   )r   r1   CSC_ITEMS_REGISTRY_KEYr3   r4   r(   r(   r)   r:     r;   z+init_csc_items.<locals>.observable_callbackr<   )r   r%   init_csc_itemsr'   r>   r(   r(   r)   r   u  r?   r   r9   c                    sd   t du rt a t du rdS ddlm   fdd}zt }|t| W dS  ty1   Y dS w )z
    Adds given callback to CSC items observable registry.

    Args:
        callback: Callback function that returns the cache size
        pool_name: Connection pool name for observability
    Nr   rA   c                      s     t jddgS )NrN   rC   )r   Zbuild_csc_attributesr(   rB   r9   rL   r(   r)   csc_items_callback  s
   
z7register_csc_items_callback.<locals>.csc_items_callback)r   r%   rI   rB   r   rJ   r   r'   )r9   rL   r   r6   r(   r   r)   register_csc_items_callback  s   r   rF   r   c                 C   r+   )z
    Record a Client Side Caching (CSC) eviction.

    Args:
        count: Number of evictions
        reason: Reason for eviction
    NrF   r   )r   r%   record_csc_evictionr'   r   r(   r(   r)   r     s   r   bytes_savedc                 C   rM   )z
    Record the number of bytes saved by using Client Side Caching (CSC).

    Args:
        bytes_saved: Number of bytes saved
    Nr   )r   r%   record_csc_network_savedr'   r   r(   r(   r)   r     r   r   	fail_fromr   fail_toc                 C   rY   )z
    Record a geo failover.

    Args:
        fail_from: Database failed from
        fail_to: Database failed to
        reason: Reason for the failover
    Nr   r   r   )r   r%   record_geo_failoverr'   r   r(   r(   r)   r     s   r   c                  C   sj   z!t   } | du s| jjsW dS |  tjtj}t|| jW S  t	y+   Y dS  t
y4   Y dS w )z
    Get or create the global metrics collector.

    Returns:
        RedisMetricsCollector instance if observability is enabled, None otherwise
    N)r   get_provider_managerre   Zenabled_telemetryZget_meter_providerZ	get_meterr   Z
METER_NAMEZMETER_VERSIONImportErrorr'   )managerZmeterr(   r(   r)   r%     s   
r%   r   c                  C   s6   zt   } | du rW dS | jW S  ty   Y dS w )z
    Get the OTel configuration from the observability manager.

    Returns:
        OTelConfig instance if observability is enabled, None otherwise
    N)r   r   re   r'   )r   r(   r(   r)   rc   "  s   
rc   c                   C   s   da dS )zM
    Reset the global collector (used for testing or re-initialization).
    N)r   r(   r(   r(   r)   reset_collector2  s   r   c                   C   s   t du rt a t duS )zw
    Check if observability is enabled.

    Returns:
        True if metrics are being collected, False otherwise
    N)r   r%   r(   r(   r(   r)   
is_enabled:  s   	r   )NNNNNNN)r!   N)NN)NNNNNNT)NNNr0   )>__doc__r   typingr   r   r   r   Zredis.observability.attributesr   r   r	   r
   r   Zredis.observability.metricsr   r   Zredis.observability.providersr   Zredis.observability.registryr   Zredis.utilsr   r   Zredis.connectionr   Zredis.multidb.databaser   Zredis.observability.configr   r   __annotations__r2   r   strfloatrw   r'   boolr&   r-   r=   rK   rO   rR   rU   r[   r\   r_   rd   rl   r|   r~   r   r   r   r   r   r   r%   rc   r   r   r(   r(   r(   r)   <module>   s   	
=

!
 

!

!

/
('J
(


&



