o
    i.                     @   s*  d dl mZ d dlmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZmZmZ d d	lmZmZmZmZmZ d d
l m!Z! d dl"m#Z# d dl$m%Z%m&Z& d dl'm(Z) d dl*m+Z+m,Z, d dl-m.Z. d dl/m0Z0 G dd de,Z1G dd de+e1Z2dS )    )abstractmethod)iscoroutinefunction)datetime)Any	AwaitableCallableListOptionalUnion)RedisCluster)PipelinePubSub)AsyncDatabaseDatabase	Databases)AsyncActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYAsyncFailoverStrategyDefaultFailoverStrategyExecutorFailoverStrategyExecutor)AsyncFailureDetector)Retry)AsyncOnCommandsFailEventEventDispatcherInterface)State)BaseCommandExecutorCommandExecutor)DEFAULT_AUTO_FALLBACK_INTERVAL)KeyTc                   @   sR  e Zd ZeedefddZeedee fddZ	ededdfdd	Z
eedee fd
dZededdfddZeedee fddZejededdfddZeedefddZeedefddZedd Zedd ZedefddZedeegdf fdd Zed!efd"d#Zed$ede fd%d&Z!dS )'AsyncCommandExecutorreturnc                 C      dS )zReturns a list of databases.N selfr&   r&   ]/home/app/Keep/.python/lib/python3.10/site-packages/redis/asyncio/multidb/command_executor.py	databases       zAsyncCommandExecutor.databasesc                 C   r%   )z$Returns a list of failure detectors.Nr&   r'   r&   r&   r)   failure_detectors&   r+   z&AsyncCommandExecutor.failure_detectorsfailure_detectorNc                 C   r%   )z=Adds a new failure detector to the list of failure detectors.Nr&   r(   r-   r&   r&   r)   add_failure_detector,   s   z)AsyncCommandExecutor.add_failure_detectorc                 C   r%   )z"Returns currently active database.Nr&   r'   r&   r&   r)   active_database1   r+   z$AsyncCommandExecutor.active_databasedatabasec                       dS )zjSets the currently active database.

        Args:
            database: The new active database.
        Nr&   )r(   r1   r&   r&   r)   set_active_database7   s   z(AsyncCommandExecutor.set_active_databasec                 C   r%   )z Returns currently active pubsub.Nr&   r'   r&   r&   r)   active_pubsub@   r+   z"AsyncCommandExecutor.active_pubsubpubsubc                 C   r%   )zSets currently active pubsub.Nr&   r(   r5   r&   r&   r)   r4   F   r+   c                 C   r%   )z#Returns failover strategy executor.Nr&   r'   r&   r&   r)   failover_strategy_executorL   r+   z/AsyncCommandExecutor.failover_strategy_executorc                 C   r%   )zReturns command retry object.Nr&   r'   r&   r&   r)   command_retryR   r+   z"AsyncCommandExecutor.command_retryc                    r2   )z:Initializes a PubSub object on a currently active databaseNr&   r(   kwargsr&   r&   r)   r5   X      zAsyncCommandExecutor.pubsubc                    r2   )z*Executes a command and returns the result.Nr&   )r(   argsoptionsr&   r&   r)   execute_command]   r;   z$AsyncCommandExecutor.execute_commandcommand_stackc                    r2   )z)Executes a stack of commands in pipeline.Nr&   )r(   r?   r&   r&   r)   execute_pipelineb   r;   z%AsyncCommandExecutor.execute_pipelinetransactionc                    r2   )z1Executes a transaction block wrapped in callback.Nr&   )r(   rA   watchesr=   r&   r&   r)   execute_transactiong   s   z(AsyncCommandExecutor.execute_transactionmethod_namec                    r2   )z*Executes a given method on active pub/sub.Nr&   )r(   rD   r<   r:   r&   r&   r)   execute_pubsub_methodn   r;   z*AsyncCommandExecutor.execute_pubsub_method
sleep_timec                    r2   )z!Executes pub/sub run in a thread.Nr&   )r(   rF   r:   r&   r&   r)   execute_pubsub_runs   r;   z'AsyncCommandExecutor.execute_pubsub_run)"__name__
__module____qualname__propertyr   r   r*   r   r   r,   r/   r	   r   r0   r3   r   r4   setterr   r7   r   r8   r5   r>   tupler@   r   r   rC   strrE   floatr   rG   r&   r&   r&   r)   r#      sP    

