# ray/util/spark/utils.py -- reconstructed from the byte-compiled module
# (CPython 3.10 .pyc under site-packages/ray/util/spark/).  Only the names,
# docstrings and message strings survive in the compiled file, so the function
# bodies, default values and summary comments below are a best-effort reading
# of that constant table and may differ in detail from the original source.
import collections
import logging
import os
import random
import shutil
import subprocess
import sys
import threading
import time

_logger = logging.getLogger("ray.util.spark.utils")
dd Zdd Zdd Zdd	d
ddZdd Zdd Z	d9ddZdd Zdd Zdd Zdd Zdd Zd Zd Zd!d" Zd#d$ Zd%d& Zd'Zd(Zd)Zd*Zd+Z d,Z!d-d. Z"d/d0 Z#d1d2 Z$d3d4 Z%d5d6 Z&d7d8 Z'dS ):    Nzray.util.spark.utilsc                   C   s
   dt jv S )NZDATABRICKS_RUNTIME_VERSION)osenviron r   r   `/home/app/PaddleOCR-VL-test/.venv_paddleocr/lib/python3.10/site-packages/ray/util/spark/utils.pyis_in_databricks_runtime   s   
r   c                 C   s,   d | }d |}d| d| d| dS )N  zCommand z failed with return code z", tail output are included below.

)join)cmdreturn_codetail_output_dequeZcmd_strZtail_outputr   r   r   gen_cmd_exec_failure_msg   s   

r   c                 C   sD   | j dd }t|d d }|d }ddddd}|||  S )	Nzspark.executor.memoryZ1g   i   i   @l        )kmgt)confgetlowerint)sparkZ	value_strZ	value_numZ
value_unitZunit_mapr   r   r   *get_configured_spark_executor_memory_bytes   s   r   T)	extra_envsynchronousc                   s   t | h d}|rtdt| |dd}|dur(|dur(td|du r.|ni tj|}tj	| f|dtj
tjd| tjdd	 fd
d}tj|dd  |sa fS   }|dkrqtt| |dS )a  
    A convenience wrapper of `subprocess.Popen` for running a command from a Python
    script.
    If `synchronous` is True, wait until the process terminates and, if the
    subprocess return code is not 0, raise an error containing the last 100
    lines of output.
    If `synchronous` is False, return the `Popen` instance and a deque instance
    holding the tail output.
    The subprocess stdout / stderr output is streamed to the current process
    stdout.
    """
    illegal_kwargs = set(kwargs.keys()).intersection({"text", "stdout", "stderr"})
    if illegal_kwargs:
        raise ValueError(f"`kwargs` cannot contain {list(illegal_kwargs)}")

    env = kwargs.pop("env", None)
    if extra_env is not None and env is not None:
        raise ValueError("`extra_env` and `env` cannot be used at the same time")
    env = env if extra_env is None else {**os.environ, **extra_env}

    process = subprocess.Popen(
        cmd,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        **kwargs,
    )
    # Keep the last 100 output lines around for error reporting.
    tail_output_deque = collections.deque(maxlen=100)

    def redirect_log_thread_fn():
        for line in process.stdout:
            # Collect tail output and mirror it to the current process stdout.
            tail_output_deque.append(line)
            sys.stdout.write(line)

    threading.Thread(target=redirect_log_thread_fn, args=()).start()

    if not synchronous:
        return process, tail_output_deque

    return_code = process.wait()
    if return_code != 0:
        raise RuntimeError(
            gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque)
        )


def is_port_in_use(host, port):
    import socket
    from contextlib import closing

    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        return sock.connect_ex((host, port)) == 0


def _wait_service_up(host, port, timeout):
    beg_time = time.time()
    while time.time() - beg_time < timeout:
        if is_port_in_use(host, port):
            return True
        time.sleep(1)  # poll interval reconstructed, not recoverable verbatim
    return False


def get_random_unused_port(
    host, min_port=1024, max_port=65535, max_retries=100, exclude_list=None
):
    """
    Get random unused port.
    """
    rng = random.SystemRandom()
    exclude_list = exclude_list or []
    for _ in range(max_retries):
        port = rng.randint(min_port, max_port)
        if port in exclude_list:
            continue
        if not is_port_in_use(host, port):
            return port
    raise RuntimeError(
        f"Get available port between range {min_port} and {max_port} failed."
    )
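

# Illustrative usage sketch, added for clarity -- not part of the original ray
# module.  It relies only on the behaviour documented above: synchronous
# `exec_cmd` raises RuntimeError (with the last 100 output lines) on a
# non-zero return code, and `get_random_unused_port` returns a free port.
def _example_exec_cmd_with_random_port():  # illustration only
    port = get_random_unused_port("127.0.0.1", min_port=20000, max_port=30000)
    exec_cmd(
        [sys.executable, "-c", "import os; print(os.environ['EXAMPLE_PORT'])"],
        extra_env={"EXAMPLE_PORT": str(port)},  # merged into os.environ
    )
    return port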


def get_spark_session():
    from pyspark.sql import SparkSession

    spark_session = SparkSession.getActiveSession()
    if spark_session is None:
        raise RuntimeError(
            "Spark session haven't been initiated yet. Please use "
            "`SparkSession.builder` to create a spark session and connect to a "
            "spark cluster."
        )
    return spark_session


def get_spark_application_driver_host(spark):
    return spark.conf.get("spark.driver.host")


def get_max_num_concurrent_tasks(spark_context, resource_profile):
    """Gets the current max number of concurrent tasks."""
    # Body not recovered verbatim.  The compiled names show that it queries the
    # JVM SparkContext (spark_context._jsc.sc().maxNumConcurrentTasks(...));
    # when a resource_profile is given it first registers the profile by running
    # a trivial parallelize(...).withResources(...).map(...).collect() job with a
    # dummy mapper and passes resource_profile._java_resource_profile, otherwise
    # it uses resourceProfileManager().defaultResourceProfile().
    ...


# Environment variables that override the detected hardware resources of the
# Spark worker / driver nodes.
RAY_ON_SPARK_WORKER_CPU_CORES = "RAY_ON_SPARK_WORKER_CPU_CORES"
RAY_ON_SPARK_WORKER_GPU_NUM = "RAY_ON_SPARK_WORKER_GPU_NUM"
RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES = "RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES"
RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES = "RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES"
RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES = "RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES"
RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES = "RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES"

# Float constants from the compiled file; rendered as 0.8 here based on the
# "capped by 80%" warning text below (exact values are reconstructed).
_RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET = 0.8
_RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION = 0.8


def _get_spark_worker_total_physical_memory():
    import psutil

    if RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES])
    return psutil.virtual_memory().total


def _get_spark_worker_total_shared_memory():
    if RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES])
    return shutil.disk_usage("/dev/shm").total


def calc_mem_ray_head_node(
    configured_heap_memory_bytes, configured_object_store_bytes
):
    # Body not recovered verbatim.  The compiled constants show that it reads
    # the RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES /
    # RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES overrides (falling back to
    # psutil.virtual_memory().total and shutil.disk_usage("/dev/shm").total),
    # scales both by _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET, delegates to
    # _calc_mem_per_ray_node, and logs the returned warning message, if any.
    ...


def _calc_mem_per_ray_worker_node(
    num_task_slots,
    physical_mem_bytes,
    shared_mem_bytes,
    configured_heap_memory_bytes,  # names of the last two parameters reconstructed
    configured_object_store_bytes,
):
    # Body not recovered verbatim.  The compiled names show it divides the
    # worker's physical and shared memory by num_task_slots, applies
    # _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET, and delegates to
    # _calc_mem_per_ray_node.
    ...


def _calc_mem_per_ray_node(
    available_physical_mem_per_node,
    available_shared_mem_per_node,
    configured_heap_memory_bytes,
    configured_object_store_bytes,
):
    # Body not recovered verbatim.  The compiled constants show it imports
    # DEFAULT_OBJECT_STORE_MEMORY_PROPORTION and OBJECT_STORE_MINIMUM_MEMORY_BYTES
    # from ray._private.ray_constants, honours the
    # RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE environment variable, caps the object
    # store at _RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION of the per-node
    # memory, and returns (heap_mem_bytes, object_store_bytes, warning_msg),
    # where warning_msg is None or one of:
    #
    # * "Your configured `object_store_memory_per_node` value is too high and
    #   it is capped by 80% of per-Ray node allocated memory."
    # * "Your operating system is configured with too small /dev/shm size, so
    #   `object_store_memory_worker_node` value is configured to minimal size
    #   (... bytes), Please increase system /dev/shm size."
    # * "You configured too small Ray node object store memory size, so
    #   `object_store_memory_worker_node` value is configured to minimal size
    #   (... bytes), Please increase 'object_store_memory_worker_node' argument
    #   value."
    ...


def _get_cpu_cores():
    import multiprocessing

    if RAY_ON_SPARK_WORKER_CPU_CORES in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_CPU_CORES])
    return multiprocessing.cpu_count()


def _get_num_physical_gpus():
    if RAY_ON_SPARK_WORKER_GPU_NUM in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_GPU_NUM])
    if shutil.which("nvidia-smi") is None:
        return 0
    try:
        completed_proc = subprocess.run(
            "nvidia-smi --query-gpu=name --format=csv,noheader",
            shell=True,
            check=True,
            text=True,
            capture_output=True,
        )
        return len(completed_proc.stdout.strip().split("\n"))
    except Exception as e:
        _logger.info(
            "'nvidia-smi --query-gpu=name --format=csv,noheader' command "
            f"execution failed, error: {repr(e)}"
        )
        return 0


def _get_local_ray_node_slots(num_cpus, num_gpus, num_cpus_per_node, num_gpus_per_node):
    if num_cpus_per_node > num_cpus:
        raise ValueError(
            "cpu number per Ray worker node should be <= spark worker node CPU "
            f"cores, you set cpu number per Ray worker node to {num_cpus_per_node} "
            f"but spark worker node CPU core number is {num_cpus}."
        )
    num_ray_node_slots = num_cpus // num_cpus_per_node
    if num_gpus_per_node > 0:
        if num_gpus_per_node > num_gpus:
            raise ValueError(
                "gpu number per Ray worker node should be <= spark worker node GPU "
                "number, you set GPU devices number per Ray worker node to "
                f"{num_gpus_per_node} but spark worker node GPU devices number is "
                f"{num_gpus}."
            )
        if num_gpus // num_gpus_per_node < num_ray_node_slots:
            num_ray_node_slots = num_gpus // num_gpus_per_node
    return num_ray_node_slots


def _get_avail_mem_per_ray_worker_node(
    num_cpus_per_node,  # parameter order reconstructed
    num_gpus_per_node,
    heap_memory_per_node,
    object_store_memory_per_node,
):
    """
    Returns tuple of (
        ray_worker_node_heap_mem_bytes,
        ray_worker_node_object_store_bytes,
        error_message,  # always None
        warning_message,
    )
    """
    # Body not recovered verbatim.  The compiled names show it detects the
    # worker's CPU cores, GPUs, physical memory and /dev/shm size (via the
    # helpers above), derives the number of Ray node slots with
    # _get_local_ray_node_slots, and splits the memory with
    # _calc_mem_per_ray_worker_node.
    ...


def get_avail_mem_per_ray_worker_node(
    spark,  # parameter order reconstructed
    num_cpus_per_node,
    num_gpus_per_node,
    heap_memory_per_node,
    object_store_memory_per_node,
):
    """
    Return the available heap memory and object store memory for each ray worker,
    and the error / warning message, if any.
    Return value is a tuple of
    (ray_worker_node_heap_mem_bytes, ray_worker_node_object_store_bytes,
     error_message, warning_message)
    NB: We have one ray node per spark task.
    """
    # Body not recovered verbatim.  The compiled names show it runs
    # _get_avail_mem_per_ray_worker_node inside a one-element Spark job
    # (sparkContext.parallelize(...).map(mapper).collect()), logs the returned
    # warning, and raises RuntimeError on failure with the message:
    #   "Inferring ray worker node available memory failed, error: ... You can
    #    bypass this error by setting following spark configs:
    #    spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES,
    #    spark.executorEnv.RAY_ON_SPARK_WORKER_GPU_NUM,
    #    spark.executorEnv.RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES,
    #    spark.executorEnv.RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES."
    ...


def get_spark_task_assigned_physical_gpus(gpu_addr_list):
    if "CUDA_VISIBLE_DEVICES" in os.environ:
        visible_cuda_dev_list = [
            int(dev.strip())
            for dev in os.environ["CUDA_VISIBLE_DEVICES"].split(",")
        ]
        return [visible_cuda_dev_list[addr] for addr in gpu_addr_list]
    else:
        return gpu_addr_list
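

# Illustrative usage sketch, added for clarity -- not part of the original ray
# module.  get_spark_task_assigned_physical_gpus maps Spark-task-local GPU
# addresses back to physical GPU ids through CUDA_VISIBLE_DEVICES.
def _example_gpu_address_mapping():  # illustration only
    os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
    # Task-local addresses [0, 1] refer to physical GPUs 2 and 3.
    return get_spark_task_assigned_physical_gpus([0, 1])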