Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix out of order execution on model select (#1354) #1355

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions core/dbt/linker.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,28 @@ def join(self):
self.inner.join()


def _subset_graph(graph, include_nodes):
    """Build a new graph from ``graph`` restricted to ``include_nodes``.

    The result starts out as the transitive closure of ``graph``. Because of
    that, when a node is dropped, any path that ran through it is still
    represented by a direct edge between the nodes that are kept.

    :param graph: the directed graph to take the subset of; it is not
        modified.
    :param include_nodes: iterable of the nodes to keep.
    :return: the new, restricted graph.
    :raises RuntimeError: if a node in ``include_nodes`` is not present in
        the resulting graph.
    """
    subset = nx.algorithms.transitive_closure(graph)

    wanted = set(include_nodes)

    # The closure carries the same nodes as ``graph``, so everything listed
    # here is guaranteed to be present in ``subset``.
    subset.remove_nodes_from(
        [node for node in graph.nodes() if node not in wanted]
    )

    for node in wanted:
        if node in subset:
            continue
        raise RuntimeError(
            "Couldn't find model '{}' -- does it exist or is "
            "it disabled?".format(node)
        )
    return subset


class Linker(object):
def __init__(self, data=None):
if data is None:
Expand Down Expand Up @@ -209,23 +231,7 @@ def as_graph_queue(self, manifest, limit_to=None):
else:
graph_nodes = limit_to

new_graph = nx.DiGraph(self.graph)

to_remove = []
graph_nodes_lookup = set(graph_nodes)
for node in new_graph.nodes():
if node not in graph_nodes_lookup:
to_remove.append(node)

for node in to_remove:
new_graph.remove_node(node)

for node in graph_nodes:
if node not in new_graph:
raise RuntimeError(
"Couldn't find model '{}' -- does it exist or is "
"it disabled?".format(node)
)
new_graph = _subset_graph(self.graph, graph_nodes)
return GraphQueue(new_graph, manifest)

def get_dependent_nodes(self, node):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Test fixture model: materialized as a table, it selects every column from
-- the users_rollup model, which makes it a direct child of users_rollup.
{{
config(materialized='table')
}}

select * from {{ ref('users_rollup') }}
33 changes: 27 additions & 6 deletions test/integration/007_graph_selection_tests/test_graph_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test__postgres__tags_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', 'tag:base+'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

created_models = self.get_models_in_schema()
self.assertFalse('base_users' in created_models)
Expand Down Expand Up @@ -89,7 +89,7 @@ def test__postgres__specific_model_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', 'users+'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

self.assertTablesEqual("seed", "users")
self.assertTablesEqual("summary_expected", "users_rollup")
Expand All @@ -104,7 +104,7 @@ def test__snowflake__specific_model_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', 'users+'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

self.assertManyTablesEqual(
["SEED", "USERS"],
Expand Down Expand Up @@ -194,7 +194,7 @@ def test__postgres__locally_qualified_name(self):
def test__postgres__childrens_parents(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")
results = self.run_dbt(['run', '--models', '@base_users'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

created_models = self.get_models_in_schema()
self.assertIn('users_rollup', created_models)
Expand All @@ -207,12 +207,33 @@ def test__postgres__childrens_parents(self):
def test__postgres__more_childrens_parents(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")
results = self.run_dbt(['run', '--models', '@users'])
# base_users, emails, users_rollup, but not users (ephemeral)
self.assertEqual(len(results), 3)
# base_users, emails, users_rollup, users_rollup_dependency, but not users (ephemeral)
self.assertEqual(len(results), 4)

created_models = self.get_models_in_schema()
self.assertIn('users_rollup', created_models)
self.assertIn('users', created_models)
self.assertIn('emails_alt', created_models)
self.assertNotIn('subdir', created_models)
self.assertNotIn('nested_users', created_models)

@attr(type='snowflake')
def test__snowflake__skip_intermediate(self):
    """Run '@users' twice, the second time excluding users_rollup, and
    check that users_rollup_dependency only starts after users completed.
    """
    self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

    # first pass: base_users, emails, users_rollup, users_rollup_dependency
    first_pass = self.run_dbt(['run', '--models', '@users'])
    self.assertEqual(len(first_pass), 4)

    # second pass leaves the intermediate users_rollup model out
    second_pass = self.run_dbt(
        ['run', '--models', '@users', '--exclude', 'users_rollup']
    )
    self.assertEqual(len(second_pass), 3)

    def result_for(model_name):
        # first result of the second pass whose node carries this name
        matches = [r for r in second_pass if r.node.name == model_name]
        return matches[0]

    # users_rollup_dependency and users must not interleave
    users_result = result_for('users')
    dependency_result = result_for('users_rollup_dependency')
    users_finished = users_result.timing[1]['completed_at']
    dependency_began = dependency_result.timing[0]['started_at']

    ran_in_order = users_finished < dependency_began
    failure_note = (
        'dependency started before its transitive parent ({} > {})'
    ).format(users_finished, dependency_began)
    self.assertTrue(ran_in_order, failure_note)
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def run_schema_and_assert(self, include, exclude, expected_tests):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")
self.run_dbt(["deps"])
results = self.run_dbt(['run', '--exclude', 'never_selected'])
self.assertEqual(len(results), 8)
self.assertEqual(len(results), 9)

args = FakeArgs()
args.models = include
Expand Down
10 changes: 6 additions & 4 deletions test/integration/007_graph_selection_tests/test_tag_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test__postgres__select_tag_and_children(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', '+tag:specified_in_project+'])
self.assertEqual(len(results), 2)
self.assertEqual(len(results), 3)

models_run = [r.node['name'] for r in results]
self.assertTrue('users' in models_run)
Expand All @@ -69,8 +69,10 @@ def test__postgres__select_tag_in_model_with_project_Config(self):
self.run_sql_file("test/integration/007_graph_selection_tests/seed.sql")

results = self.run_dbt(['run', '--models', '@tag:users'])
self.assertEqual(len(results), 3)
self.assertEqual(len(results), 4)

models_run = set(r.node['name'] for r in results)
self.assertEqual({'users', 'users_rollup', 'emails_alt'}, models_run)

self.assertEqual(
{'users', 'users_rollup', 'emails_alt', 'users_rollup_dependency'},
models_run
)
2 changes: 1 addition & 1 deletion test/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def run_dbt(self, args=None, expect_pass=True, strict=True):
args = ["run"]

if strict:
args = ["--single-threaded", "--strict"] + args
args = ["--strict"] + args
args.append('--log-cache-events')
logger.info("Invoking dbt with {}".format(args))

Expand Down