o
    pi                    @  s  U d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZmZmZmZ d dlZd dlZd dlmZ d dlmZmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z& d dlm'Z' d dl(m)Z)m*Z+ d dl,m-Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z= d dl>m?Z?m@Z@mAZA d dlBmCZC d dlDmEZEmFZF d dlGmHZH ddlImJZJmKZKmLZL ddlMmNZNmOZOmPZPmQZQmRZRmSZSmTZT ddlUmVZVmWZWmXZXmYZYmZZZ ddl[m\Z\m]Z] ddl^m_Z_m`Z`maZa erzd dlbmcZcmdZd d dlemfZf d d lmgZg d d!lhmiZimjZjmkZkmlZl d d"lmmnZn d d#l moZo d d$lmpZp d d%lqmrZr d d&lsmtZt d d'lumvZv d d(lwmxZx dd)lymzZzm{Z{m|Z|m}Z}m~Z~mZmZmZmZ ed* Zd+ed,< G d-d. d.ed/d0Zdd3d4Zd5d6 ZG d7d8 d8eCjZ			dddGdHZ-G dIdJ dJeZdKdL ZdMdN Z	OddPdQZG dRdS dSeZ	ddTdUZdVdW ZdXdY Zdd_d`ZddbdcZ			dddldmZddodpZG dqdr dreHZG dsdt dtZG dudv dveZG dwdx dxeZG dydz dzeZG d{d| d|eZ		ddddZdddZG dd dZG dd de+jZG dd dZ					ddddZdddZG dd dZ			/	ddddZdd Zdd ZdS )    )annotationsN)OrderedDict)
MethodType)TYPE_CHECKINGAnyLiteral	TypedDict)_C_opsnnpir)OptimizerState)PyLayer)unique_name)switch_to_static_graph)EagerParamBaseVariabledefault_main_programin_dygraph_modein_pir_modeuse_pir_api)fleet)Enginestrategyshard_tensor)ProcessMesh)$mark_as_sharding_propagation_skip_op)get_default_distributed_context)DistributedOperator)convert_to_dims_mappingfuse_param_funcget_dist_attr
split_meshsplit_param_functo_list)align	alignmentget_current_device_type)core)DistributedBatchSampler_InfiniteIterableSampler)	Optimizer   )_enable_auto_dp_fake_replicate_grad_to_partialin_auto_dp_mode)_cal_local_shape_dist_reshape_dtensor_from_local_NdMeshAlltoAll_only_reshard_mesh_shape_reshard_mesh_shape_specific_alltoall_dim)check_placements_equalget_shard_specplacemetns_to_dist_status
to_dim_mapto_placements)determinate_rng	rng_state)ShardingOptimizerStage1get_mesh_comm_listget_placement_with_sharding)CallableSequence)	TypeAlias)Tensor)	DTypeLikeNestedNumericSequence	PlaceLike
TensorLike)
GradScaler)Program)	Placement)DistributedInputSpec)
DataLoader)Metric)Layer)	
_AMPConfig_DPOptimizationConfig_FusedPassesConfig_GradientMergeConfig_MPOptimizationConfig_PipelineConfig_RecomputeConfig_ShardingConfig_SPOptimizationConfigtrainevalpredictrC   _Modec                   @  sV   e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< ded< ded< dS )_ConfigrW   shardingrR   fused_passesrS   gradient_mergerU   pipelinerP   amprV   	recomputerT   mp_optimizationrQ   dp_optimizationrX   sp_optimizationN)__name__
__module____qualname____annotations__ rl   rl   k/home/app/PaddleOCR-VL/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/auto_parallel/api.pyr^      s   
 r^   F)totaltensorpaddle.Tensorc                 C  sH   t  }|  r|  r||    |S d }|S ||   |S N)r(   DenseTensoris_dist_is_initialized_share_data_with_local_value
get_tensor)ro   Z	lodtensorrl   rl   rm   _to_lodtensor   s   rx   c                 C  s   |  |r| t|d  S d S rq   )
startswithlen)sprefixrl   rl   rm   _get_suffix   s   
r}   c                   @  s$   e Zd ZdZdd Zedd ZdS )DistAttra  
    DistAttr specifies how tensors are distributed or sliced on ProcessMesh.

    Args:
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        sharding_specs(list[str|None]): The specification describing how to shard the Tensor.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]], dim_names=['x', 'y'])
            >>> dist_attr = dist.DistAttr(mesh=mesh, sharding_specs=['x', 'y'])

            >>> print(dist_attr)

    c                   s   t  tjs
tdt |tstdtdd |D s J d|| _ fdd|D }tj|   | _	|| _
| d | d	 d S )
Nz?The mesh must be an instance of paddle.distributed.ProcessMesh.z/The sharding_specs must be an instance of list.c                 s  s"    | ]}t |tp|d u V  qd S rq   )
isinstancestr.0Zdim_namerl   rl   rm   	<genexpr>   s
    
z$DistAttr.__init__.<locals>.<genexpr>z@The dimension name in sharding_specs must be an instance of str.c                   s$   g | ]}|d ur j |ndqS N)	dim_namesindexr   meshrl   rm   
<listcomp>   s    z%DistAttr.__init__.<locals>.<listcomp>process_meshdims_mapping)r   r(   r   
ValueErrorlistall_sharding_specsTensorDistAttr__init__r   r   mark_annotated)selfr   sharding_specsr   rl   r   rm   r      s(   



zDistAttr.__init__c                 C     | j S )zl
        Get sharding_specs of the dist_attr
        Returns:
            list[str]: sharding_specs
        )r   r   rl   rl   rm   r      s   zDistAttr.sharding_specsN)rh   ri   rj   __doc__r   propertyr   rl   rl   rl   rm   r~      s
    r~   data+Tensor | TensorLike | NestedNumericSequencer   r   
placementsSequence[Placement]dtypeDTypeLike | NoneplacePlaceLike | Nonestop_gradientbool | NonereturnrD   c                 C  s  |du r	t j }t j|}|du rt| dd}t j r7t| tdtj	fs,J d| 
 s4J d| }n+t| trL|  sL| jdusIJ d| }nt| t jrY|du rY| }n	t j| |||d}t  rt| trdd	 }tj|f||d
|j}|j|_|jdur|j}	||||	 |S t j||||d}
|j|
_|
S t j rt j|||}
|j|
_|j|
_|
S t|||j}t|||S )a	  
    Creates a distributed Tensor (i.e., Tensor with distributed attributes or DistTensor for short)
    from the input data, which can be a scalar, tuple, list, numpy.ndarray, or paddle.Tensor.

    If the ``data`` is already a Tensor, it will be transformed into a distributed Tensor.

    Args:
        data(scalar|tuple|list|ndarray|Tensor): Initial data for the tensor.
            Can be a scalar, list, tuple, numpy.ndarray, paddle.Tensor.
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        placements(list[paddle.distributed.Placement]): the placements describe how to place the tensor on ProcessMesh, it can
            be Shard, Replicate and Partial.
        dtype(str|np.dtype, optional): The desired data type of returned tensor.
            It Can be 'bool' , 'float16' , 'float32' , 'float64' , 'int8' , 'int16' , 'int32' , 'int64' , 'uint8',
            'complex64' , 'complex128'. Default: None. If None, the the dtype is inferred from ``data``
            except for python float number, in which case the dtype is inferred from ``get_default_type`` .
        place(CPUPlace|CUDAPinnedPlace|CUDAPlace|str, optional): The place to allocate Tensor. Can be
            CPUPlace, CUDAPinnedPlace, CUDAPlace. Default: None, means global place. If ``place`` is
            string, It can be ``cpu``, ``gpu:x`` and ``gpu_pinned``, where ``x`` is the index of the GPUs.
        stop_gradient(bool, optional): Whether to block the gradient propagation of Autograd. If
            ``stop_gradient`` is None, set the returned Tensor's ``stop_gradient`` identical as the
            ``data.stop_gradient`` when ``data`` has ``stop_gradient`` attribute and True otherwise.
            Default: None.

    Returns:
        Tensor: A Tensor constructed from ``data`` with distributed attributes.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]], dim_names=['x', 'y'])

            >>> # dense tensor
            >>> a = paddle.to_tensor([[1,2,3],
            ...                       [5,6,7]])

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> # distributed tensor
            >>> d_tensor = dist.shard_tensor(a, mesh, [dist.Shard(0), dist.Shard(1)])

            >>> print(d_tensor)

    Nr   Tzinput tensor is not pir value.zAshard_tensor() input data only supported dense tensor type right.z:Get an uninitialized param with an unregistered init_func.)r   r   r   c                   s.   j D ]
}| rJ dq fdd}|S )NzLazy init not support partial reshard. Notice that: shard a param to partial won't save any memory, but will increase the communication cost!c                   sb   t  jjvr
d S tt  jjd}t|  | | W d    d S 1 s*w   Y  d S )Nr   r   )distget_rankr   process_idsr<   r   r=   )varblockZrng_nameorigin_hookparamrl   rm   
_init_funcO  s   
"z8shard_tensor.<locals>.lazy_init_hook.<locals>._init_func)r   
is_partial)r   r   	placementr   rl   r   rm   lazy_init_hookG  s   

z$shard_tensor.<locals>.lazy_init_hookr   )r   r   r   )paddle	framework_current_expected_place_get_paddle_placegetattrr   r   typer   ValueZis_dense_tensor_typer   rt   r   rD   	to_tensorin_dynamic_modefrom_tensor__dict__r   Zset_init_funcr	   r   persistabler8   ndimshard_tensor_static)r   r   r   r   r   r   ro   r   Z
dist_paramZorigin_init_funcdist_tensorr   rl   rl   rm   r      sj   6





r   c                   @  s(   e Zd Ze	dddZedd ZdS )_moe_global_mesh_tensorNc                 C  s   t  rE|| }| r|j}	| }
n|}
d }	| t||jt|t| t j	
 }t j	|}t j|
||||d}|j|_|S | t|t|t|t| t j||||||}|d j|_|d j|_|S )Ndimsr   r   r   r   )r   r   rs   r   rv   save_for_backwardcopydeepcopyshaper   r   r   rD   r   r	   moe_global_mesh_tensorr   )ctxlocal_tensor_listlocal_mesh_listlocal_placementsr   r   global_dimsidxlocal_tensor
local_meshZ	local_valr   global_tensorr   rl   rl   rm   forward  sR   

z_moe_global_mesh_tensor.forwardc              
   C  s   t  rD|  \}}}}|d u r| S t j }t j|}g }t|D ]\}}	|t j	| ||	||d |d 
 d q%|S |  \}}
}}t j|||||
S )Nr   r   T)r   r   saved_tensorrv   r   r   r   	enumerateappendrD   rw   _unsafe_set_skip_check_meshr	   moe_sub_mesh_tensors)r   grad_tensorglobal_meshZ
local_dimsr   r   r   outir   global_placementsrl   rl   rm   backward  sB   

	z _moe_global_mesh_tensor.backwardrq   rh   ri   rj   staticmethodr   r   rl   rl   rl   rm   r     s    	:r   c                 C  sR   | d u s|d u s|d u rt dt| |}t|}|t|k r%t ||< ||fS )NzMthe args global_mesh, global_placements and local_mesh_dim should all be set.)r   r"   r   rz   r   	Replicate)r   r   Zsub_mesh_dimZsub_mesh_listr   rl   rl   rm   $_get_sub_meshes_and_local_placements  s   
r   c                 C  sV   t | }t|D ] \}}| r(| }|| dkrq|| }||j|  ||< q|S r   )r   r   is_shardget_dimr   )local_shaper   r   global_shaper   r   	shard_dimlocal_dim_sizerl   rl   rm   _cal_global_shape  s   r   r   c              	   C  s  t |}t|||\}}t|j|j}t|t	
 k}|d jdkr*d}n|| d }| | }	t r|d jdkrKt| d j|d |}
n| |  j}
t|
||}g }t| D ]2\}}| d t|j|rv|j|| kr|t||| | |d  d q^|| q^t|||||||S tj rt|	j||}tj| |||||}| d j |_ | d j!|_!|S t"d)Nr   Tr   zEdtensor_from_local_list() are only supported in dynamic and pir mode.)#r   r   r   nparrayr   reshaper   wherer   r   sizer   r   r0   rv   r   r   rw   r   r7   r   r   r   reshardr   applyr   r   _local_shaper	   r   r   r   NotImplementedError)r   r   r   local_mesh_dimr   r   r   local_coordlocal_tensor_idxr   Zlocal_tensor_shaper   Zresharded_local_tensor_listr   ro   r   rl   rl   rm   r     sv   


	r   c                   @  s0   e Zd Ze					dddZedd ZdS )_moe_sub_mesh_tensorsNc                 C  s  |  t|||t|||j t r|d u r!|d u r!| S |d u s)|d u r-td|j}||jkr9tdt	||j
sKJ d| d|j
 dt|j||}t|D ]\}	}
|
 rq|
 }|| }||d j|	  ||< qVtj }tj|}g }t|D ] \}}tj| ||||d}| d |j|_|| q|S tj rtj|||||}|D ]
}|j|_|j|_q|S d S )	NzAthe args global_mesh and global_placements should be set togetherzAthe global_mesh should be the same as dist_tensor's process_mesh.zthe global_placements (z,) is not equal to dist_tensor's placements (z).r   r   T)r   r   r   r   r   r   rv   r   r   r7   r   r0   r   r   r   r   r   r   rD   rw   r   r   r   r   r	   r   r   )r   r   r   r   r   r   r   Zori_meshr   r   r   r   r   r   r   r   r   r   local_tensorsrl   rl   rm   r   N  s~   




z_moe_sub_mesh_tensors.forwardc                 G  s   |   \}}}}}}tj }tj|}|}	t|	j|	j	}
t
|
t k}|d jdkr4d}n|| d }|| }t r[tj }tj|}tj| ||	||d}|S tj rrt|j|	|}tj||||||S d S )Nr   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rD   rv   r   r   r   r	   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   Z
local_gradr   r   rl   rl   rm   r     sR   	


