Skip to content

Commit

Permalink
Introduce background mode to magic %run and %sosrun #178
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Jan 17, 2019
1 parent e9a18a1 commit a91e809
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 18 deletions.
3 changes: 3 additions & 0 deletions src/sos_notebook/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,9 @@ define([
// if there is an existing status table, try to retrieve its information
// if the new data does not have it
let has_status_table = document.getElementById(`workflow_${cell_id}`);
if (!has_status_table && info.status != 'pending') {
return;
}
let timer_text = '';
if (info.start_time) {
// convert from python time to JS time.
Expand Down
3 changes: 2 additions & 1 deletion src/sos_notebook/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,8 @@ def run_sos_code(self, code, silent):
try:
if self._workflow_mode:
res = run_sos_workflow(
code=code, raw_args=self.options, kernel=self)
code=code, raw_args=self.options, kernel=self,
run_in_queue=self._workflow_mode == 'nowait')
else:
res = execute_scratch_cell(code=code, raw_args=self.options,
kernel=self)
Expand Down
21 changes: 16 additions & 5 deletions src/sos_notebook/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -1375,9 +1375,10 @@ def __init__(self, kernel):

def get_parser(self):
parser = argparse.ArgumentParser(prog='%run',
description='''Execute the current cell with specified command line
description='''Execute the current cell with specified command line
arguments. Arguments set by magic %set will be appended at the
end of command line''')
end of command line. If the magic ends with "&", it will be sent
to a queue to be executed sequentially.''')
parser.error = self._parse_error
return parser

Expand All @@ -1403,13 +1404,17 @@ def apply(self, code, silent, store_history, user_expressions, allow_stdin):
run_code = '[default]\n' + run_code
# now we need to run the code multiple times with each option
for options in run_options:
if options.strip().endswith('&'):
self.sos_kernel._meta['workflow_mode'] = 'nowait'
options = options[:-1]
else:
self.sos_kernel._meta['workflow_mode'] = 'wait'
old_options = self.sos_kernel.options
self.sos_kernel.options = options + ' ' + self.sos_kernel.options
try:
# %run is executed in its own namespace
old_dict = env.sos_dict
self.sos_kernel._reset_dict()
self.sos_kernel._meta['workflow_mode'] = True
if self.sos_kernel._debug_mode:
self.sos_kernel.warn(f'Executing\n{run_code}')
if self.sos_kernel.kernel != 'SoS':
Expand Down Expand Up @@ -1723,21 +1728,27 @@ def get_parser(self):
description='''Execute the entire notebook with steps consisting of SoS
cells (cells with SoS kernel) with section header, with specified command
line arguments. Arguments set by magic %set will be appended at the
end of command line''')
end of command line. If the magic ends with "&", it will be sent
to a queue to be executed sequentially.''')
parser.error = self._parse_error
return parser

def apply(self, code, silent, store_history, user_expressions, allow_stdin):
options, remaining_code = self.get_magic_and_code(code, False)
old_options = self.sos_kernel.options
if options.strip().endswith('&'):
self.sos_kernel._meta['workflow_mode'] = 'nowait'
options = options[:-1]
else:
self.sos_kernel._meta['workflow_mode'] = 'wait'
self.sos_kernel.options = options + ' ' + self.sos_kernel.options
try:
if self.sos_kernel.kernel != 'SoS':
self.sos_kernel.switch_kernel('SoS')
# %run is executed in its own namespace
old_dict = env.sos_dict
self.sos_kernel._reset_dict()
self.sos_kernel._meta['workflow_mode'] = True

# self.sos_kernel.send_frontend_msg('preview-workflow', self.sos_kernel._meta['workflow'])
if not self.sos_kernel._meta['workflow']:
self.sos_kernel.warn(
Expand Down
34 changes: 22 additions & 12 deletions src/sos_notebook/workflow_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ def run(self):
cmd = f'sos run .sos/interactive.sos {self.args} -m tapping slave {self.config["slave_id"]} {self.config["sockets"]["tapping_logging"]} {self.config["sockets"]["tapping_listener"]} {self.config["sockets"]["tapping_controller"]}'
ret_code = pexpect_run(
cmd, shell=True, stdout_socket=stdout_socket)
# status will not trigger frontend update if it was not
# started with a pending status
informer_socket.send_pyobj(
{'msg_type': 'workflow_status',
'data': {
Expand Down Expand Up @@ -260,7 +262,7 @@ def execute_pending_workflow(cell_ids, kernel):
run_next_workflow_in_queue()


def run_sos_workflow(code, raw_args='', kernel=None, workflow_mode=False):
def run_sos_workflow(code, raw_args='', kernel=None, workflow_mode=False, run_in_queue=False):
# when user asks to execute a cell as workflow. We either
# execute the workflow or put it in queue
global g_workflow_queue
Expand All @@ -269,20 +271,28 @@ def run_sos_workflow(code, raw_args='', kernel=None, workflow_mode=False):
# completed job (dead process), or running job.
if kernel.cell_id in [cid for cid, proc in g_workflow_queue if proc is not None]:
cancel_workflow(kernel.cell_id, kernel)
# put to the back
g_workflow_queue.append([kernel.cell_id, (code, raw_args, env.config)])
# in any case, we start with a pending status
kernel.send_frontend_msg('workflow_status',
{
'cell_id': kernel.cell_id,
'status': 'pending',
'index': len(g_workflow_queue)
})
run_next_workflow_in_queue()

if run_in_queue:
# put to the back
g_workflow_queue.append([kernel.cell_id, (code, raw_args, env.config)])
# in any case, we start with a pending status
kernel.send_frontend_msg('workflow_status',
{
'cell_id': kernel.cell_id,
'status': 'pending',
'index': len(g_workflow_queue)
})
run_next_workflow_in_queue()
else:
env.config['slave_id'] = kernel.cell_id
executor = Tapped_Executor(code, raw_args, env.config)
executor.start()
executor.join()


def cancel_workflow(cell_id, kernel):
env.log_to_file(f'cancel {cell_id}')
global g_workflow_queue
env.logger.info(f'A queued or running workflow in this cell is canceled')
kernel.send_frontend_msg('workflow_status', {
'cell_id': cell_id,
'status': 'purged'
Expand Down

0 comments on commit a91e809

Please sign in to comment.