o
    ưi                     @   s^  d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m
Z
mZmZmZmZmZmZ ddlZddlmZmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZ dd
lm Z  ddl!m"Z" e
rddl#m$Z% ddl&m'Z'm(Z( ddl)m*Z* ddl+m,Z, e*Z-e,Z.e'Z/e(Z0ee%ef Z$n
eZ-eZ.eZ/eZ0eZ$dde1de2fddZ3G dd de"Z4dS )z
Redis Cache implementation

Has 4 primary methods:
    - set_cache
    - get_cache
    - async_set_cache
    - async_get_cache
    N)	timedelta)TYPE_CHECKINGAnyListOptionalTupleUnioncast)print_verboseverbose_logger)DEFAULT_REDIS_MAJOR_VERSION)!_get_parent_otel_span_from_kwargs)coroutine_checker)RedisPipelineIncrementOperationRedisPipelineLpopOperationRedisPipelineRpushOperation)ServiceTypes   )	BaseCache)Span)RedisRedisCluster)Pipeline)ClusterPipeline   
num_framesreturnc                 C   s   zEt  }|du rW dS |j}|du rW dS |j}|du r W dS g }t| D ]}|du r. n|jj}|| |j}q&|s@W dS d|W S  tyO   Y dS w )a  
    Get the function names from the previous 1-2 functions in the call stack.

    Args:
        num_frames: Number of previous frames to include (default: 2)

    Returns:
        A string with format "current_function <- caller_function [<- grandparent_function]"
    Nunknownz <- )	inspectcurrentframef_backrangef_codeco_nameappendjoin	Exception)r   current_framer    frameZfunction_names_	func_name r+   R/home/app/Keep/.python/lib/python3.10/site-packages/litellm/caching/redis_cache.py_get_call_stack_info5   s.   

r-   c                       sj  e Zd Z							d{dee dee dee dee f fdd	Zd
d Z	de
fddZde
fddZdefddZdeeef fddZdedefddZdefddZdd Z	d|dedee defdd Zd}d!ed"edefd#d$Zd%edefd&d'Zd(d) Zd*eeef d+eeeef  dee defd,d-Z	d|d+eeeef  dee fd.d/Zd0edededee ddf
d1d2Z dedee fd3d4Z!d5d6 Z"		d~dedee d7ee# defd8d9Z$d:d; Z%d<efd=d>Z&d|d7ee# fd?d@Z'dAee dee fdBdCZ(dAee dee fdDdEZ)	d|dFeee eee  f d7ee# de*fdGdHZ+	d|d7ee# fdIdJZ,	d|dFeee eee  f d7ee# de*fdKdLZ-de.fdMdNZ/de.fdOdPZ0dQdR Z1defdSdTZ2dUdV Z3dWdX Z4dYdZ Z5d[d\ Z6de*fd]d^Z7defd_d`Z8dadb Z9d*edcee: deee  fdddeZ;dcee: deee  fdfdgZ<dedee fdhdiZ=	d|dedjee d7ee# defdkdlZ>d*edmee? dee fdndoZ@dmee? dee fdpdqZAd*eded"edeeB fdrdsZC		d~ded"ee d7ee# deeee f fdtduZDd*edveeE deeee   fdwdxZFdveeE deeee   fdydzZG  ZHS )
RedisCacheNd         @redis_flush_size	namespacestartup_nodessocket_timeoutc                    sn  ddl m}	 ddlm}
m} i }|d ur||d< |d ur ||d< |d ur(||d< |d ur0||d< |d ur8||d	< |d
d d urNt|d
 |	rN|d
| _n|	 | _|	| |
di || _
d | _|| _|di || _|| _g | _|d u r{d| _n|| _d| _zt| j
s| j
 d | _W n	 ty   Y nw |   tjd urt jttjd d S t   d S )Nr   )ServiceLoggingr   )get_redis_clientget_redis_connection_poolhostportpasswordr3   r4   service_logger_objr/   Unknownredis_version)Zdefault_ttlr+   )Zlitellm._service_loggerr5   _redisr6   r7   get
isinstancepopr;   updateredis_clientredis_async_clientredis_kwargsasync_redis_conn_poolr2   redis_batch_writing_bufferr1   r=   r   Zis_async_callableinfor&   _setup_health_pingslitellmZdefault_redis_ttlsuper__init__int)selfr8   r9   r:   r1   r2   r3   r4   kwargsr5   r6   r7   rE   	__class__r+   r,   rL   _   sR   

