From a215ac0125aad430152c305625e0bb7ebfd36c2e Mon Sep 17 00:00:00 2001
From: George Sittas
Date: Tue, 5 Nov 2024 21:44:11 +0200
Subject: [PATCH] Fix!: bump sqlglot to v25.29.0, fix info schema view
 handling in bigquery

---
 setup.py                                      |  2 +-
 sqlmesh/core/engine_adapter/bigquery.py       |  2 +-
 .../engine_adapter/integration/__init__.py    |  8 +-
 .../integration/test_integration_bigquery.py  | 79 ++++++++++++++++---
 tests/core/engine_adapter/test_bigquery.py    |  8 +-
 tests/dbt/test_transformation.py              |  2 +-
 6 files changed, 80 insertions(+), 21 deletions(-)

diff --git a/setup.py b/setup.py
index dca36fdb1..c4de09592 100644
--- a/setup.py
+++ b/setup.py
@@ -47,7 +47,7 @@
         "requests",
         "rich[jupyter]",
         "ruamel.yaml",
-        "sqlglot[rs]~=25.28.0",
+        "sqlglot[rs]~=25.29.0",
         "tenacity",
     ],
     extras_require={
diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py
index 4c4e438b8..00b7ef107 100644
--- a/sqlmesh/core/engine_adapter/bigquery.py
+++ b/sqlmesh/core/engine_adapter/bigquery.py
@@ -234,7 +234,7 @@ def create_mapping_schema(
         }
 
         table = exp.to_table(table_name)
-        if len(table.parts) > 3:
+        if len(table.parts) == 3 and "." in table.name:
             # The client's `get_table` method can't handle paths with >3 identifiers
             self.execute(exp.select("*").from_(table).limit(1))
             query_results = self._query_job._query_results
diff --git a/tests/core/engine_adapter/integration/__init__.py b/tests/core/engine_adapter/integration/__init__.py
index 1c47e6288..d41c4cb39 100644
--- a/tests/core/engine_adapter/integration/__init__.py
+++ b/tests/core/engine_adapter/integration/__init__.py
@@ -217,7 +217,7 @@ def table(self, table_name: TableName, schema: str = TEST_SCHEMA) -> exp.Table:
         schema = self.add_test_suffix(schema)
         self._schemas.append(schema)
 
-        table = exp.to_table(table_name)
+        table = exp.to_table(table_name, dialect=self.dialect)
         table.set("db", exp.parse_identifier(schema, dialect=self.dialect))
 
         return exp.to_table(
@@ -455,7 +455,9 @@ def get_column_comments(
         return comments
 
     def create_context(
-        self, config_mutator: t.Optional[t.Callable[[str, Config], None]] = None
+        self,
+        config_mutator: t.Optional[t.Callable[[str, Config], None]] = None,
+        path: t.Optional[pathlib.Path] = None,
     ) -> Context:
         private_sqlmesh_dir = pathlib.Path(pathlib.Path().home(), ".sqlmesh")
         config = load_config_from_paths(
@@ -486,7 +488,7 @@ def create_context(
         # Ensure that s3_warehouse_location is propagated
         conn.s3_warehouse_location = self.engine_adapter.s3_warehouse_location
 
-        self._context = Context(paths=".", config=config, gateway=self.gateway)
+        self._context = Context(paths=path or ".", config=config, gateway=self.gateway)
         return self._context
 
     def create_catalog(self, catalog_name: str):
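For context on the `create_mapping_schema` change above: as of sqlglot v25.29.0, BigQuery's parser folds an information schema view name into a single dotted, quoted identifier, so the resulting table still has exactly three parts (catalog, db, name). A minimal sketch of the representation the new condition targets (illustrative project/dataset names; behavior assumes the bumped sqlglot version):

    from sqlglot import exp

    # Parse a four-segment information schema path with the BigQuery dialect.
    table = exp.to_table("my_project.my_dataset.INFORMATION_SCHEMA.TABLES", dialect="bigquery")

    # The view name is kept as one identifier, so the nesting depth matches
    # an ordinary `project.dataset.table` reference.
    assert len(table.parts) == 3
    assert table.name == "INFORMATION_SCHEMA.TABLES"  # the embedded dot triggers the query fallback

This is why the adapter can no longer detect these views via `len(table.parts) > 3` and instead checks for a dot in the table name.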
diff --git a/tests/core/engine_adapter/integration/test_integration_bigquery.py b/tests/core/engine_adapter/integration/test_integration_bigquery.py
index adebb6af3..8eed2c0b3 100644
--- a/tests/core/engine_adapter/integration/test_integration_bigquery.py
+++ b/tests/core/engine_adapter/integration/test_integration_bigquery.py
@@ -1,8 +1,11 @@
 import typing as t
 import pytest
+from pathlib import Path
 from sqlglot import exp
 from sqlglot.optimizer.qualify_columns import quote_identifiers
 from sqlglot.helper import seq_get
+from sqlmesh.cli.example_project import ProjectTemplate, init_example_project
+from sqlmesh.core.config import Config
 from sqlmesh.core.engine_adapter import BigQueryEngineAdapter
 from sqlmesh.core.engine_adapter.bigquery import _CLUSTERING_META_KEY
 from sqlmesh.core.engine_adapter.shared import DataObject
@@ -180,18 +183,45 @@ def _get_data_object(table: exp.Table) -> DataObject:
     assert not metadata.is_clustered
 
 
-def test_fetch_schema_of_information_schema_tables(
-    ctx: TestContext, engine_adapter: BigQueryEngineAdapter
-):
-    # We produce Table(this=Dot(this=INFORMATION_SCHEMA, expression=TABLES)) here,
-    # otherwise `db` or `catalog` would be set, which is not the right representation
-    information_schema_tables = exp.to_table("_._.INFORMATION_SCHEMA.TABLES")
-    information_schema_tables.set("db", None)
-    information_schema_tables.set("catalog", None)
+def test_information_schema_view_external_model(ctx: TestContext, tmp_path: Path):
+    # Information schema views are represented as:
+    #
+    # Table(
+    #   this=Identifier(INFORMATION_SCHEMA.SOME_VIEW, quoted=True),
+    #   db=Identifier(some_schema),
+    #   catalog=Identifier(some_catalog))
+    #
+    # This representation is produced by BigQuery's parser, so that the mapping schema
+    # nesting depth is consistent with other table references in a project, which will
+    # usually look like `project.dataset.table`.
+    information_schema_tables_view = ctx.table("INFORMATION_SCHEMA.TABLES")
+    assert len(information_schema_tables_view.parts) == 3
+
+    model_name = ctx.table("test")
+    dependency = f"`{'.'.join(part.name for part in information_schema_tables_view.parts)}`"
+
+    init_example_project(tmp_path, dialect="bigquery", template=ProjectTemplate.EMPTY)
+    with open(tmp_path / "models" / "test.sql", "w", encoding="utf-8") as f:
+        f.write(
+            f"""
+            MODEL (
+              name {model_name.sql("bigquery")},
+              kind FULL,
+              dialect 'bigquery'
+            );
+
+            SELECT * FROM {dependency} AS tables
+            """
+        )
 
-    source = ctx.table(information_schema_tables)
+    def _mutate_config(_: str, config: Config) -> None:
+        config.model_defaults.dialect = "bigquery"
 
-    expected_columns_to_types = {
+    sqlmesh = ctx.create_context(_mutate_config, path=tmp_path)
+    sqlmesh.create_external_models()
+    sqlmesh.load()
+
+    assert sqlmesh.get_model(information_schema_tables_view.sql()).columns_to_types == {
         "table_catalog": exp.DataType.build("TEXT"),
         "table_schema": exp.DataType.build("TEXT"),
         "table_name": exp.DataType.build("TEXT"),
@@ -217,4 +247,31 @@
         ),
     }
 
-    assert expected_columns_to_types == engine_adapter.columns(source.sql())
+    rendered_query = sqlmesh.get_model(model_name.sql()).render_query()
+    assert isinstance(rendered_query, exp.Query)
+
+    assert rendered_query.sql("bigquery", pretty=True) == (
+        "SELECT\n"
+        "  `tables`.`table_catalog` AS `table_catalog`,\n"
+        "  `tables`.`table_schema` AS `table_schema`,\n"
+        "  `tables`.`table_name` AS `table_name`,\n"
+        "  `tables`.`table_type` AS `table_type`,\n"
+        "  `tables`.`is_insertable_into` AS `is_insertable_into`,\n"
+        "  `tables`.`is_typed` AS `is_typed`,\n"
+        "  `tables`.`creation_time` AS `creation_time`,\n"
+        "  `tables`.`base_table_catalog` AS `base_table_catalog`,\n"
+        "  `tables`.`base_table_schema` AS `base_table_schema`,\n"
+        "  `tables`.`base_table_name` AS `base_table_name`,\n"
+        "  `tables`.`snapshot_time_ms` AS `snapshot_time_ms`,\n"
+        "  `tables`.`ddl` AS `ddl`,\n"
+        "  `tables`.`default_collation_name` AS `default_collation_name`,\n"
+        "  `tables`.`upsert_stream_apply_watermark` AS `upsert_stream_apply_watermark`,\n"
+        "  `tables`.`replica_source_catalog` AS `replica_source_catalog`,\n"
+        "  `tables`.`replica_source_schema` AS `replica_source_schema`,\n"
+        "  `tables`.`replica_source_name` AS `replica_source_name`,\n"
+        "  `tables`.`replication_status` AS `replication_status`,\n"
+        "  `tables`.`replication_error` AS `replication_error`,\n"
+        "  `tables`.`is_change_history_enabled` AS `is_change_history_enabled`,\n"
+        "  `tables`.`sync_status` AS `sync_status`\n"
+        f"FROM {dependency} AS `tables`"
+    )
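The comment in the new test is the heart of the change: because the dotted view name occupies the table slot, an information schema view nests at the same depth in the mapping schema as any regular table. A hypothetical illustration of that shape (names and simplified column types invented for clarity; not part of the patch):

    # Hypothetical mapping-schema nesting under the representation above:
    mapping_schema = {
        "my_project": {                          # catalog
            "my_dataset": {                      # db / dataset
                "my_table": {"a": "INT64"},      # ordinary table
                # The dotted view name sits in the same slot, keeping depth at 3:
                "INFORMATION_SCHEMA.TABLES": {"table_name": "STRING"},
            }
        }
    }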
diff --git a/tests/core/engine_adapter/test_bigquery.py b/tests/core/engine_adapter/test_bigquery.py
index 20d7c711e..e218329e1 100644
--- a/tests/core/engine_adapter/test_bigquery.py
+++ b/tests/core/engine_adapter/test_bigquery.py
@@ -73,7 +73,7 @@ def test_insert_overwrite_by_partition_query(
     assert sql_calls == [
         "CREATE SCHEMA IF NOT EXISTS `test_schema`",
         f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_id}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
-        f"DECLARE _sqlmesh_target_partitions_ ARRAY DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
+        f"DECLARE _sqlmesh_target_partitions_ ARRAY DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
         f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_id}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
         f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_id}`",
     ]
@@ -120,7 +120,7 @@ def test_insert_overwrite_by_partition_query_unknown_column_types(
     assert sql_calls == [
         "CREATE SCHEMA IF NOT EXISTS `test_schema`",
         f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_id}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
-        f"DECLARE _sqlmesh_target_partitions_ ARRAY DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
+        f"DECLARE _sqlmesh_target_partitions_ ARRAY DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
         f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_id}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
         f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_id}`",
     ]
@@ -677,7 +677,7 @@ def test_select_partitions_expr():
             granularity="day",
             catalog="{{ target.database }}",
         )
-        == "SELECT MAX(PARSE_DATE('%Y%m%d', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
+        == "SELECT MAX(PARSE_DATE('%Y%m%d', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
     )
 
     assert (
@@ -686,7 +686,7 @@ def test_select_partitions_expr():
             "test_table",
             "int64",
         )
-        == "SELECT MAX(CAST(partition_id AS INT64)) FROM `test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = 'test_table' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
+        == "SELECT MAX(CAST(partition_id AS INT64)) FROM `test_schema`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = 'test_table' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
     )
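The expected strings above change for the same reason: sqlglot now renders the information schema view inside a single pair of backticks and aliases it by the view's unqualified name, hence the new `INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS fragments. A rough round-trip sketch (output shape taken from the test expectations above; exact behavior assumes sqlglot v25.29.0's BigQuery dialect):

    from sqlglot import parse_one

    sql = "SELECT * FROM test_project.test_schema.INFORMATION_SCHEMA.PARTITIONS"
    # Per the expected strings above, the dotted view name should be rendered
    # in one pair of backticks, with the view aliased by its unqualified name:
    print(parse_one(sql, read="bigquery").sql("bigquery"))
    # SELECT * FROM `test_project`.`test_schema`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS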
diff --git a/tests/dbt/test_transformation.py b/tests/dbt/test_transformation.py
index 6050cbf3e..f3a09b069 100644
--- a/tests/dbt/test_transformation.py
+++ b/tests/dbt/test_transformation.py
@@ -1092,7 +1092,7 @@ def test_dbt_max_partition(sushi_test_project: Project, assert_exp_eq, mocker: M
 JINJA_STATEMENT_BEGIN;
 {% if is_incremental() %}
 DECLARE _dbt_max_partition DATETIME DEFAULT (
-    COALESCE((SELECT MAX(PARSE_DATETIME('%Y%m', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'), CAST('1970-01-01' AS DATETIME))
+    COALESCE((SELECT MAX(PARSE_DATETIME('%Y%m', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'), CAST('1970-01-01' AS DATETIME))
 );
 {% endif %}
 JINJA_END;""".strip()