o
    ia                     @   s  U d dl Zd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
m
Z
mZmZ d dlmZmZmZmZ d dlmZ d dlmZ d dlmZ ddlmZ dd	lmZmZmZ dd
lmZmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1 G dd dZ2G dd dZ3g a4e5e6 e7d< dddddedddf	dede8dee9 dee6 dee: dee8 dee: de:dee: dee: dee6 d e6fd!d"Z;d#ed e3fd$d%Z<dS )&    N)datetime	timedeltatimezone)AnyCallableOptionalUnion)croniter)Redis)Pipeline   )cron_scheduler_registry)DEFAULT_LOGGING_DATE_FORMATDEFAULT_LOGGING_FORMATDEFAULT_RESULT_TTL)SchedulerNotFoundStopRequested)Job)setup_loghandlers)Queue)resolve_serializer)	NOT_JSON_SERIALIZABLEdecode_redis_hashnormalize_config_pathnowsafe_json_dumpsstr_to_date	utcformatutcparsevalidate_absolute_pathc                   @   s   e Zd ZdZdddddddedddfdedee dee dee dee	 dee
 d	ee d
ee
 de
dee
 dee
 dee	 fddZdedefddZdefddZdefddZdeddfddZde	eef fddZede	eef dd fdd ZdS )!CronJobz2Represents a function to be run on a time intervalN
queue_namefunc	func_nameargskwargsintervalcronjob_timeout
result_ttlttlfailure_ttlmetac                 C   s   |r|rt d|s|st d|r || _|j d|j | _n|r)d | _|| _nt d|p0d| _|p5i | _|| _|| _|| _	d | _
d | _| jrVt| jt }|t| _
||	|
||d| _dd | j D | _d S )	Nz0Cannot specify both interval and cron parametersz.Must specify either interval or cron parameter.z)Either func or func_name must be provided )r(   r)   r*   r+   r,   c                 S   s   i | ]\}}|d ur||qS Nr.   ).0kvr.   r.   >/home/app/Keep/.python/lib/python3.10/site-packages/rq/cron.py
<dictcomp>Y   s    z$CronJob.__init__.<locals>.<dictcomp>)
ValueErrorr"   
__module____name__r#   r$   r%   r&   r'   r!   next_enqueue_timelatest_enqueue_timer	   r   get_nextr   job_optionsitems)selfr!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   	cron_iterr.   r.   r3   __init__(   s8   

zCronJob.__init__
connectionreturnc                 C   sf   | j stdt| j|d}|j| j g| jR i | j| j}t	t
d| j j
 d| j  |S )z:Enqueue this job to its queue and update the next run timezUCronJob has no function to enqueue. It may have been created for monitoring purposes.r@   zEnqueued job z
 to queue )r"   r5   r   r!   enqueuer$   r%   r;   logging	getLoggerr7   info)r=   r@   queuejobr.   r.   r3   rC   [   s   $"zCronJob.enqueuec                 C   sH   | j rt| j | jpt }|tS | jr!| jr!| jt| jd S tjS )z@Calculate the next run time based on interval or cron expression)seconds)	r'   r	   r9   r   r:   r   r&   r   max)r=   r>   r.   r.   r3   get_next_enqueue_timef   s   
zCronJob.get_next_enqueue_timec                 C   s&   | j s| jsdS | jrt | jkS dS )z Check if this job should run nowTF)r9   r'   r8   r   r=   r.   r.   r3   
should_runr   s
   zCronJob.should_runtimec                 C   s,   || _ | jdus| jdur|  | _dS dS )z<Set latest run time to a given time and update next run timeN)r9   r&   r'   rK   r8   )r=   rN   r.   r.   r3   set_enqueue_time   s   zCronJob.set_enqueue_timec              	   C   s   | j | j| jrt| jnd| jrt| jnd| j| j| jr"t| jnd| j	r+t| j	ndd}| j
 D ]\}}|durH|dkrDt|n|||< q4|S )z@Convert CronJob instance to a dictionary for monitoring purposesN)r#   r!   r$   r%   r&   r'   r9   r8   r,   )r#   r!   r$   r   r%   r&   r'   r9   r   r8   r;   r<   )r=   objr1   r2   r.   r.   r3   to_dict   s   zCronJob.to_dictdatac                 C   s   | d}|r|tkrtt|}| d}|r"|tkr"t|}| d}|r2|tkr2t|}| |d |d ||| d| d| d| d	t| d
