o
    iW                     @  s`  U d dl mZ d dlZd dlZd dlZd dlZd dlmZmZm	Z	 d dlm
Z
mZmZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZmZ d dlm Z! d dl"m#Z#m$Z$ d d	l%m&Z& d d
l'm(Z( d dl)m*Z* erd dl+m,Z,m-Z- d dl.m/Z/m.Z. d dl0Z0d dl1Z0d dl2m3Z3 d dl4m5Z5 d dl6m7Z7 d dl8m9Z9m:Z: nd dl;m0Z0 e#e$dZ<de=d< dZ>de=d< dgddZ?				dhdid)d*Z@djd,d-ZAdkd0d1ZBedld3d4ZCeCDedmd6d7ZEeCDednd9d7ZEeCDedod;d7ZEeCDedpd=d7ZEeCDe
dqd?d7ZEeCDedrdAd7ZEeCDedsdCd7ZEeCDedtdEd7ZEG dFdG dGZFG dHdI dIZGeG dJdK dKZHedudMdNZIG dOdP dPejJZKG dQdR dReKZLG dSdT dTeKZMG dUdV dVeKZNG dWdX dXeKZOG dYdZ dZeKZPG d[d\ d\eKZQG d]d^ d^eKZRG d_d` d`eLZSG dadb dbeKZTG dcdd ddeKZUG dedf dfeKZVdS )v    )annotationsN)GtELtLtE)	AttributeBinOpBitAndBitOrCallCompareConstantEqGtInvertListNameUnaryOp)	dataclass)cachesingledispatch)TYPE_CHECKINGAny)
to_py_dateto_py_datetime)eprint)wrap_s)ComputeError)CallableSequence)datedatetime)DataFile)Table)IcebergType)	DataFrameSeries)	pyicebergz)dict[str, Callable[..., datetime | date]]_temporal_conversionsi  intICEBERG_TIME_TO_NSpathstrreturnc                 C  s2   t jdkr| dr| dsd| d S | S )Nwin32zfile://zfile:///)sysplatform
startswithremoveprefix)r*    r2   O/home/app/Keep/.python/lib/python3.10/site-packages/polars/io/iceberg/_utils.py#_normalize_windows_iceberg_file_uri8   s   
r4   tblr"   with_columnslist[str] | Noneiceberg_table_filter
Any | Nonen_rows
int | Nonesnapshot_idkwargsr   DataFrame | Seriesc                 K  sJ   ddl m} | j||d}|dur|j| }|dur||}|| S )a  
    Take the projected columns and materialize an arrow table.

    Parameters
    ----------
    tbl
        pyarrow dataset
    with_columns
        Columns that are projected
    iceberg_table_filter
        PyIceberg filter expression
    n_rows:
        Materialize only n rows from the arrow dataset.
    snapshot_id:
        The snapshot ID to scan from.
    batch_size
        The maximum row count for scanned pyarrow record batches.
    kwargs:
        For backward compatibility

    Returns
    -------
    DataFrame
    r   )
from_arrow)limitr<   N)polarsr?   scanselectfilterZto_arrow)r5   r6   r8   r:   r<   r=   r?   rB   r2   r2   r3   _scan_pyarrow_dataset_implC   s    

rE   pyarrow_predicatec                 C  s>   t t t| }t|W  d    S 1 sw   Y  d S N)
contextlibsuppress	Exception_to_ast_convert_predicate)rF   Zexpr_astr2   r2   r3   try_convert_pyarrow_predicatep   s
    rM   exprast.exprc                 C  s   t j| ddjS )a<  
    Converts a Python string to an AST.

    This will take the Python Arrow expression (as a string), and it will
    be converted into a Python AST that can be traversed to convert it to a PyIceberg
    expression.

    The reason to convert it to an AST is because the PyArrow expression
    itself doesn't have any methods/properties to traverse the expression.
    We need this to convert it into a PyIceberg expression.

    Parameters
    ----------
    expr
        The string expression

    Returns
    -------
    The AST representing the Arrow expression
    eval)mode)astparsebody)rN   r2   r2   r3   rK   x   s   rK   ac                 C  s   d|  }t |)zJWalks the AST to convert the PyArrow expression to a PyIceberg expression.zUnexpected symbol: )
ValueErrorrU   msgr2   r2   r3   rL      s   
rL   r   c                 C     | j S rG   )valuerU   r2   r2   r3   _      r\   r   c                 C  rY   rG   )idr[   r2   r2   r3   r\      r]   r   c                 C  s0   t | jtrtjt| jS d|  }t|)NzUnexpected UnaryOp: )	
isinstanceopr   r&   expressionsNotrL   operand	TypeErrorrW   r2   r2   r3   r\      s   
r
   c                 C  s   dd | j D }t| j}|dkr|S |dkr|d S |tv r't| |  S t| jjd }|dkr<tj||d S |dkrFtj	|S |dkrPtj
|S d	|}t|)
Nc                 S     g | ]}t |qS r2   rL   ).0argr2   r2   r3   
<listcomp>       _.<locals>.<listcomp>fieldZscalarr   isinZis_nullis_nanzUnknown call: )argsrL   funcr'   	isoformatrZ   r&   ra   InZIsNullZIsNaNrV   )rU   ro   frefrX   r2   r2   r3   r\      s"   

r   c                 C  rY   rG   )attrr[   r2   r2   r3   r\      r]   r   c                 C  sh   t | j}t | j}| j}t|trtj||S t|t	r%tj
||S d| d| d| }t|)Nz	Unknown:  )rL   leftrightr`   r_   r   r&   ra   Andr	   Orrd   )rU   lhsrhsr`   rX   r2   r2   r3   r\      s   



r   c                 C  s   | j d }t| jd }t| jd }t|trtj||S t|t	r+tj
||S t|tr7tj||S t|trCtj||S t|trOtj||S d| }t|)Nr   zUnknown comparison: )opsrL   rw   comparatorsr_   r   r&   ra   ZGreaterThanr   ZGreaterThanOrEqualr   ZEqualTor   ZLessThanr   ZLessThanOrEqualrd   )rU   r`   r{   r|   rX   r2   r2   r3   r\      s   






r   c                 C  s   dd | j D S )Nc                 S  re   r2   rf   )rg   er2   r2   r3   ri      rj   rk   )eltsr[   r2   r2   r3   r\      s   c                   @  s*   e Zd ZdddZdddZdddZdS ))IdentityTransformedPartitionValuesBuildertabler"   projected_schemapyiceberg.schema.Schemar,   Nonec              
   C  s  dd l }ddlm} ddlm} ddlm}m}m}m	}	 |j
}
i | _i | _i | _| }| D ].\}}g }t|jD ]\}}|j|
v rXt|j|rX|||jf g | j|j< q;|| j|< q0| jD ]r}||}|j}t||j| \}}|| j|< |jr| rd|d|| j|< |  D ];}z||j}W n	 t y   Y qw ||kst||	rt||st|||frt|||fsd| d| | j|< qqbd S )	Nr   schema_to_pyarrow)IdentityTransform)
DoubleType	FloatTypeIntegerTypeLongTypez%non-primitive type: projected_type = z output_dtype = zunsupported type change: from: z, to: )!pyiceberg.schemapyiceberg.io.pyarrowr   Zpyiceberg.transformsr   pyiceberg.typesr   r   r   r   Z	field_idspartition_valuespartition_values_dtypes(partition_spec_id_to_identity_transformsspecsitems	enumeratefieldsZ	source_idr_   Z	transformappend
find_field
field_typeplSchemaschemapopitemZis_primitive	is_nestedschemasvaluesrV   )selfr   r   r&   r   r   r   r   r   r   Zprojected_idsZpartition_specsZspec_idspecoutZfield_indexrl   field_idZprojected_fieldZprojected_typer\   output_dtyper   Ztype_this_schemar2   r2   r3   __init__   sl   


