Skip to content

Commit

Permalink
Merge pull request #83 from analyst-collective/feature/parallel-model-execution
Browse files Browse the repository at this point in the history

Feature/parallel model execution
  • Loading branch information
drewbanin authored Aug 2, 2016
2 parents 607d4ab + 24ebe12 commit 24a91a1
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 76 deletions.
54 changes: 51 additions & 3 deletions dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,50 @@ def __init__(self):
def nodes(self):
return self.graph.nodes()

def as_topological_ordering(self, limit_to=None):
    """Return the graph's nodes in topological order.

    limit_to: optional iterable of nodes to restrict the ordering to
    (passed through as networkx's `nbunch`).

    Raises RuntimeError with a friendly message when a requested node
    does not exist in the graph.
    """
    try:
        return nx.topological_sort(self.graph, nbunch=limit_to)
    except KeyError as e:
        raise RuntimeError("Couldn't find model '{}' -- does it exist or is it disabled?".format(e))

def as_dependency_list(self, limit_to=None):
    """Return a list of lists of nodes, e.g. [[0, 1], [2], [4, 5, 6]].

    Each element contains nodes whose dependencies are subsumed by the
    union of all lists before it, so all nodes in list `i` can be run
    simultaneously once every list before `i` has completed.

    limit_to: optional iterable of starting nodes; the result is
    restricted to those nodes plus all of their descendants.

    Raises RuntimeError if a node in `limit_to` is not in the graph.
    """
    if limit_to is None:
        graph_nodes = set(self.graph.nodes())
    else:
        graph_nodes = set()
        for node in limit_to:
            graph_nodes.add(node)
            if node in self.graph:
                graph_nodes.update(nx.descendants(self.graph, node))
            else:
                raise RuntimeError("Couldn't find model '{}' -- does it exist or is it disabled?".format(node))

    # Group nodes by ancestor count. If A is an ancestor of B, then B has
    # strictly more ancestors than A, so nodes with equal counts are
    # independent and every dependency lives in an earlier bucket.
    depth_nodes = defaultdict(list)

    for node in graph_nodes:
        num_ancestors = len(nx.ancestors(self.graph, node))
        depth_nodes[num_ancestors].append(node)

    return [depth_nodes[depth] for depth in sorted(depth_nodes.keys())]

def is_child_of(self, nodes, target_node):
    """Return True when `target_node` is one of `nodes` or a descendant of one of them."""
    span = set(nodes)
    for parent in nodes:
        span.update(nx.descendants(self.graph, parent))

    return target_node in span

def dependency(self, node1, node2):
"indicate that node1 depends on node2"
self.graph.add_node(node1)
Expand Down Expand Up @@ -116,7 +154,17 @@ def do_ref(*args):
linker.dependency(source_model, other_model_fqn)
return '"{}"."{}"'.format(schema, other_model_name)

return do_ref
def wrapped_do_ref(*args):
try:
return do_ref(*args)
except RuntimeError as e:
print("Compiler error in {}".format(model.filepath))
print("Enabled models:")
for m in all_models:
print(" - {}".format(".".join(m.fqn)))
raise e

return wrapped_do_ref

def compile_model(self, linker, model, models):
jinja = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath=model.root_dir))
Expand Down
12 changes: 11 additions & 1 deletion dbt/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get_config(self, primary_project):
def load_from_project(model, the_project):
config = the_project['model-defaults'].copy()
model_configs = the_project['models']
fqn = model.fqn[:]
fqn = model.original_fqn[:]
while len(fqn) > 0:
model_group = fqn.pop(0)
if model_group in model_configs:
Expand All @@ -59,6 +59,10 @@ def fqn(self):
name, _ = os.path.splitext(parts[-1])
return [self.project['name']] + parts[1:-1] + [name]

@property
def original_fqn(self):
    # For a base source the fqn already reflects the on-disk path, so the
    # "original" fqn is just fqn itself. Subclasses that mangle the model
    # name (e.g. test models) override this to return the unmangled fqn.
    return self.fqn