| d|d}| drdt|d |_| drpt|d |_|S )zCreate a CronJob instance from dictionary data for monitoring purposes.

        Note: The returned CronJob will not have a func attribute and cannot be executed,
        but contains all the metadata for monitoring.
        r$   r%   r,   r!   r#   r&   r'   r(   r)   r*   r+   )r!   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r9   r8   )	getr   tuplejsonloadsr   r   r9   r8   )clsrR   r$   r%   r,   rH   r.   r.   r3   	from_dict   s6   







zCronJob.from_dict)r7   r6   __qualname____doc__r   strr   r   rT   dictintr?   r
   r   rC   r   rK   boolrM   rO   r   rQ   classmethodrX   r.   r.   r.   r3   r    %   sZ    	

3 r    c                   @   s  e Zd ZdZejdfdedeee	f defddZ
defd	d
Zde	fddZdddddedddf	dededee dee dee	 dee dee	 de	dee	 dee	 dee defddZdee fddZdee fddZdefdd Zd!d" Zd#d$ Zd%d& Zd'efd(d)Zedefd*d+Zdefd,d-Z dDd.ee! ddfd/d0Z"dEd1d2Z#d3eddfd4d5Z$e%dededd fd6d7Z&e%dFded9eded  fd:d;Z'dEd<d=Z(dDd.ee! ddfd>d?Z)dEd@dAZ*edee+ fdBdCZ,dS )GCronSchedulerz*Simple interval-based job scheduler for RQ r@   logging_levelnamec                 C   s   || _ g | _t | _t | _|p$| j d| j dt	 j
d d  | _d| _t | _t | _tt| _| j sJt|tttd d| j_d S d S )N:   ra   )levelrc   Z
log_formatZdate_formatF)r@   
_cron_jobssocketgethostnamehostnameosgetpidpiduuiduuid4hexrc   config_filer   
created_atr   
serializerrD   rE   r7   loghasHandlersr   r   r   	propagate)r=   r@   rb   rc   r.   r.   r3   r?      s$   

,
zCronScheduler.__init__rA   c                 C   s   t || jsdS | j|jkS )z;Equality does not take the database/connection into accountF)
isinstance	__class__rc   )r=   otherr.   r.   r3   __eq__   s   zCronScheduler.__eq__c                 C   s
   t | jS )z;The hash does not take the database/connection into account)hashrc   rL   r.   r.   r3   __hash__   s   
zCronScheduler.__hash__Nr"   r!   r$   r%   r&   r'   r(   r)   r*   r+   r,   c                 C   s   t |||||||||	|
|d}| j| |j d|j }|r2| jd| d| d| d |S |rD| jd| d| d| d |S )	z2Register a function to be run at regular intervals)r!   r"   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   zRegistered 'z' to run on z every z secondsz with cron schedule '')r    rg   appendr6   r7   rt   rF   )r=   r"   r!   r$   r%   r&   r'   r(   r)   r*   r+   r,   Zcron_jobjob_keyr.   r.   r3   register   s*     zCronScheduler.registerc                 C   s   | j S )zGet all registered cron jobs)rg   rL   r.   r.   r3   get_jobs  s   zCronScheduler.get_jobsc                 C   sB   t  }g }| jD ]}| r|| j || || q|S )z$Enqueue all jobs that are due to run)r   rg   rM   rC   r@   rO   r~   )r=   Zenqueue_timeZenqueued_jobsrH   r.   r.   r3   enqueue_jobs  s   


zCronScheduler.enqueue_jobsc                 C   sH   t  }dd | jD }|sdS t|}||  }|dkrdS t|dS )zCalculate how long to sleep until the next job is due.

        Returns the number of seconds to sleep, with a maximum of 60 seconds
        to ensure we check regularly.
        c                 S   s   g | ]}|j r|j qS r.   )r8   r0   rH   r.   r.   r3   
<listcomp>-  s    z:CronScheduler.calculate_sleep_interval.<locals>.<listcomp><   r   )r   rg   mintotal_seconds)r=   current_timeZnext_job_timesZclosest_timeZseconds_until_nextr.   r.   r3   calculate_sleep_interval$  s   
z&CronScheduler.calculate_sleep_intervalc                 C   s$   t  t j| j t  t j| j dS )z.Install signal handlers for graceful shutdown.N)signalSIGINT_request_stopSIGTERMrL   r.   r.   r3   _install_signal_handlers?  s   z&CronScheduler._install_signal_handlersc                 C   s   | j d| j| t )z#Handle shutdown signals gracefully.z-CronScheduler %s: received shutdown signal %s)rt   rF   rc   r   )r=   signumframer.   r.   r3   r   D  s   zCronScheduler._request_stopc              
   C   s
  | j d| j |   |   zgz'	 |  }|r|   |   |  }|dkr8| j 	d| d t