zRedisCache.__init__c              
   C   s   zt  |  }W n3 ty> } z'dt|v rtd ntjd	t|dt|id | 
| W Y d}~nd}~ww zt| jdrN| j  W dS W dS  tys } ztjddt|id | | W Y d}~dS d}~ww )	z,Setup async and sync health pings for Redis.zno running event loopz1Ignoring async redis ping. No running event loop.z+Error connecting to Async Redis client - {}error)extraNpingz%Error connecting to Sync Redis client)asyncioget_running_loopcreate_taskrT   r&   strr   debugrR   format_handle_async_ping_errorhasattrrC   _handle_sync_ping_error)rN   r)   er+   r+   r,   rI      s2   

zRedisCache._setup_health_pingsr^   c                 C   P   zt  }t }|}|| jjtj|| |dd W dS  ty'   Y dS w )z2Handle async ping error with service failure hook.Zredis_async_pingservicedurationrR   	call_typeN	rU   rV   timerW   r;   async_service_failure_hookr   REDISr&   rN   r^   loop
start_timeend_timer+   r+   r,   r[         
z#RedisCache._handle_async_ping_errorc                 C   r_   )z1Handle sync ping error with service failure hook.Zredis_sync_pingr`   Nrd   rh   r+   r+   r,   r]      rl   z"RedisCache._handle_sync_ping_errorr   c                 C   s@   t | j }tj|dd}t|  dd }d| S )z
        Generate a cache key for the async Redis client based on connection parameters.
        This ensures different Redis configurations use different cached clients.
        T)	sort_keysN   zasync-redis-client-)	sortedrE   itemsjsondumpshashlibsha256encode	hexdigest)rN   Zsorted_kwargsZ
kwargs_strZkwargs_hashr+   r+   r,   _get_async_client_cache_key   s   
z&RedisCache._get_async_client_cache_keyc                 C   s   ddl m} ddlm}m} |  }|j|d}|d ur&ttt	t
f |}n|di | j| _|dd| ji| j}|j||d || _|S )	Nr   )in_memory_llm_clients_cacher   )get_redis_async_clientr7   keyZconnection_pool)r{   valuer+   )rJ   rx   r>   ry   r7   rw   	get_cacher	   r   async_redis_clientasync_redis_cluster_clientrE   rF   	set_cacherD   )rN   rx   ry   r7   	cache_keyZcached_clientrD   r+   r+   r,   init_async_client   s&   zRedisCache.init_async_clientr{   c                 C   s(   | j dur|| j s| j d | }|S )zD
        Make sure each key starts with the given namespace
        N:)r2   
startswithrN   r{   r+   r+   r,   check_and_fix_namespace  s   z"RedisCache.check_and_fix_namespacec              	   C   sj   | j dkrtS z t| j  }d|v rt|dd }|W S tt|}|W S  ttfy4   t Y S w )a  
        Parse Redis version to extract the major version number.
        
        Handles multiple version formats:
        - Strings: "7.0.0", "6", "7.0.0-rc1", " 7.0.0 "
        - Floats: 7.0 (e.g., from AWS ElastiCache Valkey)
        - Integers: 7
        - Malformed: "latest", "", "Unknown" (defaults to DEFAULT_REDIS_MAJOR_VERSION)
        
        Returns:
            int: The major version number (defaults to DEFAULT_REDIS_MAJOR_VERSION if unparseable)
        r<   .r   )	r=   r   rX   striprM   splitfloat
ValueErrorAttributeError)rN   version_strmajor_versionr+   r+   r,   _parse_redis_major_version  s   
z%RedisCache._parse_redis_major_versionc           	   
   K   s   | j d
i |}td| d| d| d| j  | j|d}z*t }| jj|t||d t }|| }| jj	t
j|dt  ||d W d S  tye } ztd	t|  W Y d }~d S d }~ww )NzSet Redis Cache: key: 
Value 
ttl=, redis_version=rz   namer|   exzset_cache <- ra   rb   rc   rj   rk   z<litellm.caching.caching: set() - Got exception from REDIS : r+   )get_ttlr
   r=   r   re   rC   setrX   r;   service_success_hookr   rg   r-   r&   )	rN   r{   r|   rO   ttlrj   rk   	_durationr^   r+   r+   r,   r   )  s.   
