From 76aef7038bb0f098e876cbf961f7eb45e6ab51ea Mon Sep 17 00:00:00 2001 From: Elia Zaides Date: Thu, 5 Sep 2024 13:16:17 -0700 Subject: [PATCH 1/6] Enable use of dataframe type, in athena2pyarrow type --- awswrangler/_arrow.py | 2 +- awswrangler/_data_types.py | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/awswrangler/_arrow.py b/awswrangler/_arrow.py index 796b30ddf..12c853fc0 100644 --- a/awswrangler/_arrow.py +++ b/awswrangler/_arrow.py @@ -119,7 +119,7 @@ def _df_to_table( for col_name, col_type in dtype.items(): if col_name in table.column_names: col_index = table.column_names.index(col_name) - pyarrow_dtype = athena2pyarrow(col_type) + pyarrow_dtype = athena2pyarrow(col_type, df.dtypes.get(col_name)) field = pa.field(name=col_name, type=pyarrow_dtype) table = table.set_column(col_index, field, table.column(col_name).cast(pyarrow_dtype)) _logger.debug("Casting column %s (%s) to %s (%s)", col_name, col_index, col_type, pyarrow_dtype) diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py index 98ce476c9..6b895126a 100644 --- a/awswrangler/_data_types.py +++ b/awswrangler/_data_types.py @@ -306,7 +306,7 @@ def _split_map(s: str) -> list[str]: return parts -def athena2pyarrow(dtype: str) -> pa.DataType: # noqa: PLR0911,PLR0912 +def athena2pyarrow(dtype: str, df_type: str = None) -> pa.DataType: # noqa: PLR0911,PLR0912 """Athena to PyArrow data types conversion.""" dtype = dtype.strip() if dtype.startswith(("array", "struct", "map")): @@ -329,7 +329,18 @@ def athena2pyarrow(dtype: str) -> pa.DataType: # noqa: PLR0911,PLR0912 if (dtype in ("string", "uuid")) or dtype.startswith("char") or dtype.startswith("varchar"): return pa.string() if dtype == "timestamp": - return pa.timestamp(unit="ns") + if df_type: + match df_type: + case "datetime64[s]": + return pa.timestamp(unit="s") + case "datetime64[ms]": + return pa.timestamp(unit="ms") + case "datetime64[us]": + return pa.timestamp(unit="us") + case "datetime64[ns]": + return pa.timestamp(unit="ns") + case _: + return pa.timestamp(unit="ns") if dtype == "date": return pa.date32() if dtype in ("binary" or "varbinary"): @@ -701,7 +712,7 @@ def pyarrow_schema_from_pandas( ) for k, v in casts.items(): if (k not in ignore) and (k in df.columns or _is_index_name(k, df.index)): - columns_types[k] = athena2pyarrow(dtype=v) + columns_types[k] = athena2pyarrow(dtype=v, df_type=df.dtypes.get(k)) columns_types = {k: v for k, v in columns_types.items() if v is not None} _logger.debug("columns_types: %s", columns_types) return pa.schema(fields=columns_types) From b159e3f22574a5ac193ce9fecd7aaff00a8980a1 Mon Sep 17 00:00:00 2001 From: Elia Zaides Date: Thu, 5 Sep 2024 13:43:59 -0700 Subject: [PATCH 2/6] Ruff format check --- awswrangler/_data_types.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py index 6b895126a..78dc05148 100644 --- a/awswrangler/_data_types.py +++ b/awswrangler/_data_types.py @@ -331,15 +331,15 @@ def athena2pyarrow(dtype: str, df_type: str = None) -> pa.DataType: # noqa: PLR if dtype == "timestamp": if df_type: match df_type: - case "datetime64[s]": + case "datetime64[s]": return pa.timestamp(unit="s") - case "datetime64[ms]": + case "datetime64[ms]": return pa.timestamp(unit="ms") - case "datetime64[us]": + case "datetime64[us]": return pa.timestamp(unit="us") - case "datetime64[ns]": + case "datetime64[ns]": return pa.timestamp(unit="ns") - case _: + case _: return pa.timestamp(unit="ns") if dtype 
== "date": return pa.date32() From ea43b45014369772dd608db7e5185841ac474f13 Mon Sep 17 00:00:00 2001 From: Elia Zaides Date: Thu, 5 Sep 2024 13:58:50 -0700 Subject: [PATCH 3/6] Fix mypy error --- awswrangler/_data_types.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py index 78dc05148..19f0c57ec 100644 --- a/awswrangler/_data_types.py +++ b/awswrangler/_data_types.py @@ -12,7 +12,6 @@ import numpy as np import pandas as pd import pyarrow as pa -import pyarrow.parquet from awswrangler import _arrow, exceptions from awswrangler._distributed import engine @@ -306,7 +305,7 @@ def _split_map(s: str) -> list[str]: return parts -def athena2pyarrow(dtype: str, df_type: str = None) -> pa.DataType: # noqa: PLR0911,PLR0912 +def athena2pyarrow(dtype: str, df_type: str | None = None) -> pa.DataType: # noqa: PLR0911,PLR0912 """Athena to PyArrow data types conversion.""" dtype = dtype.strip() if dtype.startswith(("array", "struct", "map")): @@ -329,18 +328,16 @@ def athena2pyarrow(dtype: str, df_type: str = None) -> pa.DataType: # noqa: PLR if (dtype in ("string", "uuid")) or dtype.startswith("char") or dtype.startswith("varchar"): return pa.string() if dtype == "timestamp": - if df_type: - match df_type: - case "datetime64[s]": - return pa.timestamp(unit="s") - case "datetime64[ms]": - return pa.timestamp(unit="ms") - case "datetime64[us]": - return pa.timestamp(unit="us") - case "datetime64[ns]": - return pa.timestamp(unit="ns") - case _: - return pa.timestamp(unit="ns") + if df_type == "datetime64[ns]": + return pa.timestamp(unit="ns") + elif df_type == "datetime64[us]": + return pa.timestamp(unit="us") + elif df_type == "datetime64[ms]": + return pa.timestamp(unit="ms") + elif df_type == "datetime64[s]": + return pa.timestamp(unit="s") + else: + return pa.timestamp(unit="ns") if dtype == "date": return pa.date32() if dtype in ("binary" or "varbinary"): From 2b3a8957f0ab0df2d062d90453c14c6243c86ee9 Mon Sep 17 00:00:00 2001 From: Elia Zaides Date: Thu, 5 Sep 2024 15:30:32 -0700 Subject: [PATCH 4/6] Add test to verify write was successful --- tests/unit/test_s3_parquet.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/unit/test_s3_parquet.py b/tests/unit/test_s3_parquet.py index 97d568fb4..19e848eeb 100644 --- a/tests/unit/test_s3_parquet.py +++ b/tests/unit/test_s3_parquet.py @@ -1032,3 +1032,37 @@ def test_read_from_access_point(access_point_path_path: str) -> None: wr.s3.to_parquet(df, path) df_out = wr.s3.read_parquet(path) assert df_out.shape == (3, 3) + + +@pytest.mark.parametrize("use_threads", [True, False, 2]) +def test_save_dataframe_with_ms_units(path, glue_database, glue_table): + df = pd.DataFrame( + { + "c0": [ + "2023-01-01 00:00:00.000", + "2023-01-02 00:00:00.000", + "0800-01-01 00:00:00.000", # Out-of-bounds timestamp + "2977-09-21 00:12:43.000", + ] + } + ) + + wr.s3.to_parquet( + df, + path, + dataset=True, + database=glue_database, + table=glue_table, + ) + + # Saving exactly the same data twice. This ensures that even if the athena table exists, the flow of using its metadata + # to identify the schema of the data is working correctly. 
+ wr.s3.to_parquet( + df, + path, + dataset=True, + database=glue_database, + table=glue_table, + ) + df_out = wr.s3.read_parquet_table(table=glue_table, database=glue_database) + assert df_out.shape == (8, 1) From ea75b4125d2c253875644cee920a963d446f5ef3 Mon Sep 17 00:00:00 2001 From: Elia Zaides Date: Thu, 5 Sep 2024 15:57:48 -0700 Subject: [PATCH 5/6] Fix test_save_dataframe_with_ms_units parameters --- tests/unit/test_s3_parquet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/test_s3_parquet.py b/tests/unit/test_s3_parquet.py index 19e848eeb..12c4123a3 100644 --- a/tests/unit/test_s3_parquet.py +++ b/tests/unit/test_s3_parquet.py @@ -1034,7 +1034,6 @@ def test_read_from_access_point(access_point_path_path: str) -> None: assert df_out.shape == (3, 3) -@pytest.mark.parametrize("use_threads", [True, False, 2]) def test_save_dataframe_with_ms_units(path, glue_database, glue_table): df = pd.DataFrame( { From b071f0b9c93c55762f178a9602394ad5297cb95b Mon Sep 17 00:00:00 2001 From: Elia Zaides Date: Thu, 5 Sep 2024 16:32:30 -0700 Subject: [PATCH 6/6] Add use_threads parameter --- tests/unit/test_s3_parquet.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_s3_parquet.py b/tests/unit/test_s3_parquet.py index 12c4123a3..b650100bc 100644 --- a/tests/unit/test_s3_parquet.py +++ b/tests/unit/test_s3_parquet.py @@ -1034,7 +1034,8 @@ def test_read_from_access_point(access_point_path_path: str) -> None: assert df_out.shape == (3, 3) -def test_save_dataframe_with_ms_units(path, glue_database, glue_table): +@pytest.mark.parametrize("use_threads", [True, False, 2]) +def test_save_dataframe_with_ms_units(path, glue_database, glue_table, use_threads): df = pd.DataFrame( { "c0": [ @@ -1052,6 +1053,7 @@ def test_save_dataframe_with_ms_units(path, glue_database, glue_table): dataset=True, database=glue_database, table=glue_table, + use_threads=use_threads, ) # Saving exactly the same data twice. This ensures that even if the athena table exists, the flow of using its metadata @@ -1062,6 +1064,7 @@ def test_save_dataframe_with_ms_units(path, glue_database, glue_table): dataset=True, database=glue_database, table=glue_table, + use_threads=use_threads, ) df_out = wr.s3.read_parquet_table(table=glue_table, database=glue_database) assert df_out.shape == (8, 1)
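
A minimal standalone sketch of the dtype-to-unit mapping these patches add to athena2pyarrow, assuming pandas 2.x (which preserves non-nanosecond datetime64 resolutions) and pyarrow; the helper name pandas_dtype_to_pa_timestamp is hypothetical and used for illustration only, not an awswrangler API:

    from __future__ import annotations

    import numpy as np
    import pandas as pd
    import pyarrow as pa


    def pandas_dtype_to_pa_timestamp(df_type: str | None) -> pa.DataType:
        # Same idea as the branch added to athena2pyarrow: keep the unit of the
        # pandas datetime64 dtype instead of always casting to nanoseconds.
        units = {
            "datetime64[s]": "s",
            "datetime64[ms]": "ms",
            "datetime64[us]": "us",
            "datetime64[ns]": "ns",
        }
        # Unknown or missing dtype falls back to the previous behaviour (ns).
        return pa.timestamp(unit=units.get(str(df_type), "ns"))


    # Year 0800 is out of bounds for datetime64[ns] but fits in datetime64[ms],
    # which is why the test includes an out-of-bounds row.
    df = pd.DataFrame(
        {"c0": np.array(["0800-01-01", "2977-09-21T00:12:43"], dtype="datetime64[ms]")}
    )
    print(df["c0"].dtype)                                     # datetime64[ms] (pandas 2.x)
    print(pandas_dtype_to_pa_timestamp(str(df["c0"].dtype)))  # timestamp[ms]

Keeping the original unit avoids the lossy (and for dates like 0800-01-01, impossible) cast to nanoseconds that the previous hard-coded pa.timestamp(unit="ns") forced on every Athena timestamp column.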