o
    ưi(                     @   s|   d Z ddlZddlmZ ddlmZmZmZmZm	Z	m
Z
 ddlmZ ddlmZ ddlmZ ddlmZ G d	d
 d
eZdS )zb
Base class across routing strategies to abstract commmon functions like batch incrementing redis
    N)ABC)DictListOptionalSetTupleUnion)verbose_router_logger)	DualCache)RedisPipelineIncrementOperation)DEFAULT_REDIS_SYNC_INTERVALc                	   @   s
  e Zd Zdededeeeef  fddZ	deeeef  fddZ
dd	 Zd
eeeef  dedee fddZdedeeef defddZdeeeef  fddZdd ZdefddZdee fddZdee fddZdee fddZdd  Zd!d" Zd#S )$BaseRoutingStrategy
dual_cacheshould_batch_redis_writesdefault_sync_intervalc                 C   s,   || _ g | _d | _|r| | t | _d S N)r   redis_increment_operation_queue
_sync_tasksetup_sync_tasksetin_memory_keys_to_update)selfr   r   r    r   d/home/app/Keep/.python/lib/python3.10/site-packages/litellm/router_strategy/base_routing_strategy.py__init__   s   
zBaseRoutingStrategy.__init__c                 C   sJ   zt  }W n ty   t  }t | Y nw || j|d| _dS )z;Setup the sync task in a way that's compatible with FastAPI)r   N)asyncioget_running_loopRuntimeErrornew_event_loopset_event_loopcreate_task(periodic_sync_in_memory_spend_with_redisr   )r   r   loopr   r   r   r       s   
z#BaseRoutingStrategy.setup_sync_taskc                    sD   | j dur | j   z	| j I dH  W dS  tjy   Y dS w dS )z.Cleanup method to be called when shutting downN)r   cancelr   CancelledErrorr   r   r   r   cleanup.   s   

zBaseRoutingStrategy.cleanupincrement_listttlreturnc                    s8   g }|D ]\}}| j |||dI dH }|| q|S )zB
        Increment a list of values in the current window
        keyvaluer(   N)"_increment_value_in_current_windowappend)r   r'   r(   resultsr+   r,   resultr   r   r   '_increment_value_list_in_current_window7   s   z;BaseRoutingStrategy._increment_value_list_in_current_windowr+   r,   c                    sF   | j jj|||dI dH }t|||d}| j| | j|d |S )a  
        Increment spend within existing budget window

        Runs once the budget start time exists in Redis Cache (on the 2nd and subsequent requests to the same provider)

        - Increments the spend in memory cache (so spend instantly updated in memory)
        - Queues the increment operation to Redis Pipeline (using batched pipeline to optimize performance. Using Redis for multi instance environment of LiteLLM)
        r*   N)r+   increment_valuer(   r+   )r   in_memory_cacheZasync_incrementr   r   r.   add_to_in_memory_keys_to_update)r   r+   r,   r(   r0   Zincrement_opr   r   r   r-   E   s   z6BaseRoutingStrategy._increment_value_in_current_windowc              
      sz   |pt }	 z|  I dH  t|I dH  W n$ ty; } ztdt|  t|I dH  W Y d}~nd}~ww q)z
        Handler that triggers sync_in_memory_spend_with_redis every DEFAULT_REDIS_SYNC_INTERVAL seconds

        Required for multi-instance environment usage of provider budgets
        TNzError in periodic sync task: )r    _sync_in_memory_spend_with_redisr   sleep	Exceptionr	   errorstr)r   r   er   r   r   r!   _   s    z<BaseRoutingStrategy.periodic_sync_in_memory_spend_with_redisc              
      s*  zs| j js	W dS t| jdkrri }g  t| jD ]$\}}|d |v r2||d  d  |d 7  < n|||d <  | qt| }| j jj|dI dH } fddt| jD | _|durmdd	 t	||D }|W S i }|W S W dS  t