z_moe_sub_mesh_tensors.backwardNNNNNr   rl   rl   rl   rm   r   M  s    Jr   c                 C  s|   t |}t|||\}}tj rt| |||||S tj r:tj	
| ||||}|D ]
}| j|_| j|_q-|S td)zW
    Get the local part of the ``dist_tensor`` on the specific ``local_mesh_dim``.
    z7moe_sub_mesh_tensors is only supported in dynamic mode.)r   r   r   r   r   r   r   r   r   r	   r   r   r   r   )r   r   r   r   r   r   r   r   rl   rl   rm   r     s8   



r   c                 C  sX   t  r|  du r|  rtdt jj| ||S t j	 r(t j
| ||S td)NTz#The input should be a local tensor.z?dtensor_from_local() are only supported in dynamic or pir mode.)r   r   rs   rt   r   baser(   dtensor_from_localr   r   r	   RuntimeError)r   r   r   rl   rl   rm   r     s   
r   c                 C  sP   t  r|  du rtdt jj| ||S t j r$t j	| ||S t
d)NF)The input should be a distributed tensor.z=dtensor_to_local() are only supported in dynamic or pir mode.)r   r   rs   r   r   r(   dtensor_to_localr   r   r	   r   )r   r   r   rl   rl   rm   r      s   
r   fnCallable[..., Tensor]argsr   kwargsc                 O  s   | |i |}t |||S )a  
    Construct a Distributed Tensor from a function of arguments.

    Args:
        fn (callable): A callable function that creates and returns a tensor, such as paddle.ones, paddle.zeros, etc.
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        placements(list[paddle.distributed.Placement]): the placements describe how to place the tensor on ProcessMesh, it can
            be Shard, Replicate and Partial.
        *args (tuple): A tuple of arguments to be passed to the ``fn`` function.
        **kwargs (dict): A dict of arguments to be passed to the ``fn`` function.

    Returns:
        Tensor: A Tensor constructed from ``fn`` with distributed attributes.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist
            >>> # Create a distributed attribute
            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> # Call the function dtensor_from_fn with dist_attr parameter
            >>> d_tensor = dist.dtensor_from_fn(paddle.ones, mesh, [dist.Replicate()], shape=[1])
            >>> print(d_tensor)

    r   )r   r   r   r   r   ro   rl   rl   rm   dtensor_from_fn  s   !r  r   c                 C  s2  t | ||rt| | j||S tj rt|| jdd\}}}t	 }||_
||_|d |d t|dkrG| D ]
\}}||| q<t|dkrbg }	| D ]	\}}
|	| qS||	 t| ||}|durtt| |||S t| ||rt| | j||S tjj| |S t rtj| ||S t| tsJ d|  dt||| j}t }t }| j t!"d	#d
dg| j$| j| j%| j&| j'd}t(||}| j)dd| gid|gid}t*|}||j+_|j+d d|j+_,|j+-| j.}||_/|d |j+0|j.}||_/|d |1| t2| |S )aO  
    Reshard a distributed ``paddle.Tensor`` with given distributed attributes.

    Args:
        dist_tensor(Tensor): the distributed tensor to be resharded.
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        placements(list[paddle.distributed.Placement]): the placements describe how to place the tensor on ProcessMesh, it can
            be Shard, Replicate and Partial.

    Returns:
        Tensor: A Distributed Tensor resharded with distributed attributes.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

            >>> # dense tensor
            >>> a = paddle.ones([10, 20])

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> # distributed tensor
            >>> d_tensor = dist.shard_tensor(a, mesh, [dist.Partial()])

            >>> out_d_tensor = dist.reshard(d_tensor, mesh, [dist.Replicate()])

            >>> print(out_d_tensor)

    T)Zreturn_split_factorr   r   r   Nz@in dy2static mode, reshard's input should be Variable, but got [].Zreshard_apitmp)namer   r   r   r   r   assignXZOut)r   inputsoutputs)3r4   r1   r   r   r   r   r9   r   r(   r   Zmulti_dims_mappingr   r   rz   itemsZ_set_split_factorr   Z_set_partial_dimsr6   r3   r   r5   r   r   r   r	   r   r   r8   r   r   Zcurrent_blockZ
create_varr   Zgenerate_with_ignorable_keyjoinr   r   r   r   r   Z	append_opr   	dist_attrZchunk_idZget_input_dist_attrr  r   Zget_output_dist_attrZadd_dist_op_for_programr   )r   r   r   r   partial_statusZsplit_factorr  dimZsfr   _Zalltoall_dimr   Zmain_programZdefault_dist_ctxZout_varZtarget_dims_mappingZtrans_opZdist_opZinput_dist_attrZoutput_dist_attrrl   rl   rm   r   6  s   #









r   layerrO   r   shard_fn0Callable[[str, Layer, ProcessMesh], None] | Noneinput_fn1Callable[[Any, ProcessMesh], list[Tensor]] | None	output_fnc                   s   du rt dttst ddd
d}t rc|du r/| jddD ]	\}}|| q$n| jddD ]\}}||| || q5 durS|  fdd dura| fdd | S td)a$  
    Converts all layer's parameters to DistTensor parameters according to
    the `shard_fn` specified. It could also control the conversion of input
    or output of the layer by specifying the `input_fn` and `output_fn`.
    (i.e. convert the input to `paddle.Tensor` with distributed attributes,
    convert output back to `paddle.Tensor` without distributed attributes.)

    The `shard_fn` should have the following signature:

        def shard_fn(layer_name, layer, process_mesh) -> None

    The `input_fn` should have the following signature:

        def input_fn(inputs, process_mesh) -> list(paddle.Tensor)

    In general, the type of `input_fn` return value is paddle.Tensor with distributed attributes.

    The `output_fn` should have the following signature:

        def output_fn(outputs, process_mesh) -> list(paddle.Tensor)

    In general, the type of `output_fn` return value is paddle.Tensor with distributed attributes.

    Args:
        layer (paddle.nn.Layer): The Layer object to be shard.
        process_mesh (paddle.distributed.ProcessMesh): The `ProcessMesh` information
            to be place the input `layer`.
        shard_fn (Callable): The function to shard layer parameters across
            the `process_mesh`. If not specified, by default we replicate
            all parameters of the layer across the `process_mesh`.
        input_fn (Callable): Specify how the input of the layer is sharded.
            The `input_fn` will be registered for the Layer as a `forward pre-hook`.
            By default we do not shard the input.
        output_fn (Callable): Specify how the output of the layer is sharded or
            convert it back to `paddle.Tensor` without distributed attributes.
            The `output_fn` will be registered for the Layer as `forward post-hook`.
            By default we do not shard or convert the output.
    Returns:
        Layer: A layer that contains parameters/buffers
            that are all `paddle.Tensor` with distributed attributes.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))

            >>> def shard_fn(layer_name, layer, process_mesh):
            ...     if layer_name == 'fc1':
            ...         layer.weight = dist.shard_tensor(layer.weight, process_mesh, [dist.Shard(0)])

            >>> layer = MLP()
            >>> layer = dist.shard_layer(layer, mesh, shard_fn)
            >>> print(layer)

            >>> # This case need to be executed in multi-card environment
            >>> # export CUDA_VISIBLE_DEVICES=0,1
            >>> # python -m paddle.distributed.launch {test_case}.py
    Nz,The argument `process_mesh` cannot be empty.z;The argument `process_mesh` is not `dist.ProcessMesh` type.r  nn.Layerr   r   r   Nonec                 S  s   | j  D ]$\}}|d ur(| s(dd tt|jD }| |t||| q	 q| j D ]$\}}|d urR| sRdd tt|jD }| 	|t||| q/	 q/d S )Nc                 S     g | ]}t j qS rl   r   distributedr   r   r  rl   rl   rm   r         zKshard_layer.<locals>.replicate_layer_params_and_buffers.<locals>.<listcomp>c                 S  r  rl   r  r  rl   rl   rm   r     r  )
_parametersr
  rs   rangerz   r   Zadd_parameterr   _buffersZregister_buffer)r  r   keyr   r   bufferrl   rl   rm   "replicate_layer_params_and_buffers  s*   

z7shard_layer.<locals>.replicate_layer_params_and_buffersT)Zinclude_selfc                   s
    |S rq   rl   )r  r  )r  r   rl   rm   <lambda>7     
 zshard_layer.<locals>.<lambda>c                   s
    |S rq   rl   )r  r  r	  )r  r   rl   rm   r#  <  r$  zB`paddle.distributed.shard_layer` only supports dynamic graph mode.)r  r  r   r   r   r  )	r   r   r   r   r   Znamed_sublayersregister_forward_pre_hookZregister_forward_post_hookr   )r  r   r  r  r  r"  r  	sublayersrl   )r  r  r   rm   shard_layer  s6   N

r'  boolc                 C  sD   t  rt| t jot| do|  S t| t jjjj	o!| 
 duS )z
    Check if an input is a dist_tensor in both dynamic and static modes.
    Args:
        tensor: The input to check
    Returns:
        bool: True if the input is a dist_tensor, False otherwise
    rs   N)r   r   r   rD   hasattrrs   r   	libpaddler   r   r  )ro   rl   rl   rm   is_dist_tensorG  s   
r+  c                      s   e Zd Zd*ddZdd Zdd Zd	d
 Zdd Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Z	#d+ fd$d%	Zd&d' Zd(d) Z  ZS ),_ShardOptimizerNr,   c                 C  sN  |d usJ dt |tjjtjjfsJ dtjj|jj	|_
|| jd< d| _t|dr>|jd ur>t |jtjjr>d| _|| _d | _d | _|| _| jd u rTtd| _t | jttttfsbJ dt | jtttfrv|   | j| j t | jtr| jjD ]}| j| qg | _g | _g | _ d | _!d | _"d| _#t$ | _%d| _&d| _'d S )	Nz)The argument `optimizer` cannot be empty.zR`paddle.distributed.ShardOptimizer` only supports AdamW and SGD optimizer for now.
_inner_optF
_grad_clipTr   zgshard_fn must be an instance of one of: _ShardingStage0, ShardingStage1, ShardingStage2, ShardingStage3)(r   r   	optimizerZAdamWZSGDr   Zlayer_helperZLayerHelper	__class__rh   helperr   Z_shard_clipr)  r.  r
   ZClipGradByGlobalNorm	_shard_fn_sharding_axis_sharding_degreegradient_accumulation_steps_ShardingStage0ShardingStage1ShardingStage2ShardingStage3'_set_and_check_sharding_prop_from_param_set_sharding_axisr-  _parameter_list_shard_parameterfuse_param_viewparam_storagegrad_storage_sharding_group	_mp_groupdo_tensor_fusion_onceStrategy	_strategyenable_tensor_fusionenable_sharding_overlap)r   r/  r  r5  r   rl   rl   rm   r   ]  s`   
	





z_ShardOptimizer.__init__c                 C  s"  t j }|r|| jj| _n| jjr| jj| jj| _ntdd| _	| j
j}|D ]b}| s3q,|j}|j}t|| j	 tjsTt|D ]\}}t|tjrS|| _	qFt|| j	 tjsaJ d|rot|jt|jk rnq,q,| jjrt|jt| jjjk rq,q,|| j	| jksJ dq,d S )NzIThe global mesh or shard_fn mesh should be set for the sharding strategy.r   z2The placement on sharding_axis should be Replicatez>The sharding degree of all parameters must be equal currently.)r   autoget_meshget_dim_sizer2  _sharding_mesh_dimr4  _meshr   r3  r-  r<  rs   r   r   r   r   r   r   setr   Zdim_size)r   r   
param_listr   r   r   r  r   rl   rl   rm   r:    sV   

z7_ShardOptimizer._set_and_check_sharding_prop_from_paramc                 C  sX  |j d dkr	d S |j}|j| jj v r<| jj|j }|j}t| jttt	fr<| j
||| jj|j< || jj|j _| jj D ]g}| jj| | }| rWt|tjsWqBt r^|j}t| jttt	fru| |||| jj| |< n'| rd|vr|j}ndd tt|jj D }t||j|d| jj| |< t r|| jj| | _qBd S )Nr   r,   betac                 S     g | ]}t  qS rl   r   r   r  rl   rl   rm   r     s    z6_ShardOptimizer._shard_accumulator.<locals>.<listcomp>r   r   )r   r  r-  Z_master_weightskeysr   r2  r7  r8  r9  shard_master_weightZ_accumulatorsrs   r   r   r   r   r   r  rz   r   r   )r   r   target_namemaster_weightr   ZaccumulatorZorigin_accumulator_namer   rl   rl   rm   _shard_accumulator  sV   
z"_ShardOptimizer._shard_accumulatorc                 C  sj   |  r/t| jttfr1t|tjs3|j}t	 || j
< t||j|}| |  d S d S d S d S rq   )rs   r   r2  r7  r8  r   r   r   r   r   r3  r   r   rw   ru   )r   r   Znew_placementZ	out_paramrl   rl   rm   _reset_placements
  s   

z!_ShardOptimizer._reset_placementsc                 C  s<   t |tr
|d}|D ]}| j||g | | qd S )Nparams)r   dictgetr-  _create_accumulatorsrW  )r   r   
parametersprl   rl   rm   r\    s   

z$_ShardOptimizer._create_accumulatorsc                 C  s   | j || | jrX| jD ]	}|  d|_q| jsTtt| j	D ]4}| j
|  | jj }|t| jjd }|| }tj| j
| ||}| jj|| j
|   q!d S d S t|trk|D ]	\}	}
| |	 q_d S |d D ]	\}	}
| |	 qod S )Nr   rY  )r-  _finish_updaterF  r@  Zzero_check_inrG  r  rz   r>  r?  _numelrA  nranksmaxrankr   r	   
view_sliceprocess_group
all_gatherwaitr   r   rX  )r   r   Zparameters_and_gradsr@  r   
shard_sizebeginendslice_bufferr^  r  rl   rl   rm   r_     s:   


