diff --git a/dbt/compilation.py b/dbt/compilation.py index 707d92d04ef..c398baa6edb 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -16,12 +16,50 @@ def __init__(self): def nodes(self): return self.graph.nodes() - def as_dependency_list(self, limit_to=None): + def as_topological_ordering(self, limit_to=None): try: - return nx.topological_sort(self.graph, nbunch=limit_to) + return nx.topological_sort(self.graph, nbunch=limit_to) except KeyError as e: raise RuntimeError("Couldn't find model '{}' -- does it exist or is it diabled?".format(e)) + def as_dependency_list(self, limit_to=None): + """returns a list of list of nodes, eg. [[0,1], [2], [4,5,6]]. Each element contains nodes whose + dependenices are subsumed by the union of all lists before it. In this way, all nodes in list `i` + can be run simultaneously assuming that all lists before list `i` have been completed""" + + if limit_to is None: + graph_nodes = set(self.graph.nodes()) + else: + graph_nodes = set() + for node in limit_to: + graph_nodes.add(node) + if node in self.graph: + graph_nodes.update(nx.descendants(self.graph, node)) + else: + raise RuntimeError("Couldn't find model '{}' -- does it exist or is it diabled?".format(node)) + + depth_nodes = defaultdict(list) + + for node in graph_nodes: + num_ancestors = len(nx.ancestors(self.graph, node)) + depth_nodes[num_ancestors].append(node) + + dependency_list = [] + for depth in sorted(depth_nodes.keys()): + dependency_list.append(depth_nodes[depth]) + + return dependency_list + + def is_child_of(self, nodes, target_node): + "returns True if node is a child of a node in nodes. 
Otherwise, False" + node_span = set() + for node in nodes: + node_span.add(node) + for child in nx.descendants(self.graph, node): + node_span.add(child) + + return target_node in node_span + def dependency(self, node1, node2): "indicate that node1 depends on node2" self.graph.add_node(node1) @@ -116,7 +154,17 @@ def do_ref(*args): linker.dependency(source_model, other_model_fqn) return '"{}"."{}"'.format(schema, other_model_name) - return do_ref + def wrapped_do_ref(*args): + try: + return do_ref(*args) + except RuntimeError as e: + print("Compiler error in {}".format(model.filepath)) + print("Enabled models:") + for m in all_models: + print(" - {}".format(".".join(m.fqn))) + raise e + + return wrapped_do_ref def compile_model(self, linker, model, models): jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir)) diff --git a/dbt/model.py b/dbt/model.py index 633cd63a62c..daf059ef845 100644 --- a/dbt/model.py +++ b/dbt/model.py @@ -33,7 +33,7 @@ def get_config(self, primary_project): def load_from_project(model, the_project): config = the_project['model-defaults'].copy() model_configs = the_project['models'] - fqn = model.fqn[:] + fqn = model.original_fqn[:] while len(fqn) > 0: model_group = fqn.pop(0) if model_group in model_configs: @@ -59,6 +59,10 @@ def fqn(self): name, _ = os.path.splitext(parts[-1]) return [self.project['name']] + parts[1:-1] + [name] + @property + def original_fqn(self): + return self.fqn + class Model(DBTSource): def __init__(self, project, model_dir, rel_filepath): super(Model, self).__init__(project, model_dir, rel_filepath) @@ -142,6 +146,12 @@ def fqn(self): test_name = TestCreateTemplate.model_name(name) return [self.project['name']] + parts[1:-1] + [test_name] + @property + def original_fqn(self): + parts = self.filepath.split("/") + name, _ = os.path.splitext(parts[-1]) + return [self.project['name']] + parts[1:-1] + [name] + def __repr__(self): return "".format(self.project['name'], self.name, 
self.filepath) diff --git a/dbt/runner.py b/dbt/runner.py index 1fff81db8b7..c463f88cb15 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -3,6 +3,7 @@ import psycopg2 import os, sys +import functools from dbt.compilation import Linker, Compiler from dbt.templates import BaseCreateTemplate @@ -10,16 +11,35 @@ from dbt.source import Source from dbt.utils import find_model_by_name +from multiprocessing.dummy import Pool as ThreadPool + SCHEMA_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the schema '{schema}'. Either create the schema manually, or adjust the permissions of the '{user}' user.""" -RELATION_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the model '{model}' \ -in the schema '{schema}'.\nPlease adjust the permissions of the '{user}' user on the '{schema}' schema. +RELATION_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the model '{model}' in the schema '{schema}'. +Please adjust the permissions of the '{user}' user on the '{schema}' schema. With a superuser account, execute the following commands, then re-run dbt. grant usage, create on schema "{schema}" to "{user}"; -grant select on all tables in schema "{schema}" to "{user}"; -""" +grant select on all tables in schema "{schema}" to "{user}";""" + +RELATION_NOT_OWNER_MESSAGE = """The user '{user}' does not have sufficient permissions to drop the model '{model}' in the schema '{schema}'. +This is likely because the relation was created by a different user. 
Either delete the model "{schema}"."{model}" manually, +or adjust the permissions of the '{user}' user in the '{schema}' schema.""" + +class RunModelResult(object): + def __init__(self, model, error=None, skip=False): + self.model = model + self.error = error + self.skip = skip + + @property + def errored(self): + return self.error is not None + + @property + def skipped(self): + return self.skip class Runner: def __init__(self, project, target_path, run_mode): @@ -72,39 +92,44 @@ def create_schema_or_exit(self, schema_name): else: raise e - def query_for_existing(self, cursor, schema): + def query_for_existing(self, target, schema): sql = """ select tablename as name, 'table' as type from pg_tables where schemaname = '{schema}' union all select viewname as name, 'view' as type from pg_views where schemaname = '{schema}' """.format(schema=schema) - cursor.execute(sql) - existing = [(name, relation_type) for (name, relation_type) in cursor.fetchall()] - return dict(existing) + with target.get_handle() as handle: + with handle.cursor() as cursor: + cursor.execute(sql) + existing = [(name, relation_type) for (name, relation_type) in cursor.fetchall()] - def __drop(self, cursor, schema, relation, relation_type): - sql = 'drop {relation_type} if exists "{schema}"."{relation}" cascade'.format(schema=schema, relation_type=relation_type, relation=relation) - cursor.execute(sql) + return dict(existing) - def __do_execute(self, cursor, sql, model): - try: - cursor.execute(sql) - except Exception as e: - e.model = model - raise e + def __drop(self, target, model, relation_type): + schema = target.schema + relation = model.name - def drop_models(self, models): - target = self.get_target() + sql = 'drop {relation_type} if exists "{schema}"."{relation}" cascade'.format(schema=schema, relation_type=relation_type, relation=relation) + self.__do_execute(target, sql, model) + def __do_execute(self, target, sql, model): with target.get_handle() as handle: with handle.cursor() as 
cursor: + try: + cursor.execute(sql) + handle.commit() + except Exception as e: + e.model = model + raise e - existing = self.query_for_existing(cursor, target.schema); + def drop_models(self, models): + target = self.get_target() - for model in models: - model_name = model.fqn[-1] - self.__drop(cursor, target.schema, model_name, existing[model_name]) + existing = self.query_for_existing(target, target.schema); + for model in models: + model_name = model.fqn[-1] + self.__drop(target, model, existing[model_name]) def get_model_by_fqn(self, models, fqn): for model in models: @@ -112,34 +137,94 @@ def get_model_by_fqn(self, models, fqn): return model raise RuntimeError("Couldn't find a compiled model with fqn: '{}'".format(fqn)) + def execute_wrapped_model(self, data): + target = data['target'] + model = data['model'] + drop_type = data['drop_type'] + + error = None + try: + self.execute_model(target, model, drop_type) + except (RuntimeError, psycopg2.ProgrammingError) as e: + error = "Error executing {filepath}\n{error}".format(filepath=model.filepath, error=str(e).strip()) + + return RunModelResult(model, error=error) + + def execute_model(self, target, model, drop_type): + if drop_type is not None: + try: + self.__drop(target, model, drop_type) + except psycopg2.ProgrammingError as e: + if "must be owner of relation" in e.diag.message_primary: + raise RuntimeError(RELATION_NOT_OWNER_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + else: + raise e + + try: + self.__do_execute(target, model.contents, model) + except psycopg2.ProgrammingError as e: + if "permission denied for" in e.diag.message_primary: + raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) + else: + raise e + def execute_models(self, linker, models, limit_to=None): target = self.get_target() - with target.get_handle() as handle: - with handle.cursor() as cursor: - dependency_list = 
[self.get_model_by_fqn(models, fqn) for fqn in linker.as_dependency_list(limit_to)] - existing = self.query_for_existing(cursor, target.schema); - - num_models = len(dependency_list) - if num_models == 0: - print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path)) - return - - for index, model in enumerate(dependency_list): - if model.name in existing: - self.__drop(cursor, target.schema, model.name, existing[model.name]) - handle.commit() - - print("Running {} of {} -- Creating relation {}.{}".format(index + 1, num_models, target.schema, model.name)) - try: - self.__do_execute(cursor, model.contents, model) - except psycopg2.ProgrammingError as e: - if "permission denied for" in e.diag.message_primary: - raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user)) - else: - raise e - handle.commit() - yield model + dependency_list = linker.as_dependency_list(limit_to) + num_models = sum([len(node_list) for node_list in dependency_list]) + + if num_models == 0: + print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path)) + return [] + + existing = self.query_for_existing(target, target.schema); + + def wrap_fqn(target, models, existing, fqn): + model = self.get_model_by_fqn(models, fqn) + drop_type = existing.get(model.name, None) # None, 'view', or 'table' + return {"model" : model, "target": target, "drop_type": drop_type} + + # we can only pass one arg to the self.execute_model method below. 
Pass a dict w/ all the data we need + model_dependency_list = [[wrap_fqn(target, models, existing, fqn) for fqn in node_list] for node_list in dependency_list] + + num_threads = target.threads + print("Concurrency: {} threads (target='{}')".format(num_threads, self.project['run-target'])) + print("Running!") + + pool = ThreadPool(num_threads) + + failed_models = set() + + model_results = [] + for model_list in model_dependency_list: + failed_nodes = [tuple(model.fqn) for model in failed_models] + + models_to_execute = [data for data in model_list if not linker.is_child_of(failed_nodes, tuple(data['model'].fqn))] + models_to_skip = [data for data in model_list if linker.is_child_of(failed_nodes, tuple(data['model'].fqn))] + + for i, data in enumerate(models_to_skip): + model = data['model'] + model_result = RunModelResult(model, skip=True) + model_results.append(model_result) + print("{} of {} -- SKIP relation {}.{} because parent failed".format(len(model_results), num_models, target.schema, model_result.model.name)) + + run_model_results = pool.map(self.execute_wrapped_model, models_to_execute) + + for run_model_result in run_model_results: + model_results.append(run_model_result) + + if run_model_result.errored: + failed_models.add(run_model_result.model) + print("{} of {} -- ERROR creating relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) + print(run_model_result.error) + else: + print("{} of {} -- OK Created relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name)) + + pool.close() + pool.join() + + return model_results def run(self, specified_models=None): linker = self.deserialize_graph() @@ -155,7 +240,7 @@ def run(self, specified_models=None): except RuntimeError as e: print("ERROR: {}".format(str(e))) print("Exiting") - return + return[] target_cfg = self.project.run_environment() schema_name = target_cfg['schema'] @@ -166,8 +251,7 @@ def run(self, 
specified_models=None): if schema_name not in schemas: self.create_schema_or_exit(schema_name) - for model in self.execute_models(linker, compiled_models, limit_to): - yield model, True + return self.execute_models(linker, compiled_models, limit_to) except psycopg2.OperationalError as e: print("ERROR: Could not connect to the target database. Try `dbt debug` for more information") print(str(e)) diff --git a/dbt/schema_tester.py b/dbt/schema_tester.py index 467a3d0cf16..dd6e4e8813d 100644 --- a/dbt/schema_tester.py +++ b/dbt/schema_tester.py @@ -2,6 +2,9 @@ from dbt.targets import RedshiftTarget from dbt.source import Source +from dbt.runner import RunModelResult + +import psycopg2 QUERY_VALIDATE_NOT_NULL = """ with validation as ( @@ -76,6 +79,8 @@ def execute_query(self, model, sql): with handle.cursor() as cursor: try: cursor.execute(sql) + except psycopg2.ProgrammingError as e: + return e.diag.message_primary except Exception as e: e.model = model raise e @@ -183,13 +188,11 @@ def validate_schema(self, schemas): constraints = schema.schema[model_name]['constraints'] for constraint_type, constraint_data in constraints.items(): - try: - for test_passed in self.validate_schema_constraint(model, constraint_type, constraint_data): - yield model, test_passed - except RuntimeError as e: - print("ERRROR: {}".format(str(e))) + for test_passed in self.validate_schema_constraint(model, constraint_type, constraint_data): + yield model, test_passed def test(self): schemas = self.project_schemas() for (model, test_passed) in self.validate_schema(schemas): - yield model, test_passed + error = None if test_passed else "ERROR" + yield RunModelResult(model, error) diff --git a/dbt/targets.py b/dbt/targets.py index c4927329cef..ae4132a58d7 100644 --- a/dbt/targets.py +++ b/dbt/targets.py @@ -1,6 +1,12 @@ import psycopg2 +THREAD_MIN = 1 +THREAD_MAX = 8 + +BAD_THREADS_ERROR = """Invalid value given for "threads" in active run-target. 
+Value given was {supplied} but it should be an int between {min_val} and {max_val}""" + class RedshiftTarget: def __init__(self, cfg): assert cfg['type'] == 'redshift' @@ -10,6 +16,20 @@ def __init__(self, cfg): self.port = cfg['port'] self.dbname = cfg['dbname'] self.schema = cfg['schema'] + self.threads = self.__get_threads(cfg) + + def __get_threads(self, cfg): + supplied = cfg.get('threads', 1) + + bad_threads_error = RuntimeError(BAD_THREADS_ERROR.format(run_target="...", supplied=supplied, min_val=THREAD_MIN, max_val=THREAD_MAX)) + + if type(supplied) != int: + raise bad_threads_error + + if supplied >= 1 and supplied <= 8: + return supplied + else: + raise bad_threads_error def __get_spec(self): return "dbname='{}' user='{}' host='{}' password='{}' port='{}'".format( diff --git a/dbt/task/run.py b/dbt/task/run.py index bc54af15537..34598855d6a 100644 --- a/dbt/task/run.py +++ b/dbt/task/run.py @@ -1,9 +1,13 @@ +from __future__ import print_function + import os from dbt.templates import BaseCreateTemplate from dbt.runner import Runner from dbt.compilation import Compiler +THREAD_LIMIT = 9 + class RunTask: def __init__(self, args, project): self.args = args @@ -17,6 +21,15 @@ def compile(self): def run(self): self.compile() + runner = Runner(self.project, self.project['target-path'], BaseCreateTemplate.label) - for (model, passed) in runner.run(self.args.models): - pass + results = runner.run(self.args.models) + + total = len(results) + passed = len([r for r in results if not r.errored and not r.skipped]) + errored = len([r for r in results if r.errored]) + skipped = len([r for r in results if r.skipped]) + + + print() + print("Done. 
PASS={passed} ERROR={errored} SKIP={skipped} TOTAL={total}".format(total=total, passed=passed, errored=errored, skipped=skipped)) diff --git a/dbt/task/test.py b/dbt/task/test.py index 4a5f5265082..0ae3fb41c0b 100644 --- a/dbt/task/test.py +++ b/dbt/task/test.py @@ -35,10 +35,8 @@ def run_and_catch_errors(self, func, onFinally=None): errored = False try: - for (model, test_passed) in func(): - executed_models.append(model) - if test_passed: - passed += 1 + for res in func(): + executed_models.append(res) except psycopg2.ProgrammingError as e: errored = True print("") @@ -54,13 +52,24 @@ def run_and_catch_errors(self, func, onFinally=None): sys.exit(1) print("") - num_passed = len(executed_models) - print("{passed}/{num_executed} tests passed!".format(passed=passed, num_executed=num_passed)) + total = len(executed_models) + passed = len([model for model in executed_models if not model.errored and not model.skipped]) + errored = len([model for model in executed_models if model.errored]) + skipped = len([model for model in executed_models if model.skipped]) + print("PASS={passed} ERROR={errored} SKIP={skipped} TOTAL={total}".format(total=total, passed=passed, errored=errored, skipped=skipped)) + if errored > 0: + print("Tests completed with errors") + else: + print("All tests passed") print("") def run_test_creates(self): runner = Runner(self.project, self.project['target-path'], TestCreateTemplate.label) - self.run_and_catch_errors(runner.run, runner.drop_models) + def on_complete(query_results): + models = [query_result.model for query_result in query_results if not query_result.errored and not query_result.skipped] + runner.drop_models(models) + + self.run_and_catch_errors(runner.run, on_complete) def run_validations(self): print("Validating schemas") diff --git a/tests/test_linker.py b/tests/test_linker.py index df511e9aae4..9602543a889 100644 --- a/tests/test_linker.py +++ b/tests/test_linker.py @@ -24,7 +24,7 @@ def test_linker_add_dependency(self): for (l, r) 
in actual_deps: self.linker.dependency(l, r) - expected_dep_list = ['C', 'B', 'A'] + expected_dep_list = [['C'], ['B'], ['A']] actual_dep_list = self.linker.as_dependency_list() self.assertEqual(expected_dep_list, actual_dep_list) @@ -36,11 +36,10 @@ def test_linker_add_disjoint_dependencies(self): self.linker.dependency(l, r) self.linker.add_node(additional_node) - # has to be one of these three + # has to be one of these two possible = [ - ['Z', 'B', 'A'], - ['B', 'A', 'Z'], - ['B', 'Z', 'A'], + [['Z', 'B'], ['A']], + [['B', 'Z'], ['A']], ] actual = self.linker.as_dependency_list() @@ -57,7 +56,7 @@ def test_linker_dependencies_limited_to_some_nodes(self): self.linker.dependency(l, r) actual_limit = self.linker.as_dependency_list(['B']) - expected_limit = ['B', 'A'] + expected_limit = [['B'], ['A']] self.assertEqual(expected_limit, actual_limit) def test_linker_bad_limit_throws_runtime_error(self):