o
    ưi8                     @   s  d dl m Z  d dlmZmZmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZmZmZ d dlmZ d dlmZ d d	lmZmZmZmZmZm Z  zd d
l!m"Z" W n e#yi   dZ"Y nw errd dl$m%Z& e Z'G dd de(Z)G dd deZ*dd Z+dS )    )datetime)	TYPE_CHECKINGAnyDictListLiteralOptionalTypeUnionget_args)verbose_logger)	DualCache)CustomLogger)DynamicGuardrailParamsGuardrailEventHooksLitellmParamsMode)AllMessageValues)GuardrailConfigModel)	CallTypesGenericGuardrailAPIInputsGuardrailStatusGuardrailTracingDetailLLMResponseTypes#StandardLoggingGuardrailInformation)HTTPExceptionN)Loggingc                       sT   e Zd ZdZ		d
dededeeef dee deeeef  f
 fdd	Z  Z	S )ModifyResponseExceptiona  
    Exception raised when a guardrail wants to modify the response.

    This exception carries the synthetic response that should be returned
    to the user instead of calling the LLM or instead of the LLM's response.
    It should be caught by the proxy and returned with a 200 status code.

    This is a base exception that all guardrails can use to replace responses,
    allowing violation messages to be returned as successful responses
    rather than errors.
    Nmessagemodelrequest_dataguardrail_namedetection_infoc                    s2   || _ || _|| _|| _|pi | _t | dS )a  
        Initialize the modify response exception.

        Args:
            message: The violation message to return to the user
            model: The model that was being called
            request_data: The original request data
            guardrail_name: Name of the guardrail that raised this exception
            detection_info: Additional detection metadata (scores, rules, etc.)
        N)r   r   r    r!   r"   super__init__)selfr   r   r    r!   r"   	__class__ \/home/app/Keep/.python/lib/python3.10/site-packages/litellm/integrations/custom_guardrail.pyr$   9   s   
z ModifyResponseException.__init__)NN)
__name__
__module____qualname____doc__strr   r   r   r$   __classcell__r(   r(   r&   r)   r   ,   s    
r   c                       s  e Zd Z										d_dee deee  deeeee ef  de	de	de	d	ee d
ee
 dee dee f fddZ	d`dedeeeef  defddZ	d`dedeeef deeeef  ddfddZedeed  fddZdeeeee ef  dee ddfddZdedee	 fdd Zd!ede	fd"d#Zdedeee eeeef  f fd$d%Zd&eee eeeef  f de	fd'd(Zd)eeef d*ee dee fd+d,Zded-ed*ee dee fd.d/Zd0ede	fd1d2Zd0ede	fd3d4Zdedefd5d6Z de	fd7d8Z!							dad9ee"eeee f ded:e#d;ee$ d<ee$ d=ee$ d>eeee
f  d?ee d0ee d@ee% ddfdAdBZ&	d`dCe'dedDe(dE dFedG de'f
dHdIZ)					dbd-ee ded;ee$ d<ee$ d=ee$ d0ee dJee fdKdLZ*edMe"de	fdNdOZ+				dcdMe"ded;ee$ d<ee$ d=ee$ d0ee fdPdQZ,dJed-ede	fdRdSZ-dTedUedVe
dWe
def
dXdYZ.dZe/ddfd[d\Z0	d`d*edee deee1  fd]d^Z2  Z3S )dCustomGuardrailNFr!   supported_event_hooks
event_hook
default_onmask_request_contentmask_response_contentviolation_message_templateend_session_after_n_failson_violationrealtime_violation_messagec                    sb   || _ || _|| _|| _|| _|| _|| _|| _|	| _|
| _	|r&| 
|| t jdi | dS )a[  
        Initialize the CustomGuardrail class

        Args:
            guardrail_name: The name of the guardrail. This is the name used in your requests.
            supported_event_hooks: The event hooks that the guardrail supports
            event_hook: The event hook to run the guardrail on
            default_on: If True, the guardrail will be run by default on all requests
            mask_request_content: If True, the guardrail will mask the request content
            mask_response_content: If True, the guardrail will mask the response content
            end_session_after_n_fails: For /v1/realtime sessions, end the session after this many violations
            on_violation: For /v1/realtime sessions, 'warn' or 'end_session'
            realtime_violation_message: Message the bot speaks aloud when a /v1/realtime guardrail fires
        Nr(   )r!   r1   r2   r3   r4   r5   r6   r7   r8   r9   _validate_event_hookr#   r$   )r%   r!   r1   r2   r3   r4   r5   r6   r7   r8   r9   kwargsr&   r(   r)   r$   T   s   zCustomGuardrail.__init__defaultcontextreturnc              
   C   sn   | j s|S d|i}|r|| z
