Skip to content

Commit

Permalink
Fix: Fix columns-to-types when inserting by time partition if the typ…
Browse files Browse the repository at this point in the history
…e of the time column is unknown (#3168)
  • Loading branch information
izeigerman authored Sep 24, 2024
1 parent a84afc0 commit 3bdfeec
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
3 changes: 2 additions & 1 deletion sqlmesh/core/engine_adapter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,8 @@ def insert_overwrite_by_time_partition(
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
query_or_df, columns_to_types, target_table=table_name
)
columns_to_types = columns_to_types or self.columns(table_name)
if not columns_to_types or not columns_to_types_all_known(columns_to_types):
columns_to_types = self.columns(table_name)
low, high = [time_formatter(dt, columns_to_types) for dt in make_inclusive(start, end)]
if isinstance(time_column, TimeColumn):
time_column = time_column.column
Expand Down
28 changes: 28 additions & 0 deletions tests/core/engine_adapter/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,34 @@ def test_insert_overwrite_by_time_partition(make_mocked_engine_adapter: t.Callab
]


def test_insert_overwrite_by_time_partition_missing_time_column_type(
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
):
adapter = make_mocked_engine_adapter(EngineAdapter)

columns_mock = mocker.patch.object(adapter, "columns")
columns_mock.return_value = {"a": exp.DataType.build("INT"), "b": exp.DataType.build("STRING")}

adapter.insert_overwrite_by_time_partition(
"test_table",
parse_one("SELECT a, b FROM tbl"),
start="2022-01-01",
end="2022-01-02",
time_column="b",
time_formatter=lambda x, _: exp.Literal.string(to_ds(x)),
columns_to_types={"a": exp.DataType.build("INT"), "b": exp.DataType.build("UNKNOWN")},
)

columns_mock.assert_called_once_with("test_table")
adapter.cursor.begin.assert_called_once()
adapter.cursor.commit.assert_called_once()

assert to_sql_calls(adapter) == [
"""DELETE FROM "test_table" WHERE "b" BETWEEN '2022-01-01' AND '2022-01-02'""",
"""INSERT INTO "test_table" ("a", "b") SELECT "a", "b" FROM (SELECT "a", "b" FROM "tbl") AS "_subquery" WHERE "b" BETWEEN '2022-01-01' AND '2022-01-02'""",
]


def test_insert_overwrite_by_time_partition_supports_insert_overwrite(
make_mocked_engine_adapter: t.Callable,
):
Expand Down

0 comments on commit 3bdfeec

Please sign in to comment.