| q tyI   | j d| j Y n tyY   | j d| j Y nw W |   | j d| j d	S W |   | j d| j d	S |   | j d| j w )
zStart the cron schedulerzCronScheduler %s: starting...Tr   zSleeping for z seconds...z,CronScheduler %s: received KeyboardInterruptz CronScheduler %s: stop requestedz#CronScheduler %s: shutdown completeN)rt   rF   rc   r   register_birthr   save_jobs_data	heartbeatr   debugrN   sleepKeyboardInterruptr   register_death)r=   ZenqueuedZ
sleep_timer.   r.   r3   startI  s6   

zCronScheduler.startconfig_pathc           
      C   s  || _ | jd|  g atj|r| jd|  t| dtj	|
dd }z;tj||}|du s?|jdu rNd| }| j| t|tj|}|tj|< |j| | jd|  W n ty } z|tjv rztj|= d	| d
| }| j| t||d}~ww t|}| jd|  z|tjv rttj|  nt| | jd|  W nD ty } zd| d| d| }| j| t||d}~w ty } zd| d| d| }| j| t||d}~ww d}tD ]B}	| jd|	d j  z| jdi |	 |d7 }W q tyH } z| jjd|	d j d| dd W Y d}~qd}~ww g a| jd| d| d dS )a  
        Dynamically load a cron config file and register all jobs with this Cron instance.

        Supports both dotted import paths (e.g. 'app.cron_config') and file paths
        (e.g. '/path/to/app/cron_config.py', 'app/cron_config.py'). The .py
        extension is recommended for file paths for clarity.

        Jobs defined in the config file must use the global `rq.cron.register` function.

        Args:
            config_path: Path to the cron_config.py file or module path.
        z Loading cron configuration from zLoading absolute file path: Zrq_cron_config_r-   _Nz!Could not create module spec for z&Successfully loaded config from file: z#Failed to load configuration file 'z': zNormalized path: z(Successfully loaded config from module: z'Failed to import configuration module 'z	' (from 'z'): z#An error occurred while importing 'r   zRegistering job from config: r"   r   zFailed to register job z from config: T)exc_infozSuccessfully registered z cron jobs from 'r}   r.   )rq   rt   rF   _job_data_registryrk   pathisabsr   r   basenamereplace	importlibutilspec_from_file_locationloadererrorImportErrormodule_from_specsysmodulesexec_module	Exceptionr   reloadimport_moduler7   r   )
r=   r   module_namespec	error_msgmoduleeZnormalized_pathZ	job_countrR   r.   r.   r3   load_config_from_filee  sn   







