o
    ưi                     @   s  U d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZmZm	Z	 d dl
Z
d dlZd dlmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+m,Z, G dd dZ-dd Z.G dd de-Z/G dd de-Z0G dd de-Z1d dl2m3Z3 d d
lmZ4 g dZ5G dd dZ6e$j7e$j8B Z9e:e;d< e:h dZ<e:e;d < G d!d" d"Z=dS )#    N)datetime)AnyDictListOptional)&LITELLM_MAX_STREAMING_DURATION_SECONDSSTREAM_SSE_DONE_STRING)run_async_function)process_response_headers)Logging)get_api_base)update_response_metadata)executor)BaseResponsesAPIConfig)ResponsesAPIRequestUtils)OutputTextDeltaEventResponseAPIUsageResponseCompletedEventResponsesAPIRequestParamsResponsesAPIResponseResponsesAPIStreamEventsResponsesAPIStreamingResponse)	CallTypes)CustomStreamWrapper'async_post_call_success_deployment_hookc                   @   s   e Zd ZdZ				ddejdededede	e
eef  de	e d	e	e
eef  d
e	e fddZdddZde	e fddZdd Zdd Zdd ZdefddZdefddZdS ) !BaseResponsesAPIStreamingIteratorz
    Base class for streaming iterators that process responses from the Responses API.

    This class contains shared logic for both synchronous and asynchronous iterators.
    Nresponsemodelresponses_api_provider_configlogging_objlitellm_metadatacustom_llm_providerrequest_data	call_typec	                 C   s   || _ || _|| _d| _|| _d | _t|dt | _	d| _
t | _|| _|| _|p,i | _|| _t|p5d| jjdi d}	|rG|di ni }
|
dd |	d| _t| j jpYi | jd	< d S )
NF
start_time litellm_params)r   Zoptional_params
model_infoid)model_idapi_baseZadditional_headers)r   r   r   finishedr   completed_responsegetattrr   nowr$   _failure_handledtime_stream_created_timer    r!   r"   r#   r   model_call_detailsgetZ_hidden_paramsr
   headers)selfr   r   r   r   r    r!   r"   r#   Z	_api_baseZ_model_info r6   [/home/app/Keep/.python/lib/python3.10/site-packages/litellm/responses/streaming_iterator.py__init__-   s6   



z*BaseResponsesAPIStreamingIterator.__init__returnc                 C   sR   t du rdS t | j }|t kr'tjdt  d|dd| jp d| jp$dddS )zXRaise litellm.Timeout if the stream has exceeded LITELLM_MAX_STREAMING_DURATION_SECONDS.Nz*Stream exceeded max streaming duration of zs (elapsed z.1fzs)r%   )messager   Zllm_provider)r   r0   r1   litellmTimeoutr   r!   )r5   elapsedr6   r6   r7   _check_max_streaming_duration[   s   z?BaseResponsesAPIStreamingIterator._check_max_streaming_durationc              
   C   s  |sdS t |}|du rdS |tkrd| _dS zt|}t|tr| jj	| j
|| jd}t|dd}|rFtj|| j| jd}t|d| | jr| jdrt|dd}|tjtjfv rt|dd}|rt|d	d}|rt|tr| jr| jd
i dnd}	|	rt||	}
t|d	|
 |rt|ddtjkr|| _tjr| jdurt|dd}|rt|dd}|durz| jj|d}|durt|d| W n	 ty   Y nw |   |W S W dS  tjy   Y dS  ty } z|  |  d}~ww )z.Process a single chunk of data from the streamNT)r   parsed_chunkr   r   )Zresponses_api_responser    r!   Z"encrypted_content_affinity_enabledtypeitemencrypted_contentr'   r(   usageresultcost)!r   Z_strip_sse_data_from_chunkr   r+   jsonloads
isinstancedictr   Ztransform_streaming_responser   r   r-   r   Z/_update_responses_api_response_id_with_model_idr    r!   setattrr3   r   ZOUTPUT_ITEM_ADDEDZOUTPUT_ITEM_DONEstrZ%_wrap_encrypted_content_with_model_idRESPONSE_COMPLETEDr,   r;   include_cost_in_streaming_usage_response_cost_calculator	Exception"_handle_logging_completed_responseJSONDecodeError_handle_failure)r5   chunkr?   Zopenai_responses_api_chunkZresponse_objectr   Z
event_typerA   rB   r)   Zwrapped_contentZresponse_obj	usage_objrF   er6   r6   r7   _process_chunkg   s   


	



