fix(python): Create delta compatible schema during writing #10165

Merged 27 commits on Aug 10, 2023
Changes from all 27 commits:
3f9f29f
Added method to reconstruct schema with delta compatible timestamps
ion-elgreco Jul 29, 2023
ae279b8
Added uint to int logic
ion-elgreco Jul 29, 2023
85801e6
Disable TZ for now, will have to check with delta-rs team why TZ gets…
ion-elgreco Jul 29, 2023
e6dc26e
Fix most formatting
ion-elgreco Jul 29, 2023
4451057
Use inbuilt pyarrow method to check dtype and use mapping of ints
ion-elgreco Aug 3, 2023
c8c220f
Fix bug some dtypes didn't return the reconstructed field, but only t…
ion-elgreco Aug 4, 2023
a8a9315
Formatting
ion-elgreco Aug 4, 2023
29ea7df
Added one test case with multiple uints and datetimes in unnested and…
ion-elgreco Aug 4, 2023
c6cd7e3
Fix to pass all tests
ion-elgreco Aug 4, 2023
ed47cf2
formatting
ion-elgreco Aug 4, 2023
406db01
Use lazy import
ion-elgreco Aug 4, 2023
67b0afd
Formatting fix
ion-elgreco Aug 4, 2023
50133f3
add doc string, with explanation of the casting, and link to the delt…
ion-elgreco Aug 5, 2023
5a4e2d8
Include largetypes to types schema fixing and cast data to delta comp…
ion-elgreco Aug 5, 2023
5de13e2
Formatting
ion-elgreco Aug 5, 2023
8cd5f64
Added parametrized tests
ion-elgreco Aug 5, 2023
aa1498a
Cast to UTC to prevent delta-rs write issue
ion-elgreco Aug 5, 2023
9a74b55
Remove TZ info from timestamp types
ion-elgreco Aug 5, 2023
56b6c12
Reduce specific ifelse for timestamp
ion-elgreco Aug 5, 2023
e79ffb1
Improve order of ifelse statements for readability
ion-elgreco Aug 5, 2023
e1f51f7
Move list into function directly
ion-elgreco Aug 5, 2023
e2f0173
Add helper function to create nested data type
ion-elgreco Aug 5, 2023
cb2957d
Added if statement to check if custom schema has been passed
ion-elgreco Aug 6, 2023
913a549
Formatting
ion-elgreco Aug 6, 2023
5ddbc98
Refactor
stinodego Aug 10, 2023
0ad4bcb
Refactor 2
stinodego Aug 10, 2023
a0f24a4
Formatting
stinodego Aug 10, 2023
108 changes: 57 additions & 51 deletions py-polars/polars/dataframe/frame.py
@@ -37,14 +37,9 @@
N_INFER_DEFAULT,
NUMERIC_DTYPES,
Boolean,
Categorical,
DataTypeClass,
Float64,
List,
Null,
Object,
Struct,
Time,
Utf8,
py_type_to_dtype,
)
@@ -3314,9 +3309,6 @@ def write_delta(
"""
Write DataFrame as delta table.

Note: Some polars data types like `Null`, `Categorical` and `Time` are
not supported by the delta protocol specification.

Parameters
----------
target
@@ -3341,9 +3333,35 @@
Additional keyword arguments while writing a Delta lake Table.
See a list of supported write options `here <https://github.com/delta-io/delta-rs/blob/395d48b47ea638b70415899dc035cc895b220e55/python/deltalake/writer.py#L65>`__.

Raises
------
TypeError
If the DataFrame contains unsupported data types.
ArrowInvalidError
If the DataFrame contains data types that could not be cast to their
primitive type.

Notes
-----
The Polars data types :class:`Null`, :class:`Categorical` and :class:`Time`
are not supported by the delta protocol specification and will raise a
TypeError.

Some other data types are not supported but have an associated `primitive type
<https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types>`__
to which they can be cast. This affects the following data types:

- Unsigned integers
- :class:`Datetime` types with millisecond or nanosecond precision
- :class:`Utf8`, :class:`Binary`, and :class:`List` ('large' types)

Polars columns are always nullable. To write data to a delta table with
non-nullable columns, a custom pyarrow schema has to be passed to the
`delta_write_options`. See the last example below.

Examples
--------
Instantiate a basic dataframe:
Write a dataframe to the local filesystem as a Delta Lake table.

>>> df = pl.DataFrame(
... {
@@ -3352,28 +3370,25 @@
... "ham": ["a", "b", "c", "d", "e"],
... }
... )

Write DataFrame as a Delta Lake table on local filesystem.

>>> table_path = "/path/to/delta-table/"
>>> df.write_delta(table_path) # doctest: +SKIP

Append data to an existing Delta Lake table on local filesystem.
Note: This will fail if schema of the new data does not match the
schema of existing table.
Append data to an existing Delta Lake table on the local filesystem.
Note that this will fail if the schema of the new data does not match the
schema of the existing table.

>>> df.write_delta(table_path, mode="append") # doctest: +SKIP

Overwrite a Delta Lake table as a new version.
Note: If the schema of the new and old data is same,
then setting `overwrite_schema` is not required.
If the schemas of the new and old data are the same, setting
`overwrite_schema` is not required.

>>> existing_table_path = "/path/to/delta-table/"
>>> df.write_delta(
... existing_table_path, mode="overwrite", overwrite_schema=True
... ) # doctest: +SKIP

Write DataFrame as a Delta Lake table on cloud object store like S3.
Write a dataframe as a Delta Lake table to a cloud object store like S3.

>>> table_path = "s3://bucket/prefix/to/delta-table/"
>>> df.write_delta(
@@ -3385,59 +3400,50 @@
... },
... ) # doctest: +SKIP

Write DataFrame as a Delta Lake table with non-nullable columns.

>>> import pyarrow as pa
>>> existing_table_path = "/path/to/delta-table/"
>>> df.write_delta(
... existing_table_path,
... delta_write_options={
... "schema": pa.schema([pa.field("foo", pa.int64(), nullable=False)])
... },
... ) # doctest: +SKIP

"""
from polars.io.delta import _check_if_delta_available, _resolve_delta_lake_uri
from polars.io.delta import (
_check_for_unsupported_types,
_check_if_delta_available,
_convert_pa_schema_to_delta,
_resolve_delta_lake_uri,
)

_check_if_delta_available()

from deltalake.writer import (
try_get_deltatable,
write_deltalake,
)
from deltalake.writer import write_deltalake

if delta_write_options is None:
delta_write_options = {}

if isinstance(target, (str, Path)):
target = _resolve_delta_lake_uri(str(target), strict=False)

unsupported_cols = {}
unsupported_types = [Time, Categorical, Null]

def check_unsupported_types(n: str, t: PolarsDataType | None) -> None:
if t is None or t in unsupported_types:
unsupported_cols[n] = t
elif isinstance(t, Struct):
for i in t.fields:
check_unsupported_types(f"{n}.{i.name}", i.dtype)
elif isinstance(t, List):
check_unsupported_types(n, t.inner)

for name, data_type in self.schema.items():
check_unsupported_types(name, data_type)

if len(unsupported_cols) != 0:
raise TypeError(
f"Column(s) in {unsupported_cols} have unsupported data types."
)
_check_for_unsupported_types(self.dtypes)

data = self.to_arrow()
data_schema = data.schema

# Workaround to prevent manual casting of large types
table = try_get_deltatable(target, storage_options) # type: ignore[arg-type]

if table is not None:
table_schema = table.schema()
schema = delta_write_options.get("schema")
if schema is None:
schema = _convert_pa_schema_to_delta(data.schema)

if data_schema == table_schema.to_pyarrow(as_large_types=True):
data_schema = table_schema.to_pyarrow()
data = data.cast(schema)

write_deltalake(
table_or_uri=target,
data=data,
schema=schema,
mode=mode,
schema=data_schema,
overwrite_schema=overwrite_schema,
storage_options=storage_options,
**delta_write_options,
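
The net effect of the changes to `write_delta` above: unsupported dtypes are rejected up front, and when no explicit schema is passed via `delta_write_options`, a Delta-compatible schema is derived from the Arrow data and the data is cast to it before calling `write_deltalake`. A minimal sketch of what this enables (the path, column names, and values below are illustrative, not taken from the PR):

>>> from datetime import datetime
>>> import polars as pl
>>> df = pl.DataFrame(
...     {
...         "id": [1, 2, 3],
...         "ts": [datetime(2023, 8, 10)] * 3,
...     },
...     schema={"id": pl.UInt32, "ts": pl.Datetime("ns")},
... )
>>> # UInt32 and nanosecond-precision Datetime are not Delta primitive types;
>>> # the schema conversion casts them to int32 and microsecond timestamps,
>>> # so this write no longer requires manual casting beforehand.
>>> df.write_delta("/tmp/illustrative-delta-table")  # doctest: +SKIP
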
52 changes: 52 additions & 0 deletions py-polars/polars/io/delta.py
@@ -5,11 +5,15 @@
from urllib.parse import urlparse

from polars.convert import from_arrow
from polars.datatypes import Categorical, Null, Time
from polars.datatypes.convert import unpack_dtypes
from polars.dependencies import _DELTALAKE_AVAILABLE, deltalake
from polars.dependencies import pyarrow as pa
from polars.io.pyarrow_dataset import scan_pyarrow_dataset

if TYPE_CHECKING:
from polars import DataFrame, LazyFrame
from polars.type_aliases import PolarsDataType


def read_delta(
@@ -314,3 +318,51 @@ def _check_if_delta_available() -> None:
raise ImportError(
"deltalake is not installed. Please run `pip install deltalake>=0.9.0`."
)


def _check_for_unsupported_types(dtypes: list[PolarsDataType]) -> None:
schema_dtypes = unpack_dtypes(*dtypes)
unsupported_types = {Time, Categorical, Null}
overlap = schema_dtypes & unsupported_types

if overlap:
raise TypeError(f"dataframe contains unsupported data types: {overlap}")


def _convert_pa_schema_to_delta(schema: pa.schema) -> pa.schema:
"""Convert a PyArrow schema to a schema compatible with Delta Lake."""
# TODO: Add time zone support
dtype_map = {
pa.uint8(): pa.int8(),
pa.uint16(): pa.int16(),
pa.uint32(): pa.int32(),
pa.uint64(): pa.int64(),
pa.timestamp("ns"): pa.timestamp("us"),
pa.timestamp("ms"): pa.timestamp("us"),
pa.large_string(): pa.string(),
pa.large_binary(): pa.binary(),
}

def dtype_to_delta_dtype(dtype: pa.DataType) -> pa.DataType:
# Handle nested types
if isinstance(dtype, pa.LargeListType):
return list_to_delta_dtype(dtype)
elif isinstance(dtype, pa.StructType):
return struct_to_delta_dtype(dtype)

try:
return dtype_map[dtype]
except KeyError:
return dtype

def list_to_delta_dtype(dtype: pa.LargeListType) -> pa.ListType:
nested_dtype = dtype.value_type
nested_dtype_cast = dtype_to_delta_dtype(nested_dtype)
return pa.list_(nested_dtype_cast)

def struct_to_delta_dtype(dtype: pa.StructType) -> pa.StructType:
fields = [dtype.field(i) for i in range(dtype.num_fields)]
fields_cast = [pa.field(f.name, dtype_to_delta_dtype(f.type)) for f in fields]
return pa.struct(fields_cast)

return pa.schema([pa.field(f.name, dtype_to_delta_dtype(f.type)) for f in schema])
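
For reference, a hedged sketch of what these new helpers do when called directly. The schema, field names, and printed output below are illustrative only:

>>> import pyarrow as pa
>>> import polars as pl
>>> _check_for_unsupported_types([pl.Time, pl.Int64])  # doctest: +SKIP
Traceback (most recent call last):
...
TypeError: dataframe contains unsupported data types: {Time}
>>> schema = pa.schema(
...     [
...         pa.field("id", pa.uint32()),
...         pa.field("ts", pa.timestamp("ns")),
...         pa.field("name", pa.large_string()),
...         pa.field("tags", pa.large_list(pa.large_string())),
...     ]
... )
>>> _convert_pa_schema_to_delta(schema)  # doctest: +SKIP
id: int32
ts: timestamp[us]
name: string
tags: list<item: string>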