0z#CronScheduler.load_config_from_filec                 C   s   d| j  S )z)Redis key for this CronScheduler instancerq:cron_scheduler:)rc   rL   r.   r.   r3   key  s   zCronScheduler.keyc              	   C   s>   | j t| j| jt| j| jpdtdd | j	D d}|S )z@Convert CronScheduler instance to a dictionary for Redis storagera   c                 S      g | ]}|  qS r.   rQ   r   r.   r.   r3   r         z)CronScheduler.to_dict.<locals>.<listcomp>)rj   rm   rc   rr   rq   	cron_jobs)
rj   r[   rm   rc   r   rr   rq   rU   dumpsrg   )r=   rP   r.   r.   r3   rQ     s   zCronScheduler.to_dictpipelinec                 C   s8   |dur|n| j }|j| j|  d || jd dS )z2Save CronScheduler instance to Redis hash with TTLN)mappingr   )r@   hsetr   rQ   expire)r=   r   r@   r.   r.   r3   save  s   zCronScheduler.savec                 C   s,   t dd | jD }| j| jd| dS )zSave cron jobs data to Redis.c                 S   r   r.   r   r   r.   r.   r3   r     r   z0CronScheduler.save_jobs_data.<locals>.<listcomp>r   N)rU   r   rg   r@   r   r   )r=   rR   r.   r.   r3   r     s   zCronScheduler.save_jobs_dataraw_datac              
   C   s   t |dd}|d | _t|dd| _|d | _t|d | _|d | _|d	r`zt	
|d	 }d
d |D | _W dS  t	jttfy_ } z| jd|  g | _W Y d}~dS d}~ww g | _dS )z4Restore CronScheduler instance from Redis hash data.T)Zdecode_valuesrj   rm   r   rc   rr   rq   r   c                 S   s   g | ]}t |qS r.   )r    rX   )r0   job_datar.   r.   r3   r     s    z)CronScheduler.restore.<locals>.<listcomp>zFailed to restore cron jobs: N)r   rj   r]   rS   rm   rc   r   rr   rq   rU   rV   rg   JSONDecodeErrorKeyError	TypeErrorrt   warning)r=   r   rP   Z	jobs_datar   r.   r.   r3   restore  s    




zCronScheduler.restorec                 C   sB   d| }| |}|std| d| ||d}|| |S )z2Fetch a CronScheduler instance from Redis by name.r   zCronScheduler with name 'z' not found)r@   rc   )Zhgetallr   r   )rW   rc   r@   r   r   	schedulerr.   r.   r3   fetch  s   


zCronScheduler.fetchTcleanupc              	   C   st   ddl m} |rt| t|}g }|D ]!}|t | ||}|| W d   n1 s2w   Y  q|S )a  Returns all CronScheduler instances from the registry

        Args:
            connection: Redis connection to use
            cleanup: If True, removes stale entries from registry before fetching schedulers

        Returns:
            List of CronScheduler instances
        r   )suppressN)
contextlibr   r   r   Zget_keysr   r   r~   )rW   r@   r   r   Zscheduler_namesZ
schedulersrc   r   r.   r.   r3   all  s   


zCronScheduler.allc                 C   sd   | j d| j d | j }t| | | | |  W d   dS 1 s+w   Y  dS )zURegister this scheduler's birth in the scheduler registry and save data to Redis hashCronScheduler z: registering birth...N)	rt   rF   rc   r@   r   r   r   r   executer=   r   r.   r.   r3   r     s   

"zCronScheduler.register_birthc                 C   s&   | j d| j d t| | dS )zJRegister this scheduler's death by removing it from the scheduler registryr   z: registering death...N)rt   rF   rc   r   
unregisterr   r.   r.   r3   r     s   zCronScheduler.register_deathc                 C   s   | j  I}|jt | jt iddd || jd |	 }|d }|r3| j
d| j d n| j
d| j d W d   dS W d   dS 1 sQw   Y  dS )	zSend a heartbeat to update this scheduler's last seen timestamp in the registry
        and extend the scheduler's Redis hash TTL.
        T)xxchx   r   r   z: heartbeat sent successfullyz4: heartbeat failed - scheduler not found in registryN)r@   r   Zzaddr   get_registry_keyrc   rN   r   r   r   rt   r   r   )r=   piperesultsZzadd_resultr.   r.   r3   r   !  s    "zCronScheduler.heartbeatc                 C   s0   | j t | j}|du rdS tj|tjdS )zReturn the UTC datetime of the last heartbeat, or None if no heartbeat recorded

        Returns:
            datetime: UTC datetime of the last heartbeat, or None if scheduler not found in registry
        N)tz)	r@   Zzscorer   r   rc   r   fromtimestampr   utc)r=   Zscorer.   r.   r3   last_heartbeat1  s   zCronScheduler.last_heartbeatr/   )rA   N)T)-r7   r6   rY   rZ   rD   INFOr
   r   r[   r]   r?   r^   rz   r|   r   r   r   rT   r\   r    r   listr   r   floatr   r   r   r   r   propertyr   rQ   r   r   r   r   r_   r   r   r   r   r   r   r   r.   r.   r.   r3   r`      s    

	

'U

	
r`   r   r"   r!   r$   r%   r&   r'   r(   r)   r*   r+   r,   rA   c                 C   s\   | |||||||||	|
d}t | tt}| j d| j }|d| d|  |S )a  
    Register a function to be run as a cron job by adding its definition
    to a temporary global registry.

    This function should typically be called from within a cron configuration file
    that will be loaded using `CronScheduler.load_config_from_file()`.

    Example (in your cron_config.py):
        from rq import cron
        from my_app.tasks import my_func

        cron.register(my_func, 'default', interval=60)  # Run every 60 seconds

    Returns:
        dict: The job data dictionary added to the registry.
    )r"   r!   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   zCron config: Adding job 'z' to registry for queue )r   r~   rD   rE   r7   r6   r   )r"   r!   r$   r%   r&   r'   r(   r)   r*   r+   r,   r   loggerr   r.   r.   r3   r   E  s"   

r   r@   c                 C   s>   t | d}tD ]}td|d j  |jdi | q|S )z8Create a CronScheduler instance with all registered jobsrB   zRegistering job: r"   Nr.   )r`   r   rD   r   r7   r   )r@   Zcron_instancerR   r.   r.   r3   create_cron{  s
   
r   )=importlib.utilr   rU   rD   rk   r   rh   r   rN   rn   r   r   r   typingr   r   r   r   r	   Zredisr
   Zredis.clientr   ra   r   defaultsr   r   r   
exceptionsr   r   rH   r   Zlogutilsr   rG   r   Zserializersr   utilsr   r   r   r   r   r   r   r   r   r    r`   r   r   r\   __annotations__r[   rT   r]   r   r   r.   r.   r.   r3   <module>   s~   
 , $  |	

6