Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move cleanup into the executing thread (#1214) #1218

Merged
merged 1 commit into from
Jan 7, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, config, query, Runner):
])
self.node_results = []
self._skipped_children = {}
self._raise_next_tick = None

def get_runner(self, node):
adapter = get_adapter(self.config)
Expand Down Expand Up @@ -82,7 +83,11 @@ def call_runner(self, runner):
runner.after_execute(result)

if result.errored and runner.raise_on_first_error():
raise dbt.exceptions.RuntimeException(result.error)
# if we raise inside a thread, it'll just get silently swallowed.
# stash the error message we want on the RunManager, and it will
# check the next 'tick' - should be soon since our thread is about
# to finish!
self._raise_next_tick = result.error

return result

Expand All @@ -100,16 +105,23 @@ def _submit(self, pool, args, callback):
else:
pool.apply_async(self.call_runner, args=args, callback=callback)

def _raise_set_error(self):
    """Raise the stashed error, if one has been recorded.

    ``call_runner`` stores a failing result's error on
    ``self._raise_next_tick`` instead of raising it directly. This method
    is the check that turns that stored value into a real exception.

    Raises:
        dbt.exceptions.RuntimeException: if ``self._raise_next_tick`` is
            not ``None``; the stored value is passed as the exception's
            argument.
    """
    pending_error = self._raise_next_tick
    if pending_error is None:
        # nothing has been stashed, so there is nothing to raise
        return
    raise dbt.exceptions.RuntimeException(pending_error)

def run_queue(self, pool):
"""Given a pool, submit jobs from the queue to the pool.
"""
def callback(result):
"""A callback to handle results."""
"""Note: mark_done, at a minimum, must happen here or dbt will
deadlock during ephemeral result error handling!
"""
self._handle_result(result)
self.job_queue.mark_done(result.node.unique_id)

while not self.job_queue.empty():
node = self.job_queue.get()
self._raise_set_error()
runner = self.get_runner(node)
# we finally know what we're running! Make sure we haven't decided
# to skip it due to upstream failures
Expand All @@ -118,14 +130,18 @@ def callback(result):
runner.do_skip(cause=cause)
args = (runner,)
self._submit(pool, args, callback)

# block on completion
self.job_queue.join()
# if an error got set during join(), raise it.
self._raise_set_error()

return

def _handle_result(self, result):
"""Note: this happens inside an apply_async() callback, so it must be
"fast". (The pool worker thread will block!)
"""Mark the result as completed, insert the `CompiledResultNode` into
the manifest, and mark any descendants (potentially with a 'cause' if
the result was an ephemeral model) as skipped.
"""
is_ephemeral = self.Runner.is_ephemeral_model(result.node)
if not is_ephemeral:
Expand Down