z2IdentityTransformedPartitionValuesBuilder.__init__current_indexr(   partition_spec_idr   pyiceberg.typedef.Recordc          	      C  s   z| j | }W n ty   t| jd| | _Y d S w |D ]'\}}|| }t| j|  }trF|dd t|t	| D  |
| qd S )Nzpartition spec ID not found: c                 s  s    | ]}d V  qd S rG   r2   )rg   r\   r2   r2   r3   	<genexpr>X  s    zRIdentityTransformedPartitionValuesBuilder.push_partition_values.<locals>.<genexpr>)r   KeyErrordictfromkeysr   r_   listextendrangelenr   )	r   r   r   r   Zidentity_transformsiZsource_field_idZpartition_valuer   r2   r2   r3   push_partition_values@  s$    
z?IdentityTransformedPartitionValuesBuilder.push_partition_valuesdict[int, pl.Series | str]c                 C  s   ddl m}m}m}m}m}m} i }| j D ]a\}}	t	|	t
r%|	||< qz:| j| }
t	|
|||fr5|nt	|
|r<|n|
}tj|	|d}|j rLJ t	|
|rU|t }||
}|||< W q tyx } zd| ||< W Y d }~qd }~ww |S )Nr   )DateDatetimeDurationInt32Int64Timedtypez!failed to load partition values: )Zpolars.datatypesr   r   r   r   r   r   r   r   r_   r+   r   r   r%   r   r   r)   castrJ   )r   r   r   r   r   r   r   r   r   vr   Zconstructor_dtypesr   r2   r2   r3   finish[  s0    




z0IdentityTransformedPartitionValuesBuilder.finishN)r   r"   r   r   r,   r   )r   r(   r   r(   r   r   r,   r   )r,   r   )__name__
__module____qualname__r   r   r   r2   r2   r2   r3   r      s    

