Skip to content

Commit

Permalink
Add support for a materialization return value
Browse files Browse the repository at this point in the history
Materializations now return a list of relations they added
 - those get added to the cache
Updated existing materializations
Added backwards compatibility support + a deprecation warning
  • Loading branch information
Jacob Beck committed Sep 20, 2019
1 parent 566b6b4 commit 1946b2f
Show file tree
Hide file tree
Showing 19 changed files with 112 additions and 28 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

### Breaking changes
- Cache management changes:
- Materializations that perform creates via direct "create X" statements must call `adapter.cache_added`
- `create_table_as`/`create_view_as` already do this
- Materialization macros should now return a dictionary {"relations": [...]}, with the list containing all relations that have been added, in order to add them to the cache. The default behavior is to still add the materialization's model to the cache.
- Materializations that perform drops via direct "drop" statements must call `adapter.cache_dropped`
- `adapter.drop_relation` already does this
- Materializations that perform renames via direct "alter table" statements must call `adapter.cache_renamed`
Expand Down
6 changes: 1 addition & 5 deletions core/dbt/context/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,6 @@ def _return(value):
raise dbt.exceptions.MacroReturn(value)


def get_this_relation(db_wrapper, config, model):
return db_wrapper.Relation.create_from_node(config, model)


def get_pytz_module_context():
context_exports = pytz.__all__

Expand Down Expand Up @@ -472,7 +468,7 @@ def generate_model(model, config, manifest, source_config, provider):
source_config, provider)
# operations (hooks) don't get a 'this'
if model.resource_type != NodeType.Operation:
this = get_this_relation(context['adapter'], config, model)
this = context['adapter'].Relation.create_from(config, model)
context['this'] = this
# overwrite schema/database if we have them, and hooks + sql
# the hooks should come in as dicts, at least for the `run_hooks` macro
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/context/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ def __call__(self, *args):
else:
dbt.exceptions.ref_invalid_args(self.model, args)

return self.Relation.create_from_node(self.config, self.model)
return self.Relation.create_from(self.config, self.model)


class SourceResolver(dbt.context.common.BaseResolver):
def __call__(self, source_name, table_name):
# When you call source(), this is what happens at parse time
self.model.sources.append([source_name, table_name])
return self.Relation.create_from_node(self.config, self.model)
return self.Relation.create_from(self.config, self.model)


class Provider:
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/context/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def create_relation(self, target_model, name):
if get_materialization(target_model) == 'ephemeral':
return self.create_ephemeral_relation(target_model, name)
else:
return self.Relation.create_from_node(self.config, target_model)
return self.Relation.create_from(self.config, target_model)


