o
    * is                     @   sH  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZ	d dl
Z
d dl
mZ d dlmZmZ d dlmZ d dlmZ d dlmZ ddlmZmZmZ d	d
lmZmZmZ ddlmZ ddlm Z m!Z! ddl"m#Z#m$Z$ ddl%m&Z&m'Z'm(Z(m)Z)m*Z* da+dd Z,e-e, G dd dZ.G dd de.Z/G dd de.Z0dS )    N)profiler)_current_expected_place_set_expected_place)datatype_to_vartype)	benchmark)in_profiler_mode   )corein_dynamic_modein_pir_mode   )MP_STATUS_CHECK_INTERVALCleanupFuncRegistrar_set_SIGCHLD_handler   )_InfiniteIterableSampler)default_collate_fndefault_convert_fn)_flatten_batch_restore_batch)_DatasetKind_IterableDatasetStopIteration_ResumeIteration_worker_loop_WorkerExceptionc                   C   s*   t d urzt   b W d S    Y d S d S N)_loader__del__ r   r   p/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/paddle/io/dataloader/dataloader_iter.py_clear_loaderD   s   r    c                   @   sL   e Zd ZdZdd Zedd Zdd Zdd	 Zd
d Z	dd Z
dd ZdS )_DataLoaderIterBasez
    Iterator implement of DataLoader, will load and feed mini-batch
    data by setting in given dataloader.

    Args:
        loader(instance of DataLoader): instance of `paddle.io.DataLoader`
    c                 C   s   |j | _|jpg | _|j| _|j| _|j| _	|j
| _|j| _|j| _|j| _|j| _|j| _|jdkr6|jnt| _|j| _|j| _|j| _t| j | _!| jrU|j"pRt#| _$n|j"pYt%| _$d | _&d | _'t() | _*d S )Nr   )+Zdataset_datasetZ	feed_list
_feed_listZplaces_placesZreturn_list_return_listbatch_sampler_batch_samplerZ	drop_last
_drop_lastZauto_collate_batch_auto_collate_batchZnum_workers_num_workersZuse_buffer_reader_use_buffer_readerZprefetch_factor_prefetch_factorZuse_shared_memory_use_shared_memorytimeoutr   _timeoutZworker_init_fn_worker_init_fnZdataset_kind_dataset_kindZ
pin_memory_pin_memoryiter_index_sampler_sampler_iterZ
collate_fnr   _collate_fnr   _blocking_queue_thread	threadingEvent_thread_done_eventselfloaderr   r   r   __init__Z   s.   z_DataLoaderIterBase.__init__c                 C   s6   | j r| jS | jtjkrttt| jS t	| jdS Nr   )
r)   r'   r1   r   ZMAPlistrangelenr"   r   r=   r   r   r   r4   |   s
   z"_DataLoaderIterBase._index_samplerc                 C   s   | S r   r   rD   r   r   r   __iter__   s   z_DataLoaderIterBase.__iter__c                 C   s   t d)Nz*Should implement `__next__` for a iterator)NotImplementedErrorrD   r   r   r   __next__   s   z_DataLoaderIterBase.__next__c                 C   s
   t | jS r   )rC   r'   rD   r   r   r   __len__   s   
z_DataLoaderIterBase.__len__c                 C   "   | j   | jr| j  d S d S r   )r;   setr7   closerD   r   r   r   _exit_thread_expectedly      
z+_DataLoaderIterBase._exit_thread_expectedlyc                 C   rI   r   )r;   rJ   r7   killrD   r   r   r   _exit_thread_unexpectedly   rM   z-_DataLoaderIterBase._exit_thread_unexpectedlyN)__name__
__module____qualname____doc__r?   propertyr4   rE   rG   rH   rL   rO   r   r   r   r   r!   Q   s    "
	r!   c                       sP   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
  ZS )_DataLoaderIterSingleProcesszg
    Single process implement of DataLoaderIter, loading data from
    loader.data in main process
    c                    st   t  | t| j| j| j| j| j| _	g | _