class Model(DBTSource):
def __init__(self, project, model_dir, rel_filepath):
super(Model, self).__init__(project, model_dir, rel_filepath)
Expand Down Expand Up @@ -142,6 +146,12 @@ def fqn(self):
test_name = TestCreateTemplate.model_name(name)
return [self.project['name']] + parts[1:-1] + [test_name]

@property
def original_fqn(self):
    """The unmangled fqn for this test model: project name, intermediate
    path components, and the raw file basename (no test-name prefix)."""
    path_parts = self.filepath.split("/")
    base_name = os.path.splitext(path_parts[-1])[0]
    return [self.project['name']] + path_parts[1:-1] + [base_name]

def __repr__(self):
    """Debug representation: project-qualified model name plus its file path."""
    return "<TestModel %s.%s: %s>" % (self.project['name'], self.name, self.filepath)

Expand Down
186 changes: 135 additions & 51 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,43 @@

import psycopg2
import os, sys
import functools

from dbt.compilation import Linker, Compiler
from dbt.templates import BaseCreateTemplate
from dbt.targets import RedshiftTarget
from dbt.source import Source
from dbt.utils import find_model_by_name

from multiprocessing.dummy import Pool as ThreadPool

SCHEMA_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the schema '{schema}'.
Either create the schema manually, or adjust the permissions of the '{user}' user."""

RELATION_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the model '{model}' \
in the schema '{schema}'.\nPlease adjust the permissions of the '{user}' user on the '{schema}' schema.
RELATION_PERMISSION_DENIED_MESSAGE = """The user '{user}' does not have sufficient permissions to create the model '{model}' in the schema '{schema}'.
Please adjust the permissions of the '{user}' user on the '{schema}' schema.
With a superuser account, execute the following commands, then re-run dbt.
grant usage, create on schema "{schema}" to "{user}";
grant select on all tables in schema "{schema}" to "{user}";
"""
grant select on all tables in schema "{schema}" to "{user}";"""

RELATION_NOT_OWNER_MESSAGE = """The user '{user}' does not have sufficient permissions to drop the model '{model}' in the schema '{schema}'.
This is likely because the relation was created by a different user. Either delete the model "{schema}"."{model}" manually,
or adjust the permissions of the '{user}' user in the '{schema}' schema."""

class RunModelResult(object):
    """Outcome of running a single model: the model itself plus an optional
    error message and a skip flag."""

    def __init__(self, model, error=None, skip=False):
        # error is None on success, otherwise a human-readable description
        self.model = model
        self.error = error
        self.skip = skip

    @property
    def skipped(self):
        """Whether this model was skipped (e.g. because a parent failed)."""
        return self.skip

    @property
    def errored(self):
        """Whether executing this model produced an error."""
        return self.error is not None

class Runner:
def __init__(self, project, target_path, run_mode):
Expand Down Expand Up @@ -72,74 +92,139 @@ def create_schema_or_exit(self, schema_name):
else:
raise e

def query_for_existing(self, cursor, schema):
def query_for_existing(self, target, schema):
sql = """
select tablename as name, 'table' as type from pg_tables where schemaname = '{schema}'
union all
select viewname as name, 'view' as type from pg_views where schemaname = '{schema}' """.format(schema=schema)

cursor.execute(sql)
existing = [(name, relation_type) for (name, relation_type) in cursor.fetchall()]

return dict(existing)
with target.get_handle() as handle:
with handle.cursor() as cursor:
cursor.execute(sql)
existing = [(name, relation_type) for (name, relation_type) in cursor.fetchall()]

def __drop(self, cursor, schema, relation, relation_type):
sql = 'drop {relation_type} if exists "{schema}"."{relation}" cascade'.format(schema=schema, relation_type=relation_type, relation=relation)
cursor.execute(sql)
return dict(existing)

