o
    ưi`=                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZmZm	Z	 d dl
mZmZmZmZmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlT d dl m!Z! erqd dl"m#Z# neZ#G dd deeZ$dS )    N)uuid)datetime	timedeltatimezone)TYPE_CHECKINGAnyDictListOptionalTuple)quote)verbose_logger)LITELLM_ASYNCIO_QUEUE_MAXSIZE)AdditionalLoggingUtils)GCSBucketBase)CommonProxyErrors)IntegrationHealthCheckStatus)*)StandardLoggingPayload)
VertexBasec                	       s  e Zd Zd<dee ddf fddZdd Zdd	 Zdee	 fd
dZ
dededefddZdeeef defddZdedefddZdee	 deeee	 f fddZdee	 defddZdee	 dedeeef fddZdee	 ddfddZde	ddfd d!Zd"d# Zded$ed%edefd&d'Zd(ed)ee d*ee dee fd+d,Zd-ed.edefd/d0Zd-edefd1d2Zd3edefd4d5Zd6d7 Z d8d9 Z!de"fd:d;Z#  Z$S )=GCSBucketLoggerNbucket_namereturnc                    s   ddl m} t j|d ttdt| _ttdt	| _
tdtt  dk| _t | _t j| j| j| j
d tjtd	| _t|   t|  |d
ur_tdtjj d S )Nr   premium_user)r   ZGCS_BATCH_SIZEZGCS_FLUSH_INTERVALZGCS_USE_BATCHED_LOGGINGtrue)
flush_lock
batch_sizeflush_interval)maxsizeTCGCS Bucket logging is a premium feature. Please upgrade to use it. )litellm.proxy.proxy_serverr   super__init__intosgetenvZGCS_DEFAULT_BATCH_SIZEr   Z"GCS_DEFAULT_FLUSH_INTERVAL_SECONDSr   strZGCS_DEFAULT_USE_BATCHED_LOGGINGloweruse_batched_loggingasyncioLockr   Queuer   	log_queuecreate_taskperiodic_flushr   
ValueErrorr   not_premium_uservalue)selfr   r   	__class__ a/home/app/Keep/.python/lib/python3.10/site-packages/litellm/integrations/gcs_bucket/gcs_bucket.pyr#      s0   


zGCSBucketLogger.__init__c              
      s   ddl m} |durtdtjj z2td|| |dd }|d u r*td| j	
 r6|  I d H  | j	t|||dI d H  W d S  tyc } ztd	t|  W Y d }~d S d }~ww )
Nr   r   Tr    zHGCS Logger: async_log_success_event logging kwargs: %s, response_obj: %sstandard_logging_object+standard_logging_object not found in kwargspayloadkwargsresponse_objGCS Bucket logging error: )r!   r   r0   r   r1   r2   r   debuggetr-   fullflush_queueputGCSLogQueueItem	Exception	exceptionr'   )r3   r<   r=   
start_timeend_timer   logging_payloader6   r6   r7   async_log_success_event9   s8   
"z'GCSBucketLogger.async_log_success_eventc              
      s   z2t d|| |dd }|d u rtd| j r#|  I d H  | jt|||dI d H  W d S  t	yP } zt 
dt|  W Y d }~d S d }~ww )NzHGCS Logger: async_log_failure_event logging kwargs: %s, response_obj: %sr8   r9   r:   r>   )r   r?   r@   r0   r-   rA   rB   rC   rD   rE   rF   r'   )r3   r<   r=   rG   rH   rI   rJ   r6   r6   r7   async_log_failure_eventW   s.   
"z'GCSBucketLogger.async_log_failure_eventc                 C   sP   g }t || jk r&z
|| j  W n tjy   Y |S w t || jk s	|S )a  
        Drain items from the queue (non-blocking), respecting batch_size limit.
        
        This prevents unbounded queue growth when processing is slower than log accumulation.
        
        Returns:
            List of items to process, up to batch_size items
        )lenr   appendr-   
get_nowaitr*   
QueueEmpty)r3   items_to_processr6   r6   r7   _drain_queue_batchp   s   	z"GCSBucketLogger._drain_queue_batchdate_strbatch_idc                 C   s   | d| dS )zm
        Generate object name for a batched log file.
        Format: {date}/batch-{batch_id}.ndjson
        z/batch-z.ndjsonr6   )r3   rS   rT   r6   r6   r7   _generate_batch_object_name   s   z+GCSBucketLogger._generate_batch_object_namer<   c                 C   sJ   | ddpi }| ddp| jpd}| ddp| jpd}| d| S )a  
        Extract a synchronous grouping key from kwargs to group items by GCS config.
        This allows us to batch items with the same bucket/credentials together.
        
        Returns a string key that uniquely identifies the GCS config combination.
        This key may contain sensitive information (bucket names, paths) - use _sanitize_config_key()
        for logging purposes.
         standard_callback_dynamic_paramsNZgcs_bucket_namedefaultZgcs_path_service_account|)r@   ZBUCKET_NAMEZpath_service_account_json)r3   r<   rV   r   path_service_accountr6   r6   r7   _get_config_key   s   	zGCSBucketLogger._get_config_key
config_keyc                 C   s&   t |d}d| dd  S )z
        Create a sanitized version of the config key for logging.
        Uses a hash to avoid exposing sensitive bucket names or service account paths.
        
        Returns a short hash prefix for safe logging.
        zutf-8zconfig-N   )hashlibsha256encode	hexdigest)r3   r[   Zhash_objr6   r6   r7   _sanitize_config_key   s   z$GCSBucketLogger._sanitize_config_keyitemsc                 C   s>   i }|D ]}|  |d }||vrg ||< || | q|S )z
        Group items by their GCS config (bucket + credentials).
        This ensures items with different configs are processed separately.
        
        Returns a dict mapping config_key -> list of items with that config.
        r<   )rZ   rN   )r3   rb   groupeditemr[   r6   r6   r7   _group_items_by_config   s   z&GCSBucketLogger._group_items_by_configc                 C   s:   g }|D ]}|d }t j|tdd}|| qd|S )z
        Combine multiple log payloads into newline-delimited JSON (NDJSON) format.
        Each line is a valid JSON object representing one log entry.
        r;   F)rW   ensure_ascii
)jsondumpsr'   rN   join)r3   rb   linesrd   rI   Z	json_liner6   r6   r7   _combine_payloads_to_ndjson   s   
z+GCSBucketLogger._combine_payloads_to_ndjsonc              
      s  |sdS |d d }zY|  |I dH }| j|d |d dI dH }|d }| ttj}tt d	  d
t	
 jdd  }| ||}	| |}
| j|||	|
dI dH  t|}d}||fW S  ty } zd}t|}tdt|  ||fW  Y d}~S d}~ww )z
        Send a batch of items that share the same GCS config.
        
        Returns:
            (success_count, error_count)
        )r   r   r   r<   Nvertex_instancerY   rm   Zservice_account_jsonr   i  -r\   headersr   object_namerI   z6GCS Bucket error logging batch payload to GCS bucket: )get_gcs_logging_configconstruct_request_headers_get_object_date_from_datetimer   nowr   utcr$   timer   uuid4hexrU   rl   _log_json_data_on_gcsrM   rE   r   rF   r'   )r3   rb   r[   Zfirst_kwargsgcs_logging_configrq   r   current_daterT   rr   Zcombined_payloadZsuccess_countZerror_countrJ   r6   r6   r7   _send_grouped_batch   sF   
(

z#GCSBucketLogger._send_grouped_batchc                    s    |D ]
}|  |I dH  qdS )z
        Send each log individually as separate GCS objects (legacy behavior).
        This is used when GCS_USE_BATCHED_LOGGING is disabled.
        N)_send_single_log_item)r3   rb   rd   r6   r6   r7   _send_individual_logs   s   z%GCSBucketLogger._send_individual_logsrd   c              
      s   z;|  |d I dH }| j|d |d dI dH }|d }| j|d |d |d d	}| j||||d d
I dH  W dS  tyY } ztdt|  W Y d}~dS d}~ww )zH
        Send a single log item to GCS as an individual object.
        r<   Nrm   rY   rn   r   r;   r=   )r<   rI   r=   rp   z;GCS Bucket error logging individual payload to GCS bucket: )rs   rt   _get_object_namer{   rE   r   rF   r'   )r3   rd   r|   rq   r   rr   rJ   r6   r6   r7   r      s6   
z%GCSBucketLogger._send_single_log_itemc                    s^   |   }|s	dS | jr%| |}| D ]\}}| ||I dH  qdS | |I dH  dS )aQ  
        Process queued logs - sends logs to GCS Bucket.

        If `GCS_USE_BATCHED_LOGGING` is enabled (default), batches multiple log payloads
        into single GCS object uploads (NDJSON format), dramatically reducing API calls.

        If disabled, sends each log individually as separate GCS objects (legacy behavior).
        N)rR   r)   re   rb   r~   r   )r3   rQ   Zgrouped_itemsr[   Zgroup_itemsr6   r6   r7   async_send_batch  s   	
z GCSBucketLogger.async_send_batchrI   r=   c                 C   sz   |  ttj}|dddur| j|d}n| j||ddd}|ddp*i }|ddp2i }d	|v r;|d	 }|S )
zD
        Get the object name to use for the current payload
        Z	error_strN)request_date_strid r   response_idZlitellm_paramsmetadataZ
gcs_log_id)ru   r   rv   r   rw   r@   _generate_failure_object_name_generate_success_object_name)r3   r<   rI   r=   r}   rr   Z_litellm_params	_metadatar6   r6   r7   r   *  s   
z GCSBucketLogger._get_object_name
request_idstart_time_utcend_time_utcc                    s   |du r	t d||tdd |tdd g}d}|D ]L}z+| j|d}| j||d}t|dd}| |I dH }	|	durHt|	}
|
W   S W q tyh } zt	
d	| d
t|  W Y d}~qd}~ww dS )z
        Get the request and response payload for a given `request_id`
        Tries current day, next day, and previous day until it finds the payload
        Nz@start_time_utc is required for getting a payload from GCS Bucket   )days)datetime_objr   r   )safez!Failed to fetch payload for date z: )r0   r   ru   r   r   Zdownload_gcs_objectrh   loadsrE   r   r?   r'   )r3   r   r   r   Zdates_to_tryrS   daterr   Zencoded_object_nameresponseZloaded_responserJ   r6   r6   r7   get_request_response_payloadC  s>   


z,GCSBucketLogger.get_request_response_payloadr   r   c                 C   s   | d| S )N/r6   )r3   r   r   r6   r6   r7   r   m  s   z-GCSBucketLogger._generate_success_object_namec                 C   s   | dt  j S )Nz	/failure-)r   ry   rz   )r3   r   r6   r6   r7   r   t  s   z-GCSBucketLogger._generate_failure_object_namer   c                 C   s
   | dS )Nz%Y-%m-%d)strftime)r3   r   r6   r6   r7   ru   z  s   
z.GCSBucketLogger._get_object_date_from_datetimec                    s   |   I dH  t | _dS )zB
        Override flush_queue to work with asyncio.Queue.
        N)r   rx   Zlast_flush_timer3   r6   r6   r7   rB   }  s   zGCSBucketLogger.flush_queuec                    s:   	 t | jI dH  td| j d |  I dH  q)zE
        Override periodic_flush to work with asyncio.Queue.
        TNz GCS Bucket periodic flush after z seconds)r*   sleepr   r   r?   rB   r   r6   r6   r7   r/     s   zGCSBucketLogger.periodic_flushc                    s
   t d)Nz(GCS Bucket does not support health check)NotImplementedErrorr   r6   r6   r7   async_health_check  s   z"GCSBucketLogger.async_health_check)N)%__name__
__module____qualname__r
   r'   r#   rK   rL   r	   rD   rR   rU   r   r   rZ   ra   re   rl   r   r$   r~   r   r   r   r   r   r   dictr   r   r   ru   rB   r/   r   r   __classcell__r6   r6   r4   r7   r      s`    "
"/ 

*

r   )%r*   r]   rh   r%   rx   Zlitellm._uuidr   r   r   r   typingr   r   r   r	   r
   r   urllib.parser   Zlitellm._loggingr   Zlitellm.constantsr   Z-litellm.integrations.additional_logging_utilsr   Z/litellm.integrations.gcs_bucket.gcs_bucket_baser   Zlitellm.proxy._typesr   Z,litellm.types.integrations.base_health_checkr   Z%litellm.types.integrations.gcs_bucketZlitellm.types.utilsr   Z&litellm.llms.vertex_ai.vertex_llm_baser   r   r6   r6   r6   r7   <module>   s*     