Fix monitoring ctrlc hang #1670

Merged · 16 commits · May 13, 2020
Changes from 11 commits
31 changes: 28 additions & 3 deletions parsl/monitoring/db_manager.py
@@ -3,6 +3,7 @@
import queue
import os
import time
import datetime

from parsl.dataflow.states import States
from parsl.providers.error import OptionalModuleMissing
@@ -202,6 +203,8 @@ def __init__(self,
batching_threshold=99999,
):

self.workflow_end = False
self.workflow_start_message = None
self.logdir = logdir
os.makedirs(self.logdir, exist_ok=True)

@@ -284,6 +287,7 @@ def start(self, priority_queue, node_queue, resource_queue):
self.logger.debug(
"Inserting workflow start info to WORKFLOW table")
self._insert(table=WORKFLOW, messages=[msg])
self.workflow_start_message = msg
else: # workflow end message
self.logger.debug(
"Updating workflow end info to WORKFLOW table")
@@ -292,6 +296,8 @@
'tasks_completed_count', 'time_completed',
'workflow_duration'],
messages=[msg])
self.workflow_end = True

else: # TASK_INFO message
all_messages.append(msg)
if msg['task_id'] in inserted_tasks:
@@ -389,22 +395,24 @@ def _migrate_logs_to_internal(self, logs_queue, queue_tag, kill_event):
def _update(self, table, columns, messages):
try:
self.db.update(table=table, columns=columns, messages=messages)
except Exception:
except BaseException:
self.logger.exception("Got exception when trying to update Table {}".format(table))
try:
self.db.rollback()
except Exception:
self.logger.exception("Rollback failed")
raise

def _insert(self, table, messages):
try:
self.db.insert(table=table, messages=messages)
except Exception:
except BaseException:
self.logger.exception("Got exception when trying to insert to Table {}".format(table))
try:
self.db.rollback()
except Exception:
self.logger.exception("Rollback failed")
raise
Collaborator

_insert and _update now pass on database errors to their caller, rather than absorbing them.
So probably the caller (the main loop process) now needs to deal with non-KeyboardInterrupt exceptions that might occur. Or, the _insert and _update code should only re-raise KeyboardInterrupt exceptions to preserve previous behaviour.

Contributor Author
@ZhuozhaoLi · May 11, 2020

I am choosing the latter approach: re-raise KeyboardInterrupt exceptions in the _insert and _update code. I think for exceptions other than KeyboardInterrupt we should re-raise too, since the db is missing some messages at that point, no?

Collaborator

> we should re-raise too since the db is missing some messages at that point, no?

the behaviour in master is to ignore most exceptions at the top level and carry on with the main loop, receiving and processing messages. I think this PR should not change that behaviour.

It would be worth investigating separately though.

Contributor Author

that makes sense. thanks

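A minimal sketch of the behaviour agreed in this thread, assuming stand-in db and logger objects (illustrative only, not the merged Parsl code): ordinary database errors are logged, rolled back and absorbed so the message loop keeps running, while KeyboardInterrupt is re-raised so Ctrl-C can still shut the database manager down.

import logging

logger = logging.getLogger(__name__)


def safe_insert(db, table, messages):
    # Sketch only: db is any object exposing insert() and rollback().
    try:
        db.insert(table=table, messages=messages)
    except KeyboardInterrupt:
        logger.exception("KeyboardInterrupt while inserting into {}".format(table))
        raise  # propagate so Ctrl-C can terminate the manager loop
    except Exception:
        logger.exception("Got exception when trying to insert to Table {}".format(table))
        try:
            db.rollback()
        except Exception:
            logger.exception("Rollback failed")
        # absorbed: the main loop carries on, matching the pre-PR behaviour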

def _get_messages_in_batch(self, msg_queue, interval=1, threshold=99999):
messages = []
@@ -425,7 +433,19 @@ def _get_messages_in_batch(self, msg_queue, interval=1, threshold=99999):
def close(self):
if self.logger:
self.logger.info(
"Finishing all the logging and terminating Database Manager.")
"Database Manager cleanup initiated.")
if not self.workflow_end and self.workflow_start_message:
if self.logger:
self.logger.info(
"Logging workflow end info to database due to abnormal exit")
time_completed = datetime.datetime.now()
msg = {'time_completed': time_completed,
'workflow_duration': (time_completed - self.workflow_start_message['time_began']).total_seconds()}
self.workflow_start_message.update(msg)
self._update(table=WORKFLOW,
columns=['run_id', 'time_completed',
'workflow_duration'],
messages=[self.workflow_start_message])
self.batching_interval, self.batching_threshold = float(
'inf'), float('inf')
self._kill_event.set()
@@ -472,8 +492,13 @@ def dbm_starter(exception_q, priority_msgs, node_msgs, resource_msgs, *args, **k
dbm.logger.info("Starting dbm in dbm starter")
try:
dbm.start(priority_msgs, node_msgs, resource_msgs)
except KeyboardInterrupt:
dbm.logger.exception("KeyboardInterrupt signal caught")
dbm.close()
benclifford marked this conversation as resolved.
raise
except Exception as e:
dbm.logger.exception("dbm.start exception")
exception_q.put(("DBM", str(e)))
dbm.close()

dbm.logger.info("End of dbm_starter")
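For context, here is a small, self-contained sketch (illustrative only, with no Parsl imports, and assuming a POSIX terminal that delivers SIGINT to the whole foreground process group) of the hang this PR fixes: if the child process absorbs KeyboardInterrupt and keeps looping, the parent's join() never returns after Ctrl-C, whereas cleaning up and re-raising, as dbm_starter now does, lets both processes exit.

import multiprocessing


def child_loop(q):
    while True:
        try:
            q.get()  # blocks waiting for monitoring messages
        except KeyboardInterrupt:
            # Old behaviour: log and keep looping, so the child never exits.
            # Behaviour after this PR: clean up, then re-raise so the loop ends.
            print("child: cleaning up, then exiting")
            raise


if __name__ == "__main__":
    q = multiprocessing.Queue()
    proc = multiprocessing.Process(target=child_loop, args=(q,))
    proc.start()
    try:
        proc.join()  # press Ctrl-C here
    except KeyboardInterrupt:
        proc.join()  # returns promptly because the child re-raised and exited
        print("parent: child terminated cleanly")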
9 changes: 8 additions & 1 deletion parsl/monitoring/monitoring.py
@@ -207,8 +207,10 @@ def start(self, run_id):

self.logger.debug("Initializing ZMQ Pipes to client")
self.monitoring_hub_active = True
self.dfk_channel_timeout = 10000 # in milliseconds
self._context = zmq.Context()
self._dfk_channel = self._context.socket(zmq.DEALER)
self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout)
self._dfk_channel.set_hwm(0)
self.dfk_port = self._dfk_channel.bind_to_random_port("tcp://{}".format(self.client_address),
min_port=self.client_port_range[0],
@@ -259,7 +261,11 @@ def start(self, run_id):

def send(self, mtype, message):
self.logger.debug("Sending message {}, {}".format(mtype, message))
return self._dfk_channel.send_pyobj((mtype, message))
try:
self._dfk_channel.send_pyobj((mtype, message))
except zmq.Again:
self.logger.exception(
"[MONITORING] The monitoring message sent from DFK to Hub timeouts after {}ms".format(self.dfk_channel_timeout))

def close(self):
if self.logger:
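A stand-alone sketch of the send-timeout behaviour the change above relies on (the port, payload and 2-second timeout here are arbitrary choices, not Parsl's): a DEALER socket with no connected peer blocks on send, and with zmq.SNDTIMEO set it raises zmq.Again instead of hanging the DFK forever.

import zmq

timeout_ms = 2000
context = zmq.Context()
channel = context.socket(zmq.DEALER)
channel.setsockopt(zmq.SNDTIMEO, timeout_ms)
channel.set_hwm(0)
channel.bind_to_random_port("tcp://127.0.0.1")  # nothing ever connects to it

try:
    channel.send_pyobj(("TASK_INFO", {"task_id": 0}))
except zmq.Again:
    print("send timed out after {}ms instead of blocking forever".format(timeout_ms))
finally:
    channel.close(linger=0)
    context.term()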
@@ -285,6 +291,7 @@ def close(self):
if len(exception_msgs) == 0:
self.priority_msgs.put(("STOP", 0))
self.dbm_proc.join()
self.logger.debug("Finished waiting for DBM termination")

@staticmethod
def monitor_wrapper(f,
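The debug line added to close() above sits right after the shutdown handshake: the hub puts a ("STOP", 0) sentinel on the priority queue and then joins the DBM process. A minimal stand-alone sketch of that handshake (illustrative; the real database manager also drains the node and resource queues):

import multiprocessing


def db_manager_loop(priority_q):
    # Drain messages until the STOP sentinel arrives, so join() cannot hang.
    while True:
        msg = priority_q.get()
        if msg == ("STOP", 0):
            break
        # ... insert/update the database here ...


if __name__ == "__main__":
    priority_q = multiprocessing.Queue()
    proc = multiprocessing.Process(target=db_manager_loop, args=(priority_q,))
    proc.start()
    priority_q.put(("STOP", 0))  # ask the child to exit cleanly
    proc.join()                  # returns because the child saw the sentinel
    print("Finished waiting for DBM termination")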