Skip to content

Commit

Permalink
Add native runTheMatrix options via runWorkflow.
Browse files Browse the repository at this point in the history
Keep track of launched processes with monitoring..
  • Loading branch information
rovere committed Oct 2, 2023
1 parent f683163 commit ce06ef9
Showing 1 changed file with 45 additions and 5 deletions.
50 changes: 45 additions & 5 deletions Configuration/PyReleaseValidation/scripts/runTheMatrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ def stepOrIndex(s):
from colorama import Fore, Style
from os import isatty
import subprocess
import time

class TheMatrix(cmd.Cmd):
intro = "Welcome to the Matrix (? for help)"
Expand All @@ -476,6 +477,7 @@ def __init__(self, opt):
self.opt_ = opt
self.matrices_ = {}
tmp = MatrixReader(self.opt_)
self.processes_ = dict()
for what in tmp.files:
what = what.replace('relval_','')
self.opt_.what = what
Expand Down Expand Up @@ -557,11 +559,20 @@ def do_showWorkflow(self, arg):
def do_runWorkflow(self, arg):
# Split the input arguments into a list
args = arg.split()
if len(args) != 2:
if len(args) < 2:
print(Fore.RED + Style.BRIGHT + "Wrong number of parameters passed")
print(Style.RESET_ALL)
return
workflow_class, workflow_id = args
workflow_class = args[0]
workflow_id = args[1]
passed_down_args = list()
if len(args) > 2:
passed_down_args = args[2:]
print(Fore.YELLOW + Style.BRIGHT + "Running with the following options:\n")
print(Fore.GREEN + Style.BRIGHT + "Workflow class: {}".format(workflow_class))
print(Fore.GREEN + Style.BRIGHT + "Workflow ID: {}".format(workflow_id))
print(Fore.GREEN + Style.BRIGHT + "Additional runTheMatrix options: {}".format(passed_down_args))
print(Style.RESET_ALL)
if workflow_class not in self.matrices_.keys():
print(Fore.RED + Style.BRIGHT + "Unknown workflow selected: {}".format(workflow_class))
print("Available workflows:")
Expand All @@ -576,14 +587,27 @@ def do_runWorkflow(self, arg):
print(wflnums)
print(Style.RESET_ALL)
return
if workflow_id in self.processes_.keys():
# Check if the process is still active
if self.processes_[workflow_id][0].poll() is None:
print(Fore.RED + Style.BRIGHT + "Workflow {} already running!".format(workflow_id))
print(Style.RESET_ALL)
return
# If it was there but it's gone, proceeed and update the value for the same key
# run a job, redirecting standard output and error to files
lognames = ['stdout', 'stderr']
logfiles = tuple('%s_%s_%s.log' % (workflow_class, workflow_id, name) for name in lognames)
stdout = open(logfiles[0], 'w')
stderr = open(logfiles[1], 'w')
p = subprocess.Popen(['runTheMatrix.py', '-w', workflow_class, '-l', workflow_id],
command = ('runTheMatrix.py', '-w', workflow_class, '-l', workflow_id)
if len(passed_down_args) > 0:
command += tuple(passed_down_args)
print(command)
p = subprocess.Popen(command,
stdout = stdout,
stderr = stderr)
self.processes_[workflow_id] = (p, time.time())


def complete_runWorkflow(self, text, line, start_idx, end_idx):
if text and len(text) > 0:
Expand All @@ -595,7 +619,7 @@ def help_runWorkflow(self):
print("\n".join(["runWorkflow workflow_class workflow_id\n",
"This command will launch a new and independent process that invokes",
"the command:\n",
"runTheMatrix.py -w workflow_class -l workflow_id",
"runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
"\nYou can specify just one workflow_class and workflow_id per invocation.",
"The job will continue even after quitting the interactive session.",
"stdout and stderr of the new process will be automatically",
Expand All @@ -606,7 +630,23 @@ def help_runWorkflow(self):
"Autocompletion is available for workflow_class, but",
"not for workflow_id. Supplying a wrong workflow_class or",
"a non-existing workflow_id for a valid workflow_class",
"will trigger an error and no process will be invoked."]))
"will trigger an error and no process will be invoked.",
"The interactive shell will keep track of all active processes",
"and will prevent the accidental resubmission of an already",
"active jobs."]))

def do_jobs(self, args):
print(Fore.GREEN + Style.BRIGHT + "List of jobs:")
for w in self.processes_.keys():
if self.processes_[w][0].poll() is None:
print(Fore.YELLOW + Style.BRIGHT + "Active job: {} since {:.2f} seconds.".format(w, time.time() - self.processes_[w][1]))
else:
print(Fore.RED + Style.BRIGHT + "Done job: {}".format(w))
print(Style.RESET_ALL)

def help_jobs(self):
print("\n".join(["Print a full list of active and done jobs submitted",
"in the ongoing interactive session"]))

def help_searchInWorkflow(self):
print("\n".join(["searchInWorkflow wfl_name search_regexp\n",
Expand Down

0 comments on commit ce06ef9

Please sign in to comment.