Or   c                   @  s*   e Zd ZdddZdddZdddZdS )IcebergStatisticsLoaderr   r"   projected_filter_schemar   r,   r   c                 C  sJ  dd l }ddlm} dd l}dd l}|jj }i | _g | _	g | _
|| _|jD ]{}t }	|  D ]!}
tt |	|
|jj W d    n1 sNw   Y  q2|||j| \}}t|j|	|}|r|d urvt|jnd}td|jd|jd|jd|	d|d	| t |j|j||g g g d
| j|j< q'd S )Nr   r   r   z&IcebergStatisticsLoader: field.name = z, field.field_id = z, field.field_type = z, field_all_types = z, field_polars_dtype = z, _load_from_bytes_impl = )r   column_namecolumn_dtypeload_from_bytes_impl
min_values
max_values
null_count)!r   r   r   rA   polars._utils.logging_utilsloggingverbosefile_column_statisticsZload_as_empty_statisticsfile_lengthsr   r   setr   r   rH   rI   rV   addr   r   r   r   r   r   LoadFromBytesImplinit_for_field_typetyper   r   nameIcebergColumnStatisticsLoader)r   r   r   r&   r   r   rA   r   rl   Zfield_all_typesr   r\   field_polars_dtyper   Z_load_from_bytes_implr2   r2   r3   r     sp   


z IcebergStatisticsLoader.__init__filer!   c                 C  s,   | j |j | j D ]}|| qd S rG   )r   r   Zrecord_countr   r   push_file_statistics)r   r   statsr2   r2   r3   r     s   z,IcebergStatisticsLoader.push_file_statisticsexpected_heightr(   identity_transformed_valuesr   pl.DataFramec           
      C  s   dd l }|jd| j|jd g}| j D ]&\}}|| }d ur1t|t	r1d| }t
||||}	||	 q|j|ddS )Nr   r   r   z+statistics load failure for filter column: 
horizontal)how)rA   r%   r   UInt32to_framer   r   getr_   r+   r   r   r   concat)
r   r   r   r   r   r   Zstat_builderprX   Zcolumn_stats_dfr2   r2   r3   r     s   

zIcebergStatisticsLoader.finishN)r   r"   r   r   r,   r   r   r!   r,   r   )r   r(   r   r   r,   r   )r   r   r   r   r   r   r2   r2   r2   r3   r     s    