| jt| j | _d| _z	|   W | ad S  ty9   |    w NF)superr?   r   Zcreate_fetcherr1   r"   r)   r6   r(   _dataset_fetcher_structure_infosr,   rC   r$   _blocking_queue_capacity	_shutdown_init_thread	Exception_try_shutdown_allr   r<   	__class__r   r   r?      s(   
z%_DataLoaderIterSingleProcess.__init__c                 C   s   dd | j D | _dd | j D | _t r(dd | j D | _dd | j D | _ndd | j D | _dd | j D | _tt | j	t
| jdk| _t| j| j| j| j| j| j| jd	| j	| _tj| jt fd
| _d	| j_| j  d S )Nc                 S      g | ]}|j qS r   name.0vr   r   r   
<listcomp>       z=_DataLoaderIterSingleProcess._init_thread.<locals>.<listcomp>c                 S   ra   r   shaperd   r   r   r   rg      rh   c                 S      g | ]}d qS Fr   rd   r   r   r   rg          c                 S      g | ]}t |j qS r   r   dtyperd   r   r   r   rg          
c                 S      g | ]}|j  qS r   ZdescZneed_check_feedrd   r   r   r   rg      rq   c                 S   ra   r   rp   rd   r   r   r   rg      rh   r   Ttargetargs)r#   
_var_names_shapesr   _need_check_feed_dtypesr	    init_dense_tensor_blocking_queueVariablerZ   rC   r$   r7   create_py_readerr+   r2   _readerr9   Thread_thread_loopr   r8   daemonstartrD   r   r   r   r\      s@   

z)_DataLoaderIterSingleProcess._init_threadc           	   
   C   sj  t dtt|   t| | j szt| j}| j	
|| j}W n ty1   |   Y d S w |d u s;| j r<nst|\}}| j| | j rNnazIt  }|D ](}t|tjrd|  }nt|t jsxt  }||t   |}|| qU| j rW n*z| j| W n   |   Y W n ty } z|   |d }~ww | j r|   d S )NDataloader_)r	   set_current_thread_namestridr   r;   is_setnextr5   rX   fetchStopIterationrL   r   rY   appendDenseTensorArray
isinstancepaddleTensorvalue
get_tensorDenseTensorrJ   CPUPlacer7   pushr]   rO   )	r=   legacy_expected_placeindicesbatch	structurearrayslottmper   r   r   r      sT   





1z)_DataLoaderIterSingleProcess._thread_loopc                    sD  t  rtjdtjjd}|  zzvt   t   t	 r5t
j j d }t| jd}nB jrr j }tt|D ]
}||  ||< qC fddtt jD }dd t||D }t jdkrq|d }n j }t   |W W t  r|  S S  ty    j      w t  r|  w w )NrU   rc   Z
event_typer   c                       g | ]} j d qS r   rY   popre   _rD   r   r   rg   :      
z9_DataLoaderIterSingleProcess.__next__.<locals>.<listcomp>c                 S      g | ]	\}}t ||qS r   r   re   dsr   r   r   rg   >      r   )r   r   RecordEventTracerEventType
Dataloaderbeginr   check_if_need_recordbefore_readerr
   r	   eagerread_next_tensor_listr   read_next_listr   rY   r   r%   rB   rC   _move_to_listr$   zip	read_nextafter_readerendr   shutdownr^   r=   Ztrace_eventdataistructsr   rD   r   rG   %  sN   







z%_DataLoaderIterSingleProcess.__next__c                 C   s\   | j r,| j  tdD ]}| j  rtd q n| j t ur'| j 	  d | _ d S d S )Nr   r   )
r8   r;   rJ   rB   is_alivetimesleepr9   current_threadjoinr=   r   r   r   r   _shutdown_threadQ  s   



z-_DataLoaderIterSingleProcess._shutdown_threadc                 C   s>   | j sz| jr| j  d | _|   W d| _ d S d| _ w d S )NT)r[   r7   rK   r   rD   r   r   r   r^   a  s   

z._DataLoaderIterSingleProcess._try_shutdown_allc                 C      |    d S r   r^   rD   r   r   r   r   p     z$_DataLoaderIterSingleProcess.__del__)rP   rQ   rR   rS   r?   r\   r   rG   r   r^   r   __classcell__r   r   r_   r   rU      s    $%<,rU   c                       s   e Zd Z fddZdd Zdd Zdd Zd	d
 ZdddZd ddZ	dd Z
dd Zdd Zdd Zdd Zdd Zdd Z  ZS )!_DataLoaderIterMultiProcessc                    s>  t  | |j| _d| _| jdksJ d| j dd | _d| _d| _d| _i | _	g | _
| jt| jt| j | _t | _tjjdtjd| _tjdddv rlzdt| jd  | _W n   d| _td	 Y nd| _| jd
 | j | _ d| _!| "  t#| jD ]}| $  qz| %  W d S  t&y   | '   w )Nr   z-Multi-process DataLoader invalid num_workers())lowhighZFLAGS_use_shm_cacheF)r   1TTruetruer   zUSetting the shm cache buffer size to 0, equivalent to not using the shm cache policy.r   )(rW   r?   _persistent_workers_resume_worker_cntr*   _data_queue	_send_idx	_rcvd_idx_batches_outstanding_task_infosrY   r,   maxrC   r$   _outstanding_capacityr9   Lock_thread_locknprandomrandintsysmaxsize
