o
    iU                     @   sv  d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZmZmZm Z  d dl!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3 e 4e5Z6e3G dd deeZ7defddZ8G dd deeZ9G dd dZ:dS )    N)as_completed)ThreadPoolExecutor)AnyCallableListOptional)BackgroundScheduler)	NoBackoff)PubSubWorkerThread)CoreCommandsRedisModuleCommands)MaintNotificationsConfig)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)Database	DatabasesSyncDatabase)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)HealthCheckHealthCheckPolicy)GeoFailoverReason)Retry)experimentalc                   @   s(  e Zd ZdZdefddZdd Zdefdd	Zd
e	ddfddZ
	d8dedefddZde	de	fddZd
efddZd
e	defddZdefddZdefdd Zd!d" Zd#d$ Zd%ed&gdf fd'd(Zd)d* Zd
e	defd+d,Zdeeef fd-d.Zd/d0 Zd1e d2e!d3e!fd4d5Z"d6d7 Z#dS )9MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc              
   C   s   |  | _|js| n|j| _|j| _|j|j	|j
| _|js%| n|j| _|jd u r2| n|j| _| j| j |j| _|j| _|j| _| jtf t| j| j| j| j|j|j| j| jd| _d| _t ! | _"t# | _$|| _%d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)&r$   
_databasesZhealth_checksZdefault_health_checks_health_checksZhealth_check_interval_health_check_intervalZhealth_check_policyvalueZhealth_check_probesZhealth_check_probes_delay_health_check_policyr#   Zdefault_failure_detectors_failure_detectorsr&   Zdefault_failover_strategyZ_failover_strategyZset_databasesr*   Z_auto_fallback_intervalr)   Z_event_dispatcherr%   Z_command_retryZupdate_supported_errorsConnectionRefusedErrorr   r'   r(   command_executorinitialized	threadingRLock_hc_lockr   _bg_scheduler_config)selfr"    r:   K/home/app/Keep/.python/lib/python3.10/site-packages/redis/multidb/client.py__init__+   sH   






zMultiDBClient.__init__c                 C   sp   |    | j| j| j d}| jD ]\}}|j| j |jj	t
jkr,|s,|| j_d}q|s3tdd| _dS )zT
        Perform initialization of databases to define their initial state.
        FTz4Initial connection failed - no active database foundN)_perform_initial_health_checkr7   Zrun_recurringr-   _check_databases_healthr+   circuitZon_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr2   Z_active_databaser   r3   )r9   Zis_active_db_founddatabaseweightr:   r:   r;   
initializeU   s"   
zMultiDBClient.initializereturnc                 C   s   | j S )zE
        Returns a sorted (by weight) list of all databases.
        )r+   r9   r:   r:   r;   get_databasesw   s   zMultiDBClient.get_databasesrD   Nc                 C   sv   d}| j D ]\}}||krd} nq|std| | |jjtjkr7| j dd \}}|tj	f| j
_dS td)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r+   
ValueError_check_db_healthr?   rA   rB   rC   	get_top_nr   MANUALr2   active_databaser   )r9   rD   existsexisting_db_highest_weighted_dbr:   r:   r;   set_active_database}   s$   
z!MultiDBClient.set_active_databaseTskip_initial_health_checkc                 C   s  t dt d|jd< d|jvrtdd|jd< |jr(| jjj|jfi |j}n"|jr@|jt dt d | jjj|jd}n
| jjdi |j}|j	du rS|
 n|j	}t|||j|jd	}z| | W n tys   |sq Y nw | jd
d \}}| j||j | || dS )z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        r   )retriesbackoffretryZmaint_notifications_configF)enabled)Zconnection_poolN)clientr?   rE   health_check_urlrK   r:   )r   r	   Zclient_kwargsr   Zfrom_urlr8   Zclient_classZ	from_poolZ	set_retryr?   Zdefault_circuit_breakerr   rE   r\   rM   r   r+   rN   add_change_active_database)r9   r"   rV   r[   r?   rD   rT   highest_weightr:   r:   r;   add_database   sH   

