Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2 Materialization flag introduced #844

Open
wants to merge 4 commits into
base: 1.10.latest
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## dbt-databricks 1.10.0 (TBD)

### Features

- Introduced use_materialization_v2 flag for gating materialization revamps. ([844](https://github.com/databricks/dbt-databricks/pull/844))

## dbt-databricks 1.9.0 (TBD)

### Features
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version: str = "1.9.0b1"
version: str = "1.10.0a1"
11 changes: 10 additions & 1 deletion dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@
),
) # type: ignore[typeddict-item]

# Behavior flag (default off) gating the revamped "v2" materializations,
# which separate the create step from the insert step.
USE_MATERIALIZATION_V2 = BehaviorFlag(
    name="use_materialization_v2",
    default=False,
    description=(
        "Use revamped materializations based on separating create and insert."
        " This allows more performant column comments, as well as new column features."
    ),
)  # type: ignore[typeddict-item]


@dataclass
class DatabricksConfig(AdapterConfig):
Expand Down Expand Up @@ -204,7 +213,7 @@ def __init__(self, config: Any, mp_context: SpawnContext) -> None:

@property
def _behavior_flags(self) -> list[BehaviorFlag]:
    """Behavior flags registered by the Databricks adapter."""
    flags: list[BehaviorFlag] = [
        USE_INFO_SCHEMA_FOR_COLUMNS,
        USE_USER_FOLDER_FOR_PYTHON,
        USE_MATERIALIZATION_V2,
    ]
    return flags