zRedisCache.set_cacher|   r   c              
   K   s<  | j }t }| j|d}zqt }|j||d}t }	|	| }
| jjtj|
dt  ||	d |d ur|t }|	|}t }	|	| }
| jjtj|
dt  ||	d |dkr|t }|
|| t }	|	| }
| jjtj|
dt  ||	d |W S  ty } zt }	|	| }
tdt|| |d }~ww )	Nr   r   amountzincrement_cache <- r   zincrement_cache_ttl <- zincrement_cache_expire <- zXLiteLLM Redis Caching: increment_cache() - Got exception from REDIS %s, Writing value=%s)rC   re   r   incrr;   r   r   rg   r-   r   expirer&   r   rR   rX   )rN   r{   r|   r   rO   _redis_clientrj   Zset_ttlresultrk   r   current_ttlr^   r+   r+   r,   increment_cacheA  sf   



zRedisCache.increment_cachepatterncountc           
         s   t   }zOg }|  }t|dstd g W S |j|d |d2 z3 d H W }|| t||kr5 nq"6 t   }|| }t	| j
jtj|dt  ||d |W S  ty~ }	 zt   }|| }t	| j
jtj||	dt  ||d |	d }	~	ww )N	scan_iterz_Redis client does not support scan_iter, potentially using Redis Cluster. Returning empty list.*)matchr   zasync_scan_iter <- r   )ra   rb   rR   rc   rj   rk   )re   r   r\   r   rY   r   r$   lenrU   rW   r;   async_service_success_hookr   rg   r-   r&   rf   )
rN   r   r   rj   keysr   r{   rk   r   r^   r+   r+   r,   async_scan_iterz  sX   


	

zRedisCache.async_scan_iterscriptc              
      s   z1|    t dr |W S t dr/ |dtt dtt dtf fdd}|W S W d	S  tyI } zt	dt|  |d	}~ww )
a  
        Register a Lua script with Redis asynchronously.
        Works with both standalone Redis and Redis Cluster.

        Args:
            script (str): The Lua script to register

        Returns:
            Any: A script object that can be called with keys and args
        register_scriptscript_loadr   argsr   c                    s    j t| g| |R  S N)Zevalshar   )r   r   r   Z
script_shar+   r,   script_callable  s   z9RedisCache.async_register_script.<locals>.script_callablez Error registering Redis script: N)
r   r\   r   r   r   rX   r   r&   r   rR   )rN   r   r   r^   r+   r   r,   async_register_script  s   


$	z RedisCache.async_register_scriptc                    s  ddl m} t }z|  }W n6 tyG } z*t }|| }	t| jjt	j
|	|||t|dt  d tdt|| |d }~ww | j|d}| jdi |}
|dd}td	| d
| d|
  zIt|dsstd|j|t|||
dI d H }td| d
| d|
  t }|| }	t| jjt	j
|	dt  ||t|d|id |W S  ty } z2t }|| }	t| jjt	j
|	|dt  ||t|d|id tdt|| W Y d }~d S d }~ww )Nr   r   zasync_set_cache <- ra   rb   rR   rj   rk   parent_otel_spanrc   RLiteLLM Redis Caching: async set() - Got exception from REDIS %s, Writing value=%srz   nxFSet ASYNC Redis Cache: key: r   r   r   z3Redis client cannot set cache. Attribute not found.)r   r|   r   r   z)Successfully Set ASYNC Redis Cache: key: r{   ra   rb   rc   rj   rk   r   event_metadatara   rb   rR   rc   rj   rk   r   r   r+   )redis.asyncior   re   r   r&   rU   rW   r;   rf   r   rg   r   r-   r   rR   rX   r   r   r?   r
   r\   r   rq   rr   r   )rN   r{   r|   rO   r   rj   r   r^   rk   r   r   r   r   r+   r+   r,   async_set_cache  s   



zRedisCache.async_set_cachepipe
cache_listc           	         s   | j |d}|D ]/\}}| j|d}td| d| d|  t|}d}|dur0t|d}|j|||d q	| I dH }|S )	zU
        Helper function for executing a pipeline of set operations on Redis
        r   rz   z%Set ASYNC Redis Cache PIPELINE: key: r   r   Nsecondsr   )r   r   r
   rq   rr   r   r   execute)	rN   r   r   r   r   cache_valueZjson_cache_value_tdresultsr+   r+   r,   _pipeline_helper  s$   	

