o
    )i2                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	 d dl
mZmZ d dlmZ d dlmZmZmZmZ d dlmZ d dlZd dlZd d	lmZmZmZ d d
lmZ d dlmZ d dl m!Z! d dl"m#Z#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+m,Z, erd dl-m.Z. ee/Z0dZ1G dd deZ2G dd dZ3eG dd dZ4eG dd dZ5G dd dZ6e j7dede8ded fd d!Z9G d"d# d#Z:e j7	$d3ded%e;e) d&e<d'e8dee=eee6e:f  ee' e4f  f
d(d)Z>d*ej?d+e4d,e@e3 d-ed.ed/ee6 d0ee fd1d2ZAdS )4    N)Iterator)	dataclass)Enumauto)Process
connection)BaseProcess)TYPE_CHECKINGCallableOptionalUnion)patch)CacheConfigParallelConfig
VllmConfig)init_logger)current_platform)get_env_vars_to_copy)get_mp_contextget_open_zmq_ipc_pathzmq_socket_ctx)DPCoordinator)Executor)get_engine_client_zmq_addrshutdown)PlacementGroupi'  c                   @   s   e Zd Ze Ze Ze ZdS )CoreEngineStateN)__name__
__module____qualname__r   NEW	CONNECTEDREADY r#   r#   `/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/vllm/v1/engine/utils.pyr   #   s    
r   c                   @   s$   e Zd ZdZd	dedefddZdS )

CoreEnginezCOne per data parallel rank, used to track state during handshaking.r   Tindexlocalc                 C   s    || _ |dd| _tj| _d S )N   little)r'   to_bytesidentityr   r    state)selfr&   r'   r#   r#   r$   __init__,   s   zCoreEngine.__init__N)r   T)r   r   r   __doc__intboolr.   r#   r#   r#   r$   r%   )   s    r%   c                   @   sV   e Zd ZU ee ed< ee ed< dZee ed< dZee ed< dZ	ee ed< dS )EngineZmqAddressesinputsoutputsNcoordinator_inputcoordinator_outputfrontend_stats_publish_address)
r   r   r   liststr__annotations__r5   r   r6   r7   r#   r#   r#   r$   r2   3   s   
 r2   c                   @   s2   e Zd ZU dZeed< eeee	ef f ed< dS )EngineHandshakeMetadatazMetadata sent to each engine process during startup handshake,
    including addresses of the front-end ZMQ queues that they should
    connect to.
    	addressesparallel_configN)
r   r   r   r/   r2   r:   dictr9   r   r0   r#   r#   r#   r$   r;   C   s   
 r;   c                   @   s   e Zd ZdZ	ddedededededed	ed
e	e
 dedee fddZdd Zdd ZdefddZdeeef fddZdS )CoreEngineProcManagerz
    Utility class to handle creation, readiness, and shutdown
    of background processes used by the AsyncLLM and LLMEngine.
    N	target_fnlocal_engine_countstart_indexlocal_start_indexvllm_configlocal_clienthandshake_addressexecutor_class	log_statsclient_handshake_addressc              
   C   s  t  }|||||	d}|
r|
|d< g | _g }t|D ]#}|| }|| }|| | j|j|d| |||dB d qt| t| j| _|j	j
dk}z7t| j|D ]#\}}|r_t||nt  |  W d    n1 srw   Y  qTW |  r|   d S d S |  r|   w w )N)rD   rE   rF   rG   rH   rI   ZEngineCore_)dp_ranklocal_dp_rank)targetnamekwargs   )r   	processesrangeappendr   weakreffinalizer   
_finalizerr=   data_parallel_sizezipset_device_control_env_var
contextlibnullcontextstartfinished_procsclose)r-   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   contextZcommon_kwargslocal_dp_ranksr&   local_indexZglobal_indexZdata_parallelprocrK   r#   r#   r$   r.   S   s\   


