o
    rqi`                     @   s  d dl Z d dlmZ d dlZd dlmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d d	lmZ d d
lmZm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z)m*Z*m+Z+ d dl,m-Z-m.Z.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9 ddl:m;Z; ddl<m=Z=m>Z> e5 rd dl?Z?e4 r	 ed Z@eeAeBeddf ZCeeAedf ZDe7 ZEG dd deZFG dd deFZGd d! ZHdS )"    N)ABCabstractmethod)partial)Pool)Lock)AnyDict	GeneratorListMappingOptionalUnion)version)Model)	MsDataset)TASK_OUTPUTSModelOutputBase)TASK_INPUTScheck_input_type)Preprocessor)Config)
FrameworksInvoke	ModelFile)create_devicedevice_placementverify_device)read_configsnapshot_download)is_tf_availableis_torch_available)
get_logger)compile_model   )check_model_from_owner_group   )is_modelis_official_hub_path)ztorch.Tensorz	tf.TensorzImage.Imageznumpy.ndarrayztorch.nn.Modulec                
   @   s  e Zd ZdZdd Zdee fddZ							d3d
ede	eee f de	e
ee
 f defddZ		d4dee dee fddZdd ZdefddZde	eee f de	eeef ef fddZdd ZdefddZd d! Zdedeeef fd"d#Zd$d% Zdee deeef fd&d'Zd(d) Zd*d+ Zd,edeeef fd-d.Zd,eeef deeef fd/d0Zd,eeef deeef fd1d2ZdS )5PipelinezPipeline base.
    c                 K   s~   | j rd|d< t|trtd|  t|tr=t|r=td| d t|r;tj|f| j	dt
j| jd|S |S |S )NTtrust_remote_codezinitiate model from zinitiate model from location .)deviceZmodel_prefetchedZ
invoked_by
device_map)r)   
isinstancestrloggerinfor'   r&   r   from_pretraineddevice_namer   ZPIPELINEr,   )selfmodelkwargs r6   `/home/app/PaddleOCR-VL/.venv_paddleocr/lib/python3.10/site-packages/modelscope/pipelines/base.pyinitiate_single_model0   s(   
zPipeline.initiate_single_modelinput_modelsc                 C   s"   g }|D ]
}| | | q|S N)appendr8   )r3   r9   modelsr4   r6   r6   r7   initiate_multiple_modelsB   s   z!Pipeline.initiate_multiple_modelsNgpuTconfig_filer4   preprocessorr+   c           	      K   sj  |dur|dksJ d|| _ t| || _|dd| _t|ts2| j|fi || _| jg| _	n	d| _| 
|| _	t| j	dk| _|durTt|| _tj|}n| jsjt| jtra| j}n| jj}t|| _|du rx| jsxt|| _n|| _| js| jr| j	d r|  | _nd| _| jtjkrt| j| _d| _t  | _!|| _"|dd| _#|d	i | _$dS )
aH   Base class for pipeline.

        If config_file is provided, model and preprocessor will be
        instantiated from corresponding config. Otherwise, model
        and preprocessor will be constructed separately.

        Args:
            config_file(str, optional): Filepath to configuration file.
            model: (list of) Model name or model object
            preprocessor: (list of) Preprocessor object
            device (str): device str, should be either cpu, cuda, gpu, gpu:X or cuda:X
            auto_collate (bool): automatically to convert data to tensor or not.
            compile (bool, optional): Compile the model with torch 2.0, default False
            compile_options (dict, optional): The compile options if compile=True,
                default None to use the default params of 'TorchModel.compile'.
        Nr>   z;`device` and `device_map` cannot be input at the same time!r)   Fr%   r   compileZcompile_options)%r,   r   r2   getr)   r-   r
   r8   r4   r<   r=   lenhas_multiple_modelsr   	from_filecfgospathdirnamer.   	model_dirr   r   r1   r@   _get_framework	frameworkr   torchr   r+   _model_preparer   _model_prepare_lock_auto_collate_compile_compile_options)	r3   r?   r4   r@   r+   auto_collater,   r5   rJ   r6   r6   r7   __init__H   sB   

zPipeline.__init__info_strrJ   c                 C   s(   |pd}t |ds| jsJ |dS dS )a%  Check trust_remote_code if the pipeline needs to import extra libs

        Args:
            info_str(str): The info showed to user if trust_remote_code is `False`.
            model_dir(`Optional[str]`): The local model directory. If is a trusted model, check remote code will pass.
        zThis pipeline requires `trust_remote_code` to be `True` because it needs to import extra libs or execute the code in the model repo, setting this to true means you trust the files in it.)rJ   N)r$   r)   )r3   rU   rJ   r6   r6   r7   check_trust_remote_code   s   	
