o
    1 i                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlmZmZ d dlmZmZmZmZmZ d dlmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3mZ4 d dl5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z=m>Z> erd dlm?Z? d dl@mAZA eBeCZDe
G dd dZEG dd dZFG dd dZGe6G dd dZHeI ZJeeK eLd< daMeeH eLd< deeNeOf d ee> fd!d"ZPddd$d%ZQd#eeH fd&d'ZRd(d) ZSd*d+ ZTd,ee# d#e#fd-d.ZUd/e#d#dfd0d1ZVded2efd3d4ZWe7d5d6eW ddd7d8ed9ee! d:eeN d#dfd;d<ZXe7d5d6eW d#ee! fd=d>ZYe7d?d6eW d#eeNef fd@dAZZe7d?d6eW d#eNfdBdCZ[e7d?d6eW d#eNfdDdEZ\e7d?d6eW d#eNfdFdGZ]e7dHd6eW d#eNfdIdJZ^e7d?d6eW dfdLdMZ_e7d?d6eW d#eNfdNdOZ`e7d?d6eWdPdQd#eafdRdSZbe7d?d6eWd dQd#eafdTdUZce7d?d6eWd dQd#eafdVdWZde7d?d6eWd dQd#eafdXdYZee7d?d6eWd dQd#eafdZd[Zfe7d5d6eW 	ded\eeN d#ed] fd^d_Zge6eW d#e%fd`daZhd#eifdbdcZjdS )g    N)	dataclass)datetime)TYPE_CHECKINGAnyCallableDictOptionalSetType)RunnerThreadStartTraceback)_ERROR_FETCH_TIMEOUT_RESULT_FETCH_TIMEOUTSESSION_MISUSE_LOG_ONCE_KEYTIME_THIS_ITER_S	TIMESTAMP)Dataset)
Checkpoint)Accelerator)StorageContext)CHECKPOINT_DIR_NAMEDETAILED_AUTOFILLED_KEYSRAY_CHDIR_TO_TRIAL_DIRTIME_TOTAL_SWORKER_HOSTNAMEWORKER_NODE_IP
WORKER_PID_v2_migration_warnings_enabledSessionMisuseError)_log_deprecation_warning)queue)DeveloperAPI	PublicAPI)log_once)_valid_resource_shape) PlacementGroupSchedulingStrategySchedulingStrategyT)DataIterator)PlacementGroupFactoryc                   @   sj   e Zd ZU dZeed< eed< eeef ed< eed< eed< eed< dZe	e ed	< dZ
e	e ed
< dS )	TrialInfoz3The trial information to propagate to TrainSession.nameid	resourceslogdirZ	driver_ipZdriver_node_idNexperiment_namerun_id)__name__
__module____qualname____doc__str__annotations__r   floatr/   r   r0    r8   r8   g/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/train/_internal/session.pyr*   8   s   
 r*   c                   @   s8   e Zd ZdZdejfddZddeded fd	d
Z	dS )_FutureTrainingResultzA future that will be resolved to a `_TrainingResult`.

    This is needed for specific schedulers such as PBT that schedule saves.

    This wrapper should be removed after refactoring PBT to not schedule saves anymore.
    futurec                 C   s
   || _ d S N)r;   )selfr;   r8   r8   r9   __init__N   s   
z_FutureTrainingResult.__init__Tblockreturn_TrainingResultc              
   C   sj   |rd}nd}z	t j| j|dW S  ty   Y dS  ty4 } ztd|  W Y d}~dS d}~ww )zResolve into ``_TrainingResult``.

        This will return None for function trainables if no checkpoint has been
        saved before.
        Ng&.>timeoutzError resolving result: )raygetr;   TimeoutError	Exceptionloggererror)r=   r?   rC   excr8   r8   r9   resolveQ   s   z_FutureTrainingResult.resolveN)T)