zMultiDBClient.add_databasenew_databasehighest_weight_databasec                 C   s4   |j |j kr|jjtjkr|tjf| j_d S d S d S N)	rE   r?   rA   rB   rC   r   Z	AUTOMATICr2   rP   )r9   ra   rb   r:   r:   r;   r^      s   z%MultiDBClient._change_active_databasec                 C   sP   | j |}| j dd \}}||kr$|jjtjkr&|tjf| j	_
dS dS dS )z<
        Removes a database from the database list.
        rK   r   N)r+   removerN   r?   rA   rB   rC   r   rO   r2   rP   )r9   rD   rE   rT   r_   r:   r:   r;   remove_database   s   zMultiDBClient.remove_databaserE   c                 C   sh   d}| j D ]\}}||krd} nq|std| j dd \}}| j || ||_| || dS )z<
        Updates a database from the database list.
        NTrJ   rK   r   )r+   rL   rN   Zupdate_weightrE   r^   )r9   rD   rE   rQ   rR   rS   rT   r_   r:   r:   r;   update_database_weight   s   z$MultiDBClient.update_database_weightfailure_detectorc                 C   s   | j | dS )z>
        Adds a new failure detector to the database.
        N)r0   append)r9   rg   r:   r:   r;   add_failure_detector   s   z"MultiDBClient.add_failure_detectorhealthcheckc                 C   s8   | j  | j| W d   dS 1 sw   Y  dS )z:
        Adds a new health check to the database.
        N)r6   r,   rh   )r9   rj   r:   r:   r;   add_health_check  s   "zMultiDBClient.add_health_checkc                 O   s    | j s|   | jj|i |S )zB
        Executes a single command and return its result.
        )r3   rF   r2   execute_commandr9   argsoptionsr:   r:   r;   rl     s   zMultiDBClient.execute_commandc                 C   s   t | S )z:
        Enters into pipeline mode of the client.
        )PipelinerH   r:   r:   r;   pipeline  s   zMultiDBClient.pipelinefuncrp   c                 O   s&   | j s|   | jj|g||R  S )z3
        Executes callable as transaction.
        )r3   rF   r2   Zexecute_transaction)r9   rr   Zwatchesro   r:   r:   r;   transaction  s   zMultiDBClient.transactionc                 K   s   | j s|   t| fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )r3   rF   PubSub)r9   kwargsr:   r:   r;   pubsub%  s   zMultiDBClient.pubsubc                 C   sP   | j | j|}|s|jjtjkrtj|j_|S |r&|jjtjkr&tj|j_|S )zO
        Runs health checks on the given database until first failure.
        )r/   executer,   r?   rA   rB   OPENrC   )r9   rD   
is_healthyr:   r:   r;   rM   0  s   

zMultiDBClient._check_db_healthc                    s   t tjd`  fddjD }i }z>t|jdD ]4}z|| }| ||< W q tyR } z|j}tj	|j
_tjd|jd d||< W Y d}~qd}~ww W n ty_   td	w W d   |S 1 skw   Y  |S )
zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        )max_workersc                    s    i | ]\}}  j||qS r:   )submitrM   ).0rD   rS   executorr9   r:   r;   
<dictcomp>G  s    z9MultiDBClient._check_databases_health.<locals>.<dictcomp>)timeoutz%Health check failed, due to exception)exc_infoFNz4Health check execution exceeds health_check_interval)r   lenr+   r   r-   resultr   rD   rB   rx   r?   rA   loggerdebugZoriginal_exceptionTimeoutError)r9   futuresresultsfuturerD   eZunhealthy_dbr:   r}   r;   r>   @  sB   


z%MultiDBClient._check_databases_healthc                 C   s   |   }d}| jjtjkrd| v}n!| jjtjkr(t| t|d k}n| jjtj	kr5d| v }|s@t
d| jj dS )zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        TF   z:Initial health check failed. Initial health check policy: N)r>   r8   Zinitial_health_check_policyr   ZALL_AVAILABLEvaluesZMAJORITY_AVAILABLEsumr   ZONE_AVAILABLEr   )r9   r   ry   r:   r:   r;   r=   e  s   z+MultiDBClient._perform_initial_health_checkr?   	old_state	new_statec                 C   s   |t jkr| |j d S |t jkr)|t jkr)td|j d | j	t
t| |t jkr?|t jkrAtd|j d d S d S d S )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rB   	HALF_OPENrM   rD   rC   rx   r   warningr7   Zrun_oncer   _half_open_circuitinfo)r9   r?   r   r   r:   r:   r;   r@   }  s   
z/MultiDBClient._on_circuit_state_change_callbackc                 C   s.   | j r| j   | jjr| jjj  dS dS )z:
        Closes the client and all its resources.
        N)r7   stopr2   rP   r[   closerH   r:   r:   r;   r     s
   
zMultiDBClient.close)T)$__name__
__module____qualname____doc__r   r<   rF   r   rI   r   rU   r   boolr`   r^   r   re   floatrf   r   ri   r   rk   rl   rq   r   rs   rv   rM   dictr>   r=   r   rB   r@   r   r:   r:   r:   r;   r!   $   sH    *"
8
		%
r!   r?   c                 C   s   t j| _d S rc   )rB   r   rA   )r?   r:   r:   r;   r        r   c                   @   s   e Zd ZdZdefddZdddZdd	 Zd
d Zde	fddZ
defddZdddZdddZdddZdd Zdee fddZdS )rp   zG
    Pipeline implementation for multiple logical Redis databases.
    r[   c                 C   s   g | _ || _d S rc   )_command_stack_client)r9   r[   r:   r:   r;   r<     s   
zPipeline.__init__rG   c                 C      | S rc   r:   rH   r:   r:   r;   	__enter__     zPipeline.__enter__c                 C      |    d S rc   reset)r9   exc_type	exc_value	tracebackr:   r:   r;   __exit__  r   zPipeline.__exit__c                 C   $   z|    W d S  ty   Y d S w rc   r   	ExceptionrH   r:   r:   r;   __del__  s
   zPipeline.__del__c                 C   s
   t | jS rc   )r   r   rH   r:   r:   r;   __len__     
