From ec073fabefbb55bfac565fca2f63a0ddea1288ba Mon Sep 17 00:00:00 2001
From: Leon Luttenberger
Date: Mon, 1 Apr 2024 10:43:07 -0500
Subject: [PATCH 1/3] fix: Iceberg schema evolution fails for map or array types

---
NOTE(review): this series was recovered from a copy in which newlines were
collapsed and every "<...>" span had been stripped. The author e-mail on the
"From:" lines is therefore missing (restore it before running `git am`), and
the parameterized Glue types in the test ("map<string, string>" and
"struct<a: string, b: struct<c: string>>") were reconstructed from the test
data -- confirm them against the upstream commits.

 awswrangler/_data_types.py        |  2 +-
 tests/unit/test_athena_iceberg.py | 53 +++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py
index 362601906..399aeb4fa 100644
--- a/awswrangler/_data_types.py
+++ b/awswrangler/_data_types.py
@@ -375,7 +375,7 @@ def athena2pandas(dtype: str, dtype_backend: str | None = None) -> str:  # noqa:
         return "decimal" if dtype_backend != "pyarrow" else "double[pyarrow]"
     if dtype in ("binary", "varbinary"):
         return "bytes" if dtype_backend != "pyarrow" else "binary[pyarrow]"
-    if dtype in ("array", "row", "map"):
+    if any(dtype.startswith(t) for t in ["array", "row", "map"]):
         return "object"
     if dtype == "geometry":
         return "string"
diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py
index 65c604f6b..8a9055c4a 100644
--- a/tests/unit/test_athena_iceberg.py
+++ b/tests/unit/test_athena_iceberg.py
@@ -924,3 +924,56 @@ def test_to_iceberg_uppercase_columns(
     )
 
     assert_pandas_equals(df, df_output)
+
+
+def test_to_iceberg_fill_missing_columns(
+    path: str,
+    path2: str,
+    glue_database: str,
+    glue_table: str,
+) -> None:
+    df_with_col = pd.DataFrame(
+        {
+            "partition": [1, 1, 2, 2],
+            "column2": ["A", "B", "C", "D"],
+            "map_col": [{"s": "d"}, {"s": "h"}, {"i": "l"}, {}],
+        }
+    )
+    df_missing_col = pd.DataFrame(
+        {
+            "partition": [2, 2],
+            "column2": ["Z", "X"],
+        }
+    )
+
+    glue_dtypes = {
+        "partition": "int",
+        "column2": "string",
+        "map_col": "map<string, string>",
+    }
+
+    wr.athena.to_iceberg(
+        df=df_with_col,
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        keep_files=False,
+        dtype=glue_dtypes,
+        mode="overwrite_partitions",
+        partition_cols=["partition"],
+    )
+
+    wr.athena.to_iceberg(
+        df=df_missing_col,
+        database=glue_database,
+        table=glue_table,
+        table_location=path,
+        temp_path=path2,
+        keep_files=False,
+        dtype=glue_dtypes,
+        mode="overwrite_partitions",
+        partition_cols=["partition"],
+        schema_evolution=True,
+        fill_missing_columns_in_df=True,
+    )

From e7693913a3463baef745db09054e242a43565b45 Mon Sep 17 00:00:00 2001
From: Leon Luttenberger
Date: Mon, 1 Apr 2024 10:55:03 -0500
Subject: [PATCH 2/3] rename test

---
 tests/unit/test_athena_iceberg.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py
index 8a9055c4a..6e0b02dbe 100644
--- a/tests/unit/test_athena_iceberg.py
+++ b/tests/unit/test_athena_iceberg.py
@@ -926,7 +926,7 @@ def test_to_iceberg_uppercase_columns(
     assert_pandas_equals(df, df_output)
 
 
-def test_to_iceberg_fill_missing_columns(
+def test_to_iceberg_fill_missing_columns_map(
     path: str,
     path2: str,
     glue_database: str,

From 83c3a129f954ce631e1a3a4d8f9747d41053225a Mon Sep 17 00:00:00 2001
From: Leon Luttenberger
Date: Tue, 2 Apr 2024 08:51:00 -0500
Subject: [PATCH 3/3] fix struct types

---
 awswrangler/_data_types.py        | 3 ++-
 tests/unit/test_athena_iceberg.py | 9 ++++++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py
index 399aeb4fa..72c4149ac 100644
--- a/awswrangler/_data_types.py
+++ b/awswrangler/_data_types.py
@@ -308,6 +308,7 @@ def _split_map(s: str) -> list[str]:
 
 def athena2pyarrow(dtype: str) -> pa.DataType:  # noqa: PLR0911,PLR0912
     """Athena to PyArrow data types conversion."""
+    dtype = dtype.strip()
     if dtype.startswith(("array", "struct", "map")):
         orig_dtype: str = dtype
         dtype = dtype.lower().replace(" ", "")
@@ -375,7 +376,7 @@ def athena2pandas(dtype: str, dtype_backend: str | None = None) -> str:  # noqa:
         return "decimal" if dtype_backend != "pyarrow" else "double[pyarrow]"
     if dtype in ("binary", "varbinary"):
         return "bytes" if dtype_backend != "pyarrow" else "binary[pyarrow]"
-    if any(dtype.startswith(t) for t in ["array", "row", "map"]):
+    if any(dtype.startswith(t) for t in ["array", "row", "map", "struct"]):
         return "object"
     if dtype == "geometry":
         return "string"
diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py
index 6e0b02dbe..032e53755 100644
--- a/tests/unit/test_athena_iceberg.py
+++ b/tests/unit/test_athena_iceberg.py
@@ -926,7 +926,7 @@ def test_to_iceberg_uppercase_columns(
     assert_pandas_equals(df, df_output)
 
 
-def test_to_iceberg_fill_missing_columns_map(
+def test_to_iceberg_fill_missing_columns_with_complex_types(
     path: str,
     path2: str,
     glue_database: str,
@@ -937,6 +937,12 @@ def test_to_iceberg_fill_missing_columns_map(
             "partition": [1, 1, 2, 2],
             "column2": ["A", "B", "C", "D"],
             "map_col": [{"s": "d"}, {"s": "h"}, {"i": "l"}, {}],
+            "struct_col": [
+                {"a": "val1", "b": {"c": "val21"}},
+                {"a": "val1", "b": {"c": None}},
+                {"a": "val1", "b": None},
+                {},
+            ],
         }
     )
     df_missing_col = pd.DataFrame(
@@ -950,6 +956,7 @@ def test_to_iceberg_fill_missing_columns_map(
         "partition": "int",
         "column2": "string",
         "map_col": "map<string, string>",
+        "struct_col": "struct<a: string, b: struct<c: string>>",
     }
 
     wr.athena.to_iceberg(