From a1672a2dfd64964cc6ad9923e65432044f090f0c Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Wed, 6 May 2020 18:48:54 +0000 Subject: [PATCH 01/11] add a timeout to monitoring dfk channel --- parsl/monitoring/monitoring.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 2150cf5b89..cc5fa8a6ff 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -209,6 +209,7 @@ def start(self, run_id): self.monitoring_hub_active = True self._context = zmq.Context() self._dfk_channel = self._context.socket(zmq.DEALER) + self._dfk_channel.setsockopt(zmq.SNDTIMEO, 1000) self._dfk_channel.set_hwm(0) self.dfk_port = self._dfk_channel.bind_to_random_port("tcp://{}".format(self.client_address), min_port=self.client_port_range[0], @@ -259,7 +260,10 @@ def start(self, run_id): def send(self, mtype, message): self.logger.debug("Sending message {}, {}".format(mtype, message)) - return self._dfk_channel.send_pyobj((mtype, message)) + try: + self._dfk_channel.send_pyobj((mtype, message)) + except zmq.Again as e: + self.logger.exception("[MONITORING] Monitoring send error") def close(self): if self.logger: From 159337bf8327986084c350ec67a363cedd0c1d97 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Wed, 6 May 2020 23:03:35 +0000 Subject: [PATCH 02/11] monitoring database logs workflow completion time when exiting abnormally --- parsl/monitoring/db_manager.py | 25 ++++++++++++++++++++++++- parsl/monitoring/monitoring.py | 1 + 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index bd1bed28df..c1bdfec832 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -3,6 +3,7 @@ import queue import os import time +import datetime from parsl.dataflow.states import States from parsl.providers.error import OptionalModuleMissing @@ -202,6 +203,7 @@ def __init__(self, 
batching_threshold=99999, ): + self.workflow_end = False self.logdir = logdir os.makedirs(self.logdir, exist_ok=True) @@ -284,6 +286,7 @@ def start(self, priority_queue, node_queue, resource_queue): self.logger.debug( "Inserting workflow start info to WORKFLOW table") self._insert(table=WORKFLOW, messages=[msg]) + self.workflow_message = msg else: # workflow end message self.logger.debug( "Updating workflow end info to WORKFLOW table") @@ -292,6 +295,8 @@ def start(self, priority_queue, node_queue, resource_queue): 'tasks_completed_count', 'time_completed', 'workflow_duration'], messages=[msg]) + self.workflow_end = True + else: # TASK_INFO message all_messages.append(msg) if msg['task_id'] in inserted_tasks: @@ -425,7 +430,21 @@ def _get_messages_in_batch(self, msg_queue, interval=1, threshold=99999): def close(self): if self.logger: self.logger.info( - "Finishing all the logging and terminating Database Manager.") + "Database Manager cleanup initiated.") + if not self.workflow_end: + if self.logger: + self.logger.info( + "Logging workflow end info to database due to abnormal exit") + time_completed = datetime.datetime.now() + msg = {'time_completed': time_completed, + 'workflow_duration': (time_completed - self.workflow_message['time_began']).total_seconds()} + if hasattr(self, 'workflow_message'): + self.workflow_message.update(msg) + self._update(table=WORKFLOW, + columns=['run_id', 'time_completed', + 'workflow_duration'], + messages=[self.workflow_message]) + self.workflow_end = True self.batching_interval, self.batching_threshold = float( 'inf'), float('inf') self._kill_event.set() @@ -472,8 +491,12 @@ def dbm_starter(exception_q, priority_msgs, node_msgs, resource_msgs, *args, **k dbm.logger.info("Starting dbm in dbm starter") try: dbm.start(priority_msgs, node_msgs, resource_msgs) + except KeyboardInterrupt: + dbm.logger.exception("KeyboardInterrupt signal caught") + dbm.close() except Exception as e: dbm.logger.exception("dbm.start exception") 
exception_q.put(("DBM", str(e))) + dbm.close() dbm.logger.info("End of dbm_starter") diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index cc5fa8a6ff..2c74ca707d 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -289,6 +289,7 @@ def close(self): if len(exception_msgs) == 0: self.priority_msgs.put(("STOP", 0)) self.dbm_proc.join() + self.logger.debug("Finished waiting for DBM termination") @staticmethod def monitor_wrapper(f, From 8c305f7e33abc7e7aed382055f887c947011a3aa Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Wed, 6 May 2020 23:11:45 +0000 Subject: [PATCH 03/11] remove a line that sets workflow_end attribute --- parsl/monitoring/db_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index c1bdfec832..256a898db6 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -444,7 +444,6 @@ def close(self): columns=['run_id', 'time_completed', 'workflow_duration'], messages=[self.workflow_message]) - self.workflow_end = True self.batching_interval, self.batching_threshold = float( 'inf'), float('inf') self._kill_event.set() From eafbb64133a1fbecc836be5f8a55d2ae5f514fd4 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Wed, 6 May 2020 23:16:22 +0000 Subject: [PATCH 04/11] remove one unused exception e --- parsl/monitoring/monitoring.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 2c74ca707d..7b5d4811d2 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -262,7 +262,7 @@ def send(self, mtype, message): self.logger.debug("Sending message {}, {}".format(mtype, message)) try: self._dfk_channel.send_pyobj((mtype, message)) - except zmq.Again as e: + except zmq.Again: self.logger.exception("[MONITORING] Monitoring send error") def close(self): From 2229f64386083e825d4b775caa0e9e2fade1e291 
Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Thu, 7 May 2020 15:47:05 +0000 Subject: [PATCH 05/11] catch KeyboardInterrupt exception and raise to the upper level in db_manager --- parsl/monitoring/db_manager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 256a898db6..ed9478fefc 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -394,22 +394,24 @@ def _migrate_logs_to_internal(self, logs_queue, queue_tag, kill_event): def _update(self, table, columns, messages): try: self.db.update(table=table, columns=columns, messages=messages) - except Exception: + except BaseException: self.logger.exception("Got exception when trying to update Table {}".format(table)) try: self.db.rollback() except Exception: self.logger.exception("Rollback failed") + raise def _insert(self, table, messages): try: self.db.insert(table=table, messages=messages) - except Exception: + except BaseException: self.logger.exception("Got exception when trying to insert to Table {}".format(table)) try: self.db.rollback() except Exception: self.logger.exception("Rollback failed") + raise def _get_messages_in_batch(self, msg_queue, interval=1, threshold=99999): messages = [] From 6faf49a446c2ba7d5246158ce31956284830ae44 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Thu, 7 May 2020 15:48:15 +0000 Subject: [PATCH 06/11] raise KeyboardInterrupt exception in dbm_starter --- parsl/monitoring/db_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index ed9478fefc..7d4a8dfec8 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -495,6 +495,7 @@ def dbm_starter(exception_q, priority_msgs, node_msgs, resource_msgs, *args, **k except KeyboardInterrupt: dbm.logger.exception("KeyboardInterrupt signal caught") dbm.close() + raise except Exception as e: 
dbm.logger.exception("dbm.start exception") exception_q.put(("DBM", str(e))) From ffcdcbeeed243a93afd02386ee6d5c58bd30a35b Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Thu, 7 May 2020 15:48:54 +0000 Subject: [PATCH 07/11] rename workflow_message and change code style --- parsl/monitoring/db_manager.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 7d4a8dfec8..8e068cf30a 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -204,6 +204,7 @@ def __init__(self, ): self.workflow_end = False + self.workflow_start_message = None self.logdir = logdir os.makedirs(self.logdir, exist_ok=True) @@ -286,7 +287,7 @@ def start(self, priority_queue, node_queue, resource_queue): self.logger.debug( "Inserting workflow start info to WORKFLOW table") self._insert(table=WORKFLOW, messages=[msg]) - self.workflow_message = msg + self.workflow_start_message = msg else: # workflow end message self.logger.debug( "Updating workflow end info to WORKFLOW table") @@ -433,19 +434,18 @@ def close(self): if self.logger: self.logger.info( "Database Manager cleanup initiated.") - if not self.workflow_end: + if not self.workflow_end and self.workflow_start_message: if self.logger: self.logger.info( "Logging workflow end info to database due to abnormal exit") time_completed = datetime.datetime.now() msg = {'time_completed': time_completed, - 'workflow_duration': (time_completed - self.workflow_message['time_began']).total_seconds()} - if hasattr(self, 'workflow_message'): - self.workflow_message.update(msg) - self._update(table=WORKFLOW, - columns=['run_id', 'time_completed', - 'workflow_duration'], - messages=[self.workflow_message]) + 'workflow_duration': (time_completed - self.workflow_start_message['time_began']).total_seconds()} + self.workflow_start_message.update(msg) + self._update(table=WORKFLOW, + columns=['run_id', 'time_completed', + 
'workflow_duration'], + messages=[self.workflow_start_message]) self.batching_interval, self.batching_threshold = float( 'inf'), float('inf') self._kill_event.set() From 68796914bc28128aa9f7735bfc36424486de7084 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Thu, 7 May 2020 16:47:33 +0000 Subject: [PATCH 08/11] increase timeout and better exception text --- parsl/monitoring/monitoring.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 7b5d4811d2..b5557a059b 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -207,9 +207,10 @@ def start(self, run_id): self.logger.debug("Initializing ZMQ Pipes to client") self.monitoring_hub_active = True + self.dfk_channel_timeout = 10000 # in milliseconds self._context = zmq.Context() self._dfk_channel = self._context.socket(zmq.DEALER) - self._dfk_channel.setsockopt(zmq.SNDTIMEO, 1000) + self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout) self._dfk_channel.set_hwm(0) self.dfk_port = self._dfk_channel.bind_to_random_port("tcp://{}".format(self.client_address), min_port=self.client_port_range[0], @@ -263,7 +264,8 @@ def send(self, mtype, message): try: self._dfk_channel.send_pyobj((mtype, message)) except zmq.Again: - self.logger.exception("[MONITORING] Monitoring send error") + self.logger.exception( + f"[MONITORING] The monitoring message sent from DFK to Hub timeouts after {self.dfk_channel_timeout}ms") def close(self): if self.logger: From aa20379576953bd7de870136921cbf7f0c5d3057 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Thu, 7 May 2020 21:02:36 +0000 Subject: [PATCH 09/11] change f string to format --- parsl/monitoring/monitoring.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index b5557a059b..ef40151743 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -265,7 
+265,7 @@ def send(self, mtype, message): self._dfk_channel.send_pyobj((mtype, message)) except zmq.Again: self.logger.exception( - f"[MONITORING] The monitoring message sent from DFK to Hub timeouts after {self.dfk_channel_timeout}ms") + "[MONITORING] The monitoring message sent from DFK to Hub timeouts after {}ms".format(self.dfk_channel_timeout)) def close(self): if self.logger: From ecc8d4c448457b2bc5b7d1918757153339cf64e5 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Mon, 11 May 2020 17:32:00 +0000 Subject: [PATCH 10/11] separate keyboardinterrupt and other exceptions in db_manager --- parsl/monitoring/db_manager.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 8e068cf30a..a2024c633b 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -395,7 +395,14 @@ def _migrate_logs_to_internal(self, logs_queue, queue_tag, kill_event): def _update(self, table, columns, messages): try: self.db.update(table=table, columns=columns, messages=messages) - except BaseException: + except KeyboardInterrupt: + self.logger.exception("KeyboardInterrupt when trying to update Table {}".format(table)) + try: + self.db.rollback() + except Exception: + self.logger.exception("Rollback failed") + raise + except Exception: self.logger.exception("Got exception when trying to update Table {}".format(table)) try: self.db.rollback() @@ -406,7 +413,14 @@ def _update(self, table, columns, messages): def _insert(self, table, messages): try: self.db.insert(table=table, messages=messages) - except BaseException: + except KeyboardInterrupt: + self.logger.exception("KeyboardInterrupt when trying to update Table {}".format(table)) + try: + self.db.rollback() + except Exception: + self.logger.exception("Rollback failed") + raise + except Exception: self.logger.exception("Got exception when trying to insert to Table {}".format(table)) try: self.db.rollback() From 
2396072731acb94484aac797285a663521b79d48 Mon Sep 17 00:00:00 2001 From: Zhuozhao Li Date: Mon, 11 May 2020 15:46:12 -0500 Subject: [PATCH 11/11] do not re-raise exceptions in _insert and _update --- parsl/monitoring/db_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index a2024c633b..4bccba0e98 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -408,7 +408,6 @@ def _update(self, table, columns, messages): self.db.rollback() except Exception: self.logger.exception("Rollback failed") - raise def _insert(self, table, messages): try: @@ -426,7 +425,6 @@ def _insert(self, table, messages): self.db.rollback() except Exception: self.logger.exception("Rollback failed") - raise def _get_messages_in_batch(self, msg_queue, interval=1, threshold=99999): messages = []