| j jdi |W S  ty6 } ztd| j| |W  Y d}~S d}~ww )z<Return a custom violation message if template is configured.Zdefault_messagez@Failed to format violation message template for guardrail %s: %sNr(   )r6   updateformat	Exceptionr   warningr!   )r%   r<   r=   Zformat_contexter(   r(   r)   render_violation_message   s    
z(CustomGuardrail.render_violation_messageviolation_messager    r"   c                 C   s    | dd}t|||| j|d)a  
        Raise a passthrough exception for guardrail violations.

        This helper method should be used by guardrails when they detect a violation
        in passthrough mode.

        The exception will be caught by the proxy endpoints and converted to a 200 response
        with the violation message, preventing the LLM call from being made (pre_call/during_call)
        or replacing the LLM response (post_call).

        Args:
            violation_message: The formatted violation message to return to the user
            request_data: The original request data dictionary
            detection_info: Optional dictionary with detection metadata (scores, rules, etc.)

        Raises:
            ModifyResponseException: Always raises this exception to short-circuit
                                     the LLM call and return the violation message

        Example:
            if violation_detected and self.on_flagged_action == "passthrough":
                message = self._format_violation_message(detection_info)
                self.raise_passthrough_exception(
                    violation_message=message,
                    request_data=data,
                    detection_info=detection_info
                )
        r   unknown)r   r   r    r!   r"   )getr   r!   )r%   rE   r    r"   r   r(   r(   r)   raise_passthrough_exception   s   "z+CustomGuardrail.raise_passthrough_exceptionr   c                   C   s   dS )zx
        Returns the config model for the guardrail

        This is used to render the config model in the UI.
        Nr(   r(   r(   r(   r)   get_config_model   s   z CustomGuardrail.get_config_modelc                 C   s  dt tt tt f dtt dd fdd}|d u rd S t|tr$t|}t|tr0||| d S t|trmg }|j D ]}t|trI|	| q<|
