Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support read_parquet for backend with no native support #9744

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ab2ad16
support read_parquet for backend with no native support
jitingxu1 Aug 1, 2024
661f50d
fix unit tests
jitingxu1 Aug 2, 2024
e16f1bb
resolve Xpass and clickhouse tests
jitingxu1 Aug 2, 2024
eaec7a2
handle different inputs
jitingxu1 Aug 5, 2024
9106ad8
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 6, 2024
27d7a08
pandas not suporting glob pattern
jitingxu1 Aug 6, 2024
ac6117f
Merge branch 'main' into extend-read-parquet
gforsyth Aug 6, 2024
3ce9674
tests for url and fssepc url
jitingxu1 Aug 18, 2024
24530ca
resolve pandas use pyarrow as default
jitingxu1 Aug 19, 2024
bb238af
add test for is_url and is_fsspec_url
jitingxu1 Aug 21, 2024
12cfc7d
change to fssepc and add examples
jitingxu1 Aug 23, 2024
2cf597a
add reason for mark.never
jitingxu1 Aug 23, 2024
b4cf0ea
re run workflow
jitingxu1 Aug 23, 2024
2ba5002
Merge branch 'main' into extend-read-parquet
jitingxu1 Aug 23, 2024
6f2c754
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 11, 2024
24bfe38
lint
jitingxu1 Sep 15, 2024
6a50c46
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 15, 2024
4579bff
remove pandas
jitingxu1 Sep 15, 2024
d1ed444
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 16, 2024
b01bc6a
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 18, 2024
e70de2f
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 18, 2024
413ada7
Merge branch 'ibis-project:main' into extend-read-parquet
jitingxu1 Sep 19, 2024
c3fba44
reconcile coe
jitingxu1 Sep 19, 2024
8b6b3c6
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 20, 2024
0d55190
skip trino and impala
jitingxu1 Sep 20, 2024
fda5493
Trigger CI
jitingxu1 Sep 20, 2024
71ebb8e
chore: trigger CI
jitingxu1 Sep 21, 2024
2473c02
chore(test): skip test for backends with own parquet readers
gforsyth Sep 23, 2024
3ab60a8
chore: simplify the logic
jitingxu1 Sep 25, 2024
59c03e0
Merge branch 'main' into extend-read-parquet
jitingxu1 Sep 25, 2024
c0c1fd1
chore: lint
jitingxu1 Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import abc
import collections.abc
import functools
import glob
import importlib.metadata
import keyword
import re
import urllib.parse
import urllib.request
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar

Expand Down Expand Up @@ -1199,6 +1202,61 @@ def has_operation(cls, operation: type[ops.Value]) -> bool:
f"{cls.name} backend has not implemented `has_operation` API"
)

