
    #Ii~                     h    S SK r S SKr S SKrS SKJr  SSKJr   " S S\5      rg! \ a    Sr N"f = f)    N)json   )PubSubManagerc                   l   ^  \ rS rSrSrSr    SU 4S jjrU 4S jrS rS r	S r
S	 rS
 rS rSrU =r$ )KombuManager   a  Client manager that uses kombu for inter-process messaging.

This class implements a client manager backend for event sharing across
multiple processes, using RabbitMQ, Redis or any other messaging mechanism
supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_.

To use a kombu backend, initialize the :class:`Server` instance as
follows::

    url = 'amqp://user:password@hostname:port//'
    server = socketio.Server(client_manager=socketio.KombuManager(url))

:param url: The connection URL for the backend messaging queue. Example
            connection URLs are ``'amqp://guest:guest@localhost:5672//'``
            and ``'redis://localhost:6379/'`` for RabbitMQ and Redis
            respectively. Consult the `kombu documentation
            <http://kombu.readthedocs.org/en/latest/userguide                /connections.html#urls>`_ for more on how to construct
            connection URLs.
:param channel: The channel name on which the server sends and receives
                notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
                   default of ``False`` initializes the class for emitting
                   and receiving.
:param connection_options: additional keyword arguments to be passed to
                           ``kombu.Connection()``.
:param exchange_options: additional keyword arguments to be passed to
                         ``kombu.Exchange()``.
:param queue_options: additional keyword arguments to be passed to
                      ``kombu.Queue()``.
:param producer_options: additional keyword arguments to be passed to
                         ``kombu.Producer()``.
kombuc	                    > [         c  [        S5      e[        T	U ]  X#US9  Xl        U=(       d    0 U l        U=(       d    0 U l        U=(       d    0 U l        U=(       d    0 U l        U R                  5       U l
        g )NzLKombu package is not installed (Run "pip install kombu" in your virtualenv).)channel
write_onlylogger)r	   RuntimeErrorsuper__init__urlconnection_optionsexchange_optionsqueue_optionsproducer_options_connectionpublisher_connection)
selfr   r   r   r   r   r   r   r   	__class__s
            </venv/lib/python3.13/site-packages/socketio/kombu_manager.pyr   KombuManager.__init__1   sz     =  . / / 	O"4": 0 6B*0b 0 6B$($4$4$6!    c                   > [         TU ]  5         SnU R                  R                  S:X  a  SSKJn  U" S5      nO(SU R                  R                  ;   a  SSKJn  U" S5      nU(       d"  [        SU R                  R                  -   5      eg )	NTeventletr   )is_monkey_patchedsocketgevent)is_module_patchedz<Kombu requires a monkey patched socket library to work with )	r   
initializeserver
async_modeeventlet.patcherr   gevent.monkeyr"   r   )r   monkey_patchedr   r"   r   s       r   r#   KombuManager.initializeA   s}    ;;!!Z/:.x8N///7.x8N++0012 2 r   c                 X    [         R                  " U R                  40 U R                  D6$ )N)r	   
Connectionr   r   )r   s    r   r   KombuManager._connectionP   s"    DD,C,CDDr   c                     SSS.nUR                  U R                  5        [        R                  " U R                  40 UD6$ )NfanoutF)typedurable)updater   r	   Exchanger   )r   optionss     r   	_exchangeKombuManager._exchangeS   s6    #6t,,-~~dll6g66r   c                     S[        [        R                  " 5       5      -   nSSS0S.nUR                  U R                  5        [
        R                  " XR                  5       40 UD6$ )Nzpython-socketio.Fz	x-expiresi )r0   queue_arguments)struuiduuid4r1   r   r	   Queuer4   )r   
queue_namer3   s      r   _queueKombuManager._queueX   sS    '#djjl*;;
#f8MNt))*{{:~~'7C7CCr   c                     UR                   " SSU R                  5       0U R                  D6nUR                  X"R                  5      $ )Nexchange )Producerr4   r   ensurepublish)r   
connectionproducers      r   _producer_publishKombuManager._producer_publish^   sH    && @0@ @)-)>)>@  +;+;<<r   c                 d   Sn  U R                  U R                  5      nU" [        R                  " U5      5        g ! [        [
        R                  R                  4 aL    U(       a#  U R                  5       R                  S5        Sn O%U R                  5       R                  S5         g f = fM  )NTz&Cannot publish to rabbitmq... retryingFz'Cannot publish to rabbitmq... giving up)
rG   r   r   dumpsOSErrorr	   
exceptions
KombuError_get_loggererror)r   dataretryproducer_publishs       r   _publishKombuManager._publishc   s    #'#9#9--$/  D!12U--889 $$&,, .8 9!E$$&,,AC s   7= AB-B-,B-c              #   2  #    U R                  5       nSn  U R                  5        nUR                  U5       n UR                  SS9nUR	                  5         UR
                  v   SnM1  ! , (       d  f       O= fS S S 5        O! , (       d  f       O= f! [        [        R                  R                  4 aV    U R                  5       R                  SR                  U5      5        [        R                  " U5        [        US-  S5      n Of = fGM  7f)Nr   T)blockz3Cannot receive from rabbitmq... retrying in {} secs   <   )r=   r   SimpleQueuegetackpayloadrK   r	   rL   rM   rN   rO   formattimesleepmin)r   reader_queueretry_sleeprE   queuemessages         r   _listenKombuManager._listenu   s     {{}7%%':#//="&+iidi&;G#KKM")//1*+K	 # >= ('' U--889 7  "((**0&*=? 

;'!+/267 s]   DB B3A,,
A:	6B=B D
BB DB A7DDDD)r   r   r   r   r   r   )z#amqp://guest:guest@localhost:5672//socketioFNNNNN)__name__
__module____qualname____firstlineno____doc__namer   r#   r   r4   r=   rG   rS   re   __static_attributes____classcell__)r   s   @r   r   r      sK     B D@>B;?6:7 2E7
D=
$7 7r   r   )	r^   r9   r	   ImportErrorengineior   pubsub_managerr   r   rA   r   r   <module>rs      s?       )y7= y7  Es   & 11