z_ShardOptimizer._finish_updatec              	   C  s6   g }|D ]\}}| || d||f qt| |S Ngrad)r   r2  r+   apply_gradients)r   params_gradsnew_params_gradsr   rn  rl   rl   rm   ro  >  s   z_ShardOptimizer.apply_gradientsc                 C  s  | j  }g }t| j jd tr| j jD ]}||d 7 }qn| j j}|D ]}|jr*q$t|dr9|jdur8|  S q$|jdurB|  S q$t	dd |
 D rP|S t| j jd ts| j jD ]8}|jrcq]t|dr|jdurutd|j tj|tjd|_q]|jdurtd|j tj||jd|_q]nD| j jD ]?}|d D ]8}|jrqt|dr|jdurtd|j tj|tjd|_q|jdurtd|j tj||jd|_qq|   | j jd	d
 | j  S )z
        Create and shard the optimizer states e.g., accumulators and master_weights before load_state_dict.
        If training has already started or the optimizer states are already created and sharded, do nothing.
        r   rY  	main_gradNc                 s  s$    | ]\}}|d vr|  V  qdS ))Zmaster_weightsZLR_SchedulerN)rs   r   kvrl   rl   rm   r   `  s    z-_ShardOptimizer.state_dict.<locals>.<genexpr>z gradient should be None, but is )r   F)Zset_to_zero)r-  
state_dictr   r<  rZ  r   r)  rr  rn  anyr
  r   r   Z
zeros_likefloat32r   _param_groupsstepZ
clear_grad)r   rv  rN  Zparam_groupr   rl   rl   rm   rv  G  sx   
















z_ShardOptimizer.state_dictc              	   C  sR  t  r|d  r|d j}|d j}|d }|j}ddd}d}tj }	d|	jv rC|	d}
t	|
D ]}|j
||j
krB|} nq4d}tdd |D r`|j
||j
kr`|j||jkr`d	}|r~tjj||j||ttjjttjjg}|j}t	t|d d
d
D ]}t|| tjrt ||< t||j|}q| jdkrt r|| j }|rtjj||j|t g}|d |f}| j|| | jr%t|d dr'|d j}|d jdkr| j|   | j!j" }|t#| j!j$d }|| }t%j&'| j| ||}t%j(j)| j| || j!dd}d	| j| _*d S d| j| _*d S d S d S )Nr,   r   c                 S  s$   t j }d|jv r|d| }|S )u5   
                获得pp_idx的mesh
                pp)r   rH  rI  r   Zget_mesh_with_dim)Zpp_idxr   rl   rl   rm   rI    s   

z5_ShardOptimizer._append_optimize_op.<locals>.get_meshr{  Fc                 s      | ]	}t |tjV  qd S rq   )r   r   Partial)r   r   rl   rl   rm   r     s    
z6_ShardOptimizer._append_optimize_op.<locals>.<genexpr>Tr   last_idxgroupsync_opr   )+in_auto_parallel_align_moders   r   r   r   rH  rI  r   rJ  r  r   rw  r   auto_parallel	moe_utilsr1   r   r}  Z
ReduceTypeZkRedSumrz   r   r   r   r5  r   r-  _append_optimize_oprG  r)  r~  r?  ra  rA  rb  rc  rd  r   r	   re  r  rg  is_sync)r   r   Zparam_and_gradr   Zmeshsrn  Z	grad_meshrI  Zippr   Z	pp_degreer   Zchange_meshr   ri  rj  rk  rl  taskrl   rl   rm   r    s   



	


	

z#_ShardOptimizer._append_optimize_opc                 C  s   dt jd< d| _| j  d S )N1ZFLAGS_enable_tensor_fusionT)osenvironrF  r2  _enable_tensor_fusionr   rl   rl   rm   r    s   
z%_ShardOptimizer._enable_tensor_fusionc                 C  sL   t |dr|jddrd S d| _t|tjjs!tdt	| || _
d S )Nconfig	to_staticFTz+`layers` must be `paddle.nn.Layer` but got )r)  r  r[  rG  r   r   r
   rO   r   r   _layers)r   Zlayersrl   rl   rm   _enable_sharding_overlap  s   
z(_ShardOptimizer._enable_sharding_overlapc                 C  s`   |  | jj }|t| jjd }|| }tj|||}tjj	||tjj
j| jdd  d S )Nr   Fopr  r  )ra  rA  rb  rc  rd  r   r	   re  r  reduce_scatterReduceOpSUMrh  )r   r@  ri  rj  rk  reduce_scatteredrl   rl   rm   _reduce_scatter_gradients  s   
z)_ShardOptimizer._reduce_scatter_gradientsc                 C  s<  | j stdi }| j  D ]}|jddD ]}||t|< qqtt| jD ]t}| | j	|  dd }dd }t| j| | j
 }dtj jv rU|tj d }| j|  D ]\}}	|	d	 ||| j	| | j q\|t| jd
 k rtt| j|  d	 }
|t|
}||| j|d
  | j q'd S )Nz_Sharding overlap requires an initialized model. Call `_enable_sharding_overlap()` to set model.F)Zinclude_sublayersc                   s   t j  fdd}|S )Nc                    sx    j d7  _ j kr:  j }|t jd }|| }tj||}tjj	|tjj
j dd}|_d S d S )Nr,   r   Fr  )r`  ra  rb  rc  rd  r   r	   re  r  r  r  r  	comm_task)r  ri  rj  rk  r  r  
comm_groupr@  param_group_lenrl   rm   	fuse_comm  s"   

zT_ShardOptimizer._async_sharding_comm.<locals>.fuse_comm_hook_func.<locals>.fuse_commr   ZautogradZno_grad)r  r@  r  r  rl   r  rm   fuse_comm_hook_func  s   zA_ShardOptimizer._async_sharding_comm.<locals>.fuse_comm_hook_funcc                   s   t j  fdd}|S )Nc                    s^   j s-  j }|t jd }|| }tj||}tjj	| dd}d_ d S d S )Nr   Fr  T)
r  ra  rb  rc  rd  r   r	   re  r  rg  )r  ri  rj  rk  rl  r  r  r?  rl   rm   r  ,  s   
zZ_ShardOptimizer._async_sharding_comm.<locals>.fuse_all_gather_hook_func.<locals>.fuse_commr  )r?  r  r  rl   r  rm   fuse_all_gather_hook_func+  s   zG_ShardOptimizer._async_sharding_comm.<locals>.fuse_all_gather_hook_funcr{  r   r,   )r  r   r&  r]  idr  rz   r>  r  r@  r5  r   rH  rI  r   rJ  r
  Z_register_backward_hookrA  nextitervaluesr[  r%  r?  )r   Zparam2layerr  r^  r   r  r  r  r  viewZfirst_paramrl   rl   rm   _async_sharding_comm  sN   
z$_ShardOptimizer._async_sharding_commc              
     s   fdd}d}i }|D ]\}}|||j < |||7 }qtj|g|d d jd}d|_tj}	tj|g|	d}
d|
_d |
_i }|D ]\}}||}|||j  d||j < ||j  }|j}|j	}d|_	|
   t|
 ||||   ||_	tj||||  }| |j t||j|j}| |  t|
 |
|||
    tj|
|||
   }| |j t||j|j}||_|   tjj  qA|||
