
    IiA                         S SK Jr  S SKrS SKrS SKrS SKrS SKrS SKrS SKrS SK	J
r
  S SKJr  S SKJr  S SKJr  S SKJrJrJr  \R,                  " \5      r " S S	\5      rSS
 jrg)    )absolute_importN)six)errors)get_ip_port_afi)Future)BrokerMetadataPartitionMetadataTopicPartitionc                       \ rS rSrSrSS/ S.rS rS rS rS	 r	S
 r
S rS rS rS rS rS rS rS rS r\S 5       rSS jrS rS rS rS rS rS rS rSrg) ClusterMetadata   a  
A class to manage kafka cluster metadata.

This class does not perform any IO. It simply updates internal state
given API responses (MetadataResponse, FindCoordinatorResponse).

Keyword Arguments:
    retry_backoff_ms (int): Milliseconds to backoff when retrying on
        errors. Default: 100.
    metadata_max_age_ms (int): The period of time in milliseconds after
        which we force a refresh of metadata even if we haven't seen any
        partition leadership changes to proactively discover any new
        brokers or partitions. Default: 300000
    bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
        strings) that the client should contact to bootstrap initial
        cluster metadata. This does not have to be the full node list.
        It just needs to have at least one broker that will respond to a
        Metadata API Request. Default port is 9092. If no servers are
        specified, will default to localhost:9092.
d   i )retry_backoff_msmetadata_max_age_msbootstrap_serversc                 4   0 U l         0 U l        [        R                  " [        5      U l        0 U l        SU l        SU l        SU l	        S U l
        [	        5       U l        [        R                  " 5       U l        SU l        [	        5       U l        [	        5       U l        S U l        S U l        [(        R(                  " U R*                  5      U l        U R,                   H  nX!;   d  M
  X   U R,                  U'   M     U R/                  5       U l        0 U l        g )Nr   TF)_brokers_partitionscollectionsdefaultdictset_broker_partitions_coordinators_last_refresh_ms_last_successful_refresh_ms_need_update_future
_listeners	threadingLock_lockneed_all_topic_metadataunauthorized_topicsinternal_topics
controller
cluster_idcopyDEFAULT_CONFIGconfig_generate_bootstrap_brokers_bootstrap_brokers_coordinator_brokers)selfconfigskeys      3/venv/lib/python3.13/site-packages/kafka/cluster.py__init__ClusterMetadata.__init__0   s    "-"9"9#"> !+,( %^^%
',$#&5 "uii 3 34;;C~#*<C   #'"B"B"D$&!    c                     [        U R                  S   5      n0 n[        U5       H  u  nu  pEnSU-  n[        XtUS 5      X''   M     U$ )Nr   zbootstrap-%s)collect_hostsr)   	enumerater   )r-   bootstrap_hostsbrokersihostport_node_ids           r0   r*   +ClusterMetadata._generate_bootstrap_brokersI   sU    '4G(HI"+O"<AA$q(G-gT4HG #= r3   c                     XR                   ;   $ N)r+   )r-   r=   s     r0   is_bootstrapClusterMetadata.is_bootstrapS   s    1111r3   c                     [        U R                  R                  5       5      =(       d#    [        U R                  R                  5       5      $ )z@Get all BrokerMetadata

Returns:
    set: {BrokerMetadata, ...}
)r   r   valuesr+   r-   s    r0   r8   ClusterMetadata.brokersV   s5     4=='')*Sc$2I2I2P2P2R.SSr3   c                     U R                   R                  U5      =(       d=    U R                  R                  U5      =(       d    U R                  R                  U5      $ )zGet BrokerMetadata

Arguments:
    broker_id (int or str): node_id for a broker to check

Returns:
    BrokerMetadata or None if not found
)r   getr+   r,   r-   	broker_ids     r0   broker_metadataClusterMetadata.broker_metadata^   sM     MMi( 5##''	25%%)))4	
r3   c                 n    XR                   ;  a  g[        U R                   U   R                  5       5      $ )zReturn set of all partitions for topic (whether available or not)

Arguments:
    topic (str): topic to check for partitions

Returns:
    set: {partition (int), ...}
    None if topic not found.
N)r   r   keys)r-   topics     r0   partitions_for_topic$ClusterMetadata.partitions_for_topicm   s2     (((4##E*//122r3   c                     XR                   ;  a  g[        [        R                  " U R                   U   5       VVs/ s H  u  p#UR                  S:w  d  M  UPM     snn5      $ s  snnf )zReturn set of partitions with known leaders

Arguments:
    topic (str): topic to check for partitions

Returns:
    set: {partition (int), ...}
    None if topic not found.
N)r   r   r   	iteritemsleader)r-   rO   	partitionmetadatas       r0   available_partitions_for_topic.ClusterMetadata.available_partitions_for_topic{   se     (((!$t/?/?/F!G8!G #6)!)B!6 !G8 9 	9 8s   A$
A$
c                     UR                   U R                  ;  a  gUR                  U R                  UR                      ;  a  gU R                  UR                      UR                     R                  $ )z:Return node_id of leader, -1 unavailable, None if unknown.N)rO   r   rV   rU   r-   rV   s     r0   leader_for_partition$ClusterMetadata.leader_for_partition   s]    ??$"2"22  (8(8(II	01D1DELLLr3   c                 b    U R                   UR                     UR                     R                  $ r@   )r   rO   rV   leader_epochr[   s     r0   leader_epoch_for_partition*ClusterMetadata.leader_epoch_for_partition   s(    	01D1DERRRr3   c                 8    U R                   R                  U5      $ )zReturn TopicPartitions for which the broker is a leader.

Arguments:
    broker_id (int or str): node id for a broker

Returns:
    set: {TopicPartition, ...}
    None if the broker either has no partitions or does not exist.
)r   rH   rI   s     r0   partitions_for_broker%ClusterMetadata.partitions_for_broker   s     &&**955r3   c                 <    U R                   R                  SU45      $ )zReturn node_id of group coordinator.

Arguments:
    group (str): name of consumer group

Returns:
    node_id (int or str) for group coordinator, -1 if coordinator unknown
    None if the group does not exist.
group)r   rH   )r-   rf   s     r0   coordinator_for_group%ClusterMetadata.coordinator_for_group   s      !!%%w&677r3   c                     [         R                   " 5       S-  nU R                  (       a  SnO XR                  -
  nU R                  S   U-
  nXR                  -
  nU R                  S   U-
  n[        X%S5      $ )z/Milliseconds until metadata should be refreshed  r   r   r   )timer   r   r)   r   max)r-   nowttlmetadata_age	retry_age
