o
    iT                     @   sb  d dl Z d dlZd dlmZmZmZmZmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl$m&Z' d dl(m)Z)m*Z*m+Z+ d dl,m-Z-m.Z.m/Z/ d dl0m1Z1 e2e3Z4e1G dd de#e"Z5de%fddZ6G dd de#e"Z7G dd dZ8dS )    N)Any	AwaitableCallableListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)AsyncDatabaseDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)Retry)BackgroundScheduler)	NoBackoff)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)ChannelT
EncodableTKeyT)experimentalc                   @   sh  e Zd ZdZdefddZdCddZd	d
 Zdd Zde	fddZ
deddfddZ	dDdedefddZdedefddZdefddZdedefddZd efd!d"Zd#efd$d%Zd&d' Zd(d) Zdd*dd+d,ed-geeee f f d.ed/ee  d0ed1ee f
d2d3Z!d4d5 Z"de#e$ef fd6d7Z%d8d9 Z&dedefd:d;Z'd<e(d=e)d>e)fd?d@Z*dAdB Z+dS )EMultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc              
   C   s   |  | _|js| n|j| _|j| _|j|j	|j
| _|js%| n|j| _|jd u r2| n|j| _| j| j |j| _|j| _|j| _| jtg t| j| j| j| j|j|j| j| jd| _d| _t ! | _"t# | _$|| _%d | _&g | _'d | _(d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF))r%   
_databasesZhealth_checksZdefault_health_checks_health_checksZhealth_check_interval_health_check_intervalZhealth_check_policyvalueZhealth_check_probesZhealth_check_delay_health_check_policyr$   Zdefault_failure_detectors_failure_detectorsr'   Zdefault_failover_strategyZ_failover_strategyZset_databasesr+   Z_auto_fallback_intervalr*   Z_event_dispatcherr&   Z_command_retryZupdate_supported_errorsConnectionRefusedErrorr	   r(   r)   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr#    r>   S/home/app/Keep/.python/lib/python3.10/site-packages/redis/asyncio/multidb/client.py__init__(   sN   






zMultiDBClient.__init__r=   returnc                    s   | j s|  I d H  | S N)r4   
initializer=   r>   r>   r?   
__aenter__V   s   zMultiDBClient.__aenter__c                    s:   | j r	| j   | jr| j  | jD ]}|  qd S rB   )r:   cancelr<   r;   )r=   exc_type	exc_value	tracebackZhc_taskr>   r>   r?   	__aexit__[   s   



zMultiDBClient.__aexit__c                    s   |   I dH  t| j| j| j| _d}| jD ]\}}|j	
| j |j	jtjkr4|s4|| j_d}q|s;tdd| _dS )zT
        Perform initialization of databases to define their initial state.
        NFTz4Initial connection failed - no active database found)_perform_initial_health_checkr5   create_taskr8   Zrun_recurring_asyncr.   _check_databases_healthr:   r,   circuitZon_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr3   Z_active_databaser   r4   )r=   Zis_active_db_founddatabaseweightr>   r>   r?   rC   c   s(   
zMultiDBClient.initializec                 C   s   | j S )zE
        Returns a sorted (by weight) list of all databases.
        )r,   rD   r>   r>   r?   get_databases   s   zMultiDBClient.get_databasesrS   Nc                    s   d}| j D ]\}}||krd} nq|std| |I dH  |jjtjkr=| j dd \}}| j	|I dH  dS t
d)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r,   
ValueError_check_db_healthrN   rP   rQ   rR   	get_top_nr3   set_active_databaser   )r=   rS   existsexisting_db_highest_weighted_dbr>   r>   r?   r[      s"   z!MultiDBClient.set_active_databaseTskip_initial_health_checkc                    s  |j dtdt di |jr| jjj|jfi |j }n"|jr7|jtdt d | jjj|jd}n
| jjdi |j }|j	du rJ|
 n|j	}t|||j|jd}z
