o
    1 i$                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZ d dlZd dlm	Z
 d dlm  mZ d dlmZ d dlmZmZmZmZ eeZG dd dZG d	d
 d
eZG dd deZG dd deZG dd deZdS )    N)deque)ListTupleaio)get_or_create_event_loop)gcs_pb2gcs_service_pb2gcs_service_pb2_grpc
pubsub_pb2c                   @   sX   e Zd ZddefddZedd Zdd Zd	d
 Zdd Z	e
dejddfddZdS )_SubscriberBaseN	worker_idc                 C   s8   || _ ttdd tdD | _d| _d| _d| _d S )Nc                 s   s    | ]}t d V  qdS )   N)randomgetrandbits).0_ r   c/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/_private/gcs_pubsub.py	<genexpr>   s    z+_SubscriberBase.__init__.<locals>.<genexpr>   r       )
_worker_idbytes	bytearrayrange_subscriber_id_last_batch_size_max_processed_sequence_id_publisher_id)selfr   r   r   r   __init__   s
   
z_SubscriberBase.__init__c                 C   s   | j S N)r   r    r   r   r   last_batch_size"   s   z_SubscriberBase.last_batch_sizec                 C   s(   t j|i d}tj| j| j|gd}|S )N)channel_typeZsubscribe_messagesubscriber_idZ	sender_idcommands)r   Commandr	    GcsSubscriberCommandBatchRequestr   r   )r    channelcmdreqr   r   r   _subscribe_request&   s
   z"_SubscriberBase._subscribe_requestc                 C   s   t j| j| j| jdS )N)r'   Zmax_processed_sequence_idpublisher_id)r	   ZGcsSubscriberPollRequestr   r   r   r#   r   r   r   _poll_request-   s
   z_SubscriberBase._poll_requestc                 C   s8   t j| j| jg d}|D ]}|jtj|i d q|S )Nr&   )r%   Zunsubscribe_message)r	   r*   r   r   r(   appendr   r)   )r    channelsr-   r+   r   r   r   _unsubscribe_request4   s   
z$_SubscriberBase._unsubscribe_requestereturnc                 C   s,   |   tjjkr
dS |   tjjkrdS dS )NTF)codegrpcZ
StatusCodeZDEADLINE_EXCEEDEDUNAVAILABLE)r4   r   r   r   _should_terminate_polling>   s
   z)_SubscriberBase._should_terminate_pollingr"   )__name__
__module____qualname__r   r!   propertyr$   r.   r0   r3   staticmethodr7   RpcErrorr9   r   r   r   r   r      s    

r   c                       sb   e Zd ZdZ			ddededejf fddZdd	d
Z	dddZ
ddddZdddZ  ZS )_AioSubscribera#  Async io subscriber to GCS.

    Usage example common to Aio subscribers:
        subscriber = GcsAioXxxSubscriber(address="...")
        await subscriber.subscribe()
        while running:
            ...... = await subscriber.poll()
            ......
        await subscriber.close()
    Nr   addressr+   c                    sh   t  | |r|d u sJ dtj|dd}n|d us J dt|| _|| _t | _	t
 | _d S )Nz,address and channel cannot both be specifiedTr   z,One of address and channel must be specified)superr!   	gcs_utilsZcreate_gcs_channelr
   ZInternalPubSubGcsServiceStub_stub_channelr   _queueasyncioEvent_close)r    Zpubsub_channel_typer   rA   r+   	__class__r   r   r!   V   s   z_AioSubscriber.__init__r5   c                    s6   | j  rdS | | j}| jj|ddI dH  dS )zRegisters a subscription for the subscriber's channel type.

        Before the registration, published messages in the channel will not be
        saved for the subscriber.
        N   timeout)rI   is_setr.   rE   rD   GcsSubscriberCommandBatchr    r-   r   r   r   	subscriben   s
   
z_AioSubscriber.subscribec                    s   | j j||dI d H S )NrM   )rD   ZGcsSubscriberPoll)r    r-   rN   r   r   r   
_poll_cally   s   z_AioSubscriber._poll_callc           
   
      sz  t | jdkr|  }t | j||d}t | j }tj||g|tj	dI d H \}}|
 }| s<|  ||vsD||v rFd S zQt | j| _| j| jkru| jdkrltd| j d| j d | j| _d| _| jD ]}|j| jkrtd|  qz|j| _| j| qzW n tjy }	 z| |	rW Y d }	~	d S  d }	~	ww t | jdksd S d S )	Nr   rM   )rN   return_when zreplied publisher_id zdifferent from z/, this should only happens during gcs failover.zIgnoring out of order message )lenrF   r0   r   create_taskrS   rI   waitrG   FIRST_COMPLETEDpopdonecancelresultZpub_messagesr   r/   r   loggerdebugr   Zsequence_idwarningr1   r7   r?   r9   )
