o
    i(                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZmZmZ d dlmZ d dlmZmZ d d	lmZ d
dlmZ d
dlmZmZ d
dlm Z  d
dl!m"Z" d
dl#m$Z$ d
dl%m&Z& d
dl'm(Z(m)Z) edj*Z+erd dlm,Z, G dd deZ-G dd dZ.e)ee e$ddd fde/dee/ de0de1e( ddd e1e  d!e1e$ d"e2d#e/d$e3fd%d&Z4dS )'    N)Iterable)Enum)get_context)BaseProcess)TYPE_CHECKING
NamedTupleOptionalUnion)uuid4)ConnectionPoolRedis)DefaultSerializer   )parse_connection)DEFAULT_LOGGING_DATE_FORMATDEFAULT_LOGGING_FORMAT)Job)setup_loghandlers)Queue)parse_names)
BaseWorkerWorkerfork)
Serializerc                   @   s&   e Zd ZU eed< eed< eed< dS )
WorkerDatanamepidprocessN)__name__
__module____qualname__str__annotations__intr    r$   r$   E/home/app/Keep/.python/lib/python3.10/site-packages/rq/worker_pool.pyr   "   s   
 r   c                   @   sl  e Zd ZG dd deZdeeeefde	e
eef  dededee dd	d
ee dee fddZedee fddZedefddZdd Zd7ddZdefddZdd ZdefddZd8d eddfd!d"Z	#	$d9d%ed&ed'ed(edef
d)d*Z			#	$d:d+e e d&ed'ed(efd,d-Z!d;d&ed'ed(efd.d/Z"e#j$fdefd0d1Z%d2d3 Z&d<d&ed(efd5d6Z'dS )=
WorkerPoolc                   @   s   e Zd ZdZdZdZdS )zWorkerPool.Statusr         N)r   r   r    IDLESTARTEDSTOPPEDr$   r$   r$   r%   Status)   s    r,   r   queues
connectionnum_workersworker_class
serializerr   	job_classqueue_classc           
      O   s   || _ g | _tdtttd tt| _t	|| _
|| _t j| _d| _d| _| jj| _|| _|| _|| _|| _i | _t|\| _| _| _d S )NINFOr   Tr   )r/   Z_workersr   r   r   r   logging	getLoggerlogr   _queue_namesr.   r
   hexr   _burst_sleepr,   r)   statusr0   r1   r2   r3   worker_dictr   _connection_class_pool_class_pool_kwargs)
selfr-   r.   r/   r0   r1   r2   r3   argskwargsr$   r$   r%   __init__.   s    


zWorkerPool.__init__returnc                    s    fdd j D S )Returns a list of Queue objectsc                    s   g | ]
} j | jd qS )r.   )r3   r.   .0r   rB   r$   r%   
<listcomp>Q   s    z%WorkerPool.queues.<locals>.<listcomp>)r9   rK   r$   rK   r%   r-   N   s   zWorkerPool.queuesc                 C   s
   t | jS )rG   )lenr>   rK   r$   r$   r%   number_of_active_workersS   s   
z#WorkerPool.number_of_active_workersc                 C   s$   t  t j| j t  t j| j dS )zUInstalls signal handlers for handling SIGINT and SIGTERM
        gracefully.
        N)signalSIGINTrequest_stopSIGTERMrK   r$   r$   r%   _install_signal_handlersX   s   z#WorkerPool._install_signal_handlersNc                 C   s"   | j d | jj| _|   dS )z8Toggle self._stop_requested that's checked on every loopz)Received SIGINT/SIGTERM, shutting down...N)r8   infor,   r+   r=   stop_workers)rB   signumframer$   r$   r%   rQ   _   s   
zWorkerPool.request_stopc                 C   s   |    | jdkS )z)Returns True if all workers have stopped.r   )reap_workersrN   rK   r$   r$   r%   all_workers_have_stoppede   s   
z#WorkerPool.all_workers_have_stoppedc                 C   s^   | j d t| j }|D ]}|jd |j r'| j d|j|j	 q| 
| qdS )z%Removes dead workers from worker_dictzReaping dead workersg?zWorker %s with pid %d is aliveN)r8   debuglistr>   valuesr   joinis_aliver   r   handle_dead_worker)rB   worker_datasdatar$   r$   r%   rX   k   s   

zWorkerPool.reap_workersworker_datac                 C   sR   | j d|j|j tt | j|j W d   dS 1 s"w   Y  dS )z&
        Handle a dead worker
        zWorker %s with pid %d is deadN)	r8   rT   r   r   
