o
    ưiMG                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZmZmZ e	r4ddlmZ ddlZddlmZmZ ddlmZ d	d
lmZ d	dlmZ d	dlmZ e	rgddlmZ  ee e
f Zne
Zddl!m"Z" G dd de"Z#G dd deZ$dS )z
Dual Cache implementation - Class to update both Redis and an in-memory cache simultaneously.

Has 4 primary methods:
    - set_cache
    - get_cache
    - async_set_cache
    - async_get_cache
    N)ThreadPoolExecutor)Lock)TYPE_CHECKINGAnyDictListOptionalTupleUnion)RedisPipelineIncrementOperation)print_verboseverbose_logger)"DEFAULT_MAX_REDIS_BATCH_CACHE_SIZE   )	BaseCache)InMemoryCache)
RedisCache)Span)OrderedDictc                       s.   e Zd Zdd fdd
Z fddZ  ZS )LimitedSizeOrderedDictd   max_sizec                   s   t  j|i | || _d S N)super__init__r   )selfr   argskwargs	__class__ Q/home/app/Keep/.python/lib/python3.10/site-packages/litellm/caching/dual_cache.pyr   (   s   
zLimitedSizeOrderedDict.__init__c                    s,   t | | jkr| jdd t || d S )NF)last)lenr   popitemr   __setitem__)r   keyvaluer   r!   r"   r&   ,   s   z"LimitedSizeOrderedDict.__setitem__)__name__
__module____qualname__r   r&   __classcell__r!   r!   r   r"   r   '   s    r   c                       s<  e Zd ZdZdddddefdee dee dee dee dee de	d	df fd
dZ
dee dee fddZd<defddZ	d<de	ded	e	fddZ		d=dee defddZ		d=dedee defddZ		d=dee defddZdedee dee d	eee eeee f f fdd Zd!eeee f d	dfd"d#Z		d=dedee defd$d%Zd<defd&d'Z	d<d(edefd)d*Z		d=dedee ded	efd+d,Z		d>d-ed. dedee d	eee  fd/d0Z	d<deded	dfd1d2Z d3d4 Z!d5d6 Z"d7efd8d9Z#d7ed	ee	 fd:d;Z$  Z%S )?	DualCachea5  
    DualCache is a cache implementation that updates both Redis and an in-memory cache simultaneously.
    When data is updated or inserted, it is written to both the in-memory cache + Redis.
    This ensures that even if Redis hasn't been updated yet, the in-memory cache reflects the most recent data.
    Nin_memory_cacheredis_cachedefault_in_memory_ttldefault_redis_ttl default_redis_batch_cache_expiry"default_max_redis_batch_cache_sizereturnc                    s\   t    |p	t | _|| _t|d| _t | _|pt	j
pd| _|p$t	j| _|p*t	j| _d S )Nr   
   )r   r   r   r.   r/   r   last_redis_batch_access_timer   "_last_redis_batch_access_time_locklitellmr2   redis_batch_cache_expiryr0   r1   )r   r.   r/   r0   r1   r2   r3   r   r!   r"   r   :   s   
	zDualCache.__init__c                 C   s$   |d ur|| _ |d ur|| _d S d S r   )r0   r1   )r   r0   r1   r!   r!   r"   update_cache_ttlV   s
   
zDualCache.update_cache_ttlF
local_onlyc              
   K   s   z;| j d urd|vr| jd ur| j|d< | j j||fi | | jd ur6|du r9| jj||fi | W d S W d S W d S  tyR } zt| W Y d }~d S d }~ww )NttlF)r.   r0   	set_cacher/   	Exceptionr   r   r'   r(   r;   r   er!   r!   r"   r=   _   s   

zDualCache.set_cacher(   c              
   K   s   z)|}| j dur| j j||fi |}| jdur'|du r'| jj||fi |}|W S  tyA } ztdt|  |d}~ww )z
        Key - the key in cache

        Value - int - the value you want to increment by

        Returns - int - the incremented value
        NF)LiteLLM Cache: Excepton async add_cache: )r.   increment_cacher/   r>   r   errorstr)r   r'   r(   r;   r   resultr@   r!   r!   r"   rB   m   s   