r#   c                       s  e Zd Zeeefdee dede	de
dedededef fd	d
ZedefddZedee fddZdeddfddZedee fddZdeddfddZedee fddZejdeddfddZedefddZede	fdd Zd!d" Zd#d$ Zd%efd&d'Z dd(dd)d*e!d+ge"e#e$e# f f d,e%d-ee& d.e'd/ee f
d0d1Z(d2e&fd3d4Z)	dFd5ede#fd6d7Z*	8dGd9e!d:efd;d<Z+d=d> Z,d?d@ Z-dAefdBdCZ.dDdE Z/  Z0S )HDefaultCommandExecutorr,   r*   r8   failover_strategyevent_dispatcherfailover_attemptsfailover_delayauto_fallback_intervalc	           
         sn   t  | |D ]}	|	j| d q|| _|| _|| _t|||| _|| _d| _	d| _
i | _|   |   dS )a  
        Initialize the DefaultCommandExecutor instance.

        Args:
            failure_detectors: List of failure detector instances to monitor database health
            databases: Collection of available databases to execute commands on
            command_retry: Retry policy for failed command execution
            failover_strategy: Strategy for handling database failover
            event_dispatcher: Interface for dispatching events
            failover_attempts: Number of failover attempts
            failover_delay: Delay between failover attempts
            auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
        )Zcommand_executorN)super__init__Zset_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcher_schedule_next_fallback)
r(   r,   r*   r8   rQ   rR   rS   rT   rU   fd	__class__r&   r)   rW   z   s   zDefaultCommandExecutor.__init__r$   c                 C      | j S N)rX   r'   r&   r&   r)   r*         z DefaultCommandExecutor.databasesc                 C   re   rf   )rY   r'   r&   r&   r)   r,      rg   z(DefaultCommandExecutor.failure_detectorsr-   Nc                 C   s   | j | d S rf   )rY   appendr.   r&   r&   r)   r/      s   z+DefaultCommandExecutor.add_failure_detectorc                 C   re   rf   )r]   r'   r&   r&   r)   r0      rg   z&DefaultCommandExecutor.active_databaser1   c                    sP   | j }|| _ |d ur$||ur&| jt|| j | fi | jI d H  d S d S d S rf   )r]   r\   dispatch_asyncr   r_   )r(   r1   Z
old_activer&   r&   r)   r3      s   z*DefaultCommandExecutor.set_active_databasec                 C   re   rf   r^   r'   r&   r&   r)   r4      rg   z$DefaultCommandExecutor.active_pubsubr5   c                 C   s
   || _ d S rf   rj   r6   r&   r&   r)   r4      s   
c                 C   re   rf   )r[   r'   r&   r&   r)   r7      rg   z1DefaultCommandExecutor.failover_strategy_executorc                 C   re   rf   )rZ   r'   r&   r&   r)   r8      rg   z$DefaultCommandExecutor.command_retryc                 K   sD   | j d u r t| jjtrtd| jjjdi || _ || _d S d S )Nz(PubSub is not supported for RedisClusterr&   )r^   
isinstancer]   clientr   
ValueErrorr5   r_   r9   r&   r&   r)   r5      s   

