Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorganize pash_runtime #665

Merged
merged 14 commits into from
Apr 21, 2023
1 change: 1 addition & 0 deletions compiler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
PLANNER_EXECUTABLE = os.path.join(PASH_TOP, "compiler/pash_compiler.py")
RUNTIME_EXECUTABLE = os.path.join(PASH_TOP, "compiler/pash_runtime.sh")
SAVE_ARGS_EXECUTABLE = os.path.join(PASH_TOP, "runtime/save_args.sh")
SAVE_SHELL_STATE_EXECUTABLE = os.path.join(PASH_TOP, "compiler/orchestrator_runtime/save_shell_state.sh")

## Ensure that PASH_TMP_PREFIX is set by pa.sh
assert(not os.getenv('PASH_TMP_PREFIX') is None)
Expand Down
1 change: 1 addition & 0 deletions compiler/orchestrator_runtime/pash_init_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
[ -f ~/.pash_init ] && source ~/.pash_init
## File directory
export RUNTIME_DIR=$(dirname "${BASH_SOURCE[0]}")
export WRAPPER_LIB_DIR="$RUNTIME_DIR/../wrapper_library/"
## TODO: Is there a better way to do this?
export RUNTIME_LIBRARY_DIR="$RUNTIME_DIR/../../runtime/"
export PASH_REDIR="&2"
Expand Down
79 changes: 79 additions & 0 deletions compiler/orchestrator_runtime/pash_prepare_call_compiler.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/bin/bash

## OUTPUT: When it completes it sets "$pash_script_to_execute"

## Only needed for expansion
export pash_input_args=( "$@" )

## Move some pash env variables to local so that tests pass
tmp="$pash_disable_parallel_pipelines"
unset pash_disable_parallel_pipelines
pash_disable_parallel_pipelines="$tmp"

tmp="$pash_input_ir_file"
unset pash_input_ir_file
pash_input_ir_file="$tmp"

tmp="$pash_sequential_script_file"
unset pash_sequential_script_file
pash_sequential_script_file="$tmp"

## Save the shell variables to a file (necessary for expansion)
export pash_runtime_shell_variables_file="${PASH_TMP_PREFIX}/variables_$RANDOM$RANDOM$RANDOM"
source "$RUNTIME_DIR/pash_declare_vars.sh" "$pash_runtime_shell_variables_file"
pash_redir_output echo "$$: (1) Bash variables saved in: $pash_runtime_shell_variables_file"

## The parallel script will be saved in the following file if compilation is successful.
pash_compiled_script_file="${PASH_TMP_PREFIX}/pash_$RANDOM$RANDOM$RANDOM"

## TODO: Have a more proper communication protocol
## TODO: Make a proper client for the daemon
pash_redir_output echo "$$: (2) Before asking the daemon for compilation..."
## Send and receive from daemon
msg="Compile:${pash_compiled_script_file}| Variable File:${pash_runtime_shell_variables_file}| Input IR File:${pash_input_ir_file}"
daemon_response=$(pash_communicate_daemon "$msg") # Blocking step, daemon will not send response until it's safe to continue

if [[ "$daemon_response" == *"OK:"* ]]; then
pash_runtime_return_code=0
elif [ -z "$daemon_response" ]; then
## Trouble... Daemon crashed, rip
pash_redir_output echo "$$: ERROR: (2) Daemon crashed!"
exit 1
else
pash_runtime_return_code=1
fi

# Get assigned process id
# We need to split the daemon response into elements of an array by
# shell's field splitting.
# shellcheck disable=SC2206
response_args=($daemon_response)
process_id=${response_args[1]}

pash_redir_output echo "$$: (2) Compiler exited with code: $pash_runtime_return_code"
if [ "$pash_runtime_return_code" -ne 0 ] && [ "$pash_assert_compiler_success_flag" -eq 1 ]; then
pash_redir_output echo "$$: ERROR: (2) Compiler failed with error code: $pash_runtime_return_code while assert_compiler_success was enabled! Exiting PaSh..."
exit 1
fi

# store functions for distributed execution
if [ "$distributed_exec" -eq 1 ]; then
declared_functions="${PASH_TMP_PREFIX}/pash_$RANDOM$RANDOM$RANDOM"
declare -f > "$declared_functions"
export declared_functions
fi

## If the compiler failed or if we dry_run the compiler, we have to run the sequential
if [ "$pash_runtime_return_code" -ne 0 ] || [ "$pash_dry_run_compiler_flag" -eq 1 ]; then
export pash_script_to_execute="${pash_sequential_script_file}"
else
export pash_script_to_execute="${pash_compiled_script_file}"
fi

## Let daemon know that this region is done
function inform_daemon_exit () {
## Send to daemon
msg="Exit:${process_id}"
daemon_response=$(pash_communicate_daemon_just_send "$msg")
}

