From c6e3b28e9f6fcee026d5cb781157e5104f4a4fe5 Mon Sep 17 00:00:00 2001
From: Jan Katins
Date: Tue, 26 May 2020 00:09:21 +0200
Subject: [PATCH 1/8] Make sure to close runs on exit

---
 data_integration/execution.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/data_integration/execution.py b/data_integration/execution.py
index 2eabc28..921fd48 100644
--- a/data_integration/execution.py
+++ b/data_integration/execution.py
@@ -10,6 +10,7 @@
 import os
 import sys
 import signal
+import atexit
 import time
 import traceback
 from multiprocessing import queues
@@ -285,6 +286,19 @@ def track_finished_pipelines():
 
     runlogger = run_log.RunLogger()
 
+    # make sure that we close this run (if still open) as failed when we close this Python process.
+    # On SIGKILL we will still leave behind open runs...
+    # This needs to run after we forked off the run_process, as that one should not inherit the atexit function.
+    def ensure_closed_run_on_abort():
+        try:
+            run_log.close_open_run_after_error(runlogger.run_id)
+        except BaseException as e:
+            msg = "Exception during 'close_open_run_after_error()'"
+            print(f"{msg}: {repr(e)}", file=sys.stderr, flush=True)
+        return
+
+    atexit.register(ensure_closed_run_on_abort)
+
     def _notify_all(event):
         try:
             runlogger.handle_event(event)

From 9d3afce1b0dcb92ca836093996be6d08c520c924 Mon Sep 17 00:00:00 2001
From: Jan Katins
Date: Tue, 26 May 2020 12:21:34 +0200
Subject: [PATCH 2/8] Terminate running TaskProcess when the executor process is shut down

Also kill the statistics process in that case, so that it is not left
around.
---
 data_integration/execution.py | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/data_integration/execution.py b/data_integration/execution.py
index 921fd48..1e2d4a6 100644
--- a/data_integration/execution.py
+++ b/data_integration/execution.py
@@ -110,6 +110,25 @@ def with_all_upstreams(nodes: {pipelines.Node}):
     failed_pipelines: {pipelines.Pipeline} = set()  # pipelines with failed tasks
     running_task_processes: {pipelines.Task: TaskProcess} = {}
 
+    # make sure any running tasks are killed when this executor process is shut down
+    executor_pid = os.getpid()
+
+    def ensure_task_processes_killed():
+        # as we fork, the TaskProcess also run this function -> ignore it there
+        if os.getpid() != executor_pid: return
+        try:
+            for tp in list(running_task_processes.values()):  # type: TaskProcess
+                if tp.is_alive():
+                    # give it a chance to gracefully shut down
+                    tp.terminate()
+            statistics_process.kill()
+        except BaseException as e:
+            msg = "Exception during TaskProcess cleanup"
+            print(f"{msg}: {repr(e)}", file=sys.stderr, flush=True)
+        return
+
+    atexit.register(ensure_task_processes_killed)
+
     def dequeue() -> pipelines.Node:
         """ Finds the next task in the queue
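Patches 1 and 2 rely on the same pattern: register an atexit handler only after forking off the child process, and guard it with the registering process's pid so that a forked child that inherits it does nothing. A minimal standalone sketch (hypothetical names, not project code); note that atexit covers normal exits and unhandled exceptions but not SIGKILL, as the comment in patch 1 points out:

import atexit
import multiprocessing
import os
import time


def long_running_task():
    time.sleep(60)  # stand-in for real work


if __name__ == '__main__':
    child = multiprocessing.Process(target=long_running_task)
    child.start()

    executor_pid = os.getpid()  # remember which process registered the handler

    def ensure_child_killed():
        if os.getpid() != executor_pid: return  # ignore in forked children
        if child.is_alive():
            # without this, the parent would wait for the child at interpreter
            # shutdown, since non-daemonic children are joined automatically
            child.terminate()

    atexit.register(ensure_child_killed)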
From b8bc3413343d509b8b316f187b3ac9d4726096a7 Mon Sep 17 00:00:00 2001
From: Jan Katins
Date: Tue, 26 May 2020 13:15:54 +0200
Subject: [PATCH 3/8] Forward a shutdown from run_pipeline to the run_process

---
 data_integration/execution.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/data_integration/execution.py b/data_integration/execution.py
index 1e2d4a6..3de93dc 100644
--- a/data_integration/execution.py
+++ b/data_integration/execution.py
@@ -374,7 +374,11 @@ def _create_exception_output_event(msg: str = ''):
                 print(f"{repr(e)}", file=sys.stderr)
                 yield e
                 events.notify_configured_event_handlers(e)
-
+            # try to terminate the run_process, which itself will also clean up in an atexit handler
+            try:
+                run_process.terminate()
+            except:
+                pass
             return
 
         if not run_process.is_alive():
             break

From 26bdc01f6e561e259dadd71f2ca825dc0835f3bb Mon Sep 17 00:00:00 2001
From: Jan Katins
Date: Tue, 26 May 2020 16:47:22 +0200
Subject: [PATCH 4/8] Check all ancestor pipelines for failure before scheduling a task

Before, we could have the effect that a pipeline would fail but an
already queued subpipeline (e.g. a parallel task) would still start.
Now we check all ancestors before starting a task. As we propagate
failure state to all parent pipelines, this should stop any new
pipelines from being scheduled.
---
 data_integration/execution.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/data_integration/execution.py b/data_integration/execution.py
index 3de93dc..521de2a 100644
--- a/data_integration/execution.py
+++ b/data_integration/execution.py
@@ -142,10 +142,22 @@ def dequeue() -> pipelines.Node:
                         or (not node.parent in running_pipelines)
                         or (running_pipelines[node.parent][1] < node.parent.max_number_of_parallel_tasks))):
                 node_queue.remove(node)
-                if node.parent in failed_pipelines and not node.parent.force_run_all_children:
+                processed_as_parent_failed = False
+                parent = node.parent
+                while parent:
                     # if the parent pipeline failed (and no overwrite), don't launch new nodes
-                    processed_nodes.add(node)
-                else:
+                    # This needs to go down to the ultimate parent, as we can have cases where we already
+                    # queued a subpipeline and now the parent pipeline failed, but the task's parent pipeline
+                    # (the sub pipeline) has not failed.
+                    # If a task from a parent pipeline fails, even with force_run_all_children on the
+                    # sub pipeline, the sub pipeline would stop. Only if the failed parent pipeline also has
+                    # force_run_all_children would the task get scheduled.
+                    if parent in failed_pipelines and not parent.force_run_all_children:
+                        processed_nodes.add(node)
+                        processed_as_parent_failed = True
+                        break
+                    else:
+                        parent = parent.parent
+                if not processed_as_parent_failed:
                     return node
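Patch 4's ancestor walk, as a standalone sketch with simplified stand-in classes (not the project's pipeline API): a failed ancestor without force_run_all_children blocks scheduling, while a failed ancestor that forces its children lets the walk continue upwards.

class Pipeline:
    def __init__(self, parent=None, force_run_all_children=False):
        self.parent = parent
        self.force_run_all_children = force_run_all_children


def blocked_by_failed_ancestor(pipeline, failed_pipelines) -> bool:
    # walk up the parent chain until a blocking failure is found
    parent = pipeline
    while parent:
        if parent in failed_pipelines and not parent.force_run_all_children:
            return True
        parent = parent.parent
    return False


root = Pipeline()
sub = Pipeline(parent=root, force_run_all_children=True)
# a failure in root blocks tasks of sub, regardless of sub's own flag
assert blocked_by_failed_ancestor(sub, {root})
# a failure in sub alone does not block, because sub forces its children to run
assert not blocked_by_failed_ancestor(sub, {sub})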
From 7d1c88ff73e8657ed6e56e0cac8f7ab094a6e345 Mon Sep 17 00:00:00 2001
From: Jan Katins
Date: Sat, 6 Jun 2020 15:38:22 +0200
Subject: [PATCH 5/8] Also close runs if the executor died before sending a finish event

---
 data_integration/execution.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/data_integration/execution.py b/data_integration/execution.py
index 521de2a..05f19a2 100644
--- a/data_integration/execution.py
+++ b/data_integration/execution.py
@@ -393,6 +393,8 @@ def _create_exception_output_event(msg: str = ''):
                 pass
             return
 
         if not run_process.is_alive():
+            # If we are here, it might be that the executor died without sending the necessary run finished events
+            ensure_closed_run_on_abort()
             break
         time.sleep(0.001)

From d20a1809812217c87918622627ae34975ca041b5 Mon Sep 17 00:00:00 2001
From: Jan Katins
Date: Mon, 8 Jun 2020 11:13:38 +0200
Subject: [PATCH 6/8] Only print cleanup message if any cleanup happened

---
 data_integration/logging/run_log.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/data_integration/logging/run_log.py b/data_integration/logging/run_log.py
index 27f486e..4db4706 100644
--- a/data_integration/logging/run_log.py
+++ b/data_integration/logging/run_log.py
@@ -70,20 +70,29 @@ def close_open_run_after_error(run_id: int):
     """Closes all open run and node_run for this run_id as failed"""
     if run_id is None: return
 
-    print(f'Run aborted, cleaning up (run_id = {run_id})')
     _close_run = f'''
 UPDATE data_integration_run
 SET end_time = now(), succeeded = FALSE
 WHERE run_id = {"%s"} and end_time IS NULL
+RETURNING run_id
 '''
     _close_node_run = f'''
 UPDATE data_integration_node_run
 SET end_time = now(), succeeded = FALSE
 WHERE run_id = {"%s"} and end_time IS NULL
+RETURNING run_id
 '''
     with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
-        cursor.execute(_close_node_run, (run_id,))
-        cursor.execute(_close_run, (run_id,))
+        _closed_any = False
+        for code in [_close_node_run, _close_run]:
+            try:
+                cursor.execute(code, (run_id,))
+                if cursor.fetchall():
+                    _closed_any = True
+            except:
+                pass
+        if _closed_any:
+            print(f'Cleaned up open runs/node_runs (run_id = {run_id})')
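The detection trick in patch 6, in isolation (a sketch assuming psycopg2 and the table from the patch; the connection argument is a placeholder): an UPDATE with RETURNING yields one row per updated record, so an empty result set means there was nothing to clean up and the message can be skipped. cursor.rowcount would work as well; RETURNING keeps the check in the SQL itself.

def close_open_runs(connection, run_id: int) -> bool:
    """Marks open runs as failed; reports whether anything was actually closed."""
    closed_any = False
    with connection.cursor() as cursor:
        cursor.execute(
            '''UPDATE data_integration_run
               SET end_time = now(), succeeded = FALSE
               WHERE run_id = %s AND end_time IS NULL
               RETURNING run_id''',
            (run_id,))
        if cursor.fetchall():  # one row per closed run
            closed_any = True
    return closed_any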
From e244f1e7697b613f91ff78a6bf267d73ec1270b5 Mon Sep 17 00:00:00 2001
From: Martin Loetzsch
Date: Tue, 9 Jun 2020 15:34:19 +0200
Subject: [PATCH 7/8] Fix typo

---
 data_integration/execution.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/data_integration/execution.py b/data_integration/execution.py
index 05f19a2..4ae3ae1 100644
--- a/data_integration/execution.py
+++ b/data_integration/execution.py
@@ -114,7 +114,7 @@ def with_all_upstreams(nodes: {pipelines.Node}):
     executor_pid = os.getpid()
 
     def ensure_task_processes_killed():
-        # as we fork, the TaskProcess also run this function -> ignore it there
+        # as we fork, the TaskProcess also runs this function -> ignore it there
         if os.getpid() != executor_pid: return
         try:
             for tp in list(running_task_processes.values()):  # type: TaskProcess

From 0087259e75e165d5c06a2fbd92ddc7e2ff51f806 Mon Sep 17 00:00:00 2001
From: Martin Loetzsch
Date: Tue, 9 Jun 2020 16:03:20 +0200
Subject: [PATCH 8/8] Simplify exception output

---
 data_integration/execution.py | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)

diff --git a/data_integration/execution.py b/data_integration/execution.py
index 4ae3ae1..4f94517 100644
--- a/data_integration/execution.py
+++ b/data_integration/execution.py
@@ -123,8 +123,7 @@ def ensure_task_processes_killed():
                     tp.terminate()
             statistics_process.kill()
         except BaseException as e:
-            msg = "Exception during TaskProcess cleanup"
-            print(f"{msg}: {repr(e)}", file=sys.stderr, flush=True)
+            print(f"Exception during TaskProcess cleanup: {repr(e)}", file=sys.stderr, flush=True)
         return
 
     atexit.register(ensure_task_processes_killed)
@@ -324,8 +323,7 @@ def ensure_closed_run_on_abort():
         try:
             run_log.close_open_run_after_error(runlogger.run_id)
         except BaseException as e:
-            msg = "Exception during 'close_open_run_after_error()'"
-            print(f"{msg}: {repr(e)}", file=sys.stderr, flush=True)
+            print(f"Exception during 'close_open_run_after_error()': {repr(e)}", file=sys.stderr, flush=True)
         return
 
     atexit.register(ensure_closed_run_on_abort)
@@ -358,10 +356,9 @@ def _notify_all(event):
                 # Catching GeneratorExit needs to end in a return!
                 return
             except:
-                def _create_exception_output_event(msg: str = ''):
-                    if msg:
-                        msg = msg + '\n'
-                    return pipeline_events.Output(node_path=pipeline.path(), message=msg + traceback.format_exc(),
+                def _create_exception_output_event(msg: str = None):
+                    return pipeline_events.Output(node_path=pipeline.path(),
+                                                  message=(msg + '\n' if msg else '') + traceback.format_exc(),
                                                   format=logger.Format.ITALICS, is_error=True)
 
                 output_event = _create_exception_output_event()
@@ -372,14 +369,12 @@ def _create_exception_output_event(msg: str = ''):
                     # we are already in the generic exception handler, so we cannot do anything
                     # if we still fail, as we have to get to the final close_open_run_after_error()
                     # and 'return'...
-                    msg = "Could not notify about final output event"
-                    exception_events.append(_create_exception_output_event(msg))
+                    exception_events.append(_create_exception_output_event("Could not notify about final output event"))
                 yield output_event
             try:
                 run_log.close_open_run_after_error(runlogger.run_id)
             except BaseException as e:
-                msg = "Exception during 'close_open_run_after_error()'"
-                exception_events.append(_create_exception_output_event(msg))
+                exception_events.append(_create_exception_output_event("Exception during 'close_open_run_after_error()'"))
 
             # At least try to notify the UI
             for e in exception_events:
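The helper simplified in patch 8 formats an optional message plus the traceback of the currently handled exception; a hypothetical standalone equivalent of just that formatting:

import traceback


def format_error_output(msg: str = None) -> str:
    # optional one-line prefix, then the traceback of the active exception
    return (msg + '\n' if msg else '') + traceback.format_exc()


try:
    1 / 0
except ZeroDivisionError:
    # prints "Could not notify about final output event" followed by the traceback
    print(format_error_output("Could not notify about final output event"))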