r    rN   r-   pollcloser[   ZothersZ
other_taskmsgr4   r   r   r   _poll}   sP   

z_AioSubscriber._pollc                    sb   | j  rdS | j   | j| jgd}z| jj|ddI dH  W n	 ty+   Y nw d| _dS )z2Closes the subscriber and its active subscription.N)r2      rM   )rI   rO   setr3   rE   rD   rP   	ExceptionrQ   r   r   r   rb      s   


z_AioSubscriber.closeNNN)r5   Nr"   )r:   r;   r<   __doc__r   straiogrpcChannelr!   rR   rS   rd   rb   __classcell__r   r   rJ   r   r@   J   s     

'r@   c                       sX   e Zd Z			ddededejf fddZddeeef fdd	Z	e
d
d Z  ZS )GcsAioResourceUsageSubscriberNr   rA   r+   c                       t  tj||| d S r"   )rB   r!   r   ZRAY_NODE_RESOURCE_USAGE_CHANNELr    r   rA   r+   rJ   r   r   r!      s   
z&GcsAioResourceUsageSubscriber.__init__r5   c                    s    | j |dI dH  | | jS )zPolls for new resource usage message.

        Returns:
            A tuple of string reporter ID and resource usage json string.
        rM   N)rd   _pop_resource_usagerF   )r    rN   r   r   r   ra      s   z"GcsAioResourceUsageSubscriber.pollc                 C   s*   t | dkrdS |  }|j |jjfS )Nr   )NN)rV   popleftkey_iddecodeZnode_resource_usage_messagejson)queuerc   r   r   r   rq      s   z1GcsAioResourceUsageSubscriber._pop_resource_usagerh   r"   )r:   r;   r<   r   rj   r7   rl   r!   r   ra   r>   rq   rm   r   r   rJ   r   rn      s    
	rn   c                       sl   e Zd Z			ddededejf fddZedd Z		dd	e
eeejf  fd
dZedd Z  ZS )GcsAioActorSubscriberNr   rA   r+   c                    ro   r"   )rB   r!   r   ZGCS_ACTOR_CHANNELrp   rJ   r   r   r!         zGcsAioActorSubscriber.__init__c                 C   s
   t | jS r"   )rV   rF   r#   r   r   r   
queue_size   s   
z GcsAioActorSubscriber.queue_sizer5   c                    $   | j |dI dH  | j| j|dS )z}Polls for new actor message.

        Returns:
            A list of tuples of binary actor ID and actor table data.
        rM   N
batch_size)rd   _pop_actorsrF   r    r|   rN   r   r   r   ra         zGcsAioActorSubscriber.pollc                 C   f   t | dkrg S d}g }t | dkr1||k r1|  }||j|jf |d7 }t | dkr1||k s|S Nr      )rV   rr   r1   rs   Zactor_messagerv   r|   poppedZmsgsrc   r   r   r   r}         z!GcsAioActorSubscriber._pop_actorsrh   r"   )r:   r;   r<   r   rj   r7   rl   r!   r=   ry   r   r   r   ZActorTableDatara   r>   r}   rm   r   r   rJ   r   rw      s&    

rw   c                       s`   e Zd Z			ddededejf fddZ	ddee	ee
jf  fdd	Zed
d Z  ZS )GcsAioNodeInfoSubscriberNr   rA   r+   c                    ro   r"   )rB   r!   r   ZGCS_NODE_INFO_CHANNELrp   rJ   r   r   r!      rx   z!GcsAioNodeInfoSubscriber.__init__r5   c                    rz   )zsPolls for new node info message.

        Returns:
            A list of tuples of (node_id, GcsNodeInfo).
        rM   Nr{   )rd   _pop_node_infosrF   r~   r   r   r   ra      r   zGcsAioNodeInfoSubscriber.pollc                 C   r   r   )rV   rr   r1   rs   Znode_info_messager   r   r   r   r     r   z(GcsAioNodeInfoSubscriber._pop_node_infosrh   r"   )r:   r;   r<   r   rj   r7   rl   r!   r   r   r   ZGcsNodeInfora   r>   r   rm   r   r   rJ   r   r      s"    	
r   )rG   loggingr   collectionsr   typingr   r   r7   r   rk   Zray._private.gcs_utilsZ_privaterC   Zray._common.utilsr   Zray.core.generatedr   r	   r
   r   	getLoggerr:   r^   r   r@   rn   rw   r   r   r   r   r   <module>   s     
4i%