r1   r2   r3   r4   rD   Z	ObjectRefr>   boolr   rK   r8   r8   r8   r9   r:   F   s    r:   c                   @   s<   e Zd ZdZdee deeef fddZ	defddZ
d	S )
rA   z4A (checkpoint, metrics) result reported by the user.
checkpointmetricsc                 C   s   || _ || _d S r<   rM   rN   )r=   rM   rN   r8   r8   r9   r>   g   s   
z_TrainingResult.__init__r@   c                 C   s   d| j  d| j dS )NzTrainingResult(checkpoint=z
, metrics=)rO   r=   r8   r8   r9   __repr__k   s   z_TrainingResult.__repr__N)r1   r2   r3   r4   r   r   r   r5   r   r>   rR   r8   r8   r8   r9   rA   d   s    rA   c                   @   s  e Zd ZdZ							dJdedee dee dee dee d	ee d
ee deee	e
f  dee	ef dee dedee defddZde	defddZde	defddZdd Zdd Z	dKded
edefddZd d! ZdKd"ee dee fd#d$Zdee fd%d&Zd'd( Zd)edee fd*d+Zd,edefd-d.Zd,edefd/d0ZdLd1d2Zd3eddfd4d5Z dKd6edee ddfd7d8Z!e"de	fd9d:Z#e"de	fd;d<Z$e"de	fd=d>Z%e"de	fd?d@Z&e"dMdBdCZ'e"de	fdDdEZ(	dKdFee	 dedG fdHdIZ)dS )N_TrainSessionz.Holds information for training on each worker.NFtraining_func
world_rank
local_rank	node_ranklocal_world_size
world_size
trial_infodataset_shardmetadatarM   detailed_autofilled_metricsstoragesynchronous_result_reportingc                 C   s   || _ || _|	| _|| _|| _|| _|| _|| _|sJ t	d| d|  | j
||||
d || _t | _d| _d| _|  | _d | _i | _d S )Nz StorageContext on SESSION (rank=z):
)rT   rZ   r^   loaded_checkpointr   g        )r_   r[   r\   rU   rV   rW   rX   rY   rH   debugresetr]   timelast_report_time	iteration
time_totalget_current_iplocal_ipaccelerator_state)r=   rT   rU   rV   rW   rX   rY   rZ   r[   r\   rM   r]   r^   r_   r8   r8   r9   r>   t   s.   


z_TrainSession.__init__keyr@   c                 C   s   | j |S r<   )rj   rE   )r=   rk   r8   r8   r9   	get_state   s   z_TrainSession.get_statevaluec                 C   s   || j |< d S r<   )rj   )r=   rk   rm   r8   r8   r9   	set_state   s   z_TrainSession.set_statec                 C   s   t j | _| jS r<   )rD   utilZget_node_ip_addressrh   rQ   r8   r8   r9   rg      s   z_TrainSession.get_current_ipc                 C   s   d| _ | j  dS )zStarts the training thread.TN)training_startedtraining_threadstartrQ   r8   r8   r9   rr      s   z_TrainSession.startc                 C   s   t d| _t  | _td| _d | _td| _	t
|d| j	d| _|| _|| _|| _i | _d| _d| _d| _tj|jdd tttjtdr\td|j  t|j d S d S )	Nr      T)targetdaemonerror_queueF)exist_ok1z#Changing the working directory to: )	threading	Semaphorecontinue_lockEvent
stop_eventr!   Queueresult_queue_inter_actor_queuerv   r   rq   rZ   r^   r`   rj   ignore_reportrp   _first_reportosmakedirsZtrial_working_directoryrL   intenvironrE   r   rH   ra   chdir)r=   rT   rZ   r^   r`   r8   r8   r9   rb      s,   

z_TrainSession.resetc                 C   s
   d| _ dS )z-Ignore all future ``session.report()`` calls.TN)r   rQ   r8   r8   r9   pause_reporting   s   
z_TrainSession.pause_reportingrC   c                 C   s>   | j   | j  | jjdd d}| jr| jj|d}|S )zSFinishes the training thread.

        Raises any Exception from training.
        TforceNrB   )	r}   setr{   releaser^   persist_artifactsrp   rq   join)r=   rC   outputr8   r8   r9   finish   s   

z_TrainSession.finishc                 C   s   | j std| jr| js| j  d| _d}|du r/| j r/| jdd}|du r/| j s |du r9| jdd}|du rD| j	dd n
| j
 sNtd | jsV| j  |S )zGets the next ``_TrainingResult`` from the result queue.

        If the result queue is empty, then this function returns ``None``.
        z*Please call start before calling get_next.FNTr?   zVRunner error waiting to be raised in main thread. Logging all available results first.)rp   RuntimeErrorr_   r   r{   r   rq   is_alive_get_result_from_queues_report_thread_runner_errorrv   emptyrH   ra   )r=   resultr8   r8   r9   get_next  s*   