zCoreEngineProcManager.__init__c                 C   s   |    dS )zShutdown all procs.N)rU   r-   r#   r#   r$   r]      s   zCoreEngineProcManager.closec                 C   s   t dd | jD  dS )zWait for any process to exit.c                 s   s    | ]}|j V  qd S Nsentinel.0ra   r#   r#   r$   	<genexpr>   s    z3CoreEngineProcManager.join_first.<locals>.<genexpr>N)r   waitrP   rb   r#   r#   r$   
join_first   s   z CoreEngineProcManager.join_firstreturnc                 C      dd | j D S )Nc                 S   s   g | ]}|j qS r#   rd   rf   r#   r#   r$   
<listcomp>   s    z3CoreEngineProcManager.sentinels.<locals>.<listcomp>rP   rb   r#   r#   r$   	sentinels   s   zCoreEngineProcManager.sentinelsc                 C   rl   )z>Returns dict of proc name -> exit code for any finished procs.c                 S   s    i | ]}|j d ur|j|j qS rc   )exitcoderM   rf   r#   r#   r$   
<dictcomp>   s    z8CoreEngineProcManager.finished_procs.<locals>.<dictcomp>rn   rb   r#   r#   r$   r\      s   z$CoreEngineProcManager.finished_procsrc   )r   r   r   r/   r
   r0   r   r1   r9   typer   r   r.   r]   rj   r8   ro   r>   r\   r#   r#   r#   r$   r?   M   s8    	

8r?   rD   rK   rk   c                 c   s    | j j}tj}zddd t|| |d | D }W n' tyD } ztd| d||  d|d |  dt	| d		|d
}~ww t
jtj||ffd d
V  W d
   d
S 1 s_w   Y  d
S )zW
    Temporarily set CUDA_VISIBLE_DEVICES or equivalent
    for engine subprocess.
    ,c                 s   s    | ]
}t t|V  qd S rc   )r9   r   Zdevice_id_to_physical_device_idrg   ir#   r#   r$   rh      s
    
z-set_device_control_env_var.<locals>.<genexpr>rO   zError setting z: local range: [z, z) base value: ""N)values)r=   
world_sizer   Zdevice_control_env_varjoinrQ   
IndexError	Exceptionosgetenvr   r>   environ)rD   rK   rx   Zevarvalueer#   r#   r$   rX      s0   


"rX   c                   @   s   e Zd ZdZ		ddededee dede	e
d  d	e	e
e  fd
dZededee
d e
e f fddZedededee
d e
e f fddZdededdfddZdededdfddZdd Zdd ZdS )CoreEngineActorManagerz
    Utility class to handle creation, readiness, and shutdown
    of core engine Ray actors used by the AsyncLLM and LLMEngine.

    Different from CoreEngineProcManager, this class manages
    core engines for both local and remote nodes.
    NrD   r<   rG   rH   placement_groupsr   r_   c              
   C   s  dd l }dd l}ddlm}	 ddlm}
 ddlm} g | _g | _	t
dd}dd |D | _|	| jd	}|| _|| _|| _|jj}|jj}|jj}| rRtd
 n|  |d urw|d usbJ dt|t|ksnJ dtd g | _n
t|\}}|| _t||ksJ dg | _g }tt|||D ]G\}}}||}||j_||k }| |j!|
||d|dj |||||||d}|r| j"| n| j	"| | j"| |"|j#   q|$| g | _%| j| j	 D ]}| j%"|j&   qd S )Nr   
RuntimeEnv PlacementGroupSchedulingStrategyDPEngineCoreActorr   )destinationc                 S   s"   i | ]}|t jv r|t j| qS r#   )r|   r~   )rg   rM   r#   r#   r$   rq      s    
z3CoreEngineActorManager.__init__.<locals>.<dictcomp>Zenv_varsz8Ray is already initialized. Skipping Ray initialization.z?local_dp_ranks must be provided if placement_groups is providedz=placement_groups and local_dp_ranks must have the same lengthzUsing provided placement groupsz8Number of placement groups must match data parallel sizeplacement_groupZplacement_group_bundle_indexZscheduling_strategyruntime_envrD   rG   rH   rE   r<   rJ   rK   )'copyrayray.runtime_envr   ray.util.scheduling_strategiesr   vllm.v1.engine.corer   local_engine_actorsremote_engine_actorsr   env_vars_dictr<   rG   rH   r=   rV   data_parallel_size_localrx   Zis_initializedloggerinfoinitlencreated_placement_groupsr   create_dp_placement_groupsplacement_group_is_localrW   rQ   deepcopyr   remoteoptionsrR   wait_for_initgetrun_refsrun)r-   rD   r<   rG   rH   r   r_   r   r   r   r   r   Zenv_vars_listr   dp_sizerA   rx   refsr&   r`   pgdp_vllm_configrE   actorr#   r#   r$   r.      s   	






