o
    i"                  
   @   s   d dl Z d dlmZ d dlmZmZmZ d dlmZm	Z	m
Z
 zd dlmZ W n ey7 Z zededZ[ww G dd deZdS )	    N)Job)BaseJobStoreConflictingIdErrorJobLookupError)datetime_to_utc_timestamp	maybe_refutc_timestamp_to_datetime)	RethinkDBz.RethinkDBJobStore requires rethinkdb installedc                       s   e Zd ZdZdddejf fdd	Z fddZd	d
 Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zd!ddZdd  Z  ZS )"RethinkDBJobStorea1  
    Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to
    rethinkdb's `RethinkdbClient <http://www.rethinkdb.com/api/#connect>`_.

    Plugin alias: ``rethinkdb``

    :param str database: database to store jobs in
    :param str collection: collection to store jobs in
    :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing
        connection arguments
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    ZapschedulerjobsNc                    sX   t    |std|std|| _|| _d | _|| _|| _|| _t	 | _
d | _d S )Nz*The "database" parameter must not be emptyz'The "table" parameter must not be empty)super__init__
ValueErrordatabase
table_nametableclientpickle_protocolconnect_argsr	   rconn)selfr   r   r   r   r   	__class__ V/home/app/Keep/.python/lib/python3.10/site-packages/apscheduler/jobstores/rethinkdb.pyr       s   

zRethinkDBJobStore.__init__c                    s   t  || | jrt| j| _n| jjdd| ji| j| _| j| j	 
| jvr6| j| j
| j | j| j 
| jvrM| j| j
| j d| j| j 
| jvrj| j| jd
| j | j| j| j| _d S )Ndbnext_run_timer   )r   startr   r   r   r   connectr   r   Zdb_listrunZ	db_creater   Z
table_listZtable_creater   Z
index_listZindex_creater   )r   Z	scheduleraliasr   r   r   r   8   s   zRethinkDBJobStore.startc                 C   s8   t | j|d| j}|r| |d d S d S )N	job_stater   )listr   get_allpluckr    r   _reconstitute_jobr   job_idresultsr   r   r   
lookup_jobM   s   zRethinkDBJobStore.lookup_jobc                 C   s   |  | jjd t|kS Nr   )	_get_jobsr   rowr   )r   nowr   r   r   get_due_jobsQ   s   zRethinkDBJobStore.get_due_jobsc                 C   sV   t | j| jjd d k| jddd d	| j
}|r)t|d S d S )Nr   c                 S   s   | d S r+   r   xr   r   r   <lambda>Z   s    z5RethinkDBJobStore.get_next_run_time.<locals>.<lambda>   r   )r#   r   filterr   r-   order_byascmaplimitr    r   r   )r   r)   r   r   r   get_next_run_timeV   s   
z#RethinkDBJobStore.get_next_run_timec                 C   s   |   }| | |S N)r,   Z_fix_paused_jobs_sorting)r   r   r   r   r   get_all_jobs`   s   
zRethinkDBJobStore.get_all_jobsc                 C   sX   |j t|j| jt| | jd}| j	
|| j}|d dkr*t|j d S )N)idr   r"   errorsr   )r<   r   r   r   binarypickledumps__getstate__r   r   insertr    r   r   )r   jobZjob_dictr)   r   r   r   add_jobe   s   
zRethinkDBJobStore.add_jobc                    s   t |j| jt| | jd}| j	|j
|| j dt fdd  v } d dks< d dks<|sAt|j
d S )N)r   r"   Fc                    s    |  dkS )Nr   r   r0   r)   r   r   r2   y   s    z.RethinkDBJobStore.update_job.<locals>.<lambda>skippedr   r=   )r   r   r   r>   r?   r@   rA   r   r   r$   r<   updater    r   r7   keysr   )r   rC   changesrF   r   rE   r   
update_jobq   s   
zRethinkDBJobStore.update_jobc                 C   s8   | j | | j}|d |d  dkrt|d S )NZdeletedrF   r3   )r   r$   deleter    r   r   r'   r   r   r   
remove_job}   s   zRethinkDBJobStore.remove_jobc                 C   s   | j  | j d S r:   )r   rK   r    r   r   r   r   r   remove_all_jobs   s   z!RethinkDBJobStore.remove_all_jobsc                 C   s   | j   d S r:   )r   closerM   r   r   r   shutdown   s   zRethinkDBJobStore.shutdownc                 C   s2   t |}tt}|| | j|_| j|_|S r:   )r?   loadsr   __new____setstate__Z
_scheduler_aliasZ_jobstore_alias)r   r"   rC   r   r   r   r&      s   


z#RethinkDBJobStore._reconstitute_jobc              	      s   g }g }|r j  jjd d k|n j }|dddd}| jD ]'}z| 	|d  W q( t
yO    jd|d  ||d  Y q(w |rc j| fdd j |S )Nr   r<   r"   z)Unable to restore job "%s" -- removing itc                    s    j |  S r:   )r   r$   rK   )r(   rM   r   r   r2      s    z-RethinkDBJobStore._get_jobs.<locals>.<lambda>)r   r4   r   r-   r5   r%   r    r   appendr&   	ExceptionZ_logger	exceptionexprZfor_each)r   	predicater   Zfailed_job_idsqueryZdocumentr   rM   r   r,      s,    

zRethinkDBJobStore._get_jobsc                 C   s   | j }d| jj d| dS )N<z (connection=z)>)r   r   __name__)r   
connectionr   r   r   __repr__   s   zRethinkDBJobStore.__repr__r:   )r\   
__module____qualname____doc__r?   HIGHEST_PROTOCOLr   r   r*   r/   r9   r;   rD   rJ   rL   rN   rP   r&   r,   r^   __classcell__r   r   r   r   r
      s(    

r
   )r?   Zapscheduler.jobr   Zapscheduler.jobstores.baser   r   r   Zapscheduler.utilr   r   r   Z	rethinkdbr	   ImportErrorexcr
   r   r   r   r   <module>   s    