z_TrainSession.get_nextc                 C   s$   | j du rtjdddid| _ | j S )z$Get or create the inter-actor queue.Nrs   Znum_cpusr   )Zactor_options)r   	ray_queuer~   rQ   r8   r8   r9    _get_or_create_inter_actor_queueB  s   
z._TrainSession._get_or_create_inter_actor_queuer?   c                 C   s~   d}| j dur(z| j j|td}|r| j  | | W n
 tjy'   Y nw z| jj|td}W |S  t	jy>   Y |S w )zUGet result from result queue. Pass result from training actor result queue if needed.Nr?   rC   )
r   rE   r   r{   r   reportr   Emptyr   r!   )r=   r?   r   Zinter_actor_itemr8   r8   r9   r   H  s(   


z%_TrainSession._get_result_from_queuesr   c              
   C   s   t   }t }t|v r|t }n|| j }|  jd7  _|  j|7  _|| _ttt 	|
 t| jtt tt t| ji}| jsLdd | D }| }|| |S )-Add autofilled metrics and update attributes.rs   c                 S   s   i | ]\}}|t vr||qS r8   )r   .0kvr8   r8   r9   
<dictcomp>q  s
    z4_TrainSession._auto_fill_metrics.<locals>.<dictcomp>)rc   r   nowr   rd   re   rf   r   r   mktime	timetupler   r   r   getpidr   platformnoder   rh   r]   itemscopyupdate)r=   r   current_timecurrent_datetimeZtime_this_iterauto_filled_metricsr8   r8   r9   _auto_fill_metrics\  s*   


z _TrainSession._auto_fill_metricsc                 C   s4   t  }ttt| i}| }|| |S )r   )	r   r   r   r   rc   r   r   r   r   )r=   r   r   r   r8   r8   r9   _auto_fill_checkpoint_metrics{  s   
z+_TrainSession._auto_fill_checkpoint_metricsc                 C   s.   z| j j|td}t| tjy   Y d S w )Nr   )rv   rE   r   r   r!   r   )r=   r?   er8   r8   r9   r     s   z)_TrainSession._report_thread_runner_errortraining_resultc                 C   sN   |j r|j | _| jj|dd | j  | j r%| j  t	
d dS dS )a  Place a training result on the result queue for the main thread to process,
        then block until the main thread signals that training should continue.

        NOTE: This is used internally to report results from Train to Tune
        without persisting checkpoints to storage 2 times.
        `report` is the public API that directly persists to storage, which
        should only be called by user code.
        Tr   r   N)rM   r`   r   putr{   acquirer}   is_setclearsysexit)r=   r   r8   r8   r9   _report_training_result  s   	


z%_TrainSession._report_training_resultrN   c           
      C   s   dt jv rddlm} ||rtd| jrd S | |}d }|r4| j| | j	|}| jj
|t< nd |t< |o>| jjj}| jj|d |rf| jrf| }| j D ]\}}||vr`|||< qT|| t||d}	| |	 d S )NZtorchr   )contains_tensorzPassing objects containg Torch tensors as metrics is not supported as it will throw an exception on deserialization. You can either convert the tensors to Python objects or report a `train.Checkpoint` with `ray.train.report` to store your Torch objects.r   rO   )r   modulesZray.air._internal.torch_utilsr   
ValueErrorr   r   r^   Z_update_checkpoint_indexZpersist_current_checkpointcheckpoint_dir_namer   Zsync_configZsync_artifacts_on_checkpointr   r\   get_metadatar   Zset_metadatarA   r   )
r=   rN   rM   r   Zpersisted_checkpointZforce_artifact_syncZuser_metadatar   r   r   r8   r8   r9   r     s8   



z_TrainSession.reportc                 C      | j jS r<   )rZ   r/   rQ   r8   r8   r9   r/        z_TrainSession.experiment_namec                 C   r   r<   )rZ   r+   rQ   r8   r8   r9   
trial_name  r   z_TrainSession.trial_namec                 C   r   r<   )rZ   r,   rQ   r8   r8   r9   trial_id  r   z_TrainSession.trial_idc                 C   r   r<   )rZ   r0   rQ   r8   r8   r9   r0     r   z_TrainSession.run_idr)   c                 C   r   r<   )rZ   r-   rQ   r8   r8   r9   trial_resources  r   z_TrainSession.trial_resourcesc                 C   r   r<   )rZ   r.   rQ   r8   r8   r9   	trial_dir  r   z_TrainSession.trial_dirdataset_namer(   c                 C   s@   | j }|d u rtd |S t|tr|std||S |S )NziNo dataset passed in. Returning None. Make sure to pass in a Dataset to Trainer.run to use this function.zMultiple datasets were passed into ``Trainer``, but no ``dataset_name`` is passed into ``get_dataset_shard``. Please specify which dataset shard to retrieve.)r[   warningswarn
isinstancedictr   rE   )r=   r   Zshardr8   r8   r9   get_dataset_shard  s   