zCoreEngineActorManager.__init__rk   c                    s  ddl }ddlm} ddlm} td | jj | jj	}| jj
}t|dgd fdd	d
}|d j ks:J dt|dksK|d j ksKJ d| }| jj}g }	g }
|D ]}|j}||j }d|vrgqXt|d | }| kr||ks~J d| t|D ]*}ddd  dig| ddig }|jjdt|	 d|d}|	| |
| qqXt|D ].}t|	|kr n%ddig| ddig }|jjdt|	 d|d}|	| |
| qqXt|	|k rtd| dt|	 d| |	|
fS )z<
        Create placement groups for data parallel.
        r   N)available_resources_per_node
list_nodesz+Creating placement groups for data parallel)r,   =ZALIVE)filtersc                    
   | j  kS rc   node_ipnodeZdp_master_ipr#   r$   <lambda>-     
 zCCoreEngineActorManager.create_dp_placement_groups.<locals>.<lambda>keyz The head node is missing or deadrO   There can only be one head nodeGPUz<Not enough resources to allocate DP ranks on DP master node       ?node:MbP?CPUdp_rank_STRICT_PACKrM   Zstrategybundlesz!Not enough resources to allocate z  placement groups, only created z( placement groups. Available resources: )r   ray._private.stater   ray.util.stater   r   r   r=   data_parallel_master_iprV   r   sortedr   r   rx   node_idr0   rQ   utilr   rR   
ValueError)rD   r   r   r   num_pg_to_createrA   nodesavailable_resourcesrx   r   r_   r   r   Znode_resourcesavailable_engine_countru   r   r   r#   r   r$   r     s   





z1CoreEngineActorManager.create_dp_placement_groupsold_vllm_confignew_data_parallel_sizec                    s  ddl }ddlm}m} ddlm} | jj}|| }|dkr"g g fS | jj | jj	}| }	t
|	 fddd}	|	d j ksBJ dt|	d	ksS|	d	 j ksSJ d
| }
| }g }g }d}|	D ]}||krl ||fS |j}|j}t|
| d }t|| d }td|| }|| }|| }t|D ]K}||kr nD|| }| krddd  dig| ddig }nddig| ddig }|jjd| d|d}|| || }|| |d	7 }qqa||fS )zB
        Add placement groups for new data parallel size.
        r   N)r   total_resources_per_noder   c                    r   rc   r   r   r   r#   r$   r   ~  r   z@CoreEngineActorManager.add_dp_placement_groups.<locals>.<lambda>r   z$The first node must be the head noderO   r   r   r   r   r   r   r   r   r   )r   r   r   r   r   r   r=   rV   r   rx   r   r   r   r   r0   maxrQ   r   r   rR   )r   r   r   r   r   r   Zold_dp_sizer   rx   r   r   Ztotal_resourcesr   r_   Znum_pg_createdr   r   r   Zavailable_gpusZ
total_gpusZ	used_gpusZused_engines_on_noder   ru   rankr   r   
local_rankr#   r   r$   add_dp_placement_groupsh  st   0


z.CoreEngineActorManager.add_dp_placement_groupscur_vllm_configc              
      s:  dd l }dd l}ddlm} ddlm} ddlm} t| j	t| j
 }||ks3J d| d| d| ||\}	}
