Python: Automatically convert Pandas types to valid Delta Lake types in write_deltalake() #686
# Description

As described in #686, some Pandas datatypes are not converted to a format that is compatible with Delta Lake. This handles the case of timestamps, which Pandas stores with `ns` resolution. Here, if a schema is not provided, we cast the timestamps to `us` resolution. We also update `python/tests/test_writer.py::test_write_pandas` to reflect this change.

# Related Issue(s)

#685

Co-authored-by: Will Jones <willjones127@gmail.com>
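For anyone hitting this before the fix lands, here is a minimal sketch of the same cast done client-side (the sample frame and column name are made up for illustration):

```python
import pandas as pd
import pyarrow as pa

# Pandas datetimes default to nanosecond resolution, which Delta Lake rejects.
df = pd.DataFrame({"ts": pd.to_datetime(["2021-01-01", "2021-06-01"])})
table = pa.Table.from_pandas(df)

# Downcast every timestamp column to microseconds, mirroring what the patch
# does internally when no explicit schema is given (timezones ignored here).
fields = [
    pa.field(f.name, pa.timestamp("us")) if pa.types.is_timestamp(f.type) else f
    for f in table.schema
]
table = table.cast(pa.schema(fields))
```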
Would appreciate it if this could be prioritized. Right now this is forcing us to use Spark over delta-rs.
This also happens when you write Delta from Polars with nanosecond-precision datetime columns. However, it's slightly easier to circumvent: you just have to cast to microsecond precision first.
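In Polars that cast looks something like this (a minimal sketch; the frame and column name are made up):

```python
from datetime import datetime

import polars as pl

# Build a frame with an explicit nanosecond-precision datetime column.
df = pl.DataFrame(
    {"ts": [datetime(2021, 1, 1)]},
    schema={"ts": pl.Datetime("ns")},
)
# Cast down to microsecond precision before handing the frame to delta-rs.
df = df.with_columns(pl.col("ts").cast(pl.Datetime("us")))
```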
@blaze225 You can also switch to Polars, which casts the dtypes correctly to a Delta-compatible schema: https://github.com/pola-rs/polars/pull/10165/files#diff-843e4fa7334b1cfcdf4ebe039377c0d724d0abb51bcde68c9aaae1b93868e20b
I made this as a stopgap solution. It's a dumb solution, but it helped me actually get writes working and test out the library.

````python
import pandas as pd
from typing import Union

import deltalake as dl


def strip_categorical(df: pd.DataFrame):
    """Convert categorical columns back into integer codes,
    and return a dataframe of the categories.

    Example:
    ```python
    (original_df, categories) = strip_categorical(df)
    ```"""
    categories = {}
    df = df.copy()
    for col in df.columns:
        if pd.api.types.is_categorical_dtype(df[col]):
            print(f"Converting categorical column to integer: '{col}' - {dict(enumerate(df[col].cat.categories))}")
            categories[col] = df[col].cat.categories
            df[col] = df[col].cat.codes
    return df, pd.DataFrame(categories)


def strip_duration_to_int(df: pd.DataFrame, to_int_unit: Union[str, dict[str, str]] = "ms"):
    """Convert Timedelta columns to integer columns in the given unit.

    to_int_unit should be a unit string or a dictionary of column names to units.

    Example:
    ```python
    df, time_delta_cols = strip_duration_to_int(df, to_int_unit="ms")
    ```"""
    df = df.copy()
    time_delta_cols = {}
    for col in df.columns:
        if pd.api.types.is_timedelta64_dtype(df[col].dtype):
            col_to_int_unit = to_int_unit
            if isinstance(to_int_unit, dict):
                col_to_int_unit = to_int_unit[col]
            print(f"Converting Timedelta column to integer using units '{col_to_int_unit}': '{col}'")
            time_delta_cols[col] = col_to_int_unit
            df[col] = df[col] // pd.Timedelta(1, unit=col_to_int_unit)
    return df, time_delta_cols


def write_delta(path, data, timedelta_to_int_unit: Union[str, dict[str, str]] = "ms", **kwargs):
    data, categories = strip_categorical(data)
    data, time_delta_cols = strip_duration_to_int(data, timedelta_to_int_unit)
    dl.write_deltalake(path, data, **kwargs)
    if len(categories) > 0:
        dl.write_deltalake(path + "_categories", categories, **kwargs)
    if len(time_delta_cols) > 0:
        # time_delta_cols is a plain dict; wrap it in a one-row DataFrame so it can be written
        dl.write_deltalake(path + "_time_delta_cols", pd.DataFrame([time_delta_cols]), **kwargs)
````
Are there any plans to implement this?
You can use `from polars.io.delta import _convert_pa_schema_to_delta`.
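For reference, that private helper can be used roughly like this (it's internal to Polars, so the import path and signature may change between versions; the single-argument form is assumed here):

```python
import pyarrow as pa
from polars.io.delta import _convert_pa_schema_to_delta

table = pa.table({"ts": pa.array([0], type=pa.timestamp("ns"))})
# Rewrite unsupported types (ns timestamps, unsigned ints, ...) into
# Delta-compatible ones, then cast the table to the converted schema.
delta_schema = _convert_pa_schema_to_delta(table.schema)
table = table.cast(delta_schema)
```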
What about the `Categorical` type?
I don't see any categorical primitive types in here: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types
And that's the issue. Delta returns `deltalake.PyDeltaTableError: Schema error: Invalid data type for Delta Lake: Dictionary(Int8, Utf8)` for Categorical fields. Here is a method in Polars that raises an exception on Categorical fields: https://github.com/pola-rs/polars/blob/main/py-polars/polars/io/delta.py#L323-L329
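Until categoricals are supported natively, one workaround is to decode them before writing (a hedged sketch; the path is hypothetical):

```python
import pandas as pd
from deltalake import write_deltalake

df = pd.DataFrame({"flag": pd.Categorical(["x", "y", "x"])})
# Dictionary-encoded (categorical) columns have no Delta primitive type,
# so decode them to plain strings before writing.
df["flag"] = df["flag"].astype(str)
write_deltalake("/tmp/flags", df)
```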
I see. We could possibly port these things from Polars into delta-rs; I'll check with the Polars contributors. I'm not super familiar with licenses and all.
…iter/merge (#1820)

# Description

This ports some functionality that @stinodego and I had worked on in Polars, where we converted a pyarrow schema to a compatible Delta schema. It converts the following:

- uint -> int
- timestamp (any timeunit) -> timestamp(us)

I adjusted the functionality to do schema conversion from large to normal types when necessary, which is still needed in MERGE as a workaround for #1753.

Additional things I've added:

- Schema conversion for every input in write_deltalake/merge
- Pandas dataframe conversion
- Pandas dataframe as input in merge

# Related Issue(s)

- closes #686
- closes #1467

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
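In spirit, the schema conversion described above does something like the following (an illustrative sketch, not the actual delta-rs internals; `to_delta_compatible` is a made-up name):

```python
import pyarrow as pa

# Unsigned ints map to same-width signed ints, any timestamp unit collapses
# to microseconds, and large_string falls back to string.
_UINT_TO_INT = {
    pa.uint8(): pa.int8(),
    pa.uint16(): pa.int16(),
    pa.uint32(): pa.int32(),
    pa.uint64(): pa.int64(),
}

def to_delta_compatible(schema: pa.Schema) -> pa.Schema:
    fields = []
    for f in schema:
        t = f.type
        if t in _UINT_TO_INT:
            t = _UINT_TO_INT[t]
        elif pa.types.is_timestamp(t):
            t = pa.timestamp("us")
        elif pa.types.is_large_string(t):
            t = pa.string()
        fields.append(pa.field(f.name, t, f.nullable))
    return pa.schema(fields)
```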
Description
Many Pandas types aren't automatically converted into valid Delta Lake types when converted into Arrow tables. For example, Pandas timestamps are converted into timestamps with nanosecond precision by default, but Delta Lake only supports microsecond precision. This makes `write_deltalake()` difficult to use for Pandas users.

We should write a test that validates all Pandas types can be written with `write_deltalake()` without manual conversion.

I'm not sure yet how to configure the conversion here:
delta-rs/python/deltalake/writer.py, lines 128 to 129 in 431d0ea
It's possible that we can pass an adjusted schema to the `schema` parameter of `pyarrow.Table.from_pandas()` and that will make the correct conversion.
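For example, something along these lines (a sketch; the frame and column name are illustrative):

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"ts": pd.to_datetime(["2021-01-01"])})

# Infer the schema pandas would produce, then downgrade nanosecond
# timestamps to microseconds before the actual conversion.
inferred = pa.Schema.from_pandas(df)
adjusted = pa.schema(
    pa.field(f.name, pa.timestamp("us")) if pa.types.is_timestamp(f.type) else f
    for f in inferred
)
table = pa.Table.from_pandas(df, schema=adjusted)
```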
Use Case

Related Issue(s)
Based on #685