o
    )iE0                  	   @   sX  d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlm	Z	 d dl
mZ d dl mZ d dlmZmZmZmZ d dlZd dlZd dlmZ d d	lmZ eeZG d
d dejddddZG dd dejdddddZG dd deZG dd deZG dd deZG dd deZ G dd deZ!G dd de!Z"G dd de!Z#G d d! d!Z$dS )"    N)ABCabstractmethod)deque)asdict)count)Queue)AnyCallableOptionalUnion)KVEventsConfig)init_loggerc                   @   s2   e Zd ZU eed< ee ed< dZee	 ed< dS )
EventBatchtseventsNdata_parallel_rank)
__name__
__module____qualname__float__annotations__listr   r   r
   int r   r   f/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/distributed/kv_events.pyr      s   
 r   TF)
array_likeomit_defaultsgcc                   @   s   e Zd ZdZdS )KVCacheEventz*Base class for all KV cache-related eventsN)r   r   r   __doc__r   r   r   r   r   "   s    r   )r   r   r   tagc                   @   sF   e Zd ZU ee ed< ee ed< ee ed< eed< ee ed< dS )BlockStoredblock_hashesZparent_block_hashZ	token_ids
block_sizeZlora_idN)r   r   r   r   r   r   r
   r   r   r   r   r!   +   s   
 r!   c                   @   s   e Zd ZU ee ed< dS )BlockRemovedr"   N)r   r   r   r   r   r   r   r   r   r   r$   3   s   
 r$   c                   @   s   e Zd ZdS )AllBlocksClearedN)r   r   r   r   r   r   r   r%   7   s    r%   c                   @   s$   e Zd ZU eeeeef  ed< dS )KVEventBatchr   N)	r   r   r   r   r   r!   r$   r%   r   r   r   r   r   r&   ;   s   
 r&   c                   @   sH   e Zd ZdZddeddfddZededdfd	d
ZedddZ	dS )EventPublisheraS  Lightweight publisher for EventBatch batches with data parallelism
    support.
    
    In data parallel setups, each DP rank runs its own EventPublisher instance
    to avoid duplicate events and ensure proper event attribution:
    
    - Each DP rank creates a separate publisher
    - Publishers automatically annotate events with their data_parallel_rank
    - This allows consumers to distinguish events from different DP ranks
    
    The publisher is responsible for adding DP metadata since the scheduler
    operates independently of DP topology and shouldn't need DP awareness.
    r   r   returnNc                 C   s
   || _ d S N)_data_parallel_rank)selfr   r   r   r   __init__N   s   
zEventPublisher.__init__r   c                 C      dS )zEmit events in order.

        Implementations should guarantee at-least-once delivery and
        monotonic ordering (e.g., via sequence numbers).
        Nr   r+   r   r   r   r   publishQ       zEventPublisher.publishc                 C   r-   )zShutdown the publisher.Nr   r+   r   r   r   shutdownY   r0   zEventPublisher.shutdownr   r(   N)
r   r   r   r   r   r,   r   r   r/   r2   r   r   r   r   r'   ?   s    r'   c                   @   s$   e Zd ZdZdddZdddZdS )	NullEventPublisherz-No-op implementation (default when disabled).r(   Nc                 C      d S r)   r   r.   r   r   r   r/   a      zNullEventPublisher.publishc                 C   r6   r)   r   r1   r   r   r   r2   d   r7   zNullEventPublisher.shutdownr4   )r   r   r   r   r/   r2   r   r   r   r   r5   ^   s    
r5   c                       s   e Zd ZU dZdZeed< djddddZ			
				d%de	de
dee
 de	de	de	de
dd
f fddZdedd
fddZd&ddZd&ddZd&dd Zd&d!d"Zedee
 de	dee
 fd#d$Z  ZS )'ZmqEventPublishera  Reliable PUB/ROUTER publisher with an in-memory replay buffer.

    Spawns a separate thread to handle publishing from a queue.

    Parameters
    ----------
    endpoint:
        PUB address. Use ``tcp://*:5557`` to bind or ``tcp://host:5557`` to
        connect.
    replay_endpoint:
        Optional ROUTER address for replay requests. When given, subscribers can
        request missed batches by sending the starting sequence number as an
        8-byte big-endian integer.
    buffer_steps:
        Number of past batches to keep for replay.
    hwm:
        ZeroMQ high-water-mark for PUB socket.
    max_queue_size:
        Maximum number of events to buffer in memory.
    topic:
        Topic to publish events to.
    g      ?SHUTDOWN_TIMEOUT   bigT)signedtcp://*:5557N'  順  r   endpointreplay_endpointbuffer_stepshwmmax_queue_sizetopicr(   c                    s   t  | ttt  |d| _tttt	f  |d| _
tj | _d | _d | _|| _| || j| _| || j| _|| _|   t | _|d| _d| _td tj| j ddd| _!| j!"  d S )N)maxsize)maxlenzutf-8TzStarting ZMQ publisher threadzzmq-publisher)targetdaemonname)#superr,   r   r
   r   _event_queuer   tupler   bytes_bufferzmqContextinstance_ctx_pub_replayZ_dp_rankoffset_endpoint_port	_endpoint_replay_endpoint_hwm_socket_setupr   _seq_genencode_topic_bytes_runningloggerinfo	threadingThread_publisher_thread_threadstart)r+   r   rB   rC   rD   rE   rF   rG   	__class__r   r   r,      s,   
