Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow launcher command for flux startup #402

Merged
merged 13 commits into from
May 1, 2024
20 changes: 14 additions & 6 deletions src/radical/utils/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
import time
import json
import shlex

from typing import Optional, List, Dict, Any, Callable

Expand Down Expand Up @@ -119,7 +120,8 @@ def _watch(self) -> None:
# --------------------------------------------------------------------------
#
def start_service(self,
env: Optional[Dict[str,str]] = None
launcher: Optional[str] = None,
env : Optional[Dict[str,str]] = None
) -> Optional[str]:

with self._lock:
Expand All @@ -129,16 +131,22 @@ def start_service(self,

self._term.clear()

return self._locked_start_service(env)
return self._locked_start_service(launcher, env)


# --------------------------------------------------------------------------
#
def _locked_start_service(self,
env: Optional[Dict[str,str]] = None
launcher: Optional[str] = None,
env : Optional[Dict[str,str]] = None
) -> Optional[str]:

cmd = ['flux', 'start', 'bash', '-c', 'echo URI:$FLUX_URI && sleep inf']
cmd = list()

if launcher:
cmd += shlex.split(launcher)

cmd += ['flux', 'start', 'bash', '-c', 'echo URI:$FLUX_URI && sleep inf']

flux_proc = sp.Popen(cmd, encoding="utf-8",
stdin=sp.DEVNULL, stdout=sp.PIPE, stderr=sp.PIPE)
Expand Down Expand Up @@ -403,7 +411,7 @@ def env(self):

# --------------------------------------------------------------------------
#
def start_flux(self) -> None:
def start_flux(self, launcher: Optional[str] = None) -> None:
'''
Start a private Flux instance

Expand All @@ -416,7 +424,7 @@ def start_flux(self) -> None:
raise RuntimeError('service already connected: %s' % self._uri)

self._service = _FluxService(self._uid, self._log, self._prof)
self._service.start_service()
self._service.start_service(launcher=launcher)

self._uri = self._service.check_service()
self._env = self._service.env
Expand Down
Loading