z Pipeline.check_trust_remote_codec                    s    j jdd  fdd} jsH jtjkrE jr2 jD ]}|| q jr1 fdd jD  _n| j	  jrEt
 j	fi  j _	d _ j   dS )	zQ Place model on certain device for pytorch models before first inference
        iX  )timeoutc                    s`   t | tjjst| dr| j} t | tjjsd S |   ddlm} || r.| 	 j
 d S d S )Nr4   r   )is_on_same_device)r-   rM   nnModulehasattrr4   evalmodelscope.utils.torch_utilsrX   tor+   )r4   rX   r3   r6   r7   _prepare_single   s   z/Pipeline.prepare_model.<locals>._prepare_singlec                    s   g | ]}t |fi  jqS r6   )r"   rR   ).0mr_   r6   r7   
<listcomp>   s    z*Pipeline.prepare_model.<locals>.<listcomp>TN)rO   acquirerN   rL   r   rM   rD   r<   rQ   r4   r"   rR   release)r3   r`   rb   r6   r_   r7   prepare_model   s&   




zPipeline.prepare_modelreturnc                    s|   g  | j D ]}t|tr|}n|j}t|tj}t	|} 
|j qt fdd D s:td   d S  d S )Nc                 3   s    | ]	}| d  kV  qdS )r   Nr6   )ra   xZ
frameworksr6   r7   	<genexpr>   s    z*Pipeline._get_framework.<locals>.<genexpr>z:got multiple models, but they are in different frameworks r   )r<   r-   r.   rJ   ospjoinr   ZCONFIGURATIONr   rE   r;   rL   allr/   warning)r3   rb   rJ   Zcfg_filerF   r6   ri   r7   rK      s   


zPipeline._get_frameworkinputc           
      O   s  | j s| jr| jd r| js|   |dd }| jd
i |\}}}||d< ||d< ||d< dt| jv rCt	|t
rCd|i}d|d	< t	|t
rp|d u rdg }|D ]}	|| j|	g|R i | qP|S | j||fi |}|S t	|tr| j|g|R i |S | j|g|R i |}|S )Nr   
batch_sizepreprocess_paramsforward_paramspostprocess_paramsZLLMPipelinemessagesTZ
is_messager6   )r4   rD   r<   rN   rf   pop_sanitize_parameterstype__name__r-   listr;   _process_single_process_batchr   _process_iterator)
r3   ro   argsr5   rp   rq   rr   rs   outputeler6   r6   r7   __call__   s2   
 	
zPipeline.__call__c                 K   s
   i i |fS )a  
        this method should sanitize the keyword args to preprocessor params,
        forward params and postprocess params on '__call__' or '_process_single' method
        considered to be a normal classmethod with default implementation / output

        Default Returns:
            Dict[str, str]:  preprocess_params = {}
            Dict[str, str]:  forward_params = {}
            Dict[str, str]:  postprocess_params = pipeline_parameters
        r6   )r3   Zpipeline_parametersr6   r6   r7   rv      s   
zPipeline._sanitize_parametersc                 o   s*    |D ]}| j |g|R i |V  qd S r:   )rz   )r3   ro   r}   r5   r   r6   r6   r7   r|     s   zPipeline._process_iteratorc                 C   s   t || jS r:   )
collate_fnr+   )r3   datar6   r6   r7   _collate_fn  s   zPipeline._collate_fnc              	   O   s   | di }| di }| di }| | | j|fi |}t| j| j= | jtjkrTt  | j	r;| 
|}| j|fi |}W d    n1 sNw   Y  n	| j|fi |}W d    n1 sgw   Y  | j|fi |}| | |S )Nrq   rr   rs   )rB   _check_input
preprocessr   rL   r2   r   rM   no_gradrP   r   forwardpostprocess_check_output)r3   ro   r}   r5   rq   rr   rs   outr6   r6   r7   rz     s&   


	
zPipeline._process_singlec                 C   sv   i }|D ]}|  D ]\}}||g }|| |||< q
q| D ]}t|| d tjr8t|| ||< q#|S )Nr   )itemsrB   r;   keysr-   rM   Tensorcat)r3   Z	data_listZ
batch_dataZsample_preprocessedkvZ
value_listr6   r6   r7   _batch"  s   

zPipeline._batchc              
      s  | d| d}| d}g }tdt||D ]}t|| t|}|| }	fdd||| D }
