Skip to content

Commit

Permalink
Merge pull request #1218 from fishtown-analytics/fix/lets-not-hang-on…
Browse files Browse the repository at this point in the history
…-errors

move cleanup into the executing thread (#1214)
  • Loading branch information
beckjake authored Jan 7, 2019
2 parents 3c25a9b + 3e4523e commit 7179d13
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, config, query, Runner):
])
self.node_results = []
self._skipped_children = {}
self._raise_next_tick = None

def get_runner(self, node):
adapter = get_adapter(self.config)
Expand Down Expand Up @@ -82,7 +83,11 @@ def call_runner(self, runner):
runner.after_execute(result)

if result.errored and runner.raise_on_first_error():
raise dbt.exceptions.RuntimeException(result.error)
# if we raise inside a thread, it'll just get silently swallowed.
# stash the error message we want on the RunManager, and it will
# check the next 'tick' - should be soon since our thread is about
# to finish!
self._raise_next_tick = result.error

return result

Expand All @@ -100,16 +105,23 @@ def _submit(self, pool, args, callback):
else:
pool.apply_async(self.call_runner, args=args, callback=callback)

def _raise_set_error(self):
    """Raise the error stashed in ``self._raise_next_tick``, if any.

    Does nothing when ``self._raise_next_tick`` is ``None``.

    Raises:
        dbt.exceptions.RuntimeException: built from the stashed value
            when one is present.
    """
    pending_error = self._raise_next_tick
    if pending_error is None:
        # Nothing has been stashed; there is nothing to surface.
        return
    raise dbt.exceptions.RuntimeException(pending_error)

def run_queue(self, pool):
"""Given a pool, submit jobs from the queue to the pool.
"""
def callback(result):
"""A callback to handle results."""
"""Note: mark_done, at a minimum, must happen here or dbt will
deadlock during ephemeral result error handling!
"""
self._handle_result(result)
self.job_queue.mark_done(result.node.unique_id)

while not self.job_queue.empty():
node = self.job_queue.get()
self._raise_set_error()
runner = self.get_runner(node)
# we finally know what we're running! Make sure we haven't decided
# to skip it due to upstream failures
Expand All @@ -118,14 +130,18 @@ def callback(result):
runner.do_skip(cause=cause)
args = (runner,)
self._submit(pool, args, callback)

# block on completion
self.job_queue.join()
# if an error got set during join(), raise it.
self._raise_set_error()

return

def _handle_result(self, result):
"""Note: this happens inside an apply_async() callback, so it must be
"fast". (The pool worker thread will block!)
"""Mark the result as completed, insert the `CompiledResultNode` into
the manifest, and mark any descendants (potentially with a 'cause' if
the result was an ephemeral model) as skipped.
"""
is_ephemeral = self.Runner.is_ephemeral_model(result.node)
if not is_ephemeral:
Expand Down

0 comments on commit 7179d13

Please sign in to comment.