z_TrainSession.get_dataset_shard)NNNNFNFr<   )Fr@   r)   )*r1   r2   r3   r4   r   r   r   r*   r   r5   r   r   r   rL   r   r>   rl   rn   rg   rr   rb   r   r7   r   rA   r   r   r   r   r   r   r   r   r   propertyr/   r   r   r0   r   r   r   r8   r8   r8   r9   rS   p   s    
	


>

15
3rS   _checked_resources_sessionr-   strategyc           
      C   s  t dd | D }|r|tv rdS t|tr|jdu rdS tj }|r-|jj	|j	kr/dS t
| t }|jrB|jdd }n|jdd }t||rPdS | jrcd}| jd | j d | j }n
d}| jd | j }|jd }	d	d
 | D }td| d| d|	 d| d| d)zLaunch hook to catch nested tasks that can't fit in the placement group.

    This gives users a nice warning in case they launch a nested task in a Tune trial
    without reserving resources in the trial placement group to fit it.
    c                 S   s    h | ]\}}|d kr||fqS r   r8   r   r8   r8   r9   	<setcomp>  s     z3_tune_task_and_actor_launch_hook.<locals>.<setcomp>Nr   rs   actor.taskc                 S   s"   i | ]\}}|d kr|t |qS r   )r7   r   r8   r8   r9   r   D  s   " z4_tune_task_and_actor_launch_hook.<locals>.<dictcomp>z3No trial resources are available for launching the z `zg`. To resolve this, specify the Tune option:

>  resources_per_trial=tune.PlacementGroupFactory(
>    [z] + [zB] * N
>  )

Where `N` is the number of slots to reserve for trial zs. If you are using a Ray training library, there might be a utility function to set this automatically for you. For more information, refer to https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html)	frozensetr   r   r   r&   Zplacement_grouprD   ro   Zget_current_placement_groupr,   addget_trial_resourcesZhead_bundle_is_emptyZbundle_specsr%   
class_namemodule_nameZfunction_namer   )
fnr-   r   rk   Zcur_pgZpgfZavailable_bundlesZ	submittedr+   Zmain_resourcesr8   r8   r9    _tune_task_and_actor_launch_hook  sB   





r   r@   c                  O   sD   t rtdddlm}m} dtjvrt|_t|_	t
| i |a d S )NzIA Train session is already in use. Do not call `init_session()` manually.r   )r   remote_functionZTUNE_DISABLE_RESOURCE_CHECKS)r   r   rD   r   r   r   r   r   Z_actor_launch_hookZ_task_launch_hookrS   )argskwargsr   r   r8   r8   r9   init_sessionS  s   
r   c                   C   s   t S r<   r   r8   r8   r8   r9   get_sessione  s   r   c                   C   s   da dS )z#Shuts down the initialized session.Nr   r8   r8   r8   r9   shutdown_sessioni  s   r   c                   C   s   t d)zKRaises a SessionMisuseError because a utility function was used improperly.zjprepare/accelerate utility functions should be called inside a training function executed by `Trainer.run`r   r8   r8   r8   r9   !_raise_accelerator_session_misuseo  s   r   default_accelerator_clsc                 C   s,   t  }|du r
t  |jdu r|  |_|jS )zThe accelerator for this training session.

    If an accelerator has not been set, then this method will construct an
    accelerator using the provided accelerator class.

    Raises:
        SessionMisuseError: if the session is uninitialized.
    N)r   r   ri   )r   sessionr8   r8   r9   get_acceleratorw  s   	
r   ri   c                 C   s0   t  }|du r
t  |jdurtd| |_dS )a   Sets the accelerator for this training session.

    Args:
        accelerator: The accelerator to use for training.

    Raises:
        SessionMisuseError: if the session is unitialized.
        RuntimeError: if the accelerator has already been set.
    Nz#Cannot change accelerator once set.)r   r   ri   r   )ri   r   r8   r8   r9   set_accelerator  s   


