feat(python): Add support for Iceberg #10375
Merged
Commits
5107b6e feat(python): Add support for Iceberg (Fokko)
aa96547 Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko)
a4cb7a7 Fix ruff (Fokko)
66681e8 Make rust fmt happy (Fokko)
40480bd Make sphinx happy (Fokko)
2b04ae2 Fix the import (Fokko)
5160138 Make mypy happy (Fokko)
fb39e48 Add missing datafiles (Fokko)
b56cbc0 Move to Path.parent (Fokko)
1f7629f Make ruff happy (Fokko)
2ce9ef9 Move to Path (Fokko)
14c0a7c Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko)
a614905 Update pyproject.toml (Fokko)
24fc944 Convert to TypeError (Fokko)
eeab40c WIP (Fokko)
0790b98 Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko)
8fec7f4 Merge branch 'main' into fd-add-pyiceberg-support (Fokko)
8d96135 Feedback (Fokko)
5ae2339 Merge branch 'fd-add-pyiceberg-support' of github.com:Fokko/polars in… (Fokko)
0a8ab94 Cleanup (Fokko)
dc79abf Ruff (Fokko)
29ed90e Make black happy (Fokko)
753ae41 Fix lazy loading (Fokko)
39d6a1e Add PyIceberg (Fokko)
5d77c67 Give it a try (Fokko)
48db4d5 Fix markdown (Fokko)
d826256 Revert some changes (Fokko)
ee603c2 Fix test path (stinodego)
5e220d7 Some cleanup (stinodego)
90af065 Fix lazy loading (stinodego)
0384c84 Merge branch 'main' into fd-add-pyiceberg-support (Fokko)
462fbd4 Rc3 (Fokko)
0dad4ed Fix the tests (Fokko)
3a3d347 Filter warnings for Windows (Fokko)
f559447 Make black happy (Fokko)
5945b3b minor typo (alexander-beedie)
beb49e5 Merge branch 'main' into fd-add-pyiceberg-support (Fokko)
b17119b Add docstring (Fokko)
f6c1969 Merge branch 'main' of github.com:pola-rs/polars into fd-add-pyiceber… (Fokko)
a302872 Bump to PyIceberg 0.5.0 (Fokko)
```diff
@@ -85,6 +85,7 @@ jobs:
 "matplotlib"
 "backports.zoneinfo"
 "connectorx"
+"pyiceberg"
 "deltalake"
 "xlsx2csv"
 )
```
@@ -0,0 +1,281 @@
```python
from __future__ import annotations

import ast
from ast import (
    Attribute,
    BinOp,
    BitAnd,
    BitOr,
    Call,
    Compare,
    Constant,
    Eq,
    Gt,
    GtE,
    Invert,
    List,
    Lt,
    LtE,
    UnaryOp,
)
from functools import partial, singledispatch
from typing import TYPE_CHECKING, Any

import polars._reexport as pl
from polars.dependencies import pyiceberg

if TYPE_CHECKING:
    from pyiceberg.table import Table

    from polars import DataFrame, LazyFrame, Series


def scan_iceberg(
    source: str | Table,
    *,
    storage_options: dict[str, Any] | None = None,
) -> LazyFrame:
    """
    Lazily read from an Apache Iceberg table.

    Parameters
    ----------
    source
        A PyIceberg table, or a direct path to the metadata.

        Note: For the local filesystem, absolute and relative paths are supported,
        but for the supported object stores (GCS, Azure, and S3) a full URI must be provided.
    storage_options
        Extra options for the storage backends supported by `pyiceberg`.
        For cloud storage, this may include authentication configuration.

        More info is available `here <https://py.iceberg.apache.org/configuration/>`__.

    Returns
    -------
    LazyFrame

    Examples
    --------
    Creates a scan for an Iceberg table from the local filesystem or an object store.

    >>> table_path = "file:/path/to/iceberg-table/metadata.json"
    >>> pl.scan_iceberg(table_path).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table from S3.
    See a list of supported storage options for S3 `here
    <https://py.iceberg.apache.org/configuration/#fileio>`__.

    >>> table_path = "s3://bucket/path/to/iceberg-table/metadata.json"
    >>> storage_options = {
    ...     "s3.region": "eu-central-1",
    ...     "s3.access-key-id": "THE_AWS_ACCESS_KEY_ID",
    ...     "s3.secret-access-key": "THE_AWS_SECRET_ACCESS_KEY",
    ... }
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table from Azure.
    Supported options for Azure are available `here
    <https://py.iceberg.apache.org/configuration/#azure-data-lake>`__.

    The following types of table paths are supported:
    * az://<container>/<path>/metadata.json
    * adl://<container>/<path>/metadata.json
    * abfs[s]://<container>/<path>/metadata.json

    >>> table_path = "az://container/path/to/iceberg-table/metadata.json"
    >>> storage_options = {
    ...     "adlfs.account-name": "AZURE_STORAGE_ACCOUNT_NAME",
    ...     "adlfs.account-key": "AZURE_STORAGE_ACCOUNT_KEY",
    ... }
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table from Google Cloud Storage.
    Supported options for GCS are available `here
    <https://py.iceberg.apache.org/configuration/#google-cloud-storage>`__.

    >>> table_path = "gs://bucket/path/to/iceberg-table/metadata.json"
    >>> storage_options = {
    ...     "gcs.project-id": "my-gcp-project",
    ...     "gcs.oauth.token": "ya29.dr.AfM...",
    ... }
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    Creates a scan for an Iceberg table with additional options.
    In the example below, the `py-io-impl` option selects the fsspec-based
    `FileIO` implementation.

    >>> table_path = "/path/to/iceberg-table/metadata.json"
    >>> storage_options = {"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"}
    >>> pl.scan_iceberg(
    ...     table_path, storage_options=storage_options
    ... ).collect()  # doctest: +SKIP

    """
    from pyiceberg.io.pyarrow import schema_to_pyarrow
    from pyiceberg.table import StaticTable

    if isinstance(source, str):
        source = StaticTable.from_metadata(
            metadata_location=source, properties=storage_options or {}
        )

    func = partial(_scan_pyarrow_dataset_impl, source)
    arrow_schema = schema_to_pyarrow(source.schema())
    return pl.LazyFrame._scan_python_function(arrow_schema, func, pyarrow=True)


def _scan_pyarrow_dataset_impl(
    tbl: Table,
    with_columns: list[str] | None = None,
    predicate: str = "",
    n_rows: int | None = None,
    **kwargs: Any,
) -> DataFrame | Series:
    """
    Take the projected columns and materialize an arrow table.

    Parameters
    ----------
    tbl
        PyIceberg table to scan
    with_columns
        Columns that are projected
    predicate
        PyArrow expression, as a string, that is translated to a PyIceberg expression
    n_rows
        Materialize only n rows from the table.
    kwargs
        For backward compatibility

    Returns
    -------
    DataFrame

    """
    from polars import from_arrow

    scan = tbl.scan(limit=n_rows)

    if with_columns is not None:
        scan = scan.select(*with_columns)

    if predicate:  # skip filtering for both None and the empty-string default
        try:
            expr_ast = _to_ast(predicate)
            pyiceberg_expr = _convert_predicate(expr_ast)
        except ValueError as e:
            raise ValueError(
                f"Could not convert predicate to PyIceberg: {predicate}"
            ) from e

        scan = scan.filter(pyiceberg_expr)

    return from_arrow(scan.to_arrow())


def _to_ast(expr: str) -> ast.expr:
    """
    Converts a Python string to an AST.

    This will take the Python Arrow expression (as a string), and it will
    be converted into a Python AST that can be traversed to convert it to a PyIceberg
    expression.

    Parameters
    ----------
    expr
        The string expression

    Returns
    -------
    The AST representing the Arrow expression
    """
    return ast.parse(expr, mode="eval").body


@singledispatch
def _convert_predicate(a: Any) -> Any:
    """Walks the AST to convert the PyArrow expression to a PyIceberg expression."""
    raise ValueError(f"Unexpected symbol: {a}")


@_convert_predicate.register(Constant)
def _(a: Constant) -> Any:
    return a.value


@_convert_predicate.register(UnaryOp)
def _(a: UnaryOp) -> Any:
    if isinstance(a.op, Invert):
        return pyiceberg.expressions.Not(_convert_predicate(a.operand))
    else:
        raise TypeError(f"Unexpected UnaryOp: {a}")


@_convert_predicate.register(Call)
def _(a: Call) -> Any:
    args = [_convert_predicate(arg) for arg in a.args]
    f = _convert_predicate(a.func)
    if f == "field":
        return args
    else:
        ref = _convert_predicate(a.func.value)[0]  # type: ignore[attr-defined]
        if f == "isin":
            return pyiceberg.expressions.In(ref, args[0])
        elif f == "is_null":
            return pyiceberg.expressions.IsNull(ref)
        elif f == "is_nan":
            return pyiceberg.expressions.IsNaN(ref)

    raise ValueError(f"Unknown call: {f}")


@_convert_predicate.register(Attribute)
def _(a: Attribute) -> Any:
    return a.attr


@_convert_predicate.register(BinOp)
def _(a: BinOp) -> Any:
    lhs = _convert_predicate(a.left)
    rhs = _convert_predicate(a.right)

    op = a.op
    if isinstance(op, BitAnd):
        return pyiceberg.expressions.And(lhs, rhs)
    if isinstance(op, BitOr):
        return pyiceberg.expressions.Or(lhs, rhs)
    else:
        raise TypeError(f"Unknown: {lhs} {op} {rhs}")


@_convert_predicate.register(Compare)
def _(a: Compare) -> Any:
    op = a.ops[0]
    lhs = _convert_predicate(a.left)[0]
    rhs = _convert_predicate(a.comparators[0])

    if isinstance(op, Gt):
        return pyiceberg.expressions.GreaterThan(lhs, rhs)
    if isinstance(op, GtE):
        return pyiceberg.expressions.GreaterThanOrEqual(lhs, rhs)
    if isinstance(op, Eq):
        return pyiceberg.expressions.EqualTo(lhs, rhs)
    if isinstance(op, Lt):
        return pyiceberg.expressions.LessThan(lhs, rhs)
    if isinstance(op, LtE):
        return pyiceberg.expressions.LessThanOrEqual(lhs, rhs)
    else:
        raise TypeError(f"Unknown comparison: {op}")


@_convert_predicate.register(List)
def _(a: List) -> Any:
    return [_convert_predicate(e) for e in a.elts]
```
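To make the node shapes handled by the registered converters more concrete, here is a small sketch that parses a predicate string and inspects the result using only the standard-library `ast` module. The predicate string is hypothetical: it is an assumption about what a pushed-down PyArrow expression looks like and is not taken from this PR.

```python
import ast

# Hypothetical predicate string, assumed to resemble a pushed-down PyArrow expression.
predicate = "(pa.compute.field('ts') >= 5)"

# mode="eval" parses a single expression; .body is that expression's root node.
node = ast.parse(predicate, mode="eval").body

# The root is a comparison with one operator and one comparator.
assert isinstance(node, ast.Compare)
assert isinstance(node.ops[0], ast.GtE)

# The left-hand side is the call pa.compute.field('ts').
# Its callee is an Attribute whose trailing name is "field".
left = node.left
assert isinstance(left, ast.Call)
assert isinstance(left.func, ast.Attribute)

callee_name = left.func.attr       # trailing attribute name of the callee
column = left.args[0].value        # the string literal passed to field(...)
literal = node.comparators[0].value  # the right-hand side constant

print(callee_name, column, literal)
```

This is why the `Compare` handler indexes the converted left-hand side with `[0]`: the `field(...)` call is converted to a one-element list of its arguments, and the column name is the first entry.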
Why do we need a second `_scan_pyarrow_dataset_impl`? Can we not reuse the existing implementation?
@Fokko Could you come back to me on this one?
Of course. Here we do something different than in the original one. In the original, the Python `eval` function is used to convert the string containing Python into an actual Python object, and that is passed into the delta library. What we do here is take the string, convert it into an abstract syntax tree, and traverse that tree to convert it into a PyIceberg expression. The reason I did this is that the PyArrow expression doesn't have any Python methods to traverse the expression (the same goes for the Polars expression, otherwise I could just traverse that one as well). I've added this to the docstring as well 👍🏻
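The parse-then-traverse approach can be sketched with the standard library alone. This is a minimal illustration, not the code from this PR: plain tuples stand in for PyIceberg expression objects so that it runs without `pyiceberg` installed, and the names `convert` and `translate` as well as the sample predicate string are hypothetical.

```python
from __future__ import annotations

import ast
from functools import singledispatch
from typing import Any


@singledispatch
def convert(node: Any) -> Any:
    """Fallback for AST nodes that have no registered translation."""
    raise ValueError(f"Unexpected node: {node!r}")


@convert.register(ast.Constant)
def _(node: ast.Constant) -> Any:
    return node.value


@convert.register(ast.List)
def _(node: ast.List) -> list[Any]:
    return [convert(e) for e in node.elts]


@convert.register(ast.Attribute)
def _(node: ast.Attribute) -> str:
    # Only the trailing name matters, e.g. "field" in pa.compute.field.
    return node.attr


@convert.register(ast.Call)
def _(node: ast.Call) -> Any:
    name = convert(node.func)
    args = [convert(arg) for arg in node.args]
    if name == "field":
        return args  # field('a') becomes ['a']
    # Otherwise this is a method call on a field, e.g. field('b').isin([...]).
    ref = convert(node.func.value)[0]
    if name == "isin":
        return ("in", ref, args[0])
    if name == "is_null":
        return ("is_null", ref)
    raise ValueError(f"Unknown call: {name}")


@convert.register(ast.Compare)
def _(node: ast.Compare) -> Any:
    names = {ast.Gt: "gt", ast.GtE: "gte", ast.Eq: "eq", ast.Lt: "lt", ast.LtE: "lte"}
    op = names.get(type(node.ops[0]))
    if op is None:
        raise TypeError(f"Unknown comparison: {node.ops[0]!r}")
    return (op, convert(node.left)[0], convert(node.comparators[0]))


@convert.register(ast.BinOp)
def _(node: ast.BinOp) -> Any:
    lhs, rhs = convert(node.left), convert(node.right)
    if isinstance(node.op, ast.BitAnd):
        return ("and", lhs, rhs)
    if isinstance(node.op, ast.BitOr):
        return ("or", lhs, rhs)
    raise TypeError(f"Unknown operator: {node.op!r}")


def translate(predicate: str) -> Any:
    """Parse the predicate string and walk its AST; nothing is evaluated."""
    return convert(ast.parse(predicate, mode="eval").body)


# Hypothetical predicate string in the style of a PyArrow expression.
result = translate("(pa.compute.field('a') > 1) & pa.compute.field('b').isin([1, 2])")
print(result)
```

Unlike `eval`, this never executes the string: names such as `pa` do not need to exist, and any construct without a registered handler is rejected with an error instead of being run.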