_base_seedosenvirongetr"   _worker_shm_buffer_sizewarningswarn_main_thread_shm_buffer_sizer[   _init_workersrB   _try_put_indicesr\   r]   r^   )r=   r>   r   r_   r   r   r?   u  sL   

z$_DataLoaderIterMultiProcess.__init__c                 C   s  ddl m} g | _g | _g | _tt| j| _	|
 | _| | _t | _t| jD ]D}|
 }|  | j| |jt| j| j|| j| j| j| j| j| j|| j| j| j| jfd}d|_|  | j| | jd q,t t!| t"dd | jD  t#  d S )Nr   )multiprocessingru   Tc                 s   s    | ]}|j V  qd S r   )pidre   wr   r   r   	<genexpr>  s    z<_DataLoaderIterMultiProcess._init_workers.<locals>.<genexpr>)$Zpaddle.incubater   _workers_worker_status_indices_queues	itertoolscyclerB   r*   _workers_idx_cycleQueuer   r:   _workers_done_eventr9   r;   cancel_join_threadr   Processr   r"   r1   r)   r6   r(   r0   r-   r   r   r   r   r	   Z_set_process_pidsr   tupler   )r=   r   r   Zindices_queueworkerr   r   r   r     sH   


 
z)_DataLoaderIterMultiProcess._init_workersc                 C   sB   | j d ur	 z| j   W n   | j   | j   Y d S qd S r   )r   
get_nowaitr   rK   rD   r   r   r   _clear_and_remove_data_queue  s   


z8_DataLoaderIterMultiProcess._clear_and_remove_data_queuec                 C   s  dd | j D | _dd | j D | _t r(dd | j D | _dd | j D | _ndd | j D | _dd | j D | _tt | j	t
| jdk| _t| j t| j| j| j| j| j| j| jd	| j	| _t | _tj| jt fd
| _d	| j_| j  d S )Nc                 S   ra   r   rb   rd   r   r   r   rg     rh   z<_DataLoaderIterMultiProcess._init_thread.<locals>.<listcomp>c                 S   ra   r   ri   rd   r   r   r   rg     rh   c                 S   rk   rl   r   rd   r   r   r   rg     rm   c                 S   rn   r   ro   rd   r   r   r   rg     rq   c                 S   rr   r   rs   rd   r   r   r   rg      rq   c                 S   ra   r   rt   rd   r   r   r   rg     rh   r   Tru   )r#   rx   ry   r   rz   r{   r	   r|   r}   r   rC   r$   r7   Z(_set_max_memory_map_allocation_pool_sizer   r~   r+   r2   r   r9   r:   r;   r   r   r   r8   r   r   rD   r   r   r   r\     sD   


z(_DataLoaderIterMultiProcess._init_threadc                 C   s2  | j $ | j| _t| jD ]}| j| t  |  jd7  _qW d    n1 s*w   Y  | jdkr>t	d | jdks4| j
 t| jkrot rWtj| j d }n| jr`| j  n| j }| j
 t| jksHd| _d| _d| _i | _g | _dg| j | _t| j| _t| jD ]}|   qd S )Nr   r   g      ?T) r   r*   r   rB   r   putr   r   r   r   r7   sizerC   r$   r
   r	   r   r   r   r   r%   r   r   r   r   rY   r   r3   r4   r5   r   r   )r=   	worker_idr   r   r   r   r   _reset  s:   




z"_DataLoaderIterMultiProcess._resetFc                 C   sL   |t | jk r | j| s| jr"|r$| j| d  d| j|< d S d S d S d S rV   )rC   r   r   r   r   )r=   r   r   r   r   r   _shutdown_workerJ  s   z,_DataLoaderIterMultiProcess._shutdown_workerNc              
   C   s   | j sazR|   |   | j  t| jD ]	}| j|dd q| j s<| jD ]}|	| q&| j
D ]}|  |  q1W tt|  d| _ d S W tt|  d| _ d S tt|  d| _ w d S )NT)r   )r[   rL   r   r   rJ   rB   r*   r  r   r   r   r   rK   r	   Z_erase_process_pidsr   )r=   r.   r   r   qr   r   r   r^   S  s,   





z-_DataLoaderIterMultiProcess._try_shutdown_allc              
   C   s`  t dtt|   t| | j s|  }| j s|d u r&|   nt	|t
