Skip to content

Commit

Permalink
python model beta version (#182)
Browse files Browse the repository at this point in the history
Supports table and incremental materializations.
Co-authored-by: Jeremy Cohen <jeremy@dbtlabs.com>
Co-authored-by: Drew Banin <drew@dbtlabs.com>
  • Loading branch information
ChenyuLInx authored Jul 28, 2022
1 parent 2ceab48 commit c0b9274
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 61 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## dbt-snowflake 1.3.0b1 (Release TBD)

### Features
- Support Python models by creating and executing a stored procedure containing Python; currently supported materializations are table and incremental. ([#182](https://github.com/dbt-labs/dbt-snowflake/pull/182))

### Under the Hood
- Move the overridden grants macros into an apply_grants.sql file in the Snowflake adapter ([#193](https://github.com/dbt-labs/dbt-snowflake/issues/193), [#192](https://github.com/dbt-labs/dbt-snowflake/pull/192))
- Support dbt Core incremental materialization refactor ([#195](https://github.com/dbt-labs/dbt-snowflake/issues/195), [#196](https://github.com/dbt-labs/dbt-snowflake/pull/196))
Expand Down
37 changes: 33 additions & 4 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import agate

from dbt.adapters.base.impl import AdapterConfig
from dbt.adapters.base.meta import available
from dbt.adapters.sql import SQLAdapter # type: ignore
from dbt.adapters.sql.impl import (
LIST_SCHEMAS_MACRO_NAME,
LIST_RELATIONS_MACRO_NAME,
)
from dbt.adapters.base.meta import available
from dbt.adapters.snowflake import SnowflakeConnectionManager
from dbt.adapters.snowflake import SnowflakeRelation
from dbt.adapters.snowflake import SnowflakeColumn
Expand Down Expand Up @@ -107,9 +107,7 @@ def get_columns_in_relation(self, relation):
else:
raise

def list_relations_without_caching(
self, schema_relation: SnowflakeRelation
) -> List[SnowflakeRelation]:
def list_relations_without_caching(self, schema_relation: SnowflakeRelation) -> List[SnowflakeRelation]: # type: ignore
kwargs = {"schema_relation": schema_relation}
try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
Expand Down Expand Up @@ -176,5 +174,36 @@ def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
    """Build a Snowflake DATEADD expression shifting ``add_to`` by ``number`` ``interval`` units."""
    return "DATEADD({}, {}, {})".format(interval, number, add_to)

@available.parse_none
def submit_python_job(self, parsed_model: dict, compiled_code: str):
    """Run a Python model on Snowflake.

    Wraps the compiled model code in a temporary Python stored procedure,
    CALLs it, then drops the procedure.

    :param parsed_model: parsed node dict; reads ``alias``, ``config``,
        and (optionally) ``schema`` / ``database``.
    :param compiled_code: compiled Python source for the model.
    :returns: adapter response from the CALL statement.
    """
    # parsed_model is a dict, so use .get(): getattr() on a dict always
    # returns the fallback and would silently ignore a model-level
    # schema/database override.
    schema = parsed_model.get("schema", self.config.credentials.schema)
    database = parsed_model.get("database", self.config.credentials.database)
    identifier = parsed_model["alias"]
    proc_name = f"{database}.{schema}.{identifier}__dbt_sp"
    packages = ["snowflake-snowpark-python"] + parsed_model["config"].get("packages", [])
    packages = "', '".join(packages)
    python_stored_procedure = f"""
CREATE OR REPLACE PROCEDURE {proc_name} ()
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8' -- TODO should this be configurable?
PACKAGES = ('{packages}')
HANDLER = 'main'
EXECUTE AS CALLER
AS
$$
{compiled_code}
$$;
"""
    self.execute(python_stored_procedure, auto_begin=False, fetch=False)
    response, _ = self.execute(f"CALL {proc_name}()", auto_begin=False, fetch=False)
    # The procedure was created with a zero-argument signature, so it must
    # be dropped with "()": "(string)" names a different overload, so the
    # DROP would never match and the procedure would be leaked.
    self.execute(
        f"drop procedure if exists {proc_name}()",
        auto_begin=False,
        fetch=False,
    )
    return response

def valid_incremental_strategies(self):
    """Incremental strategies supported by the Snowflake adapter."""
    supported = ["append", "merge", "delete+insert"]
    return supported
80 changes: 44 additions & 36 deletions dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
@@ -1,41 +1,49 @@
{% macro snowflake__create_table_as(temporary, relation, sql) -%}
{%- set transient = config.get('transient', default=true) -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}

{%- if cluster_by_keys is not none and cluster_by_keys is string -%}
{%- set cluster_by_keys = [cluster_by_keys] -%}
{%- endif -%}
{%- if cluster_by_keys is not none -%}
{%- set cluster_by_string = cluster_by_keys|join(", ")-%}
{% else %}
{%- set cluster_by_string = none -%}
{%- endif -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as
(
{%- if cluster_by_string is not none -%}
select * from(
{{ sql }}
) order by ({{ cluster_by_string }})
{%- else -%}
{{ sql }}
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
{% macro snowflake__create_table_as(temporary, relation, compiled_code, language='sql') -%}
{%- if language == 'sql' -%}
{%- set transient = config.get('transient', default=true) -%}
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}

{%- if cluster_by_keys is not none and cluster_by_keys is string -%}
{%- set cluster_by_keys = [cluster_by_keys] -%}
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{%- if cluster_by_keys is not none -%}
{%- set cluster_by_string = cluster_by_keys|join(", ")-%}
{% else %}
{%- set cluster_by_string = none -%}
{%- endif -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create or replace {% if temporary -%}
temporary
{%- elif transient -%}
transient
{%- endif %} table {{ relation }} {% if copy_grants and not temporary -%} copy grants {%- endif %} as
(
{%- if cluster_by_string is not none -%}
select * from(
{{ compiled_code }}
) order by ({{ cluster_by_string }})
{%- else -%}
{{ compiled_code }}
{%- endif %}
);
{% if cluster_by_string is not none and not temporary -%}
alter table {{relation}} cluster by ({{cluster_by_string}});
{%- endif -%}
{% if enable_automatic_clustering and cluster_by_string is not none and not temporary -%}
alter table {{relation}} resume recluster;
{%- endif -%}

{%- elif language == 'python' -%}
{{ py_write_table(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }}
{%- else -%}
{% do exceptions.raise_compiler_error("snowflake__create_table_as macro didn't get supported language, it got %s" % language) %}
{%- endif -%}

{% endmacro %}

{% macro get_column_comment_sql(column_name, column_dict) -%}
Expand Down
29 changes: 18 additions & 11 deletions dbt/include/snowflake/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

{% set original_query_tag = set_query_tag() %}

{#-- Set vars --#}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set language = model['language'] -%}
{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}
Expand All @@ -16,19 +17,27 @@
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif existing_relation.is_view %}
{#-- Can't overwrite a view with a table - we must drop --#}
{{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}

{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{% elif full_refresh_mode %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{%- call statement('create_tmp_relation', language=language) -%}
{{ create_table_as(True, tmp_relation, compiled_code, language) }}
{%- endcall -%}

{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
Expand All @@ -43,14 +52,12 @@
{% set incremental_predicates = config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': tmp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

{%- call statement('main') -%}
{{ strategy_sql_macro_func(strategy_arg_dict) }}
{%- endcall -%}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks) }}

{% set target_relation = target_relation.incorporate(type='table') %}
Expand Down
37 changes: 34 additions & 3 deletions dbt/include/snowflake/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{% set original_query_tag = set_query_tag() %}

{%- set identifier = model['alias'] -%}
{%- set language = model['language'] -%}

{% set grant_config = config.get('grants') %}

Expand All @@ -20,9 +21,8 @@
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

--build model
{% call statement('main') -%}
{{ create_table_as(false, target_relation, sql) }}
{% call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall %}

{{ run_hooks(post_hooks) }}
Expand All @@ -37,3 +37,34 @@
{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}

{% macro py_write_table(compiled_code, target_relation, temporary=False) %}
{#- Render the Python script body for a python-model table/incremental build:
    the compiled model code followed by a materialize() helper and the main()
    entry point used as the stored procedure HANDLER. Jinja comments here are
    not rendered into the generated script. -#}
{{ compiled_code }}
def materialize(session, df, target_relation):
    # we have to make sure pandas is imported
    import pandas
    if isinstance(df, pandas.core.frame.DataFrame):
        # session.write_pandas does not have overwrite function
        df = session.createDataFrame(df)
    df.write.mode("overwrite").save_as_table("{{ target_relation }}", create_temp_table={{temporary}})

def main(session):
    dbt = dbtObj(session.table)
    df = model(dbt, session)
    materialize(session, df, dbt.this)
    return "OK"
{% endmacro %}

{%macro py_script_comment()%}
{#- Reference-only block: renders as Python comments explaining how to run
    the generated script manually in Snowsight or a local Snowpark notebook.
    It has no runtime effect on the model. -#}
# To run this in snowsight, you need to select entry point to be main
# And you may have to modify the return type to text to get the result back
# def main(session):
#     dbt = dbtObj(session.table)
#     df = model(dbt, session)
#     return df.collect()

# to run this in local notebook, you need to create a session following examples https://github.com/Snowflake-Labs/sfguide-getting-started-snowpark-python
# then you can do the following to run model
# dbt = dbtObj(session.table)
# df = model(dbt, session)
{%endmacro%}
1 change: 0 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# install latest changes in dbt-core
# TODO: how to automate switching from develop to version branches?

git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-core&subdirectory=core
git+https://github.com/dbt-labs/dbt-core.git#egg=dbt-tests-adapter&subdirectory=tests/adapter

Expand Down
1 change: 1 addition & 0 deletions tests/functional/adapter/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from tests.functional.adapter.expected_stats import snowflake_stats



class TestSimpleMaterializationsSnowflake(BaseSimpleMaterializations):
    """Run dbt's standard simple-materialization test suite against Snowflake."""
    pass

Expand Down
43 changes: 43 additions & 0 deletions tests/functional/adapter/test_python_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest
from dbt.tests.util import run_dbt, write_file
from dbt.tests.adapter.python_model.test_python_model import BasePythonModelTests, BasePythonIncrementalTests

class TestPythonModelSnowflake(BasePythonModelTests):
    """Run dbt's adapter-standard Python model tests against Snowflake."""
    pass

class TestIncrementalSnowflake(BasePythonIncrementalTests):
    """Run dbt's adapter-standard incremental Python model tests against Snowflake."""
    pass

models__simple_python_model = """
import pandas
def model(dbt, session):
dbt.config(
materialized='table',
)
data = [[1,2]] * 10
return pandas.DataFrame(data, columns=['test', 'test2'])
"""
models__simple_python_model_v2 = """
import pandas
def model(dbt, session):
dbt.config(
materialized='table',
)
data = [[1,2]] * 10
return pandas.DataFrame(data, columns=['test1', 'test3'])
"""



class TestChangingSchemaSnowflake:
    """Verify a python model can be re-run after its output columns change
    between invocations (table materialization must replace cleanly)."""

    @pytest.fixture(scope="class")
    def models(self):
        return {
            "simple_python_model.py": models__simple_python_model
        }

    def test_changing_schema(self, project):
        # First run creates the table with columns test/test2. The original
        # test asserted nothing; check that exactly one model ran successfully.
        results = run_dbt(["run"])
        assert len(results) == 1
        # Swap in the v2 model (different column names) and re-run.
        write_file(
            models__simple_python_model_v2,
            project.project_root + '/models',
            "simple_python_model.py",
        )
        results = run_dbt(["run"])
        assert len(results) == 1
12 changes: 6 additions & 6 deletions tests/integration/defer_state_test/test_defer_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def run_and_defer(self):

# with state it should work though
results = self.run_dbt(['run', '-m', 'view_model', '--state', 'state', '--defer', '--target', 'otherschema'])
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

with open('target/manifest.json') as fp:
data = json.load(fp)
Expand Down Expand Up @@ -122,8 +122,8 @@ def run_defer_iff_not_exists(self):
assert len(results) == 2

# because the seed now exists in our schema, we shouldn't defer it
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

def run_defer_deleted_upstream(self):
results = self.run_dbt(['seed'])
Expand All @@ -144,8 +144,8 @@ def run_defer_deleted_upstream(self):

# despite deferral, test should use models just created in our schema
results = self.run_dbt(['test', '--state', 'state', '--defer'])
assert self.other_schema not in results[0].node.compiled_sql
assert self.unique_schema() in results[0].node.compiled_sql
assert self.other_schema not in results[0].node.compiled_code
assert self.unique_schema() in results[0].node.compiled_code

@use_profile('snowflake')
def test_snowflake_state_changetarget(self):
Expand Down

0 comments on commit c0b9274

Please sign in to comment.