o
    i-                     @   s@  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
 ddlmZ ddlmZmZ ddlmZmZ ddlmZ dd	lmZ d
dlmZmZmZmZ e	rnddlmZ zd dlmZ W n	 e ym   Y nw G dd deZ!G dd de!Z"G dd deZ#G dd de!Z$G dd de!Z%G dd de!Z&dS )    N)shuffle)TYPE_CHECKINGOptional   )DEFAULT_WORKER_TTL)InvalidJobOperationShutDownImminentException)Job	JobStatus)HorseMonitorTimeoutException)now   )SHUTDOWN_SIGNAL
BaseWorkerWorkerStatussignal_name)Queue)struct_rusagec                   @   sb   e Zd ZefdejfddZdeee	 ee	 ed f fddZ
dddZdddZdddZdS )Workersigc              
   C   sv   zt t | j| | jd| j| j W dS  ty: } z|jtj	kr.| j
d| j n W Y d}~dS d}~ww )zKill the horse but catch "No such process" error has the horse could already be dead.

        Args:
            sig (signal.Signals, optional): _description_. Defaults to SIGKILL.
        zWorker %s: killed horse pid %szWorker %s: horse already deadN)oskillpggetpgid	horse_pidloginfonameOSErrorerrnoZESRCHdebug)selfr   e r"   O/home/app/Keep/.python/lib/python3.10/site-packages/rq/worker/worker_classes.py
kill_horse   s   zWorker.kill_horsereturnr   c                 C   sT   d } }}t t t| jd\}}}W d   n1 s w   Y  |||fS )zWaits for the horse process to complete.
        Uses `0` as argument as to include "any child in the process group of the current process".
        Nr   )
contextlibsuppressChildProcessErrorr   wait4r   )r    pidstatrusager"   r"   r#   wait_for_horse+   s
   
zWorker.wait_for_horsejobr	   queuer   c                 C   sn   t  }| jt jd< |jt jd< |dkr%t   | || t d dS || _| 	d| dt

   dS )zSpawns a work horse to perform the actual work and passes it a job.
        This is where the `fork()` actually happens.

        Args:
            job (Job): The Job that will be ran
            queue (Queue): The queue
        RQ_WORKER_ID	RQ_JOB_IDr   zForked  at N)r   forkr   environidsetpgrpZmain_work_horse_exit
_horse_pidproclinetime)r    r.   r/   	child_pidr"   r"   r#   fork_work_horse4   s   zWorker.fork_work_horsec           
   
   C   s,  d } }}t  |_	 z | | jt |  \}}}W d   n1 s%w   Y  W nU tyc   | t  |j   |jdkr\| j	|jd kr\| 
| jd  |   |   Y n%| | Y n ty } z|jtjkrq | 
  W Y d}~nd}~ww q| d d| _| jd| j|j|| |tjkrdS z| }W n
 ty   Y dS w | j|jkr| jd| j|j |jr|| j | j||dd	 dS |tjtj fvrt  |_!|rt"|rd
t#| dnd}d| | d}	| jd| j|j|	 | $|||| | j|||	d	 dS dS )zThe worker will monitor the work horse and make sure that it
        either executes successfully or the status of the job is set to
        failed

        Args:
            job (Job): _description_
            queue (Queue): _description_
        NT<   r   z@Worker %s: work horse finished for job %s: retpid=%s, ret_val=%szBWorker %s: job %s stopped by user, moving job to FailedJobRegistryz+Job stopped by user, work-horse terminated.)r/   
exc_stringz	 (signal ) z5Work-horse terminated unexpectedly; waitpid returned z; zWorker %s: job %s failed (%s))%r   Z
started_atZdeath_penalty_classZjob_monitoring_intervalr   r-   Zset_current_job_working_timetotal_secondstimeoutZcurrent_job_working_time	heartbeatr$   Zmaintain_heartbeatsr   r   ZEINTRr8   r   r   r   r5   r   EX_OKZ
get_statusr   Z_stopped_job_idwarningZstopped_callbackZexecute_stopped_callbackZhandle_job_failurer
   FINISHEDZFAILEDZended_atWIFSIGNALEDWTERMSIGZhandle_work_horse_killed)
r    r.   r/   ZretpidZret_valr,   r!   Z
job_statusZ
signal_msgr?   r"   r"   r#   monitor_work_horseG   s`   	

$zWorker.monitor_work_horsec                 C   s2   |  | | || | || | tj dS )zSpawns a work horse to perform the actual work and passes it a job.
        The worker will wait for the work horse and make sure it executes
        within the given timeout bounds, or will end the work horse with
        SIGALRM.
        N)prepare_executionr<   rJ   	set_stater   IDLEr    r.   r/   r"   r"   r#   execute_job   s   
zWorker.execute_jobNr.   r	   r/   r   )__name__
__module____qualname__r   signalSignalsr$   tupler   intr-   r<   rJ   rO   r"   r"   r"   r#   r      s    $
	
Jr   c                   @   s   e Zd ZdZd	ddZdS )
SpawnWorkerzWorker implementation that uses os.spawn() instead of os.fork().
    This implementation is intended for environments where `os.fork()` is not available.
    r.   r	   r/   r   c                 C   s   | j tjd< |jtjd< | jjtjd< | jjj}|dr |d= |dr(|d= t	tj
tjtjdd| d| j d	|j d
|j  d	g}|| _| d| dt   dS )z@Spawns a work horse to perform the actual work using os.spawn().r0   r1   ZRQ_EXECUTION_IDretryZdriver_infoz-cz
import os
import sys
from redis import Redis
from rq import Worker, Queue
from rq.job import Job
from rq.executions import Execution

