o
    ưip                     @   s   d Z ddlZddlZddlZddlZddlmZmZmZm	Z	m
Z
mZ ddlmZ er1ddlmZ neZddlZddlmZ ddlmZ ddlmZmZ G d	d
 d
eZdS )z
BETA

This is the PubSub logger for GCS PubSub, this sends LiteLLM SpendLogs Payloads to GCS PubSub.

Users can use this instead of sending their SpendLogs to their Postgres database.
    N)TYPE_CHECKINGAnyDictListOptionalUnion)StandardLoggingPayload)SpendLogsPayload)verbose_logger)CustomBatchLogger)get_async_httpx_clienthttpxSpecialProviderc                       s   e Zd Z			ddee dee dee f fddZdeeef fdd	Zd
d Zdd Z	de
eef deeeef  fddZ  ZS )GcsPubSubLoggerN
project_idtopic_idcredentials_pathc                    s   ddl m} |  ttjd| _|ptd| _|ptd| _	|p&td| _
| jr.| j	s2tdt | _t jd
i |d| ji t|   g | _d	S )a  
        Initialize Google Cloud Pub/Sub publisher

        Args:
            project_id (str): Google Cloud project ID
            topic_id (str): Pub/Sub topic ID
            credentials_path (str, optional): Path to Google Cloud credentials JSON file
        r   _premium_user_check)Zllm_providerZGCS_PUBSUB_PROJECT_IDZGCS_PUBSUB_TOPIC_IDZGCS_PATH_SERVICE_ACCOUNTz-Both project_id and topic_id must be provided
flush_lockN )litellm.proxy.utilsr   r   r   ZLoggingCallbackasync_httpx_clientosgetenvr   r   path_service_account_json
ValueErrorasyncioLockr   super__init__create_taskZperiodic_flush	log_queue)selfr   r   r   kwargsr   	__class__r   ^/home/app/Keep/.python/lib/python3.10/site-packages/litellm/integrations/gcs_pubsub/pub_sub.pyr       s    

zGcsPubSubLogger.__init__returnc                    sb   ddl m} |j| j| jddI dH \}}|jd|| j|dddddd	\}}d| d	d
}|S )z4Construct authorization headers using Vertex AI authr   )vertex_chat_completionZ	vertex_ai)credentialsr   custom_llm_providerNzpub-sub)	modelauth_headerZvertex_credentialsvertex_projectZvertex_locationZgemini_api_keystreamr*   Zapi_basezBearer zapplication/json)AuthorizationzContent-Type)litellmr(   Z_ensure_access_token_asyncr   r   Z_get_token_and_url)r"   r(   Z_auth_headerr-   r,   _headersr   r   r&   construct_request_headersE   s2   

z)GcsPubSubLogger.construct_request_headersc           
   
      s   ddl m} ddlm} |  z;td| |dd}tjdu r1|||||d}| j	
| n| j	
| t| j	| jkrI|  I dH  W dS W dS  tym }	 ztd	t|	 d
t   W Y d}	~	dS d}	~	ww )a  
        Async Log success events to GCS PubSub Topic

        - Creates a SpendLogsPayload
        - Adds to batch queue
        - Flushes based on CustomBatchLogger settings

        Raises:
            Raises a NON Blocking verbose_logger.exception if an error occurs
        r   )get_logging_payloadr   z6PubSub: Logging - Enters logging function for model %sZstandard_logging_objectNT)r#   response_obj
start_timeend_timezPubSub Layer Error - 
)Z1litellm.proxy.spend_tracking.spend_tracking_utilsr4   r   r   r
   debuggetr0   Zgcs_pub_sub_use_v1r!   appendlenZ
batch_sizeasync_send_batch	Exception	exceptionstr	traceback
format_exc)
r"   r#   r5   r6   r7   r4   r   Zstandard_logging_payloadZspend_logs_payloader   r   r&   async_log_success_eventd   s8   
z'GcsPubSubLogger.async_log_success_eventc              
      s   zZz(| j sW W | j   dS tdt| j  d | j D ]
}| |I dH  qW n! tyK } ztdt| dt	
   W Y d}~nd}~ww W | j   dS W | j   dS | j   w )z8
        Sends the batch of messages to Pub/Sub
        NzPubSub - about to flush z eventszPubSub Error sending batch - r8   )r!   clearr
   r9   r<   publish_messager>   r?   r@   rA   rB   )r"   messagerC   r   r   r&   r=      s*   
z GcsPubSubLogger.async_send_batchrG   c           
   
      s  ze|   I dH }t|tr|}ntj|td}ddl}||dd}dd|igi}d| j	 d| j
 d	}| jj|||d
I dH }|jdvr[tdt|j td|j td|j | W S  ty }	 ztdt|	 W Y d}	~	dS d}	~	ww )z
        Publish message to Google Cloud Pub/Sub using REST API

        Args:
            message: Message to publish (dict or string)

        Returns:
            dict: Published message response
        N)defaultr   zutf-8messagesdataz*https://pubsub.googleapis.com/v1/projects/z/topics/z:publish)urlr2   json)      zPub/Sub publish error: %szFailed to publish message: zPub/Sub response: %s)r3   
isinstancer@   rL   dumpsbase64	b64encodeencodedecoder   r   r   poststatus_coder
   errortextr>   r9   )
r"   rG   r2   Zmessage_datarQ   Zencoded_messagerequest_bodyrK   responserC   r   r   r&   rF      s2   


zGcsPubSubLogger.publish_message)NNN)__name__
__module____qualname__r   r@   r   r   r3   rD   r=   r   r	   r   r   rF   __classcell__r   r   r$   r&   r      s&    %.
r   )__doc__r   rL   r   rA   typingr   r   r   r   r   r   Zlitellm.types.utilsr   Zlitellm.proxy._typesr	   r0   Zlitellm._loggingr
   Z(litellm.integrations.custom_batch_loggerr   Z&litellm.llms.custom_httpx.http_handlerr   r   r   r   r   r   r&   <module>   s     