zZmqEventPublisher.__init__r   c                 C   s0   | j std|jd u r| j|_| j| d S )NzPublisher is closed)r`   RuntimeErrorr   r*   rN   putr.   r   r   r   r/      s
   
zZmqEventPublisher.publishc                 C   s   d| _ | jd t }d}|r2t | | jk r2| j  }|r'td |r2t | | jk s|r?td| j	 | j | j
 rL| j
j| jd z| jdurY| jjdd | jdurh| jjdd W dS W dS w )	z1Stop the publisher thread and clean up resources.FNT皙?z:Warning: Queue still has %s items after %s seconds timeouttimeoutr   )Zlinger)r`   rN   
put_nowaittimer9   emptysleepra   warningqsizerf   is_alivejoinrV   closerW   )r+   rg   Zpending_itemsr   r   r   r2      s2   



zZmqEventPublisher.shutdownc                 C   s   | j du rC| jtj| _ | j | j | jdur7d| jv s/d| jv s/| jds/| jdr7| j 	| j n| jdurC| j 
| j | jdurY| jtj| _| j	| j dS dS )zoInitialize sockets
        https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety
        N*z::zipc://z	inproc://)rV   rU   socketrR   ZPUBZset_hwmr[   rY   
startswithbindconnectrZ   ZROUTERrW   r1   r   r   r   r\      s    





zZmqEventPublisher._socket_setupc              
   C   sd  t j | _| jdusJ | js| j dkr| jdurA| j	drAz| 
  W n ty@ } ztd| W Y d}~nd}~ww z| jjdd}|du rPW dS W n
 tjy[   Y qw z*t| j}| j|}|dd}| j| j||f | j||f | j  W n ty } ztd| td W Y d}~nd}~ww | js| j dksdS dS )	z1Background thread that processes the event queue.Nr   zError in replay: %srl   rm   r;   r<   zError in publisher thread: %s)msgspecmsgpackZEncoder_packrV   r`   rN   rt   rW   poll_service_replay	Exceptionra   	exceptiongetqueueEmptynextr]   r^   to_bytessend_multipartr_   rQ   append	task_donerp   rr   )r+   eeventseqpayloadZ	seq_bytesr   r   r   re      sB   

z#ZmqEventPublisher._publisher_threadc              	   C   s   | j dusJ | j  }t|dkrtd| dS |\}}}t|d}| jD ]\}}||kr>| j |d|	dd|f q(| j |d| j
df dS )z6If a replay request is waiting, send buffered batches.N   zInvalid replay request: %sr<       r;   )rW   Zrecv_multipartlenra   rs   r   
from_bytesrQ   r   r   END_SEQ)r+   frameZ	client_id_Zstart_seq_bytesZ	start_seqr   bufr   r   r   r     s   

z!ZmqEventPublisher._service_replayc                 C   s   | r|dkr| S d| v r|  d| S d| v r?| r=d| v r=|  d}| d| }t| |d d }|| }| d| S | S td)	a  Helper function to offset the port in an endpoint by 
            the data parallel rank.

        Args:
            endpoint: The endpoint string 
                (e.g., "tcp://*:5557" or "inproc://cache")
            data_parallel_rank: The data parallel rank to offset by

        Returns:
            The endpoint with the port offset by data_parallel_rank 
                or suffix appended
        r   ZinprocZ_dpZtcp:N   z0Invalid endpoint: must contain 'inproc' or 'tcp')rfindr   
ValueError)rB   r   Zlast_colon_idxZ	base_addrZ	base_portZnew_portr   r   r   rX   %  s   
z&ZmqEventPublisher.offset_endpoint_port)r>   Nr?   r@   r@   rA   r4   )r   r   r   r   r9   r   r   r   r   r   strr
   r,   r   r/   r2   r\   re   r   staticmethodrX   __classcell__r   r   rh   r   r8   h   sL   
 	(



&
r8   c                   @   st   e Zd ZU eedZeeede	f f e
d< ededede	f ddfdd	Ze	
ddee dede	fddZdS )EventPublisherFactory)nullrR   .	_registryrL   ctorr(   Nc                 C   s(   || j v rtd| d|| j |< d S )Nzpublisher 'z' already registered)r   KeyError)clsrL   r   r   r   r   register_publisherL  s   
z(EventPublisherFactory.register_publisherr   configr   c              
   C   st   |st  S t|}|dd}|d z| j| }W n ty0 } z	td| d|d}~ww |dd|i|S )	z'Create publisher from a config mapping.Z	publisherr   Zenable_kv_cache_eventszUnknown event publisher ''Nr   r   )r5   r   popr   r   r   )r   r   r   Zconfig_dictkindconstructorexcr   r   r   createS  s   

zEventPublisherFactory.creater3   )r   r   r   r5   r8   r   dictr   r	   r'   r   classmethodr   r
   r   r   r   r   r   r   r   r   F  s&   
 
r   )%r   rc   rp   abcr   r   collectionsr   dataclassesr   	itertoolsr   r   typingr   r	   r
   r   r}   rR   Zvllm.configr   Zvllm.loggerr   r   ra   Structr   r   r!   r$   r%   r&   r'   r5   r8   r   r   r   r   r   <module>   sH   

	
 _