Skip to content

Commit

Permalink
Cover all but one test case from compiler/test_evaluation_scripts.sh (#…
Browse files Browse the repository at this point in the history
…612)

* Fix bug in parser to switch from flag to operand mode when reading hyphen

Signed-off-by: Felix Stutz <fstutz@mpi-sws.org>

* Adapt shortest_scripts.sh to work with parser

Signed-off-by: Felix Stutz <fstutz@mpi-sws.org>

* Cover more test cases from script_microbenchmarks

Signed-off-by: Felix Stutz <fstutz@mpi-sws.org>

* Parallelize spell-grep as done in `future`, i.e., not RR but CC for `set_diff`

Signed-off-by: Felix Stutz <fstutz@mpi-sws.org>

* Clean up and clarifying comment in parser

Signed-off-by: Felix Stutz <fstutz@mpi-sws.org>

* Simplify control flow in parallelization

Signed-off-by: Felix Stutz <fstutz@mpi-sws.org>
  • Loading branch information
festutz authored Jul 20, 2022
1 parent 3bd0cf6 commit 44017dc
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 27 deletions.
13 changes: 10 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
## TODOs before merging to `future`

- support for RR with unwrap for commutative commands
- working on all tests
- fix tests from compiler/test_evaluation_scripts.sh:
+ bigrams
- clean up utils for annotations
- graphviz
- Changing PaSh flags (making the default be priority r-split and then consecutive chunks), so remove the r_split flag and make defaults be the ones from the OSDI paper
- Fixing annotation library installation to a specific commit
- Remove code which got obsolete due to the changes
- Room for optimization: basically disable parallelization after a tr which squeezes all newlines, since there are no sequences of data to parallelize anyway for the moment.
Long-term, we could allow parallelization but with an adj_line_merge aggregator.
- Changes to scripts:
+ `shortest_scripts.sh`: here I only needed to modify the script slightly:
(1) option arguments for `cut` with whitespace as the parser cannot deal with them otherwise currently but we might want to change this in the future,
(2) `head -n 15` instead of `head -15`, which might be a bit harder to support. When skimming the man page, I did not see how it supports this form, but I might have missed it.
- tr_test.sh: Outside the testing script, the outputs are the same but somehow it still shows different outputs. Checked this with Konstantinos and he will check the testing script later.
4 changes: 2 additions & 2 deletions compiler/annotations_utils/util_cmd_invocations.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ def to_arg_from_cmd_inv_with_io_vars_without_streaming_inputs_or_outputs_for_wra
whole_cmd.concatenate(Arg(string_to_argument("\'")))
return whole_cmd

def to_arg_flagoption(flagoption, _edges):
def to_arg_flagoption(flagoption, edges):
if isinstance(flagoption, Flag):
return [Arg(string_to_argument(flagoption.get_name()))]
elif isinstance(flagoption, OptionWithIO):
opt_name_arg = Arg(string_to_argument(flagoption.get_name()))
opt_arg_arg = translate_io_var_to_arg_if_applicable(flagoption.get_arg())
opt_arg_arg = translate_io_var_to_arg_if_applicable(flagoption.get_arg(), edges)
return [opt_name_arg, opt_arg_arg]

def to_arg_operand(operand, edges):
Expand Down
2 changes: 2 additions & 0 deletions compiler/annotations_utils/util_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def parse_arg_list_to_command_invocation(command, flags_options_operands) -> Com
option = Option(option_name_as_string, option_arg_as_arg)
flag_option_list.append(option)
i += 1 # since we consumed another term for the argument
elif potential_flag_or_option_name == "-": # switch to operand mode (interpreted as hyphen-stdin)
break
elif are_all_individually_flags(potential_flag_or_option_name, set_of_all_flags):
for split_el in list(potential_flag_or_option_name[1:]):
flag: Flag = Flag(f'-{split_el}')
Expand Down
6 changes: 3 additions & 3 deletions compiler/definitions/ir/nodes/pash_split.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datatypes_new.AccessKind import AccessKind
from datatypes_new.AccessKind import make_stream_input, make_stream_output
from datatypes_new.CommandInvocationWithIOVars import CommandInvocationWithIOVars