zRedisCache._pipeline_helperc                    s^  t |dkr	dS |  }t }td| d| d| j  d}zR|jdd4 I dH }| |||I dH }W d  I dH  n1 I dH sGw   Y  td|  t }	|	| }
t| j	j
tj|
d	t  ||	t|d
 W dS  ty } z/t }	|	| }
t| j	jtj|
|d	t  ||	t|d tdt|| W Y d}~dS d}~ww )z?
        Use Redis Pipelines for bulk write operations
        r   Nz!Set Async Redis Cache: key list: r   r   FZtransactionzpipeline results: zasync_set_cache_pipeline <- ra   rb   rc   rj   rk   r   ra   rb   rR   rc   rj   rk   r   zaLiteLLM Redis Caching: async set_cache_pipeline() - Got exception from REDIS %s, Writing value=%s)r   r   re   r
   r=   pipeliner   rU   rW   r;   r   r   rg   r-   r   r&   rf   r   rR   rX   )rN   r   r   rO   r   rj   r   r   r   rk   r   r^   r+   r+   r,   async_set_cache_pipeline3  s`   (


z#RedisCache.async_set_cache_pipelinerC   c                    sh   | j |d}z$|j|g|R  I dH  |dur)t|d}|||I dH  W dS W dS  ty3    w )z@Helper function for async_set_cache_sadd. Separated for testing.r   Nr   )r   Zsaddr   r   r&   )rN   rC   r{   r|   r   r   r+   r+   r,   _set_cache_sadd_helpern  s   
z!RedisCache._set_cache_sadd_helperc                    s  ddl m} t }z|  }W n6 tyG } z*t }	|	| }
t| jjt	j
|
|||	t|dt  d tdt|| |d }~ww | j|d}td| d| d	|  z:| j||||d
I d H  td| d| d	|  t }	|	| }
t| jjt	j
|
dt  ||	t|d W d S  ty } z/t }	|	| }
t| jjt	j
|
|dt  ||	t|d tdt|| W Y d }~d S d }~ww )Nr   r   zasync_set_cache_sadd <- r   r   rz   r   r   r   )rC   r{   r|   r   z.Successfully Set ASYNC Redis Cache SADD: key: r   r   z]LiteLLM Redis Caching: async set_cache_sadd() - Got exception from REDIS %s, Writing value=%s)r   r   re   r   r&   rU   rW   r;   rf   r   rg   r   r-   r   rR   rX   r   r
   r   r   )rN   r{   r|   r   rO   r   rj   r   r^   rk   r   r+   r+   r,   async_set_cache_sadd  s   




zRedisCache.async_set_cache_saddc                    sX   t dt| j  | j|d}| j||f t| j| jkr*|  I d H  d S d S )Nz-in batch cache writing for redis buffer size=rz   )r
   r   rG   r   r$   r1   flush_cache_buffer)rN   r{   r|   rO   r+   r+   r,   batch_cache_write  s   zRedisCache.batch_cache_writer   c                    s$  ddl m} |  }t }| j|d}| j|d}zB|j||dI d H }	|d ur?||I d H }
|
dkr?|||I d H  t }|| }t	
| jjtj|dt  |||d |	W S  ty } z(t }|| }t	
| jjtj||dt  |||d	 td
t|| |d }~ww )Nr   r   r   rz   r   r   zasync_increment <- r   r   z^LiteLLM Redis Caching: async async_increment() - Got exception from REDIS %s, Writing value=%s)r   r   r   re   r   r   incrbyfloatr   r   rU   rW   r;   r   r   rg   r-   r&   rf   r   rR   rX   )rN   r{   r|   r   r   r   r   rj   Z	_used_ttlr   r   rk   r   r^   r+   r+   r,   async_increment  s^   


zRedisCache.async_incrementc                    s2   t dt| j  | | jI d H  g | _d S )Nz,flushing to redis....reached size of buffer )r
   r   rG   r   rN   r+   r+   r,   r     s   
zRedisCache.flush_cache_buffercached_responsec                 C   sF   |du r|S | d}zt|}W |S  ty"   t|}Y |S w )z[
        Common 'get_cache_logic' across sync + async redis client implementations
        Nutf-8)decoderq   loadsr&   astliteral_eval)rN   r   r+   r+   r,   _get_cache_logic  s   
zRedisCache._get_cache_logicc           	   
   K   s   zA| j |d}td|  t }| j|}t }|| }| jjtj|dt	  |||d td| d|  | j
|dW S  tyZ } ztd| W Y d }~d S d }~ww )	Nrz   zGet Redis Cache: key: zget_cache <- r   zGot Redis Cache: key: , cached_response r   z;litellm.caching.caching: get() - Got exception from REDIS: )r   r
   re   rC   r?   r;   r   r   rg   r-   r   r&   r   rR   )	rN   r{   r   rO   rj   r   rk   r   r^   r+   r+   r,   r}   "  s2   
zRedisCache.get_cacher   c                 C   s   | j j|dS )
        Wrapper to call `mget` on the redis client

        We use a wrapper so RedisCluster can override this method
        r   )rC   mget)rN   r   r+   r+   r,   _run_redis_mget_operation<  s   z$RedisCache._run_redis_mget_operationc                    s   |   }|j|dI dH S )r   r   N)r   r   )rN   r   r~   r+   r+   r,   _async_run_redis_mget_operationD  s   z*RedisCache._async_run_redis_mget_operationkey_listc              
   C   s  i }dd |D }z_g }|D ]}| j |pdd}|| qt }| j|d}t }	|	| }
| jjtj|
dt  ||	|d t	t
||}i }| D ]\}}t|tr\|d}| |}|||< qN|W S  ty } ztd	t|  |W  Y d
}~S d
}~ww )a  
        Use Redis for bulk read operations

        Args:
            key_list: List of keys to get from Redis
            parent_otel_span: Optional parent OpenTelemetry span

        Returns:
            dict: A dictionary mapping keys to their cached values
        c                 S      g | ]}|d ur|qS r   r+   .0r{   r+   r+   r,   
<listcomp>]      z.RedisCache.batch_get_cache.<locals>.<listcomp> rz   r   zbatch_get_cache <- r   r   z$Error occurred in batch get cache - N)r   r$   re   r   r;   r   r   rg   r-   dictziprp   r@   bytesr   r   r&   r   rR   rX   )rN   r   r   key_value_dict	_key_list_keysr   rj   r   rk   r   decoded_resultskvr^   r+   r+   r,   batch_get_cacheM  s@   




zRedisCache.batch_get_cachec                    s&  ddl m} |  }| j|d}t }zAtd|  ||I d H }td| d|  | j|d}t }	|	| }
t	| j
jtj|
dt  ||	|d	|id
 |W S  ty } z0t }	|	| }
t	| j
jtj|
|dt  ||	|d	|id tdt|  W Y d }~d S d }~ww )Nr   r   rz   zGet Async Redis Cache: key: zGot Async Redis Cache: key: r   r   zasync_get_cache <- r{   r   r   zAlitellm.caching.caching: async get() - Got exception from REDIS: )r   r   r   r   re   r
   r?   r   rU   rW   r;   r   r   rg   r-   r&   rf   rX   )rN   r{   r   rO   r   r   rj   r   responserk   r   r^   r+   r+   r,   async_get_cache  s\   

zRedisCache.async_get_cachec                    sR  i }t   }dd |D }z_g }|D ]}| j|d}|| q| j|dI dH }t   }	|	| }
t| jjtj	|
dt
  ||	|d tt||}i }| D ]\}}t|tra|d}| |}|||< qS|W S  ty } z/t   }	|	| }
t| jjtj	|
|dt
  ||	|d	 td
t|  |W  Y d}~S d}~ww )a[  
        Use Redis for bulk read operations

        Args:
            key_list: List of keys to get from Redis
            parent_otel_span: Optional parent OpenTelemetry span

        Returns:
            dict: A dictionary mapping keys to their cached values

        `.mget` does not support None keys. This will filter out None keys.
        c                 S   r   r   r+   r   r+   r+   r,   r     r   z4RedisCache.async_batch_get_cache.<locals>.<listcomp>rz   r   Nzasync_batch_get_cache <- r   r   r   z*Error occurred in async batch get cache - )re   r   r$   r   rU   rW   r;   r   r   rg   r-   r   r   rp   r@   r   r   r   r&   rf   r   rR   rX   )rN   r   r   r   rj   r   r   r   r   rk   r   r   r   r   r^   r+   r+   r,   async_batch_get_cache  s`   