fS )Nc                   s8   t | j}tt  t| j    }|| d | | S Nr,   )r   prodr   r&   r'   r%   r   )r   r   Z
align_sizesharding_degreerl   rm   get_padded_sized  s   z?_ShardOptimizer._build_fuse_param_view.<locals>.get_padded_sizer   )r   r   F)r   r   T)r  r   Zzerosr   r  rx  r`  r  r   r   rv   Zflatten_r  Z_slicera  r	   re  rw   Z	_set_dimsr   r2   r   r   ru   rr  _cleardevicecudaZempty_cache)r   params_and_gradsr  r  Ztotal_buffer_sizeZparam2indexr   r  Zparam_bufferZ
grad_dtypeZgrad_bufferZviewsrn  Zpadded_sizer   Zparam_shaper   Z	tmp_paramZtmp_gradrl   r  rm   _build_fuse_param_view_  s   





z&_ShardOptimizer._build_fuse_param_viewc           %      C  s  | j rtj }t|d}|D ]}tt|}t |v r!|| _qd|j	v rM|j	
d}|j| | _t|d}|D ]}tt|}t |v rL|| _q:d| _ dd |D }| jjj}	|	dk rbd}	|	d d }
dgt| }d	d
 |D }dd |D }t|||
|
g}t }t|D ]\}}|D ]}||g ||  qq| D ]!\}}| || jj\}}}| j| | j| | j| q| jr|   | j j!durd| j j!_"| j| j j!_#d|j	v r| jdkr| j| j j!_$g }g }t%t| jD ]}| js	| &| j|  | j|  D ]\}}|d }|d }| j| ' | jj }|t(| jj)d }|| }t(||} t*||'  |}!| |!krIqt+j,-| j| | |!}"t.|"|j/t0 g}"||"_1|j2|"_2|j3|"_3d|"_4|j5|"_5|j2|"_2|j6|"_6|j7|"_7|j8|"_8|j9|"_9||" t+j,-| j| | |!}#t.|#|j/t0 g}#||# q| jr||d _:| j| j;dur| j| j;<  qt=t>||}$|$S )aA  
        1. Tensor Fusion
            - Groups params/grads into contiguous param_storage/grad_storage buffers
            - Supports non-uniform partitioning across GPUs
            - Uses view_slice to access individual params/grads each step
        2. Reduce_scatter Overlap
            - Overlaps grad reduce_scatter with backward
        3. All_gather Overlap
            - Overlaps param all_gather with forward
            - Strategically scatters all_gather during forward
            (Launching all all_gather at once blocks overlap with other sync/comm ops)
        ZdpmpFc                 S  s   g | ]}|d  qS r  rl   )r   Zp_grl   rl   rm   r         z2_ShardOptimizer._tensor_fusion.<locals>.<listcomp>r      i   c                 S  s   i | ]}|j |jqS rl   )r  r   r   r   rl   rl   rm   
<dictcomp>  s    z2_ShardOptimizer._tensor_fusion.<locals>.<dictcomp>c                 S  s   g | ]}|  qS rl   )rv   r  rl   rl   rm   r     r  NTr,   r   r   r   )?rC  r   r  rI  r?   Z	new_groupsortedr   rA  Z
_dim_namesr   _shapeZ
_mp_degreerB  rE  r_   comm_buffer_size_MBrz   r(   Zeager_assign_group_by_sizer   r   
setdefaultr   r
  r  rb  r>  r?  r@  rG  r  r-  r.  Zshould_comm_on_shard_dimZsharding_groupZmp_groupr  r  ra  rc  rd  minr   r	   re  r2   r   r   r  r   Z	need_clipr   Z	trainableZoptimize_attrZregularizerZdo_model_averageZis_distributedr~  r  rh  r   zip)%r   rp  r   Zshard_groupsr  r  Zmp_mesh_axisZ	mp_groupsr]  r  Z
group_sizeZis_sparse_gradientZ
shape_dictZdense_paramsZgroup_indicesZ
var_groupsZ	group_idxindicesr   r  r>  r?  r@  
new_paramsZ	new_gradsr  r  r   r   ri  Z
rank_beginZrank_endZparam_beginZ	param_end	new_paramnew_gradrq  rl   rl   rm   _tensor_fusion  s   










z_ShardOptimizer._tensor_fusionc                 C  sv  g }dd }|D ]\}}t |j}|}|j}t }	|jj}
|jD ]}| r0| }|		| q!|||	}|j| j
  s]t || j
< |dkr]|
| j
 dkr]|		| t||| j
< t|jD ]A\}}|| j
krlqb|
| dkryt ||< qb| st ||< |||	}| r|dkrt ||< qb|		| t|||< qb|j|krt||j|}|||f q|S )a*  
        Optimizes gradient placements for parameters in dynamic sharding mode to minimize redundant allreduce
        operations during gradient clipping. This function adjusts tensor placements across mesh axes based
        on priority rules, prioritizing sharding for dimensions marked in `_sharding_axis`.
        For each axis in the mesh:
            1. Preserves existing `Shard(dim)` placements for any axis.
            2. Converts `Partial()` placements to Shard(dim) where possible, falling back to `Replicate()` if sharding isn't feasible.
            3. Maintains `Replicate()` placements unchanged.
        Processes axes in order of `_sharding_axis` first before other mesh axes in their natural order.

            e.g.
                a) sharding_axis = 0, tensor rank = 2,
                    placements: [Partial(), Partial(), Repliacate()] -> [Shard(0), Shard(1), Repliacate()]
                b) sharding_axis = 0, tensor rank = 2,
                    placements: [Partial(), Shard(0), Partial() ] -> [Shard(1), Shard(0), Repliacate()]
        c                 S  s4   t t| D ]}| | dkrq||vr|  S qdS )Nr,   r   )r  rz   )tensor_shapeshard_dims_set
tensor_dimrl   rl   rm   get_first_can_shard_dimV  s   zR_ShardOptimizer._fused_comm_before_apply_optimize.<locals>.get_first_can_shard_dimr   r,   )r   r   r   r   rM  r   r   r   r   addr3  r   r   Shardr   r   r   r   )r   rp  rq  r  r   rn  new_placementsr  r  r  Z
mesh_shaper   r  Z	mesh_axisrl   rl   rm   !_fused_comm_before_apply_optimizeA  sP   	






z1_ShardOptimizer._fused_comm_before_apply_optimizer   c                   sB   t  rt| jtr| jr| |}n| |}t 	||||S rq   )
r   r   r   r2  r7  rF  r  r  super_apply_optimize)r   lossZstartup_programrp  Zparam_group_idxr0  rl   rm   r    s   
z_ShardOptimizer._apply_optimizec                 C  s0   d| j v r|dkr| j | S t| j d |S t)Nr-  )r   r   AttributeError)r   itemrl   rl   rm   __getattr__  s
   

z_ShardOptimizer.__getattr__c                 C  s.   |dkrt | j d}t|t| j||S )Nr-  z._inner_opt is READ ONLY)r   rh   r  setattrr-  )r   r  valuemsgrl   rl   rm   __setattr__  s   z_ShardOptimizer.__setattr__r  r  )rh   ri   rj   r   r:  rW  rX  r\  r_  ro  rv  r  r  r  r  r  r  r  r  r  r  r  __classcell__rl   rl   r  rm   r,  \  s,    
=6:		MV
Y\ Ur,  c                   @  sL   e Zd Zdd Zdd Zdd ZdddZdddZdddZdddZ	dS )_ShardingStageBasec                 C  s   || _ d| _|| _d| _d S )Nr   F)rL  r3  rK  rF  )r   r   sharding_mesh_dimrl   rl   rm   r     s   
z_ShardingStageBase.__init__c                 C  s
   || _ d S rq   )r3  )r   sharding_axisrl   rl   rm   r;       
z%_ShardingStageBase._set_sharding_axisc                 C  s
   d| _ d S )NT)rF  r   rl   rl   rm   r    r  z(_ShardingStageBase._enable_tensor_fusionr   rD   rV  r   c           	      C  s   |  rg| jr|j}nt|| j}t|tjrW| }|	 dks%J dt
|t|j\}}tjjj|j||}tjjj| |}|| tjjj|jg |g|_t rg|  rgt||j|d}|S )N
pd_op.dataz.The master weight must be a result of data op.rR  )rs   rF  r   r@   r3  r   r   r   get_defining_opr  r:   rz   r   r   r   r*  create_tensor_dist_attributer   cvt_to_dist_typer   set_typecreate_op_dist_attributer  r   r   )	r   r   rV  r   Zdata_opdim_mapr  r  	dist_typerl   rl   rm   rT    sB   





z&_ShardingStageBase.shard_master_weightro   r   r   c           	      C  sp   t |t|j\}}tjjj|j||}tjjj	|
 |}|| tjjj|jg |g}|| _d S rq   )r:   rz   r   r   r   r*  r   r  r   r  r   r  r  r  r  )	r   ro   r   r   r  r  r  r  Zop_dist_attrrl   rl   rm   _init_dist_attr  s   




z"_ShardingStageBase._init_dist_attrc                 C  sN   |  r| }| dkr| ||| |S t||j|S t||j|dS )Nr  rR  )rs   r  r  r  r   r   r   r   )r   ro   r   r   r  rl   rl   rm   _apply_placement  s   z#_ShardingStageBase._apply_placementrn  c                 C  s   t || jS rq   )r.   r3  )r   rn  rl   rl   rm   '_reshard_fake_replicate_grad_to_partial
  s   z:_ShardingStageBase._reshard_fake_replicate_grad_to_partialN)r   rD   rV  rD   r   rD   )ro   rD   r   rD   r   r   )ro   rD   r   rD   r   r   r   rD   )rn  rD   r   rD   )
rh   ri   rj   r   r;  r  rT  r  r  r  rl   rl   rl   rm   r    s    

)
r  c                      s,   e Zd Z	dd fdd	ZdddZ  ZS )r6  Nr  	int | strr   ProcessMesh | Noner   r  c                   s   t  || d| _d S )Nr   )r  r   r  r   r  r   r  rl   rm   r     s   
z_ShardingStage0.__init__r   r   r   rD   ro   c                 C  s   |dkrt  r| |S |S rm  )r/   r  )r   r   r   ro   rl   rl   rm   __call__  s   
z_ShardingStage0.__call__rq   r  r  r   r  r   r  r   r   r   rD   ro   rD   r   rD   )rh   ri   rj   r   r  r  rl   rl   r  rm   r6    s    r6  c                      s0   e Zd ZdZ	dd fd	d
ZdddZ  ZS )r7  a  
    A builtin shard_fn for shard_optimizer interface, users can pass it to shard_optimizer to implement sharding optimization with stage 1.

    Args:
        sharding_mesh_dim(int|str): The sharding dimension in the mesh.
        mesh(None|paddle.distributed.ProcessMesh): If mesh is not None, the `ProcessMesh` object describes the Cartesian topology of the used processes for dense type parameters. Note: Currently, only one mesh configuration is supported for all dense parameters. If there is a need for multiple mesh configurations, please configure them yourself in the upper layer networking code.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> layer = MLP()
            >>> batch = paddle.rand(shape=[8, 8])
            >>> opt = paddle.optimizer.AdamW(parameters=layer.parameters())
            >>> opt = dist.shard_optimizer(opt, dist.ShardingStage1("x", mesh))
            >>> for _ in range(5):
            >>>     loss = layer(batch)
            >>>     loss.backward()
            >>>     opt.step()
            >>>     opt.clear_grad()
            >>> # This case need to be executed in multi-card environment
            >>> # python -m paddle.distributed.launch --gpus=0,1 {test_case}.py
    Nr  r  r   r  r   r  c                      t  || d S rq   r  r   r  r  rl   rm   r   C     zShardingStage1.__init__r   r   r   rD   ro   c                 C  sh   |  s|S | jsd|vrt|| j}ndd tt|jjD }|dkr-t r-| 	|}| 
|||S )NrO  c                 S  rP  rl   rQ  r  rl   rl   rm   r   R  s    z+ShardingStage1.__call__.<locals>.<listcomp>rn  )rs   rF  r@   r3  r  rz   r   r   r/   r  r  r   r   r   ro   r   rl   rl   rm   r  J  s   
zShardingStage1.__call__rq   r  r  )rh   ri   rj   r   r   r  r  rl   rl   r  rm   r7    s
    )r7  c                   @  s   e Zd ZdZdS )r8  a  
    A builtin shard_fn for shard_optimizer interface, users can pass it to shard_optimizer to implement sharding optimization with stage 2.

    Args:
        sharding_mesh_dim(int|str): The sharding dimension name in the mesh.
        mesh(None|paddle.distributed.ProcessMesh): If mesh is not None, the `ProcessMesh` object describes the Cartesian topology of the used processes for dense type parameters. Note: Currently, only one mesh configuration is supported for all dense parameters. If there is a need for multiple mesh configurations, please configure them yourself in the upper layer networking code.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> layer = MLP()
            >>> batch = paddle.rand(shape=[8, 8])
            >>> opt = paddle.optimizer.AdamW(parameters=layer.parameters())
            >>> opt = dist.shard_optimizer(opt, dist.ShardingStage2("x", mesh))
            >>> for _ in range(5):
            >>>     loss = layer(batch)
            >>>     loss.backward()
            >>>     opt.step()
            >>>     opt.clear_grad()
            >>> # This case need to be executed in multi-card environment
            >>> # python -m paddle.distributed.launch --gpus=0,1 {test_case}.py
    N)rh   ri   rj   r   rl   rl   rl   rm   r8  \  s    (r8  c                      s@   e Zd ZdZ	dd fd	d
Zdd Zdd ZdddZ  ZS )r9  a  
    A builtin shard_fn for shard_optimizer interface, users can pass it to shard_optimizer to implement sharding optimization with stage 3.

    Args:
        sharding_mesh_dim(int|str): The sharding dimension name in the mesh.
        mesh(None|paddle.distributed.ProcessMesh): If mesh is not None, the `ProcessMesh` object describes the Cartesian topology of the used processes for dense type parameters. Note: Currently, only one mesh configuration is supported for all dense parameters. If there is a need for multiple mesh configurations, please configure them yourself in the upper layer networking code.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> layer = MLP()
            >>> batch = paddle.rand(shape=[8, 8])
            >>> opt = paddle.optimizer.AdamW(parameters=layer.parameters())
            >>> opt = dist.shard_optimizer(opt, dist.ShardingStage3("x", mesh))
            >>> for _ in range(5):
            >>>     loss = layer(batch)
            >>>     loss.backward()
            >>>     opt.step()
            >>>     opt.clear_grad()
            >>> # This case need to be executed in multi-card environment
            >>> # python -m paddle.distributed.launch --gpus=0,1 {test_case}.py
    Nr  r  r   r  r   r  c                   r  rq   r  r  r  rl   rm   r     r  zShardingStage3.__init__c                 C  s   |  r$| jd ur$g }tt| jjD ]	}|t  q||| j |	 rAt
|| j}t||j|}| |  d S d S rq   )Zis_denserL  r  rz   r   r   r   r   Z	_to_dist_rs   r@   r3  r   r   rw   ru   )r   r   r   r  r  Zshard_paramrl   rl   rm   r=    s   zShardingStage3._shard_parameterc                 C  sX   |  r*|j}t|| j tjrt || j< t||j|}|	 
|	  d S d S rq   )rs   r   r   r3  r   r  r   r   r   rw   ru   )r   r   r  r  rl   rl   rm   _unshard_parameter  s   z!ShardingStage3._unshard_parameterr   r   r   rD   ro   c                 C  sp   |  s|S |dkrt rtdd|vr(|j}tdd |D r't|| j}n	dd |jjD }| 	|||S )Nrn  z3Sharding Stage 3 does not support auto dp mode yet.rO  c                 s  r|  rq   )r   r   r   )r   r^  rl   rl   rm   r     s    z*ShardingStage3.__call__.<locals>.<genexpr>c                 S  rP  rl   rQ  r  rl   rl   rm   r     r  z+ShardingStage3.__call__.<locals>.<listcomp>)
rs   r/   r   r   r   r@   r3  r   r   r  r  rl   rl   rm   r    s   zShardingStage3.__call__rq   r  r  )	rh   ri   rj   r   r   r=  r  r  r  rl   rl   r  rm   r9    s    )	r9  r/  r+   .Callable[[str, Tensor, Tensor], Tensor] | Noner5  intc                 C  s   t | ||S )aJ  

    Warp the global view optimizer to distributed view.

    Note:
        The `shard_fn` should have the following signature:
            def shard_fn(accumulator_name, param, accumulator) -> sharded_accumulator

    Args:
        optimizer (paddle.optimizer.Optimizer): The optimizer to be sharded.
        shard_fn (Callable, optional): The function to shard accumulators. If not specified,
           we simply pass down the dist attr of the params.

    Returns:
        An optimizer with distributed view.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist
            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))
            >>> layer = MLP()
            >>> batch = paddle.rand(shape=[8, 8])
            >>> opt = paddle.optimizer.AdamW(parameters=layer.parameters())
            >>> opt = dist.shard_optimizer(opt)
            >>> for _ in range(5):
            >>>     loss = layer(batch)
            >>>     loss.backward()
            >>>     opt.step()
            >>>     opt.clear_grad()
            >>> # This case need to be executed in multi-card environment
            >>> # python -m paddle.distributed.launch --gpus=0,1 {test_case}.py

    )r,  )r/  r  r5  rl   rl   rm   shard_optimizer  s   0r  scalerrI   c                 C  s   dd }t || | _| S )ag  

    Warp the global view grad_scaler to distributed view.

    Args:
        scaler (paddle.amp.GradScaler): The GradScaler to be sharded.

    Returns:
        A GradScaler with distributed view.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist
            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))
            >>> layer = MLP()
            >>> batch = paddle.rand(shape=[8, 8])
            >>> opt = paddle.optimizer.AdamW(parameters=layer.parameters())
            >>> layer, opt = paddle.amp.decorate(layer, opt, level='O2')
            >>> scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
            >>> scaler = dist.shard_scaler(scaler)
            >>> opt = dist.shard_optimizer(opt)
            >>> for _ in range(5):
            >>>     with paddle.amp.auto_cast(True):
            >>>         loss = layer(batch)
            >>>     scaled = scaler.scale(loss)
            >>>     scaled.backward()
            >>>     scaler.step(opt)
            >>>     scaler.update()
            >>>     opt.clear_grad()
            >>> # This case need to be executed in multi-card environment
            >>> # python -m paddle.distributed.launch --gpus=0,1 {test_case}.py

    c                 S  s  | j sd S | jt| }|d tju rtd|d tju r"tdd }d }tt	
dgt	j| _i }t|dd rt|jd tr|jD ]@}|d D ]9}| }|d urt|ddd	  r|d u rg|j}|d u rr| rr|j}|j|vr~|g||j< qM||j | qMqGn=|jD ]9}| }|d urt|dd
d	  r|d u r|j}|d u r| r|j}|j|vr|g||j< q||j | q| D ]\}	}
g }g }tt	
dgt	j}tt	
dgt	j}tt	
dgt	j}| j r| j }n| j}|
D ]!}|jtjjjtj tjjj!tj"fv r%|| q
|| q
t#|rXt$%||\}	}dtj&' v rRt(|d}t)|}t(|d}t$*||}t#|rt$%||\}	}dtj&' v r~t(|d}t)|}t(|d}t$*||}t+,|||j-}t$*| j|| _q| jj|kr|. D ]}t+,| j|| jj-}	qnt+,| j|| jj-| _tj|d< d S )NstatezMunscale_() has already been called on this optimizer since the last update().z(unscale_() is being called after step().r   ry  rY  rt   c                   S     dS NFrl   rl   rl   rl   rm   r#  _	      z6shard_scaler.<locals>.unscale_method.<locals>.<lambda>c                   S  r  r  rl   rl   rl   rl   rm   r#  t	  r  ZxpuZint32r(  )/Z_enableZ_optimizer_statesr  r   ZUNSCALEDr   ZSTEPPEDr   r   r   r   ZastypeZbool_Z
_found_infr   r   ry  rZ  Z
_grad_ivarr   rt   r   r<  r
  Z_scalers   rv   r   r(   ZVarDescZVarTypeZFP16Zfloat16ZBF16Zbfloat16rz   r	   Zcheck_finite_and_unscale_r  Z
get_devicecastsumZ
bitwise_orr   r   r   rS  )r   r/  Zoptimizer_stateZsrc_meshZcurrent_process_meshZmesh2param_gradsr  r   Ztgt_gradr  Zparam_gradsZtemp_param_grads_halfZtemp_param_grads_fp32Ztemp_found_infZtemp_found_inf_halfZtemp_found_inf_fp32Z
temp_scalern  r   rl   rl   rm   unscale_methodD	  s   










z$shard_scaler.<locals>.unscale_method)r   Z_unscale)r  r  rl   rl   rm   shard_scaler	  s   - r  c                   @  s4   e Zd ZU dZded< ded< ded< d	ddZdS )

FusePassesz@
    A helper class for users to configure the fuse passes.
    r(  enablegemm_epiloguedropout_addNc                 C  sX   d| _ d| _d| _|d ur(| D ]\}}t| |r!t| || qtd| d S d S )NFzUnknown fuse pass )r  r  r  r
  r)  r  r   )r   config_dictr   r  rl   rl   rm   r   	  s   
zFusePasses.__init__rq   )rh   ri   rj   r   rk   r   rl   rl   rl   rm   r  	  s   
 r  c                      s   e Zd ZdZdd fddZd	d
 ZedddZed ddZed!ddZ	ed"ddZ
ed#ddZed$ddZ  ZS )%rD  a0  
    The `Strategy` object is used to configure the parallelization
    and optimization strategies for static graph. Currently supports
    configuring ``sharding``, ``fused_passes``, ``gradient_merge``
    and ``pipeline``. More strategies will be supported in the future.

    ``sharding`` is used to configure the sharding states of the optimizer,
    for saving the GPU memory.

    ``fused_passes`` is used to configure the fusion of the computation in
    the model.

    ``gradient_merge`` is used to configure the gradient merge strategy in
    training.

    ``pipeline`` is used to configure the pipeline parallelism strategy.

    Args:
        config(dict|None, optional): The user-defined configurations.
            If ``config`` is None, use default configurations. If it is
            a dict, the items inside the dict will be used to set the
            configurations, and the others remain the default values.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> strategy = dist.Strategy()

            >>> strategy.sharding.enable = True
            >>> strategy.sharding.stage = 2
            >>> strategy.sharding.degree = 2

            >>> strategy.gradient_merge.enable = True
            >>> strategy.gradient_merge.k_steps = 2
            >>> strategy.gradient_merge.avg = False

            >>> strategy.pipeline.enable = True
            >>> strategy.pipeline.schedule_mode = "1F1B" # default is "1F1B"
            >>> strategy.pipeline.micro_batch_size = 2
    Nr  _Config | Noner   r  c                   sl  |d urt |trt|| _n
td| i | _tjj}t	 
|| j | jtjjd }t|| _| jtjjd }t|| _| jtjjd }t|| _| jtjjd }t|| _| jtjjd }t|| _| jtjjd }t|| _| jtjjd }t|| _ | jtjj!d }t"|| _#| jtjj$d }t%|| _&| jdd| _'d S )Nz%Expected a dictionary. But received: 
full_graphT)(r   rZ  r   r   Z_config_dictr   auto_strategy	constantsBASEr  r   r[  ZSHARDINGZShardingConfig	_shardingZGRADIENT_MERGEZGradientMergeConfig_gradient_mergeZPIPELINEZPipelineConfig	_pipelineAMPZ	AMPConfig_ampZFUSED_PASSESr  _fused_passesZ	RECOMPUTEZRecomputeConfig
_recomputeZMP_OPTIMIZATIONZMPOptimizationConfig_mp_optimizationZDP_OPTIMIZATIONZDPOptimizationConfig_dp_optimizationZSP_OPTIMIZATIONZSPOptimizationConfig_sp_optimization_full_graph)r   r  categoryr  r  rl   rm   r   
  sX   

zStrategy.__init__c                 C  s   ddl }tjj}tj|}| D ]}t| |t|| q|jj	| j
_	d|jjv r.d| j
_d|jjv r8d| j
_||j| _||j| _||j| _||j| _||j| _||j| _||j| _||j| _dS )zo
        NOTE(lizhiyu): This is a template function to get `dist.Strategy` from `fleet.auto.Strategy`.
        r   Nfused_gemm_epilogue_passTfused_dropout_add_pass)r   r  r  r  get_category_default_configrS  r  r   r`   r  r
  fused_passes_listr  r  r   rc   r	  r_   r  ra   r  rb   r  rd   r  re   r  rf   r  rg   r  )r   Zlegacy_strategyr   r  base_configr   rl   rl   rm   _from_legacy_strategyX
  s0   zStrategy._from_legacy_strategyr(  c                 C  r   )z*
        Whether to use AST mode.
        )r  r   rl   rl   rm   r  z
  s   zStrategy.full_graphauto_strategy.ShardingConfigc                 C  r   )a{  
        ``sharding`` is used to configure the sharding states of the optimizer,
        containing following configs:

            ``enable`` (bool): whether to enable sharding. Default: False.

            ``stage`` (int): can be set to 1, 2 or 3. 1 indicates the optimizer states segmentation,
            2 indicates optimizer states and gradient segmentation, 3 indicates the segmentation
            of optimizer states, gradient and parameters. Default: 1.

            ``degree`` (int): the number of segmentation pieces. Default: 8.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.sharding.enable = True
                >>> strategy.sharding.stage = 2
                >>> strategy.sharding.degree = 2
        )r  r   rl   rl   rm   r_   
  s   zStrategy.sharding!auto_strategy.GradientMergeConfigc                 C  r   )a  
        ``gradient_merge`` is used to configure the gradient merge strategy in
        training, containing following configs:

            ``enable`` (bool): whether to enable gradient merge. Default: False.

            ``k_steps`` (int): the number of steps for merging gradients. Default: 1.

            ``avg`` (bool): whether to average the gradients of each step. Default: True.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.gradient_merge.enable = True
                >>> strategy.gradient_merge.k_steps = 2
                >>> strategy.gradient_merge.avg = True
        )r  r   rl   rl   rm   ra   
  s   zStrategy.gradient_merger  c                 C  r   )aC  
        ``fused_passes`` is used to configure the fusion of the computation in
        the model, containing following configs:

            ``enable`` (bool): whether to enable fused passes. Default: False.

            ``gemm_epilogue`` (bool): whether to fuse ``matmul`` and ``add`` computation
            in the ``Linear`` layer. Default: False

            "dropout_add" (bool): whether to fuse ``dropout`` and ``add`` computation. Default: False.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.fused_passes.enable = True
                >>> strategy.fused_passes.gemm_spilogue = True
                >>> strategy.fused_passes.dropout_add = True
        )r
  r   rl   rl   rm   r`   
     zStrategy.fused_passesauto_strategy.PipelineConfigc                 C  r   )a  
        ``pipeline`` is used to configure the pipeline parallelism,
        containing following configs:

            ``enable`` (bool): whether to enable pipeline parallelism. Default: False.

            ``schedule_mode`` (str): the scheduling mode of pipeline parallelism. Default: "1F1B".

            ``micro_batch_size`` (int): the size of each micro-batch inside a mini-batch. Default: 1.

            ``accumulate_steps`` (int): number of steps for accumulating. Default: 1.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.pipeline.enable = True
                >>> strategy.pipeline.micro_batch_size = 2
        )r  r   rl   rl   rm   rb   
  r  zStrategy.pipelineauto_strategy.AMPConfigc                 C  r   )a  
        ``amp`` is used to configure the amp,
        containing following configs:

            ``enable`` (bool):  whether to enable AMP. Default: False.
            ``dtype``, (str): the data type of AMP. Default: "float16".
            ``level``, (str): the level of AMP. Default: "O1".
            ``init_loss_scaling``, (float): the initial value of loss scaling. Default: 32768.0
            ``incr_every_n_steps``, (int): the number of steps for increasing loss scaling. Default: 1000
            ``decr_every_n_nan_or_inf``, (int): the number of steps for decreasing loss scaling. Default: 2
            ``incr_ratio``, (float): the ratio for increasing loss scaling. Default: 2.0
            ``decr_ratio``, (float): the ratio for decreasing loss scaling. Default: 2.0
            ``use_dynamic_loss_scaling``, (bool): whether to use dynamic loss scaling. Default: False
            ``custom_white_list``, (list): the list of names for which AMP will be applied. Default: []
            ``custom_black_list``, (list): the list of names for which AMP will not be applied. Default: []
            ``custom_black_varnames``, (list): the list of names for which AMP will not be applied. Default: []
            ``use_fp16_guard``, (bool): whether to use fp16 guard. Default: False
            ``use_bf16_guard``, (bool): whether to use bf16 guard. Default: False
            ``use_master_grad``, (bool): whether to use master grad. Default: False

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.amp.enable = True
                >>> strategy.amp.dtype = "float16"
                >>> strategy.amp.level = "O2"
        )r	  r   rl   rl   rm   rc   
  s   "zStrategy.amprq   )r  r   r   r  r   r(  )r   r  )r   r  )r   r  )r   r  )r   r  )rh   ri   rj   r   r   r  r   r  r_   ra   r`   rb   rc   r  rl   rl   r  rm   rD  	  s     ,;"rD  c                   @  s   e Zd ZdZ					dLdMddZdNddZdNddZdNddZdd ZdOdPd d!Z	dOdPd"d#Z