y } ztd
t|  g | _W Y d}~dS d}~ww )av  
        How this works:
        - async_log_success_event collects all provider spend increments in `redis_increment_operation_queue`
        - This function compresses multiple increments for the same key into a single operation
        - Then pushes all increments to Redis in a batched pipeline to optimize performance

        Only runs if Redis is initialized
        Nr   r+   r2   )r'   c                    s   g | ]
\}}| vr|qS r   r   ).0idxopZops_to_remover   r   
<listcomp>   s
    zKBaseRoutingStrategy._push_in_memory_increments_to_redis.<locals>.<listcomp>c                 S   s   i | ]	\}}|d  |qS r3   r   )r<   r+   r>   r   r   r   
<dictcomp>   s    zKBaseRoutingStrategy._push_in_memory_increments_to_redis.<locals>.<dictcomp>*Error syncing in-memory cache with Redis: )r   redis_cachelenr   	enumerater.   listvaluesZasync_increment_pipelinezipr8   r	   r9   r:   )r   Zcompressed_opsr=   r>   Zcompressed_queueZincrement_resultZreturn_resultr;   r   r?   r   #_push_in_memory_increments_to_redist   sJ   	


'z7BaseRoutingStrategy._push_in_memory_increments_to_redisc                 C   s   | j | d S r   )r   add)r   r+   r   r   r   r5      s   z3BaseRoutingStrategy.add_to_in_memory_keys_to_updatec                 C   s   dS )z-
        Get the key pattern to sync
        Nr   r%   r   r   r   get_key_pattern_to_sync   s   z+BaseRoutingStrategy.get_key_pattern_to_syncc                 C   s   | j S r   )r   r%   r   r   r   get_in_memory_keys_to_update   s   z0BaseRoutingStrategy.get_in_memory_keys_to_updatec                 C   s   | j }t | _ |S )z-Atomic get and reset in-memory keys to update)r   r   )r   keysr   r   r   &get_and_reset_in_memory_keys_to_update   s   z:BaseRoutingStrategy.get_and_reset_in_memory_keys_to_updatec                 C   s   t  | _d S r   )r   r   r%   r   r   r   reset_in_memory_keys_to_update   s   z2BaseRoutingStrategy.reset_in_memory_keys_to_updatec              
      sD  z| j jdu rW dS |  }t|}i }| j jj|dI dH }t||D ]\}}t|p-d||< q%|  I dH }|du r@W dS |D ]?}t|	|dpLd}	t|	|dpVd}
t| j jj
|dI dH ped}||
 }||	krt|	| }nqB| j jj||dI dH  qBW dS  ty } ztdt|  W Y d}~dS d}~ww )a  
        Ensures in-memory cache is updated with latest Redis values for all provider spends.

        Why Do we need this?
        - Optimization to hit sub 100ms latency. Performance was impacted when redis was used for read/write per request
        - Use provider budgets in multi-instance environment, we use Redis to sync spend across all instances

        What this does:
        1. Push all provider spend increments to Redis
        2. Fetch all current provider spend from Redis to update in-memory cache
        N)rM   r   r3   )r+   r,   rB   )r   rC   rL   rF   r4   Zasync_batch_get_cacherH   floatrI   getZasync_get_cacheZasync_set_cacher8   r	   	exceptionr:   )r   Z
cache_keysZcache_keys_listZin_memory_before_dictZin_memory_beforekvZredis_valuesr+   Z	redis_valbeforeafterdeltamergedr;   r   r   r   r6      sJ   

z4BaseRoutingStrategy._sync_in_memory_spend_with_redisN)__name__
__module____qualname__r
   boolr   r   intrP   r   r   r&   r   r   r:   r1   r-   r!   rI   r5   rK   r   rL   rN   rO   r6   r   r   r   r   r      sD    
	



:r   )__doc__r   abcr   typingr   r   r   r   r   r   Zlitellm._loggingr	   Zlitellm.caching.cachingr
   Zlitellm.caching.redis_cacher   Zlitellm.constantsr   r   r   r   r   r   <module>   s     