r   default_valuec                    s   dt f fdd}|S )zKWarns if fn is being used outside of session and returns ``default_value``.r   c                    s$    j t  fdd}|S )Nc                     sF   t  }|stt d rtd d  d  S | i |S )N-`zb` is meant to only be called inside a function that is executed by a Tuner or Trainer. Returning `z`.)r   r$   r   r   r   )r   r   r   )r   r   fn_namer8   r9   wrapper  s   z4_warn_session_misuse.<locals>.inner.<locals>.wrapper)r1   	functoolswraps)r   r   r   )r   r   r9   inner  s   z#_warn_session_misuse.<locals>.inner)r   )r   r   r8   r   r9   _warn_session_misuse  s   r   Zstable)Z	stability)rM   r   rN   rM   r   c                C   s^   |dur	t d ddlm} | r%ddl}t rtd |jj| |dS t	 j| |d dS )a  Report metrics and optionally save a checkpoint.

    If a checkpoint is provided, it will be
    :ref:`persisted to storage <persistent-storage-guide>`.

    If this is called in multiple distributed training workers:

    - Only the metrics reported by the rank 0 worker will be tracked by Ray Train.
      See :ref:`the metrics logging guide <train-monitoring-and-logging>`.
    - A checkpoint will be registered as long as one or more workers reports
      checkpoint that is not None.
      See the :ref:`checkpointing guide <train-dl-saving-checkpoints>`.
    - Checkpoints from multiple workers will be merged into one directory
      in persistent storage.
      See :ref:`the distributed checkpointing guide <train-distributed-checkpointing>`.

    .. note::

        Each invocation of this method will automatically increment the underlying
        ``training_iteration`` number. The physical meaning of this "iteration" is
        defined by user depending on how often they call ``report``.
        It does not necessarily map to one epoch.

    .. warning::

        All workers must call `ray.train.report` the same number of times
        so that Ray Train can properly synchronize the training state across
        workers. Otherwise, your training will hang.

    .. warning::

        This method does NOT act as a barrier for distributed training workers.
        Workers will upload their checkpoint, then continue training immediately.
        If you need to synchronize workers, you can use a framework-native barrier
        such as `torch.distributed.barrier()`.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...
                       # torch.save(...)

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)

                        # Example: Only the rank 0 worker uploads the checkpoint.
                        if ray.train.get_context().get_world_rank() == 0:
                            train.report(metrics, checkpoint=checkpoint)
                        else:
                            train.report(metrics, checkpoint=None)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Args:
        metrics: The metrics you want to report.
        checkpoint: The optional checkpoint you want to report.
    Nz`checkpoint_dir_name` is only supported in the new Ray Train implementation, which can be enabled with `RAY_TRAIN_V2_ENABLED=1`. This argument will be ignored.r   _in_tune_sessionz`ray.train.report` should be switched to `ray.tune.report` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)rM   )
