o
    1 iT-                     @   s@  d Z ddlZddlZddlmZmZ ddlZddlmZ z
ddlm	Z	m
Z
 W n ey9   edej d Y nw ddlmZmZ dd	lmZmZmZ dd
lmZ ejdkr\dd Zndd Zd(ddZdd Zd)ddZdZdefddZdd Zdd Zdd Z d*dd Z!d!d" Z"dede e!de"e"fd#d$Z#d+d&d'Z$dS ),zu
The following is adapted from Dask release 2021.03.1:
    https://github.com/dask/dask/blob/2021.03.1/dask/local.py
    N)EmptyQueue)config)DataNodeDependenciesMappingzEDask on Ray is available only on dask>=2024.11.0, you are on version .)local_callbacksunpack_callbacks)flattenget_dependenciesreverse_dict)orderntc                 C   s(   	 z| j dddW S  ty   Y nw q)NTg?)blocktimeout)getr   q r   i/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/util/dask/scheduler_utils.py	queue_get   s   r   c                 C   s   |   S N)r   r   r   r   r   r   &   s   c              
      s2  |du r	t | j}|du rtdd}|du rt }t  |  D ]\}}t|tr4| ||<  | q!| 	 }|
| t| } fdd| D }t|}|D ]}	||	dD ]	}
||
 |	 q[qSdd | D }dd | D }t||d	d
}dd | D }||||||t t t d	}|S )a  Start state from a dask
    Examples
    --------
    >>> dsk = {
        'x': 1,
        'y': 2,
        'z': (inc, 'x'),
        'w': (add, 'z', 'y')}  # doctest: +SKIP
    >>> from pprint import pprint  # doctest: +SKIP
    >>> pprint(start_state_from_dask(dsk))  # doctest: +SKIP
    {'cache': {'x': 1, 'y': 2},
     'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
     'dependents': {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}},
     'finished': set(),
     'ready': ['z'],
     'released': set(),
     'running': set(),
     'waiting': {'w': {'z'}},
     'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
    Ncachec                    s"   i | ]\}}| vr|t |qS r   )set.0kvZ	data_keysr   r   
<dictcomp>P   s   " z)start_state_from_dask.<locals>.<dictcomp>r   c                 S   s   i | ]\}}|r||  qS r   )copyr   r   r   r   r   V   s    c                 S   s   h | ]\}}|s|qS r   r   r   r   r   r   	<setcomp>X   s    z(start_state_from_dask.<locals>.<setcomp>Tkeyreversec                 S   s   i | ]	\}}|r||qS r   r   r   r   r   r   r   Z       )	dependencies
dependentswaitingwaiting_datar   readyrunningfinishedreleased)r   r   r   dictr   items
isinstancer   addr    updater   r   removesorted)dskr   sortkeyr   r   Zdsk2r&   r(   r'   abr)   Z	ready_setr*   stater   r   r   start_state_from_dask*   sH   




r:   c              
   C   sl   z||\}}||}| }	|||	f}d}
W n t y0 } z|||}d}
W Y d}~nd}~ww | ||
fS )zx
    Compute task and handle all administration
    See Also
    --------
    _execute_task : actually execute task
    FTN)BaseException)r#   Z	task_infodumpsloadsget_idpack_exceptiontaskdataresultidfaileder   r   r   execute_taskk   s   

rF   Tc                 C   sJ   | |d v r|d |  rJ |d | = |d  |  |r#|d | = dS dS )zQRemove data from temporary storage
    See Also
    --------
    finish_task
    r)   r-   r   N)r1   )r#   r9   deleter   r   r   release_data~   s   
rH   Fc           
   
   C   s  t |d | |ddD ]}|d | }|| |s&|d |= |d | q|d | D ]J}||d v rj|d | }|| |si||vritrbdd	lm}	 td
||tt|	|d 	 d f  ||||d q-|rw||vrw||||d q-|d 
| |d | |S )zn
    Update execution state after a task finishes
    Mutates.  This should run atomically (with a lock).
    r'   Tr"   r(   r*   r&   r)   r   )nbytesz&Key: %s	Dep: %s	 NBytes: %.2f	 Releaser   g    .A)rG   r,   r+   )r4   r3   appendDEBUGZ
chest.corerI   printsummapvaluesr1   )
r5   r#   r9   resultsr6   rG   rH   depsrI   r   r   r   finish_task   s6   


rS   c                    s(   t | trt fdd| D S  |  S )zGet nested index from collection
    Examples
    --------
    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ('b', 'a')
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    (('b', 'a'), ('a', 'b'))
    c                 3   s    | ]}t | V  qd S r   )