contextlibsuppressKeyErrorr>   pop)rB   rb   r$   r$   r%   r_      s   "zWorkerPool.handle_dead_workerTrespawnc                 C   sj   | j d |   |r-| j| jjkr/| jt| j }|r1t	|D ]}| j
| j| jd q!dS dS dS dS )z7
        Check whether workers are still alive
        zChecking worker processes)burstr<   N)r8   rZ   rX   r=   r,   r+   r/   rM   r>   rangestart_workerr;   r<   )rB   rg   deltair$   r$   r%   check_workers   s   zWorkerPool.check_workersr   r4   r   rh   r<   logging_levelc              
   C   sF   t t|| j| j| j| jf|||| j| j| jdd| d| j	 ddS )zReturns the worker process)r<   rh   rn   r0   r2   r1   zWorker z (WorkerPool ))targetrC   rD   r   )
ForkProcess
run_workerr9   r?   r@   rA   r0   r2   r1   r   )rB   r   rh   r<   rn   r$   r$   r%   get_worker_process   s   zWorkerPool.get_worker_processcountc                 C   sR   t  j}| j||||d}|  t||j|d}|| j|< | jd||j dS )z
        Starts a worker and adds the data to worker_datas.
        * sleep: waits for X seconds before creating worker, for testing purposes
        rh   r<   rn   )r   r   r   zSpawned worker: %s with PID %dN)	r
   r:   rs   startr   r   r>   r8   rZ   )rB   rt   rh   r<   rn   r   r   rb   r$   r$   r%   rj      s   
zWorkerPool.start_workerc                 C   s@   | j d| j d t| jD ]}| j|d |||d qdS )zx
        Run the workers
        * sleep: waits for X seconds before creating worker, only for testing purposes
        z	Spawning z workersr   ru   N)r8   rZ   r/   ri   rj   )rB   rh   r<   rn   rl   r$   r$   r%   start_workers   s   zWorkerPool.start_workersc              
   C   sh   zt |j| | jd|j W dS  ty3 } z|jtjkr'| jd n W Y d}~dS d}~ww )zm
        Send stop signal to worker and catch "No such process" error if the worker is already dead.
        z'Sent shutdown command to worker with %szHorse already deadN)	oskillr   r8   rT   OSErrorerrnoZESRCHrZ   )rB   rb   siger$   r$   r%   stop_worker   s   zWorkerPool.stop_workerc                 C   s:   | j dt| j t| j }|D ]}| | qdS )zSend SIGINT to all workersz!Sending stop signal to %s workersN)r8   rT   rM   r>   r[   r\   r~   )rB   r`   rb   r$   r$   r%   rU      s
   zWorkerPool.stop_workersFc                 C   s   || _ | }t|tttd | jd| j dt	  | j
j| _| j| j |d |   	 | j| j
jkrM|  rA| jd d S | jd td q-| j|d	 |rb| jd
krb| jd d S td q.)Nr5   zStarting worker pool z with pid %d...)rh   rn   TzAll workers stopped, exiting...z"Waiting for workers to shutdown...r   )rg   r   )r;   r   r   r   r   r8   rT   r   rx   getpidr,   r*   r=   rw   rS   r+   rY   timesleeprm   rN   )rB   rh   rn   rg   r$   r$   r%   rv      s*   


zWorkerPool.start)NN)T)r   r4   )NTr   r4   )Tr   r4   )Fr4   )(r   r   r    r   r,   r   r   r   r   r   r	   r!   r   r#   typer   rE   propertyr[   r-   rN   rS   rQ   boolrY   rX   r   r_   rm   floatr   rs   r   rj   rw   rO   rP   r~   rU   rv   r$   r$   r$   r%   r&   (   s|    	
 


	r&   Tr4   worker_namequeue_namesconnection_pool_kwargsr0   r1   r   r2   r3   rh   rn   r<   c                    sp   |t d	d|i|d  fdd|D }|||  ||d}|jdt  t| |j|	d|
d d S )
Nconnection_class)Zconnection_poolc                    s   g | ]}| d qS rH   r$   rI   r.   r3   r$   r%   rL     s    zrun_worker.<locals>.<listcomp>)r   r.   r1   r2   r3   z#Starting worker started with PID %sT)rh   Zwith_schedulerrn   r$   )r   r8   rT   rx   r   r   r   Zwork)r   r   r   Zconnection_pool_classr   r0   r1   r2   r3   rh   rn   r<   r-   workerr$   r   r%   rr      s   
rr   )5rc   r{   r6   rx   rO   r   collections.abcr   enumr   multiprocessingr   Zmultiprocessing.processr   typingr   r   r   r	   uuidr
   Zredisr   r   Zrq.serializersr   connectionsr   defaultsr   r   Zjobr   Zlogutilsr   queuer   utilsr   r   r   r   Processrq   r   r   r&   r!   dictr   r   r#   rr   r$   r$   r$   r%   <module>   sn    
 Z	