|jj}|jj d}|| jdd	iB d
}tt|	|
D ]g\}\}}|| }||}||j_||j_t fdd|jD }|r|d7 }|jj| |j_||j|||d|dj|| j| j|| j||d}|r| j	| n| j
| | j| | j| qV| dd |dkr| j	| d  ng | j
t|	|  d   D  |dkr| j	| d  ng | j
t|	|  d   }|D ]}| j!|j"  q||j_|dkr|j j|7  _d S d S )Nr   r   r   r   zNew data parallel size z1 must be greater than current data parallel size z for scale upZVLLM_ELASTIC_EP_SCALE_UP_LAUNCH1r   c                 3   s$    | ]}| d   ddkV  qdS )r   r   N)r   )rg   Zbundler   r#   r$   rh     s
    
z=CoreEngineActorManager.scale_up_elastic_ep.<locals>.<genexpr>rO   r   r   r   c                 S   s   g | ]}|j  qS r#   )r   r   )rg   r   r#   r#   r$   rm         z>CoreEngineActorManager.scale_up_elastic_ep.<locals>.<listcomp>)#r   r   r   r   r   r   r   r   r   r   r   r   r=   rx   r   r   	enumeraterW   r   rV   r   anyZbundle_specsr   r   r   rG   rH   r<   rR   r   r   r   r   r   )r-   r   r   r   r   r   r   r   cur_data_parallel_sizer   r_   rx   Znew_local_enginesr   ru   r   r   r   r   rE   r   Zactorsr#   r   r$   scale_up_elastic_ep  s   





	



z*CoreEngineActorManager.scale_up_elastic_epr   c                 C   sv   dd l }||ksJ d| d| dt|| D ]}| j }| j }|r-| j  n| j  |j| qd S )Nr   zcur_data_parallel_size z- must be greater than new_data_parallel_size z for scale down)	r   rQ   r   popr   r   r   r   remove_placement_group)r-   r   r   r   _r   is_localr#   r#   r$   scale_down_elastic_ep  s   



z,CoreEngineActorManager.scale_down_elastic_epc                 C   s   | j S rc   )r   rb   r#   r#   r$   get_run_refs.  s   z#CoreEngineActorManager.get_run_refsc                 C   s@   dd l }| j| j D ]}|| q
| jD ]}|j| qd S )Nr   )r   r   r   killr   r   r   )r-   r   r   r   r#   r#   r$   r]   1  s   
zCoreEngineActorManager.close)NN)r   r   r   r/   r   r2   rr   r   r1   r   r8   r0   r.   staticmethodtupler   r   r   r   r   r]   r#   r#   r#   r$   r      sV    


ZNV
_
r   rO   rG   rH   num_api_serversc                 #   sH   | j }|j}|j|j}|j}|j|jp|j}|du}	|	p$|p$|k t fddt	|D  fddt	|D d}
|dkoI|	 oI|dk}|rft
|}| \|
_|
_| |
_td|jj nd}|jd	krtd
 t| |
||d}|||
fV  dS |	rdksJ t|ddg}n"|dkrfddt	|D }n|sJ ddd t	|| D }|	p|k}t||j}|r|dkr|rJ t }|}n|}d}t|tjdd<}ddlm} rt|j | ||||d||pdd
}nd}|||
fV  t!||
||| j"||r|jnd W d   dS 1 sw   Y  dS )z5Launch engine and DP coordinator processes as needed.Nc                       g | ]}t  qS r#   r   rg   r   client_local_onlyhostr#   r$   rm   [  r   z'launch_core_engines.<locals>.<listcomp>c                    r   r#   r   r   r   r#   r$   rm   _  r   )r3   r4   rO   r   z(Started DP Coordinator process (PID: %d)r   z(Starting ray-based data parallel backend)rD   r<   rG   rH   Tr&   r'   c                    s   g | ]
}t || k d qS )r   r%   rt   )rA   r#   r$   rm     s    zcAttempting to launch core_engines from dp_rank > 0, but found internal DPLB, which is incompatible.c                 S   s   g | ]}t |d dqS )Tr   r   rt   r#   r#   r$   rm     s    
)bind)EngineCoreProc)	rD   rG   rH   rF   rI   rE   rA   rB   rC   )#r=   rV   r   Zdata_parallel_rank_localZdata_parallel_rankr   data_parallel_hybrid_lbdata_parallel_external_lbr2   rQ   r   Zget_engine_socket_addressesr5   r6   Zget_stats_publish_addressr7   r   r   ra   pidZdata_parallel_backendr   r%   r   Zdata_parallel_rpc_portr   r   zmqZROUTERr   r   r?   Zrun_engine_corewait_for_engine_startupcache_config)rD   rG   rH   r   r=   r   rC   rJ   Zlocal_engines_onlyZoffline_moder<   Zrun_coordinatorZcoordinatorZengine_actor_managerZengines_to_handshakeZhandshake_local_onlyrF   Zlocal_handshake_addressrI   handshake_socketr   Zlocal_engine_managerr#   )r   r   rA   r$   launch_core_engines9  s   



	$r   r   r<   core_enginesr=   r   proc_managercoord_processc              
      st  |j }t|| }||gddg}	}
