feat: support read_parquet for backend with no native support #9744

Open. Wants to merge 31 commits into base branch main.

Changes from 21 commits (31 commits total).

Commits:
ab2ad16
support read_parquet for backend with no native support
jitingxu1 Aug 1, 2024
661f50d
fix unit tests
jitingxu1 Aug 2, 2024
e16f1bb
resolve Xpass and clickhouse tests
jitingxu1 Aug 2, 2024
eaec7a2
handle different inputs
jitingxu1 Aug 5, 2024
9106ad8
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 6, 2024
27d7a08
pandas not supporting glob pattern
jitingxu1 Aug 6, 2024
ac6117f
Merge branch 'main' into extend-read-parquet
gforsyth Aug 6, 2024
3ce9674
tests for url and fsspec url
jitingxu1 Aug 18, 2024
24530ca
resolve pandas use pyarrow as default
jitingxu1 Aug 19, 2024
bb238af
add test for is_url and is_fsspec_url
jitingxu1 Aug 21, 2024
12cfc7d
change to fsspec and add examples
jitingxu1 Aug 23, 2024
2cf597a
add reason for mark.never
jitingxu1 Aug 23, 2024
b4cf0ea
re run workflow
jitingxu1 Aug 23, 2024
2ba5002
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 23, 2024
6f2c754
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 11, 2024
24bfe38
lint
jitingxu1 Sep 15, 2024
6a50c46
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 15, 2024
4579bff
remove pandas
jitingxu1 Sep 15, 2024
d1ed444
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 16, 2024
b01bc6a
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 18, 2024
e70de2f
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 18, 2024
413ada7
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 19, 2024
c3fba44
reconcile code
jitingxu1 Sep 19, 2024
8b6b3c6
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 20, 2024
0d55190
skip trino and impala
jitingxu1 Sep 20, 2024
fda5493
Trigger CI
jitingxu1 Sep 20, 2024
71ebb8e
chore: trigger CI
jitingxu1 Sep 21, 2024
2473c02
chore(test): skip test for backends with own parquet readers
gforsyth Sep 23, 2024
3ab60a8
chore: simplify the logic
jitingxu1 Sep 25, 2024
59c03e0
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 25, 2024
c0c1fd1
chore: lint
jitingxu1 Sep 25, 2024
158 changes: 158 additions & 0 deletions ibis/backends/__init__.py
@@ -4,12 +4,15 @@
import collections.abc
import contextlib
import functools
import glob
import importlib.metadata
import keyword
import re
import sys
import urllib.parse
import urllib.request
import weakref
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple

@@ -1307,6 +1310,161 @@
f"{cls.name} backend has not implemented `has_operation` API"
)

def read_parquet(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
Contributor Author: Instead of BytesIO, I could pass the fsspec object; it could be an HTTPFile if we pass an HTTP URL. I'm not sure of the best way to handle the type of path. @gforsyth, any suggestion?

Member: I think fsspec is a good option.
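
For illustration only, a minimal sketch of the fsspec-handle idea discussed above (the URL is a placeholder, and this is not the code the PR settles on):

import fsspec
import pyarrow.parquet as pq

# Open the remote file through fsspec and hand the file-like object to pyarrow.
with fsspec.open("https://example.com/data/file.parquet", mode="rb") as f:
    table = pq.read_table(f)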

"""Register a parquet file as a table in the current backend.

Parameters
----------
path
The data source. May be a path to a file, an iterable of files,
or a directory of parquet files.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to the pyarrow loading function.
See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
for more information.

When reading data from cloud storage (such as Amazon S3 or Google Cloud Storage),
credentials can be provided via the `filesystem` argument by creating an appropriate
filesystem object (e.g., `pyarrow.fs.S3FileSystem`).
Member: This is true, and additionally, pq.read_table also supports the standard AWS auth patterns (environment variables, AWS SSO credentials, or instance credentials).
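
As a hedged illustration of that point (the bucket and key are placeholders), reading from S3 with ambient AWS credentials needs no explicit filesystem object:

import pyarrow.parquet as pq

# Credentials are resolved from the environment (AWS_ACCESS_KEY_ID and friends),
# AWS SSO configuration, or instance metadata; no explicit S3FileSystem is required.
table = pq.read_table("s3://my-bucket/path/to/file.parquet")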


For URLs with credentials, `fsspec` is used to handle authentication and file access.
Pass the credentials using the `credentials` keyword argument. `fsspec` will use these
credentials to manage access to the remote files.

Returns
-------
ir.Table
The just-registered table

Examples
--------
Connect to a SQLite database:

>>> import ibis
>>> con = ibis.sqlite.connect()

Read a single parquet file:

>>> table = con.read_parquet("path/to/file.parquet")

Read all parquet files in a directory:

>>> table = con.read_parquet("path/to/parquet_directory/")

Read parquet files with a glob pattern:

>>> table = con.read_parquet("path/to/parquet_directory/data_*.parquet")

Read from Amazon S3:

>>> table = con.read_parquet("s3://bucket-name/path/to/file.parquet")

Read from Google Cloud Storage:

>>> table = con.read_parquet("gs://bucket-name/path/to/file.parquet")

Read from an HTTPS URL:

>>> table = con.read_parquet("https://example.com/data/file.parquet")

Read with a custom table name:

>>> table = con.read_parquet("s3://bucket/data.parquet", table_name="my_table")

Read with additional pyarrow options:

>>> table = con.read_parquet("gs://bucket/data.parquet", columns=["col1", "col2"])

Read from Amazon S3 with explicit credentials:

>>> from pyarrow import fs
>>> s3_fs = fs.S3FileSystem(
... access_key="YOUR_ACCESS_KEY", secret_key="YOUR_SECRET_KEY", region="YOUR_AWS_REGION"
... )
>>> table = con.read_parquet("s3://bucket/data.parquet", filesystem=s3_fs)

Read from an HTTPS URL with authentication tokens:

>>> table = con.read_parquet(
... "https://example.com/data/file.parquet",
... credentials={"headers": {"Authorization": "Bearer YOUR_TOKEN"}},
... )

"""

table = self._get_pyarrow_table_from_path(path, **kwargs)
table_name = table_name or util.gen_name("read_parquet")
self.create_table(table_name, table)
Member: Similar to the read_csv PR, this should probably be a memtable so we don't create a persistent table by default.

return self.table(table_name)
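
A possible sketch of that suggestion, returning an in-memory table instead of creating a persistent one (assumes ibis.memtable is acceptable here; this is not what the current revision does):

import ibis

# Inside read_parquet: wrap the pyarrow table in a memtable expression
# rather than materializing it with create_table.
table = self._get_pyarrow_table_from_path(path, **kwargs)
table_name = table_name or util.gen_name("read_parquet")
return ibis.memtable(table, name=table_name)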

def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table:
Member (@cpcloud, Aug 23, 2024): Why can't the implementation of this just be:

return pq.read_table(path, **kwargs)

Did you try that already?

Contributor Author: I tried that in my first commit; it cannot handle all the cases, such as glob patterns and Parquet files hosted at certain URIs (e.g. HTTPS, SFTP).

PyArrow natively implements the following filesystem subclasses:

Local FS (LocalFileSystem)
S3 (S3FileSystem)
Google Cloud Storage File System (GcsFileSystem)
Hadoop Distributed File System (HDFS) (HadoopFileSystem)

Contributor Author: @cpcloud, does this make sense to you?

import pyarrow.parquet as pq

path = str(path)
# handle url
if util.is_url(path):
import fsspec
Member: fsspec is not a dependency of Ibis (it's in our test suite), so this would need extra import handling if we leave it in (but see my other comments).
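
For illustration, one common way to gate an optional dependency (a sketch, not part of this PR):

# Defer the fsspec import and raise a clear error if it is missing.
try:
    import fsspec
except ImportError as err:
    raise ImportError(
        "Reading parquet from a URL requires the optional 'fsspec' package; "
        "install it with `pip install fsspec`."
    ) from err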


credentials = kwargs.pop("credentials", {})
with fsspec.open(path, **credentials) as f:
with BytesIO(f.read()) as reader:
return pq.read_table(reader)

# handle fsspec compatible url
if util.is_fsspec_url(path):
return pq.read_table(path, **kwargs)

# Handle local file paths or patterns
paths = glob.glob(path)
if not paths:
raise ValueError(f"No files found matching pattern: {path!r}")

elif len(paths) == 1:
paths = paths[0]

return pq.read_table(paths, **kwargs)

Member: I think we should reconsider handling all of these cases -- this sort of branching logic means that when a user reports an error, we'll have any number of possible culprits to consider, and it makes it harder to debug for everyone.

I think (and I could be wrong) that nearly all of these cases are covered by pq.read_table by itself, and that's much easier to document and debug.

read_table also supports being passed an fsspec object, so if someone needs to read from a hypertext URL, they can use fsspec as a shim for that. (This is something we can add a note about in the docstring.)
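
To illustrate the shim idea (a sketch; the HTTPS filesystem usage and URL are assumptions, and exact behavior may vary across pyarrow versions):

import fsspec
import pyarrow.parquet as pq

# fsspec's HTTP filesystem acts as a shim so pyarrow can read over HTTPS.
http_fs = fsspec.filesystem("https")
table = pq.read_table("https://example.com/data/file.parquet", filesystem=http_fs)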

Contributor Author: pq.read_table can handle most of the cases. I will simplify the logic and see how many cases can be covered. Thanks for your suggestion.

def _cached(self, expr: ir.Table):
"""Cache the provided expression.

All subsequent operations on the returned expression will be performed on the cached data.

Parameters
----------
expr
Table expression to cache

Returns
-------
Expr
Cached table

"""
op = expr.op()

if (result := self._query_cache.get(op)) is None:
result = self._query_cache.store(expr)
return ir.CachedTable(result)

def _release_cached(self, expr: ir.CachedTable) -> None:
"""Releases the provided cached expression.

Parameters
----------
expr
Cached expression to release

"""
self._query_cache.release(expr.op().name)

def _load_into_cache(self, name, expr):
raise NotImplementedError(self.name)

def _clean_up_cached_table(self, name):
raise NotImplementedError(self.name)

def _transpile_sql(self, query: str, *, dialect: str | None = None) -> str:
# only transpile if dialect was passed
if dialect is None:
90 changes: 70 additions & 20 deletions ibis/backends/tests/test_register.py
@@ -18,8 +18,6 @@

import pyarrow as pa

pytestmark = pytest.mark.notimpl(["druid", "exasol", "oracle"])


@contextlib.contextmanager
def pushd(new_dir):
@@ -96,6 +94,7 @@ def gzip_csv(data_dir, tmp_path):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_csv(con, data_dir, fname, in_table_name, out_table_name):
with pushd(data_dir / "csv"):
with pytest.warns(FutureWarning, match="v9.1"):
@@ -107,7 +106,7 @@ def test_register_csv(con, data_dir, fname, in_table_name, out_table_name):


# TODO: rewrite or delete test when register api is removed
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
@@ -147,6 +146,7 @@ def test_register_csv_gz(con, data_dir, gzip_csv):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_with_dotted_name(con, data_dir, tmp_path):
basename = "foo.bar.baz/diamonds.csv"
f = tmp_path.joinpath(basename)
@@ -204,6 +204,7 @@ def read_table(path: Path) -> Iterator[tuple[str, pa.Table]]:
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_parquet(
con, tmp_path, data_dir, fname, in_table_name, out_table_name
):
@@ -242,6 +243,7 @@ def test_register_parquet(
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_iterator_parquet(
con,
tmp_path,
@@ -270,7 +272,7 @@ def test_register_iterator_parquet(
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
@@ -304,7 +306,7 @@ def test_register_pandas(con):
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion", "polars"])
@pytest.mark.notimpl(["datafusion", "polars", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
@@ -345,6 +347,7 @@ def test_register_pyarrow_tables(con):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_csv_reregister_schema(con, tmp_path):
foo = tmp_path.joinpath("foo.csv")
with foo.open("w", newline="") as csvfile:
@@ -373,10 +376,13 @@ def test_csv_reregister_schema(con, tmp_path):
"bigquery",
"clickhouse",
"datafusion",
"druid",
"exasol",
"flink",
"impala",
"mysql",
"mssql",
"oracle",
"polars",
"postgres",
"risingwave",
@@ -407,9 +413,8 @@ def test_register_garbage(con, monkeypatch):
("functional_alltypes.parquet", "funk_all"),
],
)
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
pq = pytest.importorskip("pyarrow.parquet")

@@ -430,6 +435,58 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
assert table.count().execute()


# test reading a Parquet file from a URL request for backends using pyarrow
# that do not have their own implementation
@pytest.mark.parametrize(
("url", "in_table_name"),
[
("http://example.com/functional_alltypes.parquet", "http_table"),
("sftp://example.com/path/to/functional_alltypes.parquet", "sftp_table"),
],
)
@pytest.mark.never(
[
"duckdb",
"polars",
"bigquery",
"clickhouse",
"datafusion",
"snowflake",
],
reason="backend implements its own read_parquet",
)
@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(
[
"druid",
"pyspark",
]
)
def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch):
pytest.importorskip("pyarrow.parquet")
fsspec = pytest.importorskip("fsspec")

fname = Path("functional_alltypes.parquet")
fname = Path(data_dir) / "parquet" / fname.name
mock_calls = []

original_fsspec_open = fsspec.open

def mock_fsspec_open(path, *args, **kwargs):
mock_calls.append((path, args, kwargs))
return original_fsspec_open(fname, "rb")

monkeypatch.setattr("fsspec.open", mock_fsspec_open)

table = con.read_parquet(url, in_table_name)

assert len(mock_calls) == 1
assert table.count().execute()

if in_table_name is not None:
assert table.op().name == in_table_name


@pytest.fixture(scope="module")
def ft_data(data_dir):
pq = pytest.importorskip("pyarrow.parquet")
@@ -438,18 +495,8 @@ def ft_data(data_dir):
return table.slice(0, nrows)


@pytest.mark.notyet(
[
"flink",
"impala",
"mssql",
"mysql",
"postgres",
"risingwave",
"sqlite",
"trino",
]
)
@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet_glob(con, tmp_path, ft_data):
pq = pytest.importorskip("pyarrow.parquet")

@@ -478,6 +525,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv_glob(con, tmp_path, ft_data):
pc = pytest.importorskip("pyarrow.csv")

Expand Down Expand Up @@ -512,6 +560,7 @@ def test_read_csv_glob(con, tmp_path, ft_data):
raises=ValueError,
reason="read_json() missing required argument: 'schema'",
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_json_glob(con, tmp_path, ft_data):
nrows = len(ft_data)
ntables = 2
@@ -558,6 +607,7 @@ def num_diamonds(data_dir):
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv(con, data_dir, in_table_name, num_diamonds):
fname = "diamonds.csv"
with pushd(data_dir / "csv"):