diff --git a/data_integration/execution.py b/data_integration/execution.py index 2eabc28..4f94517 100644 --- a/data_integration/execution.py +++ b/data_integration/execution.py @@ -10,6 +10,7 @@ import os import sys import signal +import atexit import time import traceback from multiprocessing import queues @@ -109,6 +110,24 @@ def with_all_upstreams(nodes: {pipelines.Node}): failed_pipelines: {pipelines.Pipeline} = set() # pipelines with failed tasks running_task_processes: {pipelines.Task: TaskProcess} = {} + # make sure any running tasks are killed when this executor process is shutdown + executor_pid = os.getpid() + + def ensure_task_processes_killed(): + # as we fork, the TaskProcess also runs this function -> ignore it there + if os.getpid() != executor_pid: return + try: + for tp in list(running_task_processes.values()): # type: TaskProcess + if tp.is_alive(): + # give it a chance to gracefully shutdown + tp.terminate() + statistics_process.kill() + except BaseException as e: + print(f"Exception during TaskProcess cleanup: {repr(e)}", file=sys.stderr, flush=True) + return + + atexit.register(ensure_task_processes_killed) + def dequeue() -> pipelines.Node: """ Finds the next task in the queue @@ -122,10 +141,22 @@ def dequeue() -> pipelines.Node: or (not node.parent in running_pipelines) or (running_pipelines[node.parent][1] < node.parent.max_number_of_parallel_tasks))): node_queue.remove(node) - if node.parent in failed_pipelines and not node.parent.force_run_all_children: + processed_as_parent_failed = False + parent = node.parent + while parent: # if the parent pipeline failed (and no overwrite), don't launch new nodes - processed_nodes.add(node) - else: + # this needs to go down to the ultimate parent as we can have cases where we already + # queued a subpipeline and now the parent pipeline failed but the tasks parent pipeline + # (the sub pipeline) is not failed. + # If a task from a parent pipeline fails, even with force_run_all_children on the + # sub pipeline, the sub pipeline would stop. Only if the failed parent pipeline also has + # force_run_all_children, the task would get scheduled + if parent in failed_pipelines and not parent.force_run_all_children: + processed_nodes.add(node) + processed_as_parent_failed = True + break + else: parent = parent.parent + if not processed_as_parent_failed: return node def track_finished_pipelines(): @@ -285,6 +316,18 @@ def track_finished_pipelines(): runlogger = run_log.RunLogger() + # make sure that we close this run (if still open) as failed when we close this python process + # On SIGKILL we will still leave behind open runs... + # this needs to run after we forked off the run_process as that one should not inherit the atexit function + def ensure_closed_run_on_abort(): + try: + run_log.close_open_run_after_error(runlogger.run_id) + except BaseException as e: + print(f"Exception during 'close_open_run_after_error()': {repr(e)}", file=sys.stderr, flush=True) + return + + atexit.register(ensure_closed_run_on_abort) + def _notify_all(event): try: runlogger.handle_event(event) @@ -313,10 +356,9 @@ def _notify_all(event): # Catching GeneratorExit needs to end in a return! return except: - def _create_exception_output_event(msg: str = ''): - if msg: - msg = msg + '\n' - return pipeline_events.Output(node_path=pipeline.path(), message=msg + traceback.format_exc(), + def _create_exception_output_event(msg: str = None): + return pipeline_events.Output(node_path=pipeline.path(), + message=(msg + '\n' if msg else '') + traceback.format_exc(), format=logger.Format.ITALICS, is_error=True) output_event = _create_exception_output_event() @@ -327,23 +369,27 @@ def _create_exception_output_event(msg: str = ''): # we are already in the generic exception handler, so we cannot do anything # if we still fail, as we have to get to the final close_open_run_after_error() # and 'return'... - msg = "Could not notify about final output event" - exception_events.append(_create_exception_output_event(msg)) + exception_events.append(_create_exception_output_event("Could not notify about final output event")) yield output_event try: run_log.close_open_run_after_error(runlogger.run_id) except BaseException as e: - msg = "Exception during 'close_open_run_after_error()'" - exception_events.append(_create_exception_output_event(msg)) + exception_events.append(_create_exception_output_event("Exception during 'close_open_run_after_error()'")) # At least try to notify the UI for e in exception_events: print(f"{repr(e)}", file=sys.stderr) yield e events.notify_configured_event_handlers(e) - + # try to terminate the run_process which itself will also cleanup in an atexit handler + try: + run_process.terminate() + except: + pass return if not run_process.is_alive(): + # If we are here it might be that the executor dies without sending the necessary run finished events + ensure_closed_run_on_abort() break time.sleep(0.001) diff --git a/data_integration/logging/run_log.py b/data_integration/logging/run_log.py index 27f486e..4db4706 100644 --- a/data_integration/logging/run_log.py +++ b/data_integration/logging/run_log.py @@ -70,20 +70,29 @@ def close_open_run_after_error(run_id: int): """Closes all open run and node_run for this run_id as failed""" if run_id is None: return - print(f'Run aborted, cleaning up (run_id = {run_id})') _close_run = f''' UPDATE data_integration_run SET end_time = now(), succeeded = FALSE WHERE run_id = {"%s"} and end_time IS NULL +RETURNING run_id ''' _close_node_run = f''' UPDATE data_integration_node_run SET end_time = now(), succeeded = FALSE WHERE run_id = {"%s"} and end_time IS NULL +RETURNING run_id ''' with mara_db.postgresql.postgres_cursor_context('mara') as cursor: # type: psycopg2.extensions.cursor - cursor.execute(_close_node_run, (run_id,)) - cursor.execute(_close_run, (run_id,)) + _closed_any=False + for code in [_close_node_run, _close_run]: + try: + cursor.execute(code, (run_id,)) + if cursor.fetchall(): + _closed_any = True + except: + pass + if _closed_any: + print(f'Cleaned up open runs/node_runs (run_id = {run_id})') class RunLogger(events.EventHandler):