t }|| tj |j o$|j }|d ur7| D ]	}||tj q-|d urC||j	tj t
|	sLt
|
r|t}|snt
|	r`tjdg|	R   t
|
rmtjdg|
R   qCt|dks||d d | kr|r| ni }|d ur|jd ur|j||j< td| |  \ }t d}t fdd|D d }|d u rtd	| tj|}|d
 |d |d }}}||jkrt| d|rdnd d| d|jrd d |s||kr|rtd| dtd| d|dkrQ|jtjkrQtjt||j |j!|j"dd}| j# |fdd |	|r8dnd  d8  < |
|rEdnd  d7  < tj$|_nP|dkr|jtj$kr|j%pbd}||d 7 }||_%|j&d u rx|'d|_&|
|r~dnd  d8  < tj(|_ntd| d|rdnd d| d|j d	td ||rdnd| t
|	sLt
|
sLd S d S )!Nr   z?Waiting for %d local, %d remote core engine proc(s) to connect.z=Waiting for %d local, %d remote core engine proc(s) to start.rO   zNEngine core initialization failed. See root cause above. Failed core proc(s): r)   c                 3   s    | ]
}|j  kr|V  qd S rc   )r+   )rg   r   Zeng_identityr#   r$   rh     s    z*wait_for_engine_startup.<locals>.<genexpr>z8Message from engine with unexpected data parallel rank: statusr'   headlessz message from r   z engine z, expected it to be zRemote engine z9 must not use --headless in external or hybrid dp lb modez< must use --headless unless in external or hybrid dp lb modeZHELLO)r   data_parallel_master_portrV   )r<   r=   F)r   r"   num_gpu_blocksZdp_stats_addresszUnexpected z message for z in z state.z"%s from %s core engine process %s.))r   r   r   ZPollerregisterPOLLINr   r   ro   re   r   pollSTARTUP_POLL_PERIOD_MSr   debugr\   rp   rM   RuntimeErrorZrecv_multipartr0   
from_bytesnextmsgspecmsgpackdecoder'   r,   r   r    encoder;   r   r   rV   Zsend_multipartr!   r  r7   r   r"   )r   r<   r   r=   r   r   r   Zlocal_countZremote_countZconn_pendingZstart_pendingZpollerZremote_should_be_headlessre   eventsfinishedZready_msg_bytesZ	eng_indexZenginemsgr   r'   r   Zinit_messager  r#   r   r$   r     s   








r   )rO   )BrY   r|   rS   collections.abcr   dataclassesr   enumr   r   multiprocessingr   r   Zmultiprocessing.processr   typingr	   r
   r   r   Zunittest.mockr   r
  r   Zvllm.configr   r   r   Zvllm.loggerr   Zvllm.platformsr   Zvllm.ray.ray_envr   Z
vllm.utilsr   r   r   Zvllm.v1.engine.coordinatorr   Zvllm.v1.executor.abstractr   Zvllm.v1.utilsr   r   Zray.util.placement_groupr   r   r   r  r   r%   r2   r;   r?   contextmanagerr0   rX   r   rr   r1   r   r   ZSocketr8   r   r#   r#   r#   r$   <module>   s   
	Q    