Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix run levels for tests, models that depend on ephemeral models #343

Merged
merged 4 commits into from
Mar 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Support composite unique key in archivals ([#324](https://github.com/fishtown-analytics/dbt/pull/324))
- Fix target paths ([#331](https://github.com/fishtown-analytics/dbt/pull/331), [#329](https://github.com/fishtown-analytics/dbt/issues/329))
- Ignore commented-out schema tests ([#330](https://github.com/fishtown-analytics/dbt/pull/330), [#328](https://github.com/fishtown-analytics/dbt/issues/328))
- Fix run levels ([#343](https://github.com/fishtown-analytics/dbt/pull/343), [#340](https://github.com/fishtown-analytics/dbt/issues/340), [#338](https://github.com/fishtown-analytics/dbt/issues/338))

### Changes

Expand Down
9 changes: 5 additions & 4 deletions dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import dbt.utils

from dbt.model import Model
from dbt.utils import This, Var, is_enabled, get_materialization, NodeType
from dbt.utils import This, Var, is_enabled, get_materialization, NodeType, \
is_type

from dbt.linker import Linker
from dbt.runtime import RuntimeContext
Expand Down Expand Up @@ -256,7 +257,7 @@ def wrapped_do_ref(*args):
logger.info("Compiler error in {}".format(model.get('path')))
logger.info("Enabled models:")
for n, m in all_models.items():
if m.get('resource_type') == NodeType.Model:
if is_type(m, NodeType.Model):
logger.info(" - {}".format(m.get('unique_id')))
raise e

Expand Down Expand Up @@ -382,7 +383,7 @@ def compile_graph(self, linker, flat_graph):
# data tests get wrapped in count(*)
# TODO : move this somewhere more reasonable
if 'data' in injected_node['tags'] and \
injected_node.get('resource_type') == NodeType.Test:
is_type(injected_node, NodeType.Test):
injected_node['wrapped_sql'] = (
"select count(*) from (\n{test_sql}\n) sbq").format(
test_sql=injected_node['injected_sql'])
Expand All @@ -393,7 +394,7 @@ def compile_graph(self, linker, flat_graph):

wrapped_graph['nodes'][name] = injected_node

elif injected_node.get('resource_type') == NodeType.Archive:
elif is_type(injected_node, NodeType.Archive):
# unfortunately we do everything automagically for
# archives. in the future it'd be nice to generate
# the SQL at the parser level.
Expand Down
15 changes: 2 additions & 13 deletions dbt/linker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import networkx as nx
from collections import defaultdict

import dbt.compilation
from dbt.utils import NodeType
import dbt.utils


def from_file(graph_file):
Expand Down Expand Up @@ -58,16 +57,6 @@ def as_topological_ordering(self, limit_to=None):
"{}".format(cycle)
)

def is_blocking_dependency(self, node_data):
# sorting by # ancestors works, but only if we strictly consider
# non-ephemeral models

if 'dbt_run_type' not in node_data or 'materialized' not in node_data:
return False

return node_data['dbt_run_type'] == NodeType.Model \
and node_data['materialized'] != 'ephemeral'

def as_dependency_list(self, limit_to=None):
"""returns a list of list of nodes, eg. [[0,1], [2], [4,5,6]]. Each
element contains nodes whose dependencies are subsumed by the union of
Expand All @@ -92,7 +81,7 @@ def as_dependency_list(self, limit_to=None):
num_ancestors = len([
ancestor for ancestor in
nx.ancestors(self.graph, node)
# if self.is_blocking_dependency(self.graph[ancestor])
if dbt.utils.is_blocking_dependency(self.get_node(ancestor))
])
depth_nodes[num_ancestors].append(node)

Expand Down
51 changes: 30 additions & 21 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dbt.adapters.factory import get_adapter
from dbt.logger import GLOBAL_LOGGER as logger

from dbt.utils import get_materialization, NodeType
from dbt.utils import get_materialization, NodeType, is_type

import dbt.clients.jinja
import dbt.compilation
Expand Down Expand Up @@ -85,11 +85,11 @@ def print_counts(flat_nodes):


def print_start_line(node, schema_name, index, total):
if node.get('resource_type') == NodeType.Model:
if is_type(node, NodeType.Model):
print_model_start_line(node, schema_name, index, total)
if node.get('resource_type') == NodeType.Test:
if is_type(node, NodeType.Test):
print_test_start_line(node, schema_name, index, total)
if node.get('resource_type') == NodeType.Archive:
if is_type(node, NodeType.Archive):
print_archive_start_line(node, index, total)


Expand Down Expand Up @@ -120,11 +120,11 @@ def print_archive_start_line(model, index, total):
def print_result_line(result, schema_name, index, total):
node = result.node

if node.get('resource_type') == NodeType.Model:
if is_type(node, NodeType.Model):
print_model_result_line(result, schema_name, index, total)
elif node.get('resource_type') == NodeType.Test:
elif is_type(node, NodeType.Test):
print_test_result_line(result, schema_name, index, total)
elif node.get('resource_type') == NodeType.Archive:
elif is_type(node, NodeType.Archive):
print_archive_result_line(result, index, total)


Expand Down Expand Up @@ -416,7 +416,6 @@ def __init__(self, project, target_path, args):
self.threads = self.args.threads

adapter = get_adapter(profile)
schema_name = adapter.get_default_schema(profile)

def call_get_columns_in_table(schema_name, table_name):
return adapter.get_columns_in_table(
Expand Down Expand Up @@ -469,11 +468,11 @@ def execute_node(self, node, existing):

node = self.inject_runtime_config(node)

if node.get('resource_type') == NodeType.Model:
if is_type(node, NodeType.Model):
result = execute_model(profile, node, existing)
elif node.get('resource_type') == NodeType.Test:
elif is_type(node, NodeType.Test):
result = execute_test(profile, node)
elif node.get('resource_type') == NodeType.Archive:
elif is_type(node, NodeType.Archive):
result = execute_archive(profile, node, self.context)

return result
Expand Down Expand Up @@ -516,6 +515,9 @@ def safe_execute_node(self, data):
status=status,
execution_time=execution_time)

def as_flat_dep_list(self, linker, nodes_to_run):
    """Return every selected node in a single dependency level.

    Unlike as_concurrent_dep_list, this ignores graph ordering entirely:
    all nodes are placed in one level, so they may all run together.
    """
    return [list(map(linker.get_node, nodes_to_run))]

def as_concurrent_dep_list(self, linker, nodes_to_run):
dependency_list = linker.as_dependency_list(nodes_to_run)

Expand Down Expand Up @@ -640,11 +642,10 @@ def on_complete(run_model_results):

map_result = pool.map_async(
self.safe_execute_node,
[(node, existing,) for node in local_nodes],
[(local_node, existing,) for local_node in local_nodes],
callback=on_complete
)
map_result.wait()
run_model_results = map_result.get()

node_index += threads

Expand Down Expand Up @@ -711,7 +712,8 @@ def try_create_schema(self):
raise

def run_types_from_graph(self, include_spec, exclude_spec,
resource_types, tags, should_run_hooks=False):
resource_types, tags, should_run_hooks=False,
flatten_graph=False):
linker = self.deserialize_graph()

selected_nodes = self.get_nodes_to_run(
Expand All @@ -721,9 +723,14 @@ def run_types_from_graph(self, include_spec, exclude_spec,
resource_types,
tags)

dependency_list = self.as_concurrent_dep_list(
linker,
selected_nodes)
dependency_list = []

if flatten_graph is False:
dependency_list = self.as_concurrent_dep_list(linker,
selected_nodes)
else:
dependency_list = self.as_flat_dep_list(linker,
selected_nodes)

self.try_create_schema()

Expand All @@ -746,11 +753,13 @@ def run_models(self, include_spec, exclude_spec):
def run_tests(self, include_spec, exclude_spec, tags):
return self.run_types_from_graph(include_spec,
exclude_spec,
[NodeType.Test],
tags)
resource_types=[NodeType.Test],
tags=tags,
flatten_graph=True)

def run_archives(self, include_spec, exclude_spec):
return self.run_types_from_graph(include_spec,
exclude_spec,
[NodeType.Archive],
set())
resource_types=[NodeType.Archive],
tags=set(),
flatten_graph=True)
9 changes: 9 additions & 0 deletions dbt/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ def to_string(s):
return s


def is_blocking_dependency(node):
    """Return True when node blocks its dependents from running.

    Only models that actually materialize in the warehouse block their
    dependents; ephemeral models are inlined into downstream SQL and
    therefore never block.
    """
    if not is_type(node, NodeType.Model):
        return False
    return get_materialization(node) != 'ephemeral'


def get_materialization(node):
    """Return the node's configured materialization, or None if unset."""
    node_config = node.get('config', {})
    return node_config.get('materialized')

Expand All @@ -215,6 +220,10 @@ def is_enabled(node):
return node.get('config', {}).get('enabled') is True


def is_type(node, _type):
    """Return True when the node's resource_type equals _type."""
    resource_type = node.get('resource_type')
    return resource_type == _type


def get_pseudo_test_path(node_name, source_path, test_type):
"schema tests all come from schema.yml files. fake a source sql file"
source_path_parts = split_path(source_path)
Expand Down
13 changes: 12 additions & 1 deletion test/unit/test_linker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import mock
import unittest

import dbt.utils

from dbt.compilation import Linker


class LinkerTest(unittest.TestCase):

def setUp(self):
    # Save the real implementation so tearDown can restore it; patching is
    # done by assignment rather than mock.patch, so cleanup is manual.
    self.real_is_blocking_dependency = dbt.utils.is_blocking_dependency
    self.linker = Linker()

    # Make every node count as a blocking dependency so the depth
    # calculation in as_dependency_list considers all ancestors.
    dbt.utils.is_blocking_dependency = mock.MagicMock(return_value=True)

def tearDown(self):
    # Undo the module-level patch applied in setUp so other tests see the
    # real dbt.utils.is_blocking_dependency.
    dbt.utils.is_blocking_dependency = self.real_is_blocking_dependency

def test_linker_add_node(self):
expected_nodes = ['A', 'B', 'C']
for node in expected_nodes:
Expand Down Expand Up @@ -69,7 +79,8 @@ def test_linker_bad_limit_throws_runtime_error(self):
for (l, r) in actual_deps:
self.linker.dependency(l, r)

self.assertRaises(RuntimeError, self.linker.as_dependency_list, ['ZZZ'])
self.assertRaises(RuntimeError,
self.linker.as_dependency_list, ['ZZZ'])

def test__find_cycles__cycles(self):
actual_deps = [('A', 'B'), ('B', 'C'), ('C', 'A')]
Expand Down