z0BaseResponsesAPIStreamingIterator._process_chunkc                 C   s   dS )z8Base implementation - should be overridden by subclassesNr6   r5   r6   r6   r7   rQ      s   zDBaseResponsesAPIStreamingIterator._handle_logging_completed_responsec                    s   zqd}| j durzt| j }W n ty   d}Y nw |du r7ztt| jdd}W n ty6   d}Y nw | jp@t| jdi }ttddpHg }d}|D ]}t|drgd}|j	|||dI dH }|durg|}qM|rpt
|d	d |W S  ty}   | Y S w )
za
        Allow callbacks to modify streaming chunks before returning (parity with chat).
        Nr#   r2   	callbacksF)async_post_call_streaming_deployment_hookT)r"   Zresponse_chunkr#   Z_post_streaming_hooks_ran)r#   r   
ValueErrorr-   r   rP   r"   r;   hasattrrZ   rK   )r5   rT   typed_call_typer"   rY   Z	hooks_rancallbackrE   r6   r6   r7   $_call_post_streaming_deployment_hook   sJ   

zFBaseResponsesAPIStreamingIterator._call_post_streaming_deployment_hookc                    s   |  |I dH S )zY
        Helper to invoke streaming deployment hooks explicitly (used in tests).
        N)r_   )r5   rT   r6   r6   r7   %call_post_streaming_hooks_for_testing  s   zGBaseResponsesAPIStreamingIterator.call_post_streaming_hooks_for_testingend_timec                 C   sz  | j du rdS i }t| jtr|| j zt| jdr#|| jj W n	 ty-   Y nw d|vrOzt	| jdi 
di |d< W n tyN   i |d< Y nw zt| j | j| j|| j|d W n	 tyh   Y nw zd}| jdurzt| j}W n ty   d}Y nw W n ty   d}Y nw |du rztj}W n ty   d}Y nw ztt|| j |d W dS  ty   Y dS w )z^
        Run post-call deployment hooks and update metadata similar to chat pipeline.
        Nr2   r&   )rE   r   r   kwargsr$   ra   )async_functionr"   r   r#   )r,   rI   r"   rJ   updater\   r   r2   rP   r-   r3   r   r   r$   r#   r   r[   	responsesr	   r   )r5   ra   Zrequest_payloadr]   r6   r6   r7   _run_post_success_hooks  s|   




z9BaseResponsesAPIStreamingIterator._run_post_success_hooks	exceptionc                 C   s   | j rdS d| _ t }zt| jj||| jt d W n	 t	y%   Y nw zt
| jj||| jt  W dS  t	yA   Y dS w )z
        Trigger failure handlers before bubbling the exception.
        Only calls handlers once even if called multiple times.
        NT)rc   rg   traceback_exceptionr$   ra   )r/   	traceback
format_excr	   r   Zasync_failure_handlerr$   r   r.   rP   r   submitZfailure_handler)r5   rg   rh   r6   r6   r7   rS   F  s4   

