
Commit

Fix!: bump sqlglot to v25.29.0, fix info schema view handling in bigquery
georgesittas committed Nov 6, 2024
1 parent dcfbc7d commit a215ac0
Showing 6 changed files with 80 additions and 21 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -47,7 +47,7 @@
"requests",
"rich[jupyter]",
"ruamel.yaml",
-"sqlglot[rs]~=25.28.0",
+"sqlglot[rs]~=25.29.0",
"tenacity",
],
extras_require={
2 changes: 1 addition & 1 deletion sqlmesh/core/engine_adapter/bigquery.py
@@ -234,7 +234,7 @@ def create_mapping_schema(
}

table = exp.to_table(table_name)
-if len(table.parts) > 3:
+if len(table.parts) == 3 and "." in table.name:
# The client's `get_table` method can't handle paths with >3 identifiers
self.execute(exp.select("*").from_(table).limit(1))
query_results = self._query_job._query_results
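
For reference, a minimal sketch of the table shape this check now targets, assuming sqlglot ~25.29's BigQuery dialect and using made-up project and dataset names (the representation is the one described in the integration test added below):

from sqlglot import exp

# Illustrative names; assumes the BigQuery dialect collapses the
# INFORMATION_SCHEMA view into a single identifier, per the new test below.
table = exp.to_table("my_project.my_dataset.INFORMATION_SCHEMA.TABLES", dialect="bigquery")

print(len(table.parts))         # expected: 3 (catalog, db, view identifier)
print(table.name)               # expected: "INFORMATION_SCHEMA.TABLES", which contains a dot
print(table.catalog, table.db)  # expected: "my_project my_dataset"

# Such a path cannot be resolved with the client's `get_table`, so the adapter
# falls back to `SELECT * ... LIMIT 1` and reads the schema off the query job.
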
8 changes: 5 additions & 3 deletions tests/core/engine_adapter/integration/__init__.py
@@ -217,7 +217,7 @@ def table(self, table_name: TableName, schema: str = TEST_SCHEMA) -> exp.Table:
schema = self.add_test_suffix(schema)
self._schemas.append(schema)

-table = exp.to_table(table_name)
+table = exp.to_table(table_name, dialect=self.dialect)
table.set("db", exp.parse_identifier(schema, dialect=self.dialect))

return exp.to_table(
@@ -455,7 +455,9 @@ def get_column_comments(
return comments

def create_context(
-self, config_mutator: t.Optional[t.Callable[[str, Config], None]] = None
+self,
+config_mutator: t.Optional[t.Callable[[str, Config], None]] = None,
+path: t.Optional[pathlib.Path] = None,
) -> Context:
private_sqlmesh_dir = pathlib.Path(pathlib.Path().home(), ".sqlmesh")
config = load_config_from_paths(
@@ -486,7 +488,7 @@
# Ensure that s3_warehouse_location is propagated
conn.s3_warehouse_location = self.engine_adapter.s3_warehouse_location

-self._context = Context(paths=".", config=config, gateway=self.gateway)
+self._context = Context(paths=path or ".", config=config, gateway=self.gateway)
return self._context

def create_catalog(self, catalog_name: str):
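
A hedged sketch of how the new `path` argument might be used: the config mutator mirrors the one in the BigQuery integration test added in this commit, while `ctx` (the TestContext fixture) and `project_dir` (typically a pytest tmp_path populated by init_example_project) are assumptions, so the call itself is shown commented out.

from sqlmesh.core.config import Config

def use_bigquery_dialect(_: str, config: Config) -> None:
    # The first argument is the gateway name; this mutator only sets the dialect.
    config.model_defaults.dialect = "bigquery"

# context = ctx.create_context(use_bigquery_dialect, path=project_dir)
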
79 changes: 68 additions & 11 deletions tests/core/engine_adapter/integration/test_integration_bigquery.py
@@ -1,8 +1,11 @@
import typing as t
import pytest
+from pathlib import Path
from sqlglot import exp
from sqlglot.optimizer.qualify_columns import quote_identifiers
from sqlglot.helper import seq_get
+from sqlmesh.cli.example_project import ProjectTemplate, init_example_project
+from sqlmesh.core.config import Config
from sqlmesh.core.engine_adapter import BigQueryEngineAdapter
from sqlmesh.core.engine_adapter.bigquery import _CLUSTERING_META_KEY
from sqlmesh.core.engine_adapter.shared import DataObject
@@ -180,18 +183,45 @@ def _get_data_object(table: exp.Table) -> DataObject:
assert not metadata.is_clustered


-def test_fetch_schema_of_information_schema_tables(
-ctx: TestContext, engine_adapter: BigQueryEngineAdapter
-):
-# We produce Table(this=Dot(this=INFORMATION_SCHEMA, expression=TABLES)) here,
-# otherwise `db` or `catalog` would be set, which is not the right representation
-information_schema_tables = exp.to_table("_._.INFORMATION_SCHEMA.TABLES")
-information_schema_tables.set("db", None)
-information_schema_tables.set("catalog", None)
+def test_information_schema_view_external_model(ctx: TestContext, tmp_path: Path):
+# Information schema views are represented as:
+#
+# Table(
+#   this=Identifier(INFORMATION_SCHEMA.SOME_VIEW, quoted=True),
+#   db=Identifier(some_schema),
+#   catalog=Identifier(some_catalog))
+#
+# This representation is produced by BigQuery's parser, so that the mapping schema
+# nesting depth is consistent with other table references in a project, which will
+# usually look like `project.dataset.table`.
+information_schema_tables_view = ctx.table("INFORMATION_SCHEMA.TABLES")
+assert len(information_schema_tables_view.parts) == 3
+
+model_name = ctx.table("test")
+dependency = f"`{'.'.join(part.name for part in information_schema_tables_view.parts)}`"
+
+init_example_project(tmp_path, dialect="bigquery", template=ProjectTemplate.EMPTY)
+with open(tmp_path / "models" / "test.sql", "w", encoding="utf-8") as f:
+f.write(
+f"""
+MODEL (
+name {model_name.sql("bigquery")},
+kind FULL,
+dialect 'bigquery'
+);
+SELECT * FROM {dependency} AS tables
+"""
+)

-source = ctx.table(information_schema_tables)
+def _mutate_config(_: str, config: Config) -> None:
+config.model_defaults.dialect = "bigquery"

-expected_columns_to_types = {
+sqlmesh = ctx.create_context(_mutate_config, path=tmp_path)
+sqlmesh.create_external_models()
+sqlmesh.load()
+
+assert sqlmesh.get_model(information_schema_tables_view.sql()).columns_to_types == {
"table_catalog": exp.DataType.build("TEXT"),
"table_schema": exp.DataType.build("TEXT"),
"table_name": exp.DataType.build("TEXT"),
@@ -217,4 +247,31 @@
),
}

-assert expected_columns_to_types == engine_adapter.columns(source.sql())
+rendered_query = sqlmesh.get_model(model_name.sql()).render_query()
+assert isinstance(rendered_query, exp.Query)
+
+assert rendered_query.sql("bigquery", pretty=True) == (
+"SELECT\n"
+"  `tables`.`table_catalog` AS `table_catalog`,\n"
+"  `tables`.`table_schema` AS `table_schema`,\n"
+"  `tables`.`table_name` AS `table_name`,\n"
+"  `tables`.`table_type` AS `table_type`,\n"
+"  `tables`.`is_insertable_into` AS `is_insertable_into`,\n"
+"  `tables`.`is_typed` AS `is_typed`,\n"
+"  `tables`.`creation_time` AS `creation_time`,\n"
+"  `tables`.`base_table_catalog` AS `base_table_catalog`,\n"
+"  `tables`.`base_table_schema` AS `base_table_schema`,\n"
+"  `tables`.`base_table_name` AS `base_table_name`,\n"
+"  `tables`.`snapshot_time_ms` AS `snapshot_time_ms`,\n"
+"  `tables`.`ddl` AS `ddl`,\n"
+"  `tables`.`default_collation_name` AS `default_collation_name`,\n"
+"  `tables`.`upsert_stream_apply_watermark` AS `upsert_stream_apply_watermark`,\n"
+"  `tables`.`replica_source_catalog` AS `replica_source_catalog`,\n"
+"  `tables`.`replica_source_schema` AS `replica_source_schema`,\n"
+"  `tables`.`replica_source_name` AS `replica_source_name`,\n"
+"  `tables`.`replication_status` AS `replication_status`,\n"
+"  `tables`.`replication_error` AS `replication_error`,\n"
+"  `tables`.`is_change_history_enabled` AS `is_change_history_enabled`,\n"
+"  `tables`.`sync_status` AS `sync_status`\n"
+f"FROM {dependency} AS `tables`"
+)
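
A side note on the model file above: the dependency is written as the whole dotted path inside a single pair of backticks, which BigQuery accepts. A minimal sketch, with illustrative names and assuming sqlglot ~25.29, of how such a reference is parsed back into parts:

from sqlglot import exp, parse_one

# Illustrative names; the whole path is quoted with one pair of backticks, the
# same way the test builds its `dependency` string.
query = parse_one(
    "SELECT * FROM `my_project.my_dataset.INFORMATION_SCHEMA.TABLES` AS tables",
    dialect="bigquery",
)
table = query.find(exp.Table)
print([part.name for part in table.parts])
# expected, roughly: ['my_project', 'my_dataset', 'INFORMATION_SCHEMA.TABLES']
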
8 changes: 4 additions & 4 deletions tests/core/engine_adapter/test_bigquery.py
@@ -73,7 +73,7 @@ def test_insert_overwrite_by_partition_query(
assert sql_calls == [
"CREATE SCHEMA IF NOT EXISTS `test_schema`",
f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_id}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
-f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
+f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_id}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_id}`",
]
@@ -120,7 +120,7 @@ def test_insert_overwrite_by_partition_query_unknown_column_types(
assert sql_calls == [
"CREATE SCHEMA IF NOT EXISTS `test_schema`",
f"CREATE TABLE IF NOT EXISTS `test_schema`.`__temp_test_table_{temp_table_id}` PARTITION BY DATETIME_TRUNC(`ds`, MONTH) AS SELECT `a`, `ds` FROM `tbl`",
-f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
+f"DECLARE _sqlmesh_target_partitions_ ARRAY<DATETIME> DEFAULT (SELECT ARRAY_AGG(PARSE_DATETIME('%Y%m', partition_id)) FROM `test_project`.`test_schema`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '__temp_test_table_{temp_table_id}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__');",
f"MERGE INTO `test_schema`.`test_table` AS `__MERGE_TARGET__` USING (SELECT `a`, `ds` FROM (SELECT * FROM `test_schema`.`__temp_test_table_{temp_table_id}`) AS `_subquery` WHERE DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`)) AS `__MERGE_SOURCE__` ON FALSE WHEN NOT MATCHED BY SOURCE AND DATETIME_TRUNC(`ds`, MONTH) IN UNNEST(`_sqlmesh_target_partitions_`) THEN DELETE WHEN NOT MATCHED THEN INSERT (`a`, `ds`) VALUES (`a`, `ds`)",
f"DROP TABLE IF EXISTS `test_schema`.`__temp_test_table_{temp_table_id}`",
]
@@ -677,7 +677,7 @@ def test_select_partitions_expr():
granularity="day",
catalog="{{ target.database }}",
)
-== "SELECT MAX(PARSE_DATE('%Y%m%d', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
+== "SELECT MAX(PARSE_DATE('%Y%m%d', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
)

assert (
@@ -686,7 +686,7 @@
"test_table",
"int64",
)
-== "SELECT MAX(CAST(partition_id AS INT64)) FROM `test_schema`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = 'test_table' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
+== "SELECT MAX(CAST(partition_id AS INT64)) FROM `test_schema`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = 'test_table' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'"
)


2 changes: 1 addition & 1 deletion tests/dbt/test_transformation.py
@@ -1092,7 +1092,7 @@ def test_dbt_max_partition(sushi_test_project: Project, assert_exp_eq, mocker: M
JINJA_STATEMENT_BEGIN;
{% if is_incremental() %}
DECLARE _dbt_max_partition DATETIME DEFAULT (
-COALESCE((SELECT MAX(PARSE_DATETIME('%Y%m', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'), CAST('1970-01-01' AS DATETIME))
+COALESCE((SELECT MAX(PARSE_DATETIME('%Y%m', partition_id)) FROM `{{ target.database }}`.`{{ adapter.resolve_schema(this) }}`.`INFORMATION_SCHEMA.PARTITIONS` AS PARTITIONS WHERE table_name = '{{ adapter.resolve_identifier(this) }}' AND NOT partition_id IS NULL AND partition_id <> '__NULL__'), CAST('1970-01-01' AS DATETIME))
);
{% endif %}
JINJA_END;""".strip()