def __do_execute(self, cursor, sql, model):
try:
cursor.execute(sql)
except Exception as e:
e.model = model
raise e
def __drop(self, target, model, relation_type):
    """Drop `model`'s existing relation (a 'view' or 'table') in the target
    schema, if it exists. Errors are tagged with the model by __do_execute."""
    schema = target.schema
    relation = model.name

    sql = 'drop {relation_type} if exists "{schema}"."{relation}" cascade'.format(schema=schema, relation_type=relation_type, relation=relation)
    self.__do_execute(target, sql, model)

def __do_execute(self, target, sql, model):
    """Execute `sql` on a fresh handle/cursor, committing on success.

    Any exception is annotated with the model that caused it (as e.model)
    before being re-raised, so callers can report which model failed.
    """
    with target.get_handle() as handle, handle.cursor() as cursor:
        try:
            cursor.execute(sql)
            handle.commit()
        except Exception as error:
            # tag the failing model for upstream error reporting
            error.model = model
            raise error

def drop_models(self, models):
    """Drop the existing relation (table or view) for each given model in
    the target schema. Assumes every model currently exists there."""
    target = self.get_target()

    existing = self.query_for_existing(target, target.schema)
    for model in models:
        model_name = model.fqn[-1]
        self.__drop(target, model, existing[model_name])

def get_model_by_fqn(self, models, fqn):
    """Return the compiled model whose fully-qualified name equals `fqn`.

    Raises RuntimeError when no model matches.
    """
    wanted = tuple(fqn)
    for candidate in models:
        if tuple(candidate.fqn) == wanted:
            return candidate
    raise RuntimeError("Couldn't find a compiled model with fqn: '{}'".format(fqn))

def execute_wrapped_model(self, data):
    """Thread-pool worker: run one model described by a payload dict
    ({'target', 'model', 'drop_type'}) and return a RunModelResult
    instead of raising, so errors can be collected across threads."""
    target = data['target']
    model = data['model']
    drop_type = data['drop_type']

    try:
        self.execute_model(target, model, drop_type)
        error = None
    except (RuntimeError, psycopg2.ProgrammingError) as e:
        error = "Error executing {filepath}\n{error}".format(filepath=model.filepath, error=str(e).strip())

    return RunModelResult(model, error=error)

def execute_model(self, target, model, drop_type):
    """Create the relation for `model`: first drop any existing relation
    (when drop_type is not None, i.e. 'view' or 'table'), then execute the
    model's compiled SQL.

    Raises RuntimeError with a user-friendly message for the known
    permission failures; re-raises any other ProgrammingError unchanged.
    """
    if drop_type is not None:
        try:
            self.__drop(target, model, drop_type)
        except psycopg2.ProgrammingError as e:
            # the relation was likely created by a different DB user, so
            # this user cannot drop it -- explain how to resolve that
            if "must be owner of relation" in e.diag.message_primary:
                raise RuntimeError(RELATION_NOT_OWNER_MESSAGE.format(model=model.name, schema=target.schema, user=target.user))
            else:
                raise e

    try:
        self.__do_execute(target, model.contents, model)
    except psycopg2.ProgrammingError as e:
        # the user lacks create/usage privileges on the schema -- explain
        # the grants needed instead of surfacing a raw psycopg2 error
        if "permission denied for" in e.diag.message_primary:
            raise RuntimeError(RELATION_PERMISSION_DENIED_MESSAGE.format(model=model.name, schema=target.schema, user=target.user))
        else:
            raise e

