Skip to content

Commit

Permalink
Merge pull request #420 from radical-cybertools/feature/simple_hb
Browse files Browse the repository at this point in the history
Feature/simple hb
  • Loading branch information
andre-merzky authored Oct 16, 2024
2 parents 8f767b8 + 0db3f0f commit 2a4de17
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 9 deletions.
14 changes: 7 additions & 7 deletions bin/radical-utils-pwatch
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# Depending on what finishes first, the sleep 5 or the random sleep (and thus
# the parent shell), the process watcher will kill the other shell.
#
#
#
# Example 2:
#
# sleep $((RANDOM % 10)) & RU_PW_PID1=$!;
Expand All @@ -58,29 +58,29 @@ run_pwatch(){
# check RU_PW_PPID and RU_PW_CPID are set, and use them
test -z "$RU_PW_PID1" || pid1=$RU_PW_PID1
test -z "$RU_PW_PID2" || pid2=$RU_PW_PID2

# use fallback pid1 (PPID)
test -z "$pid1" && pid1=$PPID

# use fallback forpid2: cmd subshell
# NOTE: we cannot use `test -z $pid2 && $ARGS & pid2=$!` - that would give
# us the pid of the subshell, not of the command (at least in bash)
if test -z "$pid2"
then
then
# eval ensures that args could contain pipelines and I/O redirections,
# but we need to add the backgrounding to arga (if its not there, yet).
# but we need to add the backgrounding to args (if its not there, yet).
# Don't background `eval` itself, that results in a subshell pid.
ARGS=$(echo "$ARGS" | sed 's/[& ]\+$//')
eval "$ARGS &"
pid2=$!
fi

# ensure we have two PIDs
test -z "$pid1" && echo 'missing process to watch (1)' && return 1
test -z "$pid2" && echo 'missing process to watch (2)' && return 2

# echo "pwatch [$pid1] [$pid2]"

# --------------------------------------------------------------------------
while true
do
Expand All @@ -98,7 +98,7 @@ run_pwatch(){
kill $pid1
return 0
fi

sleep 1
done
}
Expand Down
2 changes: 1 addition & 1 deletion src/radical/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .object_cache import ObjectCache
from .plugin_manager import PluginManager, PluginBase
from .singleton import Singleton
from .heartbeat import Heartbeat
from .heartbeat import Heartbeat, PWatcher
from .threads import is_main_thread, is_this_thread, cancel_main_thread
from .threads import main_thread, this_thread, get_thread_name, gettid
from .threads import set_cancellation_handler, unset_cancellation_handler
Expand Down
134 changes: 134 additions & 0 deletions src/radical/utils/heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

import os
import sys
import time
import pprint
import signal
Expand Down Expand Up @@ -263,5 +264,138 @@ def wait_startup(self, uids=None, timeout=None):
self._log.debug_3('wait ok : %s', ok)


# ------------------------------------------------------------------------------
#
class PWatcher(object):

NOTHING = 'nothing'
SUICIDE = 'suicide'
KILLALL = 'killall'
RAMPAGE = 'rampage'

# --------------------------------------------------------------------------
#
def __init__(self, action=None, uid=None, log=None):
'''
This is a simple process monitor: once started it runs in a separate
thread and monitors all given process IDs (`self._watch_pid`). If
a process is found to have died, the watcher will invoke the given
action:
- `nothing`: log event and do nothing else
- `suicide`: kill the curent process
- `killall`: kill all monitored pids
- `rampage`: both of the above (`suicide + killall`)
The default action is `rampage`.
The passed uid (default: `pwatcher`) is used for logging purposes only.
'''

self._action = action or self.RAMPAGE
self._uid = uid or 'pwatcher'
self._log = log or Logger(name=self._uid, ns='radical.utils')
self._pids = list()
self._lock = mt.Lock()

self._log.debug_1('pwatcher create')

self._thread = mt.Thread(target=self._watch)
self._thread.daemon = True
self._thread.start()


# --------------------------------------------------------------------------
#
def _is_alive(self, pid):
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True


# --------------------------------------------------------------------------
#
def _watch(self):

self._log.debug_1('pwatcher started')

while True:

with self._lock:

for pid in list(self._pids):

if not self._is_alive(pid):

self._log.warn('process %d died, exit', pid)
self._pids.remove(pid)

if self._action == self.SUICIDE: self._suicide(pid)
elif self._action == self.KILLALL: self._killall(pid)
elif self._action == self.RAMPAGE: self._rampage(pid)

time.sleep(0.05)


# --------------------------------------------------------------------------
#
def watch(self, pid):

self._log.debug('add pid %d to watchlist', pid)

with self._lock:
self._pids.append(pid)


# --------------------------------------------------------------------------
#
def unwatch(self, pid):

self._log.debug('remove pid %d from watchlist', pid)

with self._lock:
if pid in self._pids:
self._pids.remove(pid)


# --------------------------------------------------------------------------
#
def _nothing(self, pid):

self._log.debug("process %d's demise triggered, well, nothing", pid)


# --------------------------------------------------------------------------
#
def _suicide(self, pid):

self._log.debug("process %d's demise triggered suicide", pid)
os.kill(os.getpid(), signal.SIGKILL)


# --------------------------------------------------------------------------
#
def _killall(self, pid):

self._log.debug("process %d's demise triggered killall (%s)",
pid, self._pids)

for pid in list(self._pids):
try : os.kill(pid, signal.SIGKILL)
except: pass


# --------------------------------------------------------------------------
#
def _rampage(self, pid):

self._killall(pid)
self._suicide(pid)


# ------------------------------------------------------------------------------

3 changes: 3 additions & 0 deletions src/radical/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ def __init__(self, name, ns=None, path=None, targets=None, level=None,
settings.
"""

if name is None:
raise ValueError('logger name must be specified and not `None`')

self._name = name
self._ns = ns
self._path = path
Expand Down
15 changes: 15 additions & 0 deletions src/radical/utils/zmq/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ def start(self):
self._log.info('started bridge %s', self._uid)


# --------------------------------------------------------------------------
#
def wait(self, timeout=None):
'''
wait for the bridge to terminate. If `timeout` is set, the call will
return after that many seconds, with a return value indicating whether
the bridge is still alive.
'''

self._bridge_thread.join(timeout=timeout)

if timeout is not None:
return not self._bridge_thread.is_alive()


# --------------------------------------------------------------------------
#
@staticmethod
Expand Down
17 changes: 17 additions & 0 deletions tests/bin/test_pwatch.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/sh

# test the shell pwatcher

start=$(date +%s)

sleep 1 &
export RU_PW_PID1=$!

sleep 3 &
export RU_PW_PID2=$!

./bin/radical-utils-pwatch

stop=$(date +%s)
test "$(($stop-$start))" -gt 2 && exit 1 || exit 0

Loading

0 comments on commit 2a4de17

Please sign in to comment.