Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cover all but one test case from compiler/test_evaluation_scripts.sh #612

Merged
merged 11 commits into from
Jul 20, 2022
14 changes: 11 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
## TODOs before merging to `future`

- support for RR with unwrap for commutative commands
- working on all tests
- fix tests from compiler/test_evaluation_scripts.sh:
+ bigrams
- clean up utils for annotations
- graphviz
- Changing PaSh flags (making the default be priority r-split and then consecutive chunks), so remove the r_split flag and make defaults be the ones from the OSDI paper
- Fixing annotation library installation to a specific commit
- Remove code which got obsolete due to the changes
- Remove code which got obsolete due to the changes
- Room for optimization: basically disable parallelization after a tr which squeezes all new lines since there are no sequences of data to parallelize anyway for the moment.
Long-term, we could allow parallelization but with an adj_line_merge aggregator.
- Changes to scripts:
+ `spell-grep.sh`: added `export -f set_diff` as the command was not known when parallelizing?
+ `shortest_scripts.sh`: here I only needed to modify the script slightly:
(1) option arguments for `cut` with whitespace as the parser cannot deal with them otherwise currently but we might want to change this in the future,
(2) `head -n 15` instead of `head -15`, which might be a bit harder to support. When skimming the man page, I did not see this form documented, but I might have missed it.
- tr_test.sh: Outside the testing script, the outputs are the same but somehow it still shows different outputs. Checked this with Konstantinos and he will check the testing script later.
4 changes: 2 additions & 2 deletions compiler/annotations_utils/util_cmd_invocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ def to_arg_from_cmd_inv_with_io_vars_without_streaming_inputs_or_outputs_for_wra
whole_cmd.concatenate(Arg(string_to_argument("\'")))
return whole_cmd

def to_arg_flagoption(flagoption, _edges):
def to_arg_flagoption(flagoption, edges):
if isinstance(flagoption, Flag):
return [Arg(string_to_argument(flagoption.get_name()))]
elif isinstance(flagoption, OptionWithIO):
opt_name_arg = Arg(string_to_argument(flagoption.get_name()))
opt_arg_arg = translate_io_var_to_arg_if_applicable(flagoption.get_arg())
opt_arg_arg = translate_io_var_to_arg_if_applicable(flagoption.get_arg(), edges)
return [opt_name_arg, opt_arg_arg]

def to_arg_operand(operand, edges):
Expand Down
2 changes: 2 additions & 0 deletions compiler/annotations_utils/util_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def parse_arg_list_to_command_invocation(command, flags_options_operands) -> Com
option = Option(option_name_as_string, option_arg_as_arg)
flag_option_list.append(option)
i += 1 # since we consumed another term for the argument
elif potential_flag_or_option_name == "-": # switch to operand mode (interpreted as hyphen-stdin)
break
elif are_all_individually_flags(potential_flag_or_option_name, set_of_all_flags):
for split_el in list(potential_flag_or_option_name[1:]):
flag: Flag = Flag(f'-{split_el}')
Expand Down
6 changes: 3 additions & 3 deletions compiler/definitions/ir/nodes/pash_split.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datatypes_new.AccessKind import AccessKind
from datatypes_new.AccessKind import make_stream_input, make_stream_output
from datatypes_new.CommandInvocationWithIOVars import CommandInvocationWithIOVars