def execute_models(self, linker, models, limit_to=None):
    """Execute compiled models in dependency order, running each batch of
    independent models in parallel across `target.threads` threads.

    Models whose parents failed are skipped (not run) and reported as such.
    Returns a list of RunModelResult, one per model considered.
    """
    target = self.get_target()

    dependency_list = linker.as_dependency_list(limit_to)
    num_models = sum([len(node_list) for node_list in dependency_list])

    if num_models == 0:
        print("WARNING: No models to run in '{}'. Try checking your model configs and running `dbt compile`".format(self.target_path))
        return []

    existing = self.query_for_existing(target, target.schema)

    def wrap_fqn(target, models, existing, fqn):
        # bundle everything a worker needs into one dict, since pool.map
        # can only pass a single argument to execute_wrapped_model
        model = self.get_model_by_fqn(models, fqn)
        drop_type = existing.get(model.name, None)  # None, 'view', or 'table'
        return {"model": model, "target": target, "drop_type": drop_type}

    model_dependency_list = [[wrap_fqn(target, models, existing, fqn) for fqn in node_list] for node_list in dependency_list]

    num_threads = target.threads
    print("Concurrency: {} threads (target='{}')".format(num_threads, self.project['run-target']))
    print("Running!")

    pool = ThreadPool(num_threads)

    failed_models = set()

    model_results = []
    for model_list in model_dependency_list:
        failed_nodes = [tuple(model.fqn) for model in failed_models]

        # partition this batch: skip anything downstream of a failed model
        models_to_execute = [data for data in model_list if not linker.is_child_of(failed_nodes, tuple(data['model'].fqn))]
        models_to_skip = [data for data in model_list if linker.is_child_of(failed_nodes, tuple(data['model'].fqn))]

        for data in models_to_skip:
            model = data['model']
            model_result = RunModelResult(model, skip=True)
            model_results.append(model_result)
            print("{} of {} -- SKIP relation {}.{} because parent failed".format(len(model_results), num_models, target.schema, model_result.model.name))

        # run all independent models in this batch concurrently
        run_model_results = pool.map(self.execute_wrapped_model, models_to_execute)

        for run_model_result in run_model_results:
            model_results.append(run_model_result)

            if run_model_result.errored:
                failed_models.add(run_model_result.model)
                print("{} of {} -- ERROR creating relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name))
                print(run_model_result.error)
            else:
                print("{} of {} -- OK Created relation {}.{}".format(len(model_results), num_models, target.schema, run_model_result.model.name))

    pool.close()
    pool.join()

    return model_results

def run(self, specified_models=None):
linker = self.deserialize_graph()
Expand All @@ -155,7 +240,7 @@ def run(self, specified_models=None):
except RuntimeError as e:
print("ERROR: {}".format(str(e)))
print("Exiting")
return
return[]

target_cfg = self.project.run_environment()
schema_name = target_cfg['schema']
Expand All @@ -166,8 +251,7 @@ def run(self, specified_models=None):
if schema_name not in schemas:
self.create_schema_or_exit(schema_name)

for model in self.execute_models(linker, compiled_models, limit_to):
yield model, True
return self.execute_models(linker, compiled_models, limit_to)
except psycopg2.OperationalError as e:
print("ERROR: Could not connect to the target database. Try `dbt debug` for more information")
print(str(e))
Expand Down
15 changes: 9 additions & 6 deletions dbt/schema_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from dbt.targets import RedshiftTarget
from dbt.source import Source
from dbt.runner import RunModelResult

import psycopg2

QUERY_VALIDATE_NOT_NULL = """
with validation as (
Expand Down Expand Up @@ -76,6 +79,8 @@ def execute_query(self, model, sql):
with handle.cursor() as cursor:
try:
cursor.execute(sql)
except psycopg2.ProgrammingError as e:
return e.diag.message_primary
except Exception as e:
e.model = model
raise e
Expand Down Expand Up @@ -183,13 +188,11 @@ def validate_schema(self, schemas):

constraints = schema.schema[model_name]['constraints']
for constraint_type, constraint_data in constraints.items():
try:
for test_passed in self.validate_schema_constraint(model, constraint_type, constraint_data):
yield model, test_passed
except RuntimeError as e:
print("ERRROR: {}".format(str(e)))
for test_passed in self.validate_schema_constraint(model, constraint_type, constraint_data):
yield model, test_passed

def test(self):
    """Run every configured schema test, yielding a RunModelResult per
    (model, constraint) check. error is None on pass, "ERROR" on failure."""
    schemas = self.project_schemas()
    for (model, test_passed) in self.validate_schema(schemas):
        error = None if test_passed else "ERROR"
        yield RunModelResult(model, error)
Loading

0 comments on commit 24a91a1

Please sign in to comment.