Skip to content

Commit

Permalink
Remove injected_sql. Store non-ephemeral injected_sql in compiled_sql
Browse files Browse the repository at this point in the history
  • Loading branch information
gshank committed Oct 15, 2020
1 parent 7203825 commit 8be9825
Show file tree
Hide file tree
Showing 24 changed files with 216 additions and 169 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Added dbt_invocation_id for each BigQuery job to enable performance analysis ([#2808](https://github.com/fishtown-analytics/dbt/issues/2808), [#2809](https://github.com/fishtown-analytics/dbt/pull/2809))
- Save cli and rpc arguments in run_results.json ([#2510](https://github.com/fishtown-analytics/dbt/issues/2510), [#2813](https://github.com/fishtown-analytics/dbt/pull/2813))
- Added support for BigQuery connections using refresh tokens ([#2344](https://github.com/fishtown-analytics/dbt/issues/2344), [#2805](https://github.com/fishtown-analytics/dbt/pull/2805))
- Remove injected_sql from manifest nodes ([#2762](https://github.com/fishtown-analytics/dbt/issues/2762), [#2834](https://github.com/fishtown-analytics/dbt/pull/2834))

### Under the hood
- Added strategy-specific validation to improve the relevancy of compilation errors for the `timestamp` and `check` snapshot strategies. (([#2787](https://github.com/fishtown-analytics/dbt/issues/2787), [#2791](https://github.com/fishtown-analytics/dbt/pull/2791))
Expand Down
190 changes: 96 additions & 94 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,15 @@ def initialize(self):
make_directory(self.config.target_path)
make_directory(self.config.modules_path)

# creates a ModelContext which is converted to
# a dict for jinja rendering of SQL
def _create_node_context(
self,
node: NonSourceCompiledNode,
manifest: Manifest,
extra_context: Dict[str, Any],
) -> Dict[str, Any]:

context = generate_runtime_model(
node, self.config, manifest
)
Expand All @@ -169,36 +172,6 @@ def add_ephemeral_prefix(self, name: str):
relation_cls = adapter.Relation
return relation_cls.add_ephemeral_prefix(name)

def _get_compiled_model(
self,
manifest: Manifest,
cte_id: str,
extra_context: Dict[str, Any],
) -> NonSourceCompiledNode:

if cte_id not in manifest.nodes:
raise InternalException(
f'During compilation, found a cte reference that could not be '
f'resolved: {cte_id}'
)
cte_model = manifest.nodes[cte_id]
if getattr(cte_model, 'compiled', False):
assert isinstance(cte_model, tuple(COMPILED_TYPES.values()))
return cast(NonSourceCompiledNode, cte_model)
elif cte_model.is_ephemeral_model:
# this must be some kind of parsed node that we can compile.
# we know it's not a parsed source definition
assert isinstance(cte_model, tuple(COMPILED_TYPES))
# update the node so
node = self.compile_node(cte_model, manifest, extra_context)
manifest.sync_update_node(node)
return node
else:
raise InternalException(
f'During compilation, found an uncompiled cte that '
f'was not an ephemeral model: {cte_id}'
)

def _inject_ctes_into_sql(self, sql: str, ctes: List[InjectedCTE]) -> str:
"""
`ctes` is a list of InjectedCTEs like:
Expand Down Expand Up @@ -260,73 +233,107 @@ def _inject_ctes_into_sql(self, sql: str, ctes: List[InjectedCTE]) -> str:

return str(parsed)

def _model_prepend_ctes(
self,
model: NonSourceCompiledNode,
prepended_ctes: List[InjectedCTE]
) -> NonSourceCompiledNode:
if model.compiled_sql is None:
raise RuntimeException(
'Cannot prepend ctes to an unparsed node', model
)
injected_sql = self._inject_ctes_into_sql(
model.compiled_sql,
prepended_ctes,
)

model.extra_ctes_injected = True
model.extra_ctes = prepended_ctes
model.injected_sql = injected_sql
model.validate(model.to_dict())
return model

def _get_dbt_test_name(self) -> str:
return 'dbt__CTE__INTERNAL_test'

# This method is called by the 'compile_node' method. Starting
# from the node that it is passed in, it will recursively call
# itself using the 'extra_ctes'. The 'ephemeral' models do
# not produce SQL that is executed directly, instead they
# are rolled up into the models that refer to them by
# inserting CTEs into the SQL.
def _recursively_prepend_ctes(
self,
model: NonSourceCompiledNode,
manifest: Manifest,
extra_context: Dict[str, Any],
extra_context: Optional[Dict[str, Any]],
) -> Tuple[NonSourceCompiledNode, List[InjectedCTE]]:

if model.compiled_sql is None:
raise RuntimeException(
'Cannot inject ctes into an unparsed node', model
)
if model.extra_ctes_injected:
return (model, model.extra_ctes)

if flags.STRICT_MODE:
if not isinstance(model, tuple(COMPILED_TYPES.values())):
raise InternalException(
f'Bad model type: {type(model)}'
)
# Just to make it plain that nothing is actually injected for this case
if not model.extra_ctes:
model.extra_ctes_injected = True
manifest.update_node(model)
return (model, model.extra_ctes)

# This stores the ctes which will all be recursively
# gathered and then "injected" into the model.
prepended_ctes: List[InjectedCTE] = []

dbt_test_name = self._get_dbt_test_name()

# extra_ctes are added to the model by
# RuntimeRefResolver.create_relation, which adds an
# extra_cte for every model relation which is an
# ephemeral model.
for cte in model.extra_ctes:
if cte.id == dbt_test_name:
sql = cte.sql
else:
cte_model = self._get_compiled_model(
manifest,
cte.id,
extra_context,
)
cte_model, new_prepended_ctes = self._recursively_prepend_ctes(
cte_model, manifest, extra_context
)
if cte.id not in manifest.nodes:
raise InternalException(
f'During compilation, found a cte reference that '
f'could not be resolved: {cte.id}'
)
cte_model = manifest.nodes[cte.id]

if not cte_model.is_ephemeral_model:
raise InternalException(f'{cte.id} is not ephemeral')

# This model has already been compiled, so it's been
# through here before
if getattr(cte_model, 'compiled', False):
assert isinstance(cte_model,
tuple(COMPILED_TYPES.values()))
cte_model = cast(NonSourceCompiledNode, cte_model)
new_prepended_ctes = cte_model.extra_ctes

# if the cte_model isn't compiled, i.e. first time here
else:
# This is an ephemeral parsed model that we can compile.
# Compile and update the node
cte_model = self._compile_node(
cte_model, manifest, extra_context)
# recursively call this method
cte_model, new_prepended_ctes = \
self._recursively_prepend_ctes(
cte_model, manifest, extra_context
)
# Save compiled SQL file and sync manifest
self._write_node(cte_model)
manifest.sync_update_node(cte_model)

_extend_prepended_ctes(prepended_ctes, new_prepended_ctes)

new_cte_name = self.add_ephemeral_prefix(cte_model.name)
sql = f' {new_cte_name} as (\n{cte_model.compiled_sql}\n)'

_add_prepended_cte(prepended_ctes, InjectedCTE(id=cte.id, sql=sql))

model = self._model_prepend_ctes(model, prepended_ctes)
# We don't save injected_sql into compiled sql for ephemeral models
# because it will cause problems with processing of subsequent models.
# Ephemeral models do not produce executable SQL of their own.
if not model.is_ephemeral_model:
injected_sql = self._inject_ctes_into_sql(
model.compiled_sql,
prepended_ctes,
)
model.compiled_sql = injected_sql
model.extra_ctes_injected = True
model.extra_ctes = prepended_ctes
model.validate(model.to_dict())

manifest.update_node(model)

return model, prepended_ctes

def _insert_ctes(
def _add_ctes(
self,
compiled_node: NonSourceCompiledNode,
manifest: Manifest,
Expand All @@ -352,11 +359,12 @@ def _insert_ctes(
compiled_node.extra_ctes.append(cte)
compiled_node.compiled_sql = f'\nselect count(*) from {name}'

injected_node, _ = self._recursively_prepend_ctes(
compiled_node, manifest, extra_context
)
return injected_node
return compiled_node

# creates a compiled_node from the ManifestNode passed in,
# creates a "context" dictionary for jinja rendering,
# and then renders the "compiled_sql" using the node, the
# raw_sql and the context.
def _compile_node(
self,
node: ManifestNode,
Expand All @@ -374,7 +382,6 @@ def _compile_node(
'compiled_sql': None,
'extra_ctes_injected': False,
'extra_ctes': [],
'injected_sql': None,
})
compiled_node = _compiled_type_for(node).from_dict(data)

Expand All @@ -390,11 +397,13 @@ def _compile_node(

compiled_node.compiled = True

injected_node = self._insert_ctes(
# add ctes for specific test nodes, and also for
# possible future use in adapters
compiled_node = self._add_ctes(
compiled_node, manifest, extra_context
)

return injected_node
return compiled_node

def write_graph_file(self, linker: Linker, manifest: Manifest):
filename = graph_file_name
Expand Down Expand Up @@ -449,26 +458,26 @@ def compile(self, manifest: Manifest, write=True) -> Graph:

return Graph(linker.graph)

# writes the "compiled_sql" into the target/compiled directory
def _write_node(self, node: NonSourceCompiledNode) -> ManifestNode:
if not _is_writable(node):
if (not node.extra_ctes_injected or
node.resource_type == NodeType.Snapshot):
return node
logger.debug(f'Writing injected SQL for node "{node.unique_id}"')

if node.injected_sql is None:
# this should not really happen, but it'd be a shame to crash
# over it
logger.error(
f'Compiled node "{node.unique_id}" had no injected_sql, '
'cannot write sql!'
)
else:
if node.compiled_sql:
node.build_path = node.write_node(
self.config.target_path,
'compiled',
node.injected_sql
node.compiled_sql
)
return node

# This is the main entry point into this code. It's called by
# CompileRunner.compile, GenericRPCRunner.compile, and
# RunTask.get_hook_sql. It calls '_compile_node' to convert
# the node into a compiled node, and then calls the
# recursive method to "prepend" the ctes.
def compile_node(
self,
node: ManifestNode,
Expand All @@ -478,16 +487,9 @@ def compile_node(
) -> NonSourceCompiledNode:
node = self._compile_node(node, manifest, extra_context)

if write and _is_writable(node):
node, _ = self._recursively_prepend_ctes(
node, manifest, extra_context
)
if write:
self._write_node(node)
return node


def _is_writable(node):
if not node.injected_sql:
return False

if node.resource_type == NodeType.Snapshot:
return False

return True
4 changes: 3 additions & 1 deletion core/dbt/context/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,9 @@ def post_hooks(self) -> List[Dict[str, Any]]:

@contextproperty
def sql(self) -> Optional[str]:
return getattr(self.model, 'injected_sql', None)
if getattr(self.model, 'extra_ctes_injected', None):
return self.model.compiled_sql
return None

@contextproperty
def database(self) -> str:
Expand Down
1 change: 0 additions & 1 deletion core/dbt/contracts/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class CompiledNode(ParsedNode, CompiledNodeMixin):
compiled_sql: Optional[str] = None
extra_ctes_injected: bool = False
extra_ctes: List[InjectedCTE] = field(default_factory=list)
injected_sql: Optional[str] = None

def set_cte(self, cte_id: str, sql: str):
"""This is the equivalent of what self.extra_ctes[cte_id] = sql would
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def data(self):
result.update({
'raw_sql': self.node.raw_sql,
# the node isn't always compiled, but if it is, include that!
'compiled_sql': getattr(self.node, 'injected_sql', None),
'compiled_sql': getattr(self.node, 'compiled_sql', None),
})
return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['injected_sql']) %}
{% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@


{% macro snapshot_check_all_get_existing_columns(node, target_exists) -%}
{%- set query_columns = get_columns_in_query(node['injected_sql']) -%}
{%- set query_columns = get_columns_in_query(node['compiled_sql']) -%}
{%- if not target_exists -%}
{# no table yet -> return whatever the query does #}
{{ return([false, query_columns]) }}
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/rpc/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class RPCCompileRunner(GenericRPCRunner[RemoteCompileResult]):
def execute(self, compiled_node, manifest) -> RemoteCompileResult:
return RemoteCompileResult(
raw_sql=compiled_node.raw_sql,
compiled_sql=compiled_node.injected_sql,
compiled_sql=compiled_node.compiled_sql,
node=compiled_node,
timing=[], # this will get added later
logs=[],
Expand All @@ -88,7 +88,7 @@ def from_run_result(
class RPCExecuteRunner(GenericRPCRunner[RemoteRunResult]):
def execute(self, compiled_node, manifest) -> RemoteRunResult:
_, execute_result = self.adapter.execute(
compiled_node.injected_sql, fetch=True
compiled_node.compiled_sql, fetch=True
)

table = ResultTable(
Expand All @@ -98,7 +98,7 @@ def execute(self, compiled_node, manifest) -> RemoteRunResult:

return RemoteRunResult(
raw_sql=compiled_node.raw_sql,
compiled_sql=compiled_node.injected_sql,
compiled_sql=compiled_node.compiled_sql,
node=compiled_node,
table=table,
timing=[],
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def raise_on_first_error(self):
def get_hook_sql(self, adapter, hook, idx, num_hooks, extra_context):
compiler = adapter.get_compiler()
compiled = compiler.compile_node(hook, self.manifest, extra_context)
statement = compiled.injected_sql
statement = compiled.compiled_sql
hook_index = hook.index or num_hooks
hook_obj = get_hook(statement, index=hook_index)
return hook_obj.sql or ''
Expand Down
Loading

0 comments on commit 8be9825

Please sign in to comment.