IO: Fix parquet read from s3 directory (#33632)
alimcmaster1 authored Apr 26, 2020
1 parent 64d544c commit 22cf0f5
Showing 6 changed files with 92 additions and 24 deletions.
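For context, the user-facing behaviour this commit addresses looks roughly like the sketch below; the bucket name and frame are illustrative, s3fs and pyarrow are assumed to be installed, and the call pattern mirrors the new round-trip test rather than being the canonical usage:

```python
import pandas as pd
import s3fs

df = pd.DataFrame({"A": [1, 2, 3], "B": ["x", "y", "z"]})

# Write a partitioned dataset under an s3 prefix; the filesystem keyword is
# forwarded to pyarrow.parquet.write_to_dataset (previously raised
# AttributeError, GH 27596).
fs = s3fs.S3FileSystem()
df.to_parquet(
    "s3://my-bucket/partition_dir",
    partition_cols=["B"],
    compression=None,
    filesystem=fs,
)

# Read the directory back as a single frame (previously raised
# FileNotFoundError, GH 26388).
result = pd.read_parquet("s3://my-bucket/partition_dir")
```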
2 changes: 2 additions & 0 deletions doc/source/whatsnew/v1.1.0.rst
@@ -626,6 +626,8 @@ I/O
- Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
- Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
- Bug in :meth:`~DataFrame.read_feather` was raising an `ArrowIOError` when reading an s3 or http file path (:issue:`29055`)
- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
- Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)

Plotting
^^^^^^^^
27 changes: 27 additions & 0 deletions pandas/io/common.py
@@ -150,6 +150,33 @@ def urlopen(*args, **kwargs):
    return urllib.request.urlopen(*args, **kwargs)


def get_fs_for_path(filepath: str):
    """
    Get appropriate filesystem given a filepath.
    Supports s3fs, gcs and local file system.

    Parameters
    ----------
    filepath : str
        File path. e.g. s3://bucket/object, /local/path, gcs://pandas/obj

    Returns
    -------
    s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
        Appropriate FileSystem to use. None for local filesystem.
    """
    if is_s3_url(filepath):
        from pandas.io import s3

        return s3.get_fs()
    elif is_gcs_url(filepath):
        from pandas.io import gcs

        return gcs.get_fs()
    else:
        return None


def get_filepath_or_buffer(
    filepath_or_buffer: FilePathOrBuffer,
    encoding: Optional[str] = None,
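A quick sketch of how the new helper resolves filesystems; the paths are illustrative, and s3fs/gcsfs need to be installed for the first two calls:

```python
from pandas.io.common import get_fs_for_path

get_fs_for_path("s3://pandas-test/parquet_dir")  # -> s3fs.S3FileSystem (anon=False)
get_fs_for_path("gcs://pandas/obj")              # -> gcsfs.GCSFileSystem
get_fs_for_path("/tmp/local/path")               # -> None, i.e. local filesystem
```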
6 changes: 5 additions & 1 deletion pandas/io/gcs.py
@@ -6,13 +6,17 @@
)


def get_fs():
    return gcsfs.GCSFileSystem()


def get_filepath_or_buffer(
    filepath_or_buffer, encoding=None, compression=None, mode=None
):

    if mode is None:
        mode = "rb"

    fs = gcsfs.GCSFileSystem()
    fs = get_fs()
    filepath_or_buffer = fs.open(filepath_or_buffer, mode)
    return filepath_or_buffer, None, compression, True
33 changes: 19 additions & 14 deletions pandas/io/parquet.py
@@ -8,7 +8,12 @@

from pandas import DataFrame, get_option

from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
from pandas.io.common import (
    get_filepath_or_buffer,
    get_fs_for_path,
    is_gcs_url,
    is_s3_url,
)


def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
        **kwargs,
    ):
        self.validate_dataframe(df)
        path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")

        from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
        # write_to_dataset does not support a file-like object when
        # a directory path is used, so just pass the path string.
        if partition_cols is not None:
            self.api.parquet.write_to_dataset(
                table,
@@ -108,20 +115,18 @@
                **kwargs,
            )
        else:
            self.api.parquet.write_table(table, path, compression=compression, **kwargs)
            self.api.parquet.write_table(
                table, file_obj_or_path, compression=compression, **kwargs
            )
        if should_close:
            path.close()
            file_obj_or_path.close()

    def read(self, path, columns=None, **kwargs):
        path, _, _, should_close = get_filepath_or_buffer(path)

        kwargs["use_pandas_metadata"] = True
        result = self.api.parquet.read_table(
            path, columns=columns, **kwargs
        ).to_pandas()
        if should_close:
            path.close()

        parquet_ds = self.api.parquet.ParquetDataset(
            path, filesystem=get_fs_for_path(path), **kwargs
        )
        kwargs["columns"] = columns
        result = parquet_ds.read_pandas(**kwargs).to_pandas()
        return result


@@ -273,7 +278,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables``
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
If you want to pass in a path object, pandas accepts any
``os.PathLike``.
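Under the hood, the new read path is roughly equivalent to driving pyarrow's (legacy) ParquetDataset API directly with an explicit filesystem; the bucket and column names here are illustrative:

```python
import pyarrow.parquet as pq
import s3fs

# Resolve an s3fs filesystem for the s3 path, then let pyarrow discover the
# partitioned files under the directory prefix.
fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset("s3://my-bucket/partition_dir", filesystem=fs)

# read_pandas() keeps the pandas metadata, matching the previous
# use_pandas_metadata=True behaviour.
df = dataset.read_pandas(columns=["A"]).to_pandas()
```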
8 changes: 6 additions & 2 deletions pandas/io/s3.py
@@ -16,6 +16,10 @@ def _strip_schema(url):
    return result.netloc + result.path


def get_fs():
    return s3fs.S3FileSystem(anon=False)


def get_file_and_filesystem(
    filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
    if mode is None:
        mode = "rb"

    fs = s3fs.S3FileSystem(anon=False)
    fs = get_fs()
    try:
        file = fs.open(_strip_schema(filepath_or_buffer), mode)
    except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
        # aren't valid for that bucket.
        # A NoCredentialsError is raised if you don't have creds
        # for that bucket.
        fs = s3fs.S3FileSystem(anon=True)
        fs = get_fs()
        file = fs.open(_strip_schema(filepath_or_buffer), mode)
    return file, fs

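For illustration only, the refactored helpers can be exercised directly; they are private pandas internals, the object key is hypothetical, and valid AWS credentials are assumed:

```python
from pandas.io import s3

fs = s3.get_fs()  # s3fs.S3FileSystem(anon=False)

# get_file_and_filesystem opens the object and returns the file handle plus
# the filesystem that was ultimately used.
file, fs = s3.get_file_and_filesystem("s3://pandas-test/pyarrow.parquet")
data = file.read()
file.close()
```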
40 changes: 33 additions & 7 deletions pandas/tests/io/test_parquet.py
@@ -1,7 +1,6 @@
""" test parquet compat """
import datetime
from distutils.version import LooseVersion
import locale
import os
from warnings import catch_warnings

@@ -131,6 +130,7 @@ def check_round_trip(
    read_kwargs=None,
    expected=None,
    check_names=True,
    check_like=False,
    repeat=2,
):
    """Verify parquet serializer and deserializer produce the same results.
@@ -150,6 +150,8 @@
        Expected deserialization result, otherwise will be equal to `df`
    check_names: list of str, optional
        Closed set of column names to be compared
    check_like: bool, optional
        If True, ignore the order of index & columns.
    repeat: int, optional
        How many times to repeat the test
    """
@@ -169,7 +171,9 @@ def compare(repeat):
            with catch_warnings(record=True):
                actual = read_parquet(path, **read_kwargs)

                tm.assert_frame_equal(expected, actual, check_names=check_names)
                tm.assert_frame_equal(
                    expected, actual, check_names=check_names, check_like=check_like
                )

    if path is None:
        with tm.ensure_clean() as path:
@@ -532,15 +536,37 @@ def test_categorical(self, pa):
        expected = df.astype(object)
        check_round_trip(df, pa, expected=expected)

    # GH#33077 2020-03-27
    @pytest.mark.xfail(
        locale.getlocale()[0] == "zh_CN",
        reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
    )
    def test_s3_roundtrip(self, df_compat, s3_resource, pa):
        # GH #19134
        check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")

    @td.skip_if_no("s3fs")
    @pytest.mark.parametrize("partition_col", [["A"], []])
    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
        from pandas.io.s3 import get_fs as get_s3_fs

        # GH #26388
        # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
        # As per pyarrow partitioned columns become 'categorical' dtypes
        # and are added to back of dataframe on read

        expected_df = df_compat.copy()
        if partition_col:
            expected_df[partition_col] = expected_df[partition_col].astype("category")
        check_round_trip(
            df_compat,
            pa,
            expected=expected_df,
            path="s3://pandas-test/parquet_dir",
            write_kwargs={
                "partition_cols": partition_col,
                "compression": None,
                "filesystem": get_s3_fs(),
            },
            check_like=True,
            repeat=1,
        )

    def test_partition_cols_supported(self, pa, df_full):
        # GH #23283
        partition_cols = ["bool", "int"]
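The pyarrow behaviour the new test accounts for can be reproduced with a purely local round trip (paths illustrative): partition columns come back as 'category' dtype and are appended after the remaining columns, which is why the test passes check_like=True:

```python
import pandas as pd

df = pd.DataFrame({"part": ["a", "a", "b"], "value": [1, 2, 3]})
df.to_parquet("/tmp/parquet_dir", engine="pyarrow", partition_cols=["part"])

result = pd.read_parquet("/tmp/parquet_dir", engine="pyarrow")
# list(result.columns) -> ['value', 'part']   (partition column moved to the back)
# result["part"].dtype -> CategoricalDtype
```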
