
    Ii3                       S SK JrJr  S SKJr  S SKrS SKrS SKrS SKrSSK	J
r
  S SKJr  S SKJrJrJrJrJrJrJr  S SKJrJr  S S	KJrJrJr  S SKJr  S S
KJrJ r J!r!J"r"J#r#  S SK$J%r%  S SK&J'r'J(r(  S SK)J*r*J+r+J,r,J-r-J.r.J/r/J0r0J1r1J2r2J3r3J4r4J5r5J6r6J7r7J8r8  S SK9J:r:  S SK;J<r<  S SK=J>r>  S SK?J@r@  S SKAJBrBJCrCJDrDJErE  S SKFJGrG  \R                  " \I5      rJ " S S\K5      rLg)    )absolute_importdivision)defaultdictN   )ConfigResourceType)six)ACLOperationACLPermissionType	ACLFilterACLResourcePatternResourceTypeACLResourcePatternType)KafkaClient	selectors)ConsumerProtocolMemberMetadata ConsumerProtocolMemberAssignmentConsumerProtocol)IncompatibleBrokerVersionKafkaConfigurationErrorUnknownTopicOrPartitionErrorUnrecognizedBrokerVersionIllegalArgumentError)Future)MetricConfigMetrics)CreateTopicsRequestDeleteTopicsRequestDescribeConfigsRequestAlterConfigsRequestCreatePartitionsRequestListGroupsRequestDescribeGroupsRequestDescribeAclsRequestCreateAclsRequestDeleteAclsRequestDeleteGroupsRequestDeleteRecordsRequestDescribeLogDirsRequestElectLeadersRequestElectionType)OffsetFetchRequest)FindCoordinatorRequest)MetadataRequest)Array)TopicPartitionOffsetAndMetadataMemberInformationGroupInformation)__version__c                   H   \ rS rSrSr0 SS_SS\-   _SS_S	S
_SS_SS_SS_SS_SS_S\R                  \R                  S4/_SS_SS_SS_SS_SS_SS_S S!_0 S"S_S#S_S$S_S%S_S&S_S'S_S(S)_S*\	R                  _S+S_S,S_S-S_S.S_S/S0_S1S_S2S_S3S_S4/ _ES5S\S6.ErS7 rS8 rS9 rSnS: jrS; rS< rS= rSoS> jrS? rS@ rSA r\SB 5       rSpSC jrSqSD jrSpSE jrSF rSqSG jrSH r\SI 5       r SJ r!\SK 5       r"\SL 5       r#\SM 5       r$SN r%\SO 5       r&\SP 5       r'\SQ 5       r(SR r)\SS 5       r*SrST jr+\SU 5       r,SV r-\SW 5       r.SpSX jr/SqSY jr0SsSZ jr1SrS[ jr2S\ r3SpS] jr4S^ r5S_ r6SqS` jr7 SqSa jr8Sb r9  SsSc jr:SqSd jr;Se r<Sf r=\Sg 5       r>Sh r?Si r@SsSj jrASk rBSl rCSmrDg)tKafkaAdminClient%   aF  A class for administering the Kafka cluster.

Warning:
    This is an unstable interface that was recently added and is subject to
    change without warning. In particular, many methods currently return
    raw protocol tuples. In future releases, we plan to make these into
    nicer, more pythonic objects. Unfortunately, this will likely break
    those interfaces.

The KafkaAdminClient class will negotiate for the latest version of each message
protocol format supported by both the kafka-python client library and the
Kafka broker. Usage of optional fields from protocol versions that are not
supported by the broker will result in IncompatibleBrokerVersion exceptions.

Use of this class requires a minimum broker version >= 0.10.0.0.