tjjG jtjkrot	  
|
}jrV|}j|fi |}W d    n1 siw   Y  n
|
}j|fi |}W d    n1 sw   Y  t|	D ]T i }| D ]8\}}|d urt|ttfrt|d tjrt| fdd|D ||< q|  ||< q|  d	  ||< qj|fi |}| || qq|S )
Nrq   rr   rs   r   c                    s   g | ]}j |fi  qS r6   )r   )ra   i)rq   r3   r6   r7   rc   9  s    z+Pipeline._process_batch.<locals>.<listcomp>c                 3   s     | ]}|  d   V  qdS )r%   Nr6   )ra   e)	batch_idxr6   r7   rj   O  s
    
z*Pipeline._process_batch.<locals>.<genexpr>r%   )rB   rangerC   minr   rL   r2   r   rM   r   r   rP   r   r   r   r-   tuplery   r   rw   r   r   r;   )r3   ro   rp   r5   rr   rs   Zoutput_listr   endZreal_batch_sizeZpreprocessed_listZbatched_outr   r   elementr6   )r   rq   r3   r7   r{   .  sT   









zPipeline._process_batchc           	      C   sh  | j }|tv rt| }t|trLd }|D ]}t|ttfr*t|t|kr)|} nqt|tr3|} nq|d u rJd}|D ]	}|| d7 }q<t||}t|trXt	|| d S t|trwt|tsfJ dt
||D ]	\}}t	|| qkd S t|tr| D ]}t|tr||v rt	|| ||  qd S td| t| ddstd| d d	| _d S d S )
NzDinput data format for current pipeline should be one of following: 

zinput should be a tuplezinvalid input_type definition _input_has_warnedFtask z input definition is missingT)	group_keyr   r-   ry   dictr   rw   r.   
ValueErrorr   zipr   getattrr/   rn   r   )	r3   ro   	task_nameZ
input_typeZmatched_typeterr_msgZ	input_eler   r6   r6   r7   r   ]  sN   





zPipeline._check_inputc                 C   s   | j }|tvrt| ddstd| d d| _d S t| }g }t|ttfr,|	 n|}|D ]}t|ttfrB||vrB|
| q0t|dkrTtd| d| d	d S )
N_output_has_warnedFr   z output keys are missingTr   zexpected output keys are z, those z are missing)r   r   r   r/   rn   r   r-   r   r   r   r;   rC   r   )r3   ro   r   Zoutput_keysZmissing_keysr   r6   r6   r7   r     s,   


zPipeline._check_outputinputsc                 K   s8   | j dus	J dt| j trJ d| j |fi |S )z\ Provide default implementation based on preprocess_cfg and user can reimplement it
        Nz'preprocess method should be implementedzEdefault implementation does not support using multiple preprocessors.)r@   r-   r
   )r3   r   rq   r6   r6   r7   r     s
   zPipeline.preprocessc                 K   s2   | j dus	J d| jrJ d| j |fi |S )zU Provide default implementation using self.model and user can reimplement it
        Nz$forward method should be implementedzFdefault implementation does not support multiple models in a pipeline.)r4   rD   )r3   r   rr   r6   r6   r7   r     s   zPipeline.forwardc                 K   s   t d)ac   If current pipeline support model reuse, common postprocess
            code should be write here.

        Args:
            inputs:  input data
            post_params:   post process parameters

        Return:
            dict of results:  a dict containing outputs of model, each
                output should have the standard output name.
        r   )NotImplementedError)r3   r   Zpost_paramsr6   r6   r7   r     s   zPipeline.postprocess)NNNr>   TN)NN) rx   
__module____qualname____doc__r8   r
   
