
    Ii                         S SK Jr  S SKrS SKrS SKJr  S SKJr  S SK	J
r
  S SKJrJr  S SKJr  \R                   " \5      r " S S\5      rg)	    )absolute_importN)FindCoordinatorResponse)
KafkaBytes)Int32TaggedFields__version__c                   P    \ rS rSrSrSS jrS rS rSS jrS r	S	 r
S
 rS rSrg)KafkaProtocol   a  Manage the kafka network protocol

Use an instance of KafkaProtocol to manage bytes send/recv'd
from a network socket to a broker.

Arguments:
    client_id (str): identifier string to be included in each request
    api_version (tuple): Optional tuple to specify api_version to use.
        Currently only used to check for 0.8.2 protocol quirks, but
        may be used for more in the future.
Nc                     Uc  U R                  5       nXl        X l        SU l        [	        S5      U l        S U l        SU l        [        R                  " 5       U l
        / U l        g )Nr      F)_gen_client_id
_client_id_api_version_correlation_idr   _header_rbuffer
_receivingcollectionsdequein_flight_requestsbytes_to_send)self	client_idapi_versions      ;/venv/lib/python3.13/site-packages/kafka/protocol/parser.py__init__KafkaProtocol.__init__   s[    ++-I#' !!}"-"3"3"5    c                 H    U R                   S-   S-  U l         U R                   $ )N   l        )r   r   s    r   _next_correlation_id"KafkaProtocol._next_correlation_id'   s&     $ 4 4q 8EA###r    c                     S[         -   $ )Nzkafka-pythonr   r#   s    r   r   KafkaProtocol._gen_client_id+   s    ++r    c                    [         R                  SU5        Uc  U R                  5       nUR                  X R                  S9nSR                  UR                  5       UR                  5       /5      n[        R                  " [        U5      5      nXT-   nU R                  R                  U5        UR                  5       (       a  X!4nU R                  R                  U5        U$ )a9  Encode and queue a kafka api request for sending.

Arguments:
    request (object): An un-encoded kafka request.
    correlation_id (int, optional): Optionally specify an ID to
        correlate requests with responses. If not provided, an ID will
        be generated automatically.

Returns:
    correlation_id
zSending request %s)correlation_idr   r    )logdebugr$   build_headerr   joinencoder   lenr   appendexpect_responser   )r   requestr)   headermessagesizedataifrs           r   send_requestKafkaProtocol.send_request.   s     			&0!!668N%%^%_((FMMOW^^-=>?||CL)~!!$'""$$!+C##**3/r    c                 J    SR                  U R                  5      n/ U l        U$ )z1Retrieve all pending bytes to send on the networkr    )r-   r   )r   r6   s     r   
send_bytesKafkaProtocol.send_bytesH   s#    xx**+r    c                 `   Sn[        U5      n/ nX#:  Ga  U R                  (       d  [        SU R                  R	                  5       -
  X2-
  5      nU R                  R                  XX%-    5        X%-  nU R                  R	                  5       S:X  aS  U R                  R                  S5        [        R                  " U R                  5      n[        U5      U l
        SU l        O4U R                  R	                  5       S:  a  [        R                  " S5      eU R                  (       a  [        U R                  5      nU R                  R	                  5       n[        Xx-
  X2-
  5      nU R                  R                  XX%-    5        X%-  nU R                  R	                  5       nX:  a  [        R                  " S5      eX:w  a   U$ SU l        U R                  R                  S5        U R                  U R                  5      n	UR                  U	5        U R                  5         X#:  a  GM  U$ )a  Process bytes received from the network.

Arguments:
    data (bytes): any length bytes received from a network connection
        to a kafka broker.

Returns:
    responses (list of (correlation_id, response)): any/all completed
        responses, decoded from bytes to python objects.

Raises:
     KafkaProtocolError: if the bytes received could not be decoded.
     CorrelationIdError: if the response does not match the request
         correlation id.
r   r   Tz+this should not happen - are you threading?z,Receive buffer has more bytes than expected?F)r/   r   minr   tellwriteseekr   decoder   r   Errors
KafkaError_process_responser0   _reset_buffer)
r   r6   in	responsesbytes_to_readnbytestotal_bytesstaged_bytesresps
             r   receive_bytesKafkaProtocol.receive_bytesN   s     I	e ?? #A(9(9(;$;QU C""4!/#:;"<<$$&!+LL%%a("\\$,,7F$.v$6DM&*DO\\&&(1, ++,YZZ!$--0#}}113 #K$> F##D1?$;<"#}}113- ++,Z[[.  #(""1%--dmm<  &""$G eH r    c           
         U R                   (       d  [        R                  " S5      eU R                   R                  5       u  p#UR                  nUR                  U5      nUR                  n[        R                  SU5        US:X  aE  US:w  a?  U[        S   L a3  U R                  S:X  d  U R                  c  [        R                  S5        OX&:w  a  [        R                  " SX&4-  5      e[        R                  SUR                  5         UR                  U5      nX'4$ ! [         aZ    UR                  S5        UR!                  5       n[        R#                  SX$U[%        U5      U5        [        R&                  " S	5      ef = f)
Nz.No in-flight-request found for server responsezReceived correlation id: %dr   )r         zKafka 0.8.2 quirk -- GroupCoordinatorResponse Correlation ID does not match request. This should go away once at least one topic has been initialized on the broker.z.Correlation IDs do not match: sent %d, recv %dzProcessing response %szOResponse %d [ResponseType: %s Request: %s]: Unable to decode %d-byte buffer: %rzUnable to decode response)r   rC   CorrelationIdErrorpopleftRESPONSE_TYPEparse_headerr)   r*   r+   r   r   warning__name__rB   
ValueErrorrA   readerrorr/   KafkaProtocolError)	r   read_bufferr)   r2   response_typeresponse_headerrecv_correlation_idresponsebufs	            r   rE   KafkaProtocol._process_response   sv   &&++,\]]$($;$;$C$C$E!--'44[A-<<		/1DE1$a4Q77)+t/@/@/HKK 6 7
 2++@!789 9
 			*M,B,BC		I$++K8H ))  	IQ""$CII =$s3x. ++,GHH	Is   D" "A$Fc                 V    SU l         U R                  R                  S5        S U l        g )NFr   )r   r   rA   r   r#   s    r   rF   KafkaProtocol._reset_buffer   s"    !r    )r   r   r   r   r   r   r   r   )NN)N)rY   
__module____qualname____firstlineno____doc__r   r$   r   r8   r;   rO   rE   rF   __static_attributes__ r    r   r   r      s1    

 $,47r%*Nr    r   )
__future__r   r   loggingkafka.errorserrorsrC   kafka.protocol.find_coordinatorr   kafka.protocol.framer   kafka.protocol.typesr   r   kafka.versionr	   	getLoggerrY   r*   objectr   rl   r    r   <module>rw      s;    &    C + 4 %!bF br    