from __future__ import annotations

import logging
import os
from typing import TYPE_CHECKING, Literal

import paddle
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_optimizer_stage2 import GroupShardedOptimizerStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage2 import GroupShardedStage2
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import GroupShardedStage3
from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import GroupShardedScaler
from paddle.distributed.fleet.utils.mix_precision_utils import MixPrecisionOptimizer
from paddle.distributed.utils.log_utils import get_logger
from paddle.optimizer import Optimizer

if TYPE_CHECKING:
    from collections.abc import Sequence

    from paddle.amp import GradScaler
    from paddle.distributed.communication.group import Group
    from paddle.nn import Layer

logger_ = get_logger(logging.WARNING)


def group_sharded_parallel(
    model: Layer,
    optimizer: Optimizer,
    level: Literal['os', 'os_g', 'p_g_os'],
    scaler: GradScaler | None = None,
    group: Group | None = None,
    offload: bool = False,
    sync_buffers: bool = False,
    buffer_max_size: int = 2**23,
    segment_size: int = 2**20,
    sync_comm: bool = False,
    dp_group: Group | None = None,
    exclude_layer: Sequence[str | int] | None = None,
) -> tuple[Layer, Optimizer, GradScaler]:
    """
    Use group_sharded_parallel to apply group sharded configuration to the model, optimizer and GradScaler. Level has three string options, 'os', 'os_g' and 'p_g_os', corresponding to three different usage scenarios: optimizer state segmentation, optimizer state + gradient segmentation, and parameter + gradient + optimizer state segmentation.
    Usually, optimizer state + gradient segmentation is a further optimization of optimizer state segmentation, so optimizer state segmentation is realized through optimizer state + gradient segmentation.

    Args:
        model (Layer): The layer to be wrapped with group_sharded_parallel.
        optimizer (Optimizer): The optimizer to be wrapped with group_sharded_parallel.
        level (str): The level of group sharding to use: `os`, `os_g` or `p_g_os`.
        scaler (GradScaler|None, optional): If AMP is used, you need to pass GradScaler. Defaults to None, indicating that GradScaler is not used.
        group (Group|None, optional): The group instance. Defaults to None, indicating that the default environment group is used.
        offload (bool, optional): Whether to use the offload function. Defaults to False, which means that the offload function is not used.
        sync_buffers (bool, optional): Whether to broadcast model buffers. It is generally needed when there are registered model buffers. Defaults to False, indicating that model buffers are not synchronized.
        buffer_max_size (int, optional): The max size of the buffer used to integrate gradients in `os_g`. The larger the size, the more GPU memory will be used. Defaults to 2**23, which means the buffer size is 2**23.
        segment_size (int, optional): The smallest size of a parameter to be sharded in `p_g_os`. Defaults to 2**20, which means the minimum size of a segmented parameter is 2**20.
        sync_comm (bool, optional): Whether to use synchronous communication; only used in `p_g_os`. Defaults to False, indicating that asynchronous communication is used.
        dp_group (Group|None, optional): The data parallel communication group; supports combining stage2 or stage3 with data parallel hybrid communication. Defaults to None.
        exclude_layer (list|None, optional): Layers excluded from slicing in sharding stage3. Each entry must be either a layer class name or a layer's id, for example, exclude_layer=["GroupNorm", id(model.gpt.linear)]. Defaults to None.

    Returns:
        model: The group sharded wrapper of the given model.
        optimizer: The group sharded wrapper of the given optimizer.
        scaler: The group sharded wrapper of the given scaler.

    Examples:
        .. code-block:: python

            >>> # type: ignore
            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle
            >>> from paddle.nn import Linear
            >>> from paddle.distributed import fleet
            >>> from paddle.distributed.sharding import group_sharded_parallel

            >>> fleet.init(is_collective=True)
            >>> group = paddle.distributed.new_group([0, 1])
            >>> model = Linear(1000, 1000)

            >>> clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
            >>> optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip)

            >>> # wrap sharding model, optimizer and scaler
            >>> scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
            >>> model, optimizer, scaler = group_sharded_parallel(model, optimizer, "p_g_os", scaler=scaler)

            >>> img = paddle.rand([32, 1000])
            >>> label = paddle.randint(0, 1000, shape=[32, 1], dtype='int64')
            >>> label.stop_gradient = True
            >>> img.stop_gradient = True

            >>> out = model(img)
            >>> loss = paddle.nn.functional.cross_entropy(input=out, label=label)

            >>> loss.backward()
            >>> optimizer.step()
            >>> optimizer.clear_grad()
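            >>> # An illustrative AMP sketch: it assumes the model holds float16 parameters and
            >>> # that the wrapped scaler keeps the standard paddle.amp.GradScaler interface
            >>> # (scale/step/update).
            >>> # with paddle.amp.auto_cast():
            >>> #     loss = paddle.nn.functional.cross_entropy(input=model(img), label=label)
            >>> # scaler.scale(loss).backward()
            >>> # scaler.step(optimizer)
            >>> # scaler.update()
            >>> # optimizer.clear_grad()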

    """
    device = paddle.get_device().split(":")[0]
    assert (
        device in ["gpu", "xpu"]
        or device in paddle.device.get_all_custom_device_type()
    ), "group_sharded_parallel only support gpu, xpu and custom_device now"

    # check the input types
    assert isinstance(model, paddle.nn.Layer), "The model must be the instance of paddle.nn.Layer."
    assert isinstance(optimizer, (Optimizer, MixPrecisionOptimizer)), (
        "The optimizer must be the instance of paddle.optimizer.Optimizer or MixPrecisionOptimizer for main grad."
    )
    assert level in ['os', 'os_g', 'p_g_os'], "The level must be os, os_g or p_g_os."

    def check_dtype(param):
        return param.dtype == paddle.float16

    params_fp16 = list(filter(check_dtype, model.parameters()))
    if scaler is None and len(params_fp16) > 0:
        logger_.warning(
            "the input of scaler is None, please ensure the logic of your scaler outside is same as GroupShardedScaler."
        )

    # convert the model and optimizer according to the requested sharding level
    if level in ['os', 'os_g']:
        logger_.info("*" * 30)
        logger_.info("Sharded level os uses sharded level os_g achieved now.")
        logger_.info("*" * 30)
        optimizer = GroupShardedOptimizerStage2(
            params=optimizer._parameter_list,
            optim=optimizer,
            group=group,
            offload=offload,
            dp_group=dp_group,
            device=device,
        )
        model = GroupShardedStage2(
            model,
            optimizer,
            group=group,
            sync_buffers=sync_buffers,
            buffer_max_size=buffer_max_size,
            dp_group=dp_group,
            device=device,
        )
    elif level == 'p_g_os':
        model = GroupShardedStage3(
            model,
            optimizer=optimizer,
            group=group,
            sync_buffers=sync_buffers,
            segment_size=segment_size,
            offload=offload,
            sync_comm=sync_comm,
            dp_group=dp_group,
            device=device,
            exclude_layer=exclude_layer,
        )
    else:
        raise ValueError("Please enter the correct level.")

    # wrap the scaler so inf/nan checks stay consistent across the sharded ranks
    if isinstance(scaler, paddle.amp.GradScaler):
        scaler = GroupShardedScaler(scaler)

    logger_.info("*" * 30)
    logger_.info(
        "If there is a communication hang using group sharded, please check whether the communication operations of each process are unified."
    )
    logger_.info("*" * 30)

    return model, optimizer, scaler


def save_group_sharded_model(
    model: Layer, output: str, optimizer: Optimizer | None = None
) -> None:
    """
    Save the state of a model and optimizer that were wrapped with group_sharded_parallel.

    Note:
        If the model is saved with save_group_sharded_model, then when loading it again you need to set the model and optimizer state before wrapping them with group_sharded_parallel.

    Args:
        model (Layer): The model wrapped by group_sharded_parallel.
        output (str): The save directory.
        optimizer (Optimizer, optional): The optimizer wrapped by group_sharded_parallel. Defaults to None, indicating that the optimizer state is not saved.

    Examples:
        .. code-block:: python

            >>> # type: ignore
            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle
            >>> from paddle.nn import Linear
            >>> from paddle.distributed import fleet
            >>> from paddle.distributed.sharding import group_sharded_parallel, save_group_sharded_model

            >>> fleet.init(is_collective=True)
            >>> group = paddle.distributed.new_group([0, 1])
            >>> model = Linear(1000, 1000)

            >>> clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
            >>> optimizer = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters(), weight_decay=0.00001, grad_clip=clip)

            >>> # wrap sharding model, optimizer and scaler
            >>> scaler = paddle.amp.GradScaler(init_loss_scaling=1024)
            >>> model, optimizer, scaler = group_sharded_parallel(model, optimizer, "p_g_os", scaler=scaler)

            >>> img = paddle.rand([32, 1000])
            >>> label = paddle.randint(0, 1000, shape=[32, 1], dtype='int64')
            >>> label.stop_gradient = True
            >>> img.stop_gradient = True

            >>> out = model(img)
            >>> loss = paddle.nn.functional.cross_entropy(input=out, label=label)

            >>> loss.backward()
            >>> optimizer.step()
            >>> optimizer.clear_grad()

            >>> # save model and optimizer state_dict
            >>> output_dir = "./sharded_checkpoint"
            >>> save_group_sharded_model(model, output_dir, optimizer=optimizer)
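            >>> # Illustrative restore sketch (per the Note above): assumes the raw Layer and
            >>> # Optimizer are rebuilt first and their state is set before wrapping them
            >>> # again with group_sharded_parallel.
            >>> # model.set_state_dict(paddle.load(output_dir + "/model.pdmodel"))
            >>> # optimizer.set_state_dict(paddle.load(output_dir + "/model.pdopt"))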

    """
    logger_.info("==========Begin to save group sharded model and optimizer==========")
    assert not os.path.isfile(output), f"Saving directory ({output}) should be a directory, not a file"
    os.makedirs(output, exist_ok=True)
    output_model = os.path.join(output, "model.pdmodel")

    if isinstance(model, GroupShardedStage2):
        paddle.save(model._layer.state_dict(), output_model)
    elif isinstance(model, GroupShardedStage3):
        convert2cpu = True if model._offload else False
        model.get_all_parameters(convert2cpu=convert2cpu)
        paddle.save(model._layer.state_dict(), output_model)
    else:
        raise ValueError("Please use the layer which is wrapped with group_sharded_parallel.")

    if optimizer is not None:
        assert hasattr(optimizer, "_optim"), "Please use the optimizer which is wrapped with group_sharded_parallel."
        output_opt = os.path.join(output, "model.pdopt")
        paddle.save(optimizer._optim.state_dict(), output_opt)

    logger_.info("==========End to save group sharded model and optimizer==========")