Skip to content

Commit

Permalink
Merge pull request #402 from radical-cybertools/fix/flux_launcher
Browse files Browse the repository at this point in the history
allow launcher command for flux startup
  • Loading branch information
andre-merzky authored May 1, 2024
2 parents 3a782df + c43e7e5 commit dd54e29
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/radical/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from .testing import TestConfig
from .testing import set_test_config, add_test_config, get_test_config
from .env import env_read, env_write, env_read_lines, env_eval
from .env import env_prep, env_diff, EnvProcess
from .env import env_prep, env_diff, EnvProcess, env_dump
from .stack import stack
from .modules import import_module, find_module, import_file
from .modules import get_type, load_class
Expand Down
31 changes: 21 additions & 10 deletions src/radical/utils/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import re
import os
import sys
import queue
import hashlib
import tempfile
import traceback

from typing import List, Dict, Tuple, Any, Optional

Expand Down Expand Up @@ -469,7 +471,7 @@ def __init__(self, env : Dict[str, str]) -> None:

self._q = mp.Queue()
self._env = env
self._data = [None, None] # data, exception
self._data = None
self._child = None


Expand Down Expand Up @@ -511,13 +513,15 @@ def __enter__(self) -> 'EnvProcess':

# --------------------------------------------------------------------------
#
def __exit__(self, exc : Optional[Exception],
value: Optional[Any],
tb : Optional[Any]
def __exit__(self, exc_type: Optional[Exception],
exc_val : Optional[Any],
exc_tb : Optional[Any]
) -> None:

if exc and self._child:
self._q.put([None, exc])
if exc_type and self._child:
stacktrace = ' '.join(traceback.format_exception(
exc_type, exc_val, exc_tb))
self._q.put([None, exc_type, exc_val, stacktrace])
self._q.close()
self._q.join_thread()
os._exit(0)
Expand All @@ -530,6 +534,7 @@ def __exit__(self, exc : Optional[Exception],
self._data = self._q.get(timeout=1)
break
except queue.Empty:
self._data = None
pass


Expand All @@ -538,7 +543,7 @@ def __exit__(self, exc : Optional[Exception],
def put(self, data: str) -> None:

if self._child:
self._q.put([data, None])
self._q.put([data, None, None, None])
self._q.close()
self._q.join_thread()
os._exit(0)
Expand All @@ -548,9 +553,15 @@ def put(self, data: str) -> None:
#
def get(self) -> Any:

data, exc = self._data
if exc:
raise exc # pylint: disable=raising-bad-type
if self._data is None:
return


data, exc_type, exc_val, stacktrace = self._data
if exc_type:
sys.stdout.write('%s [%s]\n' % (exc_type, exc_val))
sys.stdout.write('%s\n\n' % stacktrace)
raise exc_type # pylint: disable=raising-bad-type

return data

Expand Down
30 changes: 18 additions & 12 deletions src/radical/utils/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import time
import json
import shlex

from typing import Optional, List, Dict, Any, Callable

Expand All @@ -18,9 +19,7 @@
from .logger import Logger
from .profile import Profiler
from .modules import import_module
from .misc import ru_open
from .host import get_hostname
from .debug import get_stacktrace


# --------------------------------------------------------------------------
Expand Down Expand Up @@ -119,7 +118,8 @@ def _watch(self) -> None:
# --------------------------------------------------------------------------
#
def start_service(self,
env: Optional[Dict[str,str]] = None
launcher: Optional[str] = None,
env : Optional[Dict[str,str]] = None
) -> Optional[str]:

with self._lock:
Expand All @@ -129,16 +129,22 @@ def start_service(self,

self._term.clear()

return self._locked_start_service(env)
return self._locked_start_service(launcher, env)


# --------------------------------------------------------------------------
#
def _locked_start_service(self,
env: Optional[Dict[str,str]] = None
launcher: Optional[str] = None,
env : Optional[Dict[str,str]] = None
) -> Optional[str]:

cmd = ['flux', 'start', 'bash', '-c', 'echo URI:$FLUX_URI && sleep inf']
cmd = list()

if launcher:
cmd += shlex.split(launcher)

cmd += ['flux', 'start', 'bash', '-c', 'echo URI:$FLUX_URI && sleep inf']

flux_proc = sp.Popen(cmd, encoding="utf-8",
stdin=sp.DEVNULL, stdout=sp.PIPE, stderr=sp.PIPE)
Expand Down Expand Up @@ -403,7 +409,7 @@ def env(self):

# --------------------------------------------------------------------------
#
def start_flux(self) -> None:
def start_flux(self, launcher: Optional[str] = None) -> None:
'''
Start a private Flux instance
Expand All @@ -416,7 +422,7 @@ def start_flux(self) -> None:
raise RuntimeError('service already connected: %s' % self._uri)

self._service = _FluxService(self._uid, self._log, self._prof)
self._service.start_service()
self._service.start_service(launcher=launcher)

self._uri = self._service.check_service()
self._env = self._service.env
Expand All @@ -438,10 +444,10 @@ def connect_flux(self, uri : Optional[str] = None) -> None:

with self._lock:

with ru_open(self._uid + '.dump', 'a') as fout:
fout.write('connect flux %d: %s\n' % (os.getpid(), uri))
for l in get_stacktrace():
fout.write(l + '\n')
# with ru_open(self._uid + '.dump', 'a') as fout:
# fout.write('connect flux %d: %s\n' % (os.getpid(), uri))
# for l in get_stacktrace():
# fout.write(l + '\n')

if self._uri:
raise RuntimeError('service already connected: %s' % self._uri)
Expand Down
7 changes: 5 additions & 2 deletions tests/unittests/test_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ def test_hb_uid():
# --------------------------------------------------------------------------
def proc():

start = time.time()
print('proc start: %.2f' % (time.time() - start))

hb = ru.Heartbeat('test', timeout=0.1, interval=0.01)
t0 = time.time()

Expand All @@ -112,7 +115,7 @@ def proc():
time.sleep(0.1)

while True:
time.sleep(1)
time.sleep(0.1)

finally:
if time.time() > t0 + 3.2:
Expand All @@ -129,7 +132,7 @@ def proc():
assert p.is_alive()

# but it should have exited with a non-zero exit code after a few more seconds
time.sleep(6)
time.sleep(3)
assert not p.is_alive()
assert p.exitcode

Expand Down
37 changes: 1 addition & 36 deletions tests/unittests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,12 @@
# noqa: E201


import radical.utils as ru


# ------------------------------------------------------------------------------
#
def test_metric_expand():

d_in = {'total' : [{'ru.EVENT': 'bootstrap_0_start'},
{'ru.EVENT': 'bootstrap_0_stop' }],
'boot' : [{'ru.EVENT': 'bootstrap_0_start'},
{'ru.EVENT': 'sync_rel' }],
'setup_1' : [{'ru.EVENT': 'sync_rel' },
{'ru.STATE': 'rp.PMGR_ACTIVE' }],
'ignore' : [{'ru.STATE': 'rp.PMGR_ACTIVE' },
{'ru.EVENT': 'cmd' ,
'ru.MSG ': 'cancel_pilot' }],
'term' : [{'ru.EVENT': 'cmd' ,
'ru.MSG ': 'cancel_pilot' },
{'ru.EVENT': 'bootstrap_0_stop' }]}

d_out = {'total' : [{1 : 'bootstrap_0_start'},
{1 : 'bootstrap_0_stop' }],
'boot' : [{1 : 'bootstrap_0_start'},
{1 : 'sync_rel' }],
'setup_1' : [{1 : 'sync_rel' },
{5 : 'PMGR_ACTIVE' }],
'ignore' : [{5 : 'PMGR_ACTIVE' },
{1 : 'cmd' ,
6 : 'cancel_pilot' }],
'term' : [{1 : 'cmd' ,
6 : 'cancel_pilot' },
{1 : 'bootstrap_0_stop' }]}

assert ru.metric_expand(d_in) == d_out


# ------------------------------------------------------------------------------
# run tests if called directly
if __name__ == "__main__":

test_metric_expand()
pass


# ------------------------------------------------------------------------------
Expand Down

0 comments on commit dd54e29

Please sign in to comment.