dOdPd$d%ZdOdPd&d'Zd(d) Zd*d+ ZedQd/d0ZdOd1d2Z	3	4dRdSd9d:Zd;d< ZdTd>d?Zd@dA ZdBdC ZdDdE ZdFdG ZdHdI ZdJdK ZdS )U	DistModela	  
    `DistModel` is the model converted from a ``paddle.nn.layer`` with distributed
    tensors as its parameters. It contains the static graph converted from a
    ``paddle.nn.layer`` whose parameters are distributed tensors (constructed
    from ``paddle.distributed.shard_tensor``), and provides the APIs for training,
    evaluation and prediction with the static graph.

    It is suggested to generate DistModel by ``paddle.distributed.to_static``,
    not directly by ``paddle.distributed.DistModel``.

    Please first set the DistModel to "train", "eval" or "predict" mode with
    ``train()/eval()/predict()`` method and then use the ``__call__`` method for
    training, evaluation and prediction respectively.

    For more details of the usage, please refer to the sample code in
    ``paddle.distributed.to_static``.

    Args:
        layer(paddle.nn.Layer): The layer in dygraph mode, whose parameters
            are distributed tensors generated by ``shard_tensor``.
        loader(ShardDataLoader|paddle.io.DataLoader): The data loader used in dygraph mode,
            used to infer inputs_spec and labels_spec.
        loss(Loss|Callable|None, optional): The loss function for training
            or evaluating the model. Can be a `paddle.nn.Layer` instance or
            any callable function. If loss is not None, DistModel will be set
            to "train" (when the optimizer is also not None) or "eval" mode
            (when optimizer is None) in default. If it is None, DistModel will
            be set to "predict" mode in default. Default: None.
        optimizer(paddle.optimizer.Optimizer|None, optional): The optimizer
            for training. If both optimizer and loss are set, DistModel will
            be set to "train" mode in default. Default: None.
        strategy(paddle.distributed.Strategy|None, optional): Configs for
            parallel strategies and optimization settings (e.g. sharding,
            pipeline parallelism). Default: None.
        input_spec(list[list[paddle.distributed.DistributedInputSpec]]|None, optional):
            The custom input specs specify the shape, dtype, and name information
            of model inputs and labels. If it is not None, the input specs and
            label specs will be inferred from the custom input specs. The custom
            input specs should be a list containing two sublists: the first
            sublist represents theinput specs, and the second sublist represents
            the label specs. Default: None.
    Nr  rO   loaderShardDataloader | DataLoaderr  !Layer | Callable[..., Any] | Noner/  Optimizer | Noner   Strategy | Nonemetricslist[Metric] | None
input_spec'list[list[DistributedInputSpec]] | Noner   r  c                 C  s  |  || _dd |  D | _dd | j D | _tdr4tj	j
tjd tjjdd |re|jjret|tret ret|jtsLJ dt|jtr`|j}|j}	t|	|| j}ntd	 t||||| jd
| _d | _i | _|d ur|d | j_|d | j_ n%t|t!r| j"|\| j_| j_ n|j#j$}
| j%|j&d |
\| j_| j_ t'j(j)*dd | _+| j+s|d ur|d ur| ,  d S |d ur| -  d S | .  d S d S )Nc                 S  s   i | ]\}}||j qS rl   )r  rs  rl   rl   rm   r  I  s    z&DistModel.__init__.<locals>.<dictcomp>c                 S  s   i | ]\}}||qS rl   rl   rs  rl   rl   rm   r  L  s    
ZPOD_NAMEz0Distribute training by paddle.distributed.launchT)Zis_collectivezKThe shard_fn should be ShardingStage1 when stage1 tensor fusion is enabled.z7Sharding tensor fusion only support ShardingStage1 now.)r   r   r,   ZFLAGS_enable_pir_api)/_DistModel__convert_strategy_inner_strategyrv  r
  _structured_to_parameter_name_parameter_to_structured_namer  getenvr   utilsZ	log_utilsZ
get_loggerloggingINFOinfor   initr_   rF  r   r,  r   r2  r7  r-  r>   warningr   _engine_mode_feed_name_list_inputs_specZ_labels_specShardDataloaderZ"_prepare_data_spec_from_dataloaderbatch_sampler
batch_sizeZ_prepare_data_specdatasetr   r   r   	get_flagsZ_in_pir_moderZ   r[   r\   )r   r  r  r  r/  r   r#  r%  r  Z	inner_optr8  rl   rl   rm   r   >  s   




	zDistModel.__init__c                 C  :   | j jd s| j jddd d| _| j d t  dS )z
        Set the DistModel to "train" mode. In "train" mode,
        executing ``__call__`` method will update the
        parameters of the model and return the loss.
        rZ   FmodeZinit_parametersNr2  _has_preparedZ_prepare_programr3  to_moder   disable_staticr   rl   rl   rm   rZ     s
   zDistModel.trainc                 C  r;  )z{
        Set the mode of DistModel to "eval". In "eval" mode,
        executing ``__call__`` will return the loss.
        r[   Fr<  Nr>  r   rl   rl   rm   r[     s
   zDistModel.evalc                 C  sH   | j jd s| j jt| j jdddd d| _| j d t	  dS )z
        Set the mode of DistModel to "predict". In "predict" mode,
        executing ``__call__`` returns a dict that contains the
        outputs of the model.
        r\   NFr<  )
r2  r?  preparer   r   r5  r3  r@  r   rA  r   rl   rl   rm   r\     s   zDistModel.predictc                 C  s<   |d u r| j d u rtd|d u r| j }|dvrtd|S )Nz;Please set the mode or call train()/eval()/predict() first.rY   z.mode can only be 'train', 'eval' or 'predict'.)r3  r   r   r=  rl   rl   rm   Z__validate_mode  s   zDistModel.__validate_moder=  _Mode | NonerJ   c                 C     |  |}| j|S )a  
        Get the distributed main program of the specified ``mode``. Each
        'mode' has its own distributed main program, ``dist_main_program``
        returns the corresponding distributed main program of ``mode``.

        Args:
            mode (str|None, optional): Can be 'train' , 'eval' , 'predict' or None.
                'train' : Return the distributed main program for training.
                'eval' : Return the distributed main program for evaluation.
                'predict' : Return the distributed main program for prediction.
                None : The current mode of the DistModel will be used.
                Default : None.

        Returns:
            The distributed main program of ``mode``.
        )_DistModel__validate_moder2  Zget_dist_main_programrC  rl   rl   rm   dist_main_program  s   
zDistModel.dist_main_programc                 C  rE  )a  
        Get the corresponding distributed startup program of ``mode``,
        which is used for initializing the parameters.

        Args:
            mode (str|None, optional): Can be 'train' , 'eval' , 'predict' or None.
                'train' : Return the distributed startup program for training.
                'eval' : Return the distributed startup program for evaluation.
                'predict' : Return the distributed startup program for prediction.
                None: The current mode of the DistModel will be used.
                Default : None.

        Returns:
            The distributed startup program of ``mode``.
        )rF  r2  Zget_dist_startup_programrC  rl   rl   rm   dist_startup_program     
zDistModel.dist_startup_programc                 C  rE  )ae  
        Get the corresponding serial main program of ``mode``, containing
        the whole variables and operators of the given ``layer``.

        Args:
            mode (str|None, optional): Can be 'train', 'eval', 'predict' or None.
                'train' : Return the main program for training.
                'eval' : Return the main program for evaluation.
                'predict' : Return the main program for prediction.
                None : The current mode of the DistModel will be used.
                Default : None.

        Returns:
            The serial main program of ``mode``.
        )rF  r2  Zget_serial_main_programrC  rl   rl   rm   serial_main_program  rI  zDistModel.serial_main_programc                 C  rE  )a>  
        Get the corresponding serial startup program of ``mode``.

        Args:
            mode (str|None, optional): Can be 'train' , 'eval' , 'predict' or None.
                'train' : Return the serial startup program for training.
                'eval' : Return the serial startup program for evaluation.
                'predict' : Return the serial startup program for prediction.
                None : The current mode of the DistModel will be used.
                Default : None.

        Returns:
            The serial startup program of ``mode``.
        )rF  r2  Zget_serial_startup_programrC  rl   rl   rm   serial_startup_program  s   
z DistModel.serial_startup_programc           
      C  s   | j | jvs| j| j  g kr| j | j| j < | j| j  }t|t|kr-td| dg }g }t|D ]#\}}t|tj	rSt
|}|d u rM|| q5|| q5|| q5g }t|D ]\}}	||vrl||	 q_tt||S )Nz@The input data and feed_list are not consistent.The model takes z	 as input)r3  r4  r2  Zget_feed_name_listrz   r   r   r   r   rD   rx   r   rZ  r  )
r   Z	data_listZfeed_name_list	feed_listZno_data_idsr   r   Zfeed_varZfeed_name_list_with_dataZ	feed_namerl   rl   rm   _make_feeds  s2   
zDistModel._make_feedsc                 C  s0  dd l }|d u r
d S t }tjj}tj|}| D ]}t||t|| q|j	j
|j	_
t|j	ddr<|j	jd t|j	ddrJ|j	jd ||j|_||j|_||j|_||j|_t|drr||j|_t|dr~||j|_t|d	r||j|_t|d
r||j|_|S )Nr   r  Fr  r  r  r  r  r  r  )r   r  rD  r  r  r  rS  r  r   r`   r  r  r   r   rc   r_   ra   rb   r)  r  rd   r  re   r  rf   r  rg   )r   r   r   Zinner_strategyr  r  r   rl   rl   rm   Z__convert_strategy6  sL   



