feat(python): Add support for Iceberg #10375

Merged: 40 commits merged into main from fd-add-pyiceberg-support on Sep 19, 2023

Commits (40)
5107b6e  feat(python): Add support for Iceberg (Fokko, Aug 8, 2023)
aa96547  Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko, Sep 6, 2023)
a4cb7a7  Fix ruff (Fokko, Sep 6, 2023)
66681e8  Make rust fmt happy (Fokko, Sep 6, 2023)
40480bd  Make sphinx happy (Fokko, Sep 6, 2023)
2b04ae2  Fix the import (Fokko, Sep 6, 2023)
5160138  Make mypy happy (Fokko, Sep 6, 2023)
fb39e48  Add missing datafiles (Fokko, Sep 7, 2023)
b56cbc0  Move to Path.parent (Fokko, Sep 7, 2023)
1f7629f  Make ruff happy (Fokko, Sep 7, 2023)
2ce9ef9  Move to Path (Fokko, Sep 7, 2023)
14c0a7c  Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko, Sep 7, 2023)
a614905  Update pyproject.toml (Fokko, Sep 11, 2023)
24fc944  Convert to TypeError (Fokko, Sep 11, 2023)
eeab40c  WIP (Fokko, Sep 11, 2023)
0790b98  Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko, Sep 12, 2023)
8fec7f4  Merge branch 'main' into fd-add-pyiceberg-support (Fokko, Sep 12, 2023)
8d96135  Feedback (Fokko, Sep 12, 2023)
5ae2339  Merge branch 'fd-add-pyiceberg-support' of github.com:Fokko/polars in… (Fokko, Sep 12, 2023)
0a8ab94  Cleanup (Fokko, Sep 12, 2023)
dc79abf  Ruff (Fokko, Sep 12, 2023)
29ed90e  Make black happy (Fokko, Sep 12, 2023)
753ae41  Fix lazy loading (Fokko, Sep 12, 2023)
39d6a1e  Add PyIceberg (Fokko, Sep 12, 2023)
5d77c67  Give it a try (Fokko, Sep 12, 2023)
48db4d5  Fix markdown (Fokko, Sep 12, 2023)
d826256  Revert some changes (Fokko, Sep 12, 2023)
ee603c2  Fix test path (stinodego, Sep 14, 2023)
5e220d7  Some cleanup (stinodego, Sep 14, 2023)
90af065  Fix lazy loading (stinodego, Sep 14, 2023)
0384c84  Merge branch 'main' into fd-add-pyiceberg-support (Fokko, Sep 14, 2023)
462fbd4  Rc3 (Fokko, Sep 14, 2023)
0dad4ed  Fix the tests (Fokko, Sep 14, 2023)
3a3d347  Filter warnings for Windows (Fokko, Sep 14, 2023)
f559447  Make black happy (Fokko, Sep 14, 2023)
5945b3b  minor typo (alexander-beedie, Sep 14, 2023)
beb49e5  Merge branch 'main' into fd-add-pyiceberg-support (Fokko, Sep 15, 2023)
b17119b  Add docstring (Fokko, Sep 15, 2023)
f6c1969  Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko, Sep 18, 2023)
a302872  Bump to PyIceberg 0.5.0 (Fokko, Sep 18, 2023)
1 change: 1 addition & 0 deletions .github/workflows/test-python.yml
@@ -85,6 +85,7 @@ jobs:
"matplotlib"
"backports.zoneinfo"
"connectorx"
"pyiceberg"
"deltalake"
"xlsx2csv"
)
1 change: 1 addition & 0 deletions README.md
@@ -208,6 +208,7 @@ You can also install the dependencies directly.
| xlsx2csv | Support for reading from Excel files |
| openpyxl | Support for reading from Excel files with native types |
| deltalake | Support for reading from Delta Lake Tables |
| pyiceberg | Support for reading from Apache Iceberg tables |
| timezone | Timezone support, only needed if you are on Python<3.9 or you are on Windows |

Releases happen quite often (weekly / every few days) at the moment, so updating polars regularly to get the latest bugfixes / features might not be a bad idea.
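
With the new optional dependency in place, scanning an Iceberg table becomes a one-liner. A minimal sketch of the added capability (the table path and column names are hypothetical):

    import polars as pl

    # Lazily scan the table; projections and predicates are pushed down,
    # so only the matching data is materialized.
    lf = pl.scan_iceberg("file:/data/events/metadata.json")
    df = lf.filter(pl.col("year") >= 2023).select(["year", "n"]).collect()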
7 changes: 7 additions & 0 deletions py-polars/docs/source/reference/io.rst
@@ -75,6 +75,13 @@ Spreadsheet
   read_ods
   DataFrame.write_excel

Apache Iceberg
~~~~~~~~~~~~~~
.. autosummary::
   :toctree: api/

   scan_iceberg

Delta Lake
~~~~~~~~~~
.. autosummary::
2 changes: 2 additions & 0 deletions py-polars/polars/__init__.py
@@ -173,6 +173,7 @@
    read_parquet_schema,
    scan_csv,
    scan_delta,
    scan_iceberg,
    scan_ipc,
    scan_ndjson,
    scan_parquet,
@@ -269,6 +270,7 @@
"read_parquet_schema",
"scan_csv",
"scan_delta",
"scan_iceberg",
"scan_ipc",
"scan_ndjson",
"scan_parquet",
5 changes: 5 additions & 0 deletions py-polars/polars/dependencies.py
@@ -16,6 +16,7 @@
_PANDAS_AVAILABLE = True
_PYARROW_AVAILABLE = True
_PYDANTIC_AVAILABLE = True
_PYICEBERG_AVAILABLE = True
_ZONEINFO_AVAILABLE = True


@@ -160,6 +161,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
    import pandas
    import pyarrow
    import pydantic
    import pyiceberg

    if sys.version_info >= (3, 9):
        import zoneinfo
@@ -184,6 +186,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
pandas, _PANDAS_AVAILABLE = _lazy_import("pandas")
pyarrow, _PYARROW_AVAILABLE = _lazy_import("pyarrow")
pydantic, _PYDANTIC_AVAILABLE = _lazy_import("pydantic")
pyiceberg, _PYICEBERG_AVAILABLE = _lazy_import("pyiceberg")
zoneinfo, _ZONEINFO_AVAILABLE = (
_lazy_import("zoneinfo")
if sys.version_info >= (3, 9)
@@ -231,6 +234,7 @@ def _check_for_pydantic(obj: Any) -> bool:
"numpy",
"pandas",
"pydantic",
"pyiceberg",
"pyarrow",
"zoneinfo",
# lazy utilities
@@ -241,6 +245,7 @@ def _check_for_pydantic(obj: Any) -> bool:
"_LazyModule",
# exported flags/guards
"_DELTALAKE_AVAILABLE",
"_PYICEBERG_AVAILABLE",
"_FSSPEC_AVAILABLE",
"_HYPOTHESIS_AVAILABLE",
"_NUMPY_AVAILABLE",
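
For context, the `_lazy_import` helper used above returns a module proxy plus an availability flag, deferring the real import until first attribute access so that `import polars` stays fast even with many optional dependencies. A rough standalone sketch of the idea (simplified; not the exact polars implementation):

    import importlib
    import importlib.util
    from types import ModuleType
    from typing import Any


    def _lazy_import(module_name: str) -> tuple[ModuleType, bool]:
        # Report unavailable modules with an inert stub and a False flag.
        if importlib.util.find_spec(module_name) is None:
            return ModuleType(module_name), False

        class _LazyModule(ModuleType):
            def __getattr__(self, attr: str) -> Any:
                # Import for real on first attribute access (importlib
                # caches the module in sys.modules), then delegate.
                return getattr(importlib.import_module(module_name), attr)

        return _LazyModule(module_name), True


    pyiceberg, _PYICEBERG_AVAILABLE = _lazy_import("pyiceberg")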
2 changes: 2 additions & 0 deletions py-polars/polars/io/__init__.py
@@ -4,6 +4,7 @@
from polars.io.csv import read_csv, read_csv_batched, scan_csv
from polars.io.database import read_database, read_database_uri
from polars.io.delta import read_delta, scan_delta
from polars.io.iceberg import scan_iceberg
from polars.io.ipc import read_ipc, read_ipc_schema, read_ipc_stream, scan_ipc
from polars.io.json import read_json
from polars.io.ndjson import read_ndjson, scan_ndjson
@@ -29,6 +30,7 @@
"read_parquet_schema",
"scan_csv",
"scan_delta",
"scan_iceberg",
"scan_ipc",
"scan_ndjson",
"scan_parquet",
281 changes: 281 additions & 0 deletions py-polars/polars/io/iceberg.py
@@ -0,0 +1,281 @@
from __future__ import annotations

import ast
from ast import (
    Attribute,
    BinOp,
    BitAnd,
    BitOr,
    Call,
    Compare,
    Constant,
    Eq,
    Gt,
    GtE,
    Invert,
    List,
    Lt,
    LtE,
    UnaryOp,
)
from functools import partial, singledispatch
from typing import TYPE_CHECKING, Any

import polars._reexport as pl
from polars.dependencies import pyiceberg

if TYPE_CHECKING:
    from pyiceberg.table import Table

    from polars import DataFrame, LazyFrame, Series


def scan_iceberg(
    source: str | Table,
    *,
    storage_options: dict[str, Any] | None = None,
) -> LazyFrame:
    """
    Lazily read from an Apache Iceberg table.

    Parameters
    ----------
    source
        A PyIceberg table, or a direct path to the metadata.

        Note: For the local filesystem, absolute and relative paths are
        supported, but for the supported object storages (GCS, Azure and S3)
        a full URI must be provided.
    storage_options
        Extra options for the storage backends supported by `pyiceberg`.
        For cloud storages, this may include configurations for
        authentication etc.

        More info is available `here <https://py.iceberg.apache.org/configuration/>`__.

    Returns
    -------
    LazyFrame

    Examples
    --------
    Creates a scan for an Iceberg table from the local filesystem, or an
    object store.

    >>> table_path = "file:/path/to/iceberg-table/metadata.json"
    >>> pl.scan_iceberg(table_path).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table from S3.
    See a list of supported storage options for S3 `here
    <https://py.iceberg.apache.org/configuration/#fileio>`__.

    >>> table_path = "s3://bucket/path/to/iceberg-table/metadata.json"
    >>> storage_options = {
    ...     "s3.region": "eu-central-1",
    ...     "s3.access-key-id": "THE_AWS_ACCESS_KEY_ID",
    ...     "s3.secret-access-key": "THE_AWS_SECRET_ACCESS_KEY",
    ... }
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table from Azure.
    Supported options for Azure are available `here
    <https://py.iceberg.apache.org/configuration/#azure-data-lake>`__.

    The following types of table paths are supported:

    * az://<container>/<path>/metadata.json
    * adl://<container>/<path>/metadata.json
    * abfs[s]://<container>/<path>/metadata.json

    >>> table_path = "az://container/path/to/iceberg-table/metadata.json"
    >>> storage_options = {
    ...     "adlfs.account-name": "AZURE_STORAGE_ACCOUNT_NAME",
    ...     "adlfs.account-key": "AZURE_STORAGE_ACCOUNT_KEY",
    ... }
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table from Google Cloud Storage.
    Supported options for GCS are available `here
    <https://py.iceberg.apache.org/configuration/#google-cloud-storage>`__.

    >>> table_path = "gs://bucket/path/to/iceberg-table/metadata.json"
    >>> storage_options = {
    ...     "gcs.project-id": "my-gcp-project",
    ...     "gcs.oauth.token": "ya29.dr.AfM...",
    ... }
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table with additional options.
    In the example below, the `py-io-impl` property is used to explicitly
    select the fsspec-based IO implementation.

    >>> table_path = "/path/to/iceberg-table/metadata.json"
    >>> storage_options = {"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    """
    from pyiceberg.io.pyarrow import schema_to_pyarrow
    from pyiceberg.table import StaticTable

    if isinstance(source, str):
        source = StaticTable.from_metadata(
            metadata_location=source, properties=storage_options or {}
        )

    func = partial(_scan_pyarrow_dataset_impl, source)
    arrow_schema = schema_to_pyarrow(source.schema())
    return pl.LazyFrame._scan_python_function(arrow_schema, func, pyarrow=True)


def _scan_pyarrow_dataset_impl(
[Review comment, Member] Why do we need a second _scan_pyarrow_dataset_impl? Can we not reuse the existing implementation?

[Review comment, Member] @Fokko Could you come back to me on this one?

[Reply, Contributor (author)]
Of course. Here we do something different than in the original one. In the original, the Python eval function is used to convert the string containing Python to an actual Python class, and that's being passed into the delta library:

    if predicate:
        # imports are used by inline python evaluated by `eval`
        from polars.datatypes import Date, Datetime, Duration  # noqa: F401
        from polars.utils.convert import (
            _to_python_datetime,  # noqa: F401
            _to_python_time,  # noqa: F401
            _to_python_timedelta,  # noqa: F401
        )
    
        _filter = eval(predicate)

What we do here is take the string, convert it into an abstract syntax tree, and traverse that tree to turn it into a PyIceberg expression. The reason I did this is that the PyArrow expression doesn't expose any Python methods to traverse it (the same goes for the Polars expression, otherwise I could just traverse that one as well). I've added this to the docstring as well 👍🏻

    tbl: Table,
    with_columns: list[str] | None = None,
    predicate: str = "",
    n_rows: int | None = None,
    **kwargs: Any,
) -> DataFrame | Series:
    """
    Take the projected columns and materialize an arrow table.

    Parameters
    ----------
    tbl
        The PyIceberg table to scan
    with_columns
        Columns that are projected
    predicate
        PyArrow expression, as a string, that is parsed into a PyIceberg
        expression
    n_rows
        Materialize only n rows from the scan
    kwargs
        For backward compatibility

    Returns
    -------
    DataFrame

    """
    from polars import from_arrow

    scan = tbl.scan(limit=n_rows)

    if with_columns is not None:
        scan = scan.select(*with_columns)

    if predicate:
        try:
            expr_ast = _to_ast(predicate)
            pyiceberg_expr = _convert_predicate(expr_ast)
        except ValueError as e:
            raise ValueError(
                f"Could not convert predicate to PyIceberg: {predicate}"
            ) from e

        scan = scan.filter(pyiceberg_expr)

    return from_arrow(scan.to_arrow())


def _to_ast(expr: str) -> ast.expr:
    """
    Convert a Python string to an AST.

    This takes the PyArrow expression (as a string) and converts it into a
    Python AST that can be traversed to convert it into a PyIceberg
    expression.

    Parameters
    ----------
    expr
        The string expression

    Returns
    -------
    The AST representing the Arrow expression
    """
    return ast.parse(expr, mode="eval").body


@singledispatch
def _convert_predicate(a: Any) -> Any:
    """Walks the AST to convert the PyArrow expression to a PyIceberg expression."""
    raise ValueError(f"Unexpected symbol: {a}")


@_convert_predicate.register(Constant)
def _(a: Constant) -> Any:
    return a.value


@_convert_predicate.register(UnaryOp)
def _(a: UnaryOp) -> Any:
    if isinstance(a.op, Invert):
        return pyiceberg.expressions.Not(_convert_predicate(a.operand))
    else:
        raise TypeError(f"Unexpected UnaryOp: {a}")


@_convert_predicate.register(Call)
def _(a: Call) -> Any:
    args = [_convert_predicate(arg) for arg in a.args]
    f = _convert_predicate(a.func)
    if f == "field":
        return args
    else:
        ref = _convert_predicate(a.func.value)[0]  # type: ignore[attr-defined]
        if f == "isin":
            return pyiceberg.expressions.In(ref, args[0])
        elif f == "is_null":
            return pyiceberg.expressions.IsNull(ref)
        elif f == "is_nan":
            return pyiceberg.expressions.IsNaN(ref)

    raise ValueError(f"Unknown call: {f}")


@_convert_predicate.register(Attribute)
def _(a: Attribute) -> Any:
    return a.attr


@_convert_predicate.register(BinOp)
def _(a: BinOp) -> Any:
    lhs = _convert_predicate(a.left)
    rhs = _convert_predicate(a.right)

    op = a.op
    if isinstance(op, BitAnd):
        return pyiceberg.expressions.And(lhs, rhs)
    if isinstance(op, BitOr):
        return pyiceberg.expressions.Or(lhs, rhs)
    else:
        raise TypeError(f"Unknown: {lhs} {op} {rhs}")


@_convert_predicate.register(Compare)
def _(a: Compare) -> Any:
    op = a.ops[0]
    lhs = _convert_predicate(a.left)[0]
    rhs = _convert_predicate(a.comparators[0])

    if isinstance(op, Gt):
        return pyiceberg.expressions.GreaterThan(lhs, rhs)
    if isinstance(op, GtE):
        return pyiceberg.expressions.GreaterThanOrEqual(lhs, rhs)
    if isinstance(op, Eq):
        return pyiceberg.expressions.EqualTo(lhs, rhs)
    if isinstance(op, Lt):
        return pyiceberg.expressions.LessThan(lhs, rhs)
    if isinstance(op, LtE):
        return pyiceberg.expressions.LessThanOrEqual(lhs, rhs)
    else:
        raise TypeError(f"Unknown comparison: {op}")


@_convert_predicate.register(List)
def _(a: List) -> Any:
    return [_convert_predicate(e) for e in a.elts]
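
To make the conversion described in the review thread concrete: polars hands `_scan_pyarrow_dataset_impl` the pushed-down filter as a stringified PyArrow expression, and the AST walk above maps it onto PyIceberg's expression classes. An illustrative round trip (the predicate string is a hypothetical example of what polars may generate; `_to_ast` and `_convert_predicate` are the private helpers defined above):

    from polars.io.iceberg import _convert_predicate, _to_ast

    # A polars filter such as (col("ts") >= 10) & col("region").is_in([...])
    # arrives as a stringified PyArrow compute expression.
    predicate = (
        "(pa.compute.field('ts') >= 10) & "
        "pa.compute.field('region').isin(['eu', 'us'])"
    )

    expr = _convert_predicate(_to_ast(predicate))
    # Roughly: And(GreaterThanOrEqual('ts', 10), In('region', ['eu', 'us']))
    print(expr)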
1 change: 1 addition & 0 deletions py-polars/polars/utils/show_versions.py
@@ -68,6 +68,7 @@ def _get_dependency_info() -> dict[str, str]:
"pandas",
"pyarrow",
"pydantic",
"pyiceberg",
"sqlalchemy",
"xlsx2csv",
"xlsxwriter",