z1BaseResponsesAPIStreamingIterator._handle_failureNNNNr9   N)__name__
__module____qualname____doc__httpxResponserL   r   LiteLLMLoggingObjr   r   r   r8   r>   r   rW   rQ   r_   r`   r   rf   rP   rS   r6   r6   r6   r7   r   &   s<    	

.p'>r   c                    s(   t | dd}|du r|S ||I dH S )zg
    Module-level helper for tests to ensure hooks can be invoked even if the iterator is wrapped.
    r_   N)r-   )iteratorrT   Zhook_fnr6   r6   r7   r`   h  s
   r`   c                       s   e Zd ZdZ				ddejdededede	e
eef  de	e d	e	e
eef  d
e	e f fddZdd ZdefddZdd Z  ZS )ResponsesAPIStreamingIteratorzS
    Async iterator for processing streaming responses from the Responses API.
    Nr   r   r   r   r    r!   r"   r#   c	           	   
      (   t  |||||||| | | _d S N)superr8   Zaiter_linesstream_iterator	r5   r   r   r   r   r    r!   r"   r#   	__class__r6   r7   r8   w     
z&ResponsesAPIStreamingIterator.__init__c                 C      | S rx   r6   rX   r6   r6   r7   	__aiter__     z'ResponsesAPIStreamingIterator.__aiter__r9   c              
      s   z;|    	 z
| j I d H }W n ty   d| _tw |    | |}| jr,t|d ur<| j|dI d H }|W S q tyD     tjyZ } z
d| _| 	| |d }~w t
yo } z
d| _| 	| |d }~ww )NT)rT   )r>   rz   	__anext__StopAsyncIterationr+   rW   r_   rr   	HTTPErrorrS   rP   r5   rT   rE   rV   r6   r6   r7   r     sB   


z'ResponsesAPIStreamingIterator.__anext__c                 C   s   | j }| j dur%t| j dr%zt| j | j  }W n	 ty$   Y nw t| jj	|| j
t dd tj| jj|d| j
t d | jt d dS )z7Handle logging for completed responses in async contextN
model_dump)rE   r$   ra   	cache_hitrE   r   r$   ra   ra   )r,   r\   r@   model_validater   rP   asynciocreate_taskr   async_success_handlerr$   r   r.   r   rk   success_handlerrf   r5   Zlogging_responser6   r6   r7   rQ     s2   
	z@ResponsesAPIStreamingIterator._handle_logging_completed_responserl   )rn   ro   rp   rq   rr   rs   rL   r   rt   r   r   r   r8   r   r   r   rQ   __classcell__r6   r6   r|   r7   rv   r  s4    
	&rv   c                       s   e Zd ZdZ				ddejdededede	e
eef  de	e d	e	e
eef  d
e	e f fddZdd Zdd Zdd Z  ZS )!SyncResponsesAPIStreamingIteratorzY
    Synchronous iterator for processing streaming responses from the Responses API.
    Nr   r   r   r   r    r!   r"   r#   c	           	   
      rw   rx   )ry   r8   
iter_linesrz   r{   r|   r6   r7   r8     r~   z*SyncResponsesAPIStreamingIterator.__init__c                 C   r   rx   r6   rX   r6   r6   r7   __iter__  r   z*SyncResponsesAPIStreamingIterator.__iter__c              
   C   s   z6|    	 zt| j}W n ty   d| _tw |    | |}| jr(t|d ur6t| j|d}|W S q ty>     tj	yT } z
d| _| 
| |d }~w tyi } z
d| _| 
| |d }~ww )NT)rc   rT   )r>   nextrz   StopIterationr+   rW   r	   r_   rr   r   rS   rP   r   r6   r6   r7   __next__  sB   


z*SyncResponsesAPIStreamingIterator.__next__c                 C   s   | j }| j dur%t| j dr%zt| j | j  }W n	 ty$   Y nw t| jj|| j	t
 dd tj| jj|d| j	t
 d | jt
 d dS )z6Handle logging for completed responses in sync contextNr   )rc   rE   r$   ra   r   r   r   )r,   r\   r@   r   r   rP   r	   r   r   r$   r   r.   r   rk   r   rf   r   r6   r6   r7   rQ      s0   
zDSyncResponsesAPIStreamingIterator._handle_logging_completed_responserl   )rn   ro   rp   rq   rr   rs   rL   r   rt   r   r   r   r8   r   r   rQ   r   r6   r6   r|   r7   r     s4    
	&r   c                       s   e Zd ZdZdZ				ddejdedede	de
eeef  d	e
e d
e
eeef  de
e f fddZdd ZdefddZdd ZdefddZdedefddZ  ZS )!MockResponsesAPIStreamingIteratoru   
    Mock iterator—fake a stream by slicing the full response text into
    5 char deltas, then emit a completed event.

    Models like o1-pro don't support streaming, so we fake it.
       Nr   r   r   r   r    r!   r"   r#   c	              
      s   t  j||||||||d jjj||d  fddtdt jD }	t	j
r\|d ur\tdd }
|
d ur\z|jd}|d urQt|
d| W n	 ty[   Y nw |	ttjd	g _d_d S )
N)r   r   r   r   r    r!   r"   r#   )r   raw_responser   c              	      s0   g | ]}t tj ||j  jd d dqS )r   )r@   deltaZitem_idZoutput_indexZcontent_index)r   r   ZOUTPUT_TEXT_DELTA
CHUNK_SIZEr(   ).0i	full_textr5   Ztransformedr6   r7   
<listcomp>n  s    z>MockResponsesAPIStreamingIterator.__init__.<locals>.<listcomp>r   rC   rD   rF   )r@   r   )ry   r8   r   Ztransform_response_api_responser   _collect_textrangelenr   r;   rN   r-   rO   rK   rP   r   r   rM   _events_idx)r5   r   r   r   r   r    r!   r"   r#   ZdeltasrU   rF   r|   r   r7   r8   M  sT   

z*MockResponsesAPIStreamingIterator.__init__c                 C   r   rx   r6   rX   r6   r6   r7   r     r   z+MockResponsesAPIStreamingIterator.__aiter__r9   c                    s4   | j t| jkrt| j| j  }|  j d7  _ |S N   )r   r   r   r   r5   Zevtr6   r6   r7   r     s   z+MockResponsesAPIStreamingIterator.__anext__c                 C   r   rx   r6   rX   r6   r6   r7   r     r   z*MockResponsesAPIStreamingIterator.__iter__c                 C   s2   | j t| jkr
t| j| j  }|  j d7  _ |S r   )r   r   r   r   r   r6   r6   r7   r     s
   z*MockResponsesAPIStreamingIterator.__next__respc                 C   sD   d}|j D ]}t|dd }|dkrt|dg D ]}||j7 }qq|S )Nr%   r@   r:   content)outputr-   text)r5   r   outZout_item	item_typecr6   r6   r7   r     s   
z/MockResponsesAPIStreamingIterator._collect_textrl   )rn   ro   rp   rq   r   rr   rs   rL   r   rt   r   r   r   r8   r   r   r   r   r   r   r   r   r6   r6   r|   r7   r   C  s:    	Er   )verbose_logger)zresponse.createdresponse.completedzresponse.failedzresponse.incompleteerrorc                   @   s   e Zd ZdZ		ddedededee dee f
dd	Zd
e	de
fddZdeddfddZdeddfddZdeddfddZdddZdddZdddZdddZdS ) ResponsesWebSocketStreaminga  
    Manages bidirectional WebSocket forwarding for the Responses API
    WebSocket mode (wss://.../v1/responses).

    Unlike the Realtime API, the Responses API WebSocket mode:
    - Uses response.create as the client-to-server event
    - Streams back the same events as the HTTP streaming Responses API
    - Supports previous_response_id for incremental continuation
    - Supports generate: false for warmup
    - One response at a time per connection (sequential, no multiplexing)
    N	websocket
backend_wsr   user_api_key_dictr"   c                 C   s2   || _ || _|| _|| _|pi | _g | _g | _d S rx   )r   r   r   r   r"   messagesinput_messages)r5   r   r   r   r   r"   r6   r6   r7   r8     s   

z$ResponsesWebSocketStreaming.__init__	event_objr9   c                 C   s   | dtv S )Nr@   )r3   RESPONSES_WS_LOGGED_EVENT_TYPES)r5   r   r6   r6   r7   _should_store_event  s   z/ResponsesWebSocketStreaming._should_store_eventeventc              	   C   sj   t |tr
|d}t |tr$zt|}W n tjtfy#   Y d S w |}| |r3| j	
| d S d S )Nutf-8)rI   bytesdecoderL   rG   rH   rR   	TypeErrorr   r   append)r5   r   r   r6   r6   r7   _store_event  s   



z(ResponsesWebSocketStreaming._store_eventr:   c              
   C   sL  zt |trt|}nt |tr|}nW dS |ddkr!W dS |dg }t |tr8| jd|d W dS t |tr|D ]U}t |tsGq?|ddkr|ddkr|d	g }t |trj| jd|d q?t |tr|D ]}t |tr|dd
kr|dd}|r| jd|d qqq?W dS W dS  tj	t
tfy   Y dS w )z<Extract user input content from response.create for logging.Nr@   response.createinputuser)roler   r:   r   r   
input_textr   r%   )rI   rL   rG   rH   rJ   r3   r   r   listrR   AttributeErrorr   )r5   r:   msg_objZinput_itemsrA   r   r   r   r6   r6   r7    _collect_input_from_client_event  sL   






z<ResponsesWebSocketStreaming._collect_input_from_client_eventc                 C   s(   |  | | jr| jj|dd d S d S )Nr%   )r   api_key)r   r   Zpre_call)r5   r:   r6   r6   r7   _store_input  s   
z(ResponsesWebSocketStreaming._store_inputc                    sT   | j sd S | jr| j| j jd< | jr(t| j | j t| j j	| j d S d S )Nr   )
r   r   r2   r   r   r   r   _ws_executorrk   r   rX   r6   r6   r7   _log_messages  s   z)ResponsesWebSocketStreaming._log_messagesc              
      s  ddl }zz;	 z| jjddI dH }W n ty%   | j I dH }Y nw t|tr1|d}n|}| | | j	|I dH  q |j
jy[ } ztd| W Y d}~nd}~w tyr } ztd| W Y d}~nd}~ww W |  I dH  dS W |  I dH  dS |  I dH  w )	z4Forward events from backend WebSocket to the client.r   NTF)r   r   z*Responses WS backend connection closed: %sz+Error in responses WS backend_to_client: %s)
websocketsr   recvr   rI   r   r   r   r   	send_text
exceptionsZConnectionClosedr   debugrP   rg   r   )r5   r   r   Zresponse_strrV   r6   r6   r7   backend_to_client(  s<   

"z-ResponsesWebSocketStreaming.backend_to_clientc              
      sp   z	 | j  I dH }| | | | | j|I dH  q ty7 } ztd| W Y d}~dS d}~ww )z6Forward response.create events from client to backend.TNz(Responses WS client_to_backend ended: %s)	r   receive_textr   r   r   sendrP   r   r   )r5   r:   rV   r6   r6   r7   client_to_backendF  s   

z-ResponsesWebSocketStreaming.client_to_backendc                    s   t |  }zDz	|  I dH  W n	 ty   Y nw W | s7|  z|I dH  W n
 t jy6   Y nw z| j	 I dH  W dS  tyL   Y dS w | sg|  z|I dH  W n
 t jyf   Y nw z
| j	 I dH  W w  tyz   Y w w )z,Run both forwarding directions concurrently.N)
r   r   r   r   rP   donecancelCancelledErrorr   close)r5   Zforward_taskr6   r6   r7   bidirectional_forwardS  s@   z1ResponsesWebSocketStreaming.bidirectional_forward)NNrm   )rn   ro   rp   rq   r   rt   r   r   r8   rJ   boolr   r   r   r   r   r   r   r   r6   r6   r6   r7   r     s.    
*


r   _RESPONSE_CREATE_PARAMS>   Zlitellm_call_idr   
aresponsesZlitellm_logging_objZ_aresponses_websocket_MANAGED_WS_SKIP_KWARGSc                   @   s  e Zd ZdZ						d@dededddee deeeef  d	ee d
ee dee dee deddfddZ	e
dedee fddZdAdededdfddZdedeeeef  fddZdedeeeef  ddfddZe
d eeef dee fd!d"Ze
d eeef deeeef  fd#d$Ze
d%edeeeef  fd&d'Zd(edeeeef  fd)d*Ze
d+eeef deeef fd,d-Zd.eeef dee d/eeeef  d0eeeef  ddf
d1d2Zd.eeef d3ee ddfd4d5Ze
d.eeef deddfd6d7Zded.eeef deeeef  fd8d9Zd eeeef  d0eeeef  d/eeeef  ddfd:d;Zd(eddfd<d=ZdBd>d?ZdS )C ManagedResponsesWebSocketHandlera  
    Handles Responses API WebSocket mode for providers that do not expose a
    native ``wss://`` responses endpoint.

    Instead of proxying to a provider WebSocket, this handler:
    - Listens for ``response.create`` events from the client
    - Makes HTTP streaming calls via ``litellm.aresponses(stream=True)``
    - Serialises and forwards every streaming event back over the WebSocket
    - Supports ``previous_response_id`` for multi-turn conversations via
      in-memory session tracking (avoids async DB-write timing issues)
    - Supports sequential requests over a single persistent connection

    This makes every provider that LiteLLM can reach over HTTP available on
    the WebSocket transport without any provider-specific changes.
    Nr   r   r   rt   r   r    r   r*   timeoutr!   rb   r9   c
                 K   sX   || _ || _|| _|| _|pi | _|| _|| _|| _|	| _dd |
	 D | _
i | _d S )Nc                 S   s   i | ]\}}|t vr||qS r6   )r   r   kvr6   r6   r7   
<dictcomp>  s    z=ManagedResponsesWebSocketHandler.__init__.<locals>.<dictcomp>)r   r   r   r   r    r   r*   r   r!   itemsextra_kwargs_session_history)r5   r   r   r   r   r    r   r*   r   r!   rb   r6   r6   r7   r8     s   

z)ManagedResponsesWebSocketHandler.__init__rT   c              
   C   s   z2t | dr| jddW S t | drtj| jddtdW S t| tr+tj| tdW S tt| W S  tyK } zt	
d| W Y d}~dS d}~ww )zHSerialize a streaming chunk to a JSON string for WebSocket transmission.model_dump_jsonT)Zexclude_noner   )defaultz1ManagedResponsesWS: failed to serialize chunk: %sN)r\   r   rG   dumpsr   rL   rI   rJ   rP   r   r   )rT   excr6   r6   r7   _serialize_chunk  s   


z1ManagedResponsesWebSocketHandler._serialize_chunkserver_errorr:   
error_typec                    sB   z| j td||ddI d H  W d S  ty    Y d S w )Nr   )r@   r:   )r@   r   )r   r   rG   r   rP   )r5   r:   r   r6   r6   r7   _send_error  s   z,ManagedResponsesWebSocketHandler._send_errorprevious_response_idc                 C   s(   t |}|d|}t| j|g S )z
        Return accumulated message history for *previous_response_id*.

        The key is the *decoded* response ID (the raw provider response ID before
        LiteLLM base64-encodes it into the ``resp_...`` format).
        response_id)r   !_decode_responses_api_response_idr3   r   r   )r5   r   decodedZraw_idr6   r6   r7   _get_history_messages  s
   z6ManagedResponsesWebSocketHandler._get_history_messagesr   r   c                 C   s   || j |< dS )u   
        Store the complete accumulated message history for *response_id*.

        Replaces any prior value — callers are responsible for passing the full
        history (prior turns + current input + new output).
        N)r   )r5   r   r   r6   r6   r7   _store_history  s   z/ManagedResponsesWebSocketHandler._store_historycompleted_eventc                 C   sB   |  di }t|tr| dnd}|sdS t|}| d|S )z
        Pull the raw (decoded) response ID out of a ``response.completed`` event.
        Returns *None* if the event doesn't contain a usable ID.
        r   r(   Nr   )r3   rI   rJ   r   r   )r   resp_objZ
encoded_idr   r6   r6   r7   _extract_response_id  s   
z5ManagedResponsesWebSocketHandler._extract_response_idc           	      C   s   |  di }t|tsg S g }| dg pg D ]C}t|tsq| d}| dd}|dkrQ| dp4g }dd	 |D }d
|}|rP|d|d|dgd q|dkrZ|| q|S )z
        Convert the output items in a ``response.completed`` event into
        Responses API message dicts suitable for the next turn's ``input``.
        r   r   r@   r   Z	assistantr:   r   c                 S   s0   g | ]}t |tr|d dv r|ddqS )r@   )output_textr   r   r%   )rI   rJ   r3   )r   pr6   r6   r7   r     s    
zMManagedResponsesWebSocketHandler._extract_output_messages.<locals>.<listcomp>r%   r   r@   r   r@   r   r   Zfunction_call)r3   rI   rJ   joinr   )	r   r   r   rA   r   r   Zcontent_partsZ
text_partsr   r6   r6   r7   _extract_output_messages  s,   




z9ManagedResponsesWebSocketHandler._extract_output_messages	input_valc                 C   s<   t | trddd| dgdgS t | trdd | D S g S )z
        Normalise the ``input`` field of a ``response.create`` event to a list
        of Responses API message dicts.
        r:   r   r   r   r   c                 S   s   g | ]	}t |tr|qS r6   )rI   rJ   )r   rA   r6   r6   r7   r     s    zGManagedResponsesWebSocketHandler._input_to_messages.<locals>.<listcomp>)rI   rL   r   )r  r6   r6   r7   _input_to_messages  s
   

z3ManagedResponsesWebSocketHandler._input_to_messagesraw_messagec                    sP   zt |}W n t jy   | ddI dH  Y dS w |ddkr&dS |S )zOParse raw WS text; return the message dict or None (JSON error / ignored type).z%Invalid JSON in response.create eventZinvalid_request_errorNr@   r   )rG   rH   rR   r   r3   )r5   r  r   r6   r6   r7   _parse_message  s   z/ManagedResponsesWebSocketHandler._parse_messager   c                    s@   |  d}t|tr|r|ndd |  D   fddtD S )z
        Extract Responses API params from the event, handling both wire formats:
          Nested: {"type": "response.create", "response": {"input": [...], ...}}
          Flat:   {"type": "response.create", "input": [...], "model": "...", ...}
        r   c                 S   s   i | ]\}}|d kr||qS )r@   r6   r   r6   r6   r7   r   -  s    zLManagedResponsesWebSocketHandler._build_base_call_kwargs.<locals>.<dictcomp>c                    s*   i | ]}| v r | d ur| | qS rx   r6   )r   paramZresponse_paramsr6   r7   r   /  s
    )r3   rI   rJ   r   r   )r   nestedr6   r  r7   _build_base_call_kwargs"  s   

z8ManagedResponsesWebSocketHandler._build_base_call_kwargscall_kwargscurrent_messagesprior_historyc                 C   sF   |sdS |r|| |d< t dt|| dS t d| ||d< dS )zHPrepend in-memory turn history, or fall back to DB-based reconstruction.Nr   zMManagedResponsesWS: prepended %d history messages for previous_response_id=%szuManagedResponsesWS: no in-memory history for previous_response_id=%s; falling back to DB-based session reconstructionr   )r   r   r   )r5   r
  r   r  r  r6   r6   r7   _apply_history5  s   z/ManagedResponsesWebSocketHandler._apply_historyevent_modelc                 C   sp   | j dur
| j |d< | jdur| j|d< | jdur| j|d< | jdur*|s*| j|d< | jr6t| j|d< dS dS )zBInject connection-level credentials and metadata into call_kwargs.Nr   r*   r   r!   r    )r   r*   r   r!   r    rJ   )r5   r
  r  r6   r6   r7   _inject_credentialsP  s   






z4ManagedResponsesWebSocketHandler._inject_credentialsc                 C   s   |  dpi  dpi }t|tsdS t| dpi }|  d|d< |  d|d< ||d< dD ]}|| v rB| | durB| | ||< q0i |d|i}d| vrSi | d< || d d< | d	i  || d	 d< dS )
zGUpdate proxy_server_request body so spend logs record the full request.r    proxy_server_requestNbodyr   storer   )ZtoolsZtool_choiceZinstructionsmetadatar&   )r3   rI   rJ   
setdefault)r
  r   r  r  r   r6   r6   r7   _update_proxy_requestb  s*   
z6ManagedResponsesWebSocketHandler._update_proxy_requestc           	         s   d}t jdd|i|I dH }|2 zg3 dH W }|du rqt|ddp.t|tr-|dnd}| |}|du r9q|dkrR|du rRzt|}W n	 t	yQ   Y nw z| j
|I dH  W q t	yy } ztd| |W  Y d}~  S d}~ww 6 |S )a>  
        Stream ``litellm.aresponses`` and forward every chunk over the WebSocket.

        Captures the ``response.completed`` event type from the chunk object
        directly (before serialization) to avoid a redundant JSON round-trip on
        every chunk.  Returns the completed event dict, or ``None``.
        Nr   r@   r   z5ManagedResponsesWS: error sending chunk to client: %sr6   )r;   r   r-   rI   rJ   r3   r   rG   rH   rP   r   r   r   r   )	r5   r   r
  r   Zstream_responserT   Z
chunk_typeZ
serializedZsend_excr6   r6   r7   _stream_and_forwardx  s:   

z4ManagedResponsesWebSocketHandler._stream_and_forwardc                 C   sV   |du rdS |  |}|sdS | |}|| | }| || tdt|| dS )zMStore this turn in in-memory history for future previous_response_id lookups.Nz9ManagedResponsesWS: stored %d messages for response_id=%s)r   r  r   r   r   r   )r5   r   r  r  Znew_response_idZoutput_msgsZall_messagesr6   r6   r7   _save_turn_history  s   

z3ManagedResponsesWebSocketHandler._save_turn_historyc              
      s  |  |I dH }|du rdS | |}d|d< |dd}|p"| j}|dd}| |d}|r8| |ng }| |||| | || | 	|| |
| j z| ||I dH }	W n# ty }
 ztd|
 | t|
I dH  W Y d}
~
dS d}
~
ww | |	|| dS )a  
        Parse one ``response.create`` event, call ``litellm.aresponses(stream=True)``,
        and forward every streaming event to the client.

        Multi-turn support via in-memory session history
        ------------------------------------------------
        When ``previous_response_id`` is present in the event:
        1. Look up the accumulated message history in ``self._session_history``
           (keyed by the decoded provider response ID).
        2. Prepend those messages to the current ``input`` so the model has full
           conversation context.
        3. After the stream completes, extract the new response ID and output
           messages from ``response.completed`` and store them in
           ``self._session_history`` for the next turn.

        This in-memory approach avoids the async DB-write race condition that
        occurs when spend logs haven't been committed by the time the second
        ``response.create`` arrives over the same WebSocket connection.
        NTstreamr   r   r   z8ManagedResponsesWS: error processing response.create: %s)r  r	  popr   r  r3   r   r  r  r  rd   r   r  rP   r   rg   r   rL   r  )r5   r  r   r
  r  r   r   r  r  r   r   r6   r6   r7   _process_response_create  s:   

z9ManagedResponsesWebSocketHandler._process_response_createc              
      s   z/	 z
| j  I dH }W n ty' } ztd| W Y d}~W dS d}~ww | |I dH  q tyT } ztd| | d| I dH  W Y d}~dS d}~ww )z
        Main loop: accept ``response.create`` events sequentially and handle
        each one before waiting for the next message.
        TNz+ManagedResponsesWS: client disconnected: %sz(ManagedResponsesWS: unexpected error: %szInternal server error: )r   r   rP   r   r   r  rg   r   )r5   r:   r   r6   r6   r7   run  s&   $z$ManagedResponsesWebSocketHandler.run)NNNNNN)r   rm   )rn   ro   rp   rq   r   rL   r   r   floatr8   staticmethodr   r   r   r   r   r   r  r  r  r	  r  r  r  r  r  r  r  r6   r6   r6   r7   r   z  s    	

$"	 ( $



 

$
<r   )>r   rG   r0   ri   r   typingr   r   r   r   rr   r;   Zlitellm.constantsr   r   Z#litellm.litellm_core_utils.asyncifyr	   Z'litellm.litellm_core_utils.core_helpersr
   Z*litellm.litellm_core_utils.litellm_loggingr   rt   Z:litellm.litellm_core_utils.llm_response_utils.get_api_baser   Z?litellm.litellm_core_utils.llm_response_utils.response_metadatar   Z/litellm.litellm_core_utils.thread_pool_executorr   Z.litellm.llms.base_llm.responses.transformationr   Zlitellm.responses.utilsr   Zlitellm.types.llms.openair   r   r   r   r   r   r   Zlitellm.types.utilsr   Zlitellm.utilsr   r   r   r`   rv   r   r   Zlitellm._loggingr   r   r   r   __required_keys____optional_keys__r   	frozenset__annotations__r   r   r6   r6   r6   r7   <module>   sL   
 $	  D
ihq	 
-