zDistModel.__convert_strategyr   Sequence[Any] | Tensorr   c                 G  s  | j d u r	td| j dkr| jjd u s| jjd u rtd| j dkr-| jjd u r-tdg }t|D ]'}t|ttfrC|t|7 }q3t|tj	t
jfrR||g7 }q3tdt| | |}| j|}|| _| j dkrzd| jv rx| jd S d S d	| jv r| jd	 S d S )
Nz+Please call train()/eval()/predict() first.rZ   z7Please set optimizer and loss function before training.r[   z+Please set loss function before evaluation.z:The inputs of DistModel should be list or tensor, but got r\   r	  r  )r3  r   r2  
_optimizerZ_lossr   r   tupler   rD   r(   rr   	TypeErrorr   rM  runouts)r   r   rL  Z	feed_itemZfeedsrS  rl   rl   rm   r  c  s:   








zDistModel.__call__c                 C  s8   | j j| |du rt| j jd }| j j| dS )aM  
        Get the value of the variable with the given name.

        Args:
            value (pir.Value): The pir Value to fetch.
            name (str|None, optional): The user-defined name of
                the fetched result. If None, the order of the Value
                in the fetch list will be used. Default: None.
        Nr,   )r2  Z_pir_fetch_valuesr   rz   Z_pir_user_defined_fetch_names)r   r  r  rl   rl   rm   _fetch_value  s   
zDistModel._fetch_valuer   TLiteral['opt', 'param', 'all']split_fusionr(  dict[str, Tensor]c                   s  t  rtj } j jjd||}n j jjd|} |} jj	dur|rtj
j   jj	 D ]\}}|D ]}| \\}	}
t| }|D ]}t||	}|dur|| }| soJ d| d| d|j}|j}d|v r| ft|
 }n&t|
dkrd}|
d	 j}|
d
 j}nd}d}d}t| t|
|||d}tt|
D ]}t|| ||}t|| |  |||
| j| < q|| qQq@q:W d   n1 sw   Y   fdd| D }tt|t| }|S )a  
        Get the state dict of model and optimizer.

        Args:
            mode (str): Can be ['opt', 'param', 'all'],
                'opt' :  The return value only contains the variable in the optimizer.
                'param' : The return value only contains the variable in the network, not the variable in the optimizer.
                'all' : The return value contains the variable in the network and optimizer.
                Default: 'all'
        r=  Nkey  value: is not a dist tensor._pow_acc   Tr   r,   F)Z
split_numsis_qkv	num_headsnum_key_value_headsc                   $   g | ]}| j v r j | n|qS rl   r*  r   rt  r   rl   rm   r         

z(DistModel.state_dict.<locals>.<listcomp>) r   r   staticglobal_scoperG  r2  r3  rv  _build_distributed_state_dictfused_ffn_qkvr   dygraphguardr
  r   rS  r}   rs   r   r   rv   rz   local_num_headr#   r  r   r  r  poprZ  r  r  )r   r=  rV  scopelocal_state_dictZdist_state_dictr   pat_list
fusion_mapfused_paramori_params_metaZorigin_paramsr   suffixr  r   r   r   r^  r_  r`  r   r   Zmapping_namesrl   r   rm   rv    s   







4zDistModel.state_dictc                 C  s   | j | jjd}t rt|}n
t|| jj| j }dd }i }tjj	 * |
 D ]\}}||v s>J d| d| d|||| ||< q+W d   |S 1 sSw   Y  |S )zo
        Args:
            local_state_dict(Dict[str, libpaddle.Tensor]): The state dict from program.
        rX  c                 S  s  t | tjtjtjjfsJ t | tjst| } t | tjs,J d|  dt|  dt| jt|d ksEJ d| j d|d  d| j}t	t
|d |d	 |d
 d}t|d |}t| ||}| j| jks}J d| j d| j t| |  |S )Nzlocal tensor:z type z is not paddle.Tensor.r   zlocal tensor shape z! not equal to dims_mapping shape r  rf  Zprocess_shaper   )r   z! not equal to local_tensor.shape:)r   r   rD   r   Zndarrayr   r   rz   r   r   r   r   r;   r   rv   r  )r   r  r   r   r   r   rl   rl   rm   build_distributed_tensor  s2   
zIDistModel._build_distributed_state_dict.<locals>.build_distributed_tensorzvar z not in dist attrs:r  N)rG  r2  r3  r   r!   Z_dist_contextsr   r   ri  rj  r
  )r   rn  rG  Z
dist_attrsrt  Zglobal_state_dictvar_namero   rl   rl   rm   rg    s*   



z'DistModel._build_distributed_state_dictrv  c                 C  s  i }| j | jjd}| jdd}d}| jr| jjjnd}| jjd ur&|r&d}| D ]S\}}|	 s=J d| d| d||v ri|| }	|j
|| j
ksit|j|	jsiJ d|j
 d	|	j
 d
|j d	|	j d	|| jv rs| j| n|}
t| ||
< q*| jjd urItjj  | jj D ]\}}|D ]}| \\}}g }| D ]\}}t|d j|}|d ur|| qt|dkrq|D ]p}g }|D ] }|j| |vrt|j|  d  n|||j|   qt|t|kr6d|v r|d }n"t|dkrd}|d j}|d j}nd}d }d }t||||d}t|||| < |D ]
}|||  q+qqqW d    n	1 sDw   Y  t rY||tj ! | d S ||tj !  d S )NrX  F)rV  TrY  rZ  r[  zprocess_mesh: != z or placements:z
 not matchr   z is not in state_dict.r\  r]  r,   )r^  r_  r`  )"rG  r2  r3  rv  r(  r_   rF  rO  r
  rs   r   r7   r   r)  rx   rv   rh  r   r   ri  rj  r}   r  r   rz   warningswarnrk  r    rl  r   set_state_dictre  rf  )r   rv  rn  rG  Zcur_state_dictZcopy_tensorrF  rt  ru  Zcur_v
