o
    i                      @   s"  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZ d dlmZmZ dd	lmZ dd
lmZmZmZ ddlmZ ddlmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$ edj%Z&dZ'dZ(G dd de)e	Z*G dd dZ+dd Z,dS )    N)Iterable)datetime)Enum)get_context)OptionalUnion)ConnectionPoolRedis   )parse_connection)DEFAULT_LOGGING_DATE_FORMATDEFAULT_LOGGING_FORMAT!DEFAULT_SCHEDULER_FALLBACK_PERIOD)Job)setup_loghandlers)Queue)ScheduledJobRegistry)resolve_serializer)current_timestampparse_namesforkzrq:scheduler:%szrq:scheduler-lock:%sc                   @   s   e Zd ZdZdZdZdS )SchedulerStatusstartedZworkingstoppedN)__name__
__module____qualname__STARTEDWORKINGSTOPPED r    r    C/home/app/Keep/.python/lib/python3.10/site-packages/rq/scheduler.pyr      s    r   c                   @   s   e Zd ZeZdejeedfde	de
eef fddZedd Zed	d
 Zedd Zedd Zd(ddZd)deee  fddZedefddZdd Zdd Zd*ddZdd Zd d! Zd"d# Zd$d% Zd&d' Z dS )+RQSchedulerr
   N
connectionlogging_levelc                 C   s   t t|| _t  | _g | _d | _t|\| _| _| _	t
|| _d | _|| _d| _| jj| _d | _tt| _t|t||d d S )NF)levelname
log_formatdate_format)setr   _queue_names_acquired_locks_scheduled_job_registrieslock_acquisition_timer   _connection_class_pool_class_pool_kwargsr   
serializer_connectioninterval_stop_requestedStatusr   _status_processlogging	getLoggerr   logr   )selfqueuesr#   r3   r$   r(   r'   r1   r    r    r!   __init__*   s$   



zRQScheduler.__init__c                 C   s2   | j r| j S | jtdd| ji| jd| _ | j S )NZconnection_class)Zconnection_poolr    )r2   r.   r   r/   r0   r;   r    r    r!   r#   H   s   zRQScheduler.connectionc                 C      | j S N)r+   r>   r    r    r!   acquired_locksQ      zRQScheduler.acquired_locksc                 C   r?   r@   )r6   r>   r    r    r!   statusU   rB   zRQScheduler.statusc                 C   s0   | j | jkrdS | jsdS t | j  tkS )zCReturns True if lock_acquisition_time is longer than 10 minutes agoFT)r*   rA   r-   r   nowtotal_secondsr   r>   r    r    r!   should_reacquire_locksY   s
   z"RQScheduler.should_reacquire_locksFc                 C   s   t  }t }| jdd| j | jD ]}| jj | ||d| j	d dr3| j
d| || qg | _| j|| _t | _| jrT|rT| jrP| j sT|   |S )z7Returns names of queue it successfully acquires lock onzAcquiring scheduler lock for %s, T<   )nxexzAcquired scheduler lock for %s)r)   osgetpidr:   debugjoinr*   r#   get_locking_keyr3   infoaddr,   r+   unionr   rD   r-   r7   is_alivestart)r;   Z
auto_startZsuccessful_lockspidr&   r    r    r!   acquire_locksb   s   
 


zRQScheduler.acquire_locksqueue_namesc                 C   s8   g | _ |s| j}|D ]}| j t|| j| jd q
dS )z(Prepare scheduled job registries for user#   r1   N)r,   r+   appendr   r#   r1   )r;   rW   r&   r    r    r!   prepare_registriesy   s   zRQScheduler.prepare_registriesr&   c                 C   s   t | S )z,Returns scheduler key for a given queue name)SCHEDULER_LOCKING_KEY_TEMPLATE)clsr&   r    r    r!   rO      s   zRQScheduler.get_locking_keyc           	   	   C   s   | j j| _| js| jr|   | jD ]Z}t }||}|sqt|j	| j
| jd}| j
 6}tj|| j
