From 73ff1f5c806aa4bff04509dfd9a9391d628cda64 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 26 Jul 2016 16:46:36 -0400 Subject: [PATCH 01/18] wip --- dbt/compilation.py | 12 ++++++-- dbt/runner.py | 72 ++++++++++++++++++++++++++++++++-------------- 2 files changed, 60 insertions(+), 24 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 707d92d04ef..632ff45da2f 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -16,12 +16,20 @@ def __init__(self): def nodes(self): return self.graph.nodes() - def as_dependency_list(self, limit_to=None): + def graph_to_dependency_list(self, the_graph, limit_to=None): try: - return nx.topological_sort(self.graph, nbunch=limit_to) + return nx.topological_sort(the_graph, nbunch=limit_to) except KeyError as e: raise RuntimeError("Couldn't find model '{}' -- does it exist or is it diabled?".format(e)) + def as_dependency_list(self, limit_to=None): + return self.graph_to_dependency_list(self.graph, limit_to) + + def as_disjoint_dependency_lists(self, limit_to=None): + subgraphs = list(nx.weakly_connected_component_subgraphs(self.graph)) + all_dep_lists = [self.graph_to_dependency_list(g, limit_to) for g in subgraphs] + return [dep_list for dep_list in all_dep_lists if len(dep_list) > 0] + def dependency(self, node1, node2): "indicate that node1 depends on node2" self.graph.add_node(node1) diff --git a/dbt/runner.py b/dbt/runner.py index 1fff81db8b7..5aa567065b0 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -10,6 +10,8 @@ from dbt.source import Source from dbt.utils import find_model_by_name +from multiprocessing.dummy import Pool as ThreadPool + SCHEMA_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the schema '{schema}'. 
Either create the schema manually, or adjust the permissions of the '{user}' user.""" @@ -112,6 +114,26 @@ def get_model_by_fqn(self, models, fqn): return model raise RuntimeError("Couldn't find a compiled model with fqn: '{}'".format(fqn)) + def execute_model(self, cursor, handle, target, existing, model): + if model.name in existing: + self.__drop(cursor, target.schema, model.name, existing[model.name]) + handle.commit() + + #print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) + print("Creating relation {}.{}".format(target.schema, model.name)) + try: + self.__do_execute(cursor, model.contents, model) + except psycopg2.ProgrammingError as e: + if "permission denied for" in e.diag.message_primary: + raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + else: + raise e + handle.commit() + + def execute_disjoint_models(self, cursor, handle, target, existing, dependency_list): + for model in dependency_list: + self.execute_model(cursor, handle, target, existing, model) + def execute_models(self, linker, models, limit_to=None): target = self.get_target() @@ -120,26 +142,32 @@ def execute_models(self, linker, models, limit_to=None): dependency_list = [self.get_model_by_fqn(models, fqn) for fqn in linker.as_dependency_list(limit_to)] existing = self.query_for_existing(cursor, target.schema); - num_models = len(dependency_list) - if num_models == 0: - print("WARNING: No models to run in '{}'. 
Try checking your model configs and running `dbt compile`".format(self.target_path)) - return - - for index, model in enumerate(dependency_list): - if model.name in existing: - self.__drop(cursor, target.schema, model.name, existing[model.name]) - handle.commit() - - print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) - try: - self.__do_execute(cursor, model.contents, model) - except psycopg2.ProgrammingError as e: - if "permission denied for" in e.diag.message_primary: - raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) - else: - raise e - handle.commit() - yield model + disjoint_dependency_lists = [[self.get_model_by_fqn(models, fqn) for fqn in fqn_list] for fqn_list in linker.as_disjoint_dependency_lists()] + pool = ThreadPool(5) + results = pool.map(lambda x: self.execute_disjoint_models(cursor, handle, target, existing, x), disjoint_dependency_lists) + pool.close() + pool.join() + + #num_models = len(dependency_list) + #if num_models == 0: + # print("WARNING: No models to run in '{}'. 
Try checking your model configs and running `dbt compile`".format(self.target_path)) + # return + + #for index, model in enumerate(dependency_list): + # if model.name in existing: + # self.__drop(cursor, target.schema, model.name, existing[model.name]) + # handle.commit() + + # print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) + # try: + # self.__do_execute(cursor, model.contents, model) + # except psycopg2.ProgrammingError as e: + # if "permission denied for" in e.diag.message_primary: + # raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + # else: + # raise e + # handle.commit() + # yield model def run(self, specified_models=None): linker = self.deserialize_graph() @@ -166,8 +194,8 @@ def run(self, specified_models=None): if schema_name not in schemas: self.create_schema_or_exit(schema_name) - for model in self.execute_models(linker, compiled_models, limit_to): - yield model, True + self.execute_models(linker, compiled_models, limit_to) + return [] except psycopg2.OperationalError as e: print("ERROR: Could not connect to the target database. 
Try `dbt debug` for more information") print(str(e)) From 099994bfbd30c7fe7c98a48e12d0d71faa72f383 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 27 Jul 2016 14:42:47 -0400 Subject: [PATCH 02/18] run queries in parallel, considering deps --- dbt/compilation.py | 33 +++++++++--- dbt/runner.py | 128 ++++++++++++++++++++++++++------------------- 2 files changed, 99 insertions(+), 62 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 632ff45da2f..596497cb5de 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -16,19 +16,38 @@ def __init__(self): def nodes(self): return self.graph.nodes() - def graph_to_dependency_list(self, the_graph, limit_to=None): + def as_dependency_list(self, limit_to=None): try: return nx.topological_sort(the_graph, nbunch=limit_to) except KeyError as e: raise RuntimeError("Couldn't find model '{}' -- does it exist or is it diabled?".format(e)) - def as_dependency_list(self, limit_to=None): - return self.graph_to_dependency_list(self.graph, limit_to) + def as_sequential_dependency_lists(self, limit_to=None): + """returns a list of list of nodes, eg. [[0,1], [2], [4,5,6]]. Each element contains nodes whose + dependenices are subsumed by the union of all lists before it. In this way, all nodes in list `i` + can be run simultaneously assuming that all lists before list `i` have been completed""" + + # only populate `nodes` with nodes at or below nodes in the limit_to list. 
If limit_to + # is None, then use all of the nodes in the graph + if limit_to is None: + graph_nodes = set(self.graph.nodes()) + else: + graph_nodes = set() + for node in limit_to: + reachable_nodes = nx.depth_first_search.dfs_postorder_nodes(self.graph, node) + graph_nodes.update(set(reachable_nodes)) + + depth_nodes = defaultdict(list) + + for node in graph_nodes: + num_ancestors = len(nx.ancestors(self.graph, node)) + depth_nodes[num_ancestors].append(node) + + sequential_node_lists = [] + for depth in sorted(depth_nodes.keys()): + sequential_node_lists.append(depth_nodes[depth]) - def as_disjoint_dependency_lists(self, limit_to=None): - subgraphs = list(nx.weakly_connected_component_subgraphs(self.graph)) - all_dep_lists = [self.graph_to_dependency_list(g, limit_to) for g in subgraphs] - return [dep_list for dep_list in all_dep_lists if len(dep_list) > 0] + return sequential_node_lists def dependency(self, node1, node2): "indicate that node1 depends on node2" diff --git a/dbt/runner.py b/dbt/runner.py index 5aa567065b0..19730d83812 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -3,6 +3,7 @@ import psycopg2 import os, sys +import functools from dbt.compilation import Linker, Compiler from dbt.templates import BaseCreateTemplate @@ -15,13 +16,16 @@ SCHEMA_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the schema '{schema}'. Either create the schema manually, or adjust the permissions of the '{user}' user.""" -RELATION_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the model '{model}' \ -in the schema '{schema}'.\nPlease adjust the permissions of the '{user}' user on the '{schema}' schema. +RELATION_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the model '{model}' in the schema '{schema}'. +Please adjust the permissions of the '{user}' user on the '{schema}' schema. 
With a superuser account, execute the following commands, then re-run dbt. grant usage, create on schema "{schema}" to "{user}"; -grant select on all tables in schema "{schema}" to "{user}"; -""" +grant select on all tables in schema "{schema}" to "{user}";""" + +RELATION_NOT_OWNER_MESSAGE = """The user '{user}' does not have sufficient permissions to drop the model '{model}' in the schema '{schema}'. +This is likely because the relation was created by a different user. Either delete the model "{schema}"."{model}" manually, +or adjust the permissions of the '{user}' user in the '{schema}' schema.""" class Runner: def __init__(self, project, target_path, run_mode): @@ -74,14 +78,17 @@ def create_schema_or_exit(self, schema_name): else: raise e - def query_for_existing(self, cursor, schema): + def query_for_existing(self, target, schema): sql = """ select tablename as name, 'table' as type from pg_tables where schemaname = '{schema}' union all select viewname as name, 'view' as type from pg_views where schemaname = '{schema}' """.format(schema=schema) - cursor.execute(sql) - existing = [(name, relation_type) for (name, relation_type) in cursor.fetchall()] + + with target.get_handle() as handle: + with handle.cursor() as cursor: + cursor.execute(sql) + existing = [(name, relation_type) for (name, relation_type) in cursor.fetchall()] return dict(existing) @@ -114,60 +121,71 @@ def get_model_by_fqn(self, models, fqn): return model raise RuntimeError("Couldn't find a compiled model with fqn: '{}'".format(fqn)) - def execute_model(self, cursor, handle, target, existing, model): - if model.name in existing: - self.__drop(cursor, target.schema, model.name, existing[model.name]) - handle.commit() + def execute_model(self, data): + target = data['target'] + model = data['model'] + drop_type = data['drop_type'] - #print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) - print("Creating relation {}.{}".format(target.schema, 
model.name)) - try: - self.__do_execute(cursor, model.contents, model) - except psycopg2.ProgrammingError as e: - if "permission denied for" in e.diag.message_primary: - raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) - else: - raise e - handle.commit() + with target.get_handle() as handle: + with handle.cursor() as cursor: + if drop_type is not None: + try: + self.__drop(cursor, target.schema, model.name, drop_type) + except psycopg2.ProgrammingError as e: + if "must be owner of relation" in e.diag.message_primary: + raise RuntimeError(RELATION_NOT_OWNER_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + else: + raise e + handle.commit() + + #print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) + print("Creating relation {}.{}".format(target.schema, model.name)) + + try: + self.__do_execute(cursor, model.contents, model) + except psycopg2.ProgrammingError as e: + if "permission denied for" in e.diag.message_primary: + raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + else: + raise e + handle.commit() - def execute_disjoint_models(self, cursor, handle, target, existing, dependency_list): - for model in dependency_list: - self.execute_model(cursor, handle, target, existing, model) + return model def execute_models(self, linker, models, limit_to=None): target = self.get_target() - with target.get_handle() as handle: - with handle.cursor() as cursor: - dependency_list = [self.get_model_by_fqn(models, fqn) for fqn in linker.as_dependency_list(limit_to)] - existing = self.query_for_existing(cursor, target.schema); - - disjoint_dependency_lists = [[self.get_model_by_fqn(models, fqn) for fqn in fqn_list] for fqn_list in linker.as_disjoint_dependency_lists()] - pool = ThreadPool(5) - results = pool.map(lambda x: self.execute_disjoint_models(cursor, 
handle, target, existing, x), disjoint_dependency_lists) - pool.close() - pool.join() - - #num_models = len(dependency_list) - #if num_models == 0: - # print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path)) - # return - - #for index, model in enumerate(dependency_list): - # if model.name in existing: - # self.__drop(cursor, target.schema, model.name, existing[model.name]) - # handle.commit() - - # print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) - # try: - # self.__do_execute(cursor, model.contents, model) - # except psycopg2.ProgrammingError as e: - # if "permission denied for" in e.diag.message_primary: - # raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) - # else: - # raise e - # handle.commit() - # yield model + sequential_node_lists = linker.as_sequential_dependency_lists(limit_to) + + if len(sequential_node_lists) == 0: + print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path)) + return + + existing = self.query_for_existing(target, target.schema); + + # TODO : better names and clean this up! 
+ sequential_model_list = [] + for node_list in sequential_node_lists: + model_list = [] + for node in node_list: + model = self.get_model_by_fqn(models, node) + drop_type = existing.get(model.name, None) # False, 'view', or 'table' + data = { + "model" : model, + "target": target, + "drop_type": drop_type + } + model_list.append(data) + sequential_model_list.append(model_list) + + # TODO : make this an arg + pool = ThreadPool(4) + for model_list in sequential_model_list: + results = pool.map(self.execute_model, model_list) + for model in results: + print("Created model {}".format(model.name)) # TODO : better logging + pool.close() + pool.join() def run(self, specified_models=None): linker = self.deserialize_graph() From 1731ce863270ab7e37eee13f6f39b30c3513b7bd Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 27 Jul 2016 15:05:49 -0400 Subject: [PATCH 03/18] better logging --- dbt/runner.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index 19730d83812..a9d470aaeb9 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -138,9 +138,6 @@ def execute_model(self, data): raise e handle.commit() - #print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) - print("Creating relation {}.{}".format(target.schema, model.name)) - try: self.__do_execute(cursor, model.contents, model) except psycopg2.ProgrammingError as e: @@ -179,11 +176,15 @@ def execute_models(self, linker, models, limit_to=None): sequential_model_list.append(model_list) # TODO : make this an arg + completed = 0 + num_models = sum([len(model_list) for model_list in sequential_model_list]) + pool = ThreadPool(4) for model_list in sequential_model_list: results = pool.map(self.execute_model, model_list) for model in results: - print("Created model {}".format(model.name)) # TODO : better logging + completed += 1 + print("{} of {} -- Created relation {}.{}".format(completed, num_models, target.schema, 
model.name)) pool.close() pool.join() From b9bcef28407fdc8d82e7a755af815715e241ae5d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 27 Jul 2016 15:18:48 -0400 Subject: [PATCH 04/18] code cleanup --- dbt/compilation.py | 10 ++++----- dbt/runner.py | 53 ++++++++++++++++++++-------------------------- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 596497cb5de..ee7d81a0695 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -16,13 +16,13 @@ def __init__(self): def nodes(self): return self.graph.nodes() - def as_dependency_list(self, limit_to=None): + def as_topological_ordering(self, limit_to=None): try: return nx.topological_sort(the_graph, nbunch=limit_to) except KeyError as e: raise RuntimeError("Couldn't find model '{}' -- does it exist or is it diabled?".format(e)) - def as_sequential_dependency_lists(self, limit_to=None): + def as_dependency_list(self, limit_to=None): """returns a list of list of nodes, eg. [[0,1], [2], [4,5,6]]. Each element contains nodes whose dependenices are subsumed by the union of all lists before it. 
In this way, all nodes in list `i` can be run simultaneously assuming that all lists before list `i` have been completed""" @@ -43,11 +43,11 @@ def as_sequential_dependency_lists(self, limit_to=None): num_ancestors = len(nx.ancestors(self.graph, node)) depth_nodes[num_ancestors].append(node) - sequential_node_lists = [] + dependency_list = [] for depth in sorted(depth_nodes.keys()): - sequential_node_lists.append(depth_nodes[depth]) + dependency_list.append(depth_nodes[depth]) - return sequential_node_lists + return dependency_list def dependency(self, node1, node2): "indicate that node1 depends on node2" diff --git a/dbt/runner.py b/dbt/runner.py index a9d470aaeb9..6a497f03cdf 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -149,45 +149,39 @@ def execute_model(self, data): return model - def execute_models(self, linker, models, limit_to=None): + def execute_models(self, linker, models, limit_to=None, num_threads=4): target = self.get_target() - sequential_node_lists = linker.as_sequential_dependency_lists(limit_to) + dependency_list = linker.as_dependency_list(limit_to) + num_models = sum([len(node_list) for node_list in dependency_list]) - if len(sequential_node_lists) == 0: + if num_models == 0: print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path)) return existing = self.query_for_existing(target, target.schema); - # TODO : better names and clean this up! 
- sequential_model_list = [] - for node_list in sequential_node_lists: - model_list = [] - for node in node_list: - model = self.get_model_by_fqn(models, node) - drop_type = existing.get(model.name, None) # False, 'view', or 'table' - data = { - "model" : model, - "target": target, - "drop_type": drop_type - } - model_list.append(data) - sequential_model_list.append(model_list) - - # TODO : make this an arg - completed = 0 - num_models = sum([len(model_list) for model_list in sequential_model_list]) - - pool = ThreadPool(4) - for model_list in sequential_model_list: - results = pool.map(self.execute_model, model_list) - for model in results: - completed += 1 - print("{} of {} -- Created relation {}.{}".format(completed, num_models, target.schema, model.name)) + def wrap_fqn(target, models, existing, fqn): + model = self.get_model_by_fqn(models, fqn) + drop_type = existing.get(model.name, None) # False, 'view', or 'table' + return {"model" : model, "target": target, "drop_type": drop_type} + + # we can only pass one arg to the self.execute_model method below. 
Pass a dict w/ all the data we need + model_dependency_list = [[wrap_fqn(target, models, existing, fqn) for fqn in node_list] for node_list in dependency_list] + + pool = ThreadPool(num_threads) + + completed_models = [] + for model_list in model_dependency_list: + executed_models = pool.map(self.execute_model, model_list) + for model in executed_models: + completed_models.append(model) + print("{} of {} -- Created relation {}.{}".format(len(completed_models), num_models, target.schema, model.name)) pool.close() pool.join() + return completed_models + def run(self, specified_models=None): linker = self.deserialize_graph() compiled_models = self.get_compiled_models() @@ -213,8 +207,7 @@ def run(self, specified_models=None): if schema_name not in schemas: self.create_schema_or_exit(schema_name) - self.execute_models(linker, compiled_models, limit_to) - return [] + return self.execute_models(linker, compiled_models, limit_to) except psycopg2.OperationalError as e: print("ERROR: Could not connect to the target database. 
Try `dbt debug` for more information") print(str(e)) From fce38ba0d94fbcfa810c9c6a416ebd6ab9504dd9 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 27 Jul 2016 16:08:15 -0400 Subject: [PATCH 05/18] refactor --- dbt/runner.py | 67 ++++++++++++++++++++++++------------------------ dbt/task/run.py | 3 +-- dbt/task/test.py | 9 ++++--- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index 6a497f03cdf..36127f6c2cc 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -92,28 +92,31 @@ def query_for_existing(self, target, schema): return dict(existing) - def __drop(self, cursor, schema, relation, relation_type): + def __drop(self, target, model, relation_type): + schema = target.schema + relation = model.name + sql = 'drop {relation_type} if exists "{schema}"."{relation}" cascade'.format(schema=schema, relation_type=relation_type, relation=relation) - cursor.execute(sql) + self.__do_execute(target, sql, model) - def __do_execute(self, cursor, sql, model): - try: - cursor.execute(sql) - except Exception as e: - e.model = model - raise e + def __do_execute(self, target, sql, model): + with target.get_handle() as handle: + with handle.cursor() as cursor: + try: + cursor.execute(sql) + handle.commit() + except Exception as e: + e.model = model + raise e def drop_models(self, models): target = self.get_target() - with target.get_handle() as handle: - with handle.cursor() as cursor: - - existing = self.query_for_existing(cursor, target.schema); + existing = self.query_for_existing(target, target.schema); - for model in models: - model_name = model.fqn[-1] - self.__drop(cursor, target.schema, model_name, existing[model_name]) + for model in models: + model_name = model.fqn[-1] + self.__drop(target, model, existing[model_name]) def get_model_by_fqn(self, models, fqn): for model in models: @@ -126,26 +129,22 @@ def execute_model(self, data): model = data['model'] drop_type = data['drop_type'] - with target.get_handle() as handle: - with 
handle.cursor() as cursor: - if drop_type is not None: - try: - self.__drop(cursor, target.schema, model.name, drop_type) - except psycopg2.ProgrammingError as e: - if "must be owner of relation" in e.diag.message_primary: - raise RuntimeError(RELATION_NOT_OWNER_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) - else: - raise e - handle.commit() + if drop_type is not None: + try: + self.__drop(target, model, drop_type) + except psycopg2.ProgrammingError as e: + if "must be owner of relation" in e.diag.message_primary: + raise RuntimeError(RELATION_NOT_OWNER_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + else: + raise e - try: - self.__do_execute(cursor, model.contents, model) - except psycopg2.ProgrammingError as e: - if "permission denied for" in e.diag.message_primary: - raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) - else: - raise e - handle.commit() + try: + self.__do_execute(target, model.contents, model) + except psycopg2.ProgrammingError as e: + if "permission denied for" in e.diag.message_primary: + raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + else: + raise e return model diff --git a/dbt/task/run.py b/dbt/task/run.py index bc54af15537..5cd31857bdc 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -18,5 +18,4 @@ def compile(self): def run(self): self.compile() runner = Runner(self.project, self.project['target-path'], BaseCreateTemplate.label) - for (model, passed) in runner.run(self.args.models): - pass + runner.run(self.args.models) diff --git a/dbt/task/test.py b/dbt/task/test.py index 4a5f5265082..40d3df2e9a0 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -35,10 +35,13 @@ def run_and_catch_errors(self, func, onFinally=None): errored = False try: - for (model, test_passed) in func(): + #for (model, test_passed) in func(): + # 
executed_models.append(model) + # if test_passed: + # passed += 1 + for model in func(): executed_models.append(model) - if test_passed: - passed += 1 + passed += 1 except psycopg2.ProgrammingError as e: errored = True print("") From 0364d058f12b7d235ff19409ec52baba9e7af7ea Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 27 Jul 2016 23:21:57 -0400 Subject: [PATCH 06/18] better error handling + error messages --- dbt/runner.py | 41 +++++++++++++++++++++++++++++++---------- dbt/task/run.py | 10 +++++++++- dbt/task/test.py | 6 +++++- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index 36127f6c2cc..c550a7ee3fc 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -27,6 +27,15 @@ This is likely because the relation was created by a different user. Either delete the model "{schema}"."{model}" manually, or adjust the permissions of the '{user}' user in the '{schema}' schema.""" +class RunModelResult(object): + def __init__(self, model, error=None): + self.model = model + self.error = error + + @property + def errored(self): + return self.error is not None + class Runner: def __init__(self, project, target_path, run_mode): self.project = project @@ -113,7 +122,6 @@ def drop_models(self, models): target = self.get_target() existing = self.query_for_existing(target, target.schema); - for model in models: model_name = model.fqn[-1] self.__drop(target, model, existing[model_name]) @@ -124,11 +132,20 @@ def get_model_by_fqn(self, models, fqn): return model raise RuntimeError("Couldn't find a compiled model with fqn: '{}'".format(fqn)) - def execute_model(self, data): + def execute_wrapped_model(self, data): target = data['target'] model = data['model'] drop_type = data['drop_type'] + error = None + try: + self.execute_model(target, model, drop_type) + except (RuntimeError, psycopg2.ProgrammingError) as e: + error = "Error executing {filepath}\n{error}".format(filepath=model.filepath, error=str(e).strip()) + + return 
RunModelResult(model, error) + + def execute_model(self, target, model, drop_type): if drop_type is not None: try: self.__drop(target, model, drop_type) @@ -146,8 +163,6 @@ def execute_model(self, data): else: raise e - return model - def execute_models(self, linker, models, limit_to=None, num_threads=4): target = self.get_target() @@ -170,16 +185,22 @@ def wrap_fqn(target, models, existing, fqn): pool = ThreadPool(num_threads) - completed_models = [] + model_results = [] for model_list in model_dependency_list: - executed_models = pool.map(self.execute_model, model_list) - for model in executed_models: - completed_models.append(model) - print("{} of {} -- Created relation {}.{}".format(len(completed_models), num_models, target.schema, model.name)) + run_model_results = pool.map(self.execute_wrapped_model, model_list) + for run_model_result in run_model_results: + model_results.append(run_model_result) + + if run_model_result.errored: + print("{} of {} -- ERROR creating relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) + print(run_model_result.error.rjust(10)) + else: + print("{} of {} -- OK Created relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) + pool.close() pool.join() - return completed_models + return model_results def run(self, specified_models=None): linker = self.deserialize_graph() diff --git a/dbt/task/run.py b/dbt/task/run.py index 5cd31857bdc..4805c6127e3 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -1,4 +1,6 @@ +from __future__ import print_function + import os from dbt.templates import BaseCreateTemplate from dbt.runner import Runner @@ -18,4 +20,10 @@ def compile(self): def run(self): self.compile() runner = Runner(self.project, self.project['target-path'], BaseCreateTemplate.label) - runner.run(self.args.models) + results = runner.run(self.args.models) + + successes = [r for r in results if not r.errored] + errors = [r for r in results if 
r.errored] + + print() + print("Done. Executed {} models executed with {} failures".format(len(successes) + len(errors), len(errors))) diff --git a/dbt/task/test.py b/dbt/task/test.py index 40d3df2e9a0..5b92a9c4cd1 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -63,7 +63,11 @@ def run_and_catch_errors(self, func, onFinally=None): def run_test_creates(self): runner = Runner(self.project, self.project['target-path'], TestCreateTemplate.label) - self.run_and_catch_errors(runner.run, runner.drop_models) + def on_complete(query_results): + models = [query_result.model for query_result in query_results] + runner.drop_models(models) + + self.run_and_catch_errors(runner.run, on_complete) def run_validations(self): print("Validating schemas") From 39f4a8e83d4d87ac5a2eb43d0f0d674c601b2a7e Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Wed, 27 Jul 2016 23:23:12 -0400 Subject: [PATCH 07/18] don't delete test views for failed tests --- dbt/task/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/task/test.py b/dbt/task/test.py index 5b92a9c4cd1..0374825cc26 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -64,7 +64,7 @@ def run_and_catch_errors(self, func, onFinally=None): def run_test_creates(self): runner = Runner(self.project, self.project['target-path'], TestCreateTemplate.label) def on_complete(query_results): - models = [query_result.model for query_result in query_results] + models = [query_result.model for query_result in query_results if not query_result.errored] runner.drop_models(models) self.run_and_catch_errors(runner.run, on_complete) From 80305b8ed44a295718834083e4501361958de90d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 28 Jul 2016 00:09:09 -0400 Subject: [PATCH 08/18] don't run child queries if a parent fails --- dbt/compilation.py | 10 ++++++++++ dbt/runner.py | 9 ++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index ee7d81a0695..84f5896f453 
100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -49,6 +49,16 @@ def as_dependency_list(self, limit_to=None): return dependency_list + def is_child_of(self, nodes, target_node): + "returns True if node is a child of a node in nodes. Otherwise, False" + node_span = set() + for node in nodes: + node_span.add(node) + for child in nx.descendants(self.graph, node): + node_span.add(child) + + return target_node in node_span + def dependency(self, node1, node2): "indicate that node1 depends on node2" self.graph.add_node(node1) diff --git a/dbt/runner.py b/dbt/runner.py index c550a7ee3fc..ec26073a72c 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -185,17 +185,24 @@ def wrap_fqn(target, models, existing, fqn): pool = ThreadPool(num_threads) + failed_models = set() + model_results = [] for model_list in model_dependency_list: - run_model_results = pool.map(self.execute_wrapped_model, model_list) + failed_nodes = [tuple(model.fqn) for model in failed_models] + models_to_execute = [data for data in model_list if not linker.is_child_of(failed_nodes, tuple(data['model'].fqn))] + run_model_results = pool.map(self.execute_wrapped_model, models_to_execute) for run_model_result in run_model_results: model_results.append(run_model_result) if run_model_result.errored: + failed_models.add(run_model_result.model) print("{} of {} -- ERROR creating relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) print(run_model_result.error.rjust(10)) else: print("{} of {} -- OK Created relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) + for i, model in enumerate(failed_models): + print("{} of {} -- SKIP relation {}.{} because parent failed".format(len(model_results) + i + 1, num_models, target.schema, run_model_result.model.name)) pool.close() pool.join() From 3afc31bcb350a1b140ceef41f763a0dc653fcf69 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 28 Jul 2016 00:32:41 -0400 Subject: 
[PATCH 09/18] make schema tests work --- dbt/schema_tester.py | 15 +++++++++------ dbt/task/test.py | 11 ++++------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dbt/schema_tester.py b/dbt/schema_tester.py index 467a3d0cf16..dd6e4e8813d 100644 --- a/dbt/schema_tester.py +++ b/dbt/schema_tester.py @@ -2,6 +2,9 @@ from dbt.targets import RedshiftTarget from dbt.source import Source +from dbt.runner import RunModelResult + +import psycopg2 QUERY_VALIDATE_NOT_NULL = """ with validation as ( @@ -76,6 +79,8 @@ def execute_query(self, model, sql): with handle.cursor() as cursor: try: cursor.execute(sql) + except psycopg2.ProgrammingError as e: + return e.diag.message_primary except Exception as e: e.model = model raise e @@ -183,13 +188,11 @@ def validate_schema(self, schemas): constraints = schema.schema[model_name]['constraints'] for constraint_type, constraint_data in constraints.items(): - try: - for test_passed in self.validate_schema_constraint(model, constraint_type, constraint_data): - yield model, test_passed - except RuntimeError as e: - print("ERRROR: {}".format(str(e))) + for test_passed in self.validate_schema_constraint(model, constraint_type, constraint_data): + yield model, test_passed def test(self): schemas = self.project_schemas() for (model, test_passed) in self.validate_schema(schemas): - yield model, test_passed + error = None if test_passed else "ERROR" + yield RunModelResult(model, error) diff --git a/dbt/task/test.py b/dbt/task/test.py index 0374825cc26..e96e7f02ca6 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -35,13 +35,10 @@ def run_and_catch_errors(self, func, onFinally=None): errored = False try: - #for (model, test_passed) in func(): - # executed_models.append(model) - # if test_passed: - # passed += 1 - for model in func(): - executed_models.append(model) - passed += 1 + for res in func(): + executed_models.append(res) + if not res.errored: + passed += 1 except psycopg2.ProgrammingError as e: errored = True 
print("") From 3ea88f3fa450c0c0ff4d69955ebaf5a194adec85 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 31 Jul 2016 00:20:11 -0400 Subject: [PATCH 10/18] better test reporting --- dbt/runner.py | 23 ++++++++++++++++++----- dbt/task/test.py | 15 ++++++++++----- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/dbt/runner.py b/dbt/runner.py index ec26073a72c..e795cd185cf 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -28,14 +28,19 @@ or adjust the permissions of the '{user}' user in the '{schema}' schema.""" class RunModelResult(object): - def __init__(self, model, error=None): + def __init__(self, model, error=None, skip=False): self.model = model self.error = error + self.skip = skip @property def errored(self): return self.error is not None + @property + def skipped(self): + return self.skip + class Runner: def __init__(self, project, target_path, run_mode): self.project = project @@ -143,7 +148,7 @@ def execute_wrapped_model(self, data): except (RuntimeError, psycopg2.ProgrammingError) as e: error = "Error executing {filepath}\n{error}".format(filepath=model.filepath, error=str(e).strip()) - return RunModelResult(model, error) + return RunModelResult(model, error=error) def execute_model(self, target, model, drop_type): if drop_type is not None: @@ -190,19 +195,27 @@ def wrap_fqn(target, models, existing, fqn): model_results = [] for model_list in model_dependency_list: failed_nodes = [tuple(model.fqn) for model in failed_models] + models_to_execute = [data for data in model_list if not linker.is_child_of(failed_nodes, tuple(data['model'].fqn))] + models_to_skip = [data for data in model_list if linker.is_child_of(failed_nodes, tuple(data['model'].fqn))] + + for i, data in enumerate(models_to_skip): + model = data['model'] + model_result = RunModelResult(model, skip=True) + model_results.append(model_result) + print("{} of {} -- SKIP relation {}.{} because parent failed".format(len(model_results), num_models, target.schema, 
model_result.model.name)) + run_model_results = pool.map(self.execute_wrapped_model, models_to_execute) + for run_model_result in run_model_results: model_results.append(run_model_result) if run_model_result.errored: failed_models.add(run_model_result.model) print("{} of {} -- ERROR creating relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) - print(run_model_result.error.rjust(10)) + print(run_model_result.error) else: print("{} of {} -- OK Created relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) - for i, model in enumerate(failed_models): - print("{} of {} -- SKIP relation {}.{} because parent failed".format(len(model_results) + i + 1, num_models, target.schema, run_model_result.model.name)) pool.close() pool.join() diff --git a/dbt/task/test.py b/dbt/task/test.py index e96e7f02ca6..0ae3fb41c0b 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -37,8 +37,6 @@ def run_and_catch_errors(self, func, onFinally=None): try: for res in func(): executed_models.append(res) - if not res.errored: - passed += 1 except psycopg2.ProgrammingError as e: errored = True print("") @@ -54,14 +52,21 @@ def run_and_catch_errors(self, func, onFinally=None): sys.exit(1) print("") - num_passed = len(executed_models) - print("{passed}/{num_executed} tests passed!".format(passed=passed, num_executed=num_passed)) + total = len(executed_models) + passed = len([model for model in executed_models if not model.errored and not model.skipped]) + errored = len([model for model in executed_models if model.errored]) + skipped = len([model for model in executed_models if model.skipped]) + print("PASS={passed} ERROR={errored} SKIP={skipped} TOTAL={total}".format(total=total, passed=passed, errored=errored, skipped=skipped)) + if errored > 0: + print("Tests completed with errors") + else: + print("All tests passed") print("") def run_test_creates(self): runner = Runner(self.project, 
self.project['target-path'], TestCreateTemplate.label) def on_complete(query_results): - models = [query_result.model for query_result in query_results if not query_result.errored] + models = [query_result.model for query_result in query_results if not query_result.errored and not query_result.skipped] runner.drop_models(models) self.run_and_catch_errors(runner.run, on_complete) From df405b99e21700a6200c66c4cb563d09b438c789 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 31 Jul 2016 17:08:36 -0400 Subject: [PATCH 11/18] code cleanup --- dbt/compilation.py | 5 +---- dbt/task/run.py | 1 + 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 84f5896f453..9649fd85d05 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -27,15 +27,12 @@ def as_dependency_list(self, limit_to=None): dependenices are subsumed by the union of all lists before it. In this way, all nodes in list `i` can be run simultaneously assuming that all lists before list `i` have been completed""" - # only populate `nodes` with nodes at or below nodes in the limit_to list. 
If limit_to - # is None, then use all of the nodes in the graph if limit_to is None: graph_nodes = set(self.graph.nodes()) else: graph_nodes = set() for node in limit_to: - reachable_nodes = nx.depth_first_search.dfs_postorder_nodes(self.graph, node) - graph_nodes.update(set(reachable_nodes)) + graph_nodes.update(nx.descendants(self.graph, node)) depth_nodes = defaultdict(list) diff --git a/dbt/task/run.py b/dbt/task/run.py index 4805c6127e3..3b51d866dfd 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -19,6 +19,7 @@ def compile(self): def run(self): self.compile() + runner = Runner(self.project, self.project['target-path'], BaseCreateTemplate.label) results = runner.run(self.args.models) From fc179eeca83fcaae7c0d2dde2b3a73865ee67eb9 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 31 Jul 2016 17:47:36 -0400 Subject: [PATCH 12/18] also compile specified node when --models given --- dbt/compilation.py | 1 + dbt/runner.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 9649fd85d05..3b09c1dc7cf 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -32,6 +32,7 @@ def as_dependency_list(self, limit_to=None): else: graph_nodes = set() for node in limit_to: + graph_nodes.add(node) graph_nodes.update(nx.descendants(self.graph, node)) depth_nodes = defaultdict(list) diff --git a/dbt/runner.py b/dbt/runner.py index e795cd185cf..5f970c031d5 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -176,7 +176,7 @@ def execute_models(self, linker, models, limit_to=None, num_threads=4): if num_models == 0: print("WARNING: No models to run in '{}'. 
Try checking your model configs and running `dbt compile`".format(self.target_path)) - return + return [] existing = self.query_for_existing(target, target.schema); @@ -236,7 +236,7 @@ def run(self, specified_models=None): except RuntimeError as e: print("ERROR: {}".format(str(e))) print("Exiting") - return + return[] target_cfg = self.project.run_environment() schema_name = target_cfg['schema'] From 9a77cc804fc28ae30a7462840b710d196c36450d Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 31 Jul 2016 23:18:17 -0400 Subject: [PATCH 13/18] output when models are "missing" --- dbt/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/utils.py b/dbt/utils.py index 73f420d9628..bc105a4abf7 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -10,6 +10,9 @@ def find_model_by_name(models, name, package_namespace=None): nice_package_name = 'ANY' if package_namespace is None else package_namespace if len(found) == 0: + print("Enabled models:") + for model in models: + print(" - {}".format(".".join(model.fqn))) raise RuntimeError("Can't find a model named '{}' in package '{}' -- does it exist?".format(name, nice_package_name)) elif len(found) == 1: return found[0] From 5455289383f847c3334352bf96aa9aa5c40a3e19 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 31 Jul 2016 23:39:17 -0400 Subject: [PATCH 14/18] fix running test files with specific models enabled --- dbt/compilation.py | 12 +++++++++++- dbt/model.py | 12 +++++++++++- dbt/utils.py | 3 --- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index 3b09c1dc7cf..f4d88d909d8 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -151,7 +151,17 @@ def do_ref(*args): linker.dependency(source_model, other_model_fqn) return '"{}"."{}"'.format(schema, other_model_name) - return do_ref + def wrapped_do_ref(*args): + try: + return do_ref(*args) + except RuntimeError as e: + print("Compiler error in {}".format(model.filepath)) + print("Enabled models:") + 
for m in all_models: + print(" - {}".format(".".join(m.fqn))) + raise e + + return wrapped_do_ref def compile_model(self, linker, model, models): jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir)) diff --git a/dbt/model.py b/dbt/model.py index 633cd63a62c..daf059ef845 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -33,7 +33,7 @@ def get_config(self, primary_project): def load_from_project(model, the_project): config = the_project['model-defaults'].copy() model_configs = the_project['models'] - fqn = model.fqn[:] + fqn = model.original_fqn[:] while len(fqn) > 0: model_group = fqn.pop(0) if model_group in model_configs: @@ -59,6 +59,10 @@ def fqn(self): name, _ = os.path.splitext(parts[-1]) return [self.project['name']] + parts[1:-1] + [name] + @property + def original_fqn(self): + return self.fqn + class Model(DBTSource): def __init__(self, project, model_dir, rel_filepath): super(Model, self).__init__(project, model_dir, rel_filepath) @@ -142,6 +146,12 @@ def fqn(self): test_name = TestCreateTemplate.model_name(name) return [self.project['name']] + parts[1:-1] + [test_name] + @property + def original_fqn(self): + parts = self.filepath.split("/") + name, _ = os.path.splitext(parts[-1]) + return [self.project['name']] + parts[1:-1] + [name] + def __repr__(self): return "".format(self.project['name'], self.name, self.filepath) diff --git a/dbt/utils.py b/dbt/utils.py index bc105a4abf7..73f420d9628 100644 --- a/dbt/utils.py +++ b/dbt/utils.py @@ -10,9 +10,6 @@ def find_model_by_name(models, name, package_namespace=None): nice_package_name = 'ANY' if package_namespace is None else package_namespace if len(found) == 0: - print("Enabled models:") - for model in models: - print(" - {}".format(".".join(model.fqn))) raise RuntimeError("Can't find a model named '{}' in package '{}' -- does it exist?".format(name, nice_package_name)) elif len(found) == 1: return found[0] From eaf791887e369e56d4a9cc0052f4abe7ad6d0ca8 Mon Sep 17 00:00:00 2001 
From: Drew Banin Date: Mon, 1 Aug 2016 09:10:07 -0400 Subject: [PATCH 15/18] provide num threads on CLI --- dbt/main.py | 7 +++++-- dbt/runner.py | 8 +++++--- dbt/task/run.py | 4 +++- dbt/task/test.py | 3 ++- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/dbt/main.py b/dbt/main.py index a0d62539eb9..3fcea3da40e 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -47,7 +47,10 @@ def handle(args): sub = subs.add_parser('deps', parents=[base_subparser]) sub.set_defaults(cls=deps_task.DepsTask, which='deps') - sub = subs.add_parser('run', parents=[base_subparser]) + concurrent_sub = argparse.ArgumentParser(add_help=False) + concurrent_sub.add_argument('-t', '--threads', type=int, default=1, choices=range(1, run_task.THREAD_LIMIT), help='Number of queries to execute concurrently') + + sub = subs.add_parser('run', parents=[base_subparser, concurrent_sub]) sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. All models depending on these models will also be run") sub.set_defaults(cls=run_task.RunTask, which='run') @@ -55,7 +58,7 @@ def handle(args): sub.add_argument('--drop-existing', action='store_true', help="Drop existing seed tables and recreate them") sub.set_defaults(cls=seed_task.SeedTask, which='seed') - sub = subs.add_parser('test', parents=[base_subparser]) + sub = subs.add_parser('test', parents=[base_subparser, concurrent_sub]) sub.add_argument('--skip-test-creates', action='store_true', help="Don't create temporary views to validate model SQL") sub.add_argument('--validate', action='store_true', help='Run constraint validations from schema.yml files') sub.set_defaults(cls=test_task.TestTask, which='test') diff --git a/dbt/runner.py b/dbt/runner.py index 5f970c031d5..36ca87dec6c 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -168,9 +168,11 @@ def execute_model(self, target, model, drop_type): else: raise e - def execute_models(self, linker, models, limit_to=None, num_threads=4): + def execute_models(self, 
linker, models, num_threads, limit_to=None): target = self.get_target() + print("Concurrency: {} threads".format(num_threads)) + dependency_list = linker.as_dependency_list(limit_to) num_models = sum([len(node_list) for node_list in dependency_list]) @@ -222,7 +224,7 @@ def wrap_fqn(target, models, existing, fqn): return model_results - def run(self, specified_models=None): + def run(self, specified_models=None, threads=1): linker = self.deserialize_graph() compiled_models = self.get_compiled_models() @@ -247,7 +249,7 @@ def run(self, specified_models=None): if schema_name not in schemas: self.create_schema_or_exit(schema_name) - return self.execute_models(linker, compiled_models, limit_to) + return self.execute_models(linker, compiled_models, threads, limit_to) except psycopg2.OperationalError as e: print("ERROR: Could not connect to the target database. Try `dbt debug` for more information") print(str(e)) diff --git a/dbt/task/run.py b/dbt/task/run.py index 3b51d866dfd..cc62dd57fca 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -6,6 +6,8 @@ from dbt.runner import Runner from dbt.compilation import Compiler +THREAD_LIMIT = 9 + class RunTask: def __init__(self, args, project): self.args = args @@ -21,7 +23,7 @@ def run(self): self.compile() runner = Runner(self.project, self.project['target-path'], BaseCreateTemplate.label) - results = runner.run(self.args.models) + results = runner.run(self.args.models, threads=self.args.threads) successes = [r for r in results if not r.errored] errors = [r for r in results if r.errored] diff --git a/dbt/task/test.py b/dbt/task/test.py index 0ae3fb41c0b..e96459ab904 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -69,7 +69,8 @@ def on_complete(query_results): models = [query_result.model for query_result in query_results if not query_result.errored and not query_result.skipped] runner.drop_models(models) - self.run_and_catch_errors(runner.run, on_complete) + run_func = lambda: runner.run(threads=self.args.threads) + 
self.run_and_catch_errors(run_func, on_complete) def run_validations(self): print("Validating schemas") From 12d6ff0615dc6f25abf967e6bd7b92763f58d142 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 1 Aug 2016 09:17:04 -0400 Subject: [PATCH 16/18] show skipped models in run output --- dbt/task/run.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/dbt/task/run.py b/dbt/task/run.py index cc62dd57fca..e4c7606c341 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -25,8 +25,11 @@ def run(self): runner = Runner(self.project, self.project['target-path'], BaseCreateTemplate.label) results = runner.run(self.args.models, threads=self.args.threads) - successes = [r for r in results if not r.errored] - errors = [r for r in results if r.errored] + total = len(results) + passed = len([r for r in results if not r.errored and not r.skipped]) + errored = len([r for r in results if r.errored]) + skipped = len([r for r in results if r.skipped]) + print() - print("Done. Executed {} models executed with {} failures".format(len(successes) + len(errors), len(errors))) + print("Done. 
PASS={passed} ERROR={errored} SKIP={skipped} TOTAL={total}".format(total=total, passed=passed, errored=errored, skipped=skipped)) From ca5f7d039fd8dc394220278857ce8e7b6f579443 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 1 Aug 2016 09:35:58 -0400 Subject: [PATCH 17/18] fix linker tests --- dbt/compilation.py | 5 ++++- tests/test_linker.py | 11 +++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/dbt/compilation.py b/dbt/compilation.py index f4d88d909d8..c398baa6edb 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -33,7 +33,10 @@ def as_dependency_list(self, limit_to=None): graph_nodes = set() for node in limit_to: graph_nodes.add(node) - graph_nodes.update(nx.descendants(self.graph, node)) + if node in self.graph: + graph_nodes.update(nx.descendants(self.graph, node)) + else: + raise RuntimeError("Couldn't find model '{}' -- does it exist or is it disabled?".format(node)) depth_nodes = defaultdict(list) diff --git a/tests/test_linker.py b/tests/test_linker.py index df511e9aae4..9602543a889 100644 --- a/tests/test_linker.py +++ b/tests/test_linker.py @@ -24,7 +24,7 @@ def test_linker_add_dependency(self): for (l, r) in actual_deps: self.linker.dependency(l, r) - expected_dep_list = ['C', 'B', 'A'] + expected_dep_list = [['C'], ['B'], ['A']] actual_dep_list = self.linker.as_dependency_list() self.assertEqual(expected_dep_list, actual_dep_list) @@ -36,11 +36,10 @@ def test_linker_add_disjoint_dependencies(self): self.linker.dependency(l, r) self.linker.add_node(additional_node) - # has to be one of these three + # has to be one of these two possible = [ - ['Z', 'B', 'A'], - ['B', 'A', 'Z'], - ['B', 'Z', 'A'], + [['Z', 'B'], ['A']], + [['B', 'Z'], ['A']], ] actual = self.linker.as_dependency_list() @@ -57,7 +56,7 @@ def test_linker_dependencies_limited_to_some_nodes(self): self.linker.dependency(l, r) actual_limit = self.linker.as_dependency_list(['B']) - expected_limit = ['B', 'A'] + expected_limit = [['B'], ['A']]
self.assertEqual(expected_limit, actual_limit) def test_linker_bad_limit_throws_runtime_error(self): From 24ebe12aa0004c13bb10cdbeaec3085a6ee5c265 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 2 Aug 2016 12:25:28 -0400 Subject: [PATCH 18/18] get # threads from profiles file --- dbt/main.py | 7 ++----- dbt/runner.py | 12 +++++++----- dbt/targets.py | 20 ++++++++++++++++++++ dbt/task/run.py | 2 +- dbt/task/test.py | 3 +-- 5 files changed, 31 insertions(+), 13 deletions(-) diff --git a/dbt/main.py b/dbt/main.py index 3fcea3da40e..a0d62539eb9 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -47,10 +47,7 @@ def handle(args): sub = subs.add_parser('deps', parents=[base_subparser]) sub.set_defaults(cls=deps_task.DepsTask, which='deps') - concurrent_sub = argparse.ArgumentParser(add_help=False) - concurrent_sub.add_argument('-t', '--threads', type=int, default=1, choices=range(1, run_task.THREAD_LIMIT), help='Number of queries to execute concurrently') - - sub = subs.add_parser('run', parents=[base_subparser, concurrent_sub]) + sub = subs.add_parser('run', parents=[base_subparser]) sub.add_argument('--models', required=False, nargs='+', help="Specify the models to run. 
All models depending on these models will also be run") sub.set_defaults(cls=run_task.RunTask, which='run') @@ -58,7 +55,7 @@ def handle(args): sub.add_argument('--drop-existing', action='store_true', help="Drop existing seed tables and recreate them") sub.set_defaults(cls=seed_task.SeedTask, which='seed') - sub = subs.add_parser('test', parents=[base_subparser, concurrent_sub]) + sub = subs.add_parser('test', parents=[base_subparser]) sub.add_argument('--skip-test-creates', action='store_true', help="Don't create temporary views to validate model SQL") sub.add_argument('--validate', action='store_true', help='Run constraint validations from schema.yml files') sub.set_defaults(cls=test_task.TestTask, which='test') diff --git a/dbt/runner.py b/dbt/runner.py index 36ca87dec6c..c463f88cb15 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -168,11 +168,9 @@ def execute_model(self, target, model, drop_type): else: raise e - def execute_models(self, linker, models, num_threads, limit_to=None): + def execute_models(self, linker, models, limit_to=None): target = self.get_target() - print("Concurrency: {} threads".format(num_threads)) - dependency_list = linker.as_dependency_list(limit_to) num_models = sum([len(node_list) for node_list in dependency_list]) @@ -190,6 +188,10 @@ def wrap_fqn(target, models, existing, fqn): # we can only pass one arg to the self.execute_model method below. 
Pass a dict w/ all the data we need model_dependency_list = [[wrap_fqn(target, models, existing, fqn) for fqn in node_list] for node_list in dependency_list] + num_threads = target.threads + print("Concurrency: {} threads (target='{}')".format(num_threads, self.project['run-target'])) + print("Running!") + pool = ThreadPool(num_threads) failed_models = set() @@ -224,7 +226,7 @@ def wrap_fqn(target, models, existing, fqn): return model_results - def run(self, specified_models=None, threads=1): + def run(self, specified_models=None): linker = self.deserialize_graph() compiled_models = self.get_compiled_models() @@ -249,7 +251,7 @@ def run(self, specified_models=None, threads=1): if schema_name not in schemas: self.create_schema_or_exit(schema_name) - return self.execute_models(linker, compiled_models, threads, limit_to) + return self.execute_models(linker, compiled_models, limit_to) except psycopg2.OperationalError as e: print("ERROR: Could not connect to the target database. Try `dbt debug` for more information") print(str(e)) diff --git a/dbt/targets.py b/dbt/targets.py index c4927329cef..ae4132a58d7 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -1,6 +1,12 @@ import psycopg2 +THREAD_MIN = 1 +THREAD_MAX = 8 + +BAD_THREADS_ERROR = """Invalid value given for "threads" in active run-target. 
+Value given was {supplied} but it should be an int between {min_val} and {max_val}""" + class RedshiftTarget: def __init__(self, cfg): assert cfg['type'] == 'redshift' @@ -10,6 +16,20 @@ def __init__(self, cfg): self.port = cfg['port'] self.dbname = cfg['dbname'] self.schema = cfg['schema'] + self.threads = self.__get_threads(cfg) + + def __get_threads(self, cfg): + supplied = cfg.get('threads', 1) + + bad_threads_error = RuntimeError(BAD_THREADS_ERROR.format(run_target="...", supplied=supplied, min_val=THREAD_MIN, max_val=THREAD_MAX)) + + if type(supplied) != int: + raise bad_threads_error + + if supplied >= 1 and supplied <= 8: + return supplied + else: + raise bad_threads_error def __get_spec(self): return "dbname='{}' user='{}' host='{}' password='{}' port='{}'".format( diff --git a/dbt/task/run.py b/dbt/task/run.py index e4c7606c341..34598855d6a 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -23,7 +23,7 @@ def run(self): self.compile() runner = Runner(self.project, self.project['target-path'], BaseCreateTemplate.label) - results = runner.run(self.args.models, threads=self.args.threads) + results = runner.run(self.args.models) total = len(results) passed = len([r for r in results if not r.errored and not r.skipped]) diff --git a/dbt/task/test.py b/dbt/task/test.py index e96459ab904..0ae3fb41c0b 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -69,8 +69,7 @@ def on_complete(query_results): models = [query_result.model for query_result in query_results if not query_result.errored and not query_result.skipped] runner.drop_models(models) - run_func = lambda: runner.run(threads=self.args.threads) - self.run_and_catch_errors(run_func, on_complete) + self.run_and_catch_errors(runner.run, on_complete) def run_validations(self): print("Validating schemas")