34 changes: 34 additions & 0 deletions compiler/orchestrator_runtime/pash_restore_state_and_execute.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/bash

## File directory
RUNTIME_DIR=$(dirname "${BASH_SOURCE[0]}")

## INPUT: Expects SCRIPT_TO_EXECUTE to be set

#ONLY WAY OUT IS TO TREAT EXEC in special way

## Recover the `set` state of the previous shell
# pash_redir_output echo "$$: (3) Previous BaSh set state: $pash_previous_set_status"
# pash_redir_output echo "$$: (3) PaSh-internal set state of current shell: $-"
export pash_current_set_state=$-
source "$RUNTIME_DIR/pash_set_from_to.sh" "$pash_current_set_state" "$pash_previous_set_status"
pash_redir_output echo "$$: (3) Reverted to BaSh set state: $-"

## Execute the script
pash_redir_output echo "$$: (4) Restoring previous exit code: ${pash_previous_exit_status}"
pash_redir_output echo "$$: (4) Will execute script in ${SCRIPT_TO_EXECUTE}:"
pash_redir_output cat "${SCRIPT_TO_EXECUTE}"

## Note: We run the `exit` in a checked position so that we don't simply exit when we are in `set -e`.
if (exit "$pash_previous_exit_status")
then
{
## This works w.r.t. arguments because source does not change them if there are no arguments
## being given.
source "${SCRIPT_TO_EXECUTE}"
}
else
{
source "${SCRIPT_TO_EXECUTE}"
}
fi
43 changes: 0 additions & 43 deletions compiler/orchestrator_runtime/pash_runtime_complete_execution.sh

This file was deleted.

29 changes: 0 additions & 29 deletions compiler/orchestrator_runtime/pash_runtime_shell_to_pash.sh

This file was deleted.

50 changes: 0 additions & 50 deletions compiler/orchestrator_runtime/pash_wrap_vars.sh

This file was deleted.

23 changes: 23 additions & 0 deletions compiler/orchestrator_runtime/save_shell_state.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env bash

##
## Works for bash
##

## Configuration:
## DEFAULT_SET_STATE: Set this variable to determine the safe set state "huB"
##

##
## Necessary for bash:
## - Last exit code $?
## - set state $-
##


## Save the previous exit code
export PREVIOUS_SHELL_EC="$?"

## Store the current `set` status
export PREVIOUS_SET_STATUS=$-
source "$RUNTIME_DIR/pash_set_from_to.sh" "$PREVIOUS_SET_STATUS" "${DEFAULT_SET_STATE:-huB}"
28 changes: 19 additions & 9 deletions compiler/pash_compilation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import signal
import traceback
from threading import Thread
from datetime import datetime
from datetime import datetime, timedelta
# import queue

import config
Expand Down Expand Up @@ -55,15 +55,22 @@ def init():
## This class holds information for each process id
##
class ProcIdInfo:
def __init__(self, input_ir, compiler_config, exec_time=None):
def __init__(self, input_ir, compiler_config, exec_time=None, start_exec_time=None):
self.input_ir = input_ir
self.compiler_config = compiler_config
self.exec_time = exec_time
self.start_exec_time = start_exec_time
## TODO: Extend it with other info from scheduler, like dependencies

def set_exec_time(self, exec_time):
self.exec_time = exec_time

def set_start_exec_time(self, start_exec_time):
self.start_exec_time = start_exec_time

def get_start_exec_time(self):
return self.start_exec_time

def __repr__(self):
return f'ProcIdInfo(InputIR:{self.input_ir}, CompConfig:{self.compiler_config}, ExecTime:{self.exec_time})'

Expand Down Expand Up @@ -287,6 +294,10 @@ def compile_and_add(self, compiled_script_file, var_file, input_ir_file):
pass
else:
self.running_procs += 1

## Get the time before we start executing (roughly) to determine how much time this command execution will take
command_exec_start_time = datetime.now()
self.process_id_input_ir_map[process_id].set_start_exec_time(command_exec_start_time)
return response

def remove_process(self, process_id):
Expand Down Expand Up @@ -319,13 +330,12 @@ def wait_for_all(self):

def handle_exit(self, input_cmd):
assert(input_cmd.startswith("Exit:"))
exit_part, time_part = input_cmd.split("|")
process_id = int(exit_part.split(":")[1])
log("Time part is:", time_part)
try:
exec_time = float(time_part.split(":")[1])
except:
exec_time = None
process_id = int(input_cmd.split(":")[1])

## Get the execution time
command_finish_exec_time = datetime.now()
command_start_exec_time = self.process_id_input_ir_map[process_id].get_start_exec_time()
exec_time = (command_finish_exec_time - command_start_exec_time) / timedelta(milliseconds=1)
log("Process:", process_id, "exited. Exec time was:", exec_time)
self.handle_time_measurement(process_id, exec_time)
self.remove_process(process_id)
Expand Down
Loading