| jd}|D ]}|durL|j||t|jd q;|D ]	}|j||d qO|  W d   n1 sgw   Y  q| j j| _dS )z+Enqueue jobs whose timestamp is in the pastrX   N)pipelineZat_front)r]   )r5   r   r6   r,   r+   rZ   r   Zget_jobs_to_scheduler   r&   r#   r1   r]   r   Z
fetch_manyZ_enqueue_jobboolZenqueue_at_frontremoveexecuter   )	r;   registry	timestampZjob_idsqueuer]   jobsjobZjob_idr    r    r!   enqueue_scheduled_jobs   s*   



z"RQScheduler.enqueue_scheduled_jobsc                 C   s$   t  t j| j t  t j| j dS )zUInstalls signal handlers for handling SIGINT and SIGTERM
        gracefully.
        N)signalSIGINTrequest_stopSIGTERMr>   r    r    r!   _install_signal_handlers   s   z$RQScheduler._install_signal_handlersc                 C   s
   d| _ dS )z8Toggle self._stop_requested that's checked on every loopTN)r4   )r;   signumframer    r    r!   ri      s   
zRQScheduler.request_stopc                 C   s   | j dd| j t| jdkrB| j !}| jD ]}| |}|	|| j
d  q|  W d   dS 1 s;w   Y  dS | jr[| tt| j}| j	|| j
d  dS dS )z/Updates the TTL on scheduler keys and the locksz!Scheduler sending heartbeat to %srG   r
   rH   N)r:   rM   rN   rA   lenr+   r#   r]   rO   Zexpirer3   r`   nextiter)r;   r]   r&   keyr    r    r!   	heartbeat   s   


"zRQScheduler.heartbeatc                 C   s,   | j dd| j |   | jj| _d S )Nz-Scheduler stopping, releasing locks for %s...rG   )r:   rP   rN   r+   release_locksr5   r   r6   r>   r    r    r!   stop   s   zRQScheduler.stopc                    s,    fdd j D } jj|  t  _ dS )zRelease acquired locksc                    s   g | ]}  |qS r    )rO   ).0r&   r>   r    r!   
<listcomp>   s    z-RQScheduler.release_locks.<locals>.<listcomp>N)r+   r#   deleter)   )r;   keysr    r>   r!   rs      s   zRQScheduler.release_locksc                 C   s2   | j j| _d | _tt| fdd| _| j  | jS )NZ	Scheduler)targetargsr&   )r5   r   r6   r2   ForkProcessrunr7   rT   r>   r    r    r!   rT      s
   

zRQScheduler.startc                 C   sH   |    	 | jr|   d S | jr|   |   |   t| j	 qr@   )
rk   r4   rt   rF   rV   rf   rr   timesleepr3   r>   r    r    r!   work   s   zRQScheduler.work)Fr@   )NN)!r   r   r   r   r5   r8   INFOr   r   r	   r   strintr=   propertyr#   rA   rC   rF   rV   r   r   rZ   classmethodrO   rf   rk   ri   rr   rt   rs   rT   r   r    r    r    r!   r"   #   s@    








	r"   c              	   C   s`   | j dd| jt  z|   W n   | j dt t	   | j dt  d S )Nz$Scheduler for %s started with PID %srG   z*Scheduler [PID %s] raised an exception.
%sz!Scheduler with PID %d has stopped)
r:   rP   rN   r*   rK   rL   r   error	traceback
format_exc)Z	schedulerr    r    r!   r|      s   r|   )-r8   rK   rg   r}   r   collections.abcr   r   enumr   multiprocessingr   typingr   r   Zredisr   r	   connectionsr   defaultsr   r   r   re   r   Zlogutilsr   rc   r   ra   r   Zserializersr   utilsr   r   Processr{   ZSCHEDULER_KEY_TEMPLATEr[   r   r   r"   r|   r    r    r    r!   <module>   s4    
 ?