"""
Steps to transpile trainer:
1. split variables into multiple blocks, aligned by product(dim[1:]) (width).
2. rename the split grad variables by adding the trainer_id suffix ".trainer_%d".
3. modify the trainer program: add a split_op for each grad variable.
4. append a send_op to send the split variables to the servers, and
5. add a recv_op to fetch params (split blocks or the origin param) from the servers.
6. append a concat_op to merge the split blocks into the updated local weights.

Steps to transpile pserver:
1. create a new program for the parameter server.
2. create params and grad variables that are assigned to the current server instance.
3. create a sub-block in the server side program.
4. append ops that should run on the current server instance.
5. add a listen_and_serv op.
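
For example (illustrative names, not from the original text): with two pservers,
a gradient "fc_0.w_0@GRAD" is split into "fc_0.w_0@GRAD.block0" and
"fc_0.w_0@GRAD.block1" (renamed with a ".trainer_%d" suffix in sync mode), each
block is sent to the pserver that owns it, and the fetched parameter blocks are
concatenated back into "fc_0.w_0" on the trainer.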
    N)reduce)	framework)grad_var_name)BlockProgramcore)PSDispatcher
RoundRobin)Constant)	Parameterdefault_main_programdefault_startup_program)unique_namelookup_tablelookup_table_v2Zlookup_table_gradZlookup_table_v2_gradZop_namescopez@CLIPFc                   @   s   e Zd ZdZdZdZdZdS )DistributedModer            N)__name__
__module____qualname__SYNCASYNC
HALF_ASYNCZGEO r   r   z/home/app/PaddleOCR-VL/.venv_paddleocr/lib/python3.10/site-packages/paddle/distributed/transpiler/distribute_transpiler.pyr   G   s
    r   c                  G   s   t rt|  d S d S N)	PRINT_LOGprint)argsr   r   r   logN   s   r!   c                   @   s   e Zd Zdd Zdd ZdS )VarBlockc                 C   s   || _ || _|| _d S r   varnameoffsetsize)selfr$   r%   r&   r   r   r   __init__T   s   
zVarBlock.__init__c                 C   s   | j  d| j d| j S )N:r#   r'   r   r   r   __str__Z   s   zVarBlock.__str__N)r   r   r   r(   r+   r   r   r   r   r"   S   s    r"   c                 C   s   | |kp
|  |d S )N.block)
startswith)p_namevar_namer   r   r   same_or_split_var^   s   r0   c                 C   s  g }| D ]z}|}t dd |jd}tt|t| }|dkr"d}||k r(|}tt|t| }t|jdkrUt dd |jdd d}	||	 }
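
# Example (illustrative): a split block keeps its origin name as a prefix, so
# same_or_split_var("fc_0.w_0.block_1", "fc_0.w_0") is True, while
# same_or_split_var("fc_1.w_0", "fc_0.w_0") is False.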


def slice_variable(var_list, slice_count, min_block_size):
    """
    We may need to split a dense tensor into one or more blocks and put
    them equally onto the parameter servers. One block is a sub-tensor
    aligned by dim[0] of the tensor.

    We need a minimal block size so that the computation on the
    parameter server side can gain better performance. By default the
    minimum block size is 8K elements (which may be 16-bit, 32-bit or 64-bit).

    Args:
        var_list (list): List of variables.
        slice_count (int): Number of slices that the variables will be split
            into, which is usually the pserver services' count.
        min_block_size (int): Minimum split block size.
    Returns:
        blocks (list[(varname, block_id, current_block_size)]): A list
            of VarBlocks. Each VarBlock specifies a shard of the var.
    """
    blocks = []
    for var in var_list:
        split_count = slice_count
        var_numel = reduce(lambda x, y: x * y, var.shape)
        max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
        if max_pserver_count == 0:
            max_pserver_count = 1
        if max_pserver_count < slice_count:
            split_count = max_pserver_count
        block_size = int(math.ceil(var_numel / float(split_count)))

        if len(var.shape) >= 2:
            # align by dim1 (width)
            dim1 = reduce(lambda x, y: x * y, var.shape[1:])
            remains = block_size % dim1
            if remains != 0:
                block_size += dim1 - remains

        # update split_count after aligning
        split_count = int(math.ceil(var_numel / float(block_size)))
        for block_id in range(split_count):
            curr_block_size = min(block_size, var_numel - block_id * block_size)
            block = VarBlock(var.name, block_id, curr_block_size)
            blocks.append(str(block))
    return blocks
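

# Worked example (illustrative, not from the original source): a var "w" of
# shape [10000, 3] (numel 30000) with slice_count=4 and min_block_size=8192
# allows at most floor(30000 / 8192) == 3 shards; the raw block size
# ceil(30000 / 3) == 10000 is aligned up to a multiple of dim1 == 3, giving
#   slice_variable([w], 4, 8192) -> ["w:0:10002", "w:1:10002", "w:2:9996"]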
dZdZdZdZdZdZdZdZd	ZdZd
d Zedd Zejdd Zedd Zejdd ZdS )DistributeTranspilerConfiga_  


class DistributeTranspilerConfig:
    """
        :api_attr: Static Graph

    A configuration class that provides support for transpiler distributed jobs.
    Some important parameters are explained as follows:


    .. py:attribute:: slice_var_up (bool)

          Whether to do Tensor slice for parameter servers, default is True.

    .. py:attribute:: split_method (PSDispatcher)

          Methods of dispatching parameters for server,
          `RoundRobin` or
          `HashName` (both from `paddle.incubate.distributed.fleet.parameter_server.ir.ps_dispatcher`) can be used and default is RoundRobin.
          Try to choose the best method to balance loads for parameter servers.

    .. py:attribute:: min_block_size (int)

          Minimum number of split elements in block, default is 8192.

          According to https://github.com/PaddlePaddle/Paddle/issues/8638#issuecomment-369912156,
          bandwidth is used efficiently when the data size is larger than 2 MB. If you
          want to change it, please make sure you have read the slice_variable function. You can find
          the definition of slice_variable in
          https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/transpiler/distribute_transpiler.py.

    Examples:
        .. code-block:: python

            >>> from paddle.distributed.transpiler.distribute_transpiler import RoundRobin
            >>> import paddle.distributed.transpiler as transpiler

            >>> config = transpiler.DistributeTranspilerConfig()
            >>> config.slice_var_up = True
            >>> config.split_method = RoundRobin
            >>> config.min_block_size = 81920
    """
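
    # The defaults below mirror the documented behaviour: variables are sliced
    # into blocks of at least min_block_size elements (slice_var_up) and
    # dispatched to pservers round-robin unless split_method is overridden.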

    slice_var_up = True
    split_method = None
    min_block_size = 8192
    enable_dc_asgd = False
    # supported modes: pserver, nccl2, collective
    mode = "pserver"
    print_log = False
    wait_port = True
    # split the send/recv vars at runtime
    __runtime_split_send_recv = False
    __sync_mode = True

    half_async = False
    completely_not_async = False

    # geo-sgd algorithm
    geo_sgd_mode = False
    geo_sgd_need_push_nums = 100

    nccl_comm_num = 1
    use_hierarchical_allreduce = False
    hierarchical_allreduce_inter_nranks = 0

    # only used when mode is collective: grad_allreduce or local_sgd
    collective_mode = None

    def __init__(self):
        pass

    @property
    def runtime_split_send_recv(self):
        return self.__runtime_split_send_recv

    @runtime_split_send_recv.setter
    def runtime_split_send_recv(self, value):
        if value is None:
            raise ValueError("runtime_split_send_recv can't be None")
        if value and self.__sync_mode:
            raise ValueError(
                "if you want to set runtime_split_send_recv to be true, "
                "make sure config.sync_mode is false first"
            )
        self.__runtime_split_send_recv = value

    @property
    def sync_mode(self):
        return self.__sync_mode

    @sync_mode.setter
    def sync_mode(self, value):
        if value is None:
            raise ValueError("sync_mode can't be None")
        if value and self.__runtime_split_send_recv:
            raise ValueError(
                "if you want to set sync_mode to be true, "
                "make sure config.runtime_split_send_recv is false first"
            )
        self.__sync_mode = value
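
# Illustrative note (not from the original source): sync_mode and
# runtime_split_send_recv are mutually exclusive, so an async setup disables
# sync mode before enabling runtime splitting:
#
#   config = DistributeTranspilerConfig()
#   config.sync_mode = False
#   config.runtime_split_send_recv = True  # valid only once sync_mode is False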


class ServerRuntimeConfig:
    def __init__(self):
        self._rpc_send_thread_num = int(
            os.getenv("FLAGS_rpc_send_thread_num", "12")
        )
        self._rpc_get_thread_num = int(
            os.getenv("FLAGS_rpc_get_thread_num", "12")
        )
        self._rpc_prefetch_thread_num = int(
            os.getenv("FLAGS_rpc_prefetch_thread_num", "12")
        )
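
# Example (hypothetical values): the pserver RPC thread pools are tuned via
# environment variables set before the process starts, e.g.
#   FLAGS_rpc_send_thread_num=24 FLAGS_rpc_get_thread_num=24 python train.py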
zServerRuntimeConfig.__init__N)r   r   r   r(   r   r   r   r   rg      s    rg   c                   @   s  e Zd ZdZdiddZdiddZ		djdd	Z			dkd
dZdd Zdd Z	dd Z
						dlddZdd Zdd Zdd ZdmddZdd Zd d! Zd"d# Z	dnd$d%Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ Zd0d1 Zd2d3 Zd4d5 Zd6d7 Z	8dod9d:Zdmd;d<Zed=d> Z d?d@ Z!dAdB Z"dCdD Z#dEdF Z$dGdH Z%dIdJ Z&dKdL Z'dMdN Z(dOdP Z)dQdR Z*dSdT Z+dUdV Z,dWdX Z-dYdZ Z.d[d\ Z/d]d^ Z0d_d` Z1dadb Z2dcdd Z3dedf Z4dgdh Z5dS )pDistributeTranspilera  
        :api_attr: Static Graph

    **DistributeTranspiler**

    Convert the base program to distributed data-parallelism programs.
    Supports two modes: parameter server (pserver) mode and nccl2 mode.

    In pserver mode, the main_program will be transformed to use a remote
    parameter server to do parameter optimization. And the optimization
    graph will be put into a parameter server program.

    In nccl2 mode, the transpiler will append a NCCL_ID broadcasting
    op in startup_program to share the NCCL_ID across the job nodes.
    After transpile_nccl2 is called, you ***must*** pass trainer_id and
    num_trainers arguments to ParallelExecutor to enable NCCL2 distributed
    mode.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle
            >>> import paddle.base as base
            >>> import paddle.distributed.transpiler as transpiler

            >>> paddle.enable_static()

            >>> x = paddle.static.data(name='x', shape=[1,13], dtype='float32')
            >>> y = paddle.static.data(name='y', shape=[1], dtype='float32')
            >>> y_predict = paddle.static.nn.fc(x, size=1, activation=None)

            >>> cost = paddle.nn.functional.square_error_cost(input=y_predict, label=y)
            >>> avg_loss = paddle.mean(cost)

            >>> sgd_optimizer = paddle.optimizer.SGD(learning_rate=0.001)
            >>> sgd_optimizer.minimize(avg_loss)

            >>> # for pserver mode
            >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
            >>> trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
            >>> current_endpoint = "192.168.0.1:6174"
            >>> trainer_id = 0
            >>> trainers = 4
            >>> role = "PSERVER"

            >>> t = transpiler.DistributeTranspiler()
            >>> t.transpile(
            ...         trainer_id, pservers=pserver_endpoints, trainers=trainers)

            >>> if role == "PSERVER":
            ...         pserver_program = t.get_pserver_program(current_endpoint)
            ...         pserver_startup_program = t.get_startup_program(current_endpoint,
            ...                                                     pserver_program)
            ... elif role == "TRAINER":
            ...         trainer_program = t.get_trainer_program()

            >>> # for nccl2 mode
            >>> trainer_num = 2
            >>> trainer_id = 0
            >>> config = transpiler.DistributeTranspilerConfig()
            >>> config.mode = "nccl2"
            >>> trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
            >>> t = transpiler.DistributeTranspiler(config=config)
            >>> t.transpile(trainer_id=trainer_id, trainers=trainer_endpoints, current_endpoint="192.168.0.1:6174")
            >>> exe = paddle.static.ParallelExecutor(
            ...     use_cuda=True,
            ...     loss_name=avg_loss.name,
            ...     num_trainers=trainer_num,
            ...     trainer_id=trainer_id
            ... )

    """

    def __init__(self, config=None):
        if config is not None:
            self.config = config
        else:
            self.config = DistributeTranspilerConfig()
        self._set_server_config()

        if self.config.split_method is None:
            self.config.split_method = RoundRobin

        if self.config.sync_mode or self.config.completely_not_async:
            self.distributed_mode = DistributedMode.SYNC
        elif self.config.runtime_split_send_recv:
            self.distributed_mode = DistributedMode.ASYNC
        else:
            self.distributed_mode = DistributedMode.HALF_ASYNC

        global PRINT_LOG
        if self.config.print_log:
            PRINT_LOG = True
        assert self.config.min_block_size >= 8192
        assert self.config.split_method.__bases__[0] == PSDispatcher
        self.counter_var = None

    def _set_server_config(self, server_config=None):
        if server_config is None:
            self.server_config = ServerRuntimeConfig()
        elif isinstance(server_config, ServerRuntimeConfig):
            self.server_config = server_config
        else:
            raise TypeError(
                "In DistributeTranspiler, server_config must be an instance"
                " of ServerRuntimeConfig"
            )

    def _transpile_nccl2(
        self,
        trainer_id,
        trainers,
        current_endpoint,
        startup_program=None,
        wait_port=True,
    ):
        from paddle.distributed.fleet.base.private_helper_function import (
            wait_server_ready,
        )

        if not startup_program:
            startup_program = default_startup_program()
        if trainer_id >= 0:
            worker_endpoints = trainers.split(",")
            # send NCCL_ID to others or recv from trainer 0
            worker_endpoints.remove(current_endpoint)
            if trainer_id == 0 and wait_port:
                wait_server_ready(worker_endpoints)

            nccl_id_var = startup_program.global_block().create_var(
                name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW
            )
            for i in range(1, self.config.nccl_comm_num):
                startup_program.global_block().create_var(
                    name=f"NCCLID_{i}",
                    persistable=True,
                    type=core.VarDesc.VarType.RAW,
                )
            if self.config.use_hierarchical_allreduce:
                for i in range(0, self.config.nccl_comm_num):
                    startup_program.global_block().create_var(
                        name=f"Hierarchical_inter_NCCLID_{i}",
                        persistable=True,
                        type=core.VarDesc.VarType.RAW,
                    )
                    startup_program.global_block().create_var(
                        name=f"Hierarchical_exter_NCCLID_{i}",
                        persistable=True,
                        type=core.VarDesc.VarType.RAW,
                    )

            startup_program.global_block().append_op(
                type="gen_nccl_id",
                inputs={},
                outputs={"NCCLID": nccl_id_var},
                attrs={
                    "trainers": trainers.split(","),
                    "trainer_id": trainer_id,
                    "nccl_comm_num": self.config.nccl_comm_num,
                    "use_hierarchical_allreduce": self.config.use_hierarchical_allreduce,
                    "hierarchical_allreduce_inter_nranks": self.config.hierarchical_allreduce_inter_nranks,
                },
            )
            return nccl_id_var
        else:
            raise ValueError("must set trainer_id > 0")

    def _transpile_collective(
        self,
        collective_mode,
        trainer_id,
        trainers,
        current_endpoint,
        startup_program=None,
        main_program=None,
        wait_port=True,
    ):
        from paddle.distributed.transpiler import collective

        if isinstance(trainers, str):
            endpoints = trainers.split(",")
        elif isinstance(trainers, list):
            endpoints = trainers
        elif collective_mode != "single_process_multi_thread":
            raise ValueError("invalid trainers config: " + str(trainers))

        if (
            len(endpoints) == 1
            and collective_mode != "single_process_multi_thread"
        ):
            raise ValueError("invalid trainer number in distributed: 1")

        if startup_program is None:
            startup_program = default_startup_program()
        if main_program is None:
            main_program = default_main_program()

        transpiler = None
        if collective_mode == "grad_allreduce":
            transpiler = collective.GradAllReduce(self.config.nccl_comm_num)
        elif collective_mode == "local_sgd":
            transpiler = collective.LocalSGD(self.config.nccl_comm_num)
        elif collective_mode == "single_process_multi_thread":
            transpiler = collective.SingleProcessMultiThread()
        else:
            raise ValueError("invalid collective_mode: " + collective_mode)

        transpiler.transpile(
            startup_program=startup_program,
            main_program=main_program,
            rank=trainer_id,
            endpoints=endpoints,
            current_endpoint=current_endpoint,
            wait_port=wait_port,
        )

    def _get_all_remote_sparse_update_op(self, main_program):
        sparse_update_ops = []
        sparse_update_op_types = ["lookup_table", "nce", "lookup_table_v2"]
        for op in main_program.global_block().ops:
            if (
                op.type in sparse_update_op_types
                and op.attr("remote_prefetch") is True
            ):
                sparse_update_ops.append(op)
        return sparse_update_ops

    def _update_remote_sparse_update_op(
        self, program, need_sparse_update_params
    ):
        # Rewrite the lookup_table/lookup_table_v2 ops that prefetch a remote
        # sparse parameter into a single distributed_lookup_table op per
        # parameter, carrying the table names, height sections, endpoints and
        # trainer_id of the owning pservers.
        ...

    def _is_input_of_remote_sparse_update_op(self, param_name):
        for op in self.sparse_update_ops:
            if param_name in op.input_arg_names:
                return True
        return False
    def transpile(
        self,
        trainer_id,
        program=None,
        pservers="127.0.0.1:6174",
        trainers=1,
        sync_mode=True,
        startup_program=None,
        current_endpoint="127.0.0.1:6174",
    ):
        """
        Transpile the input program to distributed programs with config and arguments.

        Args:
            trainer_id (int): id for current trainer worker, if you have
                n workers, the id may range from 0 ~ n-1
            program (Program|None): program to transpile,
                default is paddle.static.default_main_program().
            startup_program (Program|None): startup_program to transpile,
                default is paddle.static.default_startup_program().
            pservers (str): comma separated ip:port string for the pserver
                list.
            trainers (int|str): in pserver mode this is the number of
                trainers, in nccl2 mode this is a string of trainer
                endpoints.
            sync_mode (bool): Do sync training or not, default is True.
            current_endpoint (str): need pass current endpoint when
                transpile as nccl2 distributed mode. In pserver mode
                this argument is not used.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> t = paddle.distributed.transpiler.DistributeTranspiler()
                >>> t.transpile(
                ...     trainer_id=0,
                ...     pservers="127.0.0.1:7000,127.0.0.1:7001",
                ...     trainers=2,
                ...     sync_mode=False,
                ...     current_endpoint="127.0.0.1:7000")
        """
        err_msg = """
API is deprecated since 2.0.0 Please use FleetAPI instead.
WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
        """
        print(err_msg, file=sys.stderr)

        if program is None:
            program = default_main_program()
        if startup_program is None:
            startup_program = default_startup_program()
        self.origin_program = program
        self.startup_program = startup_program
        self.origin_startup_program = self.startup_program.clone()

        if self.config.mode == "nccl2":
            assert isinstance(trainers, str)
            self.origin_program._trainers_endpoints = trainers.split(",")
            self.origin_program._nccl_comm_num = self.config.nccl_comm_num
            self.origin_program._use_hierarchical_allreduce = (
                self.config.use_hierarchical_allreduce
            )
            if self.config.use_hierarchical_allreduce:
                trainers_num = len(self.origin_program._trainers_endpoints)
                if self.config.hierarchical_allreduce_inter_nranks <= 1:
                    self.config.hierarchical_allreduce_inter_nranks = (
                        core.get_cuda_device_count()
                    )
                assert (
                    trainers_num
                    > self.config.hierarchical_allreduce_inter_nranks
                ), (
                    f"trainers_num:{trainers_num} <"
                    " hierarchical_allreduce_inter_nranks:"
                    f"{self.config.hierarchical_allreduce_inter_nranks}"
                )
                assert (
                    trainers_num
                    % self.config.hierarchical_allreduce_inter_nranks
                    == 0
                ), (
                    f"trainers_num:{trainers_num} mod"
                    " hierarchical_allreduce_inter_nranks:"
                    f"{self.config.hierarchical_allreduce_inter_nranks} != 0"
                )
                self.origin_program._hierarchical_allreduce_inter_nranks = (
                    int(self.config.hierarchical_allreduce_inter_nranks)
                )
            self._transpile_nccl2(
                trainer_id,
                trainers,
                current_endpoint,
                startup_program=startup_program,
                wait_port=self.config.wait_port,
            )
            return

        if self.config.mode == "collective":
            self._transpile_collective(
                collective_mode=self.config.collective_mode,
                trainer_id=trainer_id,
                trainers=trainers,
                current_endpoint=current_endpoint,
                startup_program=startup_program,
                main_program=program,
                wait_port=self.config.wait_port,
            )
            return

        self.trainer_num = trainers
        self.sync_mode = sync_mode
        self.trainer_id = trainer_id
        pserver_endpoints = pservers.split(",")
        self.pserver_endpoints = pserver_endpoints
        self.optimize_ops, self.params_grads = self._get_optimize_pass()
        # The remainder of the pserver-mode pass (following the module-level
        # steps): dispatch gradient blocks to endpoints with
        # config.split_method, split/rename the param and grad vars, insert
        # split_op/send/recv/fetch_barrier/concat ops into the trainer
        # program, handle the distributed lookup table and remote sparse
        # updates, and record the param->pserver mappings consumed later by
        # get_pserver_program() and get_startup_program().
        ...

    def _get_sparse_table_names(self):
        sparse_update_op_types = ["lookup_table", "lookup_table_v2"]
        sparse_table_names = []
        for op in self.origin_program.global_block().ops:
            if (
                op.type in sparse_update_op_types
                and op.attr("is_sparse") is True
            ):
                sparse_table_names.append(op.input("W")[0])
            if op.type == "distributed_lookup_table":
                sparse_table_names.append(op.input("W")[0])
        if self.has_distributed_lookup_table:
            sparse_table_names.append(self.table_name)
        return list(set(sparse_table_names))

    def _fake_init_sparsetable(self, sparse_table_names):
        from paddle.distributed.transpiler.details import delete_ops

        # replace the init op of each sparse table with fake_init
        for table_name in sparse_table_names:
            table_var = self.startup_program.global_block().vars[table_name]
            table_param_init_op = []
            for op in self.startup_program.global_block().ops:
                if table_name in op.output_arg_names:
                    table_param_init_op.append(op)
            init_op_num = len(table_param_init_op)
            if init_op_num != 1:
                raise ValueError(
                    "table init op num should be 1, now is " + str(init_op_num)
                )
            table_init_op = table_param_init_op[0]
            self.startup_program.global_block().append_op(
                type="fake_init",
                inputs={},
                outputs={"Out": table_var},
                attrs={"shape": table_init_op.attr("shape")},
            )
            delete_ops(
                self.startup_program.global_block(), table_param_init_op
            )

    def _delete_trainer_optimizer(self, is_startup):
        from paddle.distributed.transpiler.details import delete_ops

        # Drop optimizer ops (and, for the startup program, the init ops of
        # optimizer-owned vars) from the trainer side; the parameters are
        # updated on the pservers instead.
        ...
    def get_trainer_program(self, wait_port=True):
        """
        Get the transpiled trainer side program. Compared with the origin
        program, the trainer side program has the following differences:

            - Optimizer related ops are deleted, because parameters are updated on the pserver side
            - After the op which computes the gradient of each parameter, ``send_op`` and ``recv_op`` are added

        Args:
            wait_port (bool): Whether to wait for the parameter servers to be ready before returning the program,
                default is True

        Returns:
            Program: trainer side program.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> import paddle.distributed.transpiler as transpiler
                >>> # this is an example, find available endpoints in your case
                >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
                >>> trainer_id = 0
                >>> trainers = 4

                >>> t = transpiler.DistributeTranspiler()
                >>> t.transpile(trainer_id, trainers=trainers, pservers=pserver_endpoints)
                >>> trainer_program = t.get_trainer_program()
        """
        from paddle.distributed.fleet.base.private_helper_function import (
            wait_server_ready,
        )
        from paddle.distributed.transpiler.details import delete_ops

        self._delete_trainer_optimizer(is_startup=True)
        sparse_table_names = self._get_sparse_table_names()
        self._fake_init_sparsetable(sparse_table_names)

        lr_ops = self._get_lr_ops()
        delete_ops(self.origin_program.global_block(), lr_ops)
        self._delete_trainer_optimizer(is_startup=False)

        self.origin_program.__str__()
        self.startup_program.__str__()

        if wait_port:
            wait_server_ready(self.pserver_endpoints)

        return self.origin_program
    def _get_trainer_startup_program(self, recv_vars, eplist):
        """
        Get the transpiled trainer side startup program.

        Args:
            recv_vars (list): Variable list to recv for current trainer_id
            eplist (list): A list of strings indicating the pserver endpoint
                that each recv variable is fetched from

        Returns:
            Program: trainer side startup program.
        """
        startup_program = self.startup_program

        for varname, splited_var in self.param_var_mapping.items():
            # collect the endpoints that serve each split of this param
            eps = []
            for var in splited_var:
                index = [v.name for v in recv_vars].index(var.name)
                eps.append(eplist[index])

            for var in splited_var:
                if startup_program.global_block().has_var(var.name):
                    continue
                startup_program.global_block().create_var(
                    name=var.name,
                    persistable=False,
                    type=var.type,
                    dtype=var.dtype,
                    shape=var.shape,
                    lod_level=var.lod_level,
                )

            startup_program.global_block().append_op(
                type="recv",
                inputs={"X": []},
                outputs={"Out": splited_var},
                attrs={
                    "epmap": eps,
                    "trainer_id": self.trainer_id,
                    RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
                },
            )

        fetch_barrier_out = startup_program.global_block().create_var(
            name=framework.generate_control_dev_var_name()
        )
        startup_program.global_block().append_op(
            type="fetch_barrier",
            inputs={},
            outputs={"Out": fetch_barrier_out},
            attrs={
                "endpoints": self.pserver_endpoints,
                "trainer_id": self.trainer_id,
                RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
            },
        )

        for varname, splited_var in self.param_var_mapping.items():
            # add concat ops to merge split parameters received from pservers
            if len(splited_var) <= 1:
                continue
            if varname in startup_program.global_block().vars:
                orig_param = startup_program.global_block().vars[varname]
            else:
                origin_param_var = self.origin_program.global_block().vars[
                    varname
                ]
                orig_param = startup_program.global_block().create_var(
                    name=varname,
                    persistable=origin_param_var.persistable,
                    type=origin_param_var.type,
                    dtype=origin_param_var.dtype,
                    shape=origin_param_var.shape,
                )
            startup_program.global_block().append_op(
                type="concat",
                inputs={"X": splited_var},
                outputs={"Out": [orig_param]},
                attrs={"axis": 0},
            )

        return startup_program
    def get_pserver_program(self, endpoint):
        """
        Get the parameter server side program. Compared with the origin
        program, the pserver side program has the following differences:

            - Only the following ops are included: optimize-related ops and communication-related ops
            - Block 0 only has variable definitions and the ``listen_and_serv_op``
            - Every variable which needs to be updated has a unique block

        Args:
            endpoint (str): current parameter server endpoint.

        Returns:
            Program: the program for the current parameter server to run.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> import paddle.distributed.transpiler as transpiler
                >>> # this is an example, find available endpoints in your case
                >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
                >>> current_endpoint = "192.168.0.1:6174"
                >>> trainer_id = 0
                >>> trainers = 4

                >>> t = transpiler.DistributeTranspiler()
                >>> t.transpile(
                ...     trainer_id, pservers=pserver_endpoints, trainers=trainers)

                >>> pserver_program = t.get_pserver_program(current_endpoint)
        """
        sys.stderr.write(
            "get_pserver_program() is deprecated, call get_pserver_programs()"
            " to get pserver main and startup in a single call.\n"
        )
        pserver_program = Program()
        pserver_program.random_seed = self.origin_program.random_seed
        pserver_program._copy_dist_param_info_from(self.origin_program)

        # Collect the recv inputs (the split grad blocks assigned to this
        # endpoint), clone the matching param/grad vars into the new program,
        # build one optimize sub-block per updated variable (plus optional
        # blocks for the distributed lookup table, prefetch and checkpoint
        # save), then start the service:
        ...

        pserver_program.global_block().append_op(
            type="listen_and_serv",
            inputs={"X": recv_inputs},
            outputs={},
            attrs=attrs,  # assembled above: optimize_blocks, endpoint,
            # pserver_id, Fanin, distributed_mode, grad_to_block_id,
            # sparse_grad_to_param, lr_decay_block_id, rpc thread nums,
            # dc_asgd, prefetch_var_name_to_block_id, ...
        )

        pserver_program._sync_with_cpp()
        # save pserver program to generate the pserver side startup later
        self.pserver_program = pserver_program
        return pserver_program
    def get_pserver_programs(self, endpoint):
        """
        Get pserver side main program and startup program for distributed training.
        The ``main_program`` returned by this function is consistent with the
        return value of the function ``get_pserver_program`` .

        Args:
            endpoint (str): current pserver endpoint.

        Returns:
            tuple: (main_program, startup_program), of type "Program"

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> import paddle.distributed.transpiler as transpiler
                >>> # this is an example, find available endpoints in your case
                >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
                >>> current_endpoint = "192.168.0.1:6174"
                >>> trainer_id = 0
                >>> trainers = 4

                >>> t = transpiler.DistributeTranspiler()
                >>> t.transpile(
                ...     trainer_id, pservers=pserver_endpoints, trainers=trainers)
                >>> pserver_program, pserver_startup_program = t.get_pserver_programs(current_endpoint)
        """
        pserver_prog = self.get_pserver_program(endpoint)
        pserver_startup = self.get_startup_program(
            endpoint, pserver_program=pserver_prog
        )
        return pserver_prog, pserver_startup
    def get_startup_program(
        self, endpoint, pserver_program=None, startup_program=None
    ):
        """
        **Deprecated**

        Get startup program for current parameter server.
        Modify operator input variables if there are variables that
        were split into several blocks.

        Args:
            endpoint (str): current pserver endpoint.
            pserver_program (Program): deprecated, call get_pserver_program first.
            startup_program (Program): deprecated, should pass startup_program
                when initializing

        Returns:
            Program: parameter server side startup program.

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env:DISTRIBUTED)
                >>> pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
                >>> trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
                >>> current_endpoint = "192.168.0.1:6174"
                >>> trainer_id = 0
                >>> trainers = 4

                >>> t = paddle.distributed.transpiler.DistributeTranspiler()
                >>> t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
                >>> pserver_program = t.get_pserver_program(current_endpoint)
                >>> pserver_startup_program = t.get_startup_program(current_endpoint,
                ...                                                 pserver_program)
        """
        s_prog = Program()
        orig_s_prog = self.startup_program
        s_prog.random_seed = orig_s_prog.random_seed
        params = self.param_grad_ep_mapping[endpoint]["params"]

        # Clone the startup vars/ops that belong to this endpoint, renaming
        # the outputs of init ops whose parameter was split into blocks and
        # fixing the "shape" attribute of gaussian_random/fill_constant/
        # uniform_random/truncated_gaussian_random init ops accordingly.
        ...

    def _get_slice_var_info(self, slice_var):
        block_suffix = "block"
        block_idx = 0
        offset = 0
        is_slice = False

        orig_var_name, block_name, _ = self._get_varname_parts(slice_var.name)
        if not block_name:
            return is_slice, block_idx, offset

        block_idx = int(block_name.split(block_suffix)[1])
        skip_dim0 = 0
        slice_vars = self.param_var_mapping[orig_var_name]

        orig_dim1_flatten = 1
        if len(slice_vars[0].shape) >= 2:
            orig_dim1_flatten = reduce(
                lambda x, y: x * y, slice_vars[0].shape[1:]
            )

        for var in slice_vars[:block_idx]:
            skip_dim0 += var.shape[0]

        offset = skip_dim0 * orig_dim1_flatten
        is_slice = True
        return is_slice, block_idx, offset

    def _get_distributed_optimizer_vars(self):
        # Register every optimizer state var (Moment, Velocity, ...) that
        # belongs to a sliced parameter with self.vars_overview, reshaping it
        # with _get_optimizer_input_shape() when needed.
        ...

    def _update_dist_lookup_table_vars(
        self, param_list, grad_list, params_grads
    ):
        # Separate the distributed lookup table param/grad from the dense
        # lists and create per-trainer (sync) or single (async) table grad
        # vars on the pserver side.
        ...

    def _init_splited_vars(self):
        # Split params and grads with slice_variable(), create the split vars
        # (grads get the ".trainer_%d" suffix in sync mode) and build
        # param_var_mapping / grad_var_mapping / grad_param_mapping plus the
        # param_grad_ep_mapping skeleton for every pserver endpoint.
        ...

    def _replace_lookup_table_op_with_prefetch(
        self, program, pserver_endpoints
    ):
        # Replace distributed lookup_table ops with split_ids -> prefetch ->
        # merge_ids ops so that embedding rows are fetched remotely.
        ...

    def _split_table_grad_and_add_send_vars(self, program, pserver_endpoints):
        # Insert split_ids and send ops for the distributed lookup table
        # gradient right after the op that generates it.
        ...

    def _create_prefetch_block(
        self, pserver_index, pserver_program, optimize_block
    ):
        # Create the pserver-side block that serves prefetch requests with a
        # lookup_sparse_table op.
        ...

    def _create_table_optimize_block(
        self, pserver_index, pserver_program, pre_block_idx, grad_to_block_id
    ):
        # Create the table variable and its optimize block on the pserver;
        # only the sgd optimizer is supported for the distributed lookup
        # table, so other optimizers are changed to sgd with a warning.
        ...

    def _create_checkpoint_save_block(self, pserver_program, pre_block_idx):
        """
        create a new block to handle save checkpoint.
        """
        pserver_program.global_block().create_var(
            name="kLookupTablePath",
            persistable=True,
            type=core.VarDesc.VarType.RAW,
        )
        checkpoint_save_block = pserver_program._create_block(pre_block_idx)
        # this 'file_path' is not used when saving the lookup table variable
        checkpoint_save_block.append_op(
            type="save",
            inputs={"X": [self.table_name]},
            outputs={},
            attrs={"file_path": "none"},
        )
        return checkpoint_save_block.idx
    def _create_vars_from_blocklist(
        self, program, block_list, add_trainer_suffix=False
    ):
        """
        Create vars for each split.
        NOTE: only grads need to be named for different trainers, use
              add_trainer_suffix to rename the grad vars.
        Args:
            program (ProgramDesc): ProgramDesc which gradients belong.
            block_list (list[(varname, block_id, block_size)]): List of gradient blocks.
            add_trainer_suffix (Bool): Add trainer suffix to new variable's name if set True.
        Returns:
            var_mapping (collections.OrderedDict(varname->[new_varname_variable])): A dict mapping
                from original var name to each var split.
        """
        # varname -> [(block_id, current_block_size)]
        block_map = collections.OrderedDict()
        var_mapping = collections.OrderedDict()
        for block_str in block_list:
            varname, offset, size = block_str.split(":")
            if varname not in block_map:
                block_map[varname] = []
            block_map[varname].append((int(offset), int(size)))

        for varname, split in block_map.items():
            orig_var = program.global_block().var(varname)
            if len(split) == 1:
                if self.sync_mode and add_trainer_suffix:
                    new_var_name = f"{orig_var.name}.trainer_{self.trainer_id}"
                    program.global_block()._rename_var(varname, new_var_name)
                    var_mapping[varname] = [
                        program.global_block().var(new_var_name)
                    ]
                else:
                    var_mapping[varname] = [
                        program.global_block().var(orig_var.name)
                    ]
                continue
            var_mapping[varname] = []
            orig_shape = orig_var.shape
            orig_dim1_flatten = 1
            if len(orig_shape) >= 2:
                orig_dim1_flatten = reduce(lambda x, y: x * y, orig_shape[1:])

            for i, block in enumerate(split):
                size = block[1]
                rows = size // orig_dim1_flatten
                splited_shape = [rows]
                if len(orig_shape) >= 2:
                    splited_shape.extend(orig_shape[1:])
                if self.sync_mode and add_trainer_suffix:
                    new_var_name = (
                        f"{varname}.block{i}.trainer_{self.trainer_id}"
                    )
                else:
                    new_var_name = f"{varname}.block{i}"
                var = program.global_block().create_var(
                    name=new_var_name,
                    persistable=False,
                    dtype=orig_var.dtype,
                    type=orig_var.type,
                    shape=splited_shape,
                )  # flattened split var
                var_mapping[varname].append(var)
            program.global_block()._sync_with_cpp()
        return var_mapping

    def _clone_var(self, block, var, persistable=True):
        return block.create_var(
            name=var.name,
            shape=var.shape,
            dtype=var.dtype,
            type=var.type,
            lod_level=var.lod_level,
            persistable=persistable,
        )

    @staticmethod
    def _get_splited_var_sections(splited_vars):
        height_sections = []
        for v in splited_vars:
            height_sections.append(v.shape[0])
        return height_sections

    def _insert_split_op(self, program, orig_var, index, splited_vars):
        height_sections = self._get_splited_var_sections(splited_vars)

        if orig_var.type == core.VarDesc.VarType.SELECTED_ROWS:
            sparse_param_name = self.grad_name_to_param_name[orig_var.name]
            if self._is_input_of_remote_sparse_update_op(sparse_param_name):
                self.sparse_param_to_height_sections[sparse_param_name] = (
                    height_sections
                )
            program.global_block()._insert_op(
                index=index + 1,
                type="split_selected_rows",
                inputs={"X": orig_var},
                outputs={"Out": splited_vars},
                attrs={
                    "height_sections": height_sections,
                    RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE,
                },
            )
        elif orig_var.type == core.VarDesc.VarType.DENSE_TENSOR:
            program.global_block()._insert_op(
                index=index + 1,
                type="split_byref",
                inputs={"X": orig_var},
                outputs={"Out": splited_vars},
                attrs={
                    "sections": height_sections,
                    RPC_OP_ROLE_ATTR_NAME: DIST_OP_ROLE_ATTR_VALUE,
                },
            )
        else:
            raise AssertionError(
                "Variable type should be in set [DENSE_TENSOR, SELECTED_ROWS]"
            )
    def _get_optimizer_input_shape(
        self, op_type, varkey, orig_shape, param_shape
    ):
        """
        Returns the shape for optimizer inputs that need to be reshaped when
        Param and Grad is split to multiple servers.
        """
        if op_type == "adam":
            if varkey in ["Moment1", "Moment2"]:
                return param_shape
        elif op_type == "adagrad":
            if varkey == "Moment":
                return param_shape
        elif op_type == "adamax":
            if varkey in ["Moment", "InfNorm"]:
                return param_shape
        elif op_type in ["momentum", "lars_momentum"]:
            if varkey == "Velocity":
                return param_shape
        elif op_type == "rmsprop":
            if varkey in ["Moment", "MeanSquare"]:
                return param_shape
        elif op_type == "decayed_adagrad":
            if varkey == "Moment":
                return param_shape
        elif op_type == "ftrl":
            if varkey in ["SquaredAccumulator", "LinearAccumulator"]:
                return param_shape
        elif op_type == "sgd":
            pass
        else:
            raise ValueError(
                "Not supported optimizer for distributed training: " + op_type
            )
        return orig_shape

    def _get_varname_parts(self, varname):
        # returns origin, blockid, trainerid
        orig_var_name = ""
        trainer_part = ""
        block_part = ""
        trainer_idx = varname.find(".trainer_")
        if trainer_idx >= 0:
            trainer_part = varname[trainer_idx + 1 :]
        else:
            trainer_idx = len(varname)
        block_index = varname.find(".block")
        if block_index >= 0:
            block_part = varname[block_index + 1 : trainer_idx]
        else:
            block_index = trainer_idx
        orig_var_name = varname[0 : min(block_index, trainer_idx)]
        return orig_var_name, block_part, trainer_part

    def _orig_varname(self, varname):
        orig, _, _ = self._get_varname_parts(varname)
        return orig

    def _append_pserver_grad_merge_ops(
        self,
        optimize_block,
        grad_varname_for_block,
        endpoint,
        grad_to_block_id,
    ):
        # Merge the per-trainer gradient blocks received on this pserver: in
        # sync mode "sum" the ".trainer_%d" copies and "scale" the result by
        # 1/trainer_num; record "<grad>:<block_id>" in grad_to_block_id.
        ...

    def _append_dc_asgd_ops(self, block, param_var, grad_var):
        # Append delay-compensated ASGD update ops (elementwise sub/mul/add
        # against a per-trainer parameter backup) when enable_dc_asgd is set.
        ...

    def _append_pserver_ops(
        self,
        optimize_block,
        opt_op,
        endpoint,
        grad_to_block_id,
        origin_program,
        merged_var,
        sparse_grad_to_param,
    ):
        # Clone one optimizer op onto the pserver optimize block, remapping
        # the Param/Grad/LearningRate inputs to this endpoint's split vars
        # and reshaping the optimizer state vars with
        # _get_optimizer_input_shape(); record sparse grad->param pairs for
        # SELECTED_ROWS grads.
        ...
    def _get_pserver_grad_param_var(self, var, var_dict):
        """
        Return pserver side grad/param variable, return None
        if the variable is not grad/param, e.g.

            a@GRAD -> a@GRAD.block0
            a@GRAD -> a@GRAD (a is not split)
            fc_0.w_0 -> fc_0.w_0.block_0
            fc_0.w_0 -> fc_0.w_0 (weight is not split)
            _generated_var_123 -> None
        """
        grad_block = None
        for _, g in var_dict.items():
            if self._orig_varname(g.name) == self._orig_varname(var.name):
                # skip per trainer vars
                if g.name.find(".trainer_") == -1:
                    # only param or grads have split blocks
                    if (
                        self._orig_varname(g.name)
                        in self.grad_name_to_param_name
                        or self._orig_varname(g.name)
                        in self.param_name_to_grad_name
                    ):
                        grad_block = g
                        break
        return grad_block

    def _clone_lr_op(self, program, block, op):
        inputs = self._get_input_map_from_op(
            self.origin_program.global_block().vars, op
        )
        for key, varlist in inputs.items():
            if not isinstance(varlist, list):
                varlist = [varlist]
            for var in varlist:
                if var not in program.global_block().vars:
                    block._clone_variable(var)

        outputs = self._get_output_map_from_op(
            self.origin_program.global_block().vars, op
        )
        for key, varlist in outputs.items():
            if not isinstance(varlist, list):
                varlist = [varlist]
            for var in varlist:
                if var not in program.global_block().vars:
                    block._clone_variable(var)

        return block.append_op(
            type=op.type, inputs=inputs, outputs=outputs, attrs=op.all_attrs()
        )

    def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
        # Clone a non-optimize op (e.g. regularization or clip) onto the
        # pserver block, remapping its inputs/outputs to the pserver side
        # grad/param variables returned by _get_pserver_grad_param_var().
        ...

    def _is_op_connected(self, op1, op2):
        # If one op's input is another op's output or two ops' outputs share
        # the same variable, we say that the two ops are connected.
        if set(op1.desc.output_arg_names()) & set(
            op2.desc.input_arg_names()
        ) or set(op1.desc.input_arg_names()) & set(
            op2.desc.output_arg_names()
        ):
            return True
        return False

    def _create_ufind(self, optimize_ops):
        from paddle.distributed.transpiler.details import UnionFind

        ufind = UnionFind(optimize_ops)
        for i in range(len(optimize_ops)):
            for j in range(i, len(optimize_ops)):
                op1 = optimize_ops[i]
                op2 = optimize_ops[j]
                if self._is_op_connected(op1, op2):
                    ufind.union(op1, op2)
        return ufind

    def _is_optimizer_op(self, op):
        if "Param" in op.input_names and "LearningRate" in op.input_names:
            return True
        return False

    def _is_opt_op_on_pserver(self, endpoint, op):
        param_names = [
            p.name for p in self.param_grad_ep_mapping[endpoint]["params"]
        ]
        if op.input("Param")[0] in param_names:
            return True
        else:
            for n in param_names:
                param = op.input("Param")[0]
                if same_or_split_var(n, param) and n != param:
                    return True
            return False

    def _get_input_map_from_op(self, varmap, op):
        """Returns a dict from op input name to the vars in varmap."""
        iomap = collections.OrderedDict()
        for key in op.input_names:
            vars = []
            for varname in op.input(key):
                vars.append(varmap[varname])
            if len(vars) == 1:
                iomap[key] = vars[0]
            else:
                iomap[key] = vars
        return iomap

    def _get_output_map_from_op(self, varmap, op):
        """Returns a dict from op output name to the vars in varmap."""
        iomap = collections.OrderedDict()
        for key in op.output_names:
            vars = []
            for varname in op.output(key):
                vars.append(varmap[varname])
            if len(vars) == 1:
                iomap[key] = vars[0]
            else:
                iomap[key] = vars
        return iomap

    def _get_lr_ops(self):
        # Collect learning-rate-schedule ops (OpRole.LRSched); in async mode
        # the global step "increment" op is rewritten to be driven by
        # per-trainer counter variables.
        ...

    def _get_lr_ops_deprecated(self):
        # Older implementation: find lr ops as the ops connected (via
        # union-find) to the "LearningRate" inputs of the optimize ops.
        ...

    def _is_opt_role_op(self, op):
        # NOTE: depend on the op role to find out whether this op is for
        # optimize
        op_maker = core.op_proto_and_checker_maker
        optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize
        if op_maker.kOpRoleAttrName() in op.attr_names and int(
            op.all_attrs()[op_maker.kOpRoleAttrName()]
        ) == int(optimize_role):
            return True
        return False
    def _get_optimize_pass(self):
        """
        Get optimizer operators, parameters and gradients from origin_program.
        Returns:
            opt_ops (list): optimize operators.
            params_grads (dict): parameter->gradient.
        """
        block = self.origin_program.global_block()
        opt_ops = []
        params_grads = []
        # tmp set to dedup
        optimize_params = set()
        origin_var_dict = self.origin_program.global_block().vars
        for op in block.ops:
            if self._is_opt_role_op(op):
                # delete clip ops from opt_ops when running in Parameter
                # Server mode
                if (
                    OP_NAME_SCOPE in op.all_attrs()
                    and CLIP_OP_NAME_SCOPE in op.attr(OP_NAME_SCOPE)
                    and self.config.mode != "nccl2"
                    and self.config.mode != "collective"
                ):
                    op._set_attr(
                        "op_role",
                        int(core.op_proto_and_checker_maker.OpRole.Backward),
                    )
                    continue
                opt_ops.append(op)
                if op.attr(OP_ROLE_VAR_ATTR_NAME):
                    param_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[0]
                    grad_name = op.attr(OP_ROLE_VAR_ATTR_NAME)[1]
                    if param_name not in optimize_params:
                        optimize_params.add(param_name)
                        log("adding param_grad pair: ", param_name, grad_name)
                        params_grads.append(
                            [
                                origin_var_dict[param_name],
                                origin_var_dict[grad_name],
                            ]
                        )
        # designed for special situations where some parameters are updated
        # inside their own ops rather than by an optimizer op
        special_distribute_update_vars = self._get_distribute_update_vars()
        if special_distribute_update_vars:
            params_grads = params_grads + special_distribute_update_vars
        return opt_ops, params_grads
    def _get_distribute_update_vars(self):
        """
        This function is used for special models, like PyramidDnn, which have a pyramid hash op.
        Some parameters don't use an optimize op to update their values, but are updated in their own BP process.
        In these cases, the transpiler can't find these special vars from the optimize-op information.
        So we add this function, and the attr "distribute_update_vars" tells the transpiler that these parameters
        need to be updated in distributed training.
        We assume these special vars send and receive the same var_name.
        """
        block = self.origin_program.global_block()
        origin_var_dict = self.origin_program.global_block().vars
        params = []
        for op in block.ops:
            special_attr = "distribute_update_vars"
            if special_attr in op.all_attrs():
                if op.attr(special_attr):
                    for param_name in op.attr(special_attr).split(","):
                        params.append(origin_var_dict[param_name])
        unique_params = list(set(params))
        params_grads = []
        for var in unique_params:
            params_grads.append([var, var])
        return params_grads
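
# Usage sketch (hypothetical attr value, not from this file): an op that
# updates a parameter inside its own backward pass can declare it, so the
# transpiler still sends/receives that parameter by name:
#
#   op._set_attr("distribute_update_vars", "pyramid_hash_emb_0")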