o
    ưi                     @   s   d Z ddlZddlmZ ddlmZmZmZmZmZm	Z	 ddl
Z
ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ erHdd
lmZmZ G dd dZdS )zA
A2A Streaming Iterator with token tracking and logging support.
    N)datetime)TYPE_CHECKINGAnyAsyncIteratorDictListOptional)verbose_logger)A2ACostCalculator)A2ARequestUtils)Logging)executor)SendStreamingMessageRequestSendStreamingMessageResponsec                	   @   s   e Zd ZdZ	dded dddedefd	d
Zdd ZdddZ	de
ddfddZde
defddZdddZdejdeee
f fddZdS )A2AStreamingIteratorz
    Async iterator for A2A streaming responses with token tracking.

    Collects chunks, extracts text, and logs usage on completion.
    unknownstreamr   requestr   logging_obj
agent_namec                 C   s8   || _ || _|| _|| _t | _g | _g | _d | _	d S N)
r   r   r   r   r   now
start_timechunkscollected_text_partsfinal_chunk)selfr   r   r   r    r   ^/home/app/Keep/.python/lib/python3.10/site-packages/litellm/a2a_protocol/streaming_iterator.py__init__   s   

zA2AStreamingIterator.__init__c                 C   s   | S r   r   )r   r   r   r   	__aiter__-   s   zA2AStreamingIterator.__aiter__returnc                    sz   z| j  I d H }| j| | | | |r|| _|W S  ty<   | jd u r4| jr4| jd | _|  I d H   w )N)	r   	__anext__r   append_collect_text_from_chunk_is_completed_chunkr   StopAsyncIteration_handle_stream_complete)r   chunkr   r   r   r#   0   s   

zA2AStreamingIterator.__anext__r)   Nc                 C   sb   z!t |dr|jdddni }t|}|r| j| W dS W dS  ty0   td Y dS w )z?Extract text from a streaming chunk and add to collected parts.
model_dumpjsonTmodeZexclude_nonez/Failed to extract text from A2A streaming chunkN)	hasattrr*   r   Zextract_text_from_responser   r$   	Exceptionr	   debug)r   r)   
chunk_dicttextr   r   r   r%   G   s   
z-A2AStreamingIterator._collect_text_from_chunkc                 C   sz   z2t |dr|jdddni }|di }t|tr-|di }t|tr0|ddkW S W d	S W d	S  ty<   Y d	S w )
z+Check if chunk indicates stream completion.r*   r+   Tr,   resultstatusstate	completedF)r.   r*   get
isinstancedictr/   )r   r)   r1   r3   r4   r   r   r   r&   Q   s   

z(A2AStreamingIterator._is_completed_chunkc              
      s0  z{t  }t| j}t|}t|}| jr| jd nd}t|}|| }tj	|||d}|| j
jd< d| j
jd< t| j
}	|	| j
jd< | |}
t| j
j|
| j|dd	 tj| j
j|
d| j|d
 td| d| d| d|	  W dS  ty } ztd|  W Y d}~dS d}~ww )z8Handle logging and token counting when stream completes.r"    )prompt_tokenscompletion_tokenstotal_tokensusageFr   response_costN)r3   r   end_time	cache_hit)r3   rA   r   r@   z'A2A streaming completed: prompt_tokens=z, completion_tokens=z, total_tokens=z, response_cost=z+Error in A2A streaming completion handler: )r   r   r   Zget_input_message_from_requestr   Zextract_text_from_messageZcount_tokensr   litellmUsager   Zmodel_call_detailsr
   Zcalculate_a2a_cost_build_logging_resultasynciocreate_taskZasync_success_handlerr   r   submitZsuccess_handlerr	   infor/   r0   )r   r@   Zinput_messageZ
input_textr;   Zoutput_textr<   r=   r>   r?   r3   er   r   r   r(   ^   s^   



	
z,A2AStreamingIterator._handle_stream_completer>   c                 C   sr   t | jdddt|dr| nt|d}| jr7z| jjddd}|d	i |d	< W |S  ty6   Y |S w |S )
z Build a result dict for logging.idr   z2.0r*   )rJ   Zjsonrpcr>   r+   Tr,   r3   )getattrr   r.   r*   r9   r   r7   r/   )r   r>   r3   r1   r   r   r   rD      s   z*A2AStreamingIterator._build_logging_result)r   )r!   r   )r!   N)__name__
__module____qualname____doc__r   LiteLLMLoggingObjstrr   r    r#   r   r%   boolr&   r(   rB   rC   r   rD   r   r   r   r   r      s$    



 >r   )rO   rE   r   typingr   r   r   r   r   r   rB   Zlitellm._loggingr	   Z$litellm.a2a_protocol.cost_calculatorr
   Zlitellm.a2a_protocol.utilsr   Z*litellm.litellm_core_utils.litellm_loggingr   rP   Z/litellm.litellm_core_utils.thread_pool_executorr   Z	a2a.typesr   r   r   r   r   r   r   <module>   s     