zDefaultCommandExecutor.pubsubc                    s$    fdd} | I d H S )Nc                     s0   j jj i I d H }  I d H  | S rf   )r]   rl   r>   _register_command_executionresponser<   r=   r(   r&   r)   callback   s   z8DefaultCommandExecutor.execute_command.<locals>.callback_execute_with_failure_detection)r(   r<   r=   rr   r&   rq   r)   r>      s   z&DefaultCommandExecutor.execute_commandr?   c                    s"    fdd} | I d H S )Nc               	      s   j j 4 I d H +}  D ]\}}| j|i | q|  I d H } I d H  |W  d   I d H  S 1 I d H s<w   Y  d S rf   )r]   rl   Zpipeliner>   executern   )pipecommandr=   rp   r?   r(   r&   r)   rr      s   0z9DefaultCommandExecutor.execute_pipeline.<locals>.callbackrs   )r(   r?   rr   r&   rx   r)   r@      s   	z'DefaultCommandExecutor.execute_pipelineF
shard_hintvalue_from_callablewatch_delayfuncr   rB   rz   r{   r|   c                   s(    fdd} |I d H S )Nc                     s<   j jj gR dI d H } dI d H  | S )Nry   r&   )r]   rl   rA   rn   ro   r}   r(   rz   r{   r|   rB   r&   r)   rr      s   z<DefaultCommandExecutor.execute_transaction.<locals>.callbackrs   )r(   r}   rz   r{   r|   rB   rr   r&   r~   r)   rC      s   z*DefaultCommandExecutor.execute_transactionrD   c                    s,    fdd}j |g R  I d H S )Nc                     sN   t j} t| r|  i I d H }n|  i } I d H  |S rf   )getattrr4   r   rn   )methodrp   r<   r:   rD   r(   r&   r)   rr     s   z>DefaultCommandExecutor.execute_pubsub_method.<locals>.callbackrs   )r(   rD   r<   r:   rr   r&   r   r)   rE     s   
z,DefaultCommandExecutor.execute_pubsub_methodrF   c                    s$    fdd} |I d H S )Nc                      s   j j dI d H S )N)Zpoll_timeoutexception_handlerr5   )r^   runr&   r   r5   r(   rF   r&   r)   rr     s   z;DefaultCommandExecutor.execute_pubsub_run.<locals>.callbackrs   )r(   rF   r   r5   rr   r&   r   r)   rG     s   z)DefaultCommandExecutor.execute_pubsub_runr&   rr   cmdsc                    s6    fddj fddfddI dH S )zO
        Execute a commands execution callback with failure detection.
        c                      s     I d H    I d H S rf   )_check_active_databaser&   )rr   r(   r&   r)   wrapper$  s   zGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapperc                      s     S rf   r&   r&   )r   r&   r)   <lambda>*  s    zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>c                    s   j | g R  S rf   )_on_command_fail)error)r   r(   r&   r)   r   +  s    N)rZ   Zcall_with_retry)r(   rr   r   r&   )rr   r   r(   r   r)   rt     s   

z6DefaultCommandExecutor._execute_with_failure_detectionc                    sd   | j du s| j jjtjks| jdkr.| jt kr0| 	| j
 I dH I dH  |   dS dS dS )zB
        Checks if active a database needs to be updated.
        Nr   )r]   ZcircuitstateCBStateZCLOSEDZ_auto_fallback_intervalZ_next_fallback_attemptr   nowr3   r[   ru   ra   r'   r&   r&   r)   r   .  s   


z-DefaultCommandExecutor._check_active_databasec                    s   | j t||I d H  d S rf   )r\   ri   r   )r(   r   r<   r&   r&   r)   r   ?  s   z'DefaultCommandExecutor._on_command_failcmdc                    s"   | j D ]
}||I d H  qd S rf   )rY   Zregister_command_execution)r(   r   detectorr&   r&   r)   rn   D  s   
z2DefaultCommandExecutor._register_command_executionc                 C   s4   t | j}t }t }| jt|gt||gi dS )z0
        Registers necessary listeners.
        N)r   rY   r   r   r\   Zregister_listenersr   r   )r(   Zfailure_listenerZresubscribe_listenerZclose_connection_listenerr&   r&   r)   r`   H  s   
z.DefaultCommandExecutor._setup_event_dispatcher)NN)r&   )1rH   rI   rJ   r   r   r!   r   r   r   r   r   r   intrO   rW   rK   r*   r,   r/   r	   r   r0   r3   r   r4   rL   r   r7   r8   r5   r>   rM   r@   r   r
   r   r   r"   rN   boolrC   rE   rG   rt   r   r   rn   r`   __classcell__r&   r&   rc   r)   rP   y   s    	*



rP   N)3abcr   asyncior   r   typingr   r   r   r   r	   r
   Zredis.asyncior   Zredis.asyncio.clientr   r   Zredis.asyncio.multidb.databaser   r   r   Zredis.asyncio.multidb.eventr   r   r   r   Zredis.asyncio.multidb.failoverr   r   r   r   r   Z&redis.asyncio.multidb.failure_detectorr   Zredis.asyncio.retryr   Zredis.eventr   r   Zredis.multidb.circuitr   r   Zredis.multidb.command_executorr   r    Zredis.multidb.configr!   Zredis.typingr"   r#   rP   r&   r&   r&   r)   <module>   s$     Z