Cleaner shutdown #41

Merged: 8 commits, Jun 9, 2020
70 changes: 58 additions & 12 deletions data_integration/execution.py
@@ -10,6 +10,7 @@
import os
import sys
import signal
+import atexit
import time
import traceback
from multiprocessing import queues
@@ -109,6 +110,24 @@ def with_all_upstreams(nodes: {pipelines.Node}):
failed_pipelines: {pipelines.Pipeline} = set() # pipelines with failed tasks
running_task_processes: {pipelines.Task: TaskProcess} = {}

+# make sure any running tasks are killed when this executor process is shut down
+executor_pid = os.getpid()
+
+def ensure_task_processes_killed():
+    # as we fork, the TaskProcess also runs this function -> ignore it there
+    if os.getpid() != executor_pid: return
+    try:
+        for tp in list(running_task_processes.values()):  # type: TaskProcess
+            if tp.is_alive():
+                # give it a chance to gracefully shut down
+                tp.terminate()
+        statistics_process.kill()
+    except BaseException as e:
+        print(f"Exception during TaskProcess cleanup: {repr(e)}", file=sys.stderr, flush=True)
+    return
+
+atexit.register(ensure_task_processes_killed)
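For context on the `executor_pid` guard above: as the comment in the diff notes, the forked TaskProcess children also end up with this atexit registration, so the handler compares `os.getpid()` against the pid captured in the executor and does nothing in the children. A minimal, self-contained sketch of that pattern (POSIX only, not part of this diff):

```python
import atexit
import os
import sys

parent_pid = os.getpid()

def cleanup():
    # inherited by forked children, so only act in the original process
    if os.getpid() != parent_pid:
        return
    print("cleaning up in the parent", file=sys.stderr)

atexit.register(cleanup)

pid = os.fork()
if pid == 0:
    sys.exit(0)          # child exits normally; cleanup() runs but the pid guard makes it a no-op
else:
    os.waitpid(pid, 0)   # parent waits, then exits; cleanup() runs and actually cleans up
```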

def dequeue() -> pipelines.Node:
"""
Finds the next task in the queue
@@ -122,10 +141,22 @@ def dequeue() -> pipelines.Node:
or (not node.parent in running_pipelines)
or (running_pipelines[node.parent][1] < node.parent.max_number_of_parallel_tasks))):
node_queue.remove(node)
-if node.parent in failed_pipelines and not node.parent.force_run_all_children:
+processed_as_parent_failed = False
+parent = node.parent
+while parent:
    # if the parent pipeline failed (and no overwrite), don't launch new nodes
-    processed_nodes.add(node)
-else:
+    # this needs to go up to the ultimate parent as we can have cases where we already
+    # queued a subpipeline and now the parent pipeline failed but the task's parent pipeline
+    # (the sub pipeline) is not failed.
+    # If a task from a parent pipeline fails, even with force_run_all_children on the
+    # sub pipeline, the sub pipeline would stop. Only if the failed parent pipeline also has
+    # force_run_all_children will the task get scheduled.
+    if parent in failed_pipelines and not parent.force_run_all_children:
+        processed_nodes.add(node)
+        processed_as_parent_failed = True
+        break
+    else: parent = parent.parent
+if not processed_as_parent_failed:
    return node
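To make the comment above concrete: even a sub pipeline with force_run_all_children does not protect its tasks once an ancestor pipeline without that flag has failed. A stand-alone sketch of the same walk-up check (the Pipeline stand-in class and all names here are invented for illustration):

```python
class FakePipeline:
    """Minimal stand-in for pipelines.Pipeline."""
    def __init__(self, parent=None, force_run_all_children=False):
        self.parent = parent
        self.force_run_all_children = force_run_all_children

root = FakePipeline()                                         # has failed, no force_run_all_children
sub = FakePipeline(parent=root, force_run_all_children=True)  # the task's direct parent
failed_pipelines = {root}

def blocked_by_failed_ancestor(pipeline):
    parent = pipeline
    while parent:
        if parent in failed_pipelines and not parent.force_run_all_children:
            return True
        parent = parent.parent
    return False

print(blocked_by_failed_ancestor(sub))  # True -> the task would be marked processed without running
```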

def track_finished_pipelines():
@@ -285,6 +316,18 @@ def track_finished_pipelines():

runlogger = run_log.RunLogger()

+# make sure that we close this run (if still open) as failed when this Python process exits.
+# On SIGKILL we will still leave behind open runs...
+# this needs to be registered after we forked off the run_process, as that one should not inherit the atexit function
+def ensure_closed_run_on_abort():
+    try:
+        run_log.close_open_run_after_error(runlogger.run_id)
+    except BaseException as e:
+        print(f"Exception during 'close_open_run_after_error()': {repr(e)}", file=sys.stderr, flush=True)
+    return
+
+atexit.register(ensure_closed_run_on_abort)
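Regarding the "On SIGKILL we will still leave behind open runs" caveat: atexit handlers only fire on a normal interpreter shutdown. The sketch below (nothing in it is from this repo) shows how a SIGTERM can be turned into such a shutdown so a cleanup handler still runs, while SIGKILL can never be intercepted, which is why those runs stay open:

```python
import atexit
import signal
import sys

def close_open_run():
    # stand-in for run_log.close_open_run_after_error(...)
    print("closing open run as failed", file=sys.stderr)

atexit.register(close_open_run)

# By default SIGTERM terminates the process without running atexit handlers;
# converting it into SystemExit gives them a chance to run. SIGKILL (kill -9)
# cannot be caught at all, so the run stays open in that case.
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(1))
```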

def _notify_all(event):
try:
runlogger.handle_event(event)
@@ -313,10 +356,9 @@ def _notify_all(event):
# Catching GeneratorExit needs to end in a return!
return
except:
-def _create_exception_output_event(msg: str = ''):
-    if msg:
-        msg = msg + '\n'
-    return pipeline_events.Output(node_path=pipeline.path(), message=msg + traceback.format_exc(),
+def _create_exception_output_event(msg: str = None):
+    return pipeline_events.Output(node_path=pipeline.path(),
+        message=(msg + '\n' if msg else '') + traceback.format_exc(),
        format=logger.Format.ITALICS, is_error=True)

output_event = _create_exception_output_event()
@@ -327,23 +369,27 @@ def _create_exception_output_event(msg: str = ''):
# we are already in the generic exception handler, so we cannot do anything
# if we still fail, as we have to get to the final close_open_run_after_error()
# and 'return'...
msg = "Could not notify about final output event"
exception_events.append(_create_exception_output_event(msg))
exception_events.append(_create_exception_output_event("Could not notify about final output event"))
yield output_event
try:
run_log.close_open_run_after_error(runlogger.run_id)
except BaseException as e:
msg = "Exception during 'close_open_run_after_error()'"
exception_events.append(_create_exception_output_event(msg))
exception_events.append(_create_exception_output_event("Exception during 'close_open_run_after_error()'"))

# At least try to notify the UI
for e in exception_events:
print(f"{repr(e)}", file=sys.stderr)
yield e
events.notify_configured_event_handlers(e)

+    # try to terminate the run_process, which itself will also clean up in an atexit handler
+    try:
+        run_process.terminate()
+    except:
+        pass
    return
if not run_process.is_alive():
+    # if we get here, the executor process might have died without sending the necessary run-finished events
+    ensure_closed_run_on_abort()
    break
time.sleep(0.001)

15 changes: 12 additions & 3 deletions data_integration/logging/run_log.py
@@ -70,20 +70,29 @@ def close_open_run_after_error(run_id: int):
"""Closes all open run and node_run for this run_id as failed"""
if run_id is None:
return
-print(f'Run aborted, cleaning up (run_id = {run_id})')
_close_run = f'''
UPDATE data_integration_run
SET end_time = now(), succeeded = FALSE
WHERE run_id = {"%s"} and end_time IS NULL
+RETURNING run_id
'''
_close_node_run = f'''
UPDATE data_integration_node_run
SET end_time = now(), succeeded = FALSE
WHERE run_id = {"%s"} and end_time IS NULL
+RETURNING run_id
'''
with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor
-    cursor.execute(_close_node_run, (run_id,))
-    cursor.execute(_close_run, (run_id,))
+    _closed_any=False
+    for code in [_close_node_run, _close_run]:
+        try:
+            cursor.execute(code, (run_id,))
+            if cursor.fetchall():
+                _closed_any = True
+        except:
+            pass
+    if _closed_any:
+        print(f'Cleaned up open runs/node_runs (run_id = {run_id})')
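The `RETURNING run_id` plus `cursor.fetchall()` combination is what lets the function print only when a run was actually still open. A minimal sketch of that pattern with plain psycopg2 (the connection string and run_id are made up; the real code goes through mara_db.postgresql.postgres_cursor_context('mara')):

```python
import psycopg2

with psycopg2.connect("dbname=mara") as connection:   # hypothetical DSN
    with connection.cursor() as cursor:
        cursor.execute(
            """
            UPDATE data_integration_run
            SET end_time = now(), succeeded = FALSE
            WHERE run_id = %s AND end_time IS NULL
            RETURNING run_id
            """,
            (42,))                                     # hypothetical run_id
        closed = cursor.fetchall()                     # [] if nothing was still open
        if closed:
            print(f"closed runs: {closed}")
```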


class RunLogger(events.EventHandler):