zDualCache.increment_cacheparent_otel_spanc                 K   s   zGd }| j d ur| j j|fi |}|d ur|}|d u r>| jd ur>|du r>| jj||d}|d ur<| j j||fi | |}td|  |W S  tyX   tt	  Y d S w )NFrF   get cache: cache result: )
r.   	get_cacher/   r=   r   r>   r   rC   	traceback
format_excr   r'   rF   r;   r   rE   in_memory_resultredis_resultr!   r!   r"   rI      s$   
zDualCache.get_cachekeysc           	         s   t    d  fdd}z&t }tdd}||}| W  d    W S 1 s.w   Y  W d S  tyA   |  Y S w )Nr   c               	      sR   t  } zt |  | jdi  W |   t d S |   t d w )z9Run the coroutine in a new event loop within this thread.Nr!   )asyncionew_event_loopset_event_looprun_until_completeasync_batch_get_cacheclose)Znew_loopZreceived_argsr   r!   r"   run_in_new_loop   s   
z2DualCache.batch_get_cache.<locals>.run_in_new_loopr   )max_workers)localspoprP   get_running_loopr   submitrE   RuntimeError)	r   rO   rF   r;   r   rW   _executorfuturer!   rV   r"   batch_get_cache   s   

(
zDualCache.batch_get_cachec                    s   zat d| d|  d }| jd ur-| jj|fi |I d H }t d|  |d ur-|}|d u rY| jd urY|du rY| jj||dI d H }|d urW| jj||fi |I d H  |}t d|  |W S  tys   tt	  Y d S w )Nzasync get cache: cache key: ; local_only: zin_memory_result: FrG   rH   )
r   r.   async_get_cacher/   async_set_cacher>   r   rC   rJ   rK   rL   r!   r!   r"   rc      s>   
zDualCache.async_get_cachecurrent_timerE   c                 C   s   g }i }| j ; t||D ]*\}}|durq|| jvs%|| j|  | jkr7|| | j|||< || j|< qW d   ||fS 1 sEw   Y  ||fS )z
        Atomically choose keys to fetch from Redis and reserve their access time.
        This prevents check-then-act races under concurrent async callers.
        N)r7   zipr6   r9   appendget)r   re   rO   rE   sublist_keysprevious_access_timesr'   r(   r!   r!   r"   _reserve_redis_batch_keys   s*   




z#DualCache._reserve_redis_batch_keysrj   c                 C   s`   | j # | D ]\}}|d u r| j|d  q|| j|< qW d    d S 1 s)w   Y  d S r   )r7   itemsr6   rZ   )r   rj   r'   Zprevious_timer!   r!   r"   &_rollback_redis_batch_key_reservations  s   "z0DualCache._rollback_redis_batch_key_reservationsc                    sd  zd gt | }| jd ur!| jj|fi |I d H }|d ur!|}d |v r| jd ur|du r	 t }| |||\}}	t |dkrz| jj||dI d H }
W n ty\   | |	  w |
d u sltdd |
	 D ro|W S dd t
|D }|
 D ]!\}}|||| < |d ur| jd ur| jj||fi |I d H  q||W S  ty   tt  Y d S w )NFr   rG   c                 s   s    | ]}|d u V  qd S r   r!   ).0vr!   r!   r"   	<genexpr>D  s    z2DualCache.async_batch_get_cache.<locals>.<genexpr>c                 S   s   i | ]\}}||qS r!   r!   )rn   ir'   r!   r!   r"   
<dictcomp>H  s    z3DualCache.async_batch_get_cache.<locals>.<dictcomp>)r$   r.   rT   r/   timerk   r>   rm   allvalues	enumeraterl   rd   r   rC   rJ   rK   )r   rO   rF   r;   r   rE   rM   re   ri   rj   rN   Zkey_to_indexr'   r(   r!   r!   r"   rT     sX   
zDualCache.async_batch_get_cachec              
      s   t d| d| d|  z3| jd ur"| jj||fi |I d H  | jd ur<|du r?| jj||fi |I d H  W d S W d S W d S  ty^ } ztdt|  W Y d }~d S d }~ww )Nzasync set cache: cache key: rb   z	; value: FrA   )r   r.   rd   r/   r>   r   	exceptionrD   r?   r!   r!   r"   rd   W  s    
