diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index eb17b0b6ed3b..02386075157d 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -85,6 +85,7 @@ jobs: "matplotlib" "backports.zoneinfo" "connectorx" + "pyiceberg" "deltalake" "xlsx2csv" ) diff --git a/README.md b/README.md index 01ba31396c62..f917c1c9bdb8 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,7 @@ You can also install the dependencies directly. | xlsx2csv | Support for reading from Excel files | | openpyxl | Support for reading from Excel files with native types | | deltalake | Support for reading from Delta Lake Tables | +| pyiceberg | Support for reading from Apache Iceberg tables | | timezone | Timezone support, only needed if are on Python<3.9 or you are on Windows | Releases happen quite often (weekly / every few days) at the moment, so updating polars regularly to get the latest bugfixes / features might not be a bad idea. diff --git a/py-polars/docs/source/reference/io.rst b/py-polars/docs/source/reference/io.rst index 2537c6a1e523..9b0b91335c09 100644 --- a/py-polars/docs/source/reference/io.rst +++ b/py-polars/docs/source/reference/io.rst @@ -75,6 +75,13 @@ Spreadsheet read_ods DataFrame.write_excel +Apache Iceberg +~~~~~~~~~~~~~~ +.. autosummary:: + :toctree: api/ + + scan_iceberg + Delta Lake ~~~~~~~~~~ .. autosummary:: diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py index 2084e328beeb..8500a34fd0de 100644 --- a/py-polars/polars/__init__.py +++ b/py-polars/polars/__init__.py @@ -173,6 +173,7 @@ read_parquet_schema, scan_csv, scan_delta, + scan_iceberg, scan_ipc, scan_ndjson, scan_parquet, @@ -269,6 +270,7 @@ "read_parquet_schema", "scan_csv", "scan_delta", + "scan_iceberg", "scan_ipc", "scan_ndjson", "scan_parquet", diff --git a/py-polars/polars/dependencies.py b/py-polars/polars/dependencies.py index 4ba67974f8f2..bf853d1d2cc4 100644 --- a/py-polars/polars/dependencies.py +++ b/py-polars/polars/dependencies.py @@ -17,6 +17,7 @@ _PANDAS_AVAILABLE = True _PYARROW_AVAILABLE = True _PYDANTIC_AVAILABLE = True +_PYICEBERG_AVAILABLE = True _ZONEINFO_AVAILABLE = True @@ -162,6 +163,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]: import pandas import pyarrow import pydantic + import pyiceberg if sys.version_info >= (3, 9): import zoneinfo @@ -186,6 +188,7 @@ def _lazy_import(module_name: str) -> tuple[ModuleType, bool]: pandas, _PANDAS_AVAILABLE = _lazy_import("pandas") pyarrow, _PYARROW_AVAILABLE = _lazy_import("pyarrow") pydantic, _PYDANTIC_AVAILABLE = _lazy_import("pydantic") + pyiceberg, _PYICEBERG_AVAILABLE = _lazy_import("pyiceberg") zoneinfo, _ZONEINFO_AVAILABLE = ( _lazy_import("zoneinfo") if sys.version_info >= (3, 9) @@ -235,6 +238,7 @@ def _check_for_pydantic(obj: Any) -> bool: "numpy", "pandas", "pydantic", + "pyiceberg", "pyarrow", "zoneinfo", # lazy utilities @@ -245,6 +249,7 @@ def _check_for_pydantic(obj: Any) -> bool: "_LazyModule", # exported flags/guards "_DELTALAKE_AVAILABLE", + "_PYICEBERG_AVAILABLE", "_FSSPEC_AVAILABLE", "_GEVENT_AVAILABLE", "_HYPOTHESIS_AVAILABLE", diff --git a/py-polars/polars/io/__init__.py b/py-polars/polars/io/__init__.py index f5ef42823324..f4a39b5f778a 100644 --- a/py-polars/polars/io/__init__.py +++ b/py-polars/polars/io/__init__.py @@ -4,6 +4,7 @@ from polars.io.csv import read_csv, read_csv_batched, scan_csv from polars.io.database import read_database, read_database_uri from polars.io.delta import read_delta, scan_delta +from polars.io.iceberg import scan_iceberg from polars.io.ipc import read_ipc, read_ipc_schema, read_ipc_stream, scan_ipc from polars.io.json import read_json from polars.io.ndjson import read_ndjson, scan_ndjson @@ -29,6 +30,7 @@ "read_parquet_schema", "scan_csv", "scan_delta", + "scan_iceberg", "scan_ipc", "scan_ndjson", "scan_parquet", diff --git a/py-polars/polars/io/iceberg.py b/py-polars/polars/io/iceberg.py new file mode 100644 index 000000000000..0e6aac9287e4 --- /dev/null +++ b/py-polars/polars/io/iceberg.py @@ -0,0 +1,285 @@ +from __future__ import annotations + +import ast +from _ast import GtE, Lt, LtE +from ast import ( + Attribute, + BinOp, + BitAnd, + BitOr, + Call, + Compare, + Constant, + Eq, + Gt, + Invert, + List, + UnaryOp, +) +from functools import partial, singledispatch +from typing import TYPE_CHECKING, Any + +import polars._reexport as pl +from polars.dependencies import pyiceberg + +if TYPE_CHECKING: + from pyiceberg.table import Table + + from polars import DataFrame, LazyFrame, Series + + +def scan_iceberg( + source: str | Table, + *, + storage_options: dict[str, Any] | None = None, +) -> LazyFrame: + """ + Lazily read from an Apache Iceberg table. + + Parameters + ---------- + source + A PyIceberg table, or a direct path to the metadata. + + Note: For Local filesystem, absolute and relative paths are supported but + for the supported object storages - GCS, Azure and S3 full URI must be provided. + storage_options + Extra options for the storage backends supported by `pyiceberg`. + For cloud storages, this may include configurations for authentication etc. + + More info is available `here `__. + + Returns + ------- + LazyFrame + + Examples + -------- + Creates a scan for an Iceberg table from local filesystem, or object store. + + >>> table_path = "file:/path/to/iceberg-table/metadata.json" + >>> pl.scan_iceberg(table_path).collect() # doctest: +SKIP + + Creates a scan for an Iceberg table from S3. + See a list of supported storage options for S3 `here + `__. + + >>> table_path = "s3://bucket/path/to/iceberg-table/metadata.json" + >>> storage_options = { + ... "s3.region": "eu-central-1", + ... "s3.access-key-id": "THE_AWS_ACCESS_KEY_ID", + ... "s3.secret-access-key": "THE_AWS_SECRET_ACCESS_KEY", + ... } + >>> pl.scan_iceberg( + ... table_path, storage_options=storage_options + ... ).collect() # doctest: +SKIP + + Creates a scan for an Iceberg table from Azure. + Supported options for Azure are available `here + `__. + + Following type of table paths are supported: + * az:////metadata.json + * adl:////metadata.json + * abfs[s]:////metadata.json + + >>> table_path = "az://container/path/to/iceberg-table/metadata.json" + >>> storage_options = { + ... "adlfs.account-name": "AZURE_STORAGE_ACCOUNT_NAME", + ... "adlfs.account-key": "AZURE_STORAGE_ACCOUNT_KEY", + ... } + >>> pl.scan_iceberg( + ... table_path, storage_options=storage_options + ... ).collect() # doctest: +SKIP + + Creates a scan for an Iceberg table from Google Cloud Storage. + Supported options for GCS are available `here + `__. + + >>> table_path = "s3://bucket/path/to/iceberg-table/metadata.json" + >>> storage_options = { + ... "gcs.project-id": "my-gcp-project", + ... "gcs.oauth.token": "ya29.dr.AfM...", + ... } + >>> pl.scan_iceberg( + ... table_path, storage_options=storage_options + ... ).collect() # doctest: +SKIP + + Creates a scan for an Iceberg table with additional options. + In the below example, `without_files` option is used which loads the table without + file tracking information. + + >>> table_path = "/path/to/iceberg-table/metadata.json" + >>> storage_options = {"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + >>> pl.scan_iceberg( + ... table_path, storage_options=storage_options + ... ).collect() # doctest: +SKIP + + """ + from pyiceberg.io.pyarrow import schema_to_pyarrow + from pyiceberg.table import StaticTable + + if isinstance(source, str): + source = StaticTable.from_metadata( + metadata_location=source, properties=storage_options or {} + ) + + func = partial(_scan_pyarrow_dataset_impl, source) + arrow_schema = schema_to_pyarrow(source.schema()) + return pl.LazyFrame._scan_python_function(arrow_schema, func, pyarrow=True) + + +def _scan_pyarrow_dataset_impl( + tbl: Table, + with_columns: list[str] | None = None, + predicate: str = "", + n_rows: int | None = None, + **kwargs: Any, +) -> DataFrame | Series: + """ + Take the projected columns and materialize an arrow table. + + Parameters + ---------- + tbl + pyarrow dataset + with_columns + Columns that are projected + predicate + pyarrow expression that can be evaluated with eval + n_rows: + Materialize only n rows from the arrow dataset. + batch_size + The maximum row count for scanned pyarrow record batches. + kwargs: + For backward compatibility + + Returns + ------- + DataFrame + + """ + from polars import from_arrow + + scan = tbl.scan(limit=n_rows) + + if with_columns is not None: + scan = scan.select(*with_columns) + + if predicate is not None: + try: + expr_ast = _to_ast(predicate) + pyiceberg_expr = _convert_predicate(expr_ast) + except ValueError as e: + raise ValueError( + f"Could not convert predicate to PyIceberg: {predicate}" + ) from e + + scan = scan.filter(pyiceberg_expr) + + return from_arrow(scan.to_arrow()) + + +def _to_ast(expr: str) -> ast.expr: + """ + Converts a Python string to an AST. + + This will take the Python Arrow expression (as a string), and it will + be converted into a Python AST that can be traversed to convert it to a PyIceberg + expression. + + The reason to convert it to an AST is because the PyArrow expression + itself doesn't have any methods/properties to traverse the expression. + We need this to convert it into a PyIceberg expression. + + Parameters + ---------- + expr + The string expression + + Returns + ------- + The AST representing the Arrow expression + """ + return ast.parse(expr, mode="eval").body + + +@singledispatch +def _convert_predicate(a: Any) -> Any: + """Walks the AST to convert the PyArrow expression to a PyIceberg expression.""" + raise ValueError(f"Unexpected symbol: {a}") + + +@_convert_predicate.register(Constant) +def _(a: Constant) -> Any: + return a.value + + +@_convert_predicate.register(UnaryOp) +def _(a: UnaryOp) -> Any: + if isinstance(a.op, Invert): + return pyiceberg.expressions.Not(_convert_predicate(a.operand)) + else: + raise TypeError(f"Unexpected UnaryOp: {a}") + + +@_convert_predicate.register(Call) +def _(a: Call) -> Any: + args = [_convert_predicate(arg) for arg in a.args] + f = _convert_predicate(a.func) + if f == "field": + return args + else: + ref = _convert_predicate(a.func.value)[0] # type: ignore[attr-defined] + if f == "isin": + return pyiceberg.expressions.In(ref, args[0]) + elif f == "is_null": + return pyiceberg.expressions.IsNull(ref) + elif f == "is_nan": + return pyiceberg.expressions.IsNaN(ref) + + raise ValueError(f"Unknown call: {f}") + + +@_convert_predicate.register(Attribute) +def _(a: Attribute) -> Any: + return a.attr + + +@_convert_predicate.register(BinOp) +def _(a: BinOp) -> Any: + lhs = _convert_predicate(a.left) + rhs = _convert_predicate(a.right) + + op = a.op + if isinstance(op, BitAnd): + return pyiceberg.expressions.And(lhs, rhs) + if isinstance(op, BitOr): + return pyiceberg.expressions.Or(lhs, rhs) + else: + raise TypeError(f"Unknown: {lhs} {op} {rhs}") + + +@_convert_predicate.register(Compare) +def _(a: Compare) -> Any: + op = a.ops[0] + lhs = _convert_predicate(a.left)[0] + rhs = _convert_predicate(a.comparators[0]) + + if isinstance(op, Gt): + return pyiceberg.expressions.GreaterThan(lhs, rhs) + if isinstance(op, GtE): + return pyiceberg.expressions.GreaterThanOrEqual(lhs, rhs) + if isinstance(op, Eq): + return pyiceberg.expressions.EqualTo(lhs, rhs) + if isinstance(op, Lt): + return pyiceberg.expressions.LessThan(lhs, rhs) + if isinstance(op, LtE): + return pyiceberg.expressions.LessThanOrEqual(lhs, rhs) + else: + raise TypeError(f"Unknown comparison: {op}") + + +@_convert_predicate.register(List) +def _(a: List) -> Any: + return [_convert_predicate(e) for e in a.elts] diff --git a/py-polars/polars/utils/show_versions.py b/py-polars/polars/utils/show_versions.py index 20d0ff7ed5e1..f20185c9ca5d 100644 --- a/py-polars/polars/utils/show_versions.py +++ b/py-polars/polars/utils/show_versions.py @@ -69,6 +69,7 @@ def _get_dependency_info() -> dict[str, str]: "pandas", "pyarrow", "pydantic", + "pyiceberg", "sqlalchemy", "xlsx2csv", "xlsxwriter", diff --git a/py-polars/pyproject.toml b/py-polars/pyproject.toml index 7e3b14e81813..7b75e280c38c 100644 --- a/py-polars/pyproject.toml +++ b/py-polars/pyproject.toml @@ -49,13 +49,14 @@ deltalake = ["deltalake >= 0.10.0"] timezone = ["backports.zoneinfo; python_version < '3.9'", "tzdata; platform_system == 'Windows'"] matplotlib = ["matplotlib"] pydantic = ["pydantic"] +pyiceberg = ["pyiceberg >= 0.5.0"] sqlalchemy = ["sqlalchemy", "pandas"] xlsxwriter = ["xlsxwriter"] adbc = ["adbc_driver_sqlite"] cloudpickle = ["cloudpickle"] gevent = ["gevent"] all = [ - "polars[pyarrow,pandas,numpy,fsspec,connectorx,xlsx2csv,deltalake,timezone,matplotlib,pydantic,sqlalchemy,xlsxwriter,adbc,cloudpickle,gevent]", + "polars[pyarrow,pandas,numpy,fsspec,connectorx,xlsx2csv,deltalake,timezone,matplotlib,pydantic,pyiceberg,sqlalchemy,xlsxwriter,adbc,cloudpickle,gevent]", ] [tool.mypy] diff --git a/py-polars/requirements-dev.txt b/py-polars/requirements-dev.txt index 991b104ec094..0a45cad3e538 100644 --- a/py-polars/requirements-dev.txt +++ b/py-polars/requirements-dev.txt @@ -36,6 +36,7 @@ XlsxWriter deltalake == 0.10.1 # Dataframe interchange protocol dataframe-api-compat >= 0.1.6 +pyiceberg >= 0.5.0 # Other matplotlib gevent diff --git a/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-01/.00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00001.parquet.crc b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-01/.00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00001.parquet.crc new file mode 100644 index 000000000000..a9285317b405 Binary files /dev/null and b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-01/.00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00001.parquet.crc differ diff --git a/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-01/00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00001.parquet b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-01/00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00001.parquet new file mode 100644 index 000000000000..0bbb8ba707a5 Binary files /dev/null and b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-01/00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00001.parquet differ diff --git a/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-02/.00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00002.parquet.crc b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-02/.00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00002.parquet.crc new file mode 100644 index 000000000000..258a5cd76cc1 Binary files /dev/null and b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-02/.00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00002.parquet.crc differ diff --git a/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-02/00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00002.parquet b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-02/00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00002.parquet new file mode 100644 index 000000000000..8da5caa43195 Binary files /dev/null and b/py-polars/tests/unit/io/files/iceberg-table/data/ts_day=2023-03-02/00000-1-6bc54766-6e8a-4fd5-8c00-c6bacbdcaeeb-00002.parquet differ diff --git a/py-polars/tests/unit/io/files/iceberg-table/metadata/aef5d952-7e24-4764-9b30-3483be37240f-m0.avro b/py-polars/tests/unit/io/files/iceberg-table/metadata/aef5d952-7e24-4764-9b30-3483be37240f-m0.avro new file mode 100644 index 000000000000..a5e28a04bcff Binary files /dev/null and b/py-polars/tests/unit/io/files/iceberg-table/metadata/aef5d952-7e24-4764-9b30-3483be37240f-m0.avro differ diff --git a/py-polars/tests/unit/io/files/iceberg-table/metadata/snap-7051579356916758811-1-aef5d952-7e24-4764-9b30-3483be37240f.avro b/py-polars/tests/unit/io/files/iceberg-table/metadata/snap-7051579356916758811-1-aef5d952-7e24-4764-9b30-3483be37240f.avro new file mode 100644 index 000000000000..21f503266051 Binary files /dev/null and b/py-polars/tests/unit/io/files/iceberg-table/metadata/snap-7051579356916758811-1-aef5d952-7e24-4764-9b30-3483be37240f.avro differ diff --git a/py-polars/tests/unit/io/files/iceberg-table/metadata/v2.metadata.json b/py-polars/tests/unit/io/files/iceberg-table/metadata/v2.metadata.json new file mode 100644 index 000000000000..04e078f8a63b --- /dev/null +++ b/py-polars/tests/unit/io/files/iceberg-table/metadata/v2.metadata.json @@ -0,0 +1,109 @@ +{ + "format-version" : 1, + "table-uuid" : "c470045a-5d75-48aa-9d4b-86a86a9b1fce", + "location" : "/tmp/iceberg/t1", + "last-updated-ms" : 1694547405299, + "last-column-id" : 3, + "schema" : { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "id", + "required" : false, + "type" : "int" + }, { + "id" : 2, + "name" : "str", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "ts", + "required" : false, + "type" : "timestamp" + } ] + }, + "current-schema-id" : 0, + "schemas" : [ { + "type" : "struct", + "schema-id" : 0, + "fields" : [ { + "id" : 1, + "name" : "id", + "required" : false, + "type" : "int" + }, { + "id" : 2, + "name" : "str", + "required" : false, + "type" : "string" + }, { + "id" : 3, + "name" : "ts", + "required" : false, + "type" : "timestamp" + } ] + } ], + "partition-spec" : [ { + "name" : "ts_day", + "transform" : "day", + "source-id" : 3, + "field-id" : 1000 + } ], + "default-spec-id" : 0, + "partition-specs" : [ { + "spec-id" : 0, + "fields" : [ { + "name" : "ts_day", + "transform" : "day", + "source-id" : 3, + "field-id" : 1000 + } ] + } ], + "last-partition-id" : 1000, + "default-sort-order-id" : 0, + "sort-orders" : [ { + "order-id" : 0, + "fields" : [ ] + } ], + "properties" : { + "owner" : "fokkodriesprong" + }, + "current-snapshot-id" : 7051579356916758811, + "refs" : { + "main" : { + "snapshot-id" : 7051579356916758811, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 7051579356916758811, + "timestamp-ms" : 1694547405299, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1694547283063", + "added-data-files" : "2", + "added-records" : "3", + "added-files-size" : "1788", + "changed-partition-count" : "2", + "total-records" : "3", + "total-files-size" : "1788", + "total-data-files" : "2", + "total-delete-files" : "0", + "total-position-deletes" : "0", + "total-equality-deletes" : "0" + }, + "manifest-list" : "/tmp/iceberg/t1/metadata/snap-7051579356916758811-1-aef5d952-7e24-4764-9b30-3483be37240f.avro", + "schema-id" : 0 + } ], + "statistics" : [ ], + "snapshot-log" : [ { + "timestamp-ms" : 1694547405299, + "snapshot-id" : 7051579356916758811 + } ], + "metadata-log" : [ { + "timestamp-ms" : 1694547211303, + "metadata-file" : "/tmp/iceberg/t1/metadata/v1.metadata.json" + } ] +} \ No newline at end of file diff --git a/py-polars/tests/unit/io/test_iceberg.py b/py-polars/tests/unit/io/test_iceberg.py new file mode 100644 index 000000000000..cdd2ebb04c23 --- /dev/null +++ b/py-polars/tests/unit/io/test_iceberg.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +import contextlib +import os +from pathlib import Path + +import pytest + +import polars as pl +from polars.io.iceberg import _convert_predicate, _to_ast + + +@pytest.fixture() +def iceberg_path(io_files_path: Path) -> str: + # Iceberg requires absolute paths, so we'll symlink + # the test table into /tmp/iceberg/t1/ + Path("/tmp/iceberg").mkdir(parents=True, exist_ok=True) + current_path = Path(__file__).parent.resolve() + + with contextlib.suppress(FileExistsError): + os.symlink(f"{current_path}/files/iceberg-table", "/tmp/iceberg/t1") + + iceberg_path = io_files_path / "iceberg-table" / "metadata" / "v2.metadata.json" + return f"file://{iceberg_path.resolve()}" + + +@pytest.mark.filterwarnings( + "ignore:No preferred file implementation for scheme*:UserWarning" +) +def test_scan_iceberg_plain(iceberg_path: str) -> None: + df = pl.scan_iceberg(iceberg_path) + assert len(df.collect()) == 3 + assert df.schema == { + "id": pl.Int32, + "str": pl.Utf8, + "ts": pl.Datetime(time_unit="us", time_zone=None), + } + + +@pytest.mark.filterwarnings( + "ignore:No preferred file implementation for scheme*:UserWarning" +) +def test_scan_iceberg_filter_on_partition(iceberg_path: str) -> None: + df = pl.scan_iceberg(iceberg_path) + df = df.filter(pl.col("ts") > "2023-03-02T00:00:00") + assert len(df.collect()) == 1 + + +@pytest.mark.filterwarnings( + "ignore:No preferred file implementation for scheme*:UserWarning" +) +def test_scan_iceberg_filter_on_column(iceberg_path: str) -> None: + df = pl.scan_iceberg(iceberg_path) + df = df.filter(pl.col("id") < 2) + assert len(df.collect()) == 1 + + +def test_is_null_expression() -> None: + from pyiceberg.expressions import IsNull + + expr = _to_ast("(pa.compute.field('id')).is_null()") + assert _convert_predicate(expr) == IsNull("id") + + +def test_is_not_null_expression() -> None: + from pyiceberg.expressions import IsNull, Not + + expr = _to_ast("~(pa.compute.field('id')).is_null()") + assert _convert_predicate(expr) == Not(IsNull("id")) + + +def test_isin_expression() -> None: + from pyiceberg.expressions import In, literal # type: ignore[attr-defined] + + expr = _to_ast("(pa.compute.field('id')).isin([1,2,3])") + assert _convert_predicate(expr) == In("id", {literal(1), literal(2), literal(3)}) + + +def test_parse_combined_expression() -> None: + from pyiceberg.expressions import ( # type: ignore[attr-defined] + And, + EqualTo, + GreaterThan, + In, + Or, + Reference, + literal, + ) + + expr = _to_ast( + "(((pa.compute.field('str') == '2') & (pa.compute.field('id') > 10)) | (pa.compute.field('id')).isin([1,2,3]))" + ) + assert _convert_predicate(expr) == Or( + left=And( + left=EqualTo(term=Reference(name="str"), literal=literal("2")), + right=GreaterThan(term="id", literal=literal(10)), + ), + right=In("id", {literal(1), literal(2), literal(3)}), + ) + + +def test_parse_gt() -> None: + from pyiceberg.expressions import GreaterThan + + expr = _to_ast("(pa.compute.field('ts') > '2023-08-08')") + assert _convert_predicate(expr) == GreaterThan("ts", "2023-08-08") + + +def test_parse_gteq() -> None: + from pyiceberg.expressions import GreaterThanOrEqual + + expr = _to_ast("(pa.compute.field('ts') >= '2023-08-08')") + assert _convert_predicate(expr) == GreaterThanOrEqual("ts", "2023-08-08") + + +def test_parse_eq() -> None: + from pyiceberg.expressions import EqualTo + + expr = _to_ast("(pa.compute.field('ts') == '2023-08-08')") + assert _convert_predicate(expr) == EqualTo("ts", "2023-08-08") + + +def test_parse_lt() -> None: + from pyiceberg.expressions import LessThan + + expr = _to_ast("(pa.compute.field('ts') < '2023-08-08')") + assert _convert_predicate(expr) == LessThan("ts", "2023-08-08") + + +def test_parse_lteq() -> None: + from pyiceberg.expressions import LessThanOrEqual + + expr = _to_ast("(pa.compute.field('ts') <= '2023-08-08')") + assert _convert_predicate(expr) == LessThanOrEqual("ts", "2023-08-08")