o
    1 ic                     @   s"  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
 d dlZd dlZd dlmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& erd dl'm(Z( d dl)m*Z* dgZ+e,e-Z.G dd dZ/dS )    N)TYPE_CHECKINGIteratorListOptionalTupleTypeUnion)get_memory_info_replyget_state_from_address)	RefBundle)SourceOperator)LogicalOperator)LogicalPlan)Operator)Read)DatasetStats)BlockMetadataWithSchema_take_first_non_empty_schema)DataContext)omit_traceback_stdout)log_onceStreamingExecutor)DatasetZscheduling_strategyc                   @   s  e Zd ZdZdedefddZdefddZdFd
dZ	defddZ
defddZe				dGdededededef
ddZded defddZdHd!d"ZdId#d$ZdId%d&Zdee fd'd(Z	dJd)edeed*f fd+d,Zd-eed*f fd.d/Zdeee  fd0d1Zdee fd2d3Zede e!e" eed	 f fd4d5Z#e	dJd6ede"fd7d8Z$e%defd9d:Z&dKd<d=Z'defd>d?Z(defd@dAZ)defdBdCZ*defdDdEZ+d;S )LExecutionPlana  A lazy execution plan for a Dataset.

    This lazy execution plan builds up a chain of ``List[RefBundle]`` -->
    ``List[RefBundle]`` operators. Prior to execution, we apply a set of logical
    plan optimizations, such as operator fusion, in order to reduce Ray task
    overhead and data copies.

    Internally, the execution plan holds a snapshot of a computed list of
    blocks and their associated metadata under ``self._snapshot_bundle``,
    where this snapshot is the cached output of executing the operator chain.statsdata_contextc                 C   sF   || _ d| _d| _d| _d| _d| _d| _d| _d| _d| _	|| _
dS )zCreate a plan with no transformation operators.

        Args:
            stats: Stats for the base blocks.
            data_context: :class:`~ray.data.context.DataContext`
                object to use for execution.
        NF)	_in_stats_snapshot_operator_snapshot_stats_snapshot_bundle_snapshot_metadata_schema_schema_dataset_uuid
_run_index_dataset_name_has_started_execution_context)selfr   r    r*   c/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/data/_internal/plan.py__init__0   s   

zExecutionPlan.__init__returnc                 C   s   | j pd d| j d| j S )ziUnique ID of the dataset, including the dataset name,
        UUID, and current execution index.
        Zdataset_)r&   r$   r%   r)   r*   r*   r+   get_dataset_idZ   s   zExecutionPlan.get_dataset_idr   c                 C   s.   ddl m} |  jd7  _|| j|  }|S )z!Create an executor for this plan.r   r      )/ray.data._internal.execution.streaming_executorr   r%   r(   r0   )r)   r   executorr*   r*   r+   create_executorb   s   zExecutionPlan.create_executorc                 C   s   d| j  d| j dS )NzExecutionPlan(dataset_uuid=z, snapshot_operator=))r$   r   r/   r*   r*   r+   __repr__j   s   zExecutionPlan.__repr__c                 C   sX   ddl m} | j}| |j\}}d| }|| j}| j|jdd\}}d| }|| S )z@Return a string representation of the logical and physical plan.r   )get_execution_planz-------- Logical Plan --------
T)show_op_reprz -------- Physical Plan --------
)Z%ray.data._internal.logical.optimizersr7   _logical_plangenerate_plan_stringdag)r)   r7   logical_planZlogical_plan_strr.   Zphysical_planZphysical_plan_strr*   r*   r+   explainr   s   

zExecutionPlan.explain r   TFopcurr_strdepthincluding_sourcer8   c           
      C   s   |st | tr||fS |}|rt| n| j}|dkr"|| d7 }nd|d d  }|| d| d7 }| jD ]}t|||d ||\}}	t||	}q7||fS )zXTraverse (DFS) the Plan DAG and
        return a string representation of the operators.r   
 r1      +- )
isinstancer   reprnameinput_dependenciesr   r:   max)
r?   r@   rA   rB   r8   Zcurr_max_depthZop_strtrailing_spaceinputZinput_max_depthr*   r*   r+   r:      s   