InputModelr=   r.   r   r   rT   r   rV   rf   rK   Inputr   r   r	   r   rv   r|   r   rz   r   r{   r   r   r   r   r   r6   r6   r6   r7   r(   ,   s`    
E
#
)


/*


r(   c                   @   s   e Zd ZdZ			ddedeeee f fddZdd	 Z	d
d Z
edd Zdeeef deeef fddZedd ZdedefddZdS )DistributedPipelinea  This pipeline is used to load multi gpu models.

    What will this class do:
    1. Read the global config from the configuration.json
    2. Set the multiprocessing method to spawn
    3. Open a multiprocessing pool of the world_size to instantiate model pieces.
    4. Set the master port and ip
    5. Call _instantiate_one to instantiate one model piece,
    This method should be implemented by the derived class.
    6. After the forward method is called, do preprocess in main process and
    call _forward_one to collect results, and do post process in main process.

    NOTE: _instantiate_one and _forward_one are class methods, any derived class should implement them and
    store the model handler in the class field.
    NTr4   r@   c           	      K   sP  || _ d| _t | _|| _tj|r|| _nt	|| _t
| j| _| | j| _d | _d| _t| j| _d| _| jj| _tjjddd tt| j}t| j| _d|vrZd|d< d|v rdt|d ntd	d
}ddlm}m} ||sy| }t ||d< |d tj!d< |d tj!d< | j"t#| j$j%fd| ji| jj&|| g | _'d S )NFcpuZspawnT)forceZ	master_ipz	127.0.0.1master_porti<s  iL  r   )_find_free_port_is_free_portZMASTER_ADDRZMASTER_PORTrJ   )(r@   rN   r   rO   rP   rG   rH   existsrJ   r   r   rF   _get_world_size
world_size
model_poolr2   r   r+   rD   rL   rM   multiprocessingZset_start_methodry   r   r   intrandomrandintr]   r   r   r.   environmapr   	__class___instantiate_oner4   r<   )	r3   r4   r@   rS   r5   Zranksr   r   r   r6   r6   r7   rT     sX   


zDistributedPipeline.__init__c                 C   sB   t | dr| jd urz| j  W d S  ty   Y d S w d S d S )Nr   )r[   r   	terminateAttributeErrorr_   r6   r6   r7   __del__  s   zDistributedPipeline.__del__c                 C   s    | j  }|d= |d= |d= |S )Nr   r@   rO   )__dict__copy)r3   Z	self_dictr6   r6   r7   __getstate__  s
   
z DistributedPipeline.__getstate__c                 K      dS )a  Instantiate one model piece.

        Args:
            rank: The model rank.
            model_dir: The model_dir in the node.
            kwargs: Any extra args.

        Returns:
            None. The model handler should be kept in the class field.
        Nr6   )clsZrankrJ   r5   r6   r6   r7   r        z$DistributedPipeline._instantiate_oner   rg   c                 K   s,   ||d}| j | jj|g| j }|d S )N)r   rr   r   )r   r   r   _forward_oner   )r3   r   rr   resr6   r6   r7   r     s   
zDistributedPipeline.forwardc                 C   r   )zForward the inputs to one model piece.

        Use the model handler kept in the class field to forward.

        Args:
            inputs: The inputs after the preprocessing.

        Returns:
            The forward results.
        Nr6   )r   r   r6   r6   r7   r   #  r   z DistributedPipeline._forward_onerF   c                 C   s    | d}|d u r| dS |S )Nzmegatron.world_sizezmodel.world_size)Zsafe_get)r3   rF   Zm_world_sizer6   r6   r7   r   1  s   

z#DistributedPipeline._get_world_size)NNT)rx   r   r   r   r.   r   r   r
   rT   r   r   classmethodr   r   r   r   r   r   r   r   r6   r6   r6   r7   r     s(    
1




r   c              	      s8  ddl m} dd }t| tst| tr#t|  fdd|  D S t| ttfrRdt	| kr5t
g S t| d ttfrE||  S t|  fdd| D S t| tjri| jjtju ra| S tt
|  S t| t
jrt|  S t| ttttttd	fr| S || d
kr| S || dkr| S tdt|  )a3  Prepare the input just before the forward function.
    This method will move the tensors to the right device.
    Usually this method does not need to be overridden.

    Args:
        data: The data out of the dataloader.
        device: The device to move data to.

    Returns: The processed data.

    r   )default_collatec                 S   s   | j jS r:   )r   rx   )objr6   r6   r7   get_class_nameF  s   z"collate_fn.<locals>.get_class_namec                    s(   i | ]\}}||d krt | n|qS )Z	img_metasr   )ra   r   r   r+   r6   r7   
<dictcomp>K  s    zcollate_fn.<locals>.<dictcomp>c                 3   s    | ]}t | V  qd S r:   r   )ra   r   r   r6   r7   rj   U  s    zcollate_fn.<locals>.<genexpr>NZInputFeaturesZDataContainerzUnsupported data type )Ztorch.utils.data.dataloaderr   r-   r   r   rw   r   r   ry   rC   rM   r   r   floatr^   npZndarrayZdtypeZstr_r   Z
from_numpybytesr.   boolr   )r   r+   r   r   r6   r   r7   r   8  s2   

r   )IrG   Zos.pathrH   rk   r   abcr   r   	functoolsr   r   r   	threadingr   typingr   r   r	   r
   r   r   r   numpyr   	packagingr   Zmodelscope.models.baser   Zmodelscope.msdatasetsr   Zmodelscope.outputsr   r   Zmodelscope.pipeline_inputsr   r   Zmodelscope.preprocessorsr   Zmodelscope.utils.configr   Zmodelscope.utils.constantr   r   r   Zmodelscope.utils.devicer   r   r   Zmodelscope.utils.hubr   r   Zmodelscope.utils.import_utilsr   r    Zmodelscope.utils.loggerr!   r]   r"   Zutils.automodel_utilsr$   utilr&   r'   rM   r   r.   r   r   r   r/   r(   r   r   r6   r6   r6   r7   <module>   sL   $   }