| |I dH  W n tym   |sk Y nw | jdd \}}| j||j | ||I dH  dS )	z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        retryr   )retriesbackoff)Zconnection_poolN)clientrN   rT   health_check_urlrW   r>   )Zclient_kwargsupdater   r   Zfrom_urlr9   Zclient_classZ	from_poolZ	set_retryrN   Zdefault_circuit_breakerr   rT   re   rY   r   r,   rZ   add_change_active_database)r=   r#   r`   rd   rN   rS   r_   highest_weightr>   r>   r?   add_database   sD   
zMultiDBClient.add_databasenew_databasehighest_weight_databasec                    s:   |j |j kr|jjtjkr| j|I d H  d S d S d S rB   )rT   rN   rP   rQ   rR   r3   r[   )r=   rk   rl   r>   r>   r?   rh      s   z%MultiDBClient._change_active_databasec                    sV   | j |}| j dd \}}||kr'|jjtjkr)| j|I dH  dS dS dS )z<
        Removes a database from the database list.
        rW   r   N)	r,   removerZ   rN   rP   rQ   rR   r3   r[   )r=   rS   rT   r_   ri   r>   r>   r?   remove_database   s   zMultiDBClient.remove_databaserT   c                    sp   d}| j D ]\}}||krd} nq|std| j dd \}}| j || ||_| ||I dH  dS )z<
        Updates a database from the database list.
        NTrV   rW   r   )r,   rX   rZ   Zupdate_weightrT   rh   )r=   rS   rT   r\   r]   r^   r_   ri   r>   r>   r?   update_database_weight   s   z$MultiDBClient.update_database_weightfailure_detectorc                 C   s   | j | dS )z>
        Adds a new failure detector to the database.
        N)r1   append)r=   rp   r>   r>   r?   add_failure_detector   s   z"MultiDBClient.add_failure_detectorhealthcheckc              	      sN   | j 4 I dH  | j| W d  I dH  dS 1 I dH s w   Y  dS )z:
        Adds a new health check to the database.
        N)r7   r-   rq   )r=   rs   r>   r>   r?   add_health_check  s   .zMultiDBClient.add_health_checkc                    s.   | j s|  I dH  | jj|i |I dH S )zB
        Executes a single command and return its result.
        N)r4   rC   r3   execute_commandr=   argsoptionsr>   r>   r?   ru     s   zMultiDBClient.execute_commandc                 C   s   t | S )z:
        Enters into pipeline mode of the client.
        )PipelinerD   r>   r>   r?   pipeline  s   zMultiDBClient.pipelineF
shard_hintvalue_from_callablewatch_delayfuncry   watchesr|   r}   r~   c                   s:   | j s|  I dH  | jj|g|R |||dI dH S )z3
        Executes callable as transaction.
        Nr{   )r4   rC   r3   Zexecute_transaction)r=   r   r|   r}   r~   r   r>   r>   r?   transaction  s   zMultiDBClient.transactionc                    s&   | j s|  I dH  t| fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)r4   rC   PubSub)r=   kwargsr>   r>   r?   pubsub2  s   zMultiDBClient.pubsubc                    s   z3i  g | _ | jD ]\}}t| |}| |< | j | q
tjtj| j ddi| jdI dH }W n tj	yA   t	dw  fddt
| j |D }| D ]'\}}t|trq|j}tj|j_tjd|jd	 d
||< qSt|trzd
||< qS|S )zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsT)timeoutNz4Health check execution exceeds health_check_intervalc                    s   i | ]	\}} | |qS r>   r>   ).0taskresultZ
task_to_dbr>   r?   
<dictcomp>U  s    z9MultiDBClient._check_databases_health.<locals>.<dictcomp>z%Health check failed, due to exception)exc_infoF)r;   r,   r5   rL   rY   rq   wait_forgatherr.   TimeoutErrorzipitems
isinstancer   rS   rQ   OPENrN   rP   loggerdebugZoriginal_exception	Exception)r=   rS   r^   r   resultsZ
db_resultsr   Zunhealthy_dbr>   r   r?   rM   =  sB   





z%MultiDBClient._check_databases_healthc                    s   |   I dH }d}| jjtjkrd| v}n!| jjtjkr,t| t|d k}n| jjtj	kr9d| v }|sDt
d| jj dS )zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )rM   r9   Zinitial_health_check_policyr   ZALL_AVAILABLEvaluesZMAJORITY_AVAILABLEsumlenZONE_AVAILABLEr   )r=   r   
is_healthyr>   r>   r?   rK   i  s    z+MultiDBClient._perform_initial_health_checkc                    sX   | j | j|I dH }|s|jjtjkrtj|j_|S |r*|jjtjkr*tj|j_|S )zO
        Runs health checks on the given database until first failure.
        N)r0   executer-   rN   rP   rQ   r   rR   )r=   rS   r   r>   r>   r?   rY     s   


zMultiDBClient._check_db_healthrN   	old_state	new_statec                 C   s   t  }|tjkrt | |j| _d S |tjkr0|tj	kr0t
d|j d |tt| |tjkrF|tjkrHt
d|j d d S d S d S )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)r5   get_running_looprQ   	HALF_OPENrL   rY   rS   r<   rR   r   r   warning
call_laterr
   _half_open_circuitinfo)r=   rN   r   r   loopr>   r>   r?   rO     s   

z/MultiDBClient._on_circuit_state_change_callbackc                    s&   | j jr| j jj I d H  d S d S rB   )r3   Zactive_databaserd   acloserD   r>   r>   r?   r     s   zMultiDBClient.aclose)r=   r"   rA   r"   )T),__name__
__module____qualname____doc__r   r@   rE   rJ   rC   r   rU   r   r[   r   boolrj   rh   rn   floatro   r   rr   r   rt   ru   rz   r   r   r   r   r    r   strr   r   dictr   rM   rK   rY   r   rQ   rO   r   r>   r>   r>   r?   r"   !   sf    
.$
1
		

,
r"   rN   c                 C   s   t j| _d S rB   )rQ   r   rP   )rN   r>   r>   r?   r        r   c                   @   s   e Zd ZdZdefddZdddZd	d
 Zdd Zdd Z	de
fddZdefddZdddZdddZd ddZdd Zdee fddZdS )!ry   zG
    Pipeline implementation for multiple logical Redis databases.
    rd   c                 C   s   g | _ || _d S rB   )_command_stack_client)r=   rd   r>   r>   r?   r@     s   