next_retrys         r0   rn   ClusterMetadata.ttl   sq    iikD C!A!AAL++34|CC///	[[!34y@
3A&&r3   c                      U R                   S   $ )zDReturn milliseconds to wait before attempting to retry after failurer   )r)   rE   s    r0   refresh_backoffClusterMetadata.refresh_backoff   s    {{-..r3   c                     U R                      SU l        U R                  (       a  U R                  R                  (       a  [	        5       U l        U R                  sSSS5        $ ! , (       d  f       g= f)zFlags metadata for update, return Future()

Actual update must be handled separately. This method will only
change the reported ttl()

Returns:
    kafka.future.Future (value will be the cluster object after update)
TN)r!   r   r   is_doner   rE   s    r0   request_updateClusterMetadata.request_update   s@     ZZ $D<<4<<#7#7%x<<	 ZZs   AA%%
A3c                     U R                   $ r@   )r   rE   s    r0   need_updateClusterMetadata.need_update   s       r3   c                 v    [        U R                  R                  5       5      nU(       a  X R                  -
  $ U$ )aJ  Get set of known topics.

Arguments:
    exclude_internal_topics (bool): Whether records from internal topics
        (such as offsets) should be exposed to the consumer. If set to
        True the only way to receive records from an internal topic is
        subscribing to it. Default True

Returns:
    set: {topic (str), ...}
)r   r   rN   r$   )r-   exclude_internal_topicstopicss      r0   r   ClusterMetadata.topics   s4     T%%**,-"0000Mr3   c                    SnU R                      U R                  (       a  U R                  nSU l        SSS5        U(       a  UR                  U5        [        R                  " 5       S-  U l        g! , (       d  f       ND= f)z4Update cluster state given a failed MetadataRequest.Nrj   )r!   r   failurerk   r   )r-   	exceptionfs      r0   failed_updateClusterMetadata.failed_update   sV    ZZ||LL#  IIi  $		d 2 Zs   %A22
B c                 0	   UR                   (       d:  [        R                  S5        U R                  [        R
                  " U5      5      $ 0 nUR                    H=  nUR                  S:X  a  Uu  pEnSnOUu  pEpgUR                  U[        XEXg5      05        M?     UR                  S:X  a  SnOUR                  UR                  5      nUR                  S:  a  Sn	OUR                  n	0 n
[        R                  " [        5      n[        5       n[        5       nUR                   GH  nUR                  S:X  a	  Uu  nnnSnOUu  nnnnU(       a  UR!                  U5        [        R"                  " U5      nU[        R$                  L a  0 U
U'   U H  nSn/ nUR                  S:  a  Uu  nnnnnnnO"UR                  S:  a
  Uu  nnnnnnOUu  nnnnn['        UUUUUUUUS	9U
U   U'   US:w  d  Mb  UU   R!                  [)        UU5      5        M     M  U R*                  (       a  GM  U[        R,                  L a  [        R                  S
U5        GM2  U[        R.                  L a  [        R1                  SU5        GM^  U[        R2                  L a*  [        R1                  SU5        UR!                  U5        GM  U[        R4                  L a  [        R1                  SU5        GM  [        R1                  SUU5        GM     U R6                     X l        Xl        Xl        Xl        Xl        Xl         Xl!        SnU RD                  (       a  U RD                  nSU l"        SU l#        SSS5        [H        RH                  " 5       S-  nUU l%        UU l&        W(       a  [O        UR                  5      S:X  aq  UR                  S   S   [        R$                  RP                  :w  aC  UR                  S   SS u  nn[        R"                  " U5      " U5      nURS                  U5        OURU                  U 5        [        RW                  SU 5        U RX                   H  nU" U 5        M     U R*                  (       a  SU l#        gg! , (       d  f       GN"= f)zUpdate cluster state given a MetadataResponse.

Arguments:
    metadata (MetadataResponse): broker response to a metadata request

Returns: None
z9No broker metadata found in MetadataResponse -- ignoring.r   N   FrS         )rO   rV   rU   r_   replicasisroffline_replicaserrorz;Topic %s is not available during auto-create initializationz&Topic %s not found in cluster metadataz*Topic %s is not authorized for this clientz'%s' is not a valid topic namez(Error fetching metadata for topic %s: %srj      zUpdated cluster metadata to %s)-r8   logwarningr   ErrorsMetadataEmptyBrokerListAPI_VERSIONupdater   rH   controller_idr&   r   r   r   r   addfor_codeNoErrorr	   r
   r"   LeaderNotAvailableErrorUnknownTopicOrPartitionErrorr   TopicAuthorizationFailedErrorInvalidTopicErrorr!   r   r%   r   r   r#   r$   r   r   rk   r   r   lenerrnor   successdebugr   ) r-   rW   _new_brokersbrokerr=   r:   r;   rack_new_controller_new_cluster_id_new_partitions_new_broker_partitions_new_unauthorized_topics_new_internal_topics
topic_data
error_coderO   
partitionsis_internal
error_typepartition_datar_   r   p_errorrV   rU   r   r   r   rm   r   listeners                                    r0   update_metadataClusterMetadata.update_metadata   s    KKST%%f&D&DX&NOO&&F##q(&,#t,2)ttB!  ' 1$"O*..x/E/EFO!#"O&11O!,!8!8!=#&5 "u"//J##q(0:-
E:#=G:
E;
$((/4JV^^+)+&&0N#%L')$++q0draFL(CQa!--2VdSFHcCSDRAFHc8I#y%L!)sEU%	9'OE*95
 |.v6::*5)<>! '1( --v=== ./46vBBB		BEJvCCC		FN(,,U3v777		:EB		D-[ *` ZZ(M-O-O.&<#'?$#7 A||LLDL %D  iikD  #+.( 8??#q(X__Q-?-BfnnFZFZ-Z$,OOA$6r$:!
E
3E:		% 		$		2D9HTN ( ''
 !&D (C Zs   AR
Rc                 :    U R                   R                  U5        g)z<Add a callback function to be called on each metadata updateN)r   r   r-   r   s     r0   add_listenerClusterMetadata.add_listenern  s    H%r3   c                 :    U R                   R                  U5        g)z+Remove a previously added listener callbackN)r   remover   s     r0   remove_listenerClusterMetadata.remove_listenerr  s    x(r3   c                    [         R                  SX#U5        [        R                  " UR                  5      nU[        R
                  La'  [         R                  SU5        SU R                  X#4'   gSR                  UR                  5      n[        UUR                  UR                  S5      n[         R                  SX#U5        X`R                  U'   XPR                  X#4'   U$ )a/  Update with metadata for a group or txn coordinator

Arguments:
    response (FindCoordinatorResponse): broker response
    coord_type (str): 'group' or 'transaction'
    coord_key (str): consumer_group or transactional_id

Returns:
    string: coordinator node_id if metadata is updated, None on error
z"Updating coordinator for %s/%s: %sz!FindCoordinatorResponse error: %srS   Nzcoordinator-{}zCoordinator for %s/%s is %s)r   r   r   r   r   r   r   r   formatcoordinator_idr   r:   r;   infor,   )r-   response
coord_type	coord_keyr   r=   coordinators          r0   add_coordinatorClusterMetadata.add_coordinatorv  s     			6
xX__X%8%89
V^^+II9:F:<D
67 #))(*A*AB$MMMM	 	.
{S-8!!'*6=J23r3   c                    [        S0 U R                  D6n[        R                  " U R                  5      Ul        [        R                  " U R
                  5      Ul        [        R                  " U R                  5      Ul        [        R                  " U R                  5      Ul        [        R                  " U R                  5      Ul        [        R                  " U R                  5      Ul	        U H  nX2R
                  UR                     UR                  '   UR                  c  M7  UR                  S:w  d  MI  UR                  UR                     R                  [        UR                  UR                  5      5        M     U$ )z8Returns a copy of cluster metadata with partitions addedrS    )r   r)   r'   deepcopyr   r   r   r   r$   r#   rO   rV   rU   r   r
   )r-   partitions_to_addnew_metadatarV   s       r0   with_partitionsClusterMetadata.with_partitions  s   &55 $dmm <#'==1A1A#B *.--8O8O*P'%)]]43E3E%F"'+}}T5I5I'J$+/==9Q9Q+R(*IMV$$Y__5i6I6IJ+	0@0@B0F//	0@0@AEE"9??I4G4GHJ	 + r3   c                     S[        U R                  5      [        U R                  5      [        U R                  5      4-  $ )Nz:ClusterMetadata(brokers: %d, topics: %d, coordinators: %d))r   r   r   r   rE   s    r0   __str__ClusterMetadata.__str__  s:    KDMM"C(8(8$93t?Q?Q;RST 	Tr3   )r+   r   r   r,   r   r   r   r   r   r!   r   r   r&   r)   r%   r$   r"   r#   NT)__name__
__module____qualname____firstlineno____doc__r(   r1   r*   rA   r8   rK   rP   rX   r\   r`   rc   rg   rn   rt   rx   propertyr{   r   r   r   r   r   r   r   r   __static_attributes__r   r3   r0   r   r      s    *  %N'22T
39 MS
6
8'/  ! !$	3|&|&)@&Tr3   r   c                 D   [        U [        R                  5      (       a  U R                  5       R	                  S5      n / nU  H<  n[
        R                  " SSU5      n[        U5      u  pEnUR                  XEU45        M>     U(       a  [        R                  " U5        U$ )za
Collects a comma-separated set of hosts (host:port) and optionally
randomize the returned list.
,z^.*:// )
isinstancer   string_typesstripsplitresubr   appendrandomshuffle)hosts	randomizeresult	host_portr:   r;   afis          r0   r5   r5     s     %))**##C(F	FF8R3	))4Ct3'(	  vMr3   r   )
__future__r   r   r'   loggingr   r   r   rk   kafka.vendorr   kafkar   r   
kafka.connr   kafka.futurer   kafka.structsr   r	   r
   	getLoggerr   r   objectr   r5   r   r3   r0   <module>r      sU    &     	    " &  K K!VTf VTrr3   