from definitions.ir.file_id import *
Expand Down Expand Up @@ -26,8 +26,8 @@ def make_split_file(input_id, out_ids):
auto_split_bin = os.path.join(config.PASH_TOP, config.config['runtime']['auto_split_binary'])
operand_list = [input_id]
operand_list.extend(out_ids)
access_map = {output_id: AccessKind.make_stream_output() for output_id in out_ids}
access_map[input_id] = AccessKind.make_stream_input()
access_map = {output_id: make_stream_output() for output_id in out_ids}
access_map[input_id] = make_stream_input()
cmd_inv_with_io_vars = CommandInvocationWithIOVars(
cmd_name=auto_split_bin,
flag_option_list=[],
Expand Down
48 changes: 30 additions & 18 deletions compiler/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def compile_command_to_DFG(fileIdGen, command, options,
redirections=[]):
command_invocation: CommandInvocationInitial = parse_arg_list_to_command_invocation(command, options)
io_info: InputOutputInfo = get_input_output_info_from_cmd_invocation_util(command_invocation)
if io_info is None:
raise Exception(f"InputOutputInformation for {format_arg_chars(command)} not provided so considered side-effectful.")
if io_info.has_other_outputs():
raise Exception(f"Command {format_arg_chars(command)} has outputs other than streaming.")
para_info: ParallelizabilityInfo = get_parallelizability_info_from_cmd_invocation_util(command_invocation)
command_invocation_with_io = io_info.apply_input_output_info_to_command_invocation(command_invocation)
parallelizer_list, round_robin_compatible_with_cat, is_commutative = para_info.unpack_info()
Expand Down Expand Up @@ -766,7 +770,6 @@ def apply_parallelization_to_node(self, node_id, parallelizer, fileIdGen, fan_ou
batch_size, no_cat_split_vanish, r_split_batch_size):
splitter = parallelizer.get_splitter()
if splitter.is_splitter_round_robin():
# TODO: for both functions, check which parameters are needed
self.apply_round_robin_parallelization_to_node(node_id, parallelizer, fileIdGen, fan_out,
batch_size, no_cat_split_vanish, r_split_batch_size)
elif splitter.is_splitter_round_robin_with_unwrap_flag():
Expand Down Expand Up @@ -797,14 +800,19 @@ def apply_round_robin_parallelization_to_node(self, node_id, parallelizer, fileI
node.get_single_streaming_input_single_output_and_configuration_inputs_of_node_for_parallelization()
original_cmd_invocation_with_io_vars = node.cmd_invocation_with_io_vars


can_be_fused_with_prev = False
prev_nodes = self.get_previous_nodes(node_id)
first_pred_node, first_pred_cmd_inv = self.get_first_previous_node_and_first_previous_cmd_invocation(prev_nodes)
if len(prev_nodes) == 1:
first_pred_node, first_pred_cmd_inv = \
self.get_only_previous_node_and_only_previous_cmd_invocation(prev_nodes)
if isinstance(first_pred_node, r_merge.RMerge):
can_be_fused_with_prev = True

# remove node to be parallelized
self.remove_node(node_id) # remove it here already, as we need to remove the edge endpoints; otherwise the graph would become disconnected

if len(prev_nodes) == 1 and isinstance(first_pred_node, r_merge.RMerge):
# can be fused
if can_be_fused_with_prev:
self.remove_node(prev_nodes[0]) # also sets respective edge to's and from's to None
in_mapper_ids = first_pred_cmd_inv.operand_list
else: # cannot be fused so introduce splitter
Expand All @@ -830,16 +838,19 @@ def apply_round_robin_with_unwrap_flag_parallelization_to_node(self, node_id, pa
node.get_single_streaming_input_single_output_and_configuration_inputs_of_node_for_parallelization()
original_cmd_invocation_with_io_vars = node.cmd_invocation_with_io_vars

can_be_fused_with_prev = False
prev_nodes = self.get_previous_nodes(node_id)
first_pred_node, first_pred_cmd_inv = self.get_first_previous_node_and_first_previous_cmd_invocation(prev_nodes)
if len(prev_nodes) == 1:
first_pred_node, first_pred_cmd_inv = \
self.get_only_previous_node_and_only_previous_cmd_invocation(prev_nodes)
if isinstance(first_pred_node, r_merge.RMerge):
can_be_fused_with_prev = True

# remove node to be parallelized
self.remove_node(node_id) # remove it here already, as we need to remove the edge endpoints; otherwise the graph would become disconnected

if len(prev_nodes) == 1 and isinstance(first_pred_node, r_merge.RMerge):
# and node.is_commutative(): implied by how this kind of splitter is inferred
if can_be_fused_with_prev: # and node.is_commutative(): implied by how this kind of splitter is inferred
self.remove_node(prev_nodes[0]) # also sets respective edge to's and from's to None

in_unwrap_ids = first_pred_cmd_inv.operand_list
out_unwrap_ids = self.introduce_unwraps(fileIdGen, in_unwrap_ids)
in_mapper_ids = out_unwrap_ids
Expand Down Expand Up @@ -870,23 +881,24 @@ def apply_consecutive_chunks_parallelization_to_node(self, node_id, parallelizer
node.get_single_streaming_input_single_output_and_configuration_inputs_of_node_for_parallelization()
original_cmd_invocation_with_io_vars = node.cmd_invocation_with_io_vars

can_be_fused_with_prev = False
prev_nodes = self.get_previous_nodes(node_id)
first_pred_node, first_pred_cmd_inv = self.get_first_previous_node_and_first_previous_cmd_invocation(prev_nodes)
if len(prev_nodes) == 1:
first_pred_node, first_pred_cmd_inv = \
self.get_only_previous_node_and_only_previous_cmd_invocation(prev_nodes)
if first_pred_cmd_inv.is_aggregator_concatenate():
can_be_fused_with_prev = True

# remove node to be parallelized
self.remove_node(node_id) # remove it here already, as we need to remove the edge endpoints; otherwise the graph would become disconnected

# TODO: change first check to first_pred_node and not cmd_inv
if len(prev_nodes) == 1 and first_pred_cmd_inv.is_aggregator_concatenate():
# can be fused
if can_be_fused_with_prev:
self.remove_node(prev_nodes[0]) # also sets respective edge to's and from's to None
in_mapper_ids = first_pred_cmd_inv.operand_list
else: # cannot be fused so introduce splitter
# splitter
consec_chunks_splitter_generator = lambda input_id, output_ids: pash_split.make_split_file(input_id,
output_ids)
out_split_ids = self.introduce_splitter(consec_chunks_splitter_generator, fan_out, fileIdGen,
streaming_input)
consec_chunks_splitter_generator = lambda input_id, output_ids: pash_split.make_split_file(input_id, output_ids)
out_split_ids = self.introduce_splitter(consec_chunks_splitter_generator, fan_out, fileIdGen, streaming_input)
in_mapper_ids = out_split_ids

# mappers
Expand All @@ -900,9 +912,9 @@ def apply_consecutive_chunks_parallelization_to_node(self, node_id, parallelizer
original_cmd_invocation_with_io_vars, out_aggregator_id, parallelizer,
streaming_output)

def get_first_previous_node_and_first_previous_cmd_invocation(self, prev_nodes):
assert (len(prev_nodes) > 0)
def get_only_previous_node_and_only_previous_cmd_invocation(self, prev_nodes):
# get info about first one but also ensure that it is the only one if we fuse
assert len(prev_nodes) == 1
first_pred_id = prev_nodes[0]
first_pred_node = self.get_node(first_pred_id)
first_pred_cmd_inv = first_pred_node.cmd_invocation_with_io_vars
Expand Down
4 changes: 3 additions & 1 deletion evaluation/tests/shortest_scripts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@
# +p.95 multiple sed
# +p.XX crawler

cat $IN | xargs file | grep "shell script" | cut -d: -f1 | xargs -L 1 wc -l | grep -v '^0$' | sort -n | head -15
# cut -d: -f1 -> cut -d : -f 1; the parser currently recognizes option arguments only if they are separated by whitespace
# head -15 -> head -n 15; not documented in man page
cat $IN | xargs file | grep "shell script" | cut -d : -f 1 | xargs -L 1 wc -l | grep -v '^0$' | sort -n | head -n 15

0 comments on commit 44017dc

Please sign in to comment.