o
    1 i}I                     @   s\  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	m
Z
 d dlmZmZmZmZmZ d dlZd dlZd dlm  m  mZ d dlm  m  mZ d dlmZ d dlmZmZmZ d dlm Z  d d	l!m"Z" ertd d
l#m$Z$ e %e&Z'dZ(dede)fddZ*dej+de)fddZ,deej+ ddddfddZ-G dd dZ.G dd dej/Z0dS )    N)defaultdict)Queue)EventLockThread)TYPE_CHECKINGAnyDictIteratorUnion)disable_client_hook)CLIENT_SERVER_MAX_THREADSOrderedResponseCache_propagate_error_in_context)loads_from_client)log_once)RayletServicer
   contextreturnc                 C   sD   t |  }|d}|du s|dvrtd| d dS |dkS )zE
    Get `reconnecting` from gRPC metadata, or False if missing.
    reconnectingN)TrueFalsez9Client connecting with invalid value for "reconnecting": zF, This may be because you have a mismatched client and server version.Fr   )dictinvocation_metadatagetloggererror)r   metadataval r    o/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/util/client/server/dataservicer.py_get_reconnecting_from_context   s   

r"   reqc                 C   s^   |  d}|dkr| jjrdS |dkr| jj| jjd kS |dkr+| jj| jjd kS |dvS )a  
    Returns True if the response should to the given request should be cached,
    false otherwise. At the moment the only requests we do not cache are:
        - asynchronous gets: These arrive out of order. Skipping caching here
            is fine, since repeating an async get is idempotent
        - acks: Repeating acks is idempotent
        - clean up requests: Also idempotent, and client has likely already
             wrapped up the data connection by this point.
        - puts: We should only cache when we receive the final chunk, since
             any earlier chunks won't generate a response
        - tasks: We should only cache when we receive the final chunk,
             since any earlier chunks won't generate a response
    typer   Fput   task)acknowledgeconnection_cleanup)
WhichOneofr   asynchronousr%   chunk_idtotal_chunksr'   )r#   req_typer    r    r!   _should_cache/   s   
r/   grpc_input_generatoroutput_queuezEQueue[Union[ray_client_pb2.DataRequest, ray_client_pb2.DataResponse]]c              
   C   s~   z8z| D ]}| | qW n tjy( } ztd|  W Y d}~nd}~ww W | d dS W | d dS | d w )z<
    Pushes incoming requests to a shared output_queue.
    zHclosing dataservicer reader thread grpc error reading request_iterator: N)r%   grpcZRpcErrorr   debug)r0   r1   r#   er    r    r!   
