diff --git a/.changes/unreleased/Features-20240227-151915.yaml b/.changes/unreleased/Features-20240227-151915.yaml
new file mode 100644
index 000000000..d4715081d
--- /dev/null
+++ b/.changes/unreleased/Features-20240227-151915.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add new order_by_cluster_key config option for table creation
+time: 2024-02-27T15:19:15.75488-08:00
+custom:
+  Author: Slanman3755
+  Issue: "606"
diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py
index 6854b199d..65cd19a93 100644
--- a/dbt/adapters/snowflake/impl.py
+++ b/dbt/adapters/snowflake/impl.py
@@ -35,6 +35,7 @@ class SnowflakeConfig(AdapterConfig):
     transient: Optional[bool] = None
     cluster_by: Optional[Union[str, List[str]]] = None
+    order_by_cluster_key: Optional[bool] = None
     automatic_clustering: Optional[bool] = None
     secure: Optional[bool] = None
     copy_grants: Optional[bool] = None
diff --git a/dbt/include/snowflake/macros/relations/table/create.sql b/dbt/include/snowflake/macros/relations/table/create.sql
index c6bc8f775..e4c19f48e 100644
--- a/dbt/include/snowflake/macros/relations/table/create.sql
+++ b/dbt/include/snowflake/macros/relations/table/create.sql
@@ -11,6 +11,7 @@
 {%- if language == 'sql' -%}
   {%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
+  {%- set order_by_cluster_key = config.get('order_by_cluster_key', default=true) -%}
   {%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
   {%- set copy_grants = config.get('copy_grants', default=false) -%}
@@ -35,7 +36,7 @@
   {% endif %}
   {% if copy_grants and not temporary -%} copy grants {%- endif %} as (
-    {%- if cluster_by_string is not none -%}
+    {%- if order_by_cluster_key and cluster_by_string is not none -%}
      select * from (
        {{ compiled_code }}
      ) order by ({{ cluster_by_string }})
diff --git a/tests/functional/adapter/incremental/test_incremental_cluster_by.py b/tests/functional/adapter/incremental/test_incremental_cluster_by.py
new file mode 100644
index 000000000..afe4d9de5
--- /dev/null
+++ b/tests/functional/adapter/incremental/test_incremental_cluster_by.py
@@ -0,0 +1,127 @@
+import pytest
+from dbt.tests.util import run_dbt, check_relations_equal
+from collections import namedtuple
+
+
+models__cluster_by_sql = """
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+    incremental_strategy='delete+insert',
+    cluster_by=['id']
+) }}
+
+{% if not is_incremental() %}
+
+-- data for first invocation of model
+
+select 1 as id, 'hello' as msg, 'blue' as color
+union all
+select 2 as id, 'goodbye' as msg, 'red' as color
+
+{% else %}
+
+-- data for subsequent incremental update
+
+select 1 as id, 'hey' as msg, 'blue' as color
+union all
+select 2 as id, 'yo' as msg, 'green' as color
+union all
+select 3 as id, 'anyway' as msg, 'purple' as color
+
+{% endif %}
+"""
+
+seeds__expected_cluster_by_csv = """id,msg,color
+1,hey,blue
+2,yo,green
+3,anyway,purple
+"""
+
+ResultHolder = namedtuple(
+    "ResultHolder",
+    [
+        "seed_count",
+        "model_count",
+        "seed_rows",
+        "inc_test_model_count",
+        "relation",
+    ],
+)
+
+
+class TestIncrementalClusterBySnowflake:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"cluster_by.sql": models__cluster_by_sql}
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {"expected_cluster_by.csv": seeds__expected_cluster_by_csv}
+
+    def update_incremental_model(self, incremental_model):
+        """update incremental model after the seed table has been updated"""
+        model_result_set = run_dbt(["run", "--select", incremental_model])
+        return len(model_result_set)
+
+    def get_test_fields(self, project, seed, incremental_model, update_sql_file):
+        seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))
+
+        model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))
+
+        relation = incremental_model
+        # update seed in anticipation of incremental model update
+        row_count_query = "select * from {}.{}".format(project.test_schema, seed)
+
+        seed_rows = len(project.run_sql(row_count_query, fetch="all"))
+
+        # propagate seed state to incremental model according to unique keys
+        inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)
+
+        return ResultHolder(seed_count, model_count, seed_rows, inc_test_model_count, relation)
+
+    def check_scenario_correctness(self, expected_fields, test_case_fields, project):
+        """Invoke assertions to verify correct build functionality"""
+        # 1. test seed(s) should build afresh
+        assert expected_fields.seed_count == test_case_fields.seed_count
+        # 2. test model(s) should build afresh
+        assert expected_fields.model_count == test_case_fields.model_count
+        # 3. seeds should have intended row counts post update
+        assert expected_fields.seed_rows == test_case_fields.seed_rows
+        # 4. incremental test model(s) should be updated
+        assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
+        # 5. result table should match intended result set (itself a relation)
+        check_relations_equal(
+            project.adapter, [expected_fields.relation, test_case_fields.relation]
+        )
+
+    def test__cluster_by(self, project):
+        """seed should match model after two incremental runs"""
+
+        expected_fields = ResultHolder(
+            seed_count=1,
+            model_count=1,
+            inc_test_model_count=1,
+            seed_rows=3,
+            relation="expected_cluster_by",
+        )
+
+        test_case_fields = self.get_test_fields(
+            project,
+            seed="expected_cluster_by",
+            incremental_model="cluster_by",
+            update_sql_file=None,
+        )
+        self.check_scenario_correctness(expected_fields, test_case_fields, project)
+
+
+class TestIncrementalClusterByOrderByClusterKeyDeleteInsertSnowflake(
+    TestIncrementalClusterBySnowflake
+):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "models": {
+                "+order_by_cluster_key": False,
+            }
+        }
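
Below is a minimal usage sketch (not part of the patch above), assuming the behavior added in create.sql: order_by_cluster_key defaults to true, so a clustered table's rows are still ordered by the cluster key at creation time unless a model opts out; setting it to false skips the final ORDER BY while keeping cluster_by. The model body and column names here are hypothetical.

    {{ config(
        materialized = 'table',
        cluster_by = ['id'],
        order_by_cluster_key = false
    ) }}

    select 1 as id, 'hello' as msg
    union all
    select 2 as id, 'goodbye' as msg

The functional test above exercises the same switch project-wide by setting "+order_by_cluster_key": False in project_config_update.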