
    Iip                       S SK JrJr  S SKrS SKrS SKrS SKrS SKJr   S SK	r	S SKrS SKrS SKrS SKJr  S SKJr  S SKJr  S SKJrJrJrJr  S SKJrJrJr  S S	KJ r   S S
K!J"r"  S SK#J$r$  S SK%J&r&  S SK'J(r(  S SK)J*r*  S SK+J,r,  S SK-J.r.  S SK/J0r0  S SK1J2r2  S SK3J4r4  S SK5J6r6  S SK7J8r8  S SK9J:r:  S SK;J<r<  \Rz                  (       a  \R|                  r?\R|                  r@\ArB\R                  " \D5      rESrF S SKGrGSrH \GR                  rI\GR                  rJ\GR                  rK\GR                  rL\R                  S \R                  S!\R                  S"0rS " S# S$\T5      rU " S% S&\T5      rV " S' S(\T5      rWS) rXS* rYS+ rZ\R                  4S, jr[g! \
 a
    S SKJr	   GNuf = f! \M aD    \ER                  S5        \GR                  rI\GR                  rJ\GR                  rK\GR                  rL Nf = f! \
 a    SrH " S S\A5      rJ " S S\A5      rK Nf = f)-    )absolute_importdivisionNuniform)selectors34)six)Future)AvgCountMaxRate)DescribeAclsRequestDescribeClientQuotasRequestListGroupsRequest)ApiVersionsRequest)BROKER_API_VERSIONS)OffsetFetchRequest)FetchRequest)FindCoordinatorRequest)ListOffsetsRequest)MetadataRequest)KafkaProtocol)ProduceRequest)SaslAuthenticateRequest)SaslHandshakeRequest)Int32)get_sasl_mechanism)Socks5Wrapper)__version__i#  TznOld SSL module detected. SSL error handling may not operate cleanly. Consider upgrading to Python 3.3 or 2.7.9Fc                       \ rS rSrSrg)SSLWantReadErrorH    N__name__
__module____qualname____firstlineno____static_attributes__r#       0/venv/lib/python3.13/site-packages/kafka/conn.pyr!   r!   H       r*   r!   c                       \ rS rSrSrg)SSLWantWriteErrorJ   r#   Nr$   r#   r*   r+   r.   r.   J   r,   r*   r.   unspecifiedIPv4IPv6c                   0    \ rS rSrSrSrSrSrSrSr	Sr
S	rg
)ConnectionStatesU   z<disconnected>z<connecting>z<handshake>z<connected>z<authenticating>z<checking_api_versions_send>z<checking_api_versions_recv>r#   N)r%   r&   r'   r(   DISCONNECTED
CONNECTING	HANDSHAKE	CONNECTEDAUTHENTICATINGAPI_VERSIONS_SENDAPI_VERSIONS_RECVr)   r#   r*   r+   r4   r4   U   s'    #LJII'N66r*   r4   c                   
   \ rS rSrSr0 SS\-   _SS_S\_SS	_S
S_SS_SS_SS_SS_SS_S\R                  \R                  S4/_SS_SS_SS_SS_SS_SS_0 S S_S!S_S"S_S#S_S$S_S%S_S&S'_S(\	R                  _S)S* _S+S_S,S-_S.S_S/S_S0S_S1S_S2S3_S4S_ESSS5.ErS6rS7\S	   " 5       4S8\S	   " S95      4S:\S	   " S9/ 5      4S;\S	   " / 5      44rS< rS= rS> rS? r\" S@5      4SA jrSB rSC rSD rSE rSF rSG rSH rSI rSJ r SK r!SL r"SM r#SN r$SO r%SP r&SQ r'SR r(SS r)ST r*SU r+SV r,SW r-SX r.SY r/SZ r0S[ r1S\ r2S] r3S^ r4S_ r5S` r6SsSa jr7Sb r8StSc jr9StSd jr:Se r;Sf r<Sg r=Sh r>Si r?Sj r@Sk rASl rBSm rCSn rDSo rESuSp jrFSq rGSrrHg)vBrokerConnection_   a  Initialize a Kafka broker connection