z"ExecutionPlan.generate_plan_stringdataset_clsr   c                 C   s  ddl m} d}d}|  s| j| jjdd\}}| jdur(| jj}| j }ne| j	dur7| j	j}| j	j
j}nVd}| jj}t|tsVt|jdkrLd}n
|jd }t|trB|r]d}d}n0t|tsfJ |tti dd	| j}	|	t||	j |	 }|	 }n| jdd
}| j }|du rd}
n5t|trt|}
n+g }
t|j|jD ]\}}t|dr|j}|
| d|  qd|
}
d|
 d }
|du rd}d}||kr|  }|dusJ | jdurd | jnd}|rd| dnd}d |j||||
}d}d}d}|| }t||kr| | d|
 }t||kr~g }
t|j|jD ]A\}}t|dr7|j}| |d  | d| }t||kredt| }t!|t| |}|d|  | }|
| q*d|
}
d|
 d| |  d }
| jdurd| | d| j dnd}|rd| | d| dnd}|j d | | d| | d!| d| | d|
 d| d"}|dkr||7 }|S |||d   d#| 7 }|S )$zCreate a cosmetic string representation of this execution plan.

        Returns:
            The string representation of this execution plan.
        r   )MaterializedDatasetr>   F)rB   Nr1   Tmetadataparent)fetch_if_missingzUnknown schema__name__z: z, {}?z	name={}, znum_blocks=z{}({}{}num_rows={}, schema={})P   
   z   zschema=   z...: z,
z{
rC   zname=,(z	num_rows=r5   rF   )"ray.data.datasetrO   has_computed_outputr:   r9   r;   r!   schemanum_rowsr"   rQ   rG   r   lenrJ   r   r   r(   link_logical_planr   
meta_counttypestrzipnamestypeshasattrrT   appendjoininitial_num_blocksr&   formatrK   )r)   rN   rO   Zplan_strZplan_max_depthr_   countZhas_n_ary_operatorr;   ZplanZ
schema_strntZ
num_blocksZname_strZnum_blocks_strZdataset_strZSCHEMA_LINE_CHAR_LIMITZMIN_FIELD_LENGTHZ
INDENT_STRrL   Zschema_str_on_new_lineZcol_strZshortened_suffixZchars_left_for_col_namer*   r*   r+   get_plan_as_string   s   














	


	z ExecutionPlan.get_plan_as_stringr<   r   c                 C   s   || _ | j| j _dS )zLink the logical plan into this execution plan.

        This is used for triggering execution for optimizer code path in this legacy
        execution plan.
        N)r9   r(   )r)   r<   r*   r*   r+   rb   @  s   zExecutionPlan.link_logical_planc                 C   s>   t | j| jd}| jdur| j|_| j|_| j|_| j|_|S )zCreate a shallow copy of this execution plan.

        This copy can be executed without mutating the original, but clearing the copy
        will also clear the original.

        Returns:
            A shallow copy of this execution plan.
        r   N)r   r   r(   r!   r   r    r&   r)   Z	plan_copyr*   r*   r+   copyI  s   	
zExecutionPlan.copyc                 C   sV   t t| j| j d}| jr%t| j|_t| j|_t| j|_| j|_|S )zCreate a deep copy of this execution plan.

        This copy can be executed AND cleared without mutating the original.

        Returns:
            A deep copy of this execution plan.
        rr   )r   rt   r   r(   r!   r   r    r&   rs   r*   r*   r+   	deep_copy^  s   
zExecutionPlan.deep_copyc                 C   s   | j j S )zGet the estimated number of blocks from the logical plan
        after applying execution plan optimizations, but prior to
        fully executing the dataset.)r9   r;   Zestimated_num_outputsr/   r*   r*   r+   rl   r  s   z ExecutionPlan.initial_num_blocksrS   zpyarrow.lib.Schemac                 C   s   | j dur| j S d}|  r| jj}n.| jj }|du rA|rA|  \}}}| tdd |D }W d   n1 s<w   Y  | 	| | j S )aK  Get the schema after applying all execution plan optimizations,
        but prior to fully executing the dataset
        (unless `fetch_if_missing` is set to True).

        Args:
            fetch_if_missing: Whether to execute the plan to fetch the schema.

        Returns:
            The schema of the output dataset.
        Nc                 s       | ]}|j V  qd S Nr_   .0bundler*   r*   r+   	<genexpr>      