| q<||| |jrkt|jtr`|jn|jg}||| d S d S t|tr||vrtd| d| d S d S )Nr2   r1   r>   c                 S   s<   | D ]}t |trt|}||vrtd| d| qd S )NEvent hook % is not in the supported event hooks )
isinstancer.   r   
ValueError)r2   r1   hookr(   r(   r)   5_validate_event_hook_list_is_in_supported_event_hooks   s   
zcCustomGuardrail._validate_event_hook.<locals>._validate_event_hook_list_is_in_supported_event_hooksrJ   rK   )r
   r   r   r.   rL   listr   tagsvaluesextendappendr<   rM   )r%   r2   r1   rO   Ztag_values_flatvdefault_listr(   r(   r)   r:      sP   






	z$CustomGuardrail._validate_event_hookdatac                 C   s:   d|v r|d S | dp| di }d|v r|d S dS )zI
        Returns True if the global guardrail should be disabled
        disable_global_guardraillitellm_metadatametadataFrG   r%   rW   rZ   r(   r(   r)   get_disable_global_guardrail  s   z,CustomGuardrail.get_disable_global_guardrailresultc              
   C   sV   |du rdS z
t t}t||W S  ty* } zdt|v r%W Y d}~dS  d}~ww )a  
        Check if result is a valid LLMResponseTypes instance.

        Safely handles TypedDict types which don't support isinstance checks.
        For non-LiteLLM responses (like passthrough httpx.Response), returns True
        to allow them through.
        NF	TypedDictT)r   r   rL   	TypeErrorr.   )r%   r^   Zresponse_typesrC   r(   r(   r)   _is_valid_response_type  s   z'CustomGuardrail._is_valid_response_typec                 C   s4   d|v r|d S | dp| di }| dpg S )zN
        Returns the guardrail(s) to be run from the metadata or root
        
guardrailsrY   rZ   r[   r\   r(   r(   r)   get_guardrail_from_metadata#  s   z+CustomGuardrail.get_guardrail_from_metadatarequested_guardrailsc                 C   sD   |D ]}t |tr| j|v r dS qt |tr| j|kr dS qdS )NTF)rL   dictr!   r.   )r%   rd   Z
_guardrailr(   r(   r)   %_guardrail_is_in_requested_guardrails/  s   



z5CustomGuardrail._guardrail_is_in_requested_guardrailsr;   	call_typec              	      s   ddl m} |d}|d u st|ts|S | j|tjddur#|S |tj	ks-|tj
krh| j||d|d|d|d	|d
dt||jpLddI d H }|d urht|trh|d}|d urh||d< |S )Nr   UserAPIKeyAuthrb   rW   
event_typeTuser_api_key_user_iduser_api_key_team_iduser_api_key_end_user_iduser_api_key_hashuser_api_key_request_routeZuser_idZteam_idZend_user_idZapi_keyZrequest_routeacompletion)user_api_key_dictcacherW   rg   messages)litellm.proxy._typesri   rG   rL   rP   should_run_guardrailr   pre_callr   
completionrr   async_pre_call_hookdcvaluere   )r%   r;   rg   ri   litellm_guardrailsr^   Zresult_messagesr(   r(   r)   async_pre_call_deployment_hook=  s:   

z.CustomGuardrail.async_pre_call_deployment_hookresponsec              	      s   ddl m} |d}|du st|ts|S | j|tjddur#|S | j||d|d|d	|d
|dd||dI dH }| 	|sK|S |S )zh
        Allow modifying / reviewing the response just after it's received from the deployment.
        r   rh   rb   Nrj   Trl   rm   rn   ro   rp   rq   )rs   rW   r   )
rv   ri   rG   rL   rP   rw   r   	post_callasync_post_call_success_hookra   )r%   r    r   rg   ri   r}   r^   r(   r(   r)   'async_post_call_success_deployment_hooke  s2   	

z7CustomGuardrail.async_post_call_success_deployment_hookrk   c                 C   s"  |  |}| |}td| j|| j|| j | jdu rQ|durQ| |rOt| jt	rMzddl
m} W n ty>   tdw ||| j|}|durM|S dS dS | jr`| |s`|jdkr`dS | |sgdS t| jt	rzddl
m} W n ty   tdw ||| j|}|dur|S dS )	zO
        Returns True if the guardrail should be run on the event_type
        zinside should_run_guardrail for guardrail=%s event_type= %s guardrail_supported_event_hooks= %s requested_guardrails= %s self.default_on= %sTr   )EnterpriseCustomGuardrailHelperzuSetting tag-based guardrails is only available in litellm-enterprise. You must be a premium user to use this feature.NFZlogging_only)rc   r]   r   debugr!   r2   r3   _event_hook_is_event_typerL   r   Z0litellm_enterprise.integrations.custom_guardrailr   ImportErrorZ_should_run_if_mode_by_tagrf   r|   )r%   rW   rk   rd   rX   r   r^   r(   r(   r)   rw     sb   




z$CustomGuardrail.should_run_guardrailc                 C   s   | j du rdS t| j tr|j| j v S t| j trS| j j D ]}t|tr/|j|v r. dS q|j|kr7 dS q| j jrQt| j jtrG| j jn| j jg}|j|v S dS | j |jkS )a  
        Returns True if the event_hook is the same as the event_type

        eg. if `self.event_hook == "pre_call" and event_type == "pre_call"` -> then True
        eg. if `self.event_hook == "pre_call" and event_type == "post_call"` -> then False
        NTF)r2   rL   rP   r|   r   rQ   rR   r<   )r%   rk   Z	tag_valuerV   r(   r(   r)   r     s*   





z)CustomGuardrail._event_hook_is_event_typec                 C   s   |  |}|D ]=}t|trD| j|v rDtdi || j }|di }|  dur@t|tr<|r<td| jt	|
  i   S |  S qi S )a  
        Returns `extra_body` to be added to the request body for the Guardrail API call

        Use this to pass dynamic params to the guardrail API call - eg. success_threshold, failure_threshold, etc.

        ```
        [{"lakera_guard": {"extra_body": {"foo": "bar"}}}]
        ```

        Will return: for guardrail=`lakera-guard`:
        {
            "foo": "bar"
        }

        Args:
            request_data: The original `request_data` passed to LiteLLM Proxy
        
extra_bodyTzOGuardrail %s: ignoring dynamic extra_body keys %s because premium_user is FalseNr(   )rc   rL   re   r!   r   rG   _validate_premium_userr   rB   rP   keys)r%   r    rd   Z	guardrailZguardrail_configr   r(   r(   r)   )get_guardrail_dynamic_request_body_params  s$   

z9CustomGuardrail.get_guardrail_dynamic_request_body_paramsc                 C   s4   ddl m}m} |durtd|jj  dS dS )z<
        Returns True if the user is a premium user
        r   )CommonProxyErrorspremium_userTz5Trying to use premium guardrail without premium user F)Zlitellm.proxy.proxy_serverr   r   r   rB   Znot_premium_userr|   )r%   r   r   r(   r(   r)   r     s   z&CustomGuardrail._validate_premium_userguardrail_json_responseguardrail_status
start_timeend_timedurationmasked_entity_countguardrail_providertracing_detailc                    s  t |tr	t|}ddlm} |	dur|	}nt | jtr)|di t| j }n| j}ddl	m
} ||}td| j||||||||d	|
pGi  dtddf fdd	}d
|v rk|d
 du rci |d