zPipeline.__len__c                 C   s   dS )z1Pipeline instances should always evaluate to TrueTr:   rH   r:   r:   r;   __bool__  s   zPipeline.__bool__Nc                 C   s
   g | _ d S rc   )r   rH   r:   r:   r;   r     r   zPipeline.resetc                 C   s   |    dS )zClose the pipelineNr   rH   r:   r:   r;   r        zPipeline.closec                 O   s   | j ||f | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   rh   rm   r:   r:   r;   pipeline_execute_command  s   z!Pipeline.pipeline_execute_commandc                 O   s   | j |i |S )zAdds a command to the stack)r   r9   rn   ru   r:   r:   r;   rl     s   zPipeline.execute_commandc                 C   s<   | j js	| j   z| j jt| jW |   S |   w )z0Execute all the commands in the current pipeline)r   r3   rF   r2   Zexecute_pipelinetupler   r   rH   r:   r:   r;   rw     s   
zPipeline.execute)rG   rp   rG   N)r   r   r   r   r!   r<   r   r   r   intr   r   r   r   r   r   rl   r   r   rw   r:   r:   r:   r;   rp     s    



rp   c                   @   s   e Zd ZdZdefddZd.ddZd/d	d
Zd/ddZd/ddZ	e
defddZdd Zdd Zdd Zdd Zdd Zdd Zdd Z	 d0d!ed"efd#d$Z	 d0d!ed"efd%d&Z	 			d1d'ed(ed)ee d*edd+f
d,d-ZdS )2rt   z2
    PubSub object for multi database client.
    r[   c                 K   s   || _ | j jjdi | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr:   )r   r2   rv   )r9   r[   ru   r:   r:   r;   r<     s   zPubSub.__init__rG   c                 C   r   rc   r:   rH   r:   r:   r;   r     r   zPubSub.__enter__Nc                 C   r   rc   r   rH   r:   r:   r;   r     s
   zPubSub.__del__c                 C   s   | j jdS )Nr   r   r2   Zexecute_pubsub_methodrH   r:   r:   r;   r     s   zPubSub.resetc                 C   r   rc   r   rH   r:   r:   r;   r      r   zPubSub.closec                 C   s   | j jjjS rc   )r   r2   Zactive_pubsub
subscribedrH   r:   r:   r;   r     r   zPubSub.subscribedc                 G      | j jjdg|R  S )Nrl   r   r9   rn   r:   r:   r;   rl     s
   zPubSub.execute_commandc                 O      | j jjdg|R i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber   r   r:   r:   r;   r        zPubSub.psubscribec                 G   r   )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber   r   r:   r:   r;   r     
   zPubSub.punsubscribec                 O   r   )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber   r   r:   r:   r;   r   !  r   zPubSub.subscribec                 G   r   )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber   r   r:   r:   r;   r   -  s   zPubSub.unsubscribec                 O   r   )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber   r   r:   r:   r;   r   4  r   zPubSub.ssubscribec                 G   r   )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber   r   r:   r:   r;   r   @  r   zPubSub.sunsubscribeF        ignore_subscribe_messagesr   c                 C      | j jjd||dS )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager   r   r   r9   r   r   r:   r:   r;   r   I  
   
zPubSub.get_messagec                 C   r   )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager   r   r   r:   r:   r;   r   Y  r   zPubSub.get_sharded_message
sleep_timedaemonexception_handlersharded_pubsubr
   c                 C   s   | j jj|||| |dS )N)r   r   rv   r   )r   r2   Zexecute_pubsub_run)r9   r   r   r   r   r:   r:   r;   run_in_threadi  s   zPubSub.run_in_thread)rG   rt   r   )Fr   )r   FNF)r   r   r   r   r!   r<   r   r   r   r   propertyr   r   rl   r   r   r   r   r   r   r   r   r   r   r   r   r:   r:   r:   r;   rt     sV    


	
	


rt   );loggingr4   concurrent.futuresr   Zconcurrent.futures.threadr   typingr   r   r   r   Zredis.backgroundr   Zredis.backoffr	   Zredis.clientr
   Zredis.commandsr   r   Zredis.maint_notificationsr   Zredis.multidb.circuitr   r   rB   Zredis.multidb.command_executorr   Zredis.multidb.configr   r   r   r   Zredis.multidb.databaser   r   r   Zredis.multidb.exceptionr   r   r   Zredis.multidb.failure_detectorr   Zredis.multidb.healthcheckr   r   Zredis.observability.attributesr   Zredis.retryr   Zredis.utilsr    	getLoggerr   r   r!   r   rp   rt   r:   r:   r:   r;   <module>   s:    
  wC