z'ExecutionPlan.schema.<locals>.<genexpr>)
r#   r^   r!   r_   r9   r;   Zinfer_schemaexecute_to_iteratorr   cache_schema)r)   rS   r_   Ziter_ref_bundlesr.   r3   r*   r*   r+   r_   x  s   



zExecutionPlan.schemar_   c                 C   s
   || _ d S rw   )r#   )r)   r_   r*   r*   r+   r     s   
zExecutionPlan.cache_schemac                 C   s   | j j jS )z1Get the input files of the dataset, if available.)r9   r;   infer_metadatainput_filesr/   r*   r*   r+   r     s   zExecutionPlan.input_filesc                 C   sN   | j j}|  rtdd | jjD }|S | jdur#| j}|S d}|S )zGet the number of rows after applying all plan optimizations, if possible.

        This method will never trigger any computation.

        Returns:
            The number of records of the result Dataset, or None.
        c                 s   rv   rw   )r`   )rz   mr*   r*   r+   r|         z+ExecutionPlan.meta_count.<locals>.<genexpr>N)r9   r;   r^   sumr!   rQ   r   r`   )r)   r;   r`   r*   r*   r+   rc     s   
zExecutionPlan.meta_countc                 C   s   d| _ |  r|  }t|g| jdfS ddlm} |  }||| }t|}zt	t
|g|}W n	 ty;   Y nw | | _|| j|fS )a  Execute this plan, returning an iterator.

        This will use streaming execution to generate outputs.

        NOTE: Executor will be shutdown upon either of the 2 following conditions:

            - Iterator is fully exhausted (ie until StopIteration is raised)
            - Executor instances is garbage-collected

        Returns:
            Tuple of iterator over output RefBundles, DatasetStats, and the executor.
        TNr   )!execute_to_legacy_bundle_iterator)r'   r^   executeiterr    *ray.data._internal.execution.legacy_compatr   r4   	itertoolschainnextStopIteration	get_stats)r)   r{   r   r3   Zbundle_itergenr*   r*   r+   r~     s   

z!ExecutionPlan.execute_to_iteratorpreserve_orderc              
      s  d| _ | j}t dstdrtd |  sddl	m
}m} t| jjtrZ| jj durZ|| | jj }tdd	 |D }td
d	 |D }tdd |D ||d}n?|  }	||	| | j|d}
tt|
 |
j|
 d}W d   n1 sw   Y  |	  jdd}|jrt| z%tt t! j"}|j#j$dkrt%|j#j&_'|j#j(dkrt%|j#j)_*W n t+y } zt,d|  W Y d}~nd}~ww d_- fdd   || _.| jj| _/| _0| j| j0_1| j.S )zExecutes this plan (eagerly).

        Args:
            preserve_order: Whether to preserve order in execution.

        Returns:
            The blocks of the output dataset.
        TZCPUZcpu_warninga<  Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/latest/data/data-internals.html#ray-data-and-tuner   )_get_initial_stats_from_planexecute_to_legacy_block_listNc                 s   rv   rw   )owns_blocksry   r*   r*   r+   r|     r   z(ExecutionPlan.execute.<locals>.<genexpr>c                 s   rv   rw   rx   ry   r*   r*   r+   r|     r}   c                 S   s$   g | ]}|j D ]\}}||fqqS r*   )blocks)rz   r{   blockrQ   r*   r*   r+   
<listcomp>
  s    z)ExecutionPlan.execute.<locals>.<listcomp>)r   r_   )dataset_uuidr   F)Zinclude_parentzLSkipping recording memory spilled and restored statistics due to exception: c                    s0    j | jdd7  _ | jD ]} | qd S )NZobj_store_mem_spilledr   )dataset_bytes_spilledZextra_metricsgetparents)Z	cur_statsrR   collect_statsr   r*   r+   r   =  s   

z,ExecutionPlan.execute.<locals>.collect_stats)2r'   r(   rayZavailable_resourcesr   r   loggerwarningr^   r   r   r   rG   r9   r;   r   Zoutput_dataallr   r   r4   r$   tupleZiter_blocks_with_metadataZ_owned_by_consumerZ
get_schemar   Z
to_summaryZ	to_stringZenable_auto_log_statsinfor	   r
   Zget_runtime_contextZgcs_addressZstore_statsZspill_time_total_sintZspilled_bytes_totalZglobal_bytes_spilledZrestore_time_total_sZrestored_bytes_totalZglobal_bytes_restored	Exceptiondebugr   r!   r   r    r   )r)   r   contextr   r   Zoutput_bundlesr   r_   r{   r3   r   Zstats_summary_stringZreplyer*   r   r+   r     s   




zExecutionPlan.executec                 C   s   | j S )zBReturn ``True`` if this plan has been partially or fully executed.)r'   r/   r*   r*   r+   has_started_executionN  s   z#ExecutionPlan.has_started_executionNc                 C   s   d| _ d| _d| _dS )z;Clear the snapshot kept in the plan to the beginning state.N)r!   r   r    r/   r*   r*   r+   clear_snapshotS  s   
zExecutionPlan.clear_snapshotc                 C   s   | j s	ti ddS | j S )zqReturn stats for this plan.

        If the plan isn't executed, an empty stats object will be returned.
        NrP   )r    r   r/   r*   r*   r+   r   Y  s   zExecutionPlan.statsc                 C   s   t dd | j D S )z/Return whether this plan has lazy input blocks.c                 s   s    | ]}t |tV  qd S rw   )rG   r   )rz   r?   r*   r*   r+   r|   d  s    z/ExecutionPlan.has_lazy_input.<locals>.<genexpr>)r   r9   sourcesr/   r*   r*   r+   has_lazy_inputb  s   zExecutionPlan.has_lazy_inputc                 C   s   | j duo| j| jjkS )ztWhether this plan has a computed snapshot for the final operator, i.e. for
        the output of this plan.
        N)r!   r   r9   r;   r/   r*   r*   r+   r^   f  s   
z!ExecutionPlan.has_computed_outputc                 C   sB   ddl m} ddlm} | jj D ]}t|||fr dS qdS )z-Whether this plan requires to preserve order.r   )Sort)ZipTF)Z8ray.data._internal.logical.operators.all_to_all_operatorr   Z3ray.data._internal.logical.operators.n_ary_operatorr   r9   r;   Zpost_order_iterrG   )r)   r   r   r?   r*   r*   r+   require_preserve_ordero  s   z$ExecutionPlan.require_preserve_order)r-   r   )r>   r   TF)r<   r   )r-   r   )F)r-   N),rT   
__module____qualname____doc__r   r   r,   re   r0   r4   r6   r=   staticmethodr   r   boolr:   r   rq   rb   rt   ru   r   rl   r   rd   r_   r   r   r   rc   r   r   r   r   r~   r   propertyr   r   r   r   r^   r   r*   r*   r*   r+   r   $   sx    
*
 

	


!%v
		r   )0rt   r   loggingtypingr   r   r   r   r   r   r   Zpyarrowr   Zray._private.internal_apir	   r
   Z'ray.data._internal.execution.interfacesr   Z%ray.data._internal.logical.interfacesr   Z6ray.data._internal.logical.interfaces.logical_operatorr   Z2ray.data._internal.logical.interfaces.logical_planr   Z.ray.data._internal.logical.interfaces.operatorr   Z2ray.data._internal.logical.operators.read_operatorr   Zray.data._internal.statsr   Zray.data.blockr   r   Zray.data.contextr   Zray.data.exceptionsr   Zray.util.debugr   r2   r   r]   r   ZINHERITABLE_REMOTE_ARGS	getLoggerrT   r   r   r*   r*   r*   r+   <module>   s0    $
