
    Ii@                       S SK JrJr  S SKrS SKrS SKrS SKrS SKrS SKrS SK	J
r
  S SKJr  S SKJr  S SKJrJrJrJr  S SKJrJr  S SKJrJrJr  S SKJr  S S	KJr  S S
K J!r!J"r"J#r#  S SK$J%r%  \RL                  " \'5      r(S r)Sr*\)\*S.r+\RX                  " S/ SQ5      r-\RX                  " S/ SQ5      r.\RX                  " S/ SQ5      r/ " S S\R`                  5      r1 " S S\R`                  5      r2 " S S\
Rf                  5      r4 " S S\55      r6 " S S\55      r7\7" \7Rp                  \7Rr                  5      \7l:        \7" \7Rp                  \7Rv                  5      \7l<         " S S\55      r= " S  S!\55      r> " S" S#\55      r? " S$ S%\55      r@g)&    )absolute_importdivisionN)six)Future)AvgCountMaxRate)FetchRequestAbortedTransaction)ListOffsetsRequestOffsetResetStrategyUNKNOWN_OFFSET)MemoryRecords)Deserializer)TopicPartitionOffsetAndMetadataOffsetAndTimestamp)Timer   )read_uncommittedread_committedConsumerRecord)topic	partitionleader_epochoffset	timestamptimestamp_typekeyvalueheaderschecksumserialized_key_sizeserialized_value_sizeserialized_header_sizeCompletedFetch)topic_partitionfetched_offsetresponse_versionpartition_datametric_aggregatorExceptionMetadata)r   r)   	exceptionc                       \ rS rSrSrg)NoOffsetForPartitionError2    N__name__
__module____qualname____firstlineno____static_attributes__r2       </venv/lib/python3.13/site-packages/kafka/consumer/fetcher.pyr0   r0   2       r9   r0   c                       \ rS rSrSrg)RecordTooLargeError6   r2   Nr3   r2   r9   r:   r=   r=   6   r;   r9   r=   c                      \ rS rSrSSSSSS\R
                  SSSS	S
SSS.rS rS rS r	S r
S rS*S jrS*S jrS rS rS rS+S jrS rS rS rS rS rS rS rS rS  rS! rS" rS# rS$ rS% rS& r  " S' S(\!5      r"S)r#g),Fetcher:   Nr   i  i   i   Tconsumeri0u  d   r   )key_deserializervalue_deserializerfetch_min_bytesfetch_max_wait_msfetch_max_bytesmax_partition_fetch_bytesmax_poll_records
check_crcsmetricsmetric_group_prefixrequest_timeout_msretry_backoff_ms!enable_incremental_fetch_sessionsisolation_levelc                    [         R                   " U R                  5      U l        U R                   H  nXC;   d  M
  X4   U R                  U'   M     U R                  S   [        ;  a  [        R
                  " S5      eXl        X l        [        R                  " 5       U l
        SU l        SU l        [        R                  " 5       U l        U R                  S   (       a,  [        U R                  S   U R                  S   5      U l        OSU l        [        U R                  S      U l        0 U l        [%        5       U l        SU l        SU l        g)a	  Initialize a Kafka Message Fetcher.

Keyword Arguments:
    key_deserializer (callable): Any callable that takes a
        raw message key and returns a deserialized key.
    value_deserializer (callable, optional): Any callable that takes a
        raw message value and returns a deserialized value.
    enable_incremental_fetch_sessions: (bool): Use incremental fetch sessions
        when available / supported by kafka broker. See KIP-227. Default: True.
    fetch_min_bytes (int): Minimum amount of data the server should
        return for a fetch request, otherwise wait up to
        fetch_max_wait_ms for more data to accumulate. Default: 1.
    fetch_max_wait_ms (int): The maximum amount of time in milliseconds
        the server will block before answering the fetch request if
        there isn't sufficient data to immediately satisfy the
        requirement given by fetch_min_bytes. Default: 500.
    fetch_max_bytes (int): The maximum amount of data the server should
        return for a fetch request. This is not an absolute maximum, if
        the first message in the first non-empty partition of the fetch
        is larger than this value, the message will still be returned
        to ensure that the consumer can make progress. NOTE: consumer
        performs fetches to multiple brokers in parallel so memory
        usage will depend on the number of brokers containing
        partitions for the topic.
        Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
    max_partition_fetch_bytes (int): The maximum amount of data
        per-partition the server will return. The maximum total memory
        used for a request = #partitions * max_partition_fetch_bytes.
        This size must be at least as large as the maximum message size
        the server allows or else it is possible for the producer to
        send messages larger than the consumer can fetch. If that
        happens, the consumer can get stuck trying to fetch a large
        message on a certain partition. Default: 1048576.
    check_crcs (bool): Automatically check the CRC32 of the records
        consumed. This ensures no on-the-wire or on-disk corruption to
        the messages occurred. This check adds some overhead, so it may
        be disabled in cases seeking extreme performance. Default: True
    isolation_level (str): Configure KIP-98 transactional consumer by
        setting to 'read_committed'. This will cause the consumer to
        skip records from aborted tranactions. Default: 'read_uncommitted'
rQ   zUnrecognized isolation_levelNrL   rM   )copyDEFAULT_CONFIGconfigISOLATION_LEVEL_CONFIGErrorsKafkaConfigurationError_client_subscriptionscollectionsdeque_completed_fetches_next_partition_records	_iterator_fetch_futuresFetchManagerMetrics_sensors_isolation_level_session_handlersset"_nodes_with_pending_fetch_requests_cached_list_offsets_exception _next_in_line_exception_metadata)selfclientsubscriptionsconfigsr    s        r:   __init__Fetcher.__init__L   s   T ii 3 34;;C~#*<C   ;;()1GG001OPP+"-"3"3"5'+$)//1;;y!/I0FTiHjkDM DM 6t{{CT7U V!#25%/.2+04-r9   c                 H   / n[         R                  " U R                  5       5       H  u  nu  p4[        R	                  SU5        U R
                  R                  U5        U R                  R                  X#SS9nUR                  U R                  X$[        R                  " 5       5        UR                  U R                  U5        UR                  U R                  U5        UR!                  U5        M     U R"                  R%                  U5        U R'                  5         U$ )zSend FetchRequests for all assigned partitions that do not already have
an in-flight fetch or pending fetch data.

Returns:
    List of Futures: each future resolves to a FetchResponse
zSending FetchRequest to node %sF)wakeup)r   	iteritems_create_fetch_requestslogdebugrf   addrY   sendadd_callback_handle_fetch_responsetimeadd_errback_handle_fetch_erroradd_both_clear_pending_fetch_requestappendr`   extend_clean_done_fetch_futures)ri   futuresnode_idrequestfetch_offsetsfutures         r:   send_fetchesFetcher.send_fetches   s     14t?Z?Z?\1]-G-gII7A3377@\\&&w&FF ; ;WUYU^U^U`at77AOOD==wGNN6" 2^ 	""7+&&(r9   c                      U R                   (       d  g U R                   S   R                  (       d  g U R                   R                  5         MM  Nr   )r`   is_donepopleftri   s    r:   r   !Fetcher._clean_done_fetch_futures   s?    &&&&q)11'') r9   c                 L    U R                  5         [        U R                  5      $ )zAReturn True if there are any unprocessed FetchRequests in flight.)r   boolr`   r   s    r:   in_flight_fetchesFetcher.in_flight_fetches   s    &&(D''((r9   c                 T   U R                   Ssol         U(       a  UeU R                  R                  5       nU(       d  g[        R	                  SU5        [        5       nU H3  nU R                  R                  U   R                  nU(       d  M/  XSU'   M5     U R                  U5        g)at  Reset offsets for the given partitions using the offset reset strategy.

