Skip to content

Commit

Permalink
Bring in master up to PR #1670 into lsst-dm-202005
Browse files (browse the repository at this point in the history)
  • Loading branch information
benclifford committed May 13, 2020
2 parents 0911ea6 + 6f8f66c commit 12f9cc6
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
39 changes: 36 additions & 3 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import queue
import os
import time
import datetime

from typing import Any, Dict, Set

Expand Down Expand Up @@ -298,6 +299,8 @@ def __init__(self,
batching_threshold=99999,
):

self.workflow_end = False
self.workflow_start_message = None
self.logdir = logdir
os.makedirs(self.logdir, exist_ok=True)

Expand Down Expand Up @@ -400,13 +403,16 @@ def start(self, priority_queue, node_queue, resource_queue) -> None:
logger.debug(
"Inserting workflow start info to WORKFLOW table")
self._insert(table=WORKFLOW, messages=[msg])
self.workflow_start_message = msg
else: # workflow end message
logger.debug(
"Updating workflow end info to WORKFLOW table")
self._update(table=WORKFLOW,
columns=['run_id', 'tasks_failed_count',
'tasks_completed_count', 'time_completed'],
messages=[msg])
self.workflow_end = True

elif msg_type.value == MessageType.TASK_INFO.value:
task_try_id = str(msg['task_id']) + "." + str(msg['try_id'])
task_info_all_messages.append(msg)
Expand Down Expand Up @@ -564,6 +570,13 @@ def _migrate_logs_to_internal(self, logs_queue, queue_tag, kill_event):
def _update(self, table, columns, messages):
try:
self.db.update(table=table, columns=columns, messages=messages)
except KeyboardInterrupt:
logger.exception("KeyboardInterrupt when trying to update Table {}".format(table))
try:
self.db.rollback()
except Exception:
logger.exception("Rollback failed")
raise
except Exception:
logger.exception("Got exception when trying to update table {}".format(table))
try:
Expand All @@ -574,6 +587,13 @@ def _update(self, table, columns, messages):
def _insert(self, table, messages):
try:
self.db.insert(table=table, messages=messages)
except KeyboardInterrupt:
logger.exception("KeyboardInterrupt when trying to update Table {}".format(table))
try:
self.db.rollback()
except Exception:
logger.exception("Rollback failed")
raise
except Exception:
logger.exception("Got exception when trying to insert to table {}".format(table))
try:
Expand All @@ -598,9 +618,17 @@ def _get_messages_in_batch(self, msg_queue, interval=1, threshold=99999):
return messages

def close(self):
if logger:
logger.info(
"Finishing all the logging and terminating Database Manager.")
logger.info("Database Manager cleanup initiated.")
if not self.workflow_end and self.workflow_start_message:
logger.info("Logging workflow end info to database due to abnormal exit")
time_completed = datetime.datetime.now()
msg = {'time_completed': time_completed,
'workflow_duration': (time_completed - self.workflow_start_message['time_began']).total_seconds()}
self.workflow_start_message.update(msg)
self._update(table=WORKFLOW,
columns=['run_id', 'time_completed',
'workflow_duration'],
messages=[self.workflow_start_message])
self.batching_interval, self.batching_threshold = float(
'inf'), float('inf')
self._kill_event.set()
Expand All @@ -617,8 +645,13 @@ def dbm_starter(exception_q, priority_msgs, node_msgs, resource_msgs, *args, **k
dbm = DatabaseManager(*args, **kwargs)
logger.info("Starting dbm in dbm starter")
dbm.start(priority_msgs, node_msgs, resource_msgs)
except KeyboardInterrupt:
dbm.logger.exception("KeyboardInterrupt signal caught")
dbm.close()
raise
except Exception as e:
logger.exception("dbm.start exception")
exception_q.put(("DBM", str(e)))
dbm.close()

logger.info("End of dbm_starter")
13 changes: 10 additions & 3 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,10 @@ def start(self, run_id):

self.logger.debug("Initializing ZMQ Pipes to client")
self.monitoring_hub_active = True
self.dfk_channel_timeout = 10000 # in milliseconds
self._context = zmq.Context()
self._dfk_channel = self._context.socket(zmq.DEALER)
self._dfk_channel.setsockopt(zmq.SNDTIMEO, self.dfk_channel_timeout)
self._dfk_channel.set_hwm(0)
self.dfk_port = self._dfk_channel.bind_to_random_port("tcp://{}".format(self.client_address),
min_port=self.client_port_range[0],
Expand Down Expand Up @@ -256,9 +258,13 @@ def start(self, run_id):

# Send one monitoring message: the (mtype, message) tuple is handed to
# self._dfk_channel.send_pyobj, with debug log lines before and after.
#
# NOTE(review): this span comes from a rendered diff whose +/- markers and
# indentation are missing. The three statements right after the first debug
# line look like the pre-change body, and the try/except/else that follows
# looks like its replacement -- confirm against the repository before
# treating the span as a single function body.
def send(self, mtype, message):
self.logger.debug("Sending message {}, {}".format(mtype, message))
# Presumably the removed (pre-change) lines: unguarded send, then return
# whatever send_pyobj returned. TODO confirm.
r = self._dfk_channel.send_pyobj((mtype, message))
self.logger.debug("Sent message {}, {}".format(mtype, message))
return r
# Presumably the added (post-change) lines: the same send, guarded so that
# zmq.Again is logged instead of propagating. TODO confirm.
try:
# `r` is assigned here but is not read by any of the visible lines below.
r = self._dfk_channel.send_pyobj((mtype, message))
except zmq.Again:
# The log text reports self.dfk_channel_timeout in milliseconds;
# presumably zmq.Again corresponds to the send timeout configured on the
# channel with that same value -- verify against the socket setup.
self.logger.exception(
"[MONITORING] The monitoring message sent from DFK to Hub timeouts after {}ms".format(self.dfk_channel_timeout))
else:
# Success path only: the "Sent" debug line is skipped when zmq.Again is caught.
self.logger.debug("Sent message {}, {}".format(mtype, message))

def close(self):
if self.logger:
Expand All @@ -284,6 +290,7 @@ def close(self):
if len(exception_msgs) == 0:
self.priority_msgs.put(("STOP", 0))
self.dbm_proc.join()
self.logger.debug("Finished waiting for DBM termination")

@staticmethod
def monitor_wrapper(f,
Expand Down

0 comments on commit 12f9cc6

Please sign in to comment.