zPipeline.__init__r=   rA   c                       | S rB   r>   rD   r>   r>   r?   rE        zPipeline.__aenter__c                    s*   |   I d H  | j|||I d H  d S rB   )resetr   rJ   r=   rG   rH   rI   r>   r>   r?   rJ     s   zPipeline.__aexit__c                 C   s   |    S rB   )_async_self	__await__rD   r>   r>   r?   r     r   zPipeline.__await__c                    r   rB   r>   rD   r>   r>   r?   r     r   zPipeline._async_selfc                 C   s
   t | jS rB   )r   r   rD   r>   r>   r?   __len__  s   
zPipeline.__len__c                 C   s   dS )z1Pipeline instances should always evaluate to TrueTr>   rD   r>   r>   r?   __bool__  s   zPipeline.__bool__Nc                    s   g | _ d S rB   )r   rD   r>   r>   r?   r     s   
zPipeline.resetc                    s   |   I dH  dS )zClose the pipelineN)r   rD   r>   r>   r?   r     s   zPipeline.aclosec                 O   s   | j ||f | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   rq   rv   r>   r>   r?   pipeline_execute_command  s   z!Pipeline.pipeline_execute_commandc                 O   s   | j |i |S )zAdds a command to the stack)r   r=   rw   r   r>   r>   r?   ru     s   zPipeline.execute_commandc                    sV   | j js| j  I dH  z| j jt| jI dH W |  I dH  S |  I dH  w )z0Execute all the commands in the current pipelineN)r   r4   rC   r3   Zexecute_pipelinetupler   r   rD   r>   r>   r?   r     s   
 zPipeline.execute)r=   ry   rA   ry   rA   N)rA   ry   )r   r   r   r   r"   r@   rE   rJ   r   r   intr   r   r   r   r   r   ru   r   r   r   r>   r>   r>   r?   ry     s    



ry   c                   @   s   e Zd ZdZdefddZd&ddZd'd	d
Zdd Ze	de
fddZdefddZdedefddZdefddZdedefddZdd Z	d(de
dee fdd Zdd!d"d#eddfd$d%ZdS ))r   z2
    PubSub object for multi database client.
    rd   c                 K   s   || _ | j jjdi | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr>   )r   r3   r   )r=   rd   r   r>   r>   r?   r@     s   zPubSub.__init__rA   c                    r   rB   r>   rD   r>   r>   r?   rE     r   zPubSub.__aenter__Nc                    s   |   I d H  d S rB   )r   r   r>   r>   r?   rJ     s   zPubSub.__aexit__c                    s   | j jdI d H S )Nr   r   r3   Zexecute_pubsub_methodrD   r>   r>   r?   r   
  s   zPubSub.aclosec                 C   s   | j jjjS rB   )r   r3   Zactive_pubsub
subscribedrD   r>   r>   r?   r     s   zPubSub.subscribedrw   c                    s   | j jjdg|R  I d H S )Nru   r   r=   rw   r>   r>   r?   ru     s   zPubSub.execute_commandr   c                    $   | j jjdg|R i |I dH S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr   r   r>   r>   r?   r        zPubSub.psubscribec                       | j jjdg|R  I dH S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr   r   r>   r>   r?   r   "     zPubSub.punsubscribec                    r   )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr   r   r>   r>   r?   r   +  r   zPubSub.subscribec                    r   )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr   r   r>   r>   r?   r   7  r   zPubSub.unsubscribeF        ignore_subscribe_messagesr   c                    s   | j jjd||dI dH S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r   r   Nr   )r=   r   r   r>   r>   r?   r   @  s   
zPubSub.get_messageg      ?)exception_handlerpoll_timeoutr   c                   s   | j jj||| dI dH S )a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )Z
sleep_timer   r   N)r   r3   Zexecute_pubsub_run)r=   r   r   r>   r>   r?   runP  s   z
PubSub.run)rA   r   r   )Fr   )r   r   r   r   r"   r@   rE   rJ   r   propertyr   r   r   ru   r   r   r   r   r   r   r   r   r   r   r   r>   r>   r>   r?   r     s4    

	

r   )9r5   loggingtypingr   r   r   r   r   r   Zredis.asyncio.clientr   Z&redis.asyncio.multidb.command_executorr	   Zredis.asyncio.multidb.configr
   r   r   r   Zredis.asyncio.multidb.databaser   r   r   Z&redis.asyncio.multidb.failure_detectorr   Z!redis.asyncio.multidb.healthcheckr   r   Zredis.asyncio.retryr   Zredis.backgroundr   Zredis.backoffr   Zredis.commandsr   r   Zredis.multidb.circuitr   r   rQ   Zredis.multidb.exceptionr   r   r   Zredis.typingr   r   r    Zredis.utilsr!   	getLoggerr   r   r"   r   ry   r   r>   r>   r>   r?   <module>   s6     
   D