Arguments:
    partitions ([TopicPartition]): the partitions that need offsets reset

Returns:
    bool: True if any partitions need reset; otherwise False (no reset pending)

Raises:
    NoOffsetForPartitionError: if no offset reset strategy is defined
    KafkaTimeoutError if timeout_ms provided
NFzResetting offsets for %sT)	rg   rZ   partitions_needing_resetrs   rt   dict
assignmentreset_strategy_reset_offsets_async)ri   exc
partitionsoffset_resetstptss         r:   reset_offsets_if_neededFetcher.reset_offsets_if_needed   s     483V3VX\00I((AAC
		,j9B$$//3BBBr$&b! 
 	!!-0r9   c                 P    U R                  X5      nU H  nXC;  d  M
  SX4'   M     U$ )a  Fetch offset for each partition passed in ``timestamps`` map.

Blocks until offsets are obtained, a non-retriable exception is raised
or ``timeout_ms`` passed.

Arguments:
    timestamps: {TopicPartition: int} dict with timestamps to fetch
        offsets by. -1 for the latest available, -2 for the earliest
        available. Otherwise timestamp is treated as epoch milliseconds.
    timeout_ms (int, optional): The maximum time in milliseconds to block.

Returns:
    {TopicPartition: OffsetAndTimestamp}: Mapping of partition to
        retrieved offset, timestamp, and leader_epoch. If offset does not exist for
        the provided timestamp, that partition will be missing from
        this mapping.

Raises:
    KafkaTimeoutError if timeout_ms provided
N)_fetch_offsets_by_times)ri   
timestamps
timeout_msoffsetsr   s        r:   offsets_by_timesFetcher.offsets_by_times   s2    * ..zFB "  r9   c                    U(       d  0 $ [        USU< S35      n[        R                  " U5      n[        5       n U(       d  0 $ U R                  U5      nU R                  R                  XSR                  S9  UR                  (       d  GOUR                  5       (       aV  UR                  UR                  S   5        UR                  S   (       d  U$ UR                  S    Vs0 s H  ofX   _M	     nnO!UR                  5       (       d  UR                  eUR                  R                  (       d%  U R                  R                  R                  (       aZ  U R                  R                  R!                  5       nU R                  R                  XsR                  S9  UR                  (       d  OOtUR                  b  UR                  U R"                  S   :  a'  [$        R&                  " U R"                  S   S-  5        O#[$        R&                  " UR                  S-  5        UR)                  5         GM  [*        R,                  " SU< S35      es  snf )Nz'Failed to get offsets by timestamps in z ms)r   r   r   r   rO     )r   rS   r   _send_list_offsets_requestsrY   pollr   r   	succeededupdater!   	retriabler.   invalid_metadataclusterneed_updaterequest_updaterU   ry   sleepmaybe_raiserW   KafkaTimeoutError)ri   r   r   timerfetched_offsetsr   r   refresh_futures           r:   r   Fetcher._fetch_offsets_by_times   s   IjT^"`aYYz*
&	55jAFLLV8H8HI >>!!&&v||A7||A**;A<<?K?R*.0?
K
%%''&&&00DLL4H4H4T4T!%!5!5!D!D!F!!DTDT!U~~ & ##+u/?/?$++N`Ba/aJJt{{+=>EFJJu//$67C F &&>HJL 	L' Ls   &Ic                 D    U R                  U[        R                  U5      $ N)beginning_or_end_offsetr   EARLIESTri   r   r   s      r:   beginning_offsetsFetcher.beginning_offsets  s%    +++44jB 	Br9   c                 D    U R                  U[        R                  U5      $ r   )r   r   LATESTr   s      r:   end_offsetsFetcher.end_offsets  s%    +++22J@ 	@r9   c                     [        U Vs/ s H  oDU4PM     sn5      nU R                  XS5      nU H  nXd   R                  Xd'   M     U$ s  snf r   )r   r   r   )ri   r   r   r   r   r   r   s          r:   r   Fetcher.beginning_or_end_offset  sQ    Z@Zr	?Z@A
..zFB!+,,GK 	 As   A
c                    Uc  U R                   S   nUS:  d   eU R                  b~  U R                  nSU l        UR                  nU R                  R	                  U5      (       a?  U R                  R                  U5      R                  UR                  :X  a  UR                  e[        R                  " [        5      nUnSnSn US:  a  U R                  (       d[  U R                  (       d  OU R                  R                  5       n	U	R                  nU	R                  nU R!                  U	5      U l        OLU R                  R                  nU R                  R"                  nX`R%                  UU R                  UU5      -  nUS:  a  M  [+        U5      [-        U R                  5      4$ ! [&         a$  n
U(       d  U
e[)        XxU
5      U l         Sn
A
NISn
A
ff = f)aI  Returns previously fetched records and updates consumed offsets.

Arguments:
    max_records (int): Maximum number of records returned. Defaults
        to max_poll_records configuration.

Raises:
    OffsetOutOfRangeError: if no subscription offset_reset_strategy
    CorruptRecordError: if message crc validation fails (check_crcs
        must be set to True)
    RecordTooLargeError: if a message is larger than the currently
        configured max_partition_fetch_bytes
    TopicAuthorizationError: if consumer is not authorized to fetch
        messages from the topic

Returns: (records (dict), partial (bool))
    records: {TopicPartition: [messages]}
    partial: True if records returned did not fully drain any pending
        partition requests. This may be useful for choosing when to
        pipeline additional fetch requests.
NrJ   r   )rU   rh   r   rZ   is_fetchablepositionr   r)   r.   r[   defaultdictlistr^   r]   r   r(   _parse_fetched_datanext_fetch_offset_append	Exceptionr-   r   r   )ri   max_recordsupdate_offsetsexc_metar   drainedrecords_remainingfetched_partitionr)   
completiones              r:   fetched_recordsFetcher.fetched_records&  s   , ++&89KQ00<<<H48D1##B""//338K8K8T8TUW8X8_8_ckczcz8z((())$/' 	l#a'3322!%!8!8!@!@!BJ(2(B(B%%/%>%>N373K3KJ3WD0(,(D(D(T(T%%)%A%A%S%SN%g6:6R6R6G6D*F F% $a'( G}d4#:#:;;;  	l4EFWij4kD11		ls   (F- 0BF- -
G7GGc                 T   U(       d  gUR                   nU R                  R                  U5      (       d  [        R	                  SU5        GOU R                  R                  U5      (       d  [        R	                  SU5        GOU R                  R                  U   R                  nUR                  UR                  :X  Ga+  [        R	                  SUR                  U5        UR                  U5      nU(       a  X   R                  U5        U R                  R                  U   R                  nUbB  U R                  (       a1  U R                  R                  R                  XR                  -
  5        U(       d  U(       d^  [        R	                  SXRR                  UR                   5        [#        UR                  SS5      U R                  R                  U   l        [%        U5      $ [        R	                  SXRR                  UR                  5        UR'                  5         g)	Nr   zMNot returning fetched records for partition %s since it is no longer assignedzWNot returning fetched records for assigned partition %s since it is no longer fetchablez@Returning fetched records at offset %d for assigned partition %szIUpdating fetch position for assigned partition %s to %s (leader epoch %s) r   zMIgnoring fetched records for %s at offset %s since the current position is %d)r(   rZ   is_assignedrs   rt   r   r   r   r   r   taker   	highwaterrb   records_fetch_lagrecordr   r   lendrain)	ri   r   partr   r   r   r   part_recordsr   s	            r:   r   Fetcher._appendd  s   !!""..r22 II 89;=$$11"55 II <=?A **55b9BBH%%8		 *+3??B@#yy5  K&&|4
 !//::2>HH	(T]]MM33::9G]G];]^!IIi "8"8$:K:KMBSTXTjTjlnprBsD''2226?<((
 		 89;=S=S"//+ 	

r9   c                    U R                   R                  U5      (       d  [        R                  SU5        g U R                   R	                  U5      (       d  [        R                  SU5        g U(       a=  X R                   R
                  U   R                  :X  d  [        R                  SU5        g [        R                  SX5        U R                   R                  X5        g )Nz=Skipping reset of partition %s since it is no longer assignedz>Skipping reset of partition %s since reset is no longer neededzLSkipping reset of partition %s since an alternative reset has been requestedz/Resetting offset for partition %s to offset %s.)	rZ   r   rs   rt   is_offset_reset_neededr   r   infoseek)ri   r   r   r   s       r:   _reset_offset_if_neededFetcher._reset_offset_if_needed  s     ""..y99IIUW`a$$;;IFFIIVXaby,?,?,J,J9,U,d,ddIIdfopHHF	Z$$Y7r9   c                   ^  T R                  U5      n[        R                  " U5       H  u  p4T R                  R	                  U5      (       d  M'  [        UR                  5       5      n[        R                  " 5       T R                  S   S-  -   nT R                  R                  XV5        U 4S jnU 4S jnT R                  X45      n	U	R                  Xt5        U	R                  X5        M     g )NrN   r   c                 j  > Uu  p#U(       af  TR                   R                  U[        R                  " 5       TR                  S   S-  -   5        TR                  R
                  R                  5         [        R                  " U5       H'  u  pEX   u  pgTR                  XFUR                  5        M)     g )NrO   r   )rZ   reset_failedry   rU   rY   r   r   r   rq   r   r   )	timestamps_and_epochsresultr   partitions_to_retryr   r   r   _epochri   s	           r:   
on_success0Fetcher._reset_offsets_async.<locals>.on_success  s    7=4&''445H$))+X\XcXcdvXwz~X~J~LL((779),)G%I!6!AJB00N *Hr9   c                 V  > TR                   R                  U [        R                  " 5       TR                  S   S-  -   5        TR                  R
                  R                  5         [        USS5      (       d0  TR                  (       d  UTl        g [        R                  SU5        g g )NrO   r   r   FzKDiscarding error in ListOffsetResponse because another error is pending: %s)rZ   r   ry   rU   rY   r   r   getattrrg   rs   error)r   r   ri   s     r:   
on_failure0Fetcher._reset_offsets_async.<locals>.on_failure  s    ##00TYY[4;;WiKjmqKq=qr$$335uk599>>>C;		"oqvw	 :r9   )_group_list_offset_requestsr   rq   rY   readyre   keysry   rU   rZ   set_reset_pending_send_list_offsets_requestrw   rz   )
ri   r   timestamps_by_noder   r   r   	expire_atr   r   r   s
   `         r:   r   Fetcher._reset_offsets_async  s    !==jI.1mm<N.O*G<<%%g..2779:J		dkk2F&G$&NNI11*HOx 44WTF
Bz6; /Pr9   c                   ^^	^
 U R                  U5      nU(       d,  [        5       R                  [        R                  " 5       5      $ [        5       m	[        5       m[        5       m
[        U5      /nUU	U
4S jnU	4S jn[        R                  " U5       H8  u  paU R                  Xa5      nUR                  XC5        UR                  U5        M:     T	$ )a&  Fetch offsets for each partition in timestamps dict. This may send
request to multiple nodes, based on who is Leader for partition.

Arguments:
    timestamps (dict): {TopicPartition: int} mapping of fetching
        timestamps.

Returns:
    Future: resolves to a mapping of retrieved offsets
c                    > U S==   S-  ss'   TR                  US   5        TR                  US   5        U S   (       d&  TR                  (       d  TR                  TT45        g g g Nr   r   )r   r   success)remaining_responsesr!   r   list_offsets_futurer   s     r:   r   7Fetcher._send_list_offsets_requests.<locals>.on_success  se    "a'"""58,&&uQx0&q)2E2M2M#++_>Q,RS 3N)r9   c                 L   > TR                   (       d  TR                  U 5        g g r   )r   failure)errr  s    r:   on_fail4Fetcher._send_list_offsets_requests.<locals>.on_fail  s    &..#++C0 /r9   )r   r   r	  rW   StaleMetadatar   re   r   r   rq   r   rw   rz   )ri   r   r   r  r   r  r   _fr   r  r   s           @@@r:   r   #Fetcher._send_list_offsets_requests  s     "==jI!8##F$8$8$:;; %h&!e"#567	T	1 $'==1C#DG00EBOOJ<NN7# $E #"r9   c                 4   [         R                  " [        5      n[        R                  " U5       H  u  p4U R
                  R                  R                  U5      nUca  U R
                  R                  UR                  5        [        R                  SU5        U R
                  R                  R                  5         M  US:X  a<  [        R                  SU5        U R
                  R                  R                  5         M  SnXF4X%   U'   M     [        U5      $ )Nz+Partition %s is unknown for fetching offsetr   zRLeader for partition %s unavailable for fetching offset, wait for metadata refresh)r[   r   r   r   rq   rY   r   leader_for_partition	add_topicr   rs   rt   r   )ri   r   r   r   r   r   r   s          r:   r   #Fetcher._group_list_offset_requests  s    (44T:$'MM*$= Ill**??	JG&&y7		GS$$335B		 >?HJ$$335!:C9R"+I6 %> &''r9   c           	      v  ^ U R                   R                  [        SS9nU R                  S   S:X  a  US:  a  [        R
                  " S5      e[        R                  " [        5      n[        R                  " U5       H^  u  nu  pgUS:  a  UR                  Xv4nO$US:  a  UR                  U4nOUR                  US4nXER                     R                  U5        M`     US::  a.  [        U   " S[        [        R                  " U5      5      5      n	O8[        U   " SU R                  [        [        R                  " U5      5      5      n	[        5       m[         R#                  S	X5        U R                   R%                  X5      n
U
R'                  U R(                  T5        U
R+                  U4S
 j5        T$ )N   max_versionrQ   r      z@read_committed isolation level requires ListOffsetsRequest >= v2r   r   z)Sending ListOffsetRequest %s to broker %sc                 &   > TR                  U 5      $ r   )r	  )r   r   s    r:   <lambda>4Fetcher._send_list_offsets_request.<locals>.<lambda>  s    !2r9   )rY   api_versionr   rU   rW   UnsupportedVersionErrorr[   r   r   r   rq   r   r   r~   rc   r   rs   rt   rv   rw   _handle_list_offsets_responserz   )ri   r   r   versionby_topicr   r   r   datar   r  r   s              @r:   r   "Fetcher._send_list_offsets_request  sp   ,,**+=1*M;;()-=='A+001stt**40-0]];P-Q)B)!|l>Ai0i3XX%%d+ .R a<(1x013G )1))x013G 		=wP\\w0
::FC
23r9   c           	         [        5       n[        5       n[        5       nUR                   GH  u  pgU GH   nUSS u  p[        Xi5      n	[        R
                  " U
5      nU[        R                  L a  UR                  S:X  a3  US   n[        U5      S::  d   S5       eU(       d  [        nOUS   nSnSnO"UR                  S::  a
  USS u  pSnOUSS u  pn[        R                  SXX5        U[        :w  a  [        XU5      X9'   M  M  U[        R                  L a  [        R                  S	U	5        GM  U[        R                  [        R                  [        R                   4;   a5  [        R                  S
UR"                  U	5        UR%                  U	5        GMk  U[        R&                  L a*  [        R)                  SU	5        UR%                  U	5        GM  U[        R*                  L a  UR%                  U5        GM  [        R)                  SXR"                  5        UR%                  U	5        GM     GM     U(       a&  UR-                  [        R*                  " U5      5        gUR/                  X445        g)zCallback for the response of the ListOffsets api call

Arguments:
    future (Future): the future to update based on response
    response (ListOffsetsResponse): response from the server

Raises:
    AssertionError: if response does not match partition
Nr  r   r   z,Expected ListOffsetsResponse with one offsetr      z^Handling ListOffsetsResponse response for %s. Fetched offset %s, timestamp %s, leader_epoch %sz_Cannot search by timestamp for partition %s because the message format version is before 0.10.0zEAttempt to fetch offsets for partition %s failed due to %s, retrying.zReceived unknown topic or partition error in ListOffsets request for partition %s. The topic/partition may not exist or the user may not have Describe access to it.z;Attempt to fetch offsets for partition %s failed due to: %s)r   re   topicsr   rW   for_codeNoErrorAPI_VERSIONr   r   rs   rt   r    UnsupportedForMessageFormatErrorNotLeaderForPartitionErrorReplicaNotAvailableErrorKafkaStorageErrorr4   ru   UnknownTopicOrPartitionErrorwarningTopicAuthorizationFailedErrorr	  r  )ri   r   responser   r   unauthorized_topicsr   	part_datapartition_infor   
error_code
error_typer   r   r   r   s                   r:   r  %Fetcher._handle_list_offsets_response!  s?    &!e!e (E"+(6r(:%	*5<	#__Z8
/++q0"0"3"7|q0`2``0&%3F%,QZF$(	')!--2,:12,>)	'):H:L7	<II Q'J /5G[g5h2 06#J#JJ II IJSUF$E$E$*$C$C$*$<$<$> > II 23=3F3F	S'++I66#F#FFKK !) +45 (++I66#G#GG'++E2KK !&'02E2EG'++I6_ #, !0b NN6??@STUNNOABr9   c                 L   U R                   R                  5       n[        R                  " U R                  5       Vs1 s H  o"R                  iM     nnU R
                  nU(       a  UR                  UR                  5        U Vs/ s H  oUU;  d  M
  UPM     sn$ s  snf s  snf r   )rZ   fetchable_partitionsrS   r]   r(   r^   ru   )ri   	fetchablefetchdiscardcurrentr   s         r:   _fetchable_partitionsFetcher._fetchable_partitionsd  s    ''<<>	 7;ii@W@W6XY6XU((6XY..KK//0&<YrG*;Y<<	 Z =s   B	B!B!c                 l	   U R                   R                  [        SS9n[        R                  " [        R
                  5      nU R                  5        GH4  nU R                   R                  R                  U5      nU R                  R                  U   R                  nUb  US:X  a<  [        R                  SU5        U R                   R                  R                  5         M  U R                   R                  U5      (       d7  U R                   R!                  U5      S:  a  [        R                  SX45        M  U R                   R#                  U5      S:  a  [        R                  SX45        GM   U R                   R%                  U5      (       d  [        R                  S	5        GMX  X@R&                  ;   a  [        R                  S
X45        GM  US:  a'  UR(                  UR*                  U R,                  S   4nO`US::  a(  UR(                  UR*                  SU R,                  S   4nO2UR(                  UR.                  UR*                  SU R,                  S   4nXbU   U'   [        R                  SX5R*                  5        GM7     0 n[0        R2                  " U5       GH  u  pHUS:  aZ  U R,                  S   (       aF  X@R4                  ;  a  [7        U5      U R4                  U'   U R4                  U   R9                  U5      n	O[;        US[<        R>                  5      n	US::  a8  [        U   " SU R,                  S   U R,                  S   U	R@                  5      n
GOUS:X  aE  [        U   " SU R,                  S   U R,                  S   U R,                  S   U	R@                  5      n
OUS::  aP  [        U   " SU R,                  S   U R,                  S   U R,                  S   U RB                  U	R@                  5      n
Op[        U   " SU R,                  S   U R,                  S   U R,                  S   U RB                  U	RD                  U	RF                  U	R@                  U	RH                  5	      n
0 n[0        R2                  " U5       H  u  pUS::  a  US   nOUS   nXU'   M     X4Xt'   GM     U$ )a  Create fetch requests for all assigned partitions, grouped by node.

FetchRequests skipped if no leader, or node has requests in flight

Returns:
    dict: {node_id: (FetchRequest, {TopicPartition: fetch_offset}), ...} (version depends on client api_versions)

   r  Nr   z<No leader found for partition %s. Requesting metadata updater   zMSkipping fetch for partition %s because node %s is awaiting reconnect backoffz<Skipping fetch for partition %s because node %s is throttledzRSkipping fetch for partition %s because connection to leader node is not ready yetzSSkipping fetch for partition %s because there is a pending fetch request to node %s   rI      z2Adding fetch request for partition %s at offset %d   rP   r  rG   rF   r$  rH      r   )%rY   r  r   r[   r   OrderedDictr=  r   r  rZ   r   r   rs   rt   r   	connectedconnection_delaythrottle_delayr   rf   r   r   rU   r   r   rq   rd   FetchSessionHandler
build_nextFetchRequestDataFetchMetadataLEGACYto_sendrc   idepoch	to_forget)ri   r  r9  r   r   r   r3  requestsnext_partitionssessionr   r   r   r+   r   s                  r:   rr   Fetcher._create_fetch_requestso  s-    ,,**<R*H++K,C,CD	335Ill**??	JG**55i@IIH 'R-		 89BD$$335\\++G449V9VW^9_bc9c 		i!, ,,W59 		X!, \\''00 		noCCC		o!,
 Q;!++ $?@&N
 \!++ $?@	&N "++ -- $?@&N 1?'"9-		N#__6s 6x (+i(@$G!|,O P"8"886I'6RD**73009DD_U +?D-BVBVW!|&w/KK 34KK 12OO	%
 A&w/KK 34KK 12KK 12OO% A&w/KK 34KK 12KK 12))OO% 'w/KK 34KK 12KK 12))JJMMOO%%	' M&)mmO&D"a<+A.F+A.F$*b! 'E ") 8Hg )Aj r9   c                 2   UR                   S:  a^  U R                  S   (       aJ  XR                  ;  a  [        R	                  SU5        gU R                  U   R                  U5      (       d  g[        UR                   VVVs/ s H  u  pVU  H  n[        XWS   5      PM     M     snnn5      nU R                  (       a  [        U R                  U5      nOSnUR                   HV  u  pVU HK  n[        XWS   5      n	X)   n
[        XUR                   USS U5      nU R                  R                  U5        MM     MX     U R                  (       a?  U R                  R                  R                  [         R                   " 5       U-
  S-  5        ggs  snnnf )z!The callback for fetch completionrC  rP   zIUnable to find fetch session handler for node %s. Ignoring fetch responseNr   r   r   )r(  rU   rd   rs   r   handle_responsere   r%  r   rb   FetchResponseMetricAggregatorr'   r]   r~   fetch_latencyr   ry   )ri   r   r   	send_timer0  r   r   r+   r,   r   fetch_offsetcompleted_fetchs               r:   rx   Fetcher._handle_fetch_response  sd   1$5X)Y444		egno))'2BB8LL3;??<3B/e0:n )q0AB0: C3B< =
 == =dmmZ X $!)E",#E!+<=,0"0(("12&%	# ''..? #- "1 ==MM''..		i0G4/OP )<s   $Fc                    [        U[        R                  5      (       a  [        R                  O[        R
                  n[        R                  USX5        XR                  ;   a  U R                  U   R                  U5        g g )NzFetch to node %s failed: %s)	
isinstancerW   	CancelledloggingINFOERRORrs   rd   handle_error)ri   r   r.   levels       r:   r{   Fetcher._handle_fetch_error  s^     *9f6F6F G GW]]4gI,,,""7+88C -r9   c                 \     U R                   R                  U5        g ! [         a     g f = fr   )rf   removeKeyError)ri   r   _s      r:   r}   $Fetcher._clear_pending_fetch_request  s-    	33::7C 		s    
++c                 ^   UR                   nUR                  nUR                  S S u  pE[        R                  " U5      nS n U R
                  R                  U5      (       d  [        R                  SU5        GOnU[        R                  L Ga5  U R
                  R                  U   R                  nUb  UR                  U:w  a  [        R                  SX#UR                  5         Uc.  UR                  (       a  UR                  R                  USS5        U[        R                  La  U R
                  R                  U5        g g [!        UR                  S   5      n	S n
UR"                  S:  a  UR                  S   n
OUR"                  S:  a  UR                  S	   n
[        R                  S
U	R%                  5       X#5        U R'                  X2U	U R(                  S   U R(                  S   U R(                  S   U R*                  U
UR                  U R,                  S9
nU	R/                  5       (       dh  U	R%                  5       S:  aT  UR"                  S:  a%  X#0n[1        SU< SU R(                  S   < S3U5      e[        R2                  " SU< SU< S35      eUS:  a  XPR
                  R                  U   l        GO%U[        R6                  [        R8                  [        R:                  [        R<                  4;   aF  [        R                  SX&R>                  5        U R@                  RB                  RE                  5         GOU[        RF                  L a  U R
                  R                  U   R                  nUb  UR                  U:w  a#  [        R                  SX#UR                  5        GO1U R
                  RI                  5       (       a2  [        RK                  SX25        U R
                  RM                  U5        O[        RF                  " X#05      eU[        RN                  L aJ  [        RQ                  SURR                  5        [        RN                  " [U        URR                  /5      5      e[W        USS5      (       aR  [        R                  SX&" 5       5        [W        USS5      (       a$  U R@                  RB                  RE                  5         OU" S5      eUc.  UR                  (       a  UR                  R                  USS5        U[        R                  La  U R
                  R                  U5        U$ ! Uc.  UR                  (       a  UR                  R                  USS5        U[        R                  La  U R
                  R                  U5        f f = f) Nr  zIIgnoring fetched records for partition %s since it is no longer fetchablezdDiscarding fetch response for partition %s since its offset %d does not match the expected offset %dr   r      r  zBPreparing to read %s bytes of data for partition %s with offset %drD   rE   rK   )rD   rE   rK   rQ   aborted_transactionsr,   on_drainr$  z/There are some messages at [Partition=Offset]: z+  whose size is larger than the fetch size rI   z and hence cannot be ever returned. Please condier upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternatively, increase the fetch size on the client (using max_partition_fetch_bytes)z,Failed to make progress reading messages at =zZ. Received a non-empty fetch response from the server, but no complete records were found.zError fetching partition %s: %szqDiscarding stale fetch response for partition %s since the fetched offset %d does not match the current offset %dz6Fetch offset %s is out of range for topic-partition %sz%Not authorized to read from topic %s.r   Fz)Retriable error fetching partition %s: %sr   z$Unexpected error while fetching data),r(   r)   r+   rW   r&  rZ   r   rs   rt   r'  r   r   r   r,   r   move_partition_to_endr   r*   size_in_bytesPartitionRecordsrU   rc   _on_partition_records_drainhas_nextr=   
KafkaErrorr   r*  r+  r-  r,  r4   rY   r   r   OffsetOutOfRangeErrorhas_default_offset_reset_policyr   request_offset_resetr/  r.  r   re   r   )ri   r\  r   r[  r4  r   r5  parsed_recordsr   recordsrp  record_too_large_partitionss               r:   r   Fetcher._parse_fetched_data  s   ,,&55 / > >r B
__Z0
]	>&&33B77 		 =>@B v~~-
  ..99"=FF#x,'FII 457&oo/  J %/*K*K1188QB/ ##99"= 0M ((F(Fr(JK'+$"33r9+:+I+I"+M($55:+:+I+I"+M(		^!//12E!%!6!6|HLTfHgJN++VjJkBF++lB[GKG\G\L`IXIjIj@D@`@` "7 "b ''))g.C.C.E.I&77!;796H31 !< $,G HJ 89 9 %//SUWc1e f f >CL''2226@ A A & ? ? & C C & 8 8 : : 		;RATATU$$335v;;;..99"=FF#x,'FII 346hooW ((HHJJHHUWch''<<R@ 667IJJvCCCCRXXN::3z?KK[%88		Er:<X:'95AALL((779 !GHH %/*K*K1188QB/ ##99"= %/*K*K1188QB/ ##99"= 0s   B#U
 N!U
 
A"V,c                 p    UR                   S:  a&  U R                  R                  UR                  5        g g r   )
bytes_readrZ   rs  r(   )ri   partition_recordss     r:   rv  #Fetcher._on_partition_records_drain  s2     ''!+556G6W6WX ,r9   c                 `    U R                   b  U R                   R                  5         S U l        g r   )r^   r   rh   r   s    r:   closeFetcher.close  s(    ''3((..004-r9   c                   r    \ rS rSrSSS\SSS 4S jrS rS r\rS r	S	 r
SS
 jrS rS rS rS rS rSrg)Fetcher.PartitionRecordsi  NTc                     g r   r2   )xs    r:   r  !Fetcher.PartitionRecords.<lambda>  s    r9   c           
         Xl         X l        SU l        Xl        SU l        SU l        Xpl        [        5       U l        [        R                  " [        U(       a  U Vs/ s H  n[        U6 PM     snO/ S S95      U l        Xl        X`l        [         R"                  " U R$                  U R'                  X#XE5      5      U l        Xl        S U l        g s  snf )Nr   r   c                     U R                   $ r   )first_offset)txns    r:   r  3Fetcher.PartitionRecords.__init__.<locals>.<lambda>  s
    s'7'7r9   )r    )r[  r(   r   r   r  records_readrQ   re   aborted_producer_idsr[   r\   sortedr   rp  r,   rK   	itertools	dropwhile_maybe_skip_record_unpack_recordsrecord_iteratorrq  _next_inline_exception)ri   r[  r   r}  rD   rE   rK   rQ   rp  r,   rq  r!  s               r:   rm   !Fetcher.PartitionRecords.__init__  s    
 !-#%  "D%1"DO !D#2 (+D%(3(9(9Wk>RS>Rd*D1>RSqs79)D% &7"(O#,#6#6''$$R2BW$YD  %M*.D' Ts   Cc                     UR                   U R                  :  a,  [        R                  SUR                   U R                  5        gg)Nz*Skipping message offset: %s (expecting %s)TF)r   r[  rs   rt   )ri   r   s     r:   r  +Fetcher.PartitionRecords._maybe_skip_record  s:    
 }}t000		F --):):<r9   c                     U R                   S L$ r   )r  r   s    r:   __bool__!Fetcher.PartitionRecords.__bool__  s    ''t33r9   c                     U R                   bl  S U l         S U l        U R                  (       a;  U R                  R                  U R                  U R
                  U R                  5        U R                  U 5        g g r   )r  r  r,   r   r(   r  r  rq  r   s    r:   r   Fetcher.PartitionRecords.drain  sa    ##/'+$.2+))**11$2F2FY]YjYjkd# 0r9   c                 P    U R                   (       a  U R                   S sol         Ueg r   )r  )ri   r   s     r:   "_maybe_raise_next_inline_exception;Fetcher.PartitionRecords._maybe_raise_next_inline_exception  s)    **373N3NPT00	 +r9   c                     U R                  5         / n UR                  [        R                  " U R                  SU5      5        U$ ! [
         a  nU(       d  UeX0l         S nAU$ S nAff = fr   )r  r   r  islicer  r   r  )ri   nr}  r   s       r:   r   Fetcher.PartitionRecords.take  sh    335G0y//0D0DaKL N  0G./++N0s   1A 
A+A&&A+c              #   $  #     UR                  5       nS nUGb~  UnU R                  (       aH  UR                  5       (       d3  [        R                  " SU R
                  < SUR                  < S35      eUR                  S:X  Ga9  UR                  U l        U R                  [        :X  a  UR                  5       (       a  U R                  UR                  5        UR                  nU R                  U5      (       a   U R                   R#                  U5        OpU R'                  U5      (       aZ  [(        R+                  SU R
                  XuR                  UR                  5        UR,                  U l        UR                  5       nGMt  UR0                  (       a$  UR,                  U l        UR                  5       nGM  U GH  nU R                  (       aH  UR                  5       (       d3  [        R                  " SU R
                  < SUR2                  < S35      eUR4                  b  [7        UR4                  5      OSn	UR8                  b  [7        UR8                  5      OSn
U R;                  X1R<                  UR4                  5      nU R;                  XAR<                  UR8                  5      nUR>                  nU(       a  [A        S U 5       5      OSnU =RB                  S	-  sl!        U =RD                  URF                  -  sl"        UR2                  S	-   U l        [I        UR<                  URJ                  U R                  UR2                  URL                  URN                  XXRP                  XU5      v   GM     UR                  5       nUb  GM~  U(       a!  UR                  S:X  a  UR,                  U l        U RS                  5         g ! [$         a     GNTf = f! [T         a!    [(        RW                  S
5        [Y        S
5      ef = f7f)NzRecord batch for partition z at offset z failed crc checkr  zXSkipping aborted record batch from partition %s with producer_id %s and offsets %s to %szRecord for partition r   c              3   |   #    U  H2  u  p[        UR                  S 5      5      Ub  [        U5      OS-   v   M4     g7f)zutf-8Nr   )r   encode).0h_keyh_vals      r:   	<genexpr>;Fetcher.PartitionRecords._unpack_records.<locals>.<genexpr>  s;      *%# gsfkCW 56HY#e*_`a#s   :<r   z)StopIteration raised unpacking messageset)-
next_batchrK   validate_crcrW   CorruptRecordErrorr(   base_offsetmagicr   rQ   READ_COMMITTEDhas_producer_id#_consume_aborted_transactions_up_tolast_offsetproducer_id_contains_abort_markerr  rh  ri  _is_batch_abortedrs   rt   next_offsetr   is_control_batchr   r    r   r!   _deserializer   r"   sumr  r  rt  r   r   r   r   r#   r   StopIterationr.   RuntimeError)ri   r   r}  rD   rE   batch
last_batchr  r   key_size
value_sizer    r!   r"   header_sizes                  r:   r  (Fetcher.PartitionRecords._unpack_records  s^    PP**,!
'!&Ju/A/A/C/C$77$($8$8%:K:K!MN N {{a',1,>,>)//>AeF[F[F]F] !DDUEVEVW*/*;*;K#::5AA!)$($=$=$D$D[$Q "&!7!7!>!> #		 +>*.*>*>M^M^`e`q`q!s :?9J9J 6(/(:(:(< ( !115:5F5FD2$+$6$6$8E$"'??63F3F3H3H"(";";(,(<(<fmm%M#N N 7=jj6L3vzz?RT:@,,:RS%6XZ
"//0@((FJJW $ 1 12DhhPVP\P\ ]"(.. )0 '* *%#*% '%57 $ ))Q.)6+?+??171B.,HHbllD4E4Ev}}V\VfVf"113w$+? ?! #(* $..0E{ 'H "j&6&6!&;1;1G1G.JJL_ (0 !)$(!)h ! PIJ"#NOOPsH   PC9O" >O I<O" 8O" P
OO" OO" "+PPc                 p    U(       d  U$ [        U[        5      (       a  UR                  X#5      $ U" U5      $ r   )r_  r   deserialize)ri   fr   bytes_s       r:   r  %Fetcher.PartitionRecords._deserialize)  s0    !\**}}U33V9r9   c                 d   U R                   (       d  g U R                   (       a  U R                   S   R                  U::  ao  U R                  R                  U R                   R	                  5       R
                  5        U R                   (       a   U R                   S   R                  U::  a  Mm  g g g g r   )rp  r  r  ru   r   r  )ri   r   s     r:   r  <Fetcher.PartitionRecords._consume_aborted_transactions_up_to0  s    ,,++0I0I!0L0Y0Y]c0c))--d.G.G.O.O.Q.].]^ ++0I0I!0L0Y0Y]c0c+0c+r9   c                 Z    UR                   =(       a    UR                  U R                  ;   $ r   )is_transactionalr  r  )ri   r  s     r:   r  *Fetcher.PartitionRecords._is_batch_aborted7  s$    ))\e.?.?4C\C\.\\r9   c                 d    UR                   (       d  g[        U5      nU(       d  gUR                  $ )NF)r  nextabort)ri   r  r   s      r:   r  /Fetcher.PartitionRecords._contains_abort_marker:  s'    ))%[F<<r9   )r  r  rp  r  rK   r[  rQ   r   r,   r   rq  r  r  r(   r   )r4   r5   r6   r7   READ_UNCOMMITTEDrm   r  r  __nonzero__r   r  r   r  r  r  r  r  r8   r2   r9   r:   ru  r    sX    &*t $6F*.'+n		/2
		4 	$	
	Q	Pf		_	]	 r9   ru  )rg   rY   r]   r`   rc   r_   rh   r^   rf   rb   rd   rZ   rU   r   )NT)$r4   r5   r6   r7   sysmaxsizerT   rm   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r=  rr   rx   r{   r}   r   rv  r  objectru  r8   r2   r9   r:   r@   r@   :   s     " #%,KK)#-1-N"@5D(*)
@6+LZB@<<|2h8 7D$#L("!FACF	=BQ@DfPY5
q 6 q r9   r@   c                   6    \ rS rSrSrS rS rS rS rS r	Sr
g	)
rI  iC  a9  
FetchSessionHandler maintains the fetch session state for connecting to a broker.

Using the protocol outlined by KIP-227, clients can create incremental fetch sessions.
These sessions allow the client to fetch information about a set of partition over
and over, without explicitly enumerating all the partitions in the request and the
response.

FetchSessionHandler tracks the partitions which are in the session.  It also
determines which partitions need to be included in each fetch request, and what
the attached fetch session metadata should be for each request.
c                 H    Xl         [        R                  U l        0 U l        g r   )r   rL  INITIALnext_metadatasession_partitions)ri   r   s     r:   rm   FetchSessionHandler.__init__Q  s    *22"$r9   c           
         U R                   R                  (       aR  [        R                  SU R                   U R                  [        U5      5        Xl        [        USU R                   5      $ [        U R                  R                  5       5      n[        UR                  5       5      n[        R                  SX25        X2-
  nU H  nX   U R                  U'   M     X#-
  nU H  nU R                  R                  U5        M      [        5       nX2-   H<  nX   U R                  U   :w  d  M  X   U R                  U'   UR                  U5        M>     [        R                  SU R                   U R                  XGX`R                  R                  5       5        [        R                  " U Vs0 s H  oUXG-  ;   d  M  XQU   _M     sn5      n[        XU R                   5      $ s  snf )zm
Arguments:
    next_partitions (dict): TopicPartition -> TopicPartitionState

Returns:
    FetchRequestData
z5Built full fetch %s for node %s with %s partition(s).Nz;Building incremental partitions from next: %s, previous: %szRBuilt incremental fetch %s for node %s. Added %s, altered %s, removed %s out of %s)r  is_fullrs   rt   r   r   r  rK  re   r   popru   r[   rE  )	ri   rS  prev_tpsnext_tpsaddedr   removedalteredrN  s	            r:   rJ  FetchSessionHandler.build_nextV  s    %%IIM""DLL#o2FH&5##OT4;M;MNNt..3356++-.		OQYd#B*9*=D##B' %B##''+ %%B"d&=&=b&AA.=.A''+B &
 			f$$dllEGMdMdMiMiMk	m))_*x_rglgv`w+B2r/B+B_*xy$2D2DEE +ys   -G"<	G"c           
         UR                   [        R                  R                  :w  a  [        R                  " UR                   5      n[
        R                  SU R                  U R                  U" 5       5        U[        R                  L a  [        R                  U l        gU R                  R                  5       U l        gU R                  U5      n[        U R                  R!                  5       5      nU R                  R"                  (       GaB  X4:w  a<  [
        R                  SU R                  X4-
  XC-
  5        [        R                  U l        gUR$                  [        R&                  :X  a@  [
        R)                  SU R                  [+        U5      5        [        R                  U l        gUR$                  [        R,                  :X  a+  [
        R)                  SU R                  [+        U5      5        g[
        R)                  SU R                  UR$                  [+        U5      5        [        R/                  UR$                  5      U l        gX4-
  (       aC  [
        R                  SU R                  X4-
  5        U R                  R                  5       U l        gUR$                  [        R&                  :X  au  [
        R)                  S	U R                  U R                  R$                  [+        U5      [+        U R                  5      [+        U5      -
  5        [        R                  U l        gUR$                  [        R,                  :X  a+  [
        R)                  S
U R                  [+        U5      5        g[
        R)                  SU R                  UR$                  [+        U5      [+        U R                  5      [+        U5      -
  5        U R                  R1                  5       U l        g)Nz<Node %s was unable to process the fetch request with %s: %s.FzFNode %s sent an invalid full fetch response with extra %s / omitted %sz5Node %s sent a full fetch response with %s partitionsTzQNode %s sent a empty full fetch response due to a quota violation (%s partitions)znNode %s sent a full fetch response that created a new incremental fetch session %s with %s response partitionszKNode %s sent an invalid incremental fetch response with extra partitions %szfNode %s sent an incremental fetch response closing session %s with %s response partitions (%s implied)zXNode %s sent a empty incremental fetch response due to a quota violation (%s partitions)zbNode %s sent an incremental fetch response for session %s with %s response partitions (%s implied))r4  rW   r'  errnor&  rs   r   r   r  FetchSessionIdNotFoundErrorrL  r  next_close_existing_response_partitionsre   r  r   r  
session_idINVALID_SESSION_IDrt   r   THROTTLED_SESSION_IDnew_incrementalnext_incremental)ri   r0  r5  response_tpssession_tpss        r:   rW  #FetchSessionHandler.handle_responsex  s   &.."6"66)<)<=JHHSd00*,@V???%2%:%:"  &*%7%7%K%K%M"00:$116689%%%*a|'A;C]_%2%:%:"$$(H(HH		Q,,L(9;%2%:%:"$$(J(JJ		m,,L(9;  		 9,,(;(;l+- &3%B%B8CVCV%W")f|'AC%)%7%7%K%K%M"$$(H(HH		 F,,(:(:(E(El+S1H1H-ICP\L]-]_ &3%:%:"$$(J(JJ		t,,L(9;  		 F,,(;(;l+S1H1H-ICP\L]-]_ &*%7%7%H%H%J"r9   c                 B    U R                   R                  5       U l         g r   )r  r  )ri   
_exceptions     r:   rd   FetchSessionHandler.handle_error  s    !//CCEr9   c                     UR                    VVVs1 s H  u  p#U  H  n[        X$S   5      iM     M     snnn$ s  snnnf r   )r%  r   )ri   r0  r   r   r+   s        r:   r  (FetchSessionHandler._response_partitions  sE    )12)8%E&0N uQ&78&0 9)82 	2 2s   $:)r  r   r  N)r4   r5   r6   r7   __doc__rm   rJ  rW  rd  r  r8   r2   r9   r:   rI  rI  C  s$    %
 FD?BF2r9   rI  c                   n    \ rS rSrSrSrSrSrSrSr	S r
\S 5       r\S 5       rS	 r\S
 5       rS rSrg)rL  i  r  rP  ir   r   c                     Xl         X l        g r   r  )ri   r  rP  s      r:   rm   FetchMetadata.__init__  s    $
r9   c                 t    U R                   U R                  :H  =(       d    U R                   U R                  :H  $ r   )rP  INITIAL_EPOCHFINAL_EPOCHr   s    r:   r  FetchMetadata.is_full  s+    zzT///Q4::AQAQ3QQr9   c                 P    US:  a  U R                   $ XR                  :X  a  gUS-   $ r  )r  	MAX_EPOCH)cls
prev_epochs     r:   
next_epochFetchMetadata.next_epoch  s*    >??"==(>!r9   c                 N    U R                  U R                  U R                  5      $ r   )	__class__r  r  r   s    r:   r  !FetchMetadata.next_close_existing  s    ~~doot/A/ABBr9   c                 D    U " XR                  U R                  5      5      $ r   )r
  r  )r  r  s     r:   r  FetchMetadata.new_incremental  s    :~~c.?.?@AAr9   c                 l    U R                  U R                  U R                  U R                  5      5      $ r   )r  r  r
  rP  r   s    r:   r  FetchMetadata.next_incremental  s$    ~~doottzz/JKKr9   )rP  r  N)r4   r5   r6   r7   	__slots__r  r  r  r  r  rm   propertyr  classmethodr
  r  r  r  r8   r2   r9   r:   rL  rL    ss    'IIMK R R " "C B BLr9   rL  c                   n    \ rS rSrSrS r\S 5       r\S 5       r\S 5       r	\S 5       r
\S 5       rS	rg
)rK  i  )_to_send
_to_forget	_metadatac                 p    U=(       d
    [        5       U l        U=(       d
    [        5       U l        X0l        g r   )r   r  re   r  r  )ri   rN  rQ  metadatas       r:   rm   FetchRequestData.__init__  s#    )46#,su!r9   c                     U R                   $ r   )r  r   s    r:   r  FetchRequestData.metadata  s    ~~r9   c                 .    U R                   R                  $ r   )r  r  r   s    r:   rO  FetchRequestData.id  s    ~~(((r9   c                 .    U R                   R                  $ r   )r  rP  r   s    r:   rP  FetchRequestData.epoch  s    ~~###r9   c                     [         R                  " [        5      n[        R                  " U R
                  5       H"  u  p#XR                     R                  U5        M$     [        UR                  5       5      $ r   )	r[   r   r   r   rq   r  r   r~   items)ri   r+   r   r3  s       r:   rN  FetchRequestData.to_send  sW     %006"%--">B88$++N; #?N((*++r9   c                     [         R                  " [        5      nU R                   H*  nXR                     R                  UR                  5        M,     [        UR                  5       5      $ r   )r[   r   r   r  r   r~   r   r$  )ri   r+   r   s      r:   rQ  FetchRequestData.to_forget  sO     %006//B88$++BLL9 "N((*++r9   )r  r  r  N)r4   r5   r6   r7   r  rm   r  r  rO  rP  rN  rQ  r8   r2   r9   r:   rK  rK    sp    7I"
   ) ) $ $ , , , ,r9   rK  c                       \ rS rSrSrS rSrg)FetchMetricsi  total_bytestotal_recordsc                      SU l         SU l        g r   r*  r   s    r:   rm   FetchMetrics.__init__  s    r9   N)r4   r5   r6   r7   r  rm   r8   r2   r9   r:   r)  r)    s    0Ir9   r)  c                   $    \ rS rSrSrS rS rSrg)rX  i  z
Since we parse the message data for each partition from each fetch
response lazily, fetch-level metrics need to be aggregated as the messages
from each partition are parsed. This class is used to facilitate this
incremental aggregation.
c                 x    Xl         X l        [        5       U l        [        R
                  " [        5      U l        g r   )sensorsunrecorded_partitionsr)  fetch_metricsr[   r   topic_fetch_metrics)ri   r1  r   s      r:   rm   &FetchResponseMetricAggregator.__init__   s*    %/")^#.#:#:<#H r9   c                     U R                   R                  U5        U R                  =R                  U-  sl        U R                  =R                  U-  sl        U R
                  UR                     =R                  U-  sl        U R
                  UR                     =R                  U-  sl        U R                   (       d  U R                  R                  R                  U R                  R                  5        U R                  R                  R                  U R                  R                  5        [        R                  " U R
                  5       H5  u  pEU R                  R                  XER                  UR                  5        M7     gg)z
After each partition is parsed, we update the current metric totals
with the total bytes and number of records parsed. After all partitions
have reported, we write the metric.
N)r2  rh  r3  r+  r,  r4  r   r1  bytes_fetchedr   records_fetchedr   rq   record_topic_fetch_metrics)ri   r   	num_bytesnum_recordsr   rL   s         r:   r   $FetchResponseMetricAggregator.record&  s    	"")))4&&)3&((K7(  1==J=  1??;N? ))LL&&--d.@.@.L.LMLL((//0B0B0P0PQ"%--0H0H"I77?R?RT[TiTij #J *r9   )r3  r1  r4  r2  N)r4   r5   r6   r7   r  rm   r   r8   r2   r9   r:   rX  rX    s    Ikr9   rX  c                        \ rS rSrS rS rSrg)ra   i:  c                 f   Xl         U< S3U l        UR                  S5      U l        U R                  R	                  UR                  SU R                  S5      [        5       5        U R                  R	                  UR                  SU R                  S5      [        5       5        U R                  R	                  UR                  SU R                  S5      [        5       5        U R                   R                  S	5      U l	        U R                  R	                  UR                  S
U R                  S5      [        5       5        U R                  R	                  UR                  SU R                  S5      [        5       5        UR                  S5      U l
        U R                  R	                  UR                  SU R                  S5      [        5       5        U R                  R	                  UR                  SU R                  S5      [        5       5        U R                  R	                  UR                  SU R                  S5      [        [        5       S95        UR                  S5      U l        U R                  R	                  UR                  SU R                  S5      [        5       5        g )Nz-fetch-manager-metricsbytes-fetchedfetch-size-avgz/The average number of bytes fetched per requestfetch-size-maxz/The maximum number of bytes fetched per requestbytes-consumed-ratez/The average number of bytes consumed per secondrecords-fetchedrecords-per-request-avgz-The average number of records in each requestrecords-consumed-ratez1The average number of records consumed per secondzfetch-latencyzfetch-latency-avgz+The average time taken for a fetch request.zfetch-latency-maxz)The max time taken for any fetch request.z
fetch-ratez(The number of fetch requests per second.)sampled_statzrecords-lagzrecords-lag-maxzNThe maximum lag in terms of number of records for any partition in self window)rL   
group_namesensorr7  ru   metric_namer   r	   r
   r8  rY  r   r   )ri   rL   prefixs      r:   rm   FetchManagerMetrics.__init__;  s7   8>@$^^O<w223CT__= ?@C	Gw223CT__= ?@C	Gw223H$//= ?@D	H  $||223DE  !4!45NPTP_P_;"=>Ae	E  !4!45Ldoo?"ABF&	J %^^O<w223F9 ;<?E	Cw223F7 9:=%	Aw22<6 89=579S	U ")!>""7#6#67H$//\$^_b_d	fr9   c                    SR                  SUS/5      nU R                  R                  U5      nU(       d  SUR                  SS5      0nU R                  R	                  U5      nUR                  U R                  R                  SU R                  SU< 3U5      [        5       5        UR                  U R                  R                  SU R                  SU< 3U5      [        5       5        UR                  U R                  R                  S	U R                  S
U< 3U5      [        5       5        UR                  U5        SR                  SUS/5      nU R                  R                  U5      nU(       d  SUR                  SS5      0nU R                  R	                  U5      nUR                  U R                  R                  SU R                  SU< 3U5      [        5       5        UR                  U R                  R                  SU R                  SU< 3U5      [        5       5        UR                  U5        g )N.r   r?  rj  r@  z:The average number of bytes fetched per request for topic rA  z:The maximum number of bytes fetched per request for topic rB  z:The average number of bytes consumed per second for topic rC  rD  z8The average number of records in each request for topic rE  z<The average number of records consumed per second for topic )joinrL   
get_sensorreplacerH  ru   rI  rG  r   r	   r
   r   )ri   r   r:  r;  namer7  metric_tagsr8  s           r:   r9  .FetchManagerMetrics.record_topic_fetch_metricsY  s   xx%9://5"EMM#s$;<K LL//5Mdll667GOOV[]! #&%) dll667GOOV[]! #&%) dll667LOOV[]! #'&* 	Y' xx%):;<,,11$7"EMM#s$;<K"ll11$7O 8 89ROOTY[!! #&%)  8 89POOX]_!! #'&* 	{+r9   )r7  rY  rG  rL   r   r8  N)r4   r5   r6   r7   rm   r9  r8   r2   r9   r:   ra   ra   :  s    f<%,r9   ra   )A
__future__r   r   r[   rS   r  ra  r  ry   kafka.vendorr   kafka.errorserrorsrW   kafka.futurer   kafka.metrics.statsr   r   r	   r
   kafka.protocol.fetchr   r   kafka.protocol.list_offsetsr   r   r   kafka.recordr   kafka.serializerr   kafka.structsr   r   r   
kafka.utilr   	getLoggerr4   rs   r  r  rV   
namedtupler   r'   r-   rx  r0   r=   Iteratorr@   r  rI  rL  r  r  r  r  rM  rK  r)  rX  ra   r2   r9   r:   <module>rc     s   0     
     5 5 A  ' ) O O !   )$ 
 ''(8vw
 ''(8,-
  **+>02 	 1 1 		&++ 	F cll F R |2& |2~"LF "LH &m&F&FHcHcd $]%E%E}G`G`a $,v $,N6 kF kBD,& D,r9   