fill_queueG   s    	r5   c                   @   s@   e Zd ZdZdd Zdejdeejej	f fddZ
dd	 Zd
S )ChunkCollectorzR
    Helper class for collecting chunks from PutObject or ClientTask messages
    c                 C      d | _ d| _t | _d S Ncurr_req_idlast_seen_chunk_id	bytearraydataselfr    r    r!   __init__`      zChunkCollector.__init__r#   chunkc                 C   s   | j d ur| j |jkrtd| j  d|j d|j| _ | jd }|j|k r(d S |j|kr:td|j d|j d|j| jd krM| j|j |j| _|jd |jkS )Nz1Expected to receive a chunk from request with id z, but found z	 instead.r&   zA chunk z of request z was received out of order.)r;   req_idRuntimeErrorr<   r,   r>   extendr-   )r@   r#   rC   Z
next_chunkr    r    r!   	add_chunke   s(   


zChunkCollector.add_chunkc                 C   r7   r8   r:   r?   r    r    r!   reset~   rB   zChunkCollector.resetN)__name__
__module____qualname____doc__rA   ray_client_pb2DataRequestr   Z
PutRequestZ
ClientTaskrG   rH   r    r    r    r!   r6   [   s    
r6   c                   @   s<   e Zd ZdddZdd Zdeded	efd
dZdd Z	dS )DataServicerbasic_servicer   c                 C   sF   || _ t | _d| _i | _i | _tt| _t	 | _
t | _t | _d S )Nr   )rP   r   clients_locknum_clientsclient_last_seenreconnect_grace_periodsr   r   response_cachesr   stoppedr6   put_request_chunk_collectorclient_task_chunk_collector)r@   rP   r    r    r!   rA      s   zDataServicer.__init__c           "      c   sv   t   }d}t| }|d}|d u rtd d S td| d | |||}| j| }d}	|s8d S zzt	 }
t
td||
fd}|  	 t|
jd D ]}t|tjr`|V  qSt|tjshJ t|r|	r||j}t|tr{||d ur|V  qSd }|d}|d	kr| j|j}tj|d
}| j |jj| j|< |jjdkrd}	W d    n1 sw   Y  nr|dkr|jjr| j|j||j|
}|d u rqSn| j|j|}tj|d}nI|dkr| j !||j"sqS| j#| j j$|j"j%||j"j&}| j '  tj|d}n|dkr8g }|j(j)D ]}| j(||}|*| qtjtj+|dd}n|dkrFtj| , d}n|dkrm| j | j-|j.}tj|d}W d    n	1 sgw   Y  n|dkrd}t/ }tj|d}n|dkr|0|j1j qS|dkr| j< |j2}| j3!||s	 W d    qSt4| j3j$| j\}}| j3'  | j5|j2|||}tj|d}~~W d    n	1 sw   Y  nW|dkr| j | j6|j7|}tj|d}W d    n	1 sw   Y  n/|dkr)| j | j8|j9}tj|d}W d    n	1 s#w   Y  ntd | d!|j|_t|rD|	rD|:|j| |V  qSW n1 tyz } z$t;d" t<||}|=|}|rg|rp|>t?j@jA d}W Y d }~nd }~ww W td#|  |BtC |D rtd$EtC | j|} |s| d urtd%|  d& | jFjG| d' ntd( | j || jHvrtd) 	 W d    d S | jH| }!|!|krtd* 	 W d    d S | jI| | jH|= || jv r| j|= || jv r| j|= |  jJd+8  _Jtd,| d-| jJ  tK  | jJdkr.td. tLM  W d    n1 s9w   Y  W d    d S W d    d S 1 sRw   Y  d S td#|  |BtC |D rstd$EtC | j|} |s| d urtd%|  d& | jFjG| d' ntd( | j || jHvrtd) 	 W d       Y d S | jH| }!|!|krtd* 	 W d       Y d S | jI| | jH|= || jv r| j|= || jv r| j|= |  jJd+8  _Jtd,| d-| jJ  tK  | jJdkrtd. tLM  W d    n1 sw   Y  W d    w W d    w 1 s5w   Y  w )/NF	client_idz#Client connecting with no client_idz New data connection from client z: T)targetdaemonargsr$   init)r]   r   r   )r   r%   )r%   release)ok)r^   connection_info)r`   prep_runtime_env)ra   r)   )r)   r(   r'   )Ztask_ticket	terminate)rb   list_named_actors)rc   zUnreachable code: Request type z not handled in DatapathzError in data channel:zStream is broken with client z5Queue filler thread failed to join before timeout: {}z-Cleanup wasn't requested, delaying cleanup byz	 seconds.)timeoutz/Cleanup was requested, cleaning up immediately.zConnection already cleaned up.z$Client reconnected, skipping cleanupr&   zRemoved client z, remaining=zShutting down ray.)Ntimer   r   r   r   r   r3   _initrU   r   r   r5   startiter
isinstancerM   ZDataResponserN   r/   check_cacherD   	Exceptionr*   rP   ZInitr]   rQ   Zreconnect_grace_periodrT   r+   Z_async_get_objectZ_get_objectrW   rG   r%   Z_put_objectr>   Zclient_ref_idZowner_idrH   r^   idsappendZReleaseResponse_build_connection_responseZPrepRuntimeEnvra   ZConnectionCleanupResponsecleanupr(   r'   rX   r   ZScheduleZ	Terminaterb   ZListNamedActorsrc   Zupdate_cache	exceptionr   Z
invalidateset_coder2   
StatusCodeZFAILED_PRECONDITIONjoinQUEUE_JOIN_SECONDSis_aliveformatrV   waitrS   Zrelease_allrR   r   rayshutdown)"r@   Zrequest_iteratorr   
start_timeZcleanup_requestedr   rY   Zaccepted_connectionZresponse_cacheZreconnect_enabledZrequest_queueZqueue_filler_threadr#   Zcached_resprespr.   Z	resp_initZget_respZput_respZreleasedZrel_idrelZ	resp_prepZcleanup_respr'   ZarglistkwargsZresp_ticketresponser4   ZrecoverableZinvalid_cacheZcleanup_delayZ	last_seenr    r    r!   Datapath   s  



















u


	







 $







  zDataServicer.DatapathrY   r   rz   c              
   C   s,  | j  t|}ttd }| j|kr@td| j d| d| d tdr0tdt d |t	j
j 	 W d	   d
S |r\|| jvr\|t	j
j |d 	 W d	   d
S || jv rktd| d n|  jd7  _td| d| j  || j|< 	 W d	   dS 1 sw   Y  d	S )z
        Checks if resources allow for another client.
        Returns a boolean indicating if initialization was successful.
           z[Data Servicer]: Num clients z has reached the threshold z. Rejecting client: z. Zclient_thresholdzyYou can configure the client connection threshold by setting the RAY_CLIENT_SERVER_MAX_THREADS env var (currently set to z).NFzEAttempted to reconnect to a session that has already been cleaned up.zClient z has reconnected.r&   zAccepted data connection from z. Total clients: T)rQ   r"   intr   rR   r   warningr   rq   r2   rr   ZRESOURCE_EXHAUSTEDrS   	NOT_FOUNDZset_detailsr3   )r@   rY   r   rz   r   	thresholdr    r    r!   rf   k  sL   



$zDataServicer._initc                 C   s^   | j  | j}W d    n1 sw   Y  tj|dtjd tjd tjd tjtj	dS )Nz{}.{}.{}r   r&   r   )rR   python_versionZray_versionZ
ray_commit)
rQ   rR   rM   ZConnectionInfoResponserv   sysversion_inforx   __version__Z
__commit__)r@   Zcur_num_clientsr    r    r!   rn     s   z'DataServicer._build_connection_responseN)rP   r   )
rI   rJ   rK   rA   r   strr   floatrf   rn   r    r    r    r!   rO      s    
 R+rO   )1loggingr   re   collectionsr   queuer   	threadingr   r   r   typingr   r   r	   r
   r   r2   rx   Z!ray.core.generated.ray_client_pb2core	generatedrM   Z&ray.core.generated.ray_client_pb2_grpcZray_client_pb2_grpcZray._private.client_mode_hookr   Zray.util.client.commonr   r   r   Z%ray.util.client.server.server_picklerr   Zray.util.debugr   Zray.util.client.server.serverr   	getLoggerrI   r   rt   boolr"   rN   r/   r5   r6   ZRayletDataStreamerServicerrO   r    r    r    r!   <module>   s<    

)