class RefResolver(BaseRefResolver):
Expand Down
14 changes: 14 additions & 0 deletions core/dbt/deprecations.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ class GenerateSchemaNameSingleArgDeprecated(DBTDeprecation):
''' # noqa


class MaterializationReturnDeprecation(DBTDeprecation):
_name = 'materialization-return'

_description = '''
The materialization ("{materialization}") did not explicitly return a list
of relations to add to the cache. By default the target relation will be
added, but this behavior will be removed in a future version of dbt.
For more information, see:
--- TODO: docs link here ---
'''.lstrip()


_adapter_renamed_description = """\
The adapter function `adapter.{old_name}` is deprecated and will be removed in
a future release of dbt. Please use `adapter.{new_name}` instead.
Expand Down Expand Up @@ -99,6 +112,7 @@ def warn(name, *args, **kwargs):
deprecations_list: List[DBTDeprecation] = [
DBTRepositoriesDeprecation(),
GenerateSchemaNameSingleArgDeprecated(),
MaterializationReturnDeprecation(),
]

deprecations: Dict[str, DBTDeprecation] = {
Expand Down
4 changes: 0 additions & 4 deletions core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@
{% endmacro %}

{% macro create_table_as(temporary, relation, sql) -%}
{%- if not temporary -%}
{%- do adapter.cache_added(relation.incorporate(dbt_created=True)) -%}
{%- endif -%}
{{ adapter_macro('create_table_as', temporary, relation, sql) }}
{%- endmacro %}

Expand All @@ -80,7 +77,6 @@
{% endmacro %}

{% macro create_view_as(relation, sql) -%}
{%- do adapter.cache_added(relation.incorporate(dbt_created=True)) -%}
{{ adapter_macro('create_view_as', relation, sql) }}
{%- endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,6 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@

{% macro create_csv_table(model, agate_table) -%}
{%- do adapter.cache_added(api.Relation.create(
database=model.database,
schema=model.schema,
identifier=model.alias,
dbt_created=True)) -%}
{{ adapter_macro('create_csv_table', model, agate_table) }}
{%- endmacro %}

Expand Down Expand Up @@ -136,4 +131,8 @@
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% set target_relation = this.incorporate(type='table') %}
{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,6 @@
{% do post_snapshot(staging_table) %}
{% endif %}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@
{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@
-- No transactions on BigQuery
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endif %}

{{ return({'relations': [target_relation]}) }}

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}
70 changes: 65 additions & 5 deletions core/dbt/node_runners.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import threading
import time
import traceback
from typing import List, Dict, Any

from dbt import deprecations
from dbt.adapters.base import BaseRelation
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.exceptions import (
NotImplementedException, CompilationException, RuntimeException,
Expand All @@ -16,10 +23,6 @@
import dbt.ui.printer
import dbt.flags

import threading
import time
import traceback


INTERNAL_ERROR_STRING = """This is an error in dbt. Please try again. If \
the error persists, open an issue at https://github.com/fishtown-analytics/dbt
Expand Down Expand Up @@ -307,6 +310,41 @@ def compile(self, manifest):
return compile_node(self.adapter, self.config, self.node, manifest, {})


# make sure that we got an ok result back from a materialization
def _validate_materialization_relations_dict(
inp: Dict[Any, Any], model
) -> List[BaseRelation]:
try:
relations_value = inp['relations']
except KeyError:
msg = (
'Invalid return value from materialization, "relations" '
'not found, got keys: {}'.format(list(inp))
)
raise CompilationException(msg, node=model) from None

if not isinstance(relations_value, list):
msg = (
'Invalid return value from materialization, "relations" '
'not a list, got: {}'.format(relations_value)
)
raise CompilationException(msg, node=model) from None

relations: List[BaseRelation] = []
for relation in relations_value:
if not isinstance(relation, BaseRelation):
msg = (
'Invalid return value from materialization, '
'"relations" contains non-Relation: {}'
.format(relation)
)
raise CompilationException(msg, node=model)

assert isinstance(relation, BaseRelation)
relations.append(relation)
return relations


class ModelRunner(CompileRunner):
def get_node_representation(self):
if self.config.credentials.database == self.node.database:
Expand Down Expand Up @@ -343,6 +381,25 @@ def _build_run_model_result(self, model, context):
result = context['load_result']('main')
return RunModelResult(model, status=result.status)

def _materialization_relations(
self, result: Any, model
) -> List[BaseRelation]:
if isinstance(result, str):
deprecations.warn('materialization-return',
materialization=model.get_materialization())
return [
self.adapter.Relation.create_from(self.config, model)
]

if isinstance(result, dict):
return _validate_materialization_relations_dict(result, model)

msg = (
'Invalid return value from materialization, expected a dict '
'with key "relations", got: {}'.format(str(result))
)
raise CompilationException(msg, node=model)

def execute(self, model, manifest):
context = dbt.context.runtime.generate(
model, self.config, manifest)
Expand All @@ -354,7 +411,10 @@ def execute(self, model, manifest):
if materialization_macro is None:
missing_materialization(model, self.adapter.type())

materialization_macro.generator(context)()
result = materialization_macro.generator(context)()

for relation in self._materialization_relations(result, model):
self.adapter.cache_added(relation.incorporate(dbt_created=True))

return self._build_run_model_result(model, context)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@


{% materialization view, adapter='bigquery' -%}
{{ create_or_replace_view(run_outside_transaction_hooks=False) }}
{{ return(create_or_replace_view(run_outside_transaction_hooks=False)) }}
{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
Expand All @@ -29,4 +29,7 @@
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{% materialization view, adapter='snowflake' -%}
{{ create_or_replace_view() }}
{{ return(create_or_replace_view()) }}
{%- endmaterialization %}

0 comments on commit 1946b2f

Please sign in to comment.