z RedisCache.async_batch_get_cachec              
   C   s   t d t }z'| j }t d|  t }|| }| jjtj|dt  ||d |W S  t	y^ } z#t }|| }| jj
tj||dt  d tdt|  |d}~ww )zD
        Tests if the sync redis client is correctly setup.
        zPinging Sync Redis CachezRedis Cache PING: zsync_ping <- r   r`   7LiteLLM Redis Cache PING: - Got exception from REDIS : N)r
   re   rC   rT   r;   r   r   rg   r-   r&   Zservice_failure_hookr   rR   rX   )rN   rj   r   rk   r   r^   r+   r+   r,   	sync_ping  s<   


zRedisCache.sync_pingc                    s   |   }t }td z#| I d H }t }|| }t| jjtj	|dt
  d |W S  tyb } z&t }|| }t| jjtj	||dt
  d tdt|  |d }~ww )NzPinging Async Redis Cachezasync_ping <- ra   rb   rc   r`   r   )r   re   r
   rT   rU   rW   r;   r   r   rg   r-   r&   rf   r   rR   rX   )rN   r   rj   r   rk   r   r^   r+   r+   r,   rT     sB   

zRedisCache.pingc                    s   |   }|j| I d H  d S r   r   delete)rN   r   r   r+   r+   r,   delete_cache_keysB  s   zRedisCache.delete_cache_keysc                 C      | j  }|S r   )rC   client_list)rN   r   r+   r+   r,   r   H     
zRedisCache.client_listc                 C   r   r   )rC   rH   )rN   rH   r+   r+   r,   rH   L  r  zRedisCache.infoc                 C      | j   d S r   rC   flushallr   r+   r+   r,   flush_cacheP     zRedisCache.flush_cachec                 C   r  r   r  r   r+   r+   r,   r  S  r  zRedisCache.flushallc              
      sZ   | j jddI d H  z| j  W d S  ty, } ztd| W Y d }~d S d }~ww )NT)Zinuse_connectionsz#Error closing sync Redis client: %s)rF   
