Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADAP-548] Make cluster key sorting optional on table creation #917

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240227-151915.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add new order_by_cluster_key config option for table creation
time: 2024-02-27T15:19:15.75488-08:00
custom:
Author: Slanman3755
Issue: "606"
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
class SnowflakeConfig(AdapterConfig):
transient: Optional[bool] = None
cluster_by: Optional[Union[str, List[str]]] = None
order_by_cluster_key: Optional[bool] = None
automatic_clustering: Optional[bool] = None
secure: Optional[bool] = None
copy_grants: Optional[bool] = None
Expand Down
3 changes: 2 additions & 1 deletion dbt/include/snowflake/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

{%- if language == 'sql' -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set order_by_cluster_key = config.get('order_by_cluster_key', default=true) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}

Expand All @@ -35,7 +36,7 @@
{% endif %}
{% if copy_grants and not temporary -%} copy grants {%- endif %} as
(
{%- if cluster_by_string is not none -%}
{%- if order_by_cluster_key and cluster_by_string is not none -%}
select * from (
{{ compiled_code }}
) order by ({{ cluster_by_string }})
Expand Down
127 changes: 127 additions & 0 deletions tests/functional/adapter/incremental/test_incremental_cluster_by.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import pytest
from dbt.tests.util import run_dbt, check_relations_equal
from collections import namedtuple


models__cluster_by_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='delete+insert',
cluster_by=['id']
) }}

{% if not is_incremental() %}

-- data for first invocation of model

select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color

{% else %}

-- data for subsequent incremental update

select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color

{% endif %}
"""

seeds__expected_cluster_by_csv = """id,msg,color
1,hey,blue
2,yo,green
3,anyway,purple
"""

# Bundle of the measurements collected while exercising one incremental
# scenario: build counts, post-update seed row count, and the relation name
# to diff against the expected seed.
ResultHolder = namedtuple(
    "ResultHolder",
    "seed_count model_count seed_rows inc_test_model_count relation",
)


class TestIncrementalClusterBySnowflake:
    """Incremental model configured with cluster_by: after a full-refresh run
    plus one incremental run, the model must match the expected seed."""

    @pytest.fixture(scope="class")
    def models(self):
        return {"cluster_by.sql": models__cluster_by_sql}

    @pytest.fixture(scope="class")
    def seeds(self):
        return {"expected_cluster_by.csv": seeds__expected_cluster_by_csv}

    def update_incremental_model(self, incremental_model):
        """Run the incremental model once more (non-full-refresh) and return
        the number of results produced."""
        results = run_dbt(["run", "--select", incremental_model])
        return len(results)

    def get_test_fields(self, project, seed, incremental_model, update_sql_file):
        """Build the seed and model from scratch, then apply one incremental
        update, collecting all observed counts into a ResultHolder.

        `update_sql_file` is accepted for signature parity with sibling
        incremental test suites; this scenario does not use it.
        """
        seed_results = run_dbt(["seed", "--select", seed, "--full-refresh"])
        model_results = run_dbt(["run", "--select", incremental_model, "--full-refresh"])

        # The relation to diff is simply the incremental model itself.
        target_relation = incremental_model

        # Row count of the seed table, used to sanity-check the fixture data.
        rows = project.run_sql(
            f"select * from {project.test_schema}.{seed}", fetch="all"
        )

        # Propagate seed state to the incremental model via its unique key.
        inc_count = self.update_incremental_model(incremental_model=incremental_model)

        return ResultHolder(
            len(seed_results), len(model_results), len(rows), inc_count, target_relation
        )

    def check_scenario_correctness(self, expected_fields, test_case_fields, project):
        """Assert every observed field matches the scenario's expectations."""
        # 1. seed(s) built afresh
        assert test_case_fields.seed_count == expected_fields.seed_count
        # 2. model(s) built afresh
        assert test_case_fields.model_count == expected_fields.model_count
        # 3. seed has the intended row count
        assert test_case_fields.seed_rows == expected_fields.seed_rows
        # 4. incremental model(s) updated
        assert test_case_fields.inc_test_model_count == expected_fields.inc_test_model_count
        # 5. final model contents equal the expected seed (relation diff)
        check_relations_equal(
            project.adapter, [expected_fields.relation, test_case_fields.relation]
        )

    def test__cluster_by(self, project):
        """Seed should match the model after full refresh + one incremental run."""
        # Field order: seed_count, model_count, seed_rows, inc_test_model_count, relation
        expected = ResultHolder(1, 1, 3, 1, "expected_cluster_by")

        observed = self.get_test_fields(
            project,
            seed="expected_cluster_by",
            incremental_model="cluster_by",
            update_sql_file=None,
        )
        self.check_scenario_correctness(expected, observed, project)


class TestIncrementalClusterByOrderByClusterKeyDeleteInsertSnowflake(
    TestIncrementalClusterBySnowflake
):
    """Identical scenario, but with the new `order_by_cluster_key` config
    disabled for all models — results must still match the expected seed,
    since the option only affects the physical ordering at table creation."""

    @pytest.fixture(scope="class")
    def project_config_update(self):
        return {"models": {"+order_by_cluster_key": False}}