@available.parse(lambda *a, **k: 0)
def update_tblproperties_for_iceberg(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{% macro execute_multiple_statements() %}
{#-- NOTE(review): the body is the literal text "code" — looks like a placeholder
    for future statement-splitting logic rather than a working macro. Confirm
    before relying on it. --#}
code
{% endmacro %}
9 changes: 9 additions & 0 deletions dbt/include/databricks/macros/materializations/hooks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% macro run_pre_hooks() %}
{#-- Run the model's pre-hooks: outside-transaction hooks first, then inside-transaction hooks. --#}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{% endmacro %}

{% macro run_post_hooks() %}
{#-- Run the model's post-hooks in the reverse nesting order: inside-transaction hooks first, then outside-transaction hooks. --#}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,12 @@
{% macro databricks__create_csv_table(model, agate_table) %}
{#-- Databricks override of create_csv_table: delegates to create_or_replace_csv_table. --#}
{{ return(create_or_replace_csv_table(model, agate_table)) }}
{% endmacro %}

{% macro log_seed_operation(agate_table, full_refresh_mode, create_table_sql, sql) %}
{#-- Record the seed's 'main' result without executing anything: reports CREATE on
    full refresh, INSERT otherwise, with the CSV row count, and echoes the
    generated SQL via noop_statement for logging. --#}
{% set code = 'CREATE' if full_refresh_mode else 'INSERT' %}
{% set rows_affected = (agate_table.rows | length) %}

{% call noop_statement('main', code ~ ' ' ~ rows_affected, code, rows_affected) %}
{{ get_csv_sql(create_table_sql, sql) }};
{% endcall %}
{% endmacro %}
59 changes: 54 additions & 5 deletions dbt/include/databricks/macros/materializations/seeds/seeds.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,58 @@
{% materialization seed, adapter='databricks' %}
{#-- Seed materialization: dispatches to the v2 flow when the
    use_materialization_v2 behavior flag is on, otherwise to the legacy v1 flow.
    target_relation is resolved here so a single return works for both paths. --#}
{% set target_relation = this.incorporate(type='table') %}

{% if adapter.behavior.use_materialization_v2 %}
{{ create_seed_v2(target_relation) }}
{% else %}
{{ create_seed_v1(target_relation) }}
{% endif %}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}

{% macro create_seed_v2(target_relation) %}
{#-- V2 seed flow: load the CSV into an in-memory agate table, create or reset
    the target table (erroring on view / materialized view / streaming table),
    insert the rows, log the operation, and apply grants. Hooks run via the
    shared run_pre_hooks / run_post_hooks helpers. --#}
{%- set identifier = model['alias'] -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier, needs_information=True) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and (old_relation.is_view or old_relation.is_materialized_view)) -%}
{%- set exists_as_streaming_table = (old_relation is not none and old_relation.is_streaming_table) -%}

{%- set grant_config = config.get('grants') -%}
{%- set agate_table = load_agate_table() -%}
-- grab the current table's grants config for comparison later on

{%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}

{{ run_pre_hooks() }}

-- build model
{% set create_table_sql = "" %}
{% if exists_as_view %}
{{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view or a materialized view".format(old_relation)) }}
{% elif exists_as_streaming_table %}
{{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a streaming table".format(old_relation)) }}
{% elif exists_as_table %}
{% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %}
{% else %}
{% set create_table_sql = create_csv_table(model, agate_table) %}
{% endif %}

{% set sql = load_csv_rows(model, agate_table) %}

{{ log_seed_operation(agate_table, full_refresh_mode, create_table_sql, sql) }}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
-- No need to persist docs, already handled in seed create

{{ run_post_hooks() }}

{% endmacro %}

{% macro create_seed_v1(target_relation) %}
{%- set identifier = model['alias'] -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

Expand Down Expand Up @@ -40,8 +93,6 @@
{{ get_csv_sql(create_table_sql, sql) }};
{% endcall %}

{% set target_relation = this.incorporate(type='table') %}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's more shared code that I can pull out, but I want to leave the existing materialization largely intact, since the whole reason to use the flag is to lower risk of significant changes. Also, eventually V1 will be deleted. I pulled out target_relation so that I could do the return statement at the top level.


{% set should_revoke = should_revoke(old_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
-- No need to persist docs, already handled in seed create
Expand All @@ -57,6 +108,4 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
{% endmacro %}
81 changes: 81 additions & 0 deletions docs/seed_flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
---
title: Seed Flow
---

# V1 Seed Flow

```mermaid
flowchart LR
AGATE[Create in memory table from CSV]
STORE[Stores result of loading table]
PRE["Run prehooks (inside_transaction=False)"]
PRE2["Run prehooks (inside_transaction=True)"]
RAISE[Raise compiler error]
COR[create or replace table...]
CREATE[create table...]
DROP[Drop existing table]
INSERT[chunked inserts to table]
GRANTS[Apply grants]
INDEX["Create indexes (What?!)"]
POST["Run posthooks (inside_transaction=True)"]
POST2["Run posthooks (inside_transaction=False)"]
COMMIT["Commit transaction (What?!)"]
D1{Existing?}
D2{Existing type?}
D3{Delta?}
AGATE-->STORE
STORE-->PRE
PRE-->PRE2-->D1
D1--yes-->D2
D1--"no"-->CREATE
D2--"non-table"-->RAISE
D2--table-->D3
D3--yes-->COR
COR-->INSERT
D3--"no"-->DROP
DROP-->CREATE
CREATE-->INSERT
INSERT-->GRANTS
GRANTS-->INDEX
INDEX-->POST
POST-->COMMIT
COMMIT-->POST2
```

# V2 Seed Flow

Other than some cleanup, V2 mostly just removes calls that are not supported on Databricks — the index-creation and transaction-commit steps present in the V1 flow.

```mermaid
flowchart LR
AGATE[Create in memory table from CSV]
STORE[Stores result of loading table]
PRE["Run prehooks (inside_transaction=False)"]
PRE2["Run prehooks (inside_transaction=True)"]
RAISE[Raise compiler error]
COR[create or replace table...]
CREATE[create table...]
DROP[Drop existing table]
INSERT[chunked inserts to table]
GRANTS[Apply grants]
POST["Run posthooks (inside_transaction=True)"]
POST2["Run posthooks (inside_transaction=False)"]
D1{Existing?}
D2{Existing type?}
D3{Delta?}
AGATE-->STORE
STORE-->PRE
PRE-->PRE2-->D1
D1--yes-->D2
D1--"no"-->CREATE
D2--"non-table"-->RAISE
D2--table-->D3
D3--yes-->COR
COR-->INSERT
D3--"no"-->DROP
DROP-->CREATE
CREATE-->INSERT
INSERT-->GRANTS
GRANTS-->POST
POST-->POST2
```
7 changes: 7 additions & 0 deletions tests/functional/adapter/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import pytest


class MaterializationV2Mixin:
    """Test mixin that turns on the ``use_materialization_v2`` behavior flag.

    Include it in a functional test class to re-run that class's scenarios
    with the revamped (v2) materializations enabled.
    """

    @pytest.fixture(scope="class")
    def project_config_update(self):
        # Flip the flag on for every test in the including class.
        return {"flags": {"use_materialization_v2": True}}
8 changes: 8 additions & 0 deletions tests/functional/adapter/persist_docs/test_persist_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.tests import util
from dbt.tests.adapter.persist_docs import fixtures
from tests.functional.adapter.fixtures import MaterializationV2Mixin
from tests.functional.adapter.persist_docs import fixtures as override_fixtures


Expand Down Expand Up @@ -290,3 +291,10 @@ def test_reseeding_with_persist_docs(self, table_relation, adapter):
assert table_comment.startswith("A seed description")
assert columns[0].comment.startswith("An id column")
assert columns[1].comment.startswith("A name column")


@pytest.mark.external
# Skipping UC Cluster to ensure these tests don't fail due to overlapping resources
@pytest.mark.skip_profile("databricks_uc_cluster")
class TestPersistDocsWithSeedsV2(TestPersistDocsWithSeeds, MaterializationV2Mixin):
    """Re-run the persist-docs seed tests with ``use_materialization_v2`` on."""

    # NOTE(review): the mixin comes after the base class in the MRO, so if
    # TestPersistDocsWithSeeds defines its own project_config_update fixture
    # the base's version would win — confirm the mixin's fixture takes effect.
    pass
59 changes: 55 additions & 4 deletions tests/functional/adapter/simple_seed/test_seeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dbt.tests.adapter.simple_seed.test_seed import BaseSimpleSeedWithBOM
from dbt.tests.adapter.simple_seed.test_seed import BaseTestEmptySeed
from dbt.tests.adapter.simple_seed.test_seed import SeedTestBase
from tests.functional.adapter.fixtures import MaterializationV2Mixin
from tests.functional.adapter.simple_seed import fixtures


Expand All @@ -35,18 +36,40 @@ def test_simple_seed(self, project):
)


class TestBasicSeedTestsV2(DatabricksSetup, SeedTestBase, MaterializationV2Mixin):
    """Basic seed tests re-run with the v2 materialization flag enabled."""

    pass


class TestDatabricksSeedWithUniqueDelimiter(DatabricksSetup, BaseSeedWithUniqueDelimiter):
    pass


class TestDatabricksSeedWithUniqueDelimiterV2(
    DatabricksSetup, BaseSeedWithUniqueDelimiter, MaterializationV2Mixin
):
    """Unique-delimiter seed tests re-run with the v2 flag enabled."""

    pass


class TestDatabricksSeedWithWrongDelimiter(DatabricksSetup, BaseSeedWithWrongDelimiter):
    pass


class TestDatabricksSeedWithWrongDelimiterV2(
    DatabricksSetup, BaseSeedWithWrongDelimiter, MaterializationV2Mixin
):
    """Wrong-delimiter seed tests re-run with the v2 flag enabled."""

    pass


class TestSeedConfigFullRefreshOff(DatabricksSetup, BaseSeedConfigFullRefreshOff):
    pass


class TestSeedConfigFullRefreshOffV2(
    DatabricksSetup, BaseSeedConfigFullRefreshOff, MaterializationV2Mixin
):
    """full_refresh=off seed tests re-run with the v2 flag enabled."""

    pass


class TestSeedCustomSchema(DatabricksSetup, BaseSeedCustomSchema):
@pytest.fixture(scope="class", autouse=True)
def setUp(self, project):
Expand All @@ -57,22 +80,44 @@ def setUp(self, project):
project.run_sql(f"drop schema if exists {project.test_schema}_custom_schema cascade")


class TestSeedCustomSchemaV2(TestSeedCustomSchema, MaterializationV2Mixin):
    """Custom-schema seed tests re-run with the v2 flag enabled."""

    pass


class TestDatabricksSeedWithEmptyDelimiter(DatabricksSetup, BaseSeedWithEmptyDelimiter):
    pass


class TestDatabricksSeedWithEmptyDelimiterV2(
    DatabricksSetup, BaseSeedWithEmptyDelimiter, MaterializationV2Mixin
):
    """Empty-delimiter seed tests re-run with the v2 flag enabled."""

    pass


class TestDatabricksEmptySeed(BaseTestEmptySeed):
    pass


class TestDatabricksEmptySeedV2(BaseTestEmptySeed, MaterializationV2Mixin):
    """Empty-seed tests re-run with the v2 flag enabled."""

    pass


class TestSimpleSeedEnabledViaConfig(BaseSimpleSeedEnabledViaConfig):
    pass


class TestSimpleSeedEnabledViaConfigV2(BaseSimpleSeedEnabledViaConfig, MaterializationV2Mixin):
    """Config-enabled seed tests re-run with the v2 flag enabled."""

    pass


class TestSeedParsing(DatabricksSetup, BaseSeedParsing):
    pass


class TestSeedParsingV2(DatabricksSetup, BaseSeedParsing, MaterializationV2Mixin):
    """Seed-parsing tests re-run with the v2 flag enabled."""

    pass


class TestSimpleSeedWithBOM(BaseSimpleSeedWithBOM):
@pytest.fixture(scope="class", autouse=True)
def setUp(self, project):
Expand All @@ -87,18 +132,24 @@ def setUp(self, project):
)


class TestSimpleSeedWithBOMV2(TestSimpleSeedWithBOM, MaterializationV2Mixin):
    """BOM-handling seed tests re-run with the v2 flag enabled."""

    pass


class TestSeedSpecificFormats(DatabricksSetup, BaseSeedSpecificFormats):
    """Seed tests covering specific CSV formats: a large seed and a unicode seed."""

    @pytest.fixture(scope="class")
    def seeds(self):
        # Build the 20k-row seed in memory rather than writing a temp file
        # on disk (the previous approach needed test_data_dir plus cleanup).
        big_seed = "seed_id\n" + "\n".join(str(i) for i in range(1, 20001))

        yield {
            "big_seed.csv": big_seed,
            "seed_unicode.csv": seeds.seed__unicode_csv,
        }

    def test_simple_seed(self, project):
        results = util.run_dbt(["seed"])
        # One result per seed file defined in the fixture above.
        assert len(results) == 2


class TestSeedSpecificFormatsV2(TestSeedSpecificFormats, MaterializationV2Mixin):
    """Specific-format seed tests re-run with the v2 flag enabled."""

    pass
Loading