disconnectrC   closer&   r   rY   )rN   r^   r+   r+   r,   r  V  s   zRedisCache.disconnectc              
      s   z+ddl m} |jdi | j}| I dH }| I dH  |r'dddW S dddW S  tyU } ztdt	|  dd	t	| t	|d
W  Y d}~S d}~ww )aa  
        Test the Redis connection by creating a new client and pinging it.
        
        This creates a fresh connection without using cached clients or connection pools
        to ensure the credentials are actually valid.
        
        Returns:
            dict: {"status": "success" | "failed", "message": str, "error": Optional[str]}
        r   Nsuccessz Redis connection test successful)statusmessagefailedzRedis ping returned FalsezRedis connection test failed: zRedis connection failed: )r
  r  rR   r+   )
r   rU   r   rE   rT   acloser&   r   rR   rX   )rN   Zredis_asyncrC   Zping_resultr^   r+   r+   r,   test_connection]  s*   
zRedisCache.test_connectionc                    s   |   }||I d H S r   r   )rN   r{   r   r+   r+   r,   async_delete_cache  s   zRedisCache.async_delete_cachec                 C   s   | j | d S r   )rC   r   r   r+   r+   r,   delete_cache  s   zRedisCache.delete_cacheincrement_listc              	      s   |D ]6}| j |d d}td| d|d  d|d   |||d  |d dur9t|d d	}||| q| I dH }td
|  dd |D S )z1Helper function for pipeline increment operationsr{   rz   z+Increment ASYNC Redis Cache PIPELINE: key: r   Zincrement_valuer   r   Nr   z/Increment ASYNC Redis Cache PIPELINE: results: c                 S   s   g | ]	}t |tr|qS r+   )r@   r   r   rr+   r+   r,   r     s    z9RedisCache._pipeline_increment_helper.<locals>.<listcomp>)r   r
   r   r   r   r   r   rY   )rN   r   r  Zincrement_opr   r   r   r+   r+   r,   _pipeline_increment_helper  s    z%RedisCache._pipeline_increment_helperc                    s<  t |dkr	dS ddlm} |  }t }td|  zJ|jdd4 I dH }| ||I dH }W d  I dH  n1 I dH sCw   Y  t }|| }	t	| j
