From 1002708693643646d1b9cd7cb0d3f72eb11e5793 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Mon, 23 Sep 2024 20:27:05 -0700 Subject: [PATCH] Fix: Fix columns-to-types when inserting by time partition if the type of the time column is unknown (#3168) --- sqlmesh/core/engine_adapter/base.py | 3 ++- tests/core/engine_adapter/test_base.py | 28 ++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index c978fa8d1..9387cb522 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -1198,7 +1198,8 @@ def insert_overwrite_by_time_partition( source_queries, columns_to_types = self._get_source_queries_and_columns_to_types( query_or_df, columns_to_types, target_table=table_name ) - columns_to_types = columns_to_types or self.columns(table_name) + if not columns_to_types or not columns_to_types_all_known(columns_to_types): + columns_to_types = self.columns(table_name) low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)] if isinstance(time_column, TimeColumn): time_column = time_column.column diff --git a/tests/core/engine_adapter/test_base.py b/tests/core/engine_adapter/test_base.py index 4785ecfc3..e1055ebcc 100644 --- a/tests/core/engine_adapter/test_base.py +++ b/tests/core/engine_adapter/test_base.py @@ -184,6 +184,34 @@ def test_insert_overwrite_by_time_partition(make_mocked_engine_adapter: t.Callab ] +def test_insert_overwrite_by_time_partition_missing_time_column_type( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + adapter = make_mocked_engine_adapter(EngineAdapter) + + columns_mock = mocker.patch.object(adapter, "columns") + columns_mock.return_value = {"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")} + + adapter.insert_overwrite_by_time_partition( + "test_table", + parse_one("SELECT a, b FROM tbl"), + start="2022-01-01", + end="2022-01-02", + time_column="b", + time_formatter=lambda x, _: exp.Literal.string(to_ds(x)), + columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("UNKNOWN")}, + ) + + columns_mock.assert_called_once_with("test_table") + adapter.cursor.begin.assert_called_once() + adapter.cursor.commit.assert_called_once() + + assert to_sql_calls(adapter) == [ + """DELETE FROM "test_table" WHERE "b" BETWEEN '2022-01-01' AND '2022-01-02'""", + """INSERT INTO "test_table" ("a", "b") SELECT "a", "b" FROM (SELECT "a", "b" FROM "tbl") AS "_subquery" WHERE "b" BETWEEN '2022-01-01' AND '2022-01-02'""", + ] + + def test_insert_overwrite_by_time_partition_supports_insert_overwrite( make_mocked_engine_adapter: t.Callable, ):