# Recreate worker instance
redis = Redis(**z)
worker = Worker.find_by_key("zu", connection=redis)
if not worker:
    sys.exit(1)

# Reconstruct job, queue and execution objects
job = Job.fetch("z0", connection=worker.connection)
queue = Queue("a  ", connection=worker.connection)
execution_id = os.environ.get('RQ_EXECUTION_ID')
worker.execution = Execution.fetch(execution_id, job.id, connection=worker.connection)

# Set up work horse
os.setpgrp()
worker._is_horse = True
worker.main_work_horse(job, queue)
zSpawned r2   N)r   r   r4   r5   Z	execution
connectionZconnection_poolZconnection_kwargsgetspawnvP_NOWAITsys
executablekeyr8   r9   r:   )r    r.   r/   Zredis_kwargsr;   r"   r"   r#   r<      s4   


	
"zSpawnWorker.fork_work_horseNrP   )rQ   rR   rS   __doc__r<   r"   r"   r"   r#   rX      s    rX   c                   @   s(   e Zd ZdddZdddefdd	Zd
S )SimpleWorkerr.   r	   r/   r   c                 C   s&   |  | | || | tj dS )z1Execute job in same thread/process, do not fork()N)rK   Zperform_jobrL   r   rM   rN   r"   r"   r#   rO      s   
zSimpleWorker.execute_jobr%   c                 C   s    |j dkrtS t|j ptd S )z-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59.
        We should just stick to DEFAULT_WORKER_TTL.

        Args:
            job (Job): The Job

        Returns:
            ttl (int): TTL
        r=   r>   )rC   r   rW   )r    r.   r"   r"   r#   get_heartbeat_ttl   s   

zSimpleWorker.get_heartbeat_ttlNrP   )rQ   rR   rS   rO   rW   rc   r"   r"   r"   r#   rb      s    
rb   c                   @   s<   e Zd ZdZdZg dZdd Zdd Zdd	 Zd
d Z	dS )HerokuWorkerz
    Modified version of rq worker which:
    * stops work horses getting killed with SIGTERM
    * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
    causes the horse to crash `imminent_shutdown_delay` seconds later
       )f_codef_lastif_linenof_localsf_tracec                 C   s4   t  t j| j t  t jt j t  t jt j dS )z>Modified to ignore SIGINT and SIGTERM and only handle SIGRTMINN)rT   SIGRTMINrequest_stop_sigrtminSIGINTSIG_IGNSIGTERMr    r"   r"   r#   setup_work_horse_signals   s   z%HerokuWorker.setup_work_horse_signalsc                 C   s<   | j dkr| jd| j | jtjd dS | jd dS )z"If horse is alive send it SIGRTMINr   zBWorker %s: warm shut down requested, sending horse SIGRTMIN signal)r   z(Warm shut down requested, no horse foundN)r   r   r   r`   r$   rT   rk   rF   rp   r"   r"   r#   handle_warm_shutdown_request   s   
z)HerokuWorker.handle_warm_shutdown_requestc                 C   sf   | j dkr| jd | || d S | jd| j  ttj| j ttj| j t| j  d S )Nr   z@Imminent shutdown, raising ShutDownImminentException immediatelyzBImminent shutdown, raising ShutDownImminentException in %d seconds)imminent_shutdown_delayr   rF   request_force_stop_sigrtminrT   rk   SIGALRMalarm)r    signumframer"   r"   r#   rl     s   
z"HerokuWorker.request_stop_sigrtminc                    s6    fdd| j D }| jd tdt| d|)Nc                    s   i | ]}|t  |qS r"   )getattr).0attrrx   r"   r#   
<dictcomp>  s    z<HerokuWorker.request_force_stop_sigrtmin.<locals>.<dictcomp>z2raising ShutDownImminentException to cancel job...zshut down imminent (signal: r@   )frame_propertiesr   rF   r   r   )r    rw   rx   r   r"   r|   r#   rt     s   z(HerokuWorker.request_force_stop_sigrtminN)
rQ   rR   rS   ra   rs   r~   rq   rr   rl   rt   r"   r"   r"   r#   rd      s    rd   c                   @      e Zd ZdZdd ZdS )RoundRobinWorkerze
    Modified version of Worker that dequeues jobs from the queues using a round-robin strategy.
    c                 C   s6   | j |}| j |d d  | j d |d   | _ d S )Nr   )_ordered_queuesindex)r    reference_queueposr"   r"   r#   reorder_queues  s   *zRoundRobinWorker.reorder_queuesNrQ   rR   rS   ra   r   r"   r"   r"   r#   r         r   c                   @   r   )RandomWorkerz`
    Modified version of Worker that dequeues jobs from the queues using a random strategy.
    c                 C   s   t | j d S )N)r   r   )r    r   r"   r"   r#   r   &  s   zRandomWorker.reorder_queuesNr   r"   r"   r"   r#   r   !  r   r   )'r&   r   r   rT   r^   r:   randomr   typingr   r   defaultsr   
exceptionsr   r   r.   r	   r
   Ztimeoutsr   utilsr   baser   r   r   r   r/   r   resourcer   ImportErrorr   rX   rb   rd   r   r   r"   r"   r"   r#   <module>   s8     8+