jtj|	dt  ||t|d |W S  ty }
 z)t }|| }	t	| j
jtj|	|
dt  ||t|d	 td
t|
 |
d}
~
ww )a  
        Use Redis Pipelines for bulk increment operations
        Args:
            increment_list: List of RedisPipelineIncrementOperation dicts containing:
                - key: str
                - increment_value: float
                - ttl_seconds: int
        r   Nr   z6Increment Async Redis Cache Pipeline: increment list: Fr   zasync_increment_pipeline <- r   r   zOLiteLLM Redis Caching: async increment_pipeline() - Got exception from REDIS %s)r   r   r   r   re   r
   r   r  rU   rW   r;   r   r   rg   r-   r   r&   rf   r   rR   rX   )rN   r  rO   r   r   rj   r   r   rk   r   r^   r+   r+   r,   async_increment_pipeline  s^   (


z#RedisCache.async_increment_pipelinec              
      sf   z|   }||I dH }|dkrW dS |W S  ty2 } ztd|  W Y d}~dS d}~ww )a  
        Get the remaining TTL of a key in Redis

        Args:
            key (str): The key to get TTL for

        Returns:
            Optional[int]: The remaining TTL in seconds, or None if key doesn't exist

        Redis ref: https://redis.io/docs/latest/commands/ttl/
        Nr   zRedis TTL Error: )r   r   r&   r   rY   )rN   r{   r   r   r^   r+   r+   r,   async_get_ttl  s   zRedisCache.async_get_ttlvaluesc                    s   |   }t }z(|j|g|R  I dH }t }|| }	t| jjtj|	dt	  d |W S  t
yc }
 z&t }|| }	t| jjtj|	|
dt	  d tdt|
  |
d}
~
ww )aR  
        Append one or multiple values to a list stored at key

        Args:
            key: The Redis key of the list
            values: One or more values to append to the list
            parent_otel_span: Optional parent OpenTelemetry span

        Returns:
            int: The length of the list after the push operation
        Nzasync_rpush <- r   r`   z8LiteLLM Redis Cache RPUSH: - Got exception from REDIS : )r   re   rpushrU   rW   r;   r   r   rg   r-   r&   rf   r   rR   rX   )rN   r{   r  r   rO   r   rj   r   rk   r   r^   r+   r+   r,   async_rpush  s@   

zRedisCache.async_rpush
rpush_listc                    sP   |D ]}|j |d g|d R   q| I dH }|D ]	}t|tr%|q|S )z-Helper function for pipeline rpush operationsr{   r  N)r  r   r@   r&   )rN   r   r  Zrpush_opr   r  r+   r+   r,   _pipeline_rpush_helper/  s   
z!RedisCache._pipeline_rpush_helperc           	           t |dkr	g S |  }t }zE|jdd4 I dH }| ||I dH }W d  I dH  n1 I dH s6w   Y  t }|| }t| jjt	j
|dt  d |W S  ty } z$t }|| }t| jjt	j
||dt  d tdt| |d}~ww )	a!  
        Use Redis Pipelines for bulk RPUSH operations

        Args:
            rpush_list: List of RedisPipelineRpushOperation dicts containing:
                - key: str
                - values: List[Any]

        Returns:
            List[int]: List lengths after each push
        r   Fr   Nzasync_rpush_pipeline <- r   r`   zKLiteLLM Redis Caching: async_rpush_pipeline() - Got exception from REDIS %s)r   r   re   r   r  rU   rW   r;   r   r   rg   r-   r&   rf   r   rR   rX   )	rN   r  r   rj   r   r   rk   r   r^   r+   r+   r,   async_rpush_pipeline>  J   (

zRedisCache.async_rpush_pipelinec                    sL   g }t |D ]}|| | I d H }|D ]}|d ur"|| qq|S r   )r!   lpopr   r$   )rN   r   r{   r   r   r)   r   r  r+   r+   r,   *handle_lpop_count_for_older_redis_versionst  s   

z5RedisCache.handle_lpop_count_for_older_redis_versionsc                    s  |   }t }td| d|  z|  }|d urK|dk rK|jdd4 I d H }| |||I d H }	W d   I d H  n1 I d H sEw   Y  n	|||I d H }	t }
|
| }t| j	j
tj|dt  d t|	trz|	dW W S  ty   |	 Y W S w t|	trtd	d
 |	D rz	dd |	D W W S  ty   |	 Y W S w |	W S  ty } z&t }