Keyword Arguments:
    bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
        strings) that the consumer should contact to bootstrap initial
        cluster metadata. This does not have to be the full node list.
        It just needs to have at least one broker that will respond to a
        Metadata API Request. Default port is 9092. If no servers are
        specified, will default to localhost:9092.
    client_id (str): a name for this client. This string is passed in
        each request to servers and can be used to identify specific
        server-side log entries that correspond to this client. Also
        submitted to GroupCoordinator for logging with respect to
        consumer group administration. Default: 'kafka-python-{version}'
    reconnect_backoff_ms (int): The amount of time in milliseconds to
        wait before attempting to reconnect to a given host.
        Default: 50.
    reconnect_backoff_max_ms (int): The maximum amount of time in
        milliseconds to backoff/wait when reconnecting to a broker that has
        repeatedly failed to connect. If provided, the backoff per host
        will increase exponentially for each consecutive connection
        failure, up to this maximum. Once the maximum is reached,
        reconnection attempts will continue periodically with this fixed
        rate. To avoid connection storms, a randomization factor of 0.2
        will be applied to the backoff resulting in a random range between
        20% below and 20% above the computed value. Default: 30000.
    request_timeout_ms (int): Client request timeout in milliseconds.
        Default: 30000.
    connections_max_idle_ms: Close idle connections after the number of
        milliseconds specified by this config. The broker closes idle
        connections after connections.max.idle.ms, so this avoids hitting
        unexpected socket disconnected errors on the client.
        Default: 540000
    retry_backoff_ms (int): Milliseconds to backoff when retrying on
        errors. Default: 100.
    max_in_flight_requests_per_connection (int): Requests are pipelined
        to kafka brokers up to this number of maximum requests per
        broker connection. Default: 5.
    receive_buffer_bytes (int): The size of the TCP receive buffer
        (SO_RCVBUF) to use when reading data. Default: None (relies on
        system defaults). Java client defaults to 32768.
    send_buffer_bytes (int): The size of the TCP send buffer
        (SO_SNDBUF) to use when sending data. Default: None (relies on
        system defaults). Java client defaults to 131072.
    socket_options (list): List of tuple-arguments to socket.setsockopt
        to apply to broker connection sockets. Default:
        [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
    metadata_max_age_ms (int): The period of time in milliseconds after
        which we force a refresh of metadata even if we haven't seen any
        partition leadership changes to proactively discover any new
        brokers or partitions. Default: 300000
    security_protocol (str): Protocol used to communicate with brokers.
        Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
        Default: PLAINTEXT.
    ssl_context (ssl.SSLContext): Pre-configured SSLContext for wrapping
        socket connections. If provided, all other ssl_* configurations
        will be ignored. Default: None.
    ssl_check_hostname (bool): Flag to configure whether SSL handshake
        should verify that the certificate matches the broker's hostname.
        Default: True.
    ssl_cafile (str): Optional filename of CA file to use in certificate
        verification. Default: None.
    ssl_certfile (str): Optional filename of file in PEM format containing
        the client certificate, as well as any CA certificates needed to
        establish the certificate's authenticity. Default: None.
    ssl_keyfile (str): Optional filename containing the client private key.
        Default: None.
    ssl_password (str): Optional password to be used when loading the
        certificate chain. Default: None.
    ssl_crlfile (str): Optional filename containing the CRL to check for
        certificate expiration. By default, no CRL check is done. When
        providing a file, only the leaf certificate will be checked against
        this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
        Default: None.
    api_version (tuple): Specify which Kafka API version to use. If set
        to None, KafkaClient will attempt to infer the broker version by
        probing various APIs. Example: (0, 10, 2). Default: None
    api_version_auto_timeout_ms (int): number of milliseconds to throw a
        timeout exception from the constructor when checking the broker
        api version. Only applies if api_version is None
    selector (selectors.BaseSelector): Provide a specific selector
        implementation to use for I/O multiplexing.
        Default: selectors.DefaultSelector
    metrics (kafka.metrics.Metrics): Optionally provide a metrics
        instance for capturing network IO stats. Default: None.
    metric_group_prefix (str): Prefix for metric names. Default: ''
    sasl_mechanism (str): Authentication mechanism when security_protocol
        is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
        PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
    sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication.
        Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
    sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication.
        Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
    sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with
        sasl mechanism handshake. If provided, sasl_kerberos_service_name and
        sasl_kerberos_domain name are ignored. Default: None.
    sasl_kerberos_service_name (str): Service name to include in GSSAPI
        sasl mechanism handshake. Default: 'kafka'
    sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
        sasl mechanism handshake. Default: one of bootstrap servers
    sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
        token provider instance. Default: None
    socks5_proxy (str): Socks5 proxy url. Default: None
    kafka_client (callable): Custom class / callable for creating KafkaClient instances
bootstrap_servers	localhost	client_idzkafka-python-request_timeout_ms0u  connections_max_idle_msi`= reconnect_backoff_ms2   reconnect_backoff_max_ms%max_in_flight_requests_per_connection   receive_buffer_bytesNsend_buffer_bytessocket_optionsr   sock_chunk_bytesi   sock_chunk_buffer_count  retry_backoff_msd   metadata_max_age_msi security_protocol	PLAINTEXTssl_contextssl_check_hostnameT
ssl_cafilessl_certfilessl_keyfilessl_passwordssl_crlfileapi_versionapi_version_auto_timeout_msi  selectorsasl_mechanismsasl_plain_usernamesasl_plain_passwordsasl_kerberos_namesasl_kerberos_service_namekafkasasl_kerberos_domain_namesasl_oauth_token_providersocks5_proxymetric_reporters   )metrics_num_samplesmetrics_sample_window_mskafka_clientc                    [         R                  SU5        [        U5      R                  U R                  5      nU(       a  [        SR                  U5      5      e[        R                  " U R                  5      U l        U R                  R                  U5        SU R                  S   0n[        U R                  S   U R                  S   US9nU R                  S    Vs/ s H	  oU" 5       PM     nn[        XF5      U l        U R                  S	   " SU R                  S
S.U R                  D6U l        U R                  R                  S   U R                  S'   SU l        U R                  5         [         R                  S5        g s  snf )Nz0Starting KafkaAdminClient with configuration: %szUnrecognized configs: {}z	client-idr:   rc   rd   )samplestime_window_mstagsra   re   admin)metricsmetric_group_prefixrU   FzKafkaAdminClient started. )logdebugset
differenceDEFAULT_CONFIGr   formatcopyconfigupdater   r   _metrics_client_closed_refresh_controller_id)selfconfigsextra_configsmetrics_tagsmetric_configreporter	reporterss          8/venv/lib/python3.13/site-packages/kafka/admin/client.py__init__KafkaAdminClient.__init__   sO   		DgNG//0C0CD)*D*K*KM*Z[[ii 3 347# $T[[%=>$T[[9N-O48KK@Z4[*68 15<N0OP0OHXZ0O	P9{{>2 
MM '
 kk
 &*\\%8%8%GM"##%		-. Qs   #F	c                    [        U S5      (       a  U R                  (       a  [        R                  S5        gU R                  R                  5         U R                  R                  5         SU l        [        R                  S5        g)z:Close the KafkaAdminClient connection to the Kafka broker.ry   z KafkaAdminClient already closed.NTzKafkaAdminClient is now closed.)hasattrry   rn   inforw   closerx   ro   )r{   s    r   r   KafkaAdminClient.close   sY    tY''4<<HH78		34    c                 2    U=(       d    U R                   S   $ )zValidate the timeout is set or use the configuration default.

Arguments:
    timeout_ms: The timeout provided by api call, in milliseconds.

Returns:
    The timeout to use for the operation.
r;   )ru   )r{   
timeout_mss     r   _validate_timeout"KafkaAdminClient._validate_timeout   s     >T[[)=>>r   c                    U R                   R                  [        SS9nSUs=::  a  S::  Ga#  O  GO[        R                  " 5       US-  -   n[        R                  " 5       U:  a  [        U   " 5       nU R	                  U R                   R                  5       U5      nU R                  U/5        UR                  nUR                  nUS:X  a-  [        R                  S5        [        R                  " S5        M  U R                   R                  US9nUS:  a  [        S	R                  U5      5      eXpl        g
[         R"                  " S5      e[%        SR                  U5      5      e)z'Determine the Kafka cluster controller.   max_versionr   rH   z#Controller ID not available, got -1)node_id)r   
   r   z]The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0.N
controllerzPKafka Admin interface cannot determine the controller using MetadataRequest_v{}.)rx   rU   r.   time_send_request_to_nodeleast_loaded_node_wait_for_futuresvaluecontroller_idrn   warningsleepcheck_versionr   rs   _controller_idErrorsNodeNotReadyErrorr   )	r{   r   version
timeout_atrequestfutureresponser   controller_versions	            r   rz   'KafkaAdminClient._refresh_controller_id   s2   ,,**?*J1zD'88J))+
*)'2433DLL4R4R4TV]^&&x0!<< ( 6 6 B&KK EFJJqM%)\\%?%?%?%V"%
23w 235 5 '4#..|<<+b" "r   c                     U R                   R                  [        SS9nUS::  a  [        U   " U5      nO1US::  a  [        U   " US5      nO[        SR	                  U5      5      eU R                  U R                   R                  5       U5      $ )zSend a FindCoordinatorRequest to a broker.

Arguments:
    group_id: The consumer group ID. This is typically the group
    name as a string.

Returns:
    A message future
rb   r   r   zRSupport for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient.)rx   rU   r-   NotImplementedErrorrs   r   r   )r{   group_idr   r   s       r   !_find_coordinator_id_send_request2KafkaAdminClient._find_coordinator_id_send_request  s     ,,**+Aq*Qa<,W5h?G\,W5hBG%d" " ))$,,*H*H*JGTTr   c                     [         R                  " UR                  5      nU[         R                  La  U" SR	                  U5      5      eUR
                  $ )zProcess a FindCoordinatorResponse.

Arguments:
    response: a FindCoordinatorResponse.

Returns:
    The node_id of the broker that is the coordinator.
z1FindCoordinatorRequest failed with response '{}'.)r   for_code
error_codeNoErrorrs   coordinator_idr{   r   
error_types      r   %_find_coordinator_id_process_response6KafkaAdminClient._find_coordinator_id_process_response1  sN     __X%8%89
V^^+ C!# # &&&r   c                    U Vs0 s H  nX R                  U5      _M     nnU R                  UR                  5       5        UR                  5        VVs0 s H   u  p$X R	                  UR
                  5      _M"     nnnU$ s  snf s  snnf )a  Find the broker node_ids of the coordinators of the given groups.

Sends a FindCoordinatorRequest message to the cluster for each group_id.
Will block until the FindCoordinatorResponse is received for all groups.
Any errors are immediately raised.

Arguments:
    group_ids: A list of consumer group IDs. This is typically the group
    name as a string.

Returns:
    A dict of {group_id: node_id} where node_id is the id of the
    broker that is the coordinator for the corresponding group.
)r   r   valuesitemsr   r   )r{   	group_idsr   groups_futuresr   groups_coordinatorss         r   _find_coordinator_ids&KafkaAdminClient._find_coordinator_idsC  s    " &
% <<XFF% 	 
 	~4467 %3$8$8$:
$:  @@NN$: 	 
 #"


s   B 'Bc                      U R                   R                  U5        U R                   R                  XU5      $ ! [        R                   a#  n[	        5       R                  U5      s SnA$ SnAff = f)a>  Send a Kafka protocol message to a specific broker.

Arguments:
    node_id: The broker id to which to send the message.
    request: The message to send.


Keyword Arguments:
    wakeup (bool, optional): Optional flag to disable thread-wakeup.

Returns:
    A future object that may be polled for status and results.
N)rx   await_readyr   KafkaConnectionErrorr   failuresend)r{   r   r   wakeupes        r   r   &KafkaAdminClient._send_request_to_node]  s]    	'LL$$W- ||  6:: ** 	'8##A&&	's   9 A0A+%A0+A0c           	      L   SnU(       a  US-  nU R                  U R                  U5      nU R                  U/5        UR                  n[	        US[	        USS5      5      nUb  U R                  XQXB5      nOU R                  XU5      nU(       a  U$ U(       a  M  [        S5      e)zSend a Kafka protocol message to the cluster controller.

Will block until the message result is received.

Arguments:
    request: The message to send.

Returns:
    The Kafka protocol response for the message.
rb   r   topic_errorstopic_error_codesNzOThis should never happen, please file a bug with full stacktrace if encountered)r   r   r   r   getattr_parse_topic_request_response'_parse_topic_partition_request_responseRuntimeError)r{   r   triesr   r   topic_error_tuplessuccesss          r   _send_request_to_controller,KafkaAdminClient._send_request_to_controllerq  s     QJE//0C0CWMF""F8,||H ")>78UhjnCo!p!-<<=OZbj FFwZ_`+ e, lmmr   c                    [        S U5       Hq  u  pV[        R                  " U5      nU(       a%  U[        R                  L a  U R	                  5           gU[        R
                  Ld  M\  U" SR                  X#5      5      e   g)Nc                     U S S $ Nrb   rm   r   s    r   <lambda>@KafkaAdminClient._parse_topic_request_response.<locals>.<lambda>  s
    q!ur   F'Request '{}' failed with response '{}'.T)mapr   r   NotControllerErrorrz   r   rs   )r{   r   r   r   r   topicr   r   s           r   r   .KafkaAdminClient._parse_topic_request_response  st    
 "%_6H!IE4Jv'@'@@ ++-6>>1 =VG.0 0 "J r   c                 V   UR                    H  u  pE[        S U5       H  u  pg[        R                  " U5      nU(       a&  U[        R                  L a  U R                  5             gU[        R                  [        R                  4;  d  Mn  U" SR                  X5      5      e   M     g)Nc                     U S S $ r   rm   r   s    r   r   JKafkaAdminClient._parse_topic_partition_request_response.<locals>.<lambda>  s
    !BQ%r   Fr   T)	replication_election_resultsr   r   r   r   rz   r   ElectionNotNeededErrorrs   )	r{   r   r   r   r   partition_resultspartition_idr   r   s	            r   r   8KafkaAdminClient._parse_topic_partition_request_response  s    
 )1(M(M$E,/AR,S(#__Z8
Z6+D+DD //1 8U8U'VV$A24 4 -T )N r   c           
         U R                   U R                  U R                  U R                  R	                  5        VVs/ s H  u  pX4PM
     snnU R
                  R	                  5        VVs/ s H  u  p4X44PM
     snn4$ s  snnf s  snnf )a  
Build the tuple required by CreateTopicsRequest from a NewTopic object.

Arguments:
    new_topic: A NewTopic instance containing name, partition count, replication factor,
                  replica assignments, and config entries.

Returns:
    A tuple in the form:
         (topic_name, num_partitions, replication_factor, [(partition_id, [replicas])...],
          [(config_key, config_value)...])
)namenum_partitionsreplication_factorreplica_assignmentsr   topic_configs)	new_topicr   replicas
config_keyconfig_values        r   _convert_new_topic_request+KafkaAdminClient._convert_new_topic_request  s     NN$$((GPGdGdGjGjGlGl-C\(Gl LUKbKbKhKhKjKj/Gz*Kj

 
	
s   B.Bc                    U R                   R                  [        SS9nU R                  U5      nUS:X  a\  U(       a'  [	        SR                  U R                  S   5      5      e[        U   " U Vs/ s H  oPR                  U5      PM     snUS9nOOUS::  a/  [        U   " U Vs/ s H  oPR                  U5      PM     snUUS9nO[        SR                  U5      5      eU R                  U5      $ s  snf s  snf )	a  Create new topics in the cluster.

Arguments:
    new_topics: A list of NewTopic objects.

Keyword Arguments:
    timeout_ms (numeric, optional): Milliseconds to wait for new topics to be created
        before the broker returns.
    validate_only (bool, optional): If True, don't actually create new topics.
        Not supported by all versions. Default: False

Returns:
    Appropriate version of CreateTopicResponse class.
   r   r   zUvalidate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}.rU   )create_topic_requeststimeout)r   r   validate_onlyzHSupport for CreateTopics v{} has not yet been added to KafkaAdminClient.)
rx   rU   r   r   r   rs   ru   r   r   r   )r{   
new_topicsr   r   r   r   r   s          r   create_topicsKafkaAdminClient.create_topics  s    ,,**+>A*N++J7
a</kVDKK679 9 *'2cm&ncmV_'F'Fy'Qcm&n"G \)'2cm&ncmV_'F'Fy'Qcm&n"+G &Z" "
 //88 'o
 'os   1C2%C7c                     U R                   R                  [        SS9nU R                  U5      nUS::  a!  [        U   " UUS9nU R	                  U5      nU$ [        SR                  U5      5      e)a(  Delete topics from the cluster.

Arguments:
    topics ([str]): A list of topic name strings.

Keyword Arguments:
    timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted
        before the broker returns.

Returns:
    Appropriate version of DeleteTopicsResponse class.
r   r   )topicsr   zHSupport for DeleteTopics v{} has not yet been added to KafkaAdminClient.)rx   rU   r   r   r   r   rs   )r{   r   r   r   r   r   s         r   delete_topicsKafkaAdminClient.delete_topics  s~     ,,**+>A*N++J7
a<)'2"G 77@H
  &Z" "r   c                 z   U R                   R                  [        SS9nUS::  a<  U(       a'  [        SR	                  U R
                  S   5      5      e[        U   " US9nOUS::  a  [        U   " UUS9nU R                  U R                   R                  5       W5      nU R                  U/5        UR                  $ )z'
topics == None means "get all topics"
rB   r   r   zVauto_topic_creation requires MetadataRequest >= v4, which is not supported by Kafka {}rU   r   )r   allow_auto_topic_creation)
rx   rU   r.   r   rs   ru   r   r   r   r   )r{   r   auto_topic_creationr   r   r   s         r   _get_cluster_metadata&KafkaAdminClient._get_cluster_metadata  s     ,,**?*Ja<"/4VDKK679 9
 &g.f=G\%g.*=G
 ++LL**,
 	x(||r   c                 z    U R                  SS9nUR                  5       nUS    Vs/ s H  o3S   PM	     sn$ s  snf )z_Retrieve a list of all topic names in the cluster.

Returns:
    A list of topic name strings.
Nr   r   r   r  	to_object)r{   metadataobjts       r   list_topicsKafkaAdminClient.list_topics4  sC     --T-:  "$'M2Mq'
M222s   8c                 J    U R                  US9nUR                  5       nUS   $ )a  Fetch metadata for the specified topics or all topics if None.