Keyword Arguments:
    client_id (str): a name for this client. This string is passed in
        each request to servers and can be used to identify specific
        server-side log entries that correspond to this client. Also
        submitted to GroupCoordinator for logging with respect to
        consumer group administration. Default: 'kafka-python-{version}'
    client_software_name (str): Sent to kafka broker for KIP-511.
        Default: 'kafka-python'
    client_software_version (str): Sent to kafka broker for KIP-511.
        Default: The kafka-python version (via kafka.version).
    reconnect_backoff_ms (int): The amount of time in milliseconds to
        wait before attempting to reconnect to a given host.
        Default: 50.
    reconnect_backoff_max_ms (int): The maximum amount of time in
        milliseconds to backoff/wait when reconnecting to a broker that has
        repeatedly failed to connect. If provided, the backoff per host
        will increase exponentially for each consecutive connection
        failure, up to this maximum. Once the maximum is reached,
        reconnection attempts will continue periodically with this fixed
        rate. To avoid connection storms, a randomization factor of 0.2
        will be applied to the backoff resulting in a random range between
        20% below and 20% above the computed value. Default: 30000.
    request_timeout_ms (int): Client request timeout in milliseconds.
        Default: 30000.
    max_in_flight_requests_per_connection (int): Requests are pipelined
        to kafka brokers up to this number of maximum requests per
        broker connection. Default: 5.
    receive_buffer_bytes (int): The size of the TCP receive buffer
        (SO_RCVBUF) to use when reading data. Default: None (relies on
        system defaults). Java client defaults to 32768.
    send_buffer_bytes (int): The size of the TCP send buffer
        (SO_SNDBUF) to use when sending data. Default: None (relies on
        system defaults). Java client defaults to 131072.
    socket_options (list): List of tuple-arguments to socket.setsockopt
        to apply to broker connection sockets. Default:
        [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
    security_protocol (str): Protocol used to communicate with brokers.
        Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
        Default: PLAINTEXT.
    ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
        socket connections. If provided, all other ssl_* configurations
        will be ignored. Default: None.
    ssl_check_hostname (bool): flag to configure whether ssl handshake
        should verify that the certificate matches the brokers hostname.
        default: True.
    ssl_cafile (str): optional filename of ca file to use in certificate
        verification. default: None.
    ssl_certfile (str): optional filename of file in pem format containing
        the client certificate, as well as any ca certificates needed to
        establish the certificate's authenticity. default: None.
    ssl_keyfile (str): optional filename containing the client private key.
        default: None.
    ssl_password (callable, str, bytes, bytearray): optional password or
        callable function that returns a password, for decrypting the
        client private key. Default: None.
    ssl_crlfile (str): optional filename containing the CRL to check for
        certificate expiration. By default, no CRL check is done. When
        providing a file, only the leaf certificate will be checked against
        this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
        default: None.
    ssl_ciphers (str): optionally set the available ciphers for ssl
        connections. It should be a string in the OpenSSL cipher list
        format. If no cipher can be selected (because compile-time options
        or other configuration forbids use of all the specified ciphers),
        an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
    api_version (tuple): Specify which Kafka API version to use.
        Must be None or >= (0, 10, 0) to enable SASL authentication.
        Default: None
    api_version_auto_timeout_ms (int): number of milliseconds to throw a
        timeout exception from the constructor when checking the broker
        api version. Only applies if api_version is None. Default: 2000.
    selector (selectors.BaseSelector): Provide a specific selector
        implementation to use for I/O multiplexing.
        Default: selectors.DefaultSelector
    state_change_callback (callable): function to be called when the
        connection state changes from CONNECTING to CONNECTED etc.
    metrics (kafka.metrics.Metrics): Optionally provide a metrics
        instance for capturing network IO stats. Default: None.
    metric_group_prefix (str): Prefix for metric names. Default: ''
    sasl_mechanism (str): Authentication mechanism when security_protocol
        is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
        PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
    sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
        Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
    sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
        Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
    sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
        sasl mechanism handshake. If provided, sasl_kerberos_service_name and
        sasl_kerberos_domain name are ignored. Default: None.
    sasl_kerberos_service_name (str): Service name to include in GSSAPI
        sasl mechanism handshake. Default: 'kafka'
    sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
        sasl mechanism handshake. Default: one of bootstrap servers
    sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
        token provider instance. Default: None
    socks5_proxy (str): Socks5 proxy url. Default: None
	client_idzkafka-python-client_software_namezkafka-pythonclient_software_versionnode_idr   request_timeout_msi0u  reconnect_backoff_ms2   reconnect_backoff_max_ms%max_in_flight_requests_per_connection   receive_buffer_bytesNsend_buffer_bytessocket_options   sock_chunk_bytesi   sock_chunk_buffer_count  security_protocol	PLAINTEXTssl_contextssl_check_hostnameT
ssl_cafilessl_certfilessl_keyfilessl_crlfilessl_passwordssl_ciphersapi_versionapi_version_auto_timeout_msi  selectorstate_change_callbackc                     g)NTr#   )rC   sockconns      r+   <lambda>BrokerConnection.<lambda>   s    Tr*   metricsmetric_group_prefix sasl_mechanismsasl_plain_usernamesasl_plain_passwordsasl_kerberos_namesasl_kerberos_service_namekafkasasl_kerberos_domain_name)sasl_oauth_token_providersocks5_proxy)rR   SSLSASL_PLAINTEXTSASL_SSL)r   	   )r         zkafka-python-default-group)r   rt   rM   )r   rt   r   c                 ,   Xl         X l        X0l        X0l        S U l        S U l        S U l        S U l        SU l        S U l	        S U l
        [        R                  " U R                  5      U l        U R                   H  nXT;   d  M
  XE   U R                  U'   M     U R                  R                  S5      U l        U R                  S   bJ  U R                  S   R!                  ["        R$                  ["        R&                  U R                  S   45        U R                  S   bJ  U R                  S   R!                  ["        R$                  ["        R(                  U R                  S   45        U R                  S   U R*                  ;   d#   SSR-                  U R*                  5      -   5       eU R                  S   S	;   a  [.        (       d   S
5       eU R1                  5         [2        R4                  " 5       U l        [9        5       U l        [=        U R                  S   U R                  S   S9U l        [@        RB                  U l"        U RG                  5         S U l$        SU l%        S U l&        U R                  S   b  U R                  S   U l&        S U l'        U R                  S   U l(        S U l)        SU l*        / U l+        S U l,        U R                  S   (       a7  [[        U R                  S   U R                  S   U R                  5      U l,        g g )N   rC   rJ   rL   rK   rQ   zsecurity_protocol must be in z, rp   rr   z$Python wasn't built with SSL supportr@   r[   r@   r[   r*   rS   r\   r   rd   re   ).hostportafi	_sock_afi
_sock_addr_api_versions_api_version_check_version_idx_api_versions_idx_throttle_time_socks5_proxycopyDEFAULT_CONFIGconfigpoprC   appendsocket
SOL_SOCKET	SO_RCVBUF	SO_SNDBUFSECURITY_PROTOCOLSjoinssl_available_init_sasl_mechanism	threadingLock_lockdictin_flight_requestsr   	_protocolr4   r6   state_reset_reconnect_backoff_sock_send_buffer_ssl_context_api_versions_future_api_versions_check_timeout_sasl_auth_futurelast_attempt_gai_sensorsBrokerConnectionMetrics)selfrz   r{   r|   configskeys         r+   __init__BrokerConnection.__init__   s   		! "&!""!ii 3 34;;C~#*<C   {{y1;;-.:KK()00""F$4$43467 ;;*+7KK()00##V%5%50134 {{./43J3JJ 	R+dii8O8O.PP	RJ ;;*+/BB =H"HH=!!# ^^%
 #'&&kk+.M24 &22
%%'
 ;;}%1 $M :D$(!+/;;7T+U(!%	;;y!3DKK	4J48KK@U4V48LLBDM "r*   c                     U R                   S   S;   a:  [        U R                   S   5      " SSU R                  0U R                   D6U l        g S U l        g )NrQ   rq   rr   rg   rz   r#   )r   r   rz   _sasl_mechanismr   s    r+   r   %BrokerConnection._init_sasl_mechanism:  sO    ;;*+/MM#5dkkBR6S#T#sZ^ZcZc#sgkgrgr#sD #'D r*   c                     [        U R                  U R                  U R                  5      U l        U R                  (       d7  [
        R                  SX R                  U R                  U R                  5        gg)Nz$%s: DNS lookup failed for %s:%i (%s)FT)
dns_lookuprz   r{   r|   r   logerrorr   s    r+   _dns_lookupBrokerConnection._dns_lookup@  sM    tyy$))TXX>	yyII<IItyy$((<r*   c                     U R                   (       d  U R                  5       (       d  g U R                   R                  S5      u  pp4nX4$ Nr   )r   r   r   )r   r|   ______sockaddrs         r+   _next_afi_sockaddr#BrokerConnection._next_afi_sockaddrH  s<    yy##%%$(IIMM!$4!r*   infc                    U R                  5       (       a  gU[        R                  " 5       -  nU R                  5         S nU R                  (       Ga  [        R                  " 5       U:  a  U R	                  5         U R                  5       (       a  Ub  UR                  5         gU R                  5       (       aS  Uc>  U R                  S   " 5       nUR                  U R                  [        R                  5        UR                  S5        O+U R                  5       (       a  Ub  UR                  5         S nO[        R                  " 5       U:  a  M   gU R                  (       a  GM  g)NTr]   rM   F)	connectedtimer   r   connectclose
connectingr   registerr   	selectorsEVENT_WRITEselectdisconnected)r   timeoutr]   s      r+   connect_blocking!BrokerConnection.connect_blockingO  s   >>499; 	 iii))+'>>##+ (__&&'#';;z#:#< ))$**i6K6KLOOA&&&((+ (#' ))+'" ' iii& r*   c           	         U R                   [        R                  L Ga<  U R                  5       (       Gd&  [        R                  U l         [
        R
                  " 5       U l        U R                  5       nU(       d1  U R                  [        R                  " S5      5        U R                   $ [        R                  SU 5        U R                  b   eUu  U l        U l         U R                   S   bb  [#        U R                   S   U R$                  5      U l        U R&                  R)                  U R                  [(        R*                  5      U l        O5[(        R(                  " U R                  [(        R*                  5      U l         U R                   S    H2  n[        R                  SX5        U R                  R0                  " U6   M4     U R                  R3                  S5        U R                   S   " U R4                  U R                  U 5        [        R7                  S	X R8                  U R:                  U R                  [<        U R                     5        U R                   [        R                  L Ga  Sn U R&                  (       a&  U R&                  R?                  U R                  5      nO&U R                  R?                  U R                  5      n U(       a  U[@        RB                  :X  a  [        R                  S
U 5        U R                   S   S;   ah  [        RD                  U l         [        R                  SU 5        U R                   S   " U R4                  U R                  U 5        U RG                  5         GO[        RH                  U l         [        R                  SU 5        U R                   S   " U R4                  U R                  U 5        OU[@        RJ                  [@        RL                  [@        RN                  S4;  av  [        R-                  SX5        [@        RP                  RS                  US5      nU R                  [        R                  " SRU                  XF5      5      5        U R                   $  U R                   [        RD                  L a  U RW                  5       (       al  [        R                  SU 5        [        RH                  U l         [        R                  SU 5        U R                   S   " U R4                  U R                  U 5        U R                   [        RH                  [        RX                  4;   Ga  U R[                  5       (       a  U R                   [        RH                  [        RX                  4;   a  U R                   S   S;   aW  [        R\                  U l         [        R                  SU 5        U R                   S   " U R4                  U R                  U 5        Of[        R^                  U l         [        R7                  SU 5        U Ra                  5         U R                   S   " U R4                  U R                  U 5        U R                   [        R\                  L a  U R                   S   S;   d   eU Rc                  5       (       a  U R                   [        R\                  L af  [        R^                  U l         [        R7                  SU 5        U Ra                  5         U R                   S   " U R4                  U R                  U 5        U R                   [        R^                  [        R                  4;  a~  U R                   S   S-  n[
        R
                  " 5       XpR                  -   :  aG  [        R-                  SU 5        U R                  [        R                  " S5      5        U R                   $ U R                   $ ! [(        R,                  [.        4 a'  nU R                  U5        U R                   s SnA$ SnAff = f! [(        R,                   a  nUR@                  n SnAGNzSnAff = f)z-Attempt to connect and return ConnectionStatezDNS failurez%s: creating new socketNro   rL   z%s: setting socket option %sFr^   z%s: connecting to %s:%d [%s %s]z%s: established TCP connectionrQ   rx   z%s: initiating SSL handshakez %s: checking broker Api Versionsi&'  z5%s: Connect attempt returned error %s. Disconnecting.UNKNOWNz{} {}z%s: completed SSL handshake.r   z"%s: initiating SASL authenticationz%s: Connection complete.rD        @@z %s: Connection attempt timed outr   )2r   r4   r6   blacked_outr7   r   r   r   r   ErrorsKafkaConnectionErrorr   debugr   r}   r~   r   r   r|   r   r   SOCK_STREAMr   OSError
setsockoptsetblockingrC   inforz   r{   	AFI_NAMES
connect_exerrnoEISCONNr8   	_wrap_sslr;   EINPROGRESSEALREADYEWOULDBLOCK	errorcodegetformat_try_handshaker<   _try_api_versions_checkr:   r9   r   _try_authenticate)r   next_lookupeoptionreterrerrstrrequest_timeouts           r+   r   BrokerConnection.connecto  s   ::)666t?O?O?Q?Q)44DJ $		D113K

666}EFzz!		3T:zz)))2=/&{{>2>-:4;;~;VX\X`X`-a*%)%7%7%>%>t~~vOaOa%b
%+]]4>>6CUCU%V

 ++&67		8$G

%%v. 8 JJ""5)KK/0tzz4PHH6iiYY4>>1JL ::)444 C %%,,77HC**//@C
 #.		:DA;;237JJ!1!;!;DJII<dCKK 78tzzSWXNN$!1!C!CDJII@$GKK 78tzzSWX U..@Q@QSXYY		 ,-18,,S)<

666w~~c7RSTzz! ::)333""$$		8$?-??
		<dC34T\\4::tT::*<<>N>`>`aa++--::"2"D"DFVFhFh!ii{{#67;YY%5%D%D
		"FM$;<T\\4::W[\ &6%?%?
!;TB557$;<T\\4::W[\::)888;;237UUUU%%''::!1!@!@@!1!;!;DJHH7>113KK 78tzzSWX::.88.;;= = #kk*>?&HOyy{_/@/@@@		<dC

666yABzz!zz} g. &JJqM::%&, <<  ii sC   !A1_) 4_) <6`- 3%`- )`*`%`*%`*-aaac                 D   U R                   S   S;   d   eU R                  Gc[  [        R                  SU 5        [        R
                  " [        R                  5      U l        U R                  =R                  [        R                  -  sl        U R                  =R                  [        R                  -  sl        [        R                  U R                  l        U R                   S   (       a  SU R                  l        U R                   S   (       ak  [        R                  SX R                   S   5        U R                  R                  U R                   S   5        [        R                  U R                  l        OD[        R                  SU [        R                   " 5       5        U R                  R#                  5         U R                   S	   (       a  U R                   S
   (       a  [        R                  SX R                   S	   5        [        R                  SX R                   S
   5        U R                  R%                  U R                   S	   U R                   S
   U R                   S   S9  U R                   S   (       a  ['        [        S5      (       d  [)        S5      e[        R                  SX R                   S   5        U R                  R                  U R                   S   5        U R                  =R*                  [        R,                  -  sl        U R                   S   (       aK  [        R                  SX R                   S   5        U R                  R/                  U R                   S   5        [        R                  SU 5         U R                  R1                  U R2                  U R4                  R7                  S5      SS9U l        g ! [        R8                   a1  n[        R;                  SU 5        U R=                  U5         S nAg S nAff = f)NrQ   rx   z#%s: configuring default SSL ContextrT   TrU   z%s: Loading SSL CA from %sz*%s: Loading system default SSL CAs from %srV   rW   z%s: Loading SSL Cert from %sz%s: Loading SSL Key from %srY   )certfilekeyfilepasswordrX   VERIFY_CRL_CHECK_LEAFz4This version of Python does not support ssl_crlfile!z%s: Loading SSL CRL from %srZ   z%s: Setting SSL Ciphers: %sz"%s: wrapping socket in ssl context.F)server_hostnamedo_handshake_on_connectz(%s: Failed to wrap socket in SSLContext!)r   r   r   r   ssl
SSLContextPROTOCOL_SSLv23optionsOP_NO_SSLv2OP_NO_SSLv3CERT_OPTIONALverify_modecheck_hostnamer   load_verify_locationsCERT_REQUIREDget_default_verify_pathsload_default_certsload_cert_chainhasattrRuntimeErrorverify_flagsr   set_cipherswrap_socketr   rz   rstripSSLError	exceptionr   )r   r   s     r+   r   BrokerConnection._wrap_ssl  s   {{./3FFFF$II;TB #s/B/B CD%%8%%%8%,/,=,=D){{/037!!0{{<(5t[[=VW!!77L8QR030A0A!!-EtSMiMiMkl!!446{{>*t{{=/I7{{>?Z[6kk->XY!!11![[8 KK6![[8 2 : {{=)s$;<<&']^^6kk->XY!!77M8RS!!..#2K2KK.{{=)6kk->XY!!--dkk-.HI		6=	**66

 $		 0 0 5(- 7 /DJ || 	MMDdKJJqMM	s   AO P.'PPc                 <   U R                   S   S;   d   e U R                  R                  5         g! [        [        4 a     g[
        [        [        [        4 a>    [        R                  SU 5        U R                  [        R                  " S5      5         gf = f)NrQ   rx   Tz5%s: SSL connection closed by server during handshake.z0SSL connection closed by server during handshakeF)r   r   do_handshaker!   r.   SSLZeroReturnErrorConnectionErrorTimeoutErrorSSLEOFErrorr   warningr   r   r   r   s    r+   r   BrokerConnection._try_handshake  s    {{./3FFFF	hJJ##% "34 	  #O\;O 	hKKOQUVJJv223efg 	hs   2 BABBc                    U R                   Gc  U R                  S   b  U R                  S   U l        U R                  [        ;  a%  [        R
                  " SU R                  < S35      e[        U R                     U l        [        R                  SX R                  5        gU R                  Gc  U R                  nUS:  a*  [        U   " U R                  S   U R                  S   0 S	9nO[        U   " 5       n[        5       nU =R                  S
-  sl        U R                  USU R                  S9nUR                  U R                   U5        UR#                  U R$                  U5        X0l         [&        R(                  U l        U R                  S   " U R,                  U R.                  U 5        GOU R                  [1        U R2                  5      :  a  U R2                  U R                     u  p[        5       nU =R                  S
-  sl        U R                  USU R                  S9nUR                  U R4                  X15        UR#                  U R6                  U5        X0l         [&        R(                  U l        U R                  S   " U R,                  U R.                  U 5        O&U R9                  [        R:                  " S5      5        gU R=                  5        H  u  pVUR?                  U5        M     U R                   c  gU R                   RA                  5       (       a7  U R                   RB                  n[E        U[        R:                  5      (       d  UeU R                   RG                  5       $ )Nr[   zapi_version z0 not found in kafka.protocol.broker_api_versionsz7%s: Using pre-configured api_version %s for ApiVersionsT   rA   rB   )rA   rB   _tagged_fieldsru   blockingrD   r^   z#Unable to determine broker version.F)$r   r   r   r   r   UnrecognizedBrokerVersionr   r   r   r   r   r   r	   r   _sendadd_callback_handle_api_versions_responseadd_errback_handle_api_versions_failurer4   r<   r   rC   r   lenVERSION_CHECKS_handle_check_version_response_handle_check_version_failurer   r   recvsuccessfailedr
  
isinstance	succeeded)r   versionrequestfutureresponserfexs           r+   r   (BrokerConnection._try_api_versions_check  s   $$,{{=)5$(KK$>!$$,?? ::  @D  @Q  @Q  <S  T  T%89J9J%K"		SUY[l[lm((000a<09-1[[9O-P04<U0V')+G
 19;G00A50::gQUQqQq:r%%d&H&H&Q$$T%F%FO,2)-??
34T\\4::tT((3t/B/B+CC#'#6#6t7N7N#O 00A50::gQUQqQq:r%%d&I&I6[$$T%G%GP,2)-??
34T\\4::tT

6667\]^IIKDAIIaL   $$,&&--//**44Bb&"="=>>((2244r*   c           
         [         R                  " UR                  5      nU[         R                  La  UR	                  U" 5       5        U[         R
                  L a  U =R                  S-  sl        UR                   H7  nUS S u  pVnXRR                  :X  d  M  [        U R                  U5      U l          O   U R                  S:  aG  S U l
        [        R                  U l        U R                  S   " U R                  U R                   U 5        g U R#                  U" 5       S9  g [%        UR                   Vs/ s H  nUS   US   US   44PM     sn5      U l        U R)                  U R&                  5      U l        [,        R/                  SU SR1                  [3        [4        U R*                  5      5      5        UR7                  U R*                  5        U R9                  5         g s  snf )	NrM   r  r   r^   r   ru   #%s: Broker version identified as %sr   )r   for_code
error_codeNoErrorfailureUnsupportedVersionErrorr   api_versionsAPI_KEYminr   r4   r;   r   r   rC   r   r   r   r   '_infer_broker_version_from_api_versionsr   r   r   r   mapstrr$  r   )r   r*  r+  
error_typeapi_version_dataapi_keymin_versionmax_versions           r+   r  .BrokerConnection._handle_api_versions_responseR  s   __X%8%89
V^^+NN:<(V;;;&&!+&(0(=(=$8H!8L5G+"2"2214T5K5K[1Y. )> ))Q.04D-!1!C!CDJKK 78tzzSWX  


.!$,$9$9#
$9  a #3A#68H8K"LM$9#
  !HHI[I[\6chhs3PTPaPaGb>cdt(()#
s   9G(c                 d    UR                  U5        U R                  S:  a  SU l        g SU l        g r   )r6  r   r   r   r*  r.  s      r+   r  -BrokerConnection._handle_api_versions_failuren  s.    r !!A%%&D"&'D#r*   c           
         [         R                  SU SR                  [        [        U5      5      5        [         R                  SU5        [
        U   U l        X l        UR                  U5        U R                  5         g )Nr2  r   zOSet configuration api_version=%s to skip auto check_version requests on startup)
r   r   r   r<  r=  r   r   r   r$  r   )r   r*  r(  	_responses       r+   r!  /BrokerConnection._handle_check_version_responsey  sa    6chhs3PWGX>YZ 67>	@09#wr*   c                 P    UR                  U5        U =R                  S-  sl        g )NrM   )r6  r   rE  s      r+   r"  .BrokerConnection._handle_check_version_failure  s    r1$r*   c                 :   U R                   c  [        S5      e[        S   R                  U R                   ;  a  [        R
                  " S5      eU R                   [        S   R                     u  pUS:  a  [        R
                  " SU-  5      e[        US5      $ )Nz_api_versions not setr   SaslHandshakerM   zSaslHandshake %s)r   r  r   r9  r   r7  r:  )r   rA  rB  s      r+   _sasl_handshake_version(BrokerConnection._sasl_handshake_version  s    %677"**$2D2DD00AA $(#5#56J16M6U6U#V ?001Ck1QRR;""r*   c                 b   U R                   c{  U R                  5       n[        U   " U R                  S   5      n[	        5       nU R                  USS9nUR                  U R                  U5        UR                  S U5        X0l         U R                  5        H  u  pVUR                  U5        M     U R                   c  gU R                   R                  5       (       a7  U R                   R                  n[        U[        R                  5      (       d  UeU R                   R!                  5       $ )Nrg   Tr  c                 $    U R                  U5      $ N)r6  )r-  r   s     r+   rb   4BrokerConnection._try_authenticate.<locals>.<lambda>  s    199Q<r*   F)r   rN  r   r   r	   r  r  _handle_sasl_handshake_responser  r#  r$  r%  r
  r&  r   r   r'  )r   r(  r)  r*  sasl_responser,  r-  r.  s           r+   r   "BrokerConnection._try_authenticate  s    !!)224G*73DKK@P4QRGXF JJwJ>M&&t'K'KVT%%&?H%+"IIKDAIIaL   !!)##**,,''11Bb&"="=>>%%//11r*   c                 Z   [         R                  " UR                  5      nU[         R                  La.  U" U 5      nU R	                  US9  UR                  U" U 5      5      $ U R                  S   UR                  ;  aE  UR                  [         R                  " SU R                  S   < SUR                  < 35      5        OU R                  U5        UR                  (       d   S5       eUR                  5       (       a  U R	                  UR                  S9  g U R                  5         g )Nr1  rg   zKafka broker does not support z) sasl mechanism. Enabled mechanisms are: z4SASL future not complete after mechanism processing!)r   r3  r4  r5  r   r6  r   enabled_mechanismsUnsupportedSaslMechanismError_sasl_authenticateis_doner%  r
  r   )r   r*  r+  r>  r   s        r+   rU  0BrokerConnection._handle_sasl_handshake_response  s    __X%8%89
V^^+t$EJJUJ#>>*T"233;;'(0K0KKNN44{{#34h6Q6QSTU
 ##F+~~UUU~==??JJV--J.LLNr*   c                    SnU[        U5      :  a3   U R                  R                  XS 5      nX#-  nU[        U5      :  a  M3  U$ ! [        [        4 a     U$ [
        [        4 a?  n[        R                  (       a$  UR                  [        R                  :X  a   SnAU$ e SnAf[         a    [        R                  (       a   U$ e f = f)zSend some data via non-blocking IO

Note: this method is not synchronized internally; you should
always hold the _lock before calling

Returns: number of bytes
Raises: socket exception
r   N)r  r   sendr!   r.   r  r  r   PY2r   r   BlockingIOErrorPY3)r   data
total_sent
sent_bytesr   s        r+   _send_bytesBrokerConnection._send_bytes  s     
3t9$!ZZ__T+->?
(
 3t9$  %&78   $\2 77qww%*;*;;  " 77 s)   !A C
C
)3B#"B##"C
C
c                 0   U R                   R                  S5        U R                   R                  U R                  S   S-  5        Sn U[	        U5      :  a2  U R                   R                  XS  5      nX#-  nU[	        U5      :  a  M2  U[	        U5      :w  a  [        S5      eUU R                   R                  S5        U R                   R                  S5        $ ! U R                   R                  S5        U R                   R                  S5        f = f)NTrD   rP   r   z!Buffer overrun during socket send        F)r   r   
settimeoutr   r  r_  r  )r   rc  rd  re  s       r+   _send_bytes_blocking%BrokerConnection._send_bytes_blocking  s    

t$

dkk*>?$FG
		*s4y(!ZZ__T+->?
(
 s4y( SY&%&IJJJJ!!#&JJ""5) JJ!!#&JJ""5)s   
?C C 8Dc                 4   U R                   R                  S5        U R                   R                  U R                  S   S-  5         Sn[	        U5      U:  aN  U R                   R                  U[	        U5      -
  5      nU(       d  [        S5      eX#-  n[	        U5      U:  a  MN  UU R                   R                  S5        U R                   R                  S5        $ ! U R                   R                  S5        U R                   R                  S5        f = f)NTrD   rP   r*   zConnection reset during recvri  F)r   r   rj  r   r  r#  r  )r   nrc  fragments       r+   _recv_bytes_blocking%BrokerConnection._recv_bytes_blocking  s    

t$

dkk*>?$FG
	*Dd)a-::??1s4y=9)*HII 	 d)a-
 JJ!!#&JJ""5) JJ!!#&JJ""5)s   AC 'C 8Dc                    U R                  5       nUS:X  a   [        S   " U5      nU R                  USS9  g [        R	                  SU [        U5      5         U R                  [        R                  " [        U5      5      U-   5        g ! [        [        4 aL  n[        R                  SU 5        [        R                  " U < SU< 35      nU R                  US9   S nAg S nAff = f)	NrM   r   TrQ  z,%s: Sending %d raw sasl auth bytes to serverz+%s: Error sending sasl auth bytes to server: r1  )rN  r   r  r   r   r  rk  r   encoder  r  r
  r   r   r   )r   sasl_auth_bytesr(  r)  r   r   s         r+   _send_sasl_authenticate(BrokerConnection._send_sasl_authenticate  s    ..0a<-a0AGJJwJ.IIDdCP_L`a&))%,,s?7K*L*^_#\2 &KTR11dA2FG


%&s   1B
 
C&AC!!C&c                 l   U R                  5       nUS:X  a   U R                  R                  5       (       a  g U R                  S5      n[        R
                  " [        R                  " U5      5      nX R                  U5      -  nUS:X  Ga/  U R                  R!                  U5      u  u  pgU R"                  R%                  U5      u  pn
[&        R&                  " 5       U	-
  S-  nU R(                  (       a%  U R(                  R*                  R-                  U5        [        R/                  S	XX5        [        R0                  " UR2                  5      nU[        R4                  LaK  [        R7                  S
XR8                  UR:                  5        U R                  U" UR:                  5      S9  g UR<                  $ [        R/                  SX5        USS  $ ! [        [        4 aL  n[        R                  SU 5        [        R                  " U < SU< 35      nU R                  US9   S nAg S nAff = f)Nr   r*   rw   z/%s: Error receiving sasl auth bytes from serverrs  r1  rM   rP   %s: Response %d (%s ms): %sz#%s: SaslAuthenticate error: %s (%s)z/%s: Received %d raw sasl auth bytes from server)rN  r   r\  rp  r   decodeioBytesIOr  r  r   r
  r   r   r   r   receive_bytesr   r   r   r   request_timerecordr   r3  r4  r5  r   r%   error_message
auth_bytes)r   r(  rc  nbytesr   r   correlation_idr+  r*  	timestamp_timeout
latency_msr>  s                r+   _recv_sasl_authenticate(BrokerConnection._recv_sasl_authenticate  s   ..0a<D0088::	,,Q/D\\"**T"23F--f55D a<,0NN,H,H,N)'n,0,C,C,G,G,W)V))+	1T9J}}**11*=II3T:`)<)<=J/		? 3 3X5K5KM

H,B,B!C
D&&& IIGV8O1  . 	MMKTR--$.BCCJJSJ!		s   AG H3'AH..H3c                 $   U R                   R                  5       (       d  U R                   R                  5       nU R                  U5        U R	                  5       (       d(  UR                  [        R                  " SU -  5      5      $ U R                  5       nUc(  UR                  [        R                  " SU -  5      5      $ U R                   R                  U5        U R                   R                  5       (       d  M  U R                   R                  5       (       a?  [        R                  SX R                   R                  5       5        UR                  S5      $ UR                  [        R                  " SU R                   S   -  5      5      $ )Nz/%s: Connection failure during Sasl Authenticatez%s: %sTz"Failed to authenticate via SASL %srg   )r   r\  r  rv  _can_send_recvr6  r   r   r  receiveis_authenticatedr   r   auth_detailsr$  SaslAuthenticationFailedErrorr   )r   r*  
send_token
recv_tokens       r+   r[  #BrokerConnection._sasl_authenticate(  sN   &&..00--88:J((4&&((~~f&A&ABsvzBz&{||557J!~~f&A&ABsvzBz&{||$$,,Z8 &&..00 0022HHXt%9%9%F%F%HI>>$''>>&"F"FGknrnyny  {K  oL  HL  #M  N  Nr*   c                 d    U R                   [        R                  L a  U R                  5       S:  $ g)z`
Return true if we are disconnected from the given node and can't
re-establish a connection yet
r   F)r   r4   r6   connection_delayr   s    r+   r   BrokerConnection.blacked_out;  s.    
 ::)666((*Q..r*   c                 d    U R                   [        R                  La  gU R                  5       S:  $ )z:
Return True if we are connected but currently throttled.
Fr   )r   r4   r9   throttle_delayr   s    r+   	throttledBrokerConnection.throttledD  s.     ::-777""$q((r*   c                     U R                   b5  U R                   [        R                  " 5       -
  S-  nUS:  a  U$ SU l         gg)zT
Return the number of milliseconds to wait until connection is no longer throttled.
NrP   r   )r   r   )r   remaining_mss     r+   r  BrokerConnection.throttle_delayL  sG     * //$))+=ELa##&*#r*   c                    U R                  5       (       d  U R                  5       (       aX  [        U R                  5      S:  a  g[        R                  " 5       U R
                  -
  n[        U R                  U-
  S5      S-  $ [        S5      $ )a  
Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When connecting or disconnected,
this respects the reconnect backoff time. When connected, returns a very
large number to handle slow/stalled connections.
r   rP   r   )	r   r   r  r   r   r   max_reconnect_backofffloat)r   time_waiteds     r+   r  !BrokerConnection.connection_delayY  sq     $//"3"3499~!"iikD,=,==422[@!DtKK
 <r*   c                 :    U R                   [        R                  L $ )z$Return True iff socket is connected.)r   r4   r9   r   s    r+   r   BrokerConnection.connectedl  s    zz-7777r*   c                     U R                   [        R                  [        R                  [        R                  [        R
                  [        R                  4;   $ )zzReturns True if still connecting (this may encompass several
different states, such as SSL handshake, authorization, etc).)r   r4   r7   r8   r:   r;   r<   r   s    r+   r   BrokerConnection.connectingp  sJ     zz.99.88.==.@@.@@	B B 	Br*   c                 z    U R                   [        R                  [        R                  [        R                  4;   $ )zReturns True if socket is connected but full connection is not complete.
During this time the connection may send api requests to the broker to
check api versions and perform SASL authentication.)r   r4   r:   r;   r<   r   s    r+   initializingBrokerConnection.initializingy  s8     zz.==.@@.@@B B 	Br*   c                 :    U R                   [        R                  L $ )z Return True iff socket is closed)r   r4   r6   r   s    r+   r   BrokerConnection.disconnected  s    zz-::::r*   c                     U R                  5       =(       a.    U R                  S:  =(       a    [        U R                  5      S:H  $ )zJReturn True iff connection attempt failed after attempting all dns recordsr   )r   r   r  r   r   s    r+   connect_failedBrokerConnection.connect_failed  s4      "Ut'8'8A'=U#dii.TUBUUr*   c                 @    SU l         U R                  S   S-  U l        g )Nr   rE   r   )	_failuresr   r  r   s    r+   r   )BrokerConnection._reset_reconnect_backoff  s     "&++.D"E"Nr*   c                     [        SS5      $ )Ng?g333333?r   r   s    r+   _reconnect_jitter_pct&BrokerConnection._reconnect_jitter_pct  s    sC  r*   c                    [        U R                  5      S:  a  g U R                  S   U R                  S   :  a  U =R                  S-  sl        U R                  S   SU R                  S-
  -  -  U l        [        U R                  U R                  S   5      U l        U =R                  U R                  5       -  sl        U =R                  S-  sl        [        R                  SX R                  U R                  5        g g )Nr   rG   rE   rM   ru   r   z*%s: reconnect backoff %s after %s failures)	r  r   r   r  r  r:  r  r   r   r   s    r+   _update_reconnect_backoff*BrokerConnection._update_reconnect_backoff  s    tyy>A;;12T[[AW5XXNNaN&*kk2H&IARVR`R`cdRdLe&eD#&)$*A*A4;;OiCj&kD###t'A'A'CC###v-#IIBDJaJacgcqcqr Yr*   c                     [        U S5      (       a0  U R                  b"  U R                  R                  5         S U l        g g g )Nr   )r  r   r   r   s    r+   _close_socketBrokerConnection._close_socket  s7    4!!djj&<JJDJ '=!r*   c                 $    U R                  5         g rS  )r  r   s    r+   __del__BrokerConnection.__del__  s    r*   c                    U R                   [        R                  L a  gU R                     U R                   [        R                  L a
   SSS5        g[        R	                  U(       a  [
        R                  O[
        R                  SX=(       d    S5        U(       a  U R                  5         SU l	        SU l
        U R                  5         [        U R                  S   U R                  S   S9U l        SU l        Uc  [         R"                  " [%        U 5      5      n['        U R(                  R+                  5       5      nU R(                  R-                  5         [        R                  U l         U R.                  nSU l        SSS5        U R                  S   " U R0                  WU 5        U(       a  UR3                  5         W H  u  nu  pVnUR5                  U5        M     g! , (       d  f       Ng= f)	zClose socket and fail all in-flight-requests.

Arguments:
    error (Exception, optional): pending in-flight-requests
        will be failed with this exception.
        Default: kafka.errors.KafkaConnectionError.
Nz%s: Closing connection. %srf   r@   r[   ry   r*   r^   )r   r4   r6   r   r   loggingERRORINFOr  r   r   r   r   r   r   r   r   	Cancelledr=  listr   itemsclearr   rC   r   r6  )r   r   ifrsr`   _correlation_idr*  
_timestampr  s           r+   r   BrokerConnection.close  sw    ::)666ZZzz-::: Z GGUGMM>Z\`bmkmn..0(,D%%)D"%%'*++k2 KK68DN !$D}((T3//5578D##))+)66DJ ::DDJ1 6 	+,T\\4FJJLAE=_<v8NN5! BF= Zs   G%D1G%%
G3c                 P    U R                  5       =(       d    U R                  5       $ )z8Return True iff socket is ready for requests / responses)r   r  r   s    r+   r  BrokerConnection._can_send_recv  s    ~~64#4#4#66r*   c                 N   [        5       nU R                  5       (       a.  UR                  [        R                  " [        U 5      5      5      $ U R                  5       (       d.  UR                  [        R                  " [        U 5      5      5      $ U R                  5       (       dq  U R                  5       (       a.  UR                  [        R                  " [        U 5      5      5      $ UR                  [        R                  " [        U 5      5      5      $ U R                  XUS9$ )a  Queue request for async network send, return Future()

Arguments:
    request (Request): kafka protocol request object to send.

Keyword Arguments:
    blocking (bool, optional): Whether to immediately send via
        blocking socket I/O. Default: True.
    request_timeout_ms: Custom timeout in milliseconds for request.
        Default: None (uses value from connection configuration)

Returns: future
r  )r	   r   r6  r   NodeNotReadyErrorr=  r   r   can_send_morer  ThrottlingQuotaExceededErrorTooManyInFlightRequestsr  )r   r)  r  rD   r*  s        r+   r_  BrokerConnection.send  s     ??>>&":":3t9"EFF!!>>&"="=c$i"HII##%%~~~~f&I&I#d)&TUU>>&"@"@T"KLLzz'I[z\\r*   c                    U=(       d    U R                   S   n[        5       nU R                     U R                  5       (       d7  UR	                  [
        R                  " [        U 5      5      5      sS S S 5        $ U R                  R                  U5      n[        R                  SXX15        UR                  5       (       aD  XPR                  ;  d   S5       e[        R                  " 5       nXcS-  -   nXFU4U R                  U'   OUR                  S 5        S S S 5        U(       a  U R!                  5         U$ ! , (       d  f       N'= f)NrD   z"%s: Request %d (timeout_ms %s): %sz!Correlation ID already in-flight!rP   )r   r	   r   r  r6  r   r  r=  r   send_requestr   r   expect_responser   r   r$  send_pending_requests)r   r)  r  rD   r*  r  	sent_time
timeout_ats           r+   r  BrokerConnection._send  s   /T4;;?S3TZZ&&(( ~~f&>&>s4y&IJ Z "^^88ANII:DRdn&&((%-D-DDiFiiD IIK	&t*CD
;Aj:Y''7t$! ( &&(/ Zs   AD9<BD99
Ec                     U R                      U R                  5       (       d
   SSS5        gU R                  R                  5       nU R	                  U5      nSSS5        U R
                  (       a%  U R
                  R                  R                  W5        g! , (       d  f       NE= f! [        [        4 aL  n[        R                  SU 5        [        R                  " U < SU< 35      nU R                  US9   SnAgSnAff = f)zAttempts to send pending requests messages via blocking IO
If all requests have been sent, return True
Otherwise, if the socket is blocked and there are more bytes to send,
return False.
NFT%s: Error sending request datars  r1  )r   r  r   
send_bytesrk  r   
bytes_sentr  r  r  r   r
  r   r   r   )r   rc  total_bytesr   r   s        r+   r  &BrokerConnection.send_pending_requests	  s    	**,,   ~~002"77=	  }}((//<   . 	MM:DA//D!0DEEJJUJ#		s9   B) BB) +B>B) 
B&"B) )D9AD  Dc                     U R                      U R                  5       (       d
   SSS5        gU R                  (       d  U R                  R	                  5       U l        SnU R                  (       a/  U R                  U R                  5      nU R                  US U l        SSS5        U R                  (       a%  U R                  R                  R                  W5        [        U R                  5      S:H  $ ! , (       d  f       N\= f! [        [        [        4 aL  n[        R                  SU 5        [        R                   " U < SU< 35      nU R#                  US9   SnAgSnAff = f)zAttempts to send pending requests messages via non-blocking IO
If all requests have been sent, return True
Otherwise, if the socket is blocked and there are more bytes to send,
return False.
NFr   r  rs  r1  )r   r  r   r   r  rf  r   r  r  r  r  r  	Exceptionr   r
  r   r   r   )r   r  r   r   s       r+   send_pending_requests_v2)BrokerConnection.send_pending_requests_v2   s   	**,,   (((,(A(A(CD%$$"&"2"243D3D"EK(,(9(9+,(GD%  }}((//<t(()Q..% (  y9 	MM:DA//D!0DEEJJUJ#		s;   D C6D A2C6 AD 6
D D E(AE##E(c                    [        USS5      nU R                  (       a%  U R                  R                  R                  U5        U(       d  U R                  b  S U l        g U R
                  S   bQ  U R
                  S   S:  a>  [        R                  " 5       US-  -   n[        X0R                  =(       d    S5      U l        [        R                  SU UR                  R                  U5        g )Nthrottle_time_msr   r[   ru   r   rP   z"%s: %s throttled by broker (%d ms))getattrr   throttle_timer  r   r   r   r  r   r  	__class__r%   )r   r+  r  r  s       r+   _maybe_throttle BrokerConnection._maybe_throttleA  s    "8-?C==MM''../?@"".&*# ;;}%1dkk-6PTZ6Z IIK*:T*AAM"%m5H5H5MA"ND8$&&//1A	Cr*   c                 z    U R                  5       S:  a  gU R                  S   n[        U R                  5      U:  $ )zBCheck for throttling / quota violations and max in-flight-requestsr   FrH   )r  r   r  r   )r   max_ifrss     r+   r  BrokerConnection.can_send_moreQ  s;     1$;;FG4**+h66r*   c                 B   U R                  5       nU(       dv  U R                  5       (       aa  U R                  5       nUS   S   US   S   -
  S-  n[        R	                  SX5        U R                  [        R                  " SU-  5      S9  g[        U5       H  u  nu  pV U R                     U R                  R                  U5      u  pxn	S	S	S	5        [        R                  " 5       W-
  S-  n
U R                  (       a%  U R                  R                   R#                  U
5        [        R%                  SXX5        U R'                  U5        UW4X'   M     U$ ! , (       d  f       N= f! [         a)    U R                  [        R                  " S
5      5           gf = f)zHNon-blocking network receive.

Return list of (response, future) tuples
r   ru   rM   rP   z.%s: timed out after %s ms. Closing connection.zRequest timed out after %s msr1  r#   Nz$Received unrecognized correlation idry  )_recvrequests_timed_outtimed_out_ifrsr   r  r   r   RequestTimedOutError	enumerater   r   r   KeyErrorr   r   r   r~  r  r   r  )r   	responses	timed_out
timeout_msir  r+  r*  r  r  r  s              r+   r#  BrokerConnection.recvX  sk   
 JJL	T4466++-I#A,q/IaLO;tCJKKH*JJV88/J   .7y-A)A)ZZ484K4K4O4OP^4_1V  
 ))+	1T9J}}**11*=II3T:`  *$f-IL .B   Z 

6667]^_s0   "E+.EE+
E(	$E+(E++/FFc                    / nSnU R                      U R                  5       (       d   [        R                  SU 5         SSS5        g[	        U5      U R
                  S   :  a   U R                  R                  U R
                  S   5      nU(       d-  [        R                  SU 5        [        R                  " S5      nO0UR                  U5         [	        U5      U R
                  S   :  a  M  Ucu  S	R/                  U5      nU R0                  (       a.  U R0                  R2                  R5                  [	        U5      5         U R6                  R9                  U5      sSSS5        $ SSS5        U R=                  US
9  g! [        [        4 a     M  [        [        4 aq  n[         R"                  (       a$  UR$                  [$        R&                  :X  a   SnAM  [        R)                  SU 5        [        R                  " U5      n SnAGM  SnAf[*         a    [         R,                  (       a   GMD  e f = f! [        R:                   a  nUn SnANSnAff = f! , (       d  f       N= f)zNTake all available bytes from socket, return list of any responses from parserNz%%s: cannot recv: socket not connectedr#   rO   rN   z%s: socket disconnectedzsocket disconnectedz/%s: Error receiving network data closing socketr*   r1  )r   r  r   r  r  r   r   r#  r   r   r   r   r!   r.   r  r  r   r`  r   r   r
  ra  rb  r   r   bytes_receivedr  r   r}  KafkaProtocolErrorr   )r   recvdr   rc  r   
recvd_datas         r+   r  BrokerConnection._recvz  s   ZZ&&((CTJ Z
 e*t{{+DEE::??4;;7I+JKD
  		";TB$99:OPT* e*t{{+DEE< { XXe_
==MM0077JH
>>77
CY ZZ` 	


= )*;< '6 ww177e.?.?#?MM #459; 55a8C& ww	" 00 C[ Zs   -II$AF ?I F I0AIH9 H6IH6"3HI,HI"H60I4H66I9IIIII
I*c                 (    U R                  5       S:H  $ r   )next_ifr_request_timeout_msr   s    r+   r  #BrokerConnection.requests_timed_out  s    //1Q66r*   c                    ^ [         R                   " 5       m[        U R                  R                  5       SS S9n[	        [        U4S jU5      5      $ )NTc                     U S   $ Nru   r#   )ifrs    r+   rb   1BrokerConnection.timed_out_ifrs.<locals>.<lambda>  s    VYZ[V\r*   )reverser   c                    > U S   T:*  $ r  r#   )r  nows    r+   rb   r    s    s1v}r*   )r   sortedr   valuesr  filter)r   r  r	  s     @r+   r  BrokerConnection.timed_out_ifrs  s?    iikd--446J\]F4d;<<r*   c                 D   U R                      U R                  (       a^  S n[        [        UU R                  R	                  5       5      5      n[        SU[        R                  " 5       -
  S-  5      sS S S 5        $ [        S5      sS S S 5        $ ! , (       d  f       g = f)Nc                     U S   $ r  r#   )vs    r+   get_timeoutABrokerConnection.next_ifr_request_timeout_ms.<locals>.get_timeout  s    Q4Kr*   r   rP   r   )r   r   r:  r<  r  r  r   r  )r   r  next_timeouts      r+   r   ,BrokerConnection.next_ifr_request_timeout_ms  sr    ZZ&& "3{#'#:#:#A#A#C$E  F1|diik9TAB Z U| ZZs   A&B=
B
Bc                 T    U R                   c  U R                  5         U R                   $ rS  )r   check_versionr   s    r+   get_api_versions!BrokerConnection.get_api_versions  s(     % !!!r*   c                    S[         S   4S[        S   4S[        S   4S[        S   4S	[        S
   4S[        S   4S[        S   4S[        S   4S[
        S
   4S[
        S   4S[        S   4S[
        S   4/n[        USS9 HE  u  p4UR                  U;  a  M  XR                     u  pVXTR                  s=::  a  U::  d  M?   Us  $   MG     g)N)ru      r   )ru   rI   ru   )ru   rw   rt   )ru   r     )ru   ru   rI   )ru   rM   
   r  )rM   rM      )rM   r   )r   r  rw   )r   r  ru   )r   r  rM   T)r  )r   r  r   )
r   r   r   r   r   r   r   r
  r9  API_VERSION)r   r8  
test_casesbroker_versionproto_structrA  rB  s          r+   r;  8BrokerConnection._infer_broker_version_from_api_versions  s   ( 034(+,^A&'\"%&'*+\"%&\!_%\!_%_Q'(oa()+A./+,9

@ -3:t,L(N##<7'34H4H'I$K66E+E%% F	 -M r*   c                     [         R                   " 5       U-   nU R                  U[         R                   " 5       -
  5      (       d  [        R                  " 5       eU R                  $ )a4  Attempt to guess the broker version.

Keyword Arguments:
    timeout (numeric, optional): Maximum number of seconds to block attempting
        to connect and check version. Default 2

Note: This is a blocking call.

Returns: version tuple, i.e. (3, 9), (2, 4), etc ...

Raises: NodeNotReadyError on timeout
)r   r   r   r  r   )r   r   kwargsr  s       r+   r  BrokerConnection.check_version  sK     YY[7*
$$Z$))+%=>>**,,$$$r*   c                     SU R                   S   U R                  U R                  U R                  U R                  [
        U R                     U R                  4-  $ )NzA<BrokerConnection client_id=%s, node_id=%s host=%s:%d %s [%s %s]>r@   )r   rC   rz   r{   r   r   r}   r~   r   s    r+   __str__BrokerConnection.__str__  sK    RKK$dllDIItyy$**dnn%tV8 8 	8r*   )r   r   r   r   r   r   r  r   r   r   r  r   r   r   r   r   r~   r}   r   r   r   r|   r   rz   r   r   rC   r{   r   rS  )TN)ru   )Ir%   r&   r'   r(   __doc__r   r   IPPROTO_TCPTCP_NODELAYr   DefaultSelectorr   r   r   r   r   r   r   r   r   r   r   r  r   r   r   r   r   r  r  r!  r"  rN  r   rU  rf  rk  rp  rv  r  r[  r   r  r  r  r   r   r  r   r  r   r  r  r  r  r   r  r_  r  r  r  r  r  r#  r  r  r  r   r  r;  r  r'  r)   r#   r*   r+   r>   r>   _   sa   bH%_{2%% 	";% 	1	%
 	e% 	% 	#E% 	0% 	% 	T% 	F..0B0BAFG% 	D% 	"4% 	[% 	t%  	d!%" 	d#%$ 	%%& 	t'%( 	t)%* 	+%, 	t-%. 	t/%0 	&t1%2 	I--3%4 	 !A5%6 	47%8 	r9%: 	$;%< 	t=%> 	t?%@ 	dA%B 	%gC%D 	$TE%F &*I%NL L	"1%'(	*1-.JKL	&q)*FKL	OA&r*+	NFBP( (-U| @qf*X35j8(%

#2,*4**&"HN&) &8BB;VO!
s
)"V7]48.BC 7 D5n7=
	$",\%&8r*   r>   c                       \ rS rSrS rSrg)r   i  c                 
   Xl         UR                  S5      nU(       GdH  US-   nUR                  S5      nUR                  UR	                  SUS5      [        [        5       S95        UR                  SU/S9nUR                  UR	                  SUS	5      [        5       5        UR                  UR	                  S
US5      [        [        5       S95        UR                  UR	                  SUS5      [        5       5        UR                  UR	                  SUS5      [        5       5        UR                  SU/S9nUR                  UR	                  SUS5      [        5       5        UR                  UR	                  SUS5      [        [        5       S95        UR                  S5      n	U	R                  UR	                  SUS5      [        5       5        U	R                  UR	                  SUS5      [        5       5        UR                  S5      n
U
R                  UR	                  SUS5      [        5       5        U
R                  UR	                  SUS5      [        5       5        SR                  U5      nUR                  US -   5      nU(       GdP  US!-   U-   nUR                  US -   UR                  S5      /S9nUR                  UR	                  SUS"5      [        5       5        UR                  UR	                  S
US5      [        [        5       S95        UR                  UR	                  SUS5      [        5       5        UR                  UR	                  SUS5      [        5       5        UR                  US#-   UR                  S5      /S9nUR                  UR	                  SUS$5      [        5       5        UR                  UR	                  SUS%5      [        [        5       S95        UR                  US&-   UR                  S5      /S9nUR                  UR	                  SUS5      [        5       5        UR                  UR	                  SUS5      [        5       5        UR                  US'-   UR                  S5      /S9n
U
R                  UR	                  SUS5      [        5       5        U
R                  UR	                  SUS5      [        5       5        UR                  US -   5      U l
        UR                  US#-   5      U l        UR                  US&-   5      U l        UR                  US'-   5      U l        g )(Nzbytes-sent-receivedz-metricsznetwork-io-ratezYThe average number of network operations (reads or writes) on all connections per second.)sampled_statz
bytes-sent)parentszoutgoing-byte-ratezDThe average number of outgoing bytes sent per second to all servers.zrequest-ratez/The average number of requests sent per second.zrequest-size-avgz/The average size of all requests in the window.zrequest-size-maxz3The maximum size of any request sent in the window.zbytes-receivedzincoming-byte-ratez!Bytes/second read off all socketszresponse-ratez#Responses received sent per second.zrequest-latencyzrequest-latency-avgz"The average request latency in ms.zrequest-latency-maxz"The maximum request latency in ms.zthrottle-timezthrottle-time-avgz The average throttle time in ms.zthrottle-time-maxz The maximum throttle time in ms.znode-{0}z.bytes-sentz-node-metrics.z5The average number of outgoing bytes sent per second.z.bytes-receivedz,Bytes/second read off node-connection socketz4The average number of responses received per second.z.latencyz	.throttle)rd   
get_sensorsensoraddmetric_namer   r   r
   r   r   r  r  r~  r  )r   rd   re   rC   all_conns_transferredmetric_group_namebytes_transferredr  r  request_latencyr  node_strnode_sensorr~  s                 r+   r    BrokerConnectionMetrics.__init__  s    !( 2 23H I$ 3j @ '/D E!!'"5"5!#4+#, .2uw-GI
 !1B0C ( EJNN7..$&7 #f& NN7.. 1AC %'*, NN7.."$5ACDGEK NN7.."$5EGHKO %^^,<5F4G , INw22$&73 56:f> w22!25 7 %'*,
 &nn->?O 3 3%'84!6   3 3%'84!6 
 $NN?;Mg11#%624  g11#%624  $$W-((M)AB 36F F Q =( ++L9: ( <J NN7..$&7GI  NN7.. 1AC %'*, NN7.."$5AC  NN7.."$5EG 
 %^^,, ++,<=> , @N w22$&7> @  w22!2F H %'*,
 #>>:% ++,=>? * AL W00%'846  W00%'846 
 $NN;& ++O<= + ?M g11#%624  g11#%624  "..M)AB%nnX8I-IJ#NN8j+@A$^^H{,BCr*   )r  r  rd   r~  r  N)r%   r&   r'   r(   r   r)   r#   r*   r+   r   r     s	    ADr*   r   c                 f   U R                  S5      (       a&  U R                  S5      (       a  [        R                  $ [        R                  [        R                  4 H  n [        R
                  " X5        Us  $    [        R                  $ ! [        [        [        R                  4 a     MR  f = f)z
Attempt to determine the family of an address (or hostname)

:return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family
         could not be determined
[])

startswithendswithr   AF_INET6AF_INET	inet_pton
ValueErrorAttributeErrorr   	AF_UNSPEC)addressafs     r+   _address_familyrI    s     #7#3#3C#8#8~~v/	R)I 0  NFLL9 		s   !BB0/B0c                 d   U R                  5       n U R                  S5      (       aF  [        R                  nU SS R	                  S5      u  p#U(       a  [        USS 5      nO[        nX$U4$ SU ;  a  [        U 5      nU [        U4$  [        R                  " [        R                  U 5        U [        [        R                  4$ ! [         a    [        R                  S5         O[        [        R                  4 a     Of = fU R                  SS5      u  p$[        U5      n[        U5      nX$U4$ )a1  
Parse the IP and port from a string in the format of:

    * host_or_ip          <- Can be either IPv4 address literal or hostname/fqdn
    * host_or_ipv4:port   <- Can be either IPv4 address literal or hostname/fqdn
    * [host_or_ip]        <- IPv6 address literal
    * [host_or_ip]:port.  <- IPv6 address literal

.. note:: IPv6 address literals with ports *must* be enclosed in brackets

.. note:: If the port is not specified, default will be returned.

:return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC
r=  rM   Nr>  :zUsocket.inet_pton not available on this platform. consider `pip install win_inet_pton`)stripr?  r   rA  splitintDEFAULT_KAFKA_PORTrI  rC  rE  r   r  rD  r   rsplit)host_and_port_strrH  rz   restr{   s        r+   get_ip_port_afirS    s!    *//1##C((__&qr*005
tABx=D%D2~'' !23B$&8"<<
  2CD(*<fooMM!  D E-  +11#q9JDt9D &Br>!s   ;C D%D Dc                 N    U S   [         R                  [         R                  4;   $ )z8Given a getaddrinfo struct, return True iff ipv4 or ipv6r   )r   rB  rA  )gais    r+   is_inet_4_or_6rV    s    q6fnnfoo666r*   c                      [        [        [        [        R                  " XU[        R
                  5      5      5      $ ! [        R                   a#  n[        R                  SXU5        / s SnA$ SnAff = f)zRReturns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)zDNS lookup failed for %s:%d, exception was %s. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?N)	r  r  rV  r   getaddrinfor   gaierrorr   r  )rz   r{   r|   r.  s       r+   r   r     sq    F>!--d#.4.@.@BC D 	D ??  /
 	$ 	s   <? A6A1+A61A6)\
__future__r   r   r   r   r{  r  randomr   r   ImportErrorkafka.vendorr   r   r   r   r   kafka.errorserrorsr   kafka.futurer	   kafka.metrics.statsr
   r   r   r   kafka.protocol.adminr   r   r   kafka.protocol.api_versionsr   "kafka.protocol.broker_api_versionsr   kafka.protocol.commitr   kafka.protocol.fetchr   kafka.protocol.find_coordinatorr   kafka.protocol.list_offsetsr   kafka.protocol.metadatar   kafka.protocol.parserr   kafka.protocol.producer    kafka.protocol.sasl_authenticater   kafka.protocol.sasl_handshaker   kafka.protocol.typesr   
kafka.saslr   kafka.socks5_wrapperr   kafka.versionr   r`  r   r  r  r  ra  	getLoggerr%   r   rO  r   r   r  r!   r.   r  rE  r  r	  rF  rB  rA  r   objectr4   r>   r   rI  rS  rV  r   r#   r*   r+   <module>rt     s   0   	  6
       5 5 d d : B 4 - B : 3 / 1 D > & ) . % 77llO<<LO! M*oo//11 33( m
NNF
OOV	7v 7q8v q8h%BDf BDJ$."b7
  &// i.  6556`  * A 	B ll<<LL \\*  M9 I sB   F
 2G* 90F 
FFAG'$G* &G''G* *H
H