from definitions.ir.file_id import *
Expand Down Expand Up @@ -26,8 +26,8 @@ def make_split_file(input_id, out_ids):
auto_split_bin = os.path.join(config.PASH_TOP, config.config['runtime']['auto_split_binary'])
operand_list = [input_id]
operand_list.extend(out_ids)
access_map = {output_id: AccessKind.make_stream_output() for output_id in out_ids}
access_map[input_id] = AccessKind.make_stream_input()
access_map = {output_id: make_stream_output() for output_id in out_ids}
access_map[input_id] = make_stream_input()
cmd_inv_with_io_vars = CommandInvocationWithIOVars(
cmd_name=auto_split_bin,
flag_option_list=[],
Expand Down
48 changes: 30 additions & 18 deletions compiler/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def compile_command_to_DFG(fileIdGen, command, options,
redirections=[]):
command_invocation: CommandInvocationInitial = parse_arg_list_to_command_invocation(command, options)
io_info: InputOutputInfo = get_input_output_info_from_cmd_invocation_util(command_invocation)
if io_info is None:
raise Exception(f"InputOutputInformation for {format_arg_chars(command)} not provided so considered side-effectful.")
if io_info.has_other_outputs():
raise Exception(f"Command {format_arg_chars(command)} has outputs other than streaming.")
para_info: ParallelizabilityInfo = get_parallelizability_info_from_cmd_invocation_util(command_invocation)
command_invocation_with_io = io_info.apply_input_output_info_to_command_invocation(command_invocation)
parallelizer_list, round_robin_compatible_with_cat, is_commutative = para_info.unpack_info()
Expand Down Expand Up @@ -766,7 +770,6 @@ def apply_parallelization_to_node(self, node_id, parallelizer, fileIdGen, fan_ou
batch_size, no_cat_split_vanish, r_split_batch_size):
splitter = parallelizer.get_splitter()
if splitter.is_splitter_round_robin():
# TODO: for both functions, check which parameters are needed
self.apply_round_robin_parallelization_to_node(node_id, parallelizer, fileIdGen, fan_out,
batch_size, no_cat_split_vanish, r_split_batch_size)
elif splitter.is_splitter_round_robin_with_unwrap_flag():
Expand Down Expand Up @@ -797,14 +800,19 @@ def apply_round_robin_parallelization_to_node(self, node_id, parallelizer, fileI
node.get_single_streaming_input_single_output_and_configuration_inputs_of_node_for_parallelization()
original_cmd_invocation_with_io_vars = node.cmd_invocation_with_io_vars


can_be_fused_with_prev = False
prev_nodes = self.get_previous_nodes(node_id)
first_pred_node, first_pred_cmd_inv = self.get_first_previous_node_and_first_previous_cmd_invocation(prev_nodes)
if len(prev_nodes) == 1:
first_pred_node, first_pred_cmd_inv = \
self.get_only_previous_node_and_only_previous_cmd_invocation(prev_nodes)
if isinstance(first_pred_node, r_merge.RMerge):
can_be_fused_with_prev = True

# remove node to be parallelized
self.remove_node(node_id) # remove it here already as as we need to remove edge end points ow. to avoid disconnecting graph to avoid disconnecting graph

if len(prev_nodes) == 1 and isinstance(first_pred_node, r_merge.RMerge):
# can be fused
if can_be_fused_with_prev:
self.remove_node(prev_nodes[0]) # also sets respective edge to's and from's to None
in_mapper_ids = first_pred_cmd_inv.operand_list
else: # cannot be fused so introduce splitter
Expand All @@ -830,16 +838,19 @@ def apply_round_robin_with_unwrap_flag_parallelization_to_node(self, node_id, pa
node.get_single_streaming_input_single_output_and_configuration_inputs_of_node_for_parallelization()
original_cmd_invocation_with_io_vars = node.cmd_invocation_with_io_vars

can_be_fused_with_prev = False
prev_nodes = self.get_previous_nodes(node_id)
first_pred_node, first_pred_cmd_inv = self.get_first_previous_node_and_first_previous_cmd_invocation(prev_nodes)
if len(prev_nodes) == 1:
first_pred_node, first_pred_cmd_inv = \
self.get_only_previous_node_and_only_previous_cmd_invocation(prev_nodes)
if isinstance(first_pred_node, r_merge.RMerge):
can_be_fused_with_prev = True

# remove node to be parallelized
self.remove_node(node_id) # remove it here already as as we need to remove edge end points ow. to avoid disconnecting graph to avoid disconnecting graph

if len(prev_nodes) == 1 and isinstance(first_pred_node, r_merge.RMerge):
# and node.is_commutative(): implied by how this kind of splitter is inferred
if can_be_fused_with_prev: # and node.is_commutative(): implied by how this kind of splitter is inferred
self.remove_node(prev_nodes[0]) # also sets respective edge to's and from's to None

in_unwrap_ids = first_pred_cmd_inv.operand_list
out_unwrap_ids = self.introduce_unwraps(fileIdGen, in_unwrap_ids)
in_mapper_ids = out_unwrap_ids
Expand Down Expand Up @@ -870,23 +881,24 @@ def apply_consecutive_chunks_parallelization_to_node(self, node_id, parallelizer
node.get_single_streaming_input_single_output_and_configuration_inputs_of_node_for_parallelization()
original_cmd_invocation_with_io_vars = node.cmd_invocation_with_io_vars

can_be_fused_with_prev = False
prev_nodes = self.get_previous_nodes(node_id)
first_pred_node, first_pred_cmd_inv = self.get_first_previous_node_and_first_previous_cmd_invocation(prev_nodes)
if len(prev_nodes) == 1:
first_pred_node, first_pred_cmd_inv = \
self.get_only_previous_node_and_only_previous_cmd_invocation(prev_nodes)
if first_pred_cmd_inv.is_aggregator_concatenate():
can_be_fused_with_prev = True

# remove node to be parallelized
self.remove_node(node_id) # remove it here already as as we need to remove edge end points ow. to avoid disconnecting graph to avoid disconnecting graph

# TODO: change first check to first_pred_node and not cmd_inv
if len(prev_nodes) == 1 and first_pred_cmd_inv.is_aggregator_concatenate():
# can be fused
if can_be_fused_with_prev:
self.remove_node(prev_nodes[0]) # also sets respective edge to's and from's to None
in_mapper_ids = first_pred_cmd_inv.operand_list
else: # cannot be fused so introduce splitter
# splitter
consec_chunks_splitter_generator = lambda input_id, output_ids: pash_split.make_split_file(input_id,
output_ids)
out_split_ids = self.introduce_splitter(consec_chunks_splitter_generator, fan_out, fileIdGen,
streaming_input)
consec_chunks_splitter_generator = lambda input_id, output_ids: pash_split.make_split_file(input_id, output_ids)
out_split_ids = self.introduce_splitter(consec_chunks_splitter_generator, fan_out, fileIdGen, streaming_input)
in_mapper_ids = out_split_ids

# mappers
Expand All @@ -900,9 +912,9 @@ def apply_consecutive_chunks_parallelization_to_node(self, node_id, parallelizer
original_cmd_invocation_with_io_vars, out_aggregator_id, parallelizer,
streaming_output)

def get_first_previous_node_and_first_previous_cmd_invocation(self, prev_nodes):
assert (len(prev_nodes) > 0)
def get_only_previous_node_and_only_previous_cmd_invocation(self, prev_nodes):
# get info about first one but also ensure that it is the only one if we fuse
assert len(prev_nodes) == 1
first_pred_id = prev_nodes[0]
first_pred_node = self.get_node(first_pred_id)
first_pred_cmd_inv = first_pred_node.cmd_invocation_with_io_vars
Expand Down
4 changes: 3 additions & 1 deletion evaluation/tests/shortest_scripts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@
# +p.95 multiple sed
# +p.XX crawler

cat $IN | xargs file | grep "shell script" | cut -d: -f1 | xargs -L 1 wc -l | grep -v '^0$' | sort -n | head -15
# cut -d: -f1 -> cut -d : -f 1; as parser recognizes option arguments only if given with whitespace
# head -15 -> head -n 15; not documented in man page
cat $IN | xargs file | grep "shell script" | cut -d : -f 1 | xargs -L 1 wc -l | grep -v '^0$' | sort -n | head -n 15