TEST-modin-project#6830: Use local s3 server instead of public s3 buckets (modin-project#6863)

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
anmyachev authored Jan 18, 2024
1 parent 602f866 commit 4f91d24
Showing 30 changed files with 122 additions and 48 deletions.
18 changes: 17 additions & 1 deletion modin/conftest.py
@@ -671,10 +671,26 @@ def s3_resource(s3_base):
raise RuntimeError("Could not create bucket")

s3fs.S3FileSystem.clear_instance_cache()
yield conn

s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})

test_s3_files = [
("modin-bugs/multiple_csv/", "modin/pandas/test/data/multiple_csv/"),
(
"modin-bugs/test_data_dir.parquet/",
"modin/pandas/test/data/test_data_dir.parquet/",
),
("modin-bugs/test_data.parquet", "modin/pandas/test/data/test_data.parquet"),
("modin-bugs/test_data.json", "modin/pandas/test/data/test_data.json"),
("modin-bugs/test_data.fwf", "modin/pandas/test/data/test_data.fwf"),
("modin-bugs/test_data.feather", "modin/pandas/test/data/test_data.feather"),
("modin-bugs/issue5159.parquet/", "modin/pandas/test/data/issue5159.parquet/"),
]
for s3_key, file_name in test_s3_files:
s3.put(file_name, f"{bucket}/{s3_key}", recursive=s3_key.endswith("/"))

yield conn

s3.rm(bucket, recursive=True)
for _ in range(20):
# We want to wait until the deletion finishes.
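The fixture now seeds a private modin-test bucket on the local server with the same files the suite previously fetched from public buckets. The tests below also consume an s3_storage_options fixture whose definition is not part of this diff; judging from its usage later in this commit (s3_storage_options["client_kwargs"]["endpoint_url"]), a minimal sketch could look like the following, assuming s3_base is the endpoint URL of the local (moto-style) S3 server:

import pytest

@pytest.fixture
def s3_storage_options(s3_base):
    # Assumption: `s3_base` is the local server's endpoint URL, e.g.
    # "http://127.0.0.1:5555". The nested "client_kwargs" dict is the
    # shape s3fs/fsspec expect in pandas-style `storage_options`.
    return {"client_kwargs": {"endpoint_url": s3_base}}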
6 changes: 5 additions & 1 deletion modin/core/io/column_stores/feather_dispatcher.py
@@ -75,5 +75,9 @@ def _read(cls, path, columns=None, **kwargs):
# Filtering out the columns that describe the frame's index
columns = [col for col in reader.schema.names if col not in index_cols]
return cls.build_query_compiler(
path, columns, use_threads=False, dtype_backend=kwargs["dtype_backend"]
path,
columns,
use_threads=False,
storage_options=kwargs["storage_options"],
dtype_backend=kwargs["dtype_backend"],
)
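Before this change, storage_options never reached the feather reader, so a file on a non-default endpoint (such as the local test server) could not be read. A hedged usage sketch — the endpoint URL is hypothetical, and the s3:// path mirrors the test data this commit uploads:

import modin.pandas as pd

df = pd.read_feather(
    "s3://modin-test/modin-bugs/test_data.feather",
    # Illustrative endpoint; in the test suite this comes from `s3_storage_options`.
    storage_options={"client_kwargs": {"endpoint_url": "http://127.0.0.1:5555"}},
)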
17 changes: 14 additions & 3 deletions modin/core/io/text/json_dispatcher.py
@@ -47,7 +47,9 @@ def _read(cls, path_or_buf, **kwargs):
path_or_buf = stringify_path(path_or_buf)
path_or_buf = cls.get_path_or_buffer(path_or_buf)
if isinstance(path_or_buf, str):
if not cls.file_exists(path_or_buf):
if not cls.file_exists(
path_or_buf, storage_options=kwargs.get("storage_options")
):
return cls.single_worker_read(
path_or_buf, reason=cls._file_not_found_msg(path_or_buf), **kwargs
)
@@ -60,12 +62,21 @@ def _read(cls, path_or_buf, **kwargs):
return cls.single_worker_read(
path_or_buf, reason="`lines` argument not supported", **kwargs
)
with OpenFile(path_or_buf, "rb") as f:
with OpenFile(
path_or_buf,
"rb",
**(kwargs.get("storage_options", None) or {}),
) as f:
columns = pandas.read_json(BytesIO(b"" + f.readline()), lines=True).columns
kwargs["columns"] = columns
empty_pd_df = pandas.DataFrame(columns=columns)

with OpenFile(path_or_buf, "rb", kwargs.get("compression", "infer")) as f:
with OpenFile(
path_or_buf,
"rb",
kwargs.get("compression", "infer"),
**(kwargs.get("storage_options", None) or {}),
) as f:
column_widths, num_splits = cls._define_metadata(empty_pd_df, columns)
args = {"fname": path_or_buf, "num_splits": num_splits, **kwargs}
splits, _ = cls.partitioned_file(
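Note the **(kwargs.get("storage_options", None) or {}) splat: a plain .get with a default is not enough, because a caller that explicitly passes storage_options=None still gets None back, and unpacking None would raise a TypeError. A minimal illustration:

kwargs = {"storage_options": None}  # caller passed None explicitly

broken = kwargs.get("storage_options", {})        # still None
safe = kwargs.get("storage_options", None) or {}  # {} -- safe to unpack with **

assert broken is None and safe == {}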
18 changes: 14 additions & 4 deletions modin/experimental/core/io/text/csv_glob_dispatcher.py
@@ -68,7 +68,9 @@ def _read(cls, filepath_or_buffer, **kwargs):
reason=cls._file_not_found_msg(filepath_or_buffer),
**kwargs,
)
filepath_or_buffer = cls.get_path(filepath_or_buffer)
filepath_or_buffer = cls.get_path(
filepath_or_buffer, kwargs.get("storage_options")
)
elif not cls.pathlib_or_pypath(filepath_or_buffer):
return cls.single_worker_read(
filepath_or_buffer,
@@ -314,14 +316,16 @@ def file_exists(cls, file_path: str, storage_options=None) -> bool:
return exists or len(fs.glob(file_path)) > 0

@classmethod
def get_path(cls, file_path: str) -> list:
def get_path(cls, file_path: str, storage_options=None) -> list:
"""
Return the path of the file(s).
Parameters
----------
file_path : str
String representing a path.
storage_options : dict, optional
Keyword from `read_*` functions.
Returns
-------
@@ -363,11 +367,17 @@ def get_file_path(fs_handle) -> List[str]:
fs_addresses = [fs_handle.unstrip_protocol(path) for path in file_paths]
return fs_addresses

fs, _ = fsspec.core.url_to_fs(file_path)
if storage_options is not None:
new_storage_options = dict(storage_options)
new_storage_options.pop("anon", None)
else:
new_storage_options = {}

fs, _ = fsspec.core.url_to_fs(file_path, **new_storage_options)
try:
return get_file_path(fs)
except credential_error_type:
fs, _ = fsspec.core.url_to_fs(file_path, anon=True)
fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **new_storage_options)
return get_file_path(fs)

@classmethod
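Popping anon before building the filesystem matters for the fallback path: the retry passes anon=True as an explicit keyword, so if the caller's storage_options still carried their own anon key, url_to_fs would receive that keyword twice and raise a TypeError. A sketch of the flow under those assumptions (PermissionError stands in for the dispatcher's credential_error_type):

import fsspec

def resolve_fs(file_path, storage_options=None):
    # Keep the caller's options (e.g. a custom endpoint_url) but let the
    # fallback control anonymity itself, avoiding a duplicate `anon` keyword.
    opts = dict(storage_options or {})
    opts.pop("anon", None)
    try:
        fs, _ = fsspec.core.url_to_fs(file_path, **opts)
        fs.ls(file_path)  # force a call that needs credentials
        return fs
    except PermissionError:
        fs, _ = fsspec.core.url_to_fs(file_path, anon=True, **opts)
        return fs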
28 changes: 17 additions & 11 deletions modin/experimental/pandas/test/test_io_exp.py
@@ -178,11 +178,15 @@ def test_read_single_csv_with_parse_dates(self, parse_dates):
@pytest.mark.parametrize(
"path",
[
"s3://modin-datasets/testing/multiple_csv/test_data*.csv",
"s3://modin-test/modin-bugs/multiple_csv/test_data*.csv",
"gs://modin-testing/testing/multiple_csv/test_data*.csv",
],
)
def test_read_multiple_csv_cloud_store(path):
def test_read_multiple_csv_cloud_store(path, s3_resource, s3_storage_options):
storage_options_new = {"anon": True}
if path.startswith("s3"):
storage_options_new = s3_storage_options

def _pandas_read_csv_glob(path, storage_options):
pandas_dfs = [
pandas.read_csv(
@@ -198,7 +202,7 @@ def _pandas_read_csv_glob(path, storage_options):
lambda module, **kwargs: pd.read_csv_glob(path, **kwargs).reset_index(drop=True)
if hasattr(module, "read_csv_glob")
else _pandas_read_csv_glob(path, **kwargs),
storage_options={"anon": True},
storage_options=storage_options_new,
)


@@ -207,17 +211,19 @@ def _pandas_read_csv_glob(path, storage_options):
reason=f"{Engine.get()} does not have experimental API",
)
@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_multiple_csv_s3_storage_opts(storage_options):
path = "s3://modin-datasets/testing/multiple_csv/"
def test_read_multiple_csv_s3_storage_opts(
s3_resource, s3_storage_options, storage_options_extra
):
s3_path = "s3://modin-test/modin-bugs/multiple_csv/"

def _pandas_read_csv_glob(path, storage_options):
pandas_df = pandas.concat(
[
pandas.read_csv(
f"{path}test_data{i}.csv",
f"{s3_path}test_data{i}.csv",
storage_options=storage_options,
)
for i in range(2)
Expand All @@ -228,10 +234,10 @@ def _pandas_read_csv_glob(path, storage_options):
eval_general(
pd,
pandas,
lambda module, **kwargs: pd.read_csv_glob(path, **kwargs)
lambda module, **kwargs: pd.read_csv_glob(s3_path, **kwargs)
if hasattr(module, "read_csv_glob")
else _pandas_read_csv_glob(path, **kwargs),
storage_options=storage_options,
else _pandas_read_csv_glob(s3_path, **kwargs),
storage_options=s3_storage_options | storage_options_extra,
)


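The merged options here use PEP 584 dict union (s3_storage_options | storage_options_extra, Python 3.9+): right-hand keys win, so the parametrized credentials override the fixture's defaults while the local endpoint_url survives. This is also why None disappeared from the parametrized values — dict | None raises a TypeError. For example, with assumed fixture contents:

s3_storage_options = {"client_kwargs": {"endpoint_url": "http://127.0.0.1:5555"}}  # assumed
storage_options_extra = {"key": "123", "secret": "123"}

merged = s3_storage_options | storage_options_extra
# {'client_kwargs': {'endpoint_url': 'http://127.0.0.1:5555'},
#  'key': '123', 'secret': '123'}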
2 binary files changed (names not shown)
5 changes: 5 additions & 0 deletions modin/pandas/test/data/multiple_csv/test_data0.csv
@@ -0,0 +1,5 @@
a,b,c
0,True,x
1,False,y
2,True,z
3,False,w
5 changes: 5 additions & 0 deletions modin/pandas/test/data/multiple_csv/test_data1.csv
@@ -0,0 +1,5 @@
a,b,c
4,True,m
5,False,n
6,True,t
7,True,l
Binary file added modin/pandas/test/data/test_data.feather
6 changes: 6 additions & 0 deletions modin/pandas/test/data/test_data.json
@@ -0,0 +1,6 @@
{"Duration":60,"Pulse":110,"Maxpulse":130,"Calories":409}
{"Duration":60,"Pulse":117,"Maxpulse":145,"Calories":479}
{"Duration":60,"Pulse":103,"Maxpulse":135,"Calories":340}
{"Duration":45,"Pulse":109,"Maxpulse":175,"Calories":282}
{"Duration":45,"Pulse":117,"Maxpulse":148,"Calories":406}
{"Duration":60,"Pulse":102,"Maxpulse":127,"Calories":300}
Binary file added modin/pandas/test/data/test_data.parquet
16 binary files changed (names not shown)
65 changes: 37 additions & 28 deletions modin/pandas/test/test_io.py
@@ -1892,26 +1892,29 @@ def test_read_parquet_hdfs(self, engine):
"path_type",
["object", "directory", "url"],
)
def test_read_parquet_s3(self, path_type, engine):
dataset_url = "s3://modin-datasets/testing/test_data.parquet"
def test_read_parquet_s3(self, s3_resource, path_type, engine, s3_storage_options):
s3_path = "s3://modin-test/modin-bugs/test_data.parquet"
if path_type == "object":
import s3fs

fs = s3fs.S3FileSystem(anon=True)
with fs.open(dataset_url, "rb") as file_obj:
fs = s3fs.S3FileSystem(
endpoint_url=s3_storage_options["client_kwargs"]["endpoint_url"]
)
with fs.open(s3_path, "rb") as file_obj:
eval_io("read_parquet", path=file_obj, engine=engine)
elif path_type == "directory":
s3_path = "s3://modin-test/modin-bugs/test_data_dir.parquet"
eval_io(
"read_parquet",
path="s3://modin-datasets/test_data_dir.parquet",
storage_options={"anon": True},
path=s3_path,
storage_options=s3_storage_options,
engine=engine,
)
else:
eval_io(
"read_parquet",
path=dataset_url,
storage_options={"anon": True},
path=s3_path,
storage_options=s3_storage_options,
engine=engine,
)

@@ -2031,15 +2034,16 @@ def test_read_parquet_5767(self, tmp_path, engine):
# both Modin and pandas read column "b" as a category
df_equals(test_df, read_df.astype("int64"))

def test_read_parquet_s3_with_column_partitioning(self, engine):
# This test case comes from
def test_read_parquet_s3_with_column_partitioning(
self, s3_resource, engine, s3_storage_options
):
# https://github.com/modin-project/modin/issues/4636
dataset_url = "s3://modin-datasets/modin-bugs/modin_bug_5159_parquet/df.parquet"
s3_path = "s3://modin-test/modin-bugs/issue5159.parquet"
eval_io(
fn_name="read_parquet",
path=dataset_url,
path=s3_path,
engine=engine,
storage_options={"anon": True},
storage_options=s3_storage_options,
)


@@ -2087,16 +2091,17 @@ def comparator(df1, df2):
)

@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_json_s3(self, storage_options):
def test_read_json_s3(self, s3_resource, s3_storage_options, storage_options_extra):
s3_path = "s3://modin-test/modin-bugs/test_data.json"
eval_io(
fn_name="read_json",
path_or_buf="s3://modin-datasets/testing/test_data.json",
path_or_buf=s3_path,
lines=True,
orient="records",
storage_options=storage_options,
storage_options=s3_storage_options | storage_options_extra,
)

def test_read_json_categories(self):
@@ -2882,14 +2887,15 @@ def test_read_fwf_empty_frame(self, make_fwf_file):
df_equals(modin_df, pandas_df)

@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_fwf_s3(self, storage_options):
def test_read_fwf_s3(self, s3_resource, s3_storage_options, storage_options_extra):
s3_path = "s3://modin-test/modin-bugs/test_data.fwf"
eval_io(
fn_name="read_fwf",
filepath_or_buffer="s3://modin-datasets/testing/test_data.fwf",
storage_options=storage_options,
filepath_or_buffer=s3_path,
storage_options=s3_storage_options | storage_options_extra,
)


@@ -2980,14 +2986,17 @@ def comparator(df1, df2):
)

@pytest.mark.parametrize(
"storage_options",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}, None],
"storage_options_extra",
[{"anon": False}, {"anon": True}, {"key": "123", "secret": "123"}],
)
def test_read_feather_s3(self, storage_options):
def test_read_feather_s3(
self, s3_resource, s3_storage_options, storage_options_extra
):
s3_path = "s3://modin-test/modin-bugs/test_data.feather"
eval_io(
fn_name="read_feather",
path="s3://modin-datasets/testing/test_data.feather",
storage_options=storage_options,
path=s3_path,
storage_options=s3_storage_options | storage_options_extra,
)

def test_read_feather_path_object(self, make_feather_file):
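When adapting these tests, a quick way to check what the fixture actually uploaded is to list the bucket directly with s3fs (the endpoint is assumed; the keys are the ones conftest.py seeds above):

import s3fs

fs = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://127.0.0.1:5555"})
print(fs.ls("modin-test/modin-bugs/"))
# expect entries like multiple_csv/, test_data.parquet, test_data.json, ...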
2 changes: 2 additions & 0 deletions modin/test/test_headers.py
@@ -44,6 +44,8 @@ def test_line_endings():
if any(i in subdir for i in [".git", ".idea", "__pycache__"]):
continue
for file in files:
if file.endswith(".parquet"):
continue
filepath = os.path.join(subdir, file)
with open(filepath, "rb+") as f:
file_contents = f.read()