param_namer   ro  rp  rq  rr  Zsuffix_namesrs  Zconcat_tensorsZori_pZfused_wr^  r_  r`  rl   rl   rm   ry  +  s   $
	



8
zDistModel.set_state_dictc                 C  sB   | j j}|d u r
|S t|tjjjjr|j}t|tsJ d|S )NzUThe optimizer should be ShardingOptimizerStage1 when stage1 tensor fusion is enabled.)	r2  rO  r   r   re  rc   	decoratorZOptimizerWithMixedPrecisionr>   )r   r/  rl   rl   rm   _get_shard_stage1_optimizer  s   
z%DistModel._get_shard_stage1_optimizerc                   s    j r j jjnd}|sJ d  }|d usJ d fdd| D }tt|t| }|||  fdd| D }tt|t| }|S )NFz:Can only convert state_dict when tensor fusion is enabled.z!The optimizer should not be None.c                   ra  rl   )r)  rc  r   rl   rm   r     rd  z?DistModel._convert_state_dict_tensor_fusion.<locals>.<listcomp>c                   ra  rl   rb  rc  r   rl   rm   r     rd  )	r(  r_   rF  r|  rS  rZ  r  r   r  )r   rv  optimizer_functionrF  r/  Zparameter_namesZstructured_namesrl   r   rm   !_convert_state_dict_tensor_fusion  s&   


z+DistModel._convert_state_dict_tensor_fusionc                 C     dd }|  ||S )Nc                 S     |  | d S rq   )Z(convert_state_dict_with_rank_unique_namer/  rv  rl   rl   rm   r}       zODistModel._convert_state_dict_with_rank_unique_name.<locals>.optimizer_functionr~  r   rv  r}  rl   rl   rm   )_convert_state_dict_with_rank_unique_name     z3DistModel._convert_state_dict_with_rank_unique_namec                 C  r  )Nc                 S  r  rq   )Z.convert_state_dict_without_tensor_fusion_paramr  rl   rl   rm   r}    r  zUDistModel._convert_state_dict_without_tensor_fusion_param.<locals>.optimizer_functionr  r  rl   rl   rm   /_convert_state_dict_without_tensor_fusion_param  r  z9DistModel._convert_state_dict_without_tensor_fusion_paramc                 C  r  )Nc                 S  r  rq   )Z+convert_state_dict_with_tensor_fusion_paramr  rl   rl   rm   r}    r  zRDistModel._convert_state_dict_with_tensor_fusion_param.<locals>.optimizer_functionr  r  rl   rl   rm   ,_convert_state_dict_with_tensor_fusion_param  r  z6DistModel._convert_state_dict_with_tensor_fusion_paramc                 C  r  )Nc                 S  r  rq   )Z#convert_state_dict_with_origin_namer  rl   rl   rm   r}    r  zJDistModel._convert_state_dict_with_origin_name.<locals>.optimizer_functionr  r  rl   rl   rm   $_convert_state_dict_with_origin_name  r  z.DistModel._convert_state_dict_with_origin_namer   )r  rO   r  r  r  r   r/  r!  r   r"  r#  r$  r%  r&  r   r  )r   r  rq   )r=  rD  r   rJ   )r   rN  r   r   )r   T)r=  rU  rV  r(  r   rW  )rv  rW  r   r  )rh   ri   rj   r   r   rZ   r[   r\   rF  rG  rH  rJ  rK  rM  r'  r   r  rT  rv  rg  ry  r|  r~  r  r  r  r  rl   rl   rl   rm   r    s@    /
U

!-
&^
4g%r  r  #ShardDataloader | DataLoader | Noner  r   r!  r   r"  r%  r&  c           
      C  s  t |trlt sl|j}|j}|j}|durl|du rt n|}|dus'J dt |tr9d|j	_
d|j	_||j	_n3t |trKd|j	_
d|j	_||j	_n!t |trhd|j	_
d|j	_||j	_|jD ]}|| q_ntd|du ss|jrt| |||||d}	|	S tjj| d	d
} | S )aE  
    Converts the ``layer`` with distributed tensor (constructed from
    ``paddle.distributed.shard_tensor``) to a static graph. ``to_static``
    returns a DistModel instance containing the static graph for
    distributed training, evaluation and prediction.

    Args:
        layer(paddle.nn.Layer): The layer in dygraph mode, the parameters
            or its inputs can be distributed tensors.
        loader(ShardDataloader|paddle.io.DataLoader): The data loader used in dygraph mode,
            used to infer inputs_spec and labels_spec.
        loss(Loss|Callable|None, optional): The loss function for training
            or evaluating the model. Can be a `paddle.nn.Layer` instance or
            any callable function. Default: None.
        optimizer(paddle.optimizer.Optimizer|_ShardOptimizer|None, optional):
            The optimizer for training. It can `paddle.optimizer.Optimizer`
            or `_ShardOptimizer` wrapped by `shard_optimizer`. Default: None.
        strategy(paddle.distributed.Strategy|None, optional): Configs for
            parallel strategies and optimization settings (e.g. sharding,
            pipeline parallelism). Default: None.
        input_spec(list[list[paddle.distributed.DistributedInputSpec]]|None, optional):
            The custom input specs specify the shape, dtype, and name information
            of model inputs and labels. If it is not None, the input specs and
            label specs will be inferred from the custom input specs. The custom
            input specs should be a list containing two sublists: the first
            sublist represents theinput specs, and the second sublist represents
            the label specs. Default: None.

    Returns:
        DistModel: A ``DistModel`` instance converted the input ``layer``.

    Examples:
        .. code-block:: python

            >>> import numpy as np
            >>> import paddle
            >>> import paddle.distributed as dist
            >>> from paddle import nn
            >>> from paddle.distributed import Replicate, Shard

            >>> BATCH_SIZE = 4
            >>> BATCH_NUM = 4
            >>> IMAGE_SIZE = 16
            >>> CLASS_NUM = 8
            >>> class RandomDataset(paddle.io.Dataset): # type: ignore[type-arg]
            ...     def __init__(self, images, labels, num_samples):
            ...         self.images = images
            ...         self.labels = labels
            ...         self.num_samples = num_samples
            ...     def __getitem__(self, idx):
            ...         return self.images[idx], self.labels[idx]
            ...     def __len__(self):
            ...         return self.num_samples

            >>> class DemoNet(nn.Layer):
            ...     def __init__(self, mesh):
            ...         super().__init__()
            ...         self._mesh = mesh
            ...         self.linear_0 = nn.Linear(IMAGE_SIZE, IMAGE_SIZE)
            ...         self.linear_1 = nn.Linear(IMAGE_SIZE, CLASS_NUM)
            ...         self.relu = nn.ReLU()
            ...         # shard the weights of this layer
            ...         self.linear_0.weight = dist.shard_tensor(
            ...             self.linear_0.weight,
            ...             self._mesh,
            ...             [Shard(1)],
            ...             stop_gradient=False,
            ...         )
            ...         self.linear_1.weight = dist.shard_tensor(
            ...             self.linear_1.weight,
            ...             self._mesh,
            ...             [Shard(0)],
            ...             stop_gradient=False,
            ...         )
            ...     def forward(self, x):
            ...         out = self.linear_0(x)
            ...         out = self.relu(out)
            ...         out = self.linear_1(out)
            ...         return out

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> images = np.random.rand(BATCH_SIZE, IMAGE_SIZE).astype('float32')
            >>> labels = np.random.rand(BATCH_SIZE, CLASS_NUM).astype('float32')
            >>> dataset = RandomDataset(images, labels, BATCH_SIZE)
            >>> loader = paddle.io.DataLoader(dataset, batch_size=BATCH_SIZE)

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> layer = DemoNet(mesh)
            >>> opt = paddle.optimizer.SGD(
            ...     learning_rate=0.1, parameters=layer.parameters()
            ... )
            >>> loss_fn = nn.MSELoss()
            >>> dist_loader = dist.shard_dataloader(loader, meshes=[mesh])
            >>> dist_model = dist.to_static(
            ...     layer, dist_loader, loss_fn, opt
            ... )
            >>> # training
            >>> dist_model.train()
            >>> for batch_id, (image, label) in enumerate(dist_loader()):
            ...     # in train mode, executing the __call__ method will
            ...     # update the parameters of the model and return the
            ...     # loss
            ...     loss = dist_model(image, label)

            >>> # evaluation
            >>> dist_model.eval()
            >>> for batch_id, (image, label) in enumerate(dist_loader()):
            ...     # in eval mode, executing the __call__ method will
            ...     # return the loss
            ...     loss = dist_model(image, label)

            >>> # prediction
            >>> dist_model.predict()
            >>> for batch_id, (image, label) in enumerate(dist_loader()):
            ...     # in predict mode, executing the __call__ method will
            ...     # return a dict that contains the outputs of the model,
            ...     # where the value of "out0" is the first output.
            ...     outs = dist_model(image)

            >>> # This case need to be executed in multi-card environment
            >>> # export CUDA_VISIBLE_DEVICES=0,1
            >>> # python -m paddle.distributed.launch {test_case}.py
    Nz Sharding degree can not be None.Tr,      r]  zdOnly sharding stage 1, 2 and 3 can to_static for now. User-defined shard_fn will be supported later.)r%  F)r  )r   r,  r   r2  r4  r-  r   rD  r7  r_   r  ZstageZdegreer8  r9  r<  r  r   r  r  r   Zjitr  )
r  r  r  r/  r   r%  r  r  r   Z
dist_modelrl   rl   rm   r    sF    






r  c                 C  s   t  r;|  du rtd| j}| j}t gt| }t	| ||}t
| tr4tj| fi | jS t | S t j rQt j|  | j}| | | S td)aQ  
    Converts a distributed tensor to a dense tensor. ``unshard_dtensor``
    first make the ``dist_tensor`` be ``Replicate`` state on all processes and
    then converts it to a dense ``paddle.Tensor``. It can be treated as a
    reverse operation of ``shard_tensor``.

    Args:
        dist_tensor (paddle.Tensor): The distributed tensor which is constructed
            from a dense tensor with ``shard_tensor``.

    Returns:
        paddle.Tensor: The original dense tensor of the input ``dist_tensor``.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist
            >>> from paddle.distributed import Replicate, Shard

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> original_tensor = paddle.rand([4, 1024, 512])
            >>> dist_tensor = dist.shard_tensor(original_tensor, mesh, [Shard(0)])
            >>> # dense_tensor's shape is the same as original_tensor
            >>> dense_tensor = dist.unshard_dtensor(dist_tensor)
    Fr   z;`unshard_dtensor()` only supported in dynamic and pir mode.)r   r   rs   r   r   r   r   r   rz   r   r   r   r   rv   r   rD   r   r   r   Zcreate_shaped_typer   r   r  r   )r   r   r   Zreplicate_placementsZr_dist_tensorZdense_tensor_typerl   rl   rm   unshard_dtensor  s.   



r  c                   @  s   e Zd ZdZ				d(d)ddZdd Zdd Zdd Zdd Zdd Z	dd Z
dd Z	d*d d!Zd"d# Zd$d% Zd&d' ZdS )+r6  au  
    ShardDataloader converts a dataloader to a new dataloader which provided two capabilities:
    1. split dataloader by shard_dim to do data parallel.
    2. reshard the output of dataloader to distributed tensor.
    if is_dataset_splitted is True, just need to do reshard.

    Args:
        dataloader (paddle.io.DataLoader): The dataloader to be sharded.
        meshes (ProcessMesh|list[ProcessMesh]|tuple[ProcessMesh]): The mesh list of the dataloader.
            Identify which mesh the input is on. if len(meshes) == 1 or type(meshes) == ProcessMesh,
            all the inputs are on the same mesh.
        input_keys (list[str]|tuple[str]): if the iteration result of dataloader is a dict of tensors,
            input_keys is the keys of this dict, identify which tensor is located on which mesh,
            one-to-one correspondence with meshes. i.e. dict[input_keys[i]] is on meshes[i].
            Default: None, which means the outputs is a list, and the i'th input is on meshes[i].
        shard_dims (list|tuple|str|int]): The mesh dimension to shard the dataloader.
            Users can specify the shard_dim of each mesh or specify a single shard_dim for all meshes.
            Default: None, which means the data loader will not be split, i.e. mp.
        is_dataset_splitted (bool): Whether the dataset has been split.
        dense_tensor_idx (list): A paired 2D list specifies the index of the dense_tensor in the output of dataloader.
            It allows users to identify which elements within each output batch are dense_tensor.
            first dense_tensor: the dense_tensor return by dataloader.
            second dense_tensor: num_or_sections specifies how to split first tensor: evenly (if a number) or unevenly (if a list).
            Default: None, meaning all outputs are dist_tensors.
            Note: For dense_tensor_idx settings, the idx must be paired.
    NF
dataloaderpaddle.io.DataLoadermeshes4ProcessMesh | list[ProcessMesh] | tuple[ProcessMesh]
input_keyslist[str] | tuple[str] | None
shard_dimslist | tuple | str | int | Noneis_dataset_splittedr(  dense_tensor_idxlist[list[int]] | Nonec                 C  s  |du r|d u rt dt|| _| jd u st| jdkr!t dt }| |r5t d| d| j t| jdk| _|| _| 	|| _
| |\}}	|d u rkt| jd d }t| j
d d }	d}
||	}n||	|}
||	}|du s~|d u r|| _|jj| _nct|jtr|jj| _|j| _|| _nPt|jj| | _t|jtrd}d}n|jj}|jj}t|j| j||
||d	| _|jj| j_tjj|j| j|j|j|j|j|j |j!|j"|j#|j$|j%|j&d
| _d| j_'d | _(|| _)d S )NTz7shard_dims must be set when is_dataset_splitted is Truer   zmeshes must be setzprocess_id z* is in more than one mesh, the meshes are r,   F)r9  r8  Znum_replicasrd  shuffle	drop_last)r9  r7  rL  placesreturn_list
collate_fnnum_workersuse_buffer_readerprefetch_factoruse_shared_memorytimeoutworker_init_fnZpersistent_workers)*r   r$   _meshesrz   r   r   _process_id_in_multi_meshes_all_inputs_in_one_mesh_input_keys_process_shard_dims_shard_dims_get_mesh_and_shard_dimrJ  Zget_rank_by_dim_and_process_id_dataloaderr7  r8  r   r)   r  r*   r  r  r9  Z
_acc_stepsr   iorM   rL  r  r  r  r  r  r  r  r  r  Z_persistent_workersZ
pin_memoryr  r  )r   r  r  r  r  r  r  
process_idr   r   Zdp_rankZdp_world_sizer  r  rl   rl   rm   r     s   






zShardDataloader.__init__c                 C  s   t |ttfs|d u r6g }tt| jD ]}t | j| ttfr.||gt| j|   q|| q|S t|t| jkrNt	dt| dt| j |S )Nz6shard_dims must be the same length as meshes, but got rv  )
r   r  r   r  rz   r  r   rP  r   r   )r   r  resr   rl   rl   rm   r  F  s   z#ShardDataloader._process_shard_dimsc                 C  s   t t| jD ]I}t| j| ttfr<t t| j| D ]}|| j| | jv r:| j| | | j| | f    S qq|| j| jv rP| j| | j| f  S qdS )N)NN)r  rz   r  r   r   rP  _process_idsr  )r   r  r   jrl   rl   rm   r  V  s   $z'ShardDataloader._get_mesh_and_shard_dimc                 C  sh   d}g }| j D ]}t|ttfr|| q|| qtt|}|D ]}||jv r/|d7 }q$|dkS Nr   r,   )r  r   r   rP  extendr   rM  r  )r   r  countZflatten_meshesr   Zunique_meshesrl   rl   rm   r  a  s   

z+ShardDataloader._process_id_in_multi_meshesc                 C  s
   t | jS rq   )rz   r  r   rl   rl   rm   __len__q  r  zShardDataloader.__len__c                 C  
   d | _ | S rq   r  r   rl   rl   rm   __iter__t     zShardDataloader.__iter__c                 C  s   | j r| jd n| j| }|d urt stdg}nt g}| j r(| jd n| j| }tdt|j	D ]	}|
t  q5||fS r  )r  r  r/   r   r  r   r  r  rz   r  r   )r   r   r   r   r   r  rl   rl   rm   _get_mesh_and_placementy  s   
z'ShardDataloader._get_mesh_and_placementc           	      C  s  | j r| jd g| }| jd g| }n4| j| }t|ttfr)t||ks(J n|g| }| j| }t|ttfrCt||ksBJ n|g| }g }t|D ]0}|| d ur`t s`t	
dg}nt	 g}tdt|| jD ]	}|t	  qo|| qN||fS r  )r  r  r  r   r   rP  rz   r  r/   r   r  r   r  r   )	r   r   lengthr  r  r   r   r   r  rl   rl   rm   )_get_meshes_and_placements_for_list_input  s(   




z9ShardDataloader._get_meshes_and_placements_for_list_inputc                 C  sh   g }t t|D ])}|d ur||v st|| tjs"|||  q|t|| || ||  q|S rq   )r  rz   r   r   rD   r   r   )r   Zlist_tensorsr  r   r  Z	dist_datar  rl   rl   rm   _dtensors_from_list_input  s   z)ShardDataloader._dtensors_from_list_inputc              	   C  sR  t |ttfr| jdu rt|t| jksJ g }tt|D ]`}|| }t |ttfrN| |t|\}}| jd u r=d n| j| }|	| 
|||| qt |tjrw| jd urf| j| g krf|	| q| |\}}|	t||| qtdt| |S t |tr| jd u r| n| j}	| jdu rt|	t| jksJ i }t|	D ]_\}}
||
 }t |ttfr| |t|\}}| jd u rd n| j| }| 
||||||
< qt |tjr| jd ur| j| g kr|	| q| |\}}t||
 ||||
< q|||
< q|S t |tjr | d\}}t|||S tdt| )NFzUnsupported input_data type r   zUnsupported batch_data type )r   r   rP  r  rz   r  r  r  r  r   r  r   rD   r  r   r   r   rZ  r  rS  r   )r   
batch_dataZdist_batch_datar   Z
input_datar  r   Z_dense_tensor_idxr   r  r   rl   rl   rm   
_get_batch  s   











zShardDataloader._get_batchc                 C  s*   | j d u r| j | _ t| j }| |S rq   )r  r  r  r  r  )r   r  rl   rl   rm   __next__  s   