"zDualCache.async_set_cache
cache_listc              
      s   t d| d|  z8| jdur| jjd	d|i|I dH  | jdur>|du rA| jjd	||ddd|I dH  W dS W dS W dS  ty` } ztdt|  W Y d}~dS d}~ww )
z1
        Batch write values to the cache
        z#async batch set cache: cache keys: rb   Nrx   Fr<   )rx   r<   rA   r!   )	r   r.   async_set_cache_pipeliner/   rZ   r>   r   rw   rD   )r   rx   r;   r   r@   r!   r!   r"   ry   g  s0   

z"DualCache.async_set_cache_pipelinec              
      s   z2|}| j dur| j j||fi |I dH }| jdur1|du r1| jj||||dddI dH }|W S  tyA } z|d}~ww )z
        Key - the key in cache

        Value - float - the value you want to increment by

        Returns - float - the incremented value
        NFr<   )rF   r<   )r.   Zasync_incrementr/   rh   r>   )r   r'   r(   rF   r;   r   rE   r@   r!   r!   r"   async_increment_cache  s*   

zDualCache.async_increment_cacheincrement_listr   c              
      sr   z)d }| j d ur| j j||dI d H }| jd ur(|du r(| jj||dI d H }|W S  ty8 } z|d }~ww )N)r{   rF   F)r.   Zasync_increment_pipeliner/   r>   )r   r{   r;   rF   r   rE   r@   r!   r!   r"   async_increment_cache_pipeline  s$   
z(DualCache.async_increment_cache_pipelinec              
      s   z1| j dur| j j|||dddI dH }| jdur0|du r0| jj|||dddI dH }W dS  ty@ } z|d}~ww )z
        Add value to a set

        Key - the key in cache

        Value - str - the value you want to add to the set

        Returns - None
        Nr<   )r<   F)r.   async_set_cache_saddrh   r/   r>   )r   r'   r(   r;   r   r^   r@   r!   r!   r"   r}     s   
zDualCache.async_set_cache_saddc                 C   s0   | j d ur
| j   | jd ur| j  d S d S r   )r.   flush_cacher/   )r   r!   r!   r"   r~     s
   


zDualCache.flush_cachec                 C   s4   | j dur| j | | jdur| j| dS dS z-
        Delete a key from the cache
        N)r.   delete_cacher/   r   r'   r!   r!   r"   r     s
   

zDualCache.delete_cacher'   c                    s<   | j dur| j | | jdur| j|I dH  dS dS r   )r.   r   r/   async_delete_cacher   r!   r!   r"   r     s   

zDualCache.async_delete_cachec                    s<   | j |I dH }|du r| jdur| j|I dH }|S )zL
        Get the remaining TTL of a key in in-memory cache or redis
        N)r.   async_get_ttlr/   )r   r'   r<   r!   r!   r"   r     s
   zDualCache.async_get_ttl)F)NF)FN)&r)   r*   r+   __doc__r   r   r   r   floatintr   r:   boolr=   rB   r   rI   listra   rc   r   rD   r   r	   r   rk   rm   rT   rd   ry   rz   r|   r}   r~   r   r   r   r,   r!   r!   r   r"   r-   3   s    
	

$
&
)


=

%


		r-   )%r   rP   rs   rJ   concurrent.futuresr   	threadingr   typingr   r   r   r   r   r	   r
   Zlitellm.types.cachingr   r8   Zlitellm._loggingr   r   Zlitellm.constantsr   Z
base_cacher   r.   r   r/   r   Zopentelemetry.tracer   _Spancollectionsr   r   r-   r!   r!   r!   r"   <module>   s,    
$