< ||d
  dS d|v rw||d  dS i |d
< ||d
  dS )a  
        Builds `StandardLoggingGuardrailInformation` and adds it to the request metadata so it can be used for logging to DataDog, Langfuse, etc.

        Args:
            tracing_detail: Optional typed dict with provider-specific tracing fields
                (guardrail_id, policy_template, detection_method, confidence_score,
                classification, match_details, patterns_checked, alert_recipients).
        r   )GuardrailModeN)filter_exceptions_from_params)	r!   r   guardrail_modeguardrail_responser   r   r   r   r   	containerr>   c                    sL   d}|  |}|d u r g| |< d S t|tr|  d S | g| |< d S )NZ&standard_logging_guardrail_information)rG   rL   rP   rT   )r   keyexistingZslgr(   r)   _append_guardrail_info]  s   

zjCustomGuardrail.add_standard_logging_guardrail_information_to_request_data.<locals>._append_guardrail_inforZ   rY   r(   )rL   rA   r.   litellm.types.utilsr   r2   r   re   Z
model_dumpZ'litellm.litellm_core_utils.core_helpersr   r   r!   )r%   r   r    r   r   r   r   r   r   rk   r   r   r   r   Zclean_guardrail_responser   r(   r   r)   :add_standard_logging_guardrail_information_to_request_data"  sD   

zJCustomGuardrail.add_standard_logging_guardrail_information_to_request_datainputs
input_type)requestr   logging_objLiteLLMLoggingObjc                    s   |S )a  
        Apply your guardrail logic to the given inputs

        Args:
            inputs: Dictionary containing:
                - texts: List of texts to apply the guardrail to
                - images: Optional list of images to apply the guardrail to
                - tool_calls: Optional list of tool calls to apply the guardrail to
            request_data: The request data dictionary - containing user api key metadata (e.g. user_id, team_id, etc.)
            input_type: The type of input to apply the guardrail to - "request" or "response"
            logging_obj: Optional logging object for tracking the guardrail execution

        Any of the custom guardrails can override this method to provide custom guardrail logic

        Returns the texts with the guardrail applied and the images with the guardrail applied (if any)

        Raises:
            Exception:
                - If the guardrail raises an exception

        r(   )r%   r   r    r   r   r(   r(   r)   apply_guardrailt  s   zCustomGuardrail.apply_guardrailoriginal_inputsc           	   	   C   sd   |du ri n|}|durt |tr| ||rd}nd}td|  | j||d||||d |S )
        Add StandardLoggingGuardrailInformation to the request data

        This gets logged on downsteam Langfuse, DataDog, etc.
        NmaskZallowzGuardrail response: successr   r    r   r   r   r   rk   )rL   re   _inputs_were_modifiedr   r   r   )	r%   r   r    r   r   r   rk   r   r   r(   r(   r)   _process_response  s"   	z!CustomGuardrail._process_responserC   c                 C   s2   t | trdS tdurt | tr| jdkrdS dS )ak  
        Returns True if the exception represents an intentional guardrail block
        (this was logged previously as an API failure - guardrail_failed_to_respond).

        Guardrails signal intentional blocks by raising:
        - HTTPException with status 400 (content policy violation)
        - ModifyResponseException (passthrough mode violation)
        TNi  F)rL   r   r   status_code)rC   r(   r(   r)   _is_guardrail_intervention  s   

z*CustomGuardrail._is_guardrail_interventionc           	   	   C   sB   |  |rdnd}|}d| jjv rd}| j|||||||d |)r   Zguardrail_intervenedZguardrail_failed_to_respondZCustomCodeGuardrailZdenyr   )r   r'   r*   r   )	r%   rC   r    r   r   r   rk   r   r   r(   r(   r)   _process_error  s"   	zCustomGuardrail._process_errorc                 C   sH   t | t | B }|D ]}||}||}||kr! dS qdS )z
        Compare original inputs with response to determine if content was modified.

        Returns True if the inputs were modified (mask scenario), False otherwise (allow scenario).
        TF)setr   rG   )r%   r   r   all_keysr   original_valueZresponse_valuer(   r(   r)   r     s   

z%CustomGuardrail._inputs_were_modifiedcontent_stringmask_stringstart_index	end_indexc                 C   sF   d|  kr|  k rt |ks|S  |S |d| | ||d  S )zS
        Mask the content in the string between the start and end indices.
        r   N)len)r%   r   r   r   r   r(   r(   r)   mask_content_in_string  s
    z&CustomGuardrail.mask_content_in_stringlitellm_paramsc                 C   s&   t | D ]