zShardDataloader.__next__c                 C  r  rq   r  r   rl   rl   rm   r    r  zShardDataloader.__call__NNFN)r  r  r  r  r  r  r  r  r  r(  r  r  rq   )rh   ri   rj   r   r   r  r  r  r  r  r  r  r  r  r  r  rl   rl   rl   rm   r6    s&    U
Vr6  r  rM   r  #ProcessMesh | Sequence[ProcessMesh]r  Sequence[str] | Noner  0Sequence[str] | Sequence[int] | str | int | Noner  r  r  c                 C  s   t | |||||S )al$  
    Convert the dataloader to a ShardDataloader which provided two capabilities:
    1. split dataloader by shard_dim to do data parallel if it it not None.
    2. reshard the output of dataloader to distributed tensor.
    if is_dataset_splitted is True, it means that the dataset has been split by users, and just need to do reshard.
    only if is_dataset_splitted is False and shard_dims is not None, it will do split.

    Args:
        dataloader (paddle.io.DataLoader): The dataloader to be sharded. the output of dataloader
            must be a list or dict of paddle.Tensor with 2 elements, i.e. [input_data, label] or
            {"input_data": input_data, "label": label}, input_data and label can be a list to support multiple inputs.
        meshes (ProcessMesh|list[ProcessMesh]|tuple[ProcessMesh]): The mesh list of the dataloader.
            Identify which mesh the input is on. if len(meshes) == 1 or type(meshes) == ProcessMesh,
            all the inputs are on the same mesh.
        input_keys (list[str]|tuple[str]): if the iteration result of dataloader is a dict of tensors,
            input_keys is the keys of this dict, identify which tensor is located on which mesh,
            one-to-one correspondence with meshes. i.e. dict[input_keys[i]] is on meshes[i].
            Default: None, which means the outputs is a list, and the i'th input is on meshes[i].
        shard_dims (list(str)|tuple(str)|list(int)|tuple(int)|str|int]):
            The mesh dimension to shard the dataloader.
            Users can specify the shard_dim of each mesh or specify a single shard_dim for all meshes.
            Default: None, which means the data loader will not be split, i.e. mp.
        is_dataset_splitted (bool): Whether the dataset has been split, Default: False.
        dense_tensor_idx (list): A paired 2D list specifies the index of the dense_tensor in the output of dataloader.
            It allows users to identify which elements within each output batch are dense_tensor.
            first dense_tensor: the dense_tensor return by dataloader.
            second dense_tensor: num_or_sections specifies how to split first tensor: evenly (if a number) or unevenly (if a list).
            Default: None, meaning all outputs are dist_tensors.
            Note: For dense_tensor_idx settings, the idx must be paired.
    Returns:
        ShardDataloader: The sharded dataloader.

    Examples:
        .. code-block:: python
            :name: example-1

            >>> import os
            >>> import numpy as np
            >>> import paddle
            >>> import paddle.distributed as dist
            >>> from paddle.io import BatchSampler, DataLoader, Dataset

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> mesh0 = dist.ProcessMesh([[0, 1], [2, 3]], dim_names=['x', 'y'])
            >>> mesh1 = dist.ProcessMesh([[4, 5], [6, 7]], dim_names=['x', 'y'])

            >>> paddle.seed(1024)
            >>> np.random.seed(1024)
            >>> class RandomDataset(Dataset): # type: ignore[type-arg]
            >>>     def __init__(self, seq_len, hidden, num_samples=8):
            ...         super().__init__()
            ...         self.seq_len = seq_len
            ...         self.hidden = hidden
            ...         self.num_samples = num_samples
            ...         self.inputs = [np.random.uniform(size=[self.seq_len, self.hidden]).astype("float32") for _ in range(num_samples)]
            ...         self.labels = [np.array(index, dtype="float32") for index in range(num_samples)]

            ...     def __getitem__(self, index):
            ...         return self.inputs[index], self.labels[index]

            ...     def __len__(self):
            ...         return self.num_samples

            >>> class MlpModel(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super(MlpModel, self).__init__()
            ...         self.w0 = dist.shard_tensor(
            ...             self.create_parameter(shape=[8, 8]),
            ...             mesh0, [dist.Replicate(), dist.Shard(1)])
            ...         self.w1 = dist.shard_tensor(
            ...             self.create_parameter(shape=[8, 8]),
            ...             mesh1, [dist.Replicate(), dist.Shard(0)])

            ...     def forward(self, x):
            ...         y = paddle.matmul(x, self.w0)
            ...         y = dist.reshard(y, mesh1, [dist.Shard(0), dist.Shard(2)])
            ...         z = paddle.matmul(y, self.w1)
            ...         return z

            >>> model = MlpModel()
            >>> dataset = RandomDataset(4, 8)
            >>> sampler = BatchSampler(
            ...     dataset,
            ...     batch_size=2,
            ... )
            >>> dataloader = DataLoader(
            ...     dataset,
            ...     batch_sampler=sampler,
            ... )
            >>> dist_dataloader = dist.shard_dataloader(
            ...     dataloader=dataloader,
            ...     meshes=[mesh0, mesh1],
            ...     shard_dims="x"
            ... )
            >>> opt = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters())
            >>> dist_opt = dist.shard_optimizer(opt)
            >>> def loss_fn(logits, label):
            ...     # logits: [bs, seq_len, hidden], label: [bs]
            ...     loss = paddle.nn.MSELoss(reduction="sum")
            ...     logits = paddle.sum(logits, axis=[1, 2])
            ...     return loss(logits, label)

            >>> RUN_STATIC = eval(os.environ['RUN_STATIC'])
            >>> def run_dynamic():
            ...     for step, (input, label) in enumerate(dist_dataloader()):
            ...         logits = model(input)
            ...         loss = loss_fn(logits, label)
            ...         print("step:{}, loss:{}".format(step, loss))
            ...         loss.backward()
            ...         dist_opt.step()
            ...         dist_opt.clear_grad()

            >>> def run_static():
            ...     dist_model = dist.to_static(
            ...         model, dist_dataloader, loss_fn, opt
            ...     )
            ...     dist_model.train()
            ...     for step, (input, label) in enumerate(dist_dataloader()):
            ...         print("label:", label)
            ...         loss = dist_model(input, label)
            ...         print("step:{}, loss:{}".format(step, loss))

            >>> if RUN_STATIC == 0:
            ...     run_dynamic()
            ... else:
            ...     run_static()

            >>> # This case need to be executed in multi-card environment
            >>> # export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
            >>> # RUN_STATIC=1 python -u -m paddle.distributed.launch --gpus "0,1,2,3,4,5,6,7" {test_case}.py
            >>> # RUN_STATIC=0 python -u -m paddle.distributed.launch --gpus "0,1,2,3,4,5,6,7" {test_case}.py

        .. code-block:: python
            :name: example-2

            >>> import paddle
            >>> import paddle.distributed as dist
            >>> from paddle.io import BatchSampler, DataLoader, Dataset
            >>> import numpy as np
            >>> mesh0 = dist.ProcessMesh([[0, 1], [2, 3]], dim_names=['dp', 'mp'])
            >>> mesh1 = dist.ProcessMesh([[4, 5], [6, 7]], dim_names=['dp', 'mp'])
            >>> class RandomDataset(Dataset): # type: ignore[type-arg]
            ...     def __init__(self, seq_len, hidden, num_samples=8):
            ...         super().__init__()
            ...         self.seq_len = seq_len
            ...         self.hidden = hidden
            ...         self.num_samples = num_samples
            ...         self.inputs1 = [
            ...             np.random.uniform(size=[self.seq_len, self.hidden]).astype(
            ...                 "float32"
            ...             )
            ...             for _ in range(num_samples)
            ...         ]
            ...         self.inputs2 = [
            ...             np.random.uniform(size=[self.seq_len, self.hidden]).astype(
            ...                 "float32"
            ...             )
            ...             for _ in range(num_samples)
            ...         ]
            ...         self.labels = [
            ...             np.array(index, dtype="float32") for index in range(num_samples)
            ...         ]
            ...     def __getitem__(self, index):
            ...         return {
            ...             "inputs": [self.inputs1[index], self.inputs2[index]],
            ...             "label": self.labels[index],
            ...         }
            ...     def __len__(self):
            ...         return self.num_samples

            >>> dataset = RandomDataset(4, 8)
            >>> sampler = BatchSampler(
            ...     dataset,
            ...     batch_size=2,
            ... )
            >>> dataloader = DataLoader(
            ...     dataset,
            ...     batch_sampler=sampler,
            ... )
            >>> dist_dataloader = dist.shard_dataloader(
            ...     dataloader=dataloader,
            ...     meshes=[mesh0, mesh1],  # or [[mesh0, mesh0], mesh1]
            ...     shard_dims="dp",
            ...     input_keys=["inputs", "label"],
            ... )
    )r6  )r  r  r  r  r  r  rl   rl   rm   shard_dataloader  s    Dr  c                   C  s   t jjdd S )NZ%FLAGS_enable_auto_parallel_align_mode)r   r   r   r:  rl   rl   rl   rm   r    s
   r  c                   C  s
   t   dS )a  
    Enables an automated Data Parallel (DP) setup for auto-parallel training.

    This function simplifies the process of implementing vanilla (standard) Data
    Parallelism within the auto-parallel framework. By calling ``enable_auto_dp()``,
    users can achieve data parallel training without needing to manually configure
    ``paddle.distributed.shard_dataloader`` (or a similar distributed dataloader
    interface) for DP-specific data sharding or distribution. This mode automates
    the setup required for DP communication and data handling.

    The function works by setting the related environment variable
    to ``1``. This signals to the auto-parallel system that it should
    automatically manage the data parallelism aspects of the training process
    according to a predefined strategy.

    A significant advantage of this automated DP mode is its inherent robustness
    and ability to handle scenarios that can be challenging for manual or other
    standard DP configurations. For instance, it is particularly effective for:

    - Training models where input data may have non-uniform shapes across
      different data parallel ranks (e.g., certain video generation models
      like Wanx). In such cases, where traditional DP might lead to program
      hangs due to shape mismatches during communication, this automated mode
      employs strategies (like adjusting data representation and gradient
      synchronization) to ensure smooth training.

    In essence, ``enable_auto_dp()`` provides two key benefits:

    1. **Simplified DP Setup:** Automates the configuration for basic data
       parallelism, reducing manual setup effort (e.g., no need for manual
       ``shard_dataloader`` DP configuration).
    2. **Robustness for Complex Cases:** Effectively handles advanced scenarios
       like non-uniform input shapes.

    Note:
        This function should typically be called at the very beginning of your
        training script, prior to initializing Paddle's distributed environment
        or any auto-parallel components. The underlying auto-parallel framework,
        including its data loading and optimizer components, must be designed to
        recognize and act upon the environment variable.

    Examples:
        .. code-block:: python

            >>> import numpy as np
            >>> import paddle
            >>> from paddle import nn
            >>> import paddle.distributed as dist
            >>> from paddle.io import Dataset, DataLoader

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> dist.enable_auto_dp()

            >>> BATCH_SIZE = 32
            >>> CLASS_NUM = 10
            >>> INPUT_DIM = 256
            >>> STEPS = 100

            >>> class RandomDataset(Dataset):  # type: ignore[type-arg]
            ...     def __init__(self, num_samples):
            ...         rank = dist.get_rank() if dist.get_world_size() > 1 else 0
            ...         np.random.seed(42 + rank)
            ...         self.num_samples = num_samples
            ...     def __getitem__(self, idx):
            ...         x = np.random.rand(INPUT_DIM).astype('float32')
            ...         y = np.random.randint(0, CLASS_NUM, (1,)).astype('int64')
            ...         return x, y
            ...     def __len__(self):
            ...         return self.num_samples

            >>> class SimpleNet(nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.net = nn.Sequential(
            ...             nn.Linear(INPUT_DIM, 102400),
            ...             nn.Linear(102400, INPUT_DIM),
            ...             nn.Linear(INPUT_DIM, CLASS_NUM),
            ...         )
            ...     def forward(self, x):
            ...         return self.net(x)

            >>> model = SimpleNet()
            >>> optimizer = paddle.optimizer.AdamW(learning_rate=1e-3, parameters=model.parameters())
            >>> dataset = RandomDataset(num_samples=STEPS * BATCH_SIZE)
            >>> loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=True)

            >>> model.train()
            >>> for step, (x, y) in enumerate(loader):
            ...     y.stop_gradient = True
            ...     loss = paddle.mean(model(x))
            ...     loss.backward()
            ...     optimizer.step()
            ...     model.clear_gradients()
            ...     if step % 5 == 0:
            ...         print(f"[step {step}] loss: {loss.item():.4f}")

            >>> # This case need to be executed in multi-card environment
            >>> # export CUDA_VISIBLE_DEVICES=0,1
            >>> # python -m paddle.distributed.launch {test_case}.py

    N)r-   rl   rl   rl   rm   enable_auto_dp  s   
fr  )ro   rp   )NNN)r   r   r   r   r   r   r   r   r   r   r   r   r   rD   )r   )r   r   r   r   r   r   r   r   r   r   r   rD   )r   rD   r   r   r   r   r   rD   )r  rO   r   r   r  r  r  r  r  r  r   rO   r  r  )r/  r+   r  r  r5  r  r   r,  )r  rI   r   rI   r   )r  rO   r  r  r  r   r/  r!  r   r"  r%  r&  r   r  )r   rD   r   rD   r  )r  rM   r  r  r  r  r  r  r  r(  r  r  r   r6  )
__future__r   r   r-  r  rw  collectionsr   typesr   typingr   r   r   r   numpyr   r   Zpaddle.distributedr  r   r	   r
   r   Zpaddle.amp.grad_scalerr   Zpaddle.autogradr   Zpaddle.baser   Zpaddle.base.dygraph.baser   Zpaddle.base.frameworkr   r   r   r   r   r   r   Z paddle.distributed.auto_parallelr   r   r  Z*paddle.distributed.auto_parallel.interfacer   r   Z-paddle.distributed.auto_parallel.process_meshr   Z2paddle.distributed.auto_parallel.static.completionr   Z4paddle.distributed.auto_parallel.static.dist_contextr   Z/paddle.distributed.auto_parallel.static.dist_opr   Z-paddle.distributed.auto_parallel.static.utilsr   r    r!   r"   r#   r$   Z3paddle.distributed.fleet.utils.tensor_fusion_helperr%   r&   r'   Zpaddle.frameworkr(   Z"paddle.io.dataloader.batch_samplerr)   r*   Zpaddle.optimizerr+   Zauto_dp_utilsr-   r.   r/   r  r0   r1   r2   r3   r4   r5   r6   Zplacement_typer7   r8   r9   r:   r;   randomr<   r=   r_   r>   r?   r@   collections.abcrA   rB   Ztyping_extensionsrC   rD   Zpaddle._typingrE   rF   rG   rH   Z
paddle.amprI   rJ   rK   Z7paddle.distributed.auto_parallel.static.dist_input_specrL   Z	paddle.iorM   Zpaddle.metricrN   Z	paddle.nnrO   r  rP   rQ   rR   rS   rT   rU   rV   rW   rX   r]   rk   r^   rx   r}   r   r~   r   r   r   r   r   r   r   r   r  r   r'  r+  r,  r  r6  r7  r8  r9  r  r  r  Z
BaseConfigrD  r  r  r  r6  r  r  r  rl   rl   rl   rm   <module>   s     $	,
A d
J|
&

(} 
      `X@,^
3 E  $     ^ 
/>  J N