nested_get)r   icollr   r   	<genexpr>   s    znested_get.<locals>.<genexpr>)r0   listtuple)indrW   r   rV   r   rT      s   
rT   c                   C   s   dS )zDefault get_idNr   r   r   r   r   default_get_id   s   r\   c                  C   s    r   r   )rE   r<   r   r   r   default_pack_exception   s   r]   c                 C   s   | j |ur
| || r   )__traceback__with_traceback)exctbr   r   r   reraise   s   

rb   c                 C   s   | S )z<Identity function. Returns x.
    >>> identity(3)
    3
    r   )xr   r   r   identity   s   rd   c           "         s  t  t|trtt|}n|h}t|}tt|	=}	t|	\}}}}g }d}i z|	D ]}|d r?|d  || q3t	}t
||jd|	D ]\}}}}}|ra| qS|du rltdd}d rxd sxtd f	d	d
}d rtd |k r|  d rtd |k sd sd sd r&t\}}}|rو|\}}|rԇfddt|D }| }|| n||| |\}}|d |< t|||j |D ]
} | ||| qd rtd |k r|  d rtd |k s	d sd sd sd}W |D ]\}}}}}!|!r<|!|  q+n|D ]\}}}}}!|!rR|!|  qAw W d   n	1 s`w   Y  t|d S )a  Asynchronous get function
    This is a general version of various asynchronous schedulers for dask.  It
    takes a an apply_async function as found on Pool objects to form a more
    specific ``get`` method that walks through the dask array with parallel
    workers, avoiding repeat computation and minimizing memory use.
    Parameters
    ----------
    apply_async : function
        Asynchronous apply function as found on Pool or ThreadPool
    num_workers : int
        The number of active tasks we should have at any one time
    dsk : dict
        A dask dictionary specifying a workflow
    result : key or list of keys
        Keys corresponding to desired data
    cache : dict-like, optional
        Temporary storage of results
    get_id : callable, optional
        Function to return the worker id, takes no arguments. Examples are
        `threading.current_thread` and `multiprocessing.current_process`.
    rerun_exceptions_locally : bool, optional
        Whether to rerun failing tasks in local process to enable debugging
        (False by default)
    pack_exception : callable, optional
        Function to take an exception and ``dumps`` method, and return a
        serialized tuple of ``(exception, traceback)`` to send back to the
        scheduler. Default is to just raise the exception.
    raise_exception : callable, optional
        Function that takes an exception and a traceback, and raises an error.
    dumps: callable, optional
        Function to serialize task data and results to communicate between
        worker and parent.  Defaults to identity.
    loads: callable, optional
        Inverse function of `dumps`.  Defaults to identity.
    callbacks : tuple or list of tuples, optional
        Callbacks are passed in as tuples of length 5. Multiple sets of
        callbacks may be passed in as a list of tuples. For more information,
        see the dask.diagnostics documentation.
    See Also
    --------
    threaded.get
    Fr   )r   r6   Nrerun_exceptions_locallyr(   r*   z Found no accessible jobs in daskc                     st   d   } d |  D ]}||  qfddt| D } t| |  |ffjd dS )z"Fire off a task to the thread poolr*   r+   c                       i | ]	}| d  | qS r   r   r   rQ   r9   r   r   r   C  r%   z0get_async.<locals>.fire_task.<locals>.<dictcomp>)argscallbackN)popr1   r   rF   put)r#   frA   	apply_asyncr5   r<   r>   r=   r?   Zpretask_cbsqueuer9   r   r   	fire_task:  s    
zget_async.<locals>.fire_taskr+   c                    rf   rg   r   rh   ri   r   r   r   \  s    zget_async.<locals>.<dictcomp>r   T)r   r0   rY   r   r
   r.   r   r	   rJ   r   r:   r   r   
ValueErrorlenr   r   rS   rT   )"rp   Znum_workersr5   rB   r   r>   re   r?   Zraise_exception	callbacksr<   r=   kwargsZresult_flatrP   _Zposttask_cbsZstarted_cbsZ	succeededcbZkeyorderZstart_staterr   r#   Zres_inforD   r`   ra   rA   r@   resZ	worker_idrn   finishr   ro   r   	get_async   s|   9




Ur{   r   c                 C   s2   |du ri }| |i |}|dur|| dS dS )z*A naive synchronous version of apply_asyncNr   )funcrj   kwdsrk   ry   r   r   r   
apply_syncw  s   r~   )NN)Tr   )r   NN)%__doc__oswarningsrq   r   r   Zdaskr   Zdask._task_specr   r   ImportErrorwarn__version__Zdask.callbacksr   r	   Z	dask.corer
   r   r   Z
dask.orderr   namer   r:   rF   rH   rK   rS   rT   r\   r]   rb   rd   r{   r~   r   r   r   r   <module>   sV    

	
A

$

 