\}}t| || qdS )z@
        Update the guardrails litellm params in memory
        N)varsitemssetattr)r%   r   r   r|   r(   r(   r)   update_in_memory_litellm_params  s   z/CustomGuardrail.update_in_memory_litellm_paramsc                 C   s   |du s|du r
dS |t jjks|t jjks|t jjkr!|dS |t jjks-|t jjkrRddlm	} ddl
m} |d}|du rDdS |j||d}|tt |S dS )zG
        Returns the messages for the given call type and data
        Nru   r   )cast) LiteLLMCompletionResponsesConfiginput)r   Zresponses_api_request)r   ry   r|   rr   Zanthropic_messagesrG   	responsesZ
aresponsestypingr   ZBlitellm.responses.litellm_completion_transformation.transformationr   Z)transform_responses_api_input_to_messagesr   r   )r%   rg   rW   r   r   Z
input_dataru   r(   r(   r)   %get_guardrails_messages_for_call_type!  s&   

z5CustomGuardrail.get_guardrails_messages_for_call_type)
NNNFFFNNNNN)NNNNNNN)NNNNN)NNNN)4r*   r+   r,   r   r.   r   r   r
   r   boolintr$   r   r   rD   rH   staticmethodr	   rI   r:   re   r]   ra   r   rc   rf   r   r~   r   r   rw   r   r   r   rA   r   floatr   r   r   r   r   r   r   r   r   r   r   r   r   r   r/   r(   r(   r&   r)   r0   S   s   
	
1


,
4



(
*
?*	

W
"
*
$

r0   c                    sv   ddl }ddldtdtt fdd | fdd| fd	d
|fdd}|S )a   
    Decorator to add standard logging guardrail information to any function

    Add this decorator to ensure your guardrail response is logged to DataDog, OTEL, s3, GCS etc.

    Logs for:
        - pre_call
        - during_call
        - post_call
    r   N	func_namer>   c                 S   s.   | dkrt jS | dkrt jS | dv rt jS dS )z2Infer the actual event type from the function namerz   Zasync_moderation_hook)r   Zasync_post_call_streaming_hookN)r   rx   Zduring_callr   )r   r(   r(   r)   $_infer_event_type_from_function_name^  s   zGlog_guardrail_information.<locals>._infer_event_type_from_function_namec            	         s   t  }| d }|dp|dpi } j}d }jdkr*d|v r*|d}z#| i |I d H }|j||| t   t  |  ||dW S  tyv } z|j||| t   t  |  |dW  Y d }~S d }~ww )Nr   rW   r    r   r   )r   r    r   r   r   rk   r   )rC   r    r   r   r   rk   )	r   nowrG   r*   r   	timestamptotal_secondsrA   r   	argsr;   r   r%   r    rk   r   r   rC   r   funcr(   r)   async_wrapperm  s<   


	
z0log_guardrail_information.<locals>.async_wrapperc            	   
      s   t  }| d }|dp|dpi } j}d }jdkr)d|v r)|d}z| i |}|j||t  |  ||dW S  tyb } z|j||t  |  |dW  Y d }~S d }~ww )Nr   rW   r    r   r   )r   r    r   rk   r   )rC   r    r   rk   )r   r   rG   r*   r   r   rA   r   r   r   r(   r)   sync_wrapper  s2   

z/log_guardrail_information.<locals>.sync_wrapperc                     s&    r | i |S | i |S r   )iscoroutinefunction)r   r;   )r   r   inspectr   r(   r)   wrapper  s   
z*log_guardrail_information.<locals>.wrapper)	functoolsr   r.   r   r   wraps)r   r   r   r(   )r   r   r   r   r   r)   log_guardrail_informationP  s   
 r   ),r   r   r   r   r   r   r   r   r	   r
   r   Zlitellm._loggingr   Zlitellm.cachingr   Z"litellm.integrations.custom_loggerr   Zlitellm.types.guardrailsr   r   r   r   Zlitellm.types.llms.openair   Z3litellm.types.proxy.guardrails.guardrail_hooks.baser   r   r   r   r   r   r   r   Zfastapi.exceptionsr   r   Z*litellm.litellm_core_utils.litellm_loggingr   r   r{   rA   r   r0   r   r(   r(   r(   r)   <module>   s4    , 	'      