>r   c                   @  sZ   e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< dddZdddZdS )r   r+   r   pl.DataTyper   r(   r   LoadFromBytesImpl | Noner   zlist[int | None]r   list[bytes | None]r   r   r   r!   r,   r   c                 C  sT   | j |j| j | jd ur(| j|j| j | j|j	| j d S d S rG   )
r   r   Znull_value_countsr   r   r   r   Zlower_boundsr   Zupper_bounds)r   r   r2   r2   r3   r     s
   
z2IcebergColumnStatisticsLoader.push_file_statisticsr   r   pl.Series | Noner   c           	      C  s<  dd l }| j}t| j|ksJ |j| d| j|jd }| jd u rE|d ur*|n|jd || j	d}|
|| d|| dS t| j|ksNJ t| j|ksWJ | j	 r^t| j| j}| j| j}|d ur|j| j	ksxJ |d ||  }||}||}|
|| d|| dS )Nr   Z_ncr   _min_max)rA   r   r   r   r%   r   r   r   repeatr   r6   aliasr   r   r   NotImplementedErrorload_from_bytesr   Zextend_constant	fill_null)	r   r   r   r   cr   r   r   r   r2   r2   r3   r     s4   
$


z$IcebergColumnStatisticsLoader.finishNr   )r   r(   r   r   r,   r   )r   r   r   __annotations__r   r   r2   r2   r2   r3   r     s   
 
r   hdict[type[IcebergType], tuple[type[LoadFromBytesImpl], type[IcebergType] | Sequence[type[IcebergType]]]]c                  C  s   ddl m} m}m}m}m}m}m}m}m	}m
}	m}
 |t|f|t|f|
t|
f|t|f|	t|	f|t|f|t||ff|t|f| t| f|t|f|t|fiS )Nr   
BinaryTypeBooleanTypeDateTypeDecimalType	FixedTyper   r   
StringTypeTimestampTypeTimestamptzTypeTimeType)r   r   r   r   r   r   r   r   r   r   r   r   LoadBooleanFromBytesLoadDateFromBytesLoadTimeFromBytesLoadTimestampFromBytesLoadTimestamptzFromBytesLoadInt32FromBytesLoadInt64FromBytesLoadStringFromBytesLoadBinaryFromBytesLoadDecimalFromBytesLoadFixedFromBytesr   r2   r2   r3   _bytes_loader_lookup  s   4r  c                   @  s4   e Zd ZdddZedddZejdddZdS )r   polars_dtyper   r,   r   c                 C  s
   || _ d S rG   )r  )r   r  r2   r2   r3   r   A  s   