|
| }t| j	jtj||dt  d tdt|  |d }~ww )NzLPOP from Redis list: key: z	, count:    Fr   zasync_lpop <- r   r   c                 s   s    | ]}t |tV  qd S r   )r@   r   r   itemr+   r+   r,   	<genexpr>  s    

z(RedisCache.async_lpop.<locals>.<genexpr>c                 S   s   g | ]}| d qS )r   )r   r"  r+   r+   r,   r     s    z)RedisCache.async_lpop.<locals>.<listcomp>r`   z7LiteLLM Redis Cache LPOP: - Got exception from REDIS : )r   re   r
   r   r   r   r  rU   rW   r;   r   r   rg   r-   r@   r   r   r&   listallrf   r   rR   rX   )rN   r{   r   r   rO   r   rj   r   r   r   rk   r   r^   r+   r+   r,   
async_lpop  sn   (

	


zRedisCache.async_lpop	lpop_listc              	      s`  |   }|dkr |D ]}||d |d  q| I dH }nJg }|D ]}|d p+d}|| t|D ]	}||d  q5q$| I dH }	g }d}
|D ]}dd |	|
|
|  D }||rb|nd |
|7 }
qM|D ]	}t|tru|qlg }|D ]3}|du r|d qzt|trz|d	d |D pd W qz ty   || Y qzw |d qz|S )
zHelper function for pipeline lpop operations.

        For Redis >= 7, queues one LPOP(key, count) per operation.
        For Redis < 7, queues `count` individual LPOP(key) commands per operation.
        r!  r{   r   Nr   r   c                 S   r   r   r+   r  r+   r+   r,   r     s    z4RedisCache._pipeline_lpop_helper.<locals>.<listcomp>c                 S   s,   g | ]}|d urt |tr|dn|qS )Nr   )r@   r   r   r"  r+   r+   r,   r     s
    )r   r  r   r$   r!   r@   r&   r%  )rN   r   r(  r   Zlpop_opZraw_resultscountsr   r)   Zflat_resultsoffsetZkey_resultsr  r   r+   r+   r,   _pipeline_lpop_helper  sV   




z RedisCache._pipeline_lpop_helperc           	         r  )	aC  
        Use Redis Pipelines for bulk LPOP operations

        Args:
            lpop_list: List of RedisPipelineLpopOperation dicts containing:
                - key: str
                - count: Optional[int]

        Returns:
            List[Optional[List[str]]]: Decoded results per key, None if key was empty
        r   Fr   Nzasync_lpop_pipeline <- r   r`   zJLiteLLM Redis Caching: async_lpop_pipeline() - Got exception from REDIS %s)r   r   re   r   r+  rU   rW   r;   r   r   rg   r-   r&   rf   r   rR   rX   )	rN   r(  r   rj   r   r   rk   r   r^   r+   r+   r,   async_lpop_pipeline  r  zRedisCache.async_lpop_pipeline)NNNr/   NNr0   r   )r/   )NN)I__name__
__module____qualname__r   rM   rX   r   r   rL   rI   r&   r[   r]   rw   r   r~   r   r   r   r   r   r   r%  r   r   r   r   r   cluster_pipeliner   r   r   r   r   r   r   r   r   r   r}   r   r   r   r   r   r   boolr   rT   r   r   rH   r  r  r  r  r  r  r   r  r  r  r  r   r  r  r   r   r'  r   r+  r,  __classcell__r+   r+   rP   r,   r.   \   sp   A

	
9.O


;

H
;
5
5
G$$(



?
3

6

B
?r.   )r   )5__doc__r   rU   rs   r   rq   re   datetimer   typingr   r   r   r   r   r   r	   rJ   Zlitellm._loggingr
   r   Zlitellm.constantsr   Z'litellm.litellm_core_utils.core_helpersr   Z,litellm.litellm_core_utils.coroutine_checkerr   Zlitellm.types.cachingr   r   r   Zlitellm.types.servicesr   Z
base_cacher   Zopentelemetry.tracer   _Spanr   r   r   Zredis.asyncio.clientr   Zredis.asyncio.clusterr   r   r0  r~   r   rM   rX   r-   r.   r+   r+   r+   r,   <module>   sD    
$'