r:| jdks2J |  jd8  _qzdzHt  }| jrN|D ]}|| qEn)|D ]&}t	|tjr]| }nt	|t jsqt  }||t   |}|| qP| j|s| j  W n ty } z|   |d }~ww W |  jd7  _n|  jd7  _w | j rd S d S )Nr   r   r   )r	   r   r   r   r   r;   r   	_get_datarL   r   r   r   r   r-   r   r   r   r   r   rJ   r   r7   r   rK   r]   rO   r   )r=   r   r   r   Ztensorr   r   r   r   r   r   r   j  sH   





 z(_DataLoaderIterMultiProcess._thread_loopc              
   C   s  | j  sh| jtjkrL| j| jk r?| j| j }t|dks%| j	|d  r&n&| j| j= |  jd7  _|  j
d8  _
| j| jk s| jsL| j
t| jk rLd S | j| jv rot| j| j dkro| j| j}| j|d  |d S z
| jj| jd}W n{ ty } zo| j  rW Y d }~q g }t| jD ]\}}| j	| r| s|| | | qt|dkr|   ddd |D }td	t| d
|  W Y d }~d S t|ttjfrW Y d }~q |   t d| d |d }~ww | jtjkrt|t!r| jrd| j	|j"< n| |j" |  j
d8  _
| #  q |\}}	}
t|t$r6|	d u r6|
d u r6|S t|	t%rD|   |	&  || jkr\|| jv rT| j|= | j|
 |	S | j|  |	|
f7  < q d S )Nr   r   r   r   )r.   z, c                 s   s    | ]}t |jV  qd S r   )r   r   r   r   r   r   r     s    z8_DataLoaderIterMultiProcess._get_data.<locals>.<genexpr>zDataLoader z" workers exit unexpectedly, pids: z DataLoader reader thread failed(z*) to read data from workers' result queue.F)'r;   r   r1   r   ZITERr   r   r   rC   r   r   r   r$   r   rY   r   r   r   r/   r]   	enumerater   r   r  rO   r   loggingwarningr   IOErrorqueueEmptyerrorr   r   r   r   r   reraise)r=   infor   r   Zfailed_workersr   r   Zpidsidxr   r   r   r   r   r    s   
	



 


z%_DataLoaderIterMultiProcess._get_datac              	   C   s   | j | jks
J d| j^ zt| j}W n ty%   Y W d    d S w t| jD ]}t| j}| j	| r9 n
q+	 W d    d S | j
| | j|f |f| j| j< |  j d7  _ |  jd7  _W d    d S 1 snw   Y  d S )Nz'too many indices have been put to queuer   )r   r   r   r   r5   r   rB   r*   r   r   r   r   r   r   )r=   r   r   Z
worker_idxr   r   r   r     s,   

"z,_DataLoaderIterMultiProcess._try_put_indicesc                 C   r   r   r   rD   r   r   r   r   1  r   z#_DataLoaderIterMultiProcess.__del__c                 C   s   |  d d S r@   r   rD   r   r   r   _shutdown_on_exit4  s   z-_DataLoaderIterMultiProcess._shutdown_on_exitc                    s  t  rtjdtjjd}|  zzt   t    j	t
 jk r4 jr*t j   j  t rLtj j d }t| jd}nB jr j }tt
|D ]
}||  ||< qZ fddtt
 jD }dd t||D }t
 jdkr|d }n j }    t !  |W W t  r|"  S S  ty    js j#   $   w t  r|"  w w )Nr   r   r   c                    r   r   r   r   rD   r   r   rg   Y  r   z8_DataLoaderIterMultiProcess.__next__.<locals>.<listcomp>c                 S   r   r   r   r   r   r   r   rg   ]  r   r   )%r   r   r   r   r   r   r   r   r   r   rC   r$   r   r   r;   rJ   r7   rK   r
   r	   r   r   r   r   r   rY   r   r%   rB   r   r   r   _on_output_batchr   r   r   r^   r   r   rD   r   rG   7  s\   









z$_DataLoaderIterMultiProcess.__next__c                 C   s.   t t| jD ]}|  jd8  _|   qd S r@   )rB   rC   r$   r   r   r   r   r   r   r  q  s   
z,_DataLoaderIterMultiProcess._on_output_batchrl   r   )rP   rQ   rR   r?   r   r   r\   r  r  r^   r   r  r   r   r  rG   r  r   r   r   r_   r   r   t  s    H0
(
+
	-{:r   )1r   r  r   r	  r   r9   r   r   numpyr   r   r   Zpaddle.base.frameworkr   r   Zpaddle.pir.corer   Zpaddle.profiler.timerr   Zpaddle.profiler.utilsr   Z	frameworkr	   r
   r   Zmultiprocess_utilsr   r   r   r&   r   Zcollater   r   Zflatr   r   r   r   r   r   r   r   r   r    registerr!   rU   r   r   r   r   r   <module>   s8   

I [