From 22cf0f5dfcfbddd5506fdaf260e485bff1b88ef1 Mon Sep 17 00:00:00 2001
From: alimcmaster1
Date: Sun, 26 Apr 2020 22:41:15 +0100
Subject: [PATCH] IO: Fix parquet read from s3 directory (#33632)

---
 doc/source/whatsnew/v1.1.0.rst  |  2 ++
 pandas/io/common.py             | 27 ++++++++++++++++++++++
 pandas/io/gcs.py                |  6 ++++-
 pandas/io/parquet.py            | 33 +++++++++++++++------------
 pandas/io/s3.py                 |  8 +++++--
 pandas/tests/io/test_parquet.py | 40 +++++++++++++++++++++++++++------
 6 files changed, 92 insertions(+), 24 deletions(-)

diff --git a/doc/source/whatsnew/v1.1.0.rst b/doc/source/whatsnew/v1.1.0.rst
index 3a3525a7ec238..719178a67459d 100644
--- a/doc/source/whatsnew/v1.1.0.rst
+++ b/doc/source/whatsnew/v1.1.0.rst
@@ -626,6 +626,8 @@ I/O
 - Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
 - Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
 - Bug in :meth:`~DataFrame.read_feather` was raising an `ArrowIOError` when reading an s3 or http file path (:issue:`29055`)
+- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
+- Bug in :meth:`~DataFrame.to_parquet` was raising an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)
 
 Plotting
 ^^^^^^^^
diff --git a/pandas/io/common.py b/pandas/io/common.py
index dd3d205ca90eb..8349acafca1e3 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -150,6 +150,33 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)
 
 
+def get_fs_for_path(filepath: str):
+    """
+    Get the appropriate filesystem for a given filepath.
+    Supports s3fs, gcsfs and the local file system.
+
+    Parameters
+    ----------
+    filepath : str
+        File path, e.g. s3://bucket/object, /local/path, gcs://pandas/obj
+
+    Returns
+    -------
+    s3fs.S3FileSystem, gcsfs.GCSFileSystem or None
+        Appropriate FileSystem to use. None for local filesystem.
+    """
+    if is_s3_url(filepath):
+        from pandas.io import s3
+
+        return s3.get_fs()
+    elif is_gcs_url(filepath):
+        from pandas.io import gcs
+
+        return gcs.get_fs()
+    else:
+        return None
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,
diff --git a/pandas/io/gcs.py b/pandas/io/gcs.py
index 1f5e0faedc6d2..d2d8fc2d2139f 100644
--- a/pandas/io/gcs.py
+++ b/pandas/io/gcs.py
@@ -6,6 +6,10 @@
 )
 
 
+def get_fs():
+    return gcsfs.GCSFileSystem()
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer, encoding=None, compression=None, mode=None
 ):
@@ -13,6 +17,6 @@ def get_filepath_or_buffer(
     if mode is None:
         mode = "rb"
 
-    fs = gcsfs.GCSFileSystem()
+    fs = get_fs()
     filepath_or_buffer = fs.open(filepath_or_buffer, mode)
     return filepath_or_buffer, None, compression, True
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 068210eddcc1b..0a9daea105b64 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -8,7 +8,12 @@
 
 from pandas import DataFrame, get_option
 
-from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
+from pandas.io.common import (
+    get_filepath_or_buffer,
+    get_fs_for_path,
+    is_gcs_url,
+    is_s3_url,
+)
 
 
 def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
+        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
 
         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+        # write_to_dataset does not support a file-like object when
+        # a directory path is used, so just pass the path string.
         if partition_cols is not None:
             self.api.parquet.write_to_dataset(
                 table,
@@ -108,20 +115,18 @@ def write(
                 **kwargs,
             )
         else:
-            self.api.parquet.write_table(table, path, compression=compression, **kwargs)
+            self.api.parquet.write_table(
+                table, file_obj_or_path, compression=compression, **kwargs
+            )
         if should_close:
-            path.close()
+            file_obj_or_path.close()
 
     def read(self, path, columns=None, **kwargs):
-        path, _, _, should_close = get_filepath_or_buffer(path)
-
-        kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, **kwargs
-        ).to_pandas()
-        if should_close:
-            path.close()
-
+        parquet_ds = self.api.parquet.ParquetDataset(
+            path, filesystem=get_fs_for_path(path), **kwargs
+        )
+        kwargs["columns"] = columns
+        result = parquet_ds.read_pandas(**kwargs).to_pandas()
         return result
 
 
@@ -273,7 +278,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
     A file URL can also be a path to a directory that contains multiple
     partitioned parquet files. Both pyarrow and fastparquet support
     paths to directories as well as file URLs. A directory path could be:
-    ``file://localhost/path/to/tables``
+    ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
 
     If you want to pass in a path object, pandas accepts any
     ``os.PathLike``.
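
Note (illustration only, not part of the diff): the new ``PyArrowImpl.read`` above resolves a filesystem via ``get_fs_for_path`` and hands the whole path, whether a single file or a directory, to pyarrow's ``ParquetDataset``. A minimal sketch of that read path, assuming a hypothetical bucket/prefix and valid s3 credentials:

    # Sketch of the read path introduced above; bucket and prefix names are made up.
    import pyarrow.parquet as pq
    import s3fs

    fs = s3fs.S3FileSystem(anon=False)  # same object pandas.io.s3.get_fs() returns
    dataset = pq.ParquetDataset("s3://my-bucket/partitioned_data", filesystem=fs)
    df = dataset.read_pandas().to_pandas()  # handles a directory of parquet files
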
diff --git a/pandas/io/s3.py b/pandas/io/s3.py
index 976c319f89d47..329c861d2386a 100644
--- a/pandas/io/s3.py
+++ b/pandas/io/s3.py
@@ -16,6 +16,10 @@ def _strip_schema(url):
     return result.netloc + result.path
 
 
+def get_fs():
+    return s3fs.S3FileSystem(anon=False)
+
+
 def get_file_and_filesystem(
     filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
 ) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
     if mode is None:
         mode = "rb"
 
-    fs = s3fs.S3FileSystem(anon=False)
+    fs = get_fs()
     try:
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
         # aren't valid for that bucket.
         # A NoCredentialsError is raised if you don't have creds
         # for that bucket.
-        fs = s3fs.S3FileSystem(anon=True)
+        fs = get_fs()
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     return file, fs
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 280424c68297f..8a43d4079159b 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -1,7 +1,6 @@
 """ test parquet compat """
 import datetime
 from distutils.version import LooseVersion
-import locale
 import os
 from warnings import catch_warnings
 
@@ -131,6 +130,7 @@ def check_round_trip(
     read_kwargs=None,
     expected=None,
     check_names=True,
+    check_like=False,
     repeat=2,
 ):
     """Verify parquet serializer and deserializer produce the same results.
@@ -150,6 +150,8 @@ def check_round_trip(
         Expected deserialization result, otherwise will be equal to `df`
     check_names: list of str, optional
         Closed set of column names to be compared
+    check_like: bool, optional
+        If True, ignore the order of index & columns.
     repeat: int, optional
         How many times to repeat the test
     """
@@ -169,7 +171,9 @@ def compare(repeat):
             with catch_warnings(record=True):
                 actual = read_parquet(path, **read_kwargs)
 
-                tm.assert_frame_equal(expected, actual, check_names=check_names)
+                tm.assert_frame_equal(
+                    expected, actual, check_names=check_names, check_like=check_like
+                )
 
     if path is None:
         with tm.ensure_clean() as path:
@@ -532,15 +536,37 @@ def test_categorical(self, pa):
             expected = df.astype(object)
             check_round_trip(df, pa, expected=expected)
 
-    # GH#33077 2020-03-27
-    @pytest.mark.xfail(
-        locale.getlocale()[0] == "zh_CN",
-        reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
-    )
     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
         check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
 
+    @td.skip_if_no("s3fs")
+    @pytest.mark.parametrize("partition_col", [["A"], []])
+    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
+        from pandas.io.s3 import get_fs as get_s3_fs
+
+        # GH #26388
+        # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
+        # As per pyarrow, partitioned columns become 'categorical' dtypes
+        # and are added to the back of the dataframe on read
+
+        expected_df = df_compat.copy()
+        if partition_col:
+            expected_df[partition_col] = expected_df[partition_col].astype("category")
+        check_round_trip(
+            df_compat,
+            pa,
+            expected=expected_df,
+            path="s3://pandas-test/parquet_dir",
+            write_kwargs={
+                "partition_cols": partition_col,
+                "compression": None,
+                "filesystem": get_s3_fs(),
+            },
+            check_like=True,
+            repeat=1,
+        )
+
     def test_partition_cols_supported(self, pa, df_full):
         # GH #23283
         partition_cols = ["bool", "int"]
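
A minimal usage sketch of the behaviour the new ``test_s3_roundtrip_for_dir`` exercises (the bucket name below is made up; real s3 credentials and the optional ``s3fs`` dependency are assumed):

    import pandas as pd
    import s3fs

    fs = s3fs.S3FileSystem(anon=False)
    df = pd.DataFrame({"A": [1, 2], "B": ["x", "y"]})

    # partition_cols makes pyarrow write one sub-directory per value of "A";
    # extra keyword arguments (here, filesystem) are forwarded to
    # pyarrow.parquet.write_to_dataset.
    df.to_parquet(
        "s3://my-bucket/parquet_dir",
        partition_cols=["A"],
        compression=None,
        filesystem=fs,
    )

    # Reading the directory back now works; partition columns return as
    # 'category' dtype and are appended after the remaining columns.
    result = pd.read_parquet("s3://my-bucket/parquet_dir")
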