rH   warning%ray.tune.trainable.trainable_fn_utilsr   ray.tuner   r    tuner   r   )rN   rM   r   r   rD   r8   r8   r9   r     s   Xr   c                  C   s:   ddl m}  |  rddl}t rtd |j S t jS )a  Access the latest reported checkpoint to resume from if one exists.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
                        train.report(metrics, checkpoint=checkpoint)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Returns:
        Checkpoint object if the session is currently being resumed.
            Otherwise, return None.
    r   r   Nz`ray.train.get_checkpoint` should be switched to `ray.tune.get_checkpoint` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)	r   r   r   r   r    r   get_checkpointr   r`   )r   rD   r8   r8   r9   r   $  s   ,
r   betac                   C      t  jS )z5User metadata dict passed to the Trainer constructor.)r   r\   r8   r8   r8   r9   r   b     r   c                   C   r   )z,Experiment name for the corresponding trial.)r   r/   r8   r8   r8   r9   get_experiment_namei  r   r   c                   C   r   )z'Trial name for the corresponding trial.)r   r   r8   r8   r8   r9   get_trial_namep  r   r   c                   C   r   )z%Trial id for the corresponding trial.)r   r   r8   r8   r8   r9   get_trial_idw  r   r   alphac                   C   r   )z0Unique Train Run id for the corresponding trial.)r   r0   r8   r8   r8   r9   
get_run_id~  r   r  r)   c                   C   r   )z,Trial resources for the corresponding trial.)r   r   r8   r8   r8   r9   r     r   r   c                   C   r   )a  Log directory corresponding to the trial directory for a Tune session.
    If calling from a Train session, this will give the trial directory of its parent
    Tune session.

    .. testcode::

        import ray.tune

        def train_func(config):
            print(ray.tune.get_context().get_trial_dir())

        tuner = ray.tune.Tuner(train_func)
        tuner.fit()

    .. testoutput::
        :options: +MOCK

        /Users/root/ray_results/train_func_2023-07-19_15-01-37/train_func_d620c_00000_0_2023-07-19_15-01-40
    )r   r   r8   r8   r8   r9   get_trial_dir  s   r  rs   r   c                  C      t  } t| dstd| jS )a  Get the current world size (i.e. total number of workers) for this run.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        NUM_WORKERS = 2

        def train_loop_per_worker(config):
            assert train.get_context().get_world_size() == NUM_WORKERS

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=NUM_WORKERS),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rY   z`get_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   hasattrr   rY   r   r8   r8   r9   get_world_size  s   
r  c                  C   r  )a  Get the world rank of this worker.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        def train_loop_per_worker(config):
            if train.get_context().get_world_rank() == 0:
                print("Worker 0")

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rU   z`get_world_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r  r   rU   r  r8   r8   r9   get_world_rank     
r  c                  C   r  )a  Get the local rank of this worker (rank of the worker on its node).

    .. testcode::

        import torch

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            if torch.cuda.is_available():
                torch.cuda.set_device(train.get_context().get_local_rank())
            ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rV   z`get_local_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r  r   rV   r  r8   r8   r9   get_local_rank  s    
r	  c                  C   r  )a  Get the local world size of this node (i.e. number of workers on this node).

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_local_world_size())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    rX   z`get_local_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r  r   rX   r  r8   r8   r9   get_local_world_size  r  r
  c                  C   r  )a  Get the rank of this node.

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_node_rank())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    rW   z`get_node_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r  r   rW   r  r8   r8   r9   get_node_rankE  r  r  r   r(   c                 C   s"   t  }t|dstd|| S )a?  Returns the :class:`ray.data.DataIterator` shard for this worker.

    Call :meth:`~ray.data.DataIterator.iter_torch_batches` or
    :meth:`~ray.data.DataIterator.to_tf` on this shard to convert it to the
    appropriate framework-specific data type.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            ...
            for epoch in range(2):
                # Trainer will automatically handle sharding.
                data_shard = train.get_dataset_shard("train")
                for batch in data_shard.iter_torch_batches():
                    ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...

    Args:
        dataset_name: If a Dictionary of Datasets was passed to ``Trainer``, then
            specifies which dataset shard to return.

    Returns:
        The ``DataIterator`` shard to use for this worker.
        If no dataset is passed into Trainer, then return None.
    r   z`get_dataset_shard` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r  r   r   )r   r   r8   r8   r9   r   l  s   /

r   c                   C   r   )a2  Returns the :class:`~ray.train._internal.storage.StorageContext` storage
    context which gives advanced access to the filesystem and paths
    configured through `RunConfig`.

    NOTE: This is a developer API, and the `StorageContext` interface may change
    without notice between minor versions.
    )r   r^   r8   r8   r8   r9   get_storage  s   
r  c                   C   s   t t o
t jduS )z6Check if the current process is a Ray Train V1 worker.N)rL   r   rU   r8   r8   r8   r9   _in_ray_train_worker  s   r  )r@   Nr<   r   )kr   loggingr   r   r!   r   ry   rc   r   dataclassesr   r   typingr   r   r   r   r   r	   r
   rD   Zray.air._internal.utilr   r   Zray.air.constantsr   r   r   r   r   Zray.datar   Z	ray.trainr   Zray.train._internal.acceleratorr   Zray.train._internal.storager   Zray.train.constantsr   r   r   r   r   r   r   r   Zray.train.errorr   Zray.train.utilsr    Zray.utilr   Zray.util.annotationsr"   r#   Zray.util.debugr$   Zray.util.placement_groupr%   Zray.util.scheduling_strategiesr&   r'   r(   Z#ray.tune.execution.placement_groupsr)   	getLoggerr1   rH   r*   r:   rA   rS   r   r   r   r6   r   r5   r7   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   r  r   r  r  r	  r
  r  r   r  rL   r  r8   r8   r8   r9   <module>   s   
 $(

   


@p<&%(%%7