zLoadFromBytesImpl.__init__current_field_typer#   all_field_typesset[IcebergType]r   r   c                   sF   t  t|  }d u rd S |\} t fdd|D r!||S d S )Nc                 3  s    | ]}t | V  qd S rG   )r_   )rg   xZallowed_field_typesr2   r3   r   R  s    z8LoadFromBytesImpl.init_for_field_type.<locals>.<genexpr>)r  r   r   all)r  r  r   r   Zloader_implr2   r  r3   r   D  s   z%LoadFromBytesImpl.init_for_field_typebyte_valuesr   	pl.Seriesc                 C  s   dS )z(`bytes_values` should be of binary type.Nr2   )r   r  r2   r2   r3   r   V  s    z!LoadFromBytesImpl.load_from_bytesN)r  r   r,   r   )r  r#   r  r  r   r   r,   r   r  r   r,   r  )	r   r   r   r   staticmethodr   abcabstractmethodr   r2   r2   r2   r3   r   @  s    
r   c                   @     e Zd ZdddZdS )	r  r  r   r,   r  c                 C  s   dd l }|j||jdS Nr   r   )rA   r%   Binaryr   r  r   r2   r2   r3   r   \  s   z#LoadBinaryFromBytes.load_from_bytesNr  r   r   r   r   r2   r2   r2   r3   r  [      r  c                   @  r  )	r  r  r   r,   r  c                 C  .   dd l }|j||jdjj|jdd|jS Nr   r   littler   Z
endianness)rA   r%   r  binreinterpretr   r   r   r  r2   r2   r3   r   c     
z!LoadDateFromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r  b  r  r  c                   @  r  )	r  r  r   r,   r  c                 C  s2   dd l }|j||jdjj|jddt |jS r   )	rA   r%   r  r#  r$  r   r)   r   r   r  r2   r2   r3   r   n  s   z!LoadTimeFromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r  m  r  r  c                   @  r  )	r  r  r   r,   r  c                 C  s2   dd l }|j||jdjj|jdd|dS )Nr   r   r!  r"  usrA   r%   r  r#  r$  r   r   r   r  r2   r2   r3   r   z  s   
z&LoadTimestampFromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r  y  r  r  c                   @  r  )	r  r  r   r,   r  c                 C  s6   dd l }|j||jdjj|jdd|jdddS )Nr   r   r!  r"  r&  UTC)Z	time_zoner'  r  r2   r2   r3   r     s   
z(LoadTimestamptzFromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r    r  r  c                   @  r  )	r   r  r   r,   r  c                 C  r  r   )rA   r%   r  r#  r$  ZUInt8r   Booleanr  r2   r2   r3   r     r%  z$LoadBooleanFromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r     r  r   c                   @  r  )	r	  r  r   r,   r  c                 C  sP   dd l }ddlm} | j}t||jsJ |jd usJ t|j||j|j	dS )Nr   )PySeries)Z
bytes_list	precisionscale)
rA   Zpolars._plrr*  r  r_   Decimalr+  r   Z(_import_decimal_from_iceberg_binary_reprr,  )r   r  r   r*  r   r2   r2   r3   r     s   z$LoadDecimalFromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r	    r  r	  c                   @  s   e Zd ZdS )r
  N)r   r   r   r2   r2   r2   r3   r
    s    r
  c                   @  r  )	r  r  r   r,   r  c                 C  s&   dd l }|j||jdjj|jddS r   )rA   r%   r  r#  r$  r   r  r2   r2   r3   r     s   z"LoadInt32FromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r    r  r  c                   @  r  )	r  r  r   r,   r  c                 C  sF   dd l }|j||jd}|jj|jdd|jj|jdd|jS r   )	rA   r%   r  r#  r$  r   r   r   r   )r   r  r   r   r2   r2   r3   r     s
   z"LoadInt64FromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r    r  r  c                   @  r  )	r  r  r   r,   r  c                 C  s    dd l }|j||jd|jS r  )rA   r%   r  r   Stringr  r2   r2   r3   r     s   z#LoadStringFromBytes.load_from_bytesNr  r  r2   r2   r2   r3   r    r  r  )r*   r+   r,   r+   )NNNN)r5   r"   r6   r7   r8   r9   r:   r;   r<   r;   r=   r   r,   r>   )rF   r+   r,   r9   )rN   r+   r,   rO   )rU   r   r,   r   )rU   r   r,   r   )rU   r   r,   r   )rU   r   r,   r   )rU   r
   r,   r   )rU   r   r,   r   )rU   r   r,   r   )rU   r   r,   r   )rU   r   r,   r   )r,   r   )W
__future__r   r  rR   rH   r.   _astr   r   r   r   r   r   r	   r
   r   r   r   r   r   r   r   r   dataclassesr   	functoolsr   r   typingr   r   Zpolars._reexportZ	_reexportr   Zpolars._utils.convertr   r   r   r   Zpolars._utils.wrapr   Zpolars.exceptionsr   collections.abcr   r   r    r   r&   r   Zpyiceberg.manifestr!   Zpyiceberg.tabler"   r   r#   rA   r$   r%   Zpolars._dependenciesr'   r   r)   r4   rE   rM   rK   rL   registerr\   r   r   r   r  ABCr   r  r  r  r  r  r   r	  r
  r  r  r  r2   r2   r2   r3   <module>   s    <

-
 \>"	