def read_parquet(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of BytesIO, I could pass the fsspec object, It could be HTTPFile if we pass an HTTP url. Not sure what is the best way to handle the type of path

@gforsyth any suggestion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think fsspec is a good option.

"""Register a parquet file as a table in the current backend.

Parameters
----------
path
The data source. May be a path to a file, an iterable of files,
or directory of parquet files.
table_name
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to the pyarrow loading function.
See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
for more information.

Returns
-------
ir.Table
The just-registered table

"""

table = self._get_pyarrow_table_from_path(path, **kwargs)
table_name = table_name or util.gen_name("read_parquet")
self.create_table(table_name, table)
return self.table(table_name)

def _get_pyarrow_table_from_path(self, path: str | Path, **kwargs) -> pa.Table:
Copy link
Member

@cpcloud cpcloud Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't the implementation of this just be:

return pq.read_table(path, **kwargs)

Did you try that already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that in my first commit, it cannot handle all the cases:

such as glob pattern and Parquet files hosted on some uri: i.e HTTPS SFTP

Pyarrow implements natively the following filesystem subclasses:

Local FS (LocalFileSystem)

S3 (S3FileSystem)

Google Cloud Storage File System (GcsFileSystem)

Hadoop Distributed File System (HDFS) (HadoopFileSystem)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cpcloud does this make sense to you?

pq = util.import_object("pyarrow.parquet")

path = str(path)
# handle url
if util.is_url(path):
headers = kwargs.pop("headers", {})
req_info = urllib.request.Request(path, headers=headers) # noqa: S310
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
jitingxu1 marked this conversation as resolved.
Show resolved Hide resolved
with urllib.request.urlopen(req_info) as req: # noqa: S310
with BytesIO(req.read()) as reader:
return pq.read_table(reader)

# handle fsspec compatible url
if util.is_fsspec_url(path):
return pq.read_table(path, **kwargs)

# Handle local file paths or patterns
paths = glob.glob(path)
if not paths:
raise ValueError(f"No files found matching pattern: {path!r}")
elif len(paths) == 1:
paths = paths[0]

return pq.read_table(paths, **kwargs)

def _cached(self, expr: ir.Table):
jitingxu1 marked this conversation as resolved.
Show resolved Hide resolved
"""Cache the provided expression.

Expand Down
96 changes: 75 additions & 21 deletions ibis/backends/tests/test_register.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import csv
import gzip
import os
import urllib
from pathlib import Path
from typing import TYPE_CHECKING
from unittest import mock

import pytest
from pytest import param
Expand All @@ -18,8 +20,6 @@

import pyarrow as pa

pytestmark = pytest.mark.notimpl(["druid", "exasol", "oracle"])


@contextlib.contextmanager
def pushd(new_dir):
Expand Down Expand Up @@ -98,6 +98,7 @@ def gzip_csv(data_dir, tmp_path):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
ncclementi marked this conversation as resolved.
Show resolved Hide resolved
def test_register_csv(con, data_dir, fname, in_table_name, out_table_name):
with pushd(data_dir / "csv"):
with pytest.warns(FutureWarning, match="v9.1"):
Expand All @@ -109,7 +110,7 @@ def test_register_csv(con, data_dir, fname, in_table_name, out_table_name):


# TODO: rewrite or delete test when register api is removed
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
ncclementi marked this conversation as resolved.
Show resolved Hide resolved
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -153,6 +154,7 @@ def test_register_csv_gz(con, data_dir, gzip_csv):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_with_dotted_name(con, data_dir, tmp_path):
basename = "foo.bar.baz/diamonds.csv"
f = tmp_path.joinpath(basename)
Expand Down Expand Up @@ -212,6 +214,7 @@ def read_table(path: Path) -> Iterator[tuple[str, pa.Table]]:
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_parquet(
con, tmp_path, data_dir, fname, in_table_name, out_table_name
):
Expand Down Expand Up @@ -252,6 +255,7 @@ def test_register_parquet(
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_register_iterator_parquet(
con,
tmp_path,
Expand Down Expand Up @@ -280,7 +284,7 @@ def test_register_iterator_parquet(
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion"])
@pytest.mark.notimpl(["datafusion", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -316,7 +320,7 @@ def test_register_pandas(con):
# TODO: remove entirely when `register` is removed
# This same functionality is implemented across all backends
# via `create_table` and tested in `test_client.py`
@pytest.mark.notimpl(["datafusion", "polars"])
@pytest.mark.notimpl(["datafusion", "polars", "druid", "exasol", "oracle"])
@pytest.mark.notyet(
[
"bigquery",
Expand Down Expand Up @@ -361,6 +365,7 @@ def test_register_pyarrow_tables(con):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_csv_reregister_schema(con, tmp_path):
foo = tmp_path.joinpath("foo.csv")
with foo.open("w", newline="") as csvfile:
Expand Down Expand Up @@ -390,10 +395,13 @@ def test_csv_reregister_schema(con, tmp_path):
"clickhouse",
"dask",
"datafusion",
"druid",
"exasol",
"flink",
"impala",
"mysql",
"mssql",
"oracle",
"pandas",
"polars",
"postgres",
Expand Down Expand Up @@ -425,9 +433,8 @@ def test_register_garbage(con, monkeypatch):
("functional_alltypes.parquet", "funk_all"),
],
)
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notyet(["flink"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
pq = pytest.importorskip("pyarrow.parquet")

Expand All @@ -448,6 +455,61 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
assert table.count().execute()


# test reading a Parquet file from a URL request for backends using pyarrow
# that do not have their own implementation
@pytest.mark.parametrize(
("url", "in_table_name"),
[
("http://example.com/functional_alltypes.parquet", "http_table"),
("sftp://example.com/path/to/functional_alltypes.parquet", "sftp_table"),
],
)
@pytest.mark.notimpl(
jitingxu1 marked this conversation as resolved.
Show resolved Hide resolved
[
"druid",
"flink",
"duckdb",
"pandas",
"polars",
"bigquery",
"dask",
"clickhouse",
"datafusion",
"snowflake",
"pyspark",
]
)
def test_read_parquet_url_request(con, url, data_dir, in_table_name, monkeypatch):
pytest.importorskip("pyarrow.parquet")

headers = {"User-Agent": "test-agent"}
fname = Path("functional_alltypes.parquet")
fname = Path(data_dir) / "parquet" / fname.name
mock_calls = []

mock_request = mock.create_autospec(urllib.request.Request)

def mock_urlopen(request, *args, **kwargs):
mock_calls.append((request, args, kwargs))
return open(fname, "rb") # noqa: SIM115

monkeypatch.setattr("urllib.request.Request", mock_request)
monkeypatch.setattr("urllib.request.urlopen", mock_urlopen)

table = con.read_parquet(url, in_table_name, headers=headers)

mock_request.assert_called_once_with(url, headers=headers)
called_url = mock_request.call_args[0][0]
called_headers = mock_request.call_args[1]
assert url == called_url
assert called_headers["headers"] == headers
assert len(mock_calls) == 1
assert table.count().execute()

if in_table_name is not None:
assert table.op().name == in_table_name


@pytest.fixture(scope="module")
def ft_data(data_dir):
pq = pytest.importorskip("pyarrow.parquet")
Expand All @@ -456,19 +518,8 @@ def ft_data(data_dir):
return table.slice(0, nrows)


@pytest.mark.notyet(
[
"flink",
"impala",
"mssql",
"mysql",
"pandas",
"postgres",
"risingwave",
"sqlite",
"trino",
]
)
@pytest.mark.notyet(["flink", "pandas"])
@pytest.mark.notimpl(["druid"])
def test_read_parquet_glob(con, tmp_path, ft_data):
pq = pytest.importorskip("pyarrow.parquet")

Expand Down Expand Up @@ -498,6 +549,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data):
"trino",
]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv_glob(con, tmp_path, ft_data):
pc = pytest.importorskip("pyarrow.csv")

Expand Down Expand Up @@ -534,6 +586,7 @@ def test_read_csv_glob(con, tmp_path, ft_data):
raises=ValueError,
reason="read_json() missing required argument: 'schema'",
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_json_glob(con, tmp_path, ft_data):
nrows = len(ft_data)
ntables = 2
Expand Down Expand Up @@ -580,6 +633,7 @@ def num_diamonds(data_dir):
@pytest.mark.notyet(
["flink", "impala", "mssql", "mysql", "postgres", "risingwave", "sqlite", "trino"]
)
@pytest.mark.notimpl(["druid", "exasol", "oracle"])
def test_read_csv(con, data_dir, in_table_name, num_diamonds):
fname = "diamonds.csv"
with pushd(data_dir / "csv"):
Expand Down
67 changes: 66 additions & 1 deletion ibis/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

import pytest

from ibis.util import PseudoHashable, flatten_iterable, import_object
from ibis.util import (
PseudoHashable,
flatten_iterable,
import_object,
is_fsspec_url,
is_url,
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -51,6 +57,65 @@ def test_import_object():
import_object("collections.this_attribute_doesnt_exist")


@pytest.mark.parametrize(
("url", "expected"),
[
("http://example.com", True), # Valid http URL
("https://example.com", True), # Valid https URL
("ftp://example.com", True), # Valid ftp URL
("sftp://example.com", True), # Valid sftp URL
("ws://example.com", True), # Valid WebSocket URL
("wss://example.com", True), # Valid WebSocket Secure URL
("file:///home/user/file.txt", True), # Valid file URL
("mailto:example@example.com", False), # Invalid URL with non-supported scheme
("http://localhost:8000", True), # Valid URL with port
("ftp://192.168.1.1", True), # Valid URL with IP address
("https://example.com/path/to/resource", True), # Valid URL with path
("http://user:pass@example.com", True), # Valid URL with credentials
("ftp://example.com/resource", True), # Valid FTP URL with resource
("telnet://example.com", True), # Valid Telnet URL
("git://example.com/repo.git", True), # Valid Git URL
("sip://example.com", True), # Valid SIP URL
("sips://example.com", True), # Valid SIPS URL
("invalid://example.com", False), # Invalid URL with unknown scheme
],
)
def test_is_url(url, expected):
assert is_url(url) == expected


@pytest.mark.parametrize(
("url", "expected"),
[
("s3://bucket/path/to/file", True), # Valid fsspec URL
("ftp://example.com/file.txt", True), # Valid fsspec URL
("gs://bucket/path/to/file", True), # Valid fsspec URL
("http://example.com/file.txt", False), # Invalid URL (HTTP)
("https://example.com/file.txt", False), # Invalid URL (HTTPS)
("file://localhost/path/to/file", True), # Valid fsspec URL
("mailto:user@example.com", False), # Invalid URL
(
"ftp://user:pass@example.com/path/to/file",
True,
), # Valid fsspec URL with credentials
("ftp://example.com", True), # Valid fsspec URL without file
("", False), # Empty string (invalid URL)
("invalid://path/to/file", True), # Invalid scheme but valid format
("http://localhost:8000", False), # Invalid URL (HTTP with port)
(
"https://192.168.1.1/path/to/file",
False,
), # Invalid URL (HTTPS with IP address)
(
"file:/path/to/file",
False,
), # Invalid URL (missing double slashes after file:)
],
)
def test_is_fsspec_url(url, expected):
assert is_fsspec_url(url) == expected


# TODO(kszucs): add tests for promote_list and promote_tuple


Expand Down
Loading