Keyword Arguments:
    topics ([str], optional) A list of topic names. If None, metadata for all
        topics is retrieved.

Returns:
    A list of dicts describing each topic (including partition info).
r   r   r  )r{   r   r  r	  s       r   describe_topics KafkaAdminClient.describe_topics>  s/     --V-<  "8}r   c                 h    U R                  5       nUR                  5       nUR                  S5        U$ )z
Fetch cluster-wide metadata such as the list of brokers, the controller ID,
and the cluster ID.


Returns:
    A dict with cluster-wide metadata, excluding topic details.
r   )r  r  pop)r{   r  r	  s      r   describe_cluster!KafkaAdminClient.describe_clusterL  s1     --/  "
r   c                    U R                   n[        R                  " U R                  5      n/ nU R                   H  nUS:X  a   Uu  pVn[
        R                  R                  nO&US::  a  Uu  pVpO[        SR                  U5      5      eU HU  n	U	u  pp[        U
U[        U5      [        U5      [        [        U5      U[        U5      5      S9nUR                  U5        MW     M     X24$ )a"  Convert a DescribeAclsResponse into a list of ACL objects and a KafkaError.

Arguments:
    describe_response: The response object from the DescribeAclsRequest.

Returns:
    A tuple of (list_of_acl_objects, error) where error is an instance
         of KafkaError (NoError if successful).
r   r   KSupport for DescribeAcls Response v{} has not yet been added to KafkaAdmin.	principalhost	operationpermission_typeresource_pattern)API_VERSIONr   r   r   	resourcesr   LITERALr   r   rs   r   r	   r
   r   r   append)describe_responser   erroracl_listr  resource_typeresource_nameaclsresource_pattern_typeaclr  r  r  r  conv_acls                  r   '_convert_describe_acls_response_to_acls8KafkaAdminClient._convert_describe_acls_response_to_aclsZ  s     $// 1 < <=*44I!|5>2d(>(F(F(L(L%ALUI.CT)a  >A;	'*95$5o$F%4$]3%./DE&
 )  54 !!r   c           
         U R                   R                  [        SS9nUS:X  ac  [        U   " UR                  R                  UR                  R
                  UR                  UR                  UR                  UR                  S9nOUS::  ax  [        U   " UR                  R                  UR                  R
                  UR                  R                  UR                  UR                  UR                  UR                  S9nO[        SR                  U5      5      eU R                  U R                   R                  5       U5      nU R                  U/5        UR                   n["        R$                  " UR&                  5      nU["        R(                  La  U" SR                  X55      5      eU R+                  U5      $ )aQ  Describe a set of ACLs

Used to return a set of ACLs matching the supplied ACLFilter.
The cluster must be configured with an authorizer for this to work, or
you will get a SecurityDisabledError

Arguments:
    acl_filter: an ACLFilter object

Returns:
    tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
r   r   r   )r#  r$  r  r  r  r  )r#  r$  resource_pattern_type_filterr  r  r  r  zBSupport for DescribeAcls v{} has not yet been added to KafkaAdmin.r   )rx   rU   r$   r  r#  r$  r  r  r  r  pattern_typer   rs   r   r   r   r   r   r   r   r   r)  )r{   
acl_filterr   r   r   r   r   s          r   describe_aclsKafkaAdminClient.describe_acls  s}    ,,**+>A*Na<)'2(99GG(99GG$..__$.. * : :G \)'2(99GG(99GG-7-H-H-U-U$..__$.. * : :	G &TVG_ 
 ++DLL,J,J,LgVx(<<__X%8%89
V^^+9VG.0 0 ;;HEEr   c                     U R                   R                  U R                   R                  U R                  U R                  U R
                  U R                  4$ )zConvert an ACL object into the CreateAclsRequest v0 format.

Arguments:
    acl: An ACL object with resource pattern and permissions.

Returns:
    A tuple: (resource_type, resource_name, principal, host, operation, permission_type).
r  r#  r$  r  r  r  r  r'  s    r   (_convert_create_acls_resource_request_v09KafkaAdminClient._convert_create_acls_resource_request_v0  sJ       ..  ..MMHHMM
 	
r   c                     U R                   R                  U R                   R                  U R                   R                  U R                  U R
                  U R                  U R                  4$ )zConvert an ACL object into the CreateAclsRequest v1 format.

Arguments:
    acl: An ACL object with resource pattern and permissions.

Returns:
    A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type).
r  r#  r$  r-  r  r  r  r  r3  s    r   (_convert_create_acls_resource_request_v19KafkaAdminClient._convert_create_acls_resource_request_v1  Y       ..  ..  --MMHHMM
 	
r   c                 T   UR                   n/ n/ n[        UR                  5       H|  u  pVUS::  a  Uu  pxX   n	[        R                  " U5      n
O[        SR                  U5      5      eU
[        R                  L a  UR                  U	5        Mj  UR                  X45        M~     XCS.$ )a  Parse CreateAclsResponse and correlate success/failure with original ACL objects.

Arguments:
    acls: A list of ACL objects that were requested for creation.
    create_response: The broker's CreateAclsResponse object.

Returns:
    A dict with:
         {
           'succeeded': [list of ACL objects successfully created],
           'failed': [(acl_object, KafkaError), ...]
         }
r   r  )	succeededfailed)	r  	enumeratecreation_responsesr   r   r   rs   r   r  )r%  create_responser   creations_errorcreations_successi	creationsr   error_messager'  r!  s              r   %_convert_create_acls_response_to_acls6KafkaAdminClient._convert_create_acls_response_to_acls  s     "--%o&H&HILA!|,5)
g
3)a 
 &!((-&&}5 J  /JJr   c                 V   U H#  n[        U[        5      (       a  M  [        S5      e   U R                  R	                  [
        SS9nUS:X  a-  [
        U   " U Vs/ s H  o R                  U5      PM     snS9nOMUS::  a-  [
        U   " U Vs/ s H  o R                  U5      PM     snS9nO[        SR                  U5      5      eU R                  U R                  R                  5       U5      nU R                  U/5        UR                  nU R                  X5      $ s  snf s  snf )zCreate a list of ACLs

This endpoint only accepts a list of concrete ACL objects, no ACLFilters.
Throws TopicAlreadyExistsError if topic is already present.

Arguments:
    acls: a list of ACL objects

Returns:
    dict of successes and failures
zacls must contain ACL objectsr   r   r   )rD  z@Support for CreateAcls v{} has not yet been added to KafkaAdmin.)
isinstancer   r   rx   rU   r%   r4  r8  r   rs   r   r   r   r   rF  )r{   r%  r'  r   r   r   r   s          r   create_aclsKafkaAdminClient.create_acls  s    Cc3''*+JKK  ,,**+<!*La<'0Y]^Y]RUHHMY]^G \'0Y]^Y]RUHHMY]^G &RVG_ 
 ++DLL,J,J,LgVx(<<99$II! _ _   D!D&c                     U R                   R                  U R                   R                  U R                  U R                  U R
                  U R                  4$ )zConvert an ACLFilter object into the DeleteAclsRequest v0 format.

Arguments:
    acl: An ACLFilter object identifying the ACLs to be deleted.

Returns:
    A tuple: (resource_type, resource_name, principal, host, operation, permission_type).
r2  r3  s    r   (_convert_delete_acls_resource_request_v09KafkaAdminClient._convert_delete_acls_resource_request_v0.  sJ       ..  ..MMHHMM
 	
r   c                     U R                   R                  U R                   R                  U R                   R                  U R                  U R
                  U R                  U R                  4$ )a  Convert an ACLFilter object into the DeleteAclsRequest v1 format.

Arguments:
    acl: An ACLFilter object identifying the ACLs to be deleted.

Returns:
    A tuple: (resource_type, resource_name, pattern_type, principal, host, operation, permission_type).
r7  r3  s    r   (_convert_delete_acls_resource_request_v19KafkaAdminClient._convert_delete_acls_resource_request_v1A  r:  r   c                 H   UR                   n/ n[        UR                  5       H  u  pEUu  pgn[        R                  " U5      n	/ n
U H  nUS:X  a$  Uu  ppnnnn[
        R                  R                  nO+US:X  a  Uu	  ppnnnnnO[        SR                  U5      5      e[        R                  " U5      n[        UU[        U5      [        U5      [        [        U5      U[        U5      5      S9nU
R                  UU45        M     UR                  X   X45        M     U$ )ac  Parse the DeleteAclsResponse and map the results back to each input ACLFilter.

Arguments:
    acl_filters: A list of ACLFilter objects that were provided in the request.
    delete_response: The response from the DeleteAclsRequest.

Returns:
    A list of tuples of the form:
         (acl_filter, [(matching_acl, KafkaError), ...], filter_level_error).
r   r   r  r  )r  r>  filter_responsesr   r   r   r  r   r   rs   r   r	   r
   r   r   r  )acl_filtersdelete_responser   filter_result_listrC  rT  filter_error_codefilter_error_messagematching_aclsfilter_erroracl_result_listr'  r   rE  r#  r$  r  r  r  r  r&  	acl_errorr(  s                          r   ._convert_delete_acls_response_to_matching_acls?KafkaAdminClient._convert_delete_acls_response_to_matching_aclsU  sI    "--#,_-M-M#NAEUB]!??+<=L O$a<{~xJ}YX\^gix,B,J,J,P,P)\ SV  PJ}Mbdmosu~  AP-e#VG_  #OOJ7	'*95$5o$F%4$]3%./DE&
  &&)'=>/ %0 %%{~&VW9 $O: "!r   c                 V   U H#  n[        U[        5      (       a  M  [        S5      e   U R                  R	                  [
        SS9nUS:X  a-  [
        U   " U Vs/ s H  o R                  U5      PM     snS9nOMUS::  a-  [
        U   " U Vs/ s H  o R                  U5      PM     snS9nO[        SR                  U5      5      eU R                  U R                  R                  5       U5      nU R                  U/5        UR                  nU R                  X5      $ s  snf s  snf )a%  Delete a set of ACLs

Deletes all ACLs matching the list of input ACLFilter

Arguments:
    acl_filters: a list of ACLFilter

Returns:
    a list of 3-tuples corresponding to the list of input filters.
         The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance)
z/acl_filters must contain ACLFilter type objectsr   r   r   )filtersz@Support for DeleteAcls v{} has not yet been added to KafkaAdmin.)rI  r   r   rx   rU   r&   rN  rQ  r   rs   r   r   r   r   r^  )r{   rU  r'  r   r   r   r   s          r   delete_aclsKafkaAdminClient.delete_acls  s    Cc9--*+\]]  ,,**+<!*La<'0WbcWbPSFFsKWbcG \'0WbcWbPSFFsKWbcG &RVG_ 
 ++DLL,J,J,LgVx(<<BB;YY! d drL  c                     U R                   U R                  U R                  (       a0  U R                  R                  5        VVs/ s H  u  pUPM	     snn4$ S4$ s  snnf )a	  Convert a ConfigResource into the format required by DescribeConfigsRequest.

Arguments:
    config_resource: A ConfigResource with resource_type, name, and optional config keys.

Returns:
    A tuple: (resource_type, resource_name, [list_of_config_keys] or None).
Nr#  r   r|   r   config_resourcer   r   s      r   )_convert_describe_config_resource_request:KafkaAdminClient._convert_describe_config_resource_request  sl     ))   !(( <K;R;R;X;X;Z;Z7z
;Z
 	

 /3
 	
s   A
c                 .   / n/ nU Hc  nUR                   [        R                  :X  a"  UR                  U R	                  U5      5        MC  UR                  U R	                  U5      5        Me     / nU R
                  R                  [        SS9nUS:X  a  U(       a'  [        SR                  U R                  S   5      5      e[        U5      S:  aE  U H?  n [        US   5      n	UR                  U R                  U	[        U   " U/S95      5        MA     [        U5      S:  aD  UR                  U R                  U R
                  R                  5       [        U   " US95      5        OUS::  a  [        U5      S:  aF  U H@  n [        US   5      n	UR                  U R                  U	[        U   " U/US	95      5        MB     [        U5      S:  aD  UR                  U R                  U R
                  R                  5       [        U   " XBS	95      5        O[!        S
R                  U5      5      eU R#                  U5        U V
s/ s H  oR$                  PM     sn
$ ! [         a    [        S5      ef = f! [         a    [        S5      ef = fs  sn
f )a6  Fetch configuration parameters for one or more Kafka resources.

Arguments:
    config_resources: An list of ConfigResource objects.
        Any keys in ConfigResource.configs dict will be used to filter the
        result. Setting the configs dict to None will get all values. An
        empty dict will get zero values (as per Kafka protocol).

Keyword Arguments:
    include_synonyms (bool, optional): If True, return synonyms in response. Not
        supported by all versions. Default: False.

Returns:
    Appropriate version of DescribeConfigsResponse class.
rb   r   r   z[include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}.rU   r   zHBroker resource names must be an integer or a string represented integerr  )r  include_synonymszKSupport for DescribeConfigs v{} has not yet been added to KafkaAdminClient.)r#  r   BROKERr  rh  rx   rU   r   r   rs   ru   lenint
ValueErrorr   r   r   r   r   )r{   config_resourcesrl  broker_resourcestopic_resourcesrg  futuresr   broker_resource	broker_idfs              r   describe_configs!KafkaAdminClient.describe_configs  s   & /O,,0B0I0II ''(V(VWf(gh&&t'U'UVe'fg	  0 ,,**+Aq*Qa</qM :;= = #$q('7Ou$'(:$;	 NN4#=#=!.w7?BST$  (8 ?#a't99LL224*73oN  
 \#$q('7Ou$'(:$;	 NN4#=#=!.w7'6&7-=?$  (8 ?#a't99LL224*73oq  
 &]ddelmo o 	w'!()A))O & u()sttu& & u()sttu( *s   I I9J I69Jc                     U R                   U R                  U R                  R                  5        VVs/ s H  u  pX4PM
     snn4$ s  snnf )a  Convert a ConfigResource into the format required by AlterConfigsRequest.

Arguments:
    config_resource: A ConfigResource with resource_type, name, and config (key, value) pairs.

Returns:
    A tuple: (resource_type, resource_name, [(config_key, config_value), ...]).
re  rf  s      r   &_convert_alter_config_resource_request7KafkaAdminClient._convert_alter_config_resource_request  sR     ))  KZKbKbKhKhKjKj/Gz*Kj
 	
s   A
c                 v   U R                   R                  [        SS9nUS::  a-  [        U   " U Vs/ s H  o0R                  U5      PM     snS9nO[	        SR                  U5      5      eU R                  U R                   R                  5       U5      nU R                  U/5        UR                  nU$ s  snf )a  Alter configuration parameters of one or more Kafka resources.

Warning:
    This is currently broken for BROKER resources because those must be
    sent to that specific broker, versus this always picks the
    least-loaded node. See the comment in the source code for details.
    We would happily accept a PR fixing this.

Arguments:
    config_resources: A list of ConfigResource objects.

Returns:
    Appropriate version of AlterConfigsResponse class.
r   r   rk  zHSupport for AlterConfigs v{} has not yet been added to KafkaAdminClient.)
rx   rU   r    r{  r   rs   r   r   r   r   )r{   rq  r   rg  r   r   r   s          r   alter_configsKafkaAdminClient.alter_configs  s     ,,**+>A*Na<)'2o  Ao\kFFWo  AG &Z" " ++DLL,J,J,LgVx(<<! As   B6c                 6    U UR                   UR                  44$ )a,  Convert a NewPartitions object into the tuple format for CreatePartitionsRequest.

Arguments:
    topic_name: The name of the existing topic.
    new_partitions: A NewPartitions instance with total_count and new_assignments.

Returns:
    A tuple: (topic_name, (total_count, [list_of_assignments])).
)total_countnew_assignments)
topic_namenew_partitionss     r   "_convert_create_partitions_request3KafkaAdminClient._convert_create_partitions_requestH  s(     **..
 	
r   c           
      R   U R                   R                  [        SS9nU R                  U5      nUS::  aB  [        U   " UR	                  5        VVs/ s H  u  pVU R                  XV5      PM     snnUUS9nO[        SR                  U5      5      eU R                  U5      $ s  snnf )a  Create additional partitions for an existing topic.

Arguments:
    topic_partitions: A map of topic name strings to NewPartition objects.

Keyword Arguments:
    timeout_ms (numeric, optional): Milliseconds to wait for new partitions to be
        created before the broker returns.
    validate_only (bool, optional): If True, don't actually create new partitions.
        Default: False

Returns:
    Appropriate version of CreatePartitionsResponse class.
r   r   )topic_partitionsr   r   zLSupport for CreatePartitions v{} has not yet been added to KafkaAdminClient.)	rx   rU   r!   r   r   r  r   rs   r   )r{   r  r   r   r   r  r  r   s           r   create_partitions"KafkaAdminClient.create_partitions[  s     ,,**+BPQ*R++J7
a<-g6 IY  I_  I_  Ia  "b  Ia  kEjt$"I"I*"e  Ia  "b"+G &^" " //88 "bs   B#
c                 B   U R                  U5      n[        U5      n[        S U 5       5      nU R                  US9R                  5       n[	        [
        5      n[        5       nUR                  SS5       HZ  nUR                  SS5       HA  n[        US   US   S9n	X;   d  M  XXS	      R                  U	5        UR                  U	5        MC     M\     [        U5      [        U5      :w  a2  [        U5      U-
  n
[        S
SR                  S U
 5       5      -  5      eU$ )a  Finds ID of the leader node for every given topic partition.

Will raise UnknownTopicOrPartitionError if for some partition no leader can be found.

:param partitions: ``[TopicPartition]``: partitions for which to find leaders.
:param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
    config.

:return: Dictionary with ``{leader_id -> {partitions}}``
c              3   8   #    U  H  oR                   v   M     g 7fN)r   ).0tps     r   	<genexpr>>KafkaAdminClient._get_leader_for_partitions.<locals>.<genexpr>  s     3
"XX
   r   r   rm   
partitionsr   	partition)r   r  leaderz*The following partitions are not known: %s, c              3   8   #    U  H  n[        U5      v   M     g 7fr  )str)r  xs     r   r  r    s     4GqCFFGr  )r   rp   r  r  r   listgetr0   r  addrn  r   join)r{   r  r   r   r   leader2partitionsvalid_partitionsr   r  t2punknowns              r   _get_leader_for_partitions+KafkaAdminClient._get_leader_for_partitionsx  s    ++J7
_
3
33--V-<FFH'-5\\(B/E"YY|R8	$5>Y{E[\$%&9:AA#F$((-	 9 0 z?c"233*o(88G.<))4G445 
 ! r   c                    U R                  U5      n/ nU R                  R                  [        SS9nUc  [	        S5      eUc  U R                  [        U5      U5      nOU[        U5      0nUR                  5        H  u  px[        [        5      n	U H   n
XR                     R                  U
5        M"     [        U   " U	R                  5        VVVs/ s H&  u  pX Vs/ s H  oR                  X   4PM     sn4PM(     snnnUS9nU R                  X}5      nU R                  U/5        UR                  UR                  R!                  5       5        M     0 n0 nU HD  nUS    H8  nUS    H,  n
[#        US   U
S   5      nXU'   U
S	   S:w  d  M$  U
S	   UU'   M.     M:     MF     U(       a  [%        U5      S
:X  a]  ['        [)        UR                  5       5      5      u  nn[*        R,                  " U5      " SUR                  < SUR                  < 35      e[*        R.                  " SSR1                  S UR                  5        5       5      -   5      eU$ s  snf s  snnnf )a  Delete records whose offset is smaller than the given offset of the corresponding partition.

:param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
    given partitions.
:param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
    config.
:param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
    this node. No check is performed verifying that this is indeed the leader for all
    listed partitions: use with caution.

:return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
    See DeleteRecordsResponse for possible fields. error_code for all partitions is
    guaranteed to be zero, otherwise an exception is raised.
r   r   z+Broker does not support DeleteGroupsRequest)r   r   r   r  r   partition_indexr   r   z"Error deleting records from topic z partition z<The following errors occured when trying to delete records: r  c              3      #    U  HA  u  pS UR                   UR                  [        R                  " U5      R                  4-  v   MC     g7f)z%s(partition=%d): %sN)r   r  r   r   __name__)r  r  r!  s      r   r  2KafkaAdminClient.delete_records.<locals>.<genexpr>  sF       1H,I /")*=*=vu?U?^?^_`0Gs   A	A)r   rx   rU   r(   r   r  rp   r   r   r  r   r  r  r   r   r   r  r0   rn  nextiterr   r   BrokerResponseErrorr  )r{   records_to_deleter   partition_leader_id	responsesr   r  r  r  topic2partitionsr  r   r  r   r   partition2resultpartition2errorr   keyr!  s                       r   delete_recordsKafkaAdminClient.delete_records  sr    ++J7
	,,**+?Q*O?+,YZZ & $ ? ?%&
! "5c:K6L M"3"9"9";F*40'	 188C ( +73 .>-C-C-E-E) ZXZrll,=,ABZXY-E &G //@F""F8,V\\3356 #<" !H!(+!&|!4I'fyAR7STB+4R( .!3.7.E+	 "5 , " ?#q(!$'<'<'>"?@
Uooe,KN99VYVcVcd  00RII  1@0E0E0G    I Ys   II8IIc                 ,   U R                   R                  [        SS9nUS::  a0  U(       a  [        SR	                  U5      5      e[        U   " U4S9nO0US::  a  [        U   " U4US9nO[        SR	                  U5      5      eU R                  X%5      $ )zSend a DescribeGroupsRequest to the group's coordinator.

Arguments:
    group_id: The group name as a string
    group_coordinator_id: The node_id of the groups' coordinator broker.

Returns:
    A message future.
r   r   rb   zfinclude_authorized_operations requests DescribeGroupsRequest >= v3, which is not supported by Kafka {})groups)r  include_authorized_operationszQSupport for DescribeGroupsRequest_v{} has not yet been added to KafkaAdminClient.)rx   rU   r#   r   rs   r   r   )r{   r   group_coordinator_idr  r   r   s         r   &_describe_consumer_groups_send_request7KafkaAdminClient._describe_consumer_groups_send_request  s     ,,**+@a*Pa<,/,,2F7O  ,G4XKHG\+G4 {.KG
 &c" " ))*>HHr   c           	         UR                   S::  Ga  [        UR                  5      S:X  d   e[        UR                  R
                  UR                  R                  5       GH  u  p#[        U[        5      (       d  M  UR                  nUR                  U   S   n/ nSn[        XTR                  UR
                  5       GHX  u  pn
U	S:X  a#  UnU[        R                  :H  =(       d    U(       + n[        U
[        5      (       Ga  / nU
R                  nU H  n/ n[        XR
                  UR                  5       H  u  nnnU(       d  M  US:X  a.  U(       a'  UR                  [        R                  " U5      5        MD  US:X  a.  U(       a'  UR                  [         R                  " U5      5        Mx  UR                  U5        M     ["        R$                  " U5      nUR                  U5        M     UR                  U5        GMG  UR                  U5        GM[     UR                   S::  a  UR                  S	5        [&        R$                  " U5      nGM     WR(                  n[*        R,                  " U5      nU[*        R.                  La  U" S
R1                  U5      5      e U$ [3        SR1                  UR                   5      5      e)z:Process a DescribeGroupsResponse into a group description.r   r   r   Fprotocol_typemember_metadatamember_assignmentrb   Nz1DescribeGroupsResponse failed with response '{}'.zRSupport for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient.)r  rn  r  zipSCHEMAfieldsnamesrI  r/   array_of__dict__r   PROTOCOL_TYPEr  r   decoder   r2   _maker3   r   r   r   r   rs   r   )r{   r   response_fieldresponse_namedescribed_groups_field_schemadescribed_group described_group_information_listprotocol_type_is_consumerdescribed_group_informationgroup_information_namegroup_information_fieldr  member_information_listmember_schemamembersmember_informationmembermember_fieldmember_namemember_info_tuplegroup_descriptionr   r   s                          r   *_describe_consumer_groups_process_response;KafkaAdminClient._describe_consumer_groups_process_response  s   1$x'1,,,14X__5K5KX__MbMb1c-ne444B4K4K1&.&7&7&Fq&IO79405-jmn}  @c  @c  eB  eI  eI  kJf4Ne1_D,GM9FJZJhJh9h9}p}l}5%&=uEE683,C,L,LM+F57 2LOPWYmYmo|  pC  pC  MD$GV\;'@'@+6:K+KPV,>,E,EFdFkFklrFs,t-8<O-OTZ,>,E,EFfFmFmntFu,v,>,E,Ef,M MD 5F4K4KL^4_ 1 7 > >?P Q ,G =CCD[\<CCD_`+ kJ2  ++a/8??E(8(>(>?_(`%C 2dD +55J4J/ GVH%' ' 0 !  &d,,-/ /r   c           	      h   / nUb  U Vs0 s H  oUU_M     nnOU R                  U5      nUR                  5        VVs/ s H  u  pWU R                  UUU5      PM     nnnU R                  U5        U H1  n	U	R                  n
U R                  U
5      nUR                  U5        M3     U$ s  snf s  snnf )aW  Describe a set of consumer groups.

Any errors are immediately raised.

Arguments:
    group_ids: A list of consumer group IDs. These are typically the
        group names as strings.

Keyword Arguments:
    group_coordinator_id (int, optional): The node_id of the groups' coordinator
        broker. If set to None, it will query the cluster for each group to
        find that group's coordinator. Explicitly specifying this can be
        useful for avoiding extra network round trips if you already know
        the group coordinator. This is only useful when all the group_ids
        have the same coordinator, otherwise it will error. Default: None.
    include_authorized_operations (bool, optional): Whether or not to include
        information about the operations a group is allowed to perform.
        Only supported on API version >= v3. Default: False.

Returns:
    A list of group descriptions. For now the group descriptions
    are the raw results from the DescribeGroupsResponse. Long-term, we
    plan to change this to return namedtuples as well as decoding the
    partition assignments.
)r   r   r  r   r   r  r  )r{   r   r  r  group_descriptionsr   r   r   rt  r   r   r  s               r   describe_consumer_groups)KafkaAdminClient.describe_consumer_groupsK  s    4  +R["\R[h-A#AR["\"&"<"<Y"G -@,E,E,G

 -H(	 77-/ -H 	 
 	w'F||H $ O OPX Y%%&78 
 "!% #]
s
   B)B.c                     U R                   R                  [        SS9nUS::  a  [        U   " 5       nO[        SR	                  U5      5      eU R                  X5      $ )z|Send a ListGroupsRequest to a broker.

Arguments:
    broker_id (int): The broker's node_id.

Returns:
    A message future
rb   r   zMSupport for ListGroupsRequest_v{} has not yet been added to KafkaAdminClient.)rx   rU   r"   r   rs   r   )r{   rv  r   r   s       r   "_list_consumer_groups_send_request3KafkaAdminClient._list_consumer_groups_send_request|  s^     ,,**+<!*La<'02G%_" " )))==r   c                    UR                   S::  aW  [        R                  " UR                  5      nU[        R                  La  U" SR                  U5      5      e UR                  $ [        SR                  UR                   5      5      e)z3Process a ListGroupsResponse into a list of groups.rb   z,ListGroupsRequest failed with response '{}'.zNSupport for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient.)r  r   r   r   r   rs   r   r  r   s      r   &_list_consumer_groups_process_response7KafkaAdminClient._list_consumer_groups_process_response  s}    1$)<)<=J/ BVH%' ' 0  &`,,-/ /r   c                    [        5       nUc=  U R                  R                  R                  5        Vs/ s H  o3R                  PM     nnU Vs/ s H  o@R                  U5      PM     nnU R                  U5        U H/  nUR                  nUR                  U R                  U5      5        M1     [        U5      $ s  snf s  snf )a  List all consumer groups known to the cluster.

This returns a list of Consumer Group tuples. The tuples are
composed of the consumer group name and the consumer group protocol
type.

Only consumer groups that store their offsets in Kafka are returned.
The protocol type will be an empty string for groups created using
Kafka < 0.9 APIs because, although they store their offsets in Kafka,
they don't use Kafka for group coordination. For groups created using
Kafka >= 0.9, the protocol type will typically be "consumer".

As soon as any error is encountered, it is immediately raised.

Keyword Arguments:
    broker_ids ([int], optional): A list of broker node_ids to query for consumer
        groups. If set to None, will query all brokers in the cluster.
        Explicitly specifying broker(s) can be useful for determining which
        consumer groups are coordinated by those broker(s). Default: None

Returns:
    list: List of tuples of Consumer Groups.

Raises:
    CoordinatorNotAvailableError: The coordinator is not
        available, so cannot process requests.
    CoordinatorLoadInProgressError: The coordinator is loading and
        hence can't process requests.
)rp   rx   clusterbrokersnodeIdr  r   r   rv   r  r  )r{   
broker_idsconsumer_groupsbrokerbrt  rw  r   s           r   list_consumer_groups%KafkaAdminClient.list_consumer_groups  s    D %6:ll6J6J6R6R6TU6TF--6TJUGQRz!::1=zRw'AwwH""4#N#Nx#XY  O$$ VRs   B<Cc                    U R                   R                  [        SS9nUS::  a  Uc#  US::  a  [        SR	                  U5      5      eSnOL[        [        5      nU H  u  pxXg   R                  U5        M     [        [        R                  " U5      5      n[        U   " X5      n	O[        SR	                  U5      5      eU R                  X)5      $ )a  Send an OffsetFetchRequest to a broker.

Arguments:
    group_id (str): The consumer group id name for which to fetch offsets.
    group_coordinator_id (int): The node_id of the group's coordinator broker.

Keyword Arguments:
    partitions: A list of TopicPartitions for which to fetch
        offsets. On brokers >= 0.10.2, this can be set to None to fetch all
        known offsets for the consumer group. Default: None.

Returns:
    A message future
rB   r   Nr   zOffsetFetchRequest_v{} requires specifying the
                        partitions for which to fetch offsets. Omitting the
                        partitions is only supported on brokers >= 0.10.2.
                        For details, see KIP-88.zNSupport for OffsetFetchRequest_v{} has not yet been added to KafkaAdminClient.)rx   rU   r,   rp  rs   r   rp   r  r  r   	iteritemsr   r   )
r{   r   r  r  r   topics_partitionstopics_partitions_dictr   r  r   s
             r   )_list_consumer_group_offsets_send_request:KafkaAdminClient._list_consumer_group_offsets_send_request  s      ,,**+=1*Ma<!a<$4 5;F7O	E E
 %)! *5S)9&(2$E*155i@ )3$(7M)N$O!(1(NG%`" " ))*>HHr   c                 N   UR                   S::  a  UR                   S:  aJ  [        R                  " UR                  5      nU[        R                  La  U" SR                  U5      5      e0 nUR                   H  u  pEU Hy  nUR                   S::  a  Uu  pxpSnOUu  pxpn
[        R                  " U
5      nU[        R                  La  U" SR                  XG5      5      e[        XU5      U[        XG5      '   M{     M     U$ [        SR                  UR                   5      5      e)zProcess an OffsetFetchResponse.

Arguments:
    response: an OffsetFetchResponse.

Returns:
    A dictionary composed of TopicPartition keys and
    OffsetAndMetadata values.
rB   r   z.OffsetFetchResponse failed with response '{}'.   r   zAUnable to fetch consumer group offsets for topic {}, partition {}zOSupport for OffsetFetchResponse_v{} has not yet been added to KafkaAdminClient.)
r  r   r   r   r   rs   r   r1   r0   r   )r{   r   r   offsetsr   r  partition_datar  offsetr  r   leader_epochs               r   -_list_consumer_group_offsets_process_response>KafkaAdminClient._list_consumer_group_offsets_process_response  s     1$ ##a'#__X-@-@A
V^^3$H)+ + G%-__!&0N++q0BP?	8')P^M	<:!'!<J!7(_#VE57 7 ARRXdp@qGN5<= '1 &5"  &a,,-/ /r   c                     Uc  U R                  U/5      U   nU R                  XU5      nU R                  U/5        UR                  nU R	                  U5      $ )a  Fetch Consumer Offsets for a single consumer group.

Note:
This does not verify that the group_id or partitions actually exist
in the cluster.

As soon as any error is encountered, it is immediately raised.

Arguments:
    group_id (str): The consumer group id name for which to fetch offsets.

Keyword Arguments:
    group_coordinator_id (int, optional): The node_id of the group's coordinator
        broker. If set to None, will query the cluster to find the group
        coordinator. Explicitly specifying this can be useful to prevent
        that extra network round trip if you already know the group
        coordinator. Default: None.
    partitions: A list of TopicPartitions for which to fetch
        offsets. On brokers >= 0.10.2, this can be set to None to fetch all
        known offsets for the consumer group. Default: None.

Returns:
    dictionary: A dictionary with TopicPartition keys and
    OffsetAndMetadata values. Partitions that are not specified and for
    which the group_id does not have a recorded offset are omitted. An
    offset value of `-1` indicates the group_id has no offset for that
    TopicPartition. A `-1` can only happen for partitions that are
    explicitly specified.
)r   r  r   r   r  )r{   r   r  r  r   r   s         r   list_consumer_group_offsets,KafkaAdminClient.list_consumer_group_offsets  sb    >  '#'#=#=xj#I(#S ??$,JPx(<<AA(KKr   c                    Ub  U R                  X5      /nO}[        [        5      nU R                  U5      R	                  5        H  u  pVXF   R                  U5        M     UR	                  5        VVs/ s H  u  paU R                  X5      PM     nnnU R                  U5        / nU H-  nUR                  U R                  UR                  5      5        M/     U$ s  snnf )aU  Delete Consumer Group Offsets for given consumer groups.

Note:
This does not verify that the group ids actually exist and
group_coordinator_id is the correct coordinator for all these groups.

The result needs checking for potential errors.

Arguments:
    group_ids ([str]): The consumer group ids of the groups which are to be deleted.

Keyword Arguments:
    group_coordinator_id (int, optional): The node_id of the broker which is
        the coordinator for all the groups. Use only if all groups are coordinated
        by the same broker. If set to None, will query the cluster to find the coordinator
        for every single group. Explicitly specifying this can be useful to prevent
        that extra network round trips if you already know the group coordinator.
        Default: None.

Returns:
    A list of tuples (group_id, KafkaError)
)
$_delete_consumer_groups_send_requestr   r  r   r   r  r   extend_convert_delete_groups_responser   )	r{   r   r  rt  coordinators_groupsr   r   resultsrw  s	            r   delete_consumer_groups'KafkaAdminClient.delete_consumer_groupsA  s    .  +@@abG"-d"3,0,F,Fy,Q,W,W,Y(#3::8D -Z 2E1J1J1L1L-N 99)T1L  
 	w'ANN4??HI s   4Cc                     UR                   S::  a@  / nUR                   H,  u  p4UR                  U[        R                  " U5      45        M.     U$ [        SR                  UR                   5      5      e)zParse the DeleteGroupsResponse, mapping group IDs to their respective errors.

Arguments:
    response: A DeleteGroupsResponse object from the broker.

Returns:
    A list of (group_id, KafkaError) for each deleted group.
r   zPSupport for DeleteGroupsResponse_v{} has not yet been added to KafkaAdminClient.)r  r  r  r   r   r   rs   )r{   r   r  r   r   s        r   r  0KafkaAdminClient._convert_delete_groups_responsej  si     1$G(0(8(8$&//**EFG )9N%bVH0013 3r   c                     U R                   R                  [        SS9nUS::  a  [        U   " U5      nO[        SR	                  U5      5      eU R                  X$5      $ )a7  Send a DeleteGroupsRequest to the specified broker (the group coordinator).

Arguments:
    group_ids ([str]): A list of consumer group IDs to be deleted.
    group_coordinator_id (int): The node_id of the broker coordinating these groups.

Returns:
    A future representing the in-flight DeleteGroupsRequest.
r   r   zOSupport for DeleteGroupsRequest_v{} has not yet been added to KafkaAdminClient.)rx   rU   r'   r   rs   r   )r{   r   r  r   r   s        r   r  5KafkaAdminClient._delete_consumer_groups_send_request}  sa     ,,**+>A*Na<)'29=G%aVG_& & ))*>HHr   c                 \    U R                  5        VVs/ s H	  u  pUU4PM     snn$ s  snnf r  )r   )r  r   partition_idss      r   _convert_topic_partitions*KafkaAdminClient._convert_topic_partitions  sC     )9(>(>(@

 )A$  )A
 	
 
s   (c           	         U R                   R                  R                  5        VVs/ s HP  nUU R                   R                  R                  U   R	                  5        Vs/ s H  o"R
                  PM     sn4PMR     snn$ s  snf s  snnf r  )rx   r  r   _partitionsr   r  )r{   r   partition_infos      r   _get_all_topic_partitions*KafkaAdminClient._get_all_topic_partitions  s     --446

 7 @D@T@T@`@`af@g@n@n@pq@pn))@pq 7
 	
 r
s   :B"B7BBc                 J    Uc  U R                  5       $ U R                  U5      $ r  )r  r  )r{   r  s     r   _get_topic_partitions&KafkaAdminClient._get_topic_partitions  s)    #1133--.>??r   c                     U R                   R                  [        SS9nU R                  U5      n[        U   " [	        U5      U R                  U5      US9nU R                  U5      $ )a  Perform leader election on the topic partitions.

:param election_type: Type of election to attempt. 0 for Perferred, 1 for Unclean
:param topic_partitions: A map of topic name strings to partition ids list.
    By default, will run on all topic partitions
:param timeout_ms: Milliseconds to wait for the leader election process to complete
    before the broker returns.

:return: Appropriate version of ElectLeadersResponse class.
r   r   )election_typer  r   )rx   rU   r*   r   r+   r  r   )r{   r  r  r   r   r   s         r   perform_leader_election(KafkaAdminClient.perform_leader_election  sj     ,,**+>A*N++J7
%g.&}5!778HI
 //88r   c                     [        S U 5       5      (       d]  U H=  nU R                  R                  US9  UR                  5       (       d  M3  UR                  e   [        S U 5       5      (       d  M\  gg)zBlock until all futures complete. If any fail, raise the encountered exception.

Arguments:
    futures: A list of Future objects awaiting results.

Raises:
    The first encountered exception if a future fails.
c              3   @   #    U  H  oR                  5       v   M     g 7fr  )r<  )r  r   s     r   r  5KafkaAdminClient._wait_for_futures.<locals>.<genexpr>  s     ?wV&&((ws   )r   N)allrx   pollr=  	exception)r{   rt  r   s      r   r   "KafkaAdminClient._wait_for_futures  s^     ?w???!!!!0==?? ***	 " ?w???r   c                 *   U R                   R                  [        SS9nUS::  aV  [        U   " 5       nU R                  U R                   R	                  5       U5      nU R                  U/5        UR                  $ [        SR                  U5      5      e)zRSend a DescribeLogDirsRequest request to a broker.

Returns:
    A message future
r   r   zRSupport for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient.)	rx   rU   r)   r   r   r   r   rs   r   )r{   r   r   r   s       r   describe_log_dirs"KafkaAdminClient.describe_log_dirs  s     ,,**+Aq*Qa<,W57G//0N0N0PRYZF""F8,
 || &dVG_& &r   )rx   ry   r   rw   ru   )r<   )T)NFr  )F)NN)Er  
__module____qualname____firstlineno____doc__r4   socketIPPROTO_TCPTCP_NODELAYr   DefaultSelectorr   rr   r   r   r   rz   r   r   r   r   r   r   r   staticmethodr   r   r   r  r  r  r  r)  r/  r4  r8  rF  rJ  rN  rQ  r^  rb  rh  rx  r{  r~  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r   r%  __static_attributes__rm   r   r   r6   r6   %   s   vn)[) 	_{2) 	e	)
 	"=) 	) 	#E) 	0) 	) 	T) 	F..0B0BAFG) 	D) 	"4) 	C) 	v)  	[!)" 	t#)$ 	d%)& 	d')( 	))* 	t+), 	-). 	t/)0 	t1)2 	&t3)4 	I--5)6 	$7)8 	t9): 	t;)< 	d=)> 	%g?)@ 	$TA)B 	$TC)D 	E)J 	BK)L  !$)#Q)NV/<	5	?"@U*'$#4;("nH&( 
 
2&9P843 (" ("T4Fl 
 
& 
 
& "K "KH$JL 
 
$ 
 
& *" *"X%ZN 
 
"O*b 
 
""T 
 
$9:"!HK r!IF1!f/"b>$*%Z 26%IN)V JN/3%LN'R3&I& 
 

@
9*+ r   r6   )M
__future__r   r   collectionsr   rt   loggingr+  r    r   kafka.vendorr   kafka.admin.acl_resourcer	   r
   r   r   r   r   r   kafka.client_asyncr   r   kafka.coordinator.protocolr   r   r   kafka.errorserrorsr   r   r   r   r   r   kafka.futurer   kafka.metricsr   r   kafka.protocol.adminr   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   kafka.protocol.commitr,   kafka.protocol.find_coordinatorr-   kafka.protocol.metadatar.   kafka.protocol.typesr/   kafka.structsr0   r1   r2   r3   kafka.versionr4   	getLoggerr  rn   objectr6   rm   r   r   <module>rF     s    0 #          5 y y 5 5   /j j j j j 5 B 3 & ` ` % !wv wr   