From c97de0557fb1fe0eca291770418d15fed78ee9f9 Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 15:55:03 +0900 Subject: [PATCH 1/8] [add] new type of data comm channel --- .../edge/executionproperty/CommunicationPatternProperty.java | 3 ++- .../nemo/common/ir/executionproperty/ExecutionPropertyMap.java | 1 + .../nemo/runtime/executor/datatransfer/PipeInputReader.java | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/CommunicationPatternProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/CommunicationPatternProperty.java index 23909b43bd..accb8ac054 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/CommunicationPatternProperty.java +++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/CommunicationPatternProperty.java @@ -52,6 +52,7 @@ public static CommunicationPatternProperty of(final Value value) { public enum Value { ONE_TO_ONE, BROADCAST, - SHUFFLE + SHUFFLE, + PARTIAL_SHUFFLE } } diff --git a/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java index d0ef23549d..19f52432a1 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java +++ b/common/src/main/java/org/apache/nemo/common/ir/executionproperty/ExecutionPropertyMap.java @@ -73,6 +73,7 @@ public static ExecutionPropertyMap of( map.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY)); map.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY)); switch (commPattern) { + case PARTIAL_SHUFFLE: case SHUFFLE: map.put(DataFlowProperty.of(DataFlowProperty.Value.PULL)); map.put(PartitionerProperty.of(PartitionerProperty.Type.HASH)); diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java index cab2ed2f43..194fc258c3 100644 --- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java +++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java @@ -71,7 +71,8 @@ public List> read() { if (comValue.equals(CommunicationPatternProperty.Value.ONE_TO_ONE)) { return Collections.singletonList(pipeManagerWorker.read(dstTaskIndex, runtimeEdge, dstTaskIndex)); } else if (comValue.equals(CommunicationPatternProperty.Value.BROADCAST) - || comValue.equals(CommunicationPatternProperty.Value.SHUFFLE)) { + || comValue.equals(CommunicationPatternProperty.Value.SHUFFLE) + || comValue.equals(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE)) { final int numSrcTasks = InputReader.getSourceParallelism(this); final List> futures = new ArrayList<>(); for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) { From f5b139d6c4e5a82bead52bd3fea53e7358bb4332 Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 15:58:16 +0900 Subject: [PATCH 2/8] [add] new vertex property about network hierarchy --- .../ShuffleExecutorSetProperty.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java new file mode 100644 index 0000000000..2f75ac52a6 --- /dev/null +++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/ShuffleExecutorSetProperty.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.common.ir.vertex.executionproperty; + +import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; + +import java.util.ArrayList; +import java.util.HashSet; + +/** + * List of set of node names to limit the scheduling of the tasks of the vertex to while shuffling. + */ +public final class ShuffleExecutorSetProperty extends VertexExecutionProperty>> { + + /** + * Default constructor. + * @param value value of the execution property. + */ + private ShuffleExecutorSetProperty(final ArrayList> value) { + super(value); + } + + /** + * Static method for constructing {@link ShuffleExecutorSetProperty}. + * + * @param setsOfExecutors the list of executors to schedule the tasks of the vertex on. + * Leave empty to make it effectless. + * @return the new execution property + */ + public static ShuffleExecutorSetProperty of(final HashSet> setsOfExecutors) { + return new ShuffleExecutorSetProperty(new ArrayList<>(setsOfExecutors)); + } +} From 60537badaa6c0721793265d25a140879d7c98173 Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 16:00:33 +0900 Subject: [PATCH 3/8] [add] accumulator vertex insertion logic --- .../java/org/apache/nemo/common/ir/IRDAG.java | 34 +++++++++++++++++++ .../apache/nemo/common/ir/IRDAGChecker.java | 31 +++++++++++++++-- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java index c619b563b0..d6a929c6b7 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java +++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java @@ -35,6 +35,7 @@ import org.apache.nemo.common.ir.executionproperty.ResourceSpecification; import org.apache.nemo.common.ir.vertex.IRVertex; import org.apache.nemo.common.ir.vertex.LoopVertex; +import org.apache.nemo.common.ir.vertex.OperatorVertex; import org.apache.nemo.common.ir.vertex.SourceVertex; import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty; import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; @@ -799,6 +800,39 @@ public void insert(final TaskSizeSplitterVertex toInsert) { modifiedDAG = builder.build(); } + public void insert(final OperatorVertex accumulatorVertex, final IREdge targetEdge) { + // Create a completely new DAG with the vertex inserted. + final DAGBuilder builder = new DAGBuilder<>(); + + builder.addVertex(accumulatorVertex); + modifiedDAG.topologicalDo(v -> { + builder.addVertex(v); + + modifiedDAG.getIncomingEdgesOf(v).forEach(e -> { + if (e == targetEdge) { + // Edge to the accumulatorVertex + final IREdge toAV = new IREdge(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE, + e.getSrc(), accumulatorVertex); + e.copyExecutionPropertiesTo(toAV); + toAV.setProperty(CommunicationPatternProperty.of(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE)); + + // Edge from the accumulatorVertex + final IREdge fromAV = new IREdge(CommunicationPatternProperty.Value.SHUFFLE, accumulatorVertex, e.getDst()); + e.copyExecutionPropertiesTo(fromAV); + + // Connect the new edges + builder.connectVertices(toAV); + builder.connectVertices(fromAV); + } else { + // Simply connect vertices as before + builder.connectVertices(e); + } + }); + }); + + modifiedDAG = builder.build(); + } + /** * Reshape unsafely, without guarantees on preserving application semantics. * TODO #330: Refactor Unsafe Reshaping Passes diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java index ae8a8b3889..8eb33324e3 100644 --- a/common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java +++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java @@ -79,6 +79,7 @@ private IRDAGChecker() { addLoopVertexCheckers(); addScheduleGroupCheckers(); addCacheCheckers(); + addIntermediateAccumulatorVertexCheckers(); } /** @@ -284,23 +285,25 @@ void addShuffleEdgeCheckers() { final NeighborChecker shuffleChecker = ((v, inEdges, outEdges) -> { for (final IREdge inEdge : inEdges) { if (CommunicationPatternProperty.Value.SHUFFLE + .equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get()) + || CommunicationPatternProperty.Value.PARTIAL_SHUFFLE .equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())) { // Shuffle edges must have the following properties if (!inEdge.getPropertyValue(KeyExtractorProperty.class).isPresent() || !inEdge.getPropertyValue(KeyEncoderProperty.class).isPresent() || !inEdge.getPropertyValue(KeyDecoderProperty.class).isPresent()) { - return failure("Shuffle edge does not have a Key-related property: " + inEdge.getId()); + return failure("(Partial)Shuffle edge does not have a Key-related property: " + inEdge.getId()); } } else { // Non-shuffle edges must not have the following properties final Optional> partitioner = inEdge.getPropertyValue(PartitionerProperty.class); if (partitioner.isPresent() && partitioner.get().left().equals(PartitionerProperty.Type.HASH)) { - return failure("Only shuffle can have the hash partitioner", + return failure("Only (partial)shuffle can have the hash partitioner", inEdge, CommunicationPatternProperty.class, PartitionerProperty.class); } if (inEdge.getPropertyValue(PartitionSetProperty.class).isPresent()) { - return failure("Only shuffle can select partition sets", + return failure("Only (partial)shuffle can select partition sets", inEdge, CommunicationPatternProperty.class, PartitionSetProperty.class); } } @@ -486,6 +489,28 @@ void addEncodingCompressionCheckers() { singleEdgeCheckerList.add(compressAndDecompress); } + void addIntermediateAccumulatorVertexCheckers() { + final NeighborChecker shuffleExecutorSet = ((v, inEdges, outEdges) -> { + if (v.getPropertyValue(ShuffleExecutorSetProperty.class).isPresent()) { + if (inEdges.size() != 1 || outEdges.size() != 1 || inEdges.stream().anyMatch(e -> + !e.getPropertyValue(CommunicationPatternProperty.class).get() + .equals(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE))) { + return failure("Only intermediate accumulator vertex can have shuffle executor set property", v); + } else if (v.getPropertyValue(ParallelismProperty.class).get() + < v.getPropertyValue(ShuffleExecutorSetProperty.class).get().size()) { + return failure("Parallelism must be greater or equal to the number of shuffle executor set", v); + } + } else { + if (inEdges.stream().anyMatch(e -> e.getPropertyValue(CommunicationPatternProperty.class).get() + .equals(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE))) { + return failure("Intermediate accumulator vertex must have shuffle executor set property", v); + } + } + return success(); + }); + neighborCheckerList.add(shuffleExecutorSet); + } + /** * Group outgoing edges by the additional output tag property. * @param outEdges the outedges to group. From 2e871b21ab3883627de16b03c307b58f78620a4c Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 16:01:09 +0900 Subject: [PATCH 4/8] [add] unit test for insertion --- .../test/java/org/apache/nemo/common/ir/IRDAGTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/common/src/test/java/org/apache/nemo/common/ir/IRDAGTest.java b/common/src/test/java/org/apache/nemo/common/ir/IRDAGTest.java index 6113c85266..7a0a6741be 100644 --- a/common/src/test/java/org/apache/nemo/common/ir/IRDAGTest.java +++ b/common/src/test/java/org/apache/nemo/common/ir/IRDAGTest.java @@ -327,6 +327,15 @@ public void testSplitterVertex() { mustPass(); } + @Test + public void testAccumulatorVertex() { + final OperatorVertex cv = new OperatorVertex(new EmptyComponents.EmptyTransform("iav")); + cv.setProperty(ShuffleExecutorSetProperty.of(new HashSet<>())); + cv.setProperty(ParallelismProperty.of(5)); + irdag.insert(cv, shuffleEdge); + mustPass(); + } + private MessageAggregatorVertex insertNewTriggerVertex(final IRDAG dag, final IREdge edgeToGetStatisticsOf) { final MessageGeneratorVertex mb = new MessageGeneratorVertex<>((l, r) -> null); final MessageAggregatorVertex ma = new MessageAggregatorVertex<>(() -> new Object(), (l, r) -> null); From 0a3f0aba7403d9020cdb4fdcf2223d83288a18ac Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 16:10:34 +0900 Subject: [PATCH 5/8] [add] implement intermediate combine pass --- .../IntermediateAccumulatorInsertionPass.java | 177 ++++++++++++++++++ .../policy/IntermediateAccumulatorPolicy.java | 56 ++++++ 2 files changed, 233 insertions(+) create mode 100644 compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java create mode 100644 compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/IntermediateAccumulatorPolicy.java diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java new file mode 100644 index 0000000000..10c11528f7 --- /dev/null +++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nemo.common.Util; +import org.apache.nemo.common.exception.SchedulingException; +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.common.ir.edge.IREdge; +import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty; +import org.apache.nemo.common.ir.vertex.OperatorVertex; +import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; +import org.apache.nemo.common.ir.vertex.executionproperty.ShuffleExecutorSetProperty; +import org.apache.nemo.compiler.frontend.beam.transform.CombineTransform; +import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires; + +import java.io.File; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Pass for inserting intermediate aggregator for partial shuffle. + */ +@Requires(ParallelismProperty.class) +public class IntermediateAccumulatorInsertionPass extends ReshapingPass { + private final String networkFilePath; + private boolean isUnitTest = false; + private static final Map> UNIT_TEST_NETWORK_FILE = getUnitTestNetworkFile(); + + /** + * Default constructor. + */ + public IntermediateAccumulatorInsertionPass() { + super(IntermediateAccumulatorInsertionPass.class); + this.networkFilePath = Util.fetchProjectRootPath() + "/bin/labeldict.json"; + } + + /** + * Constructor for unit test. + * @param isUnitTest indicates unit test. + */ + public IntermediateAccumulatorInsertionPass(final boolean isUnitTest) { + this(); + this.isUnitTest = isUnitTest; + } + + private static Map> getUnitTestNetworkFile() { + Map> map = new HashMap<>(); + map.put("0", new ArrayList<>(Arrays.asList("mulan-16.maas", "0"))); + map.put("1", new ArrayList<>(Arrays.asList("mulan-23.maas", "0"))); + map.put("2", new ArrayList<>(Arrays.asList("mulan-m", "0"))); + map.put("3", new ArrayList<>(Arrays.asList("1+2", "0.00003721"))); + map.put("4", new ArrayList<>(Arrays.asList("0+3", "2.19395143"))); + return map; + } + + /** + * Insert accumulator vertex based on network hierarchy. + * + * @param irdag irdag to apply pass. + * @return modified irdag. + */ + @Override + public IRDAG apply(final IRDAG irdag) { + try { + ObjectMapper mapper = new ObjectMapper(); + Map> map; + if (isUnitTest) { + map = UNIT_TEST_NETWORK_FILE; + } else { + map = mapper.readValue(new File(networkFilePath), Map.class); + } + + irdag.topologicalDo(v -> { + if (v instanceof OperatorVertex && ((OperatorVertex) v).getTransform() instanceof CombineTransform) { + final CombineTransform finalCombineStreamTransform = (CombineTransform) ((OperatorVertex) v).getTransform(); + if (finalCombineStreamTransform.getIntermediateCombine().isPresent()) { + irdag.getIncomingEdgesOf(v).forEach(e -> { + if (CommunicationPatternProperty.Value.SHUFFLE + .equals(e.getPropertyValue(CommunicationPatternProperty.class) + .orElse(CommunicationPatternProperty.Value.ONE_TO_ONE))) { + handleDataTransferFor(irdag, map, finalCombineStreamTransform, e, 10F); + } + }); + } + } + }); + + return irdag; + } catch (final Exception e) { + throw new SchedulingException(e); + } + } + + private static void handleDataTransferFor(final IRDAG irdag, + final Map> map, + final CombineTransform finalCombineStreamTransform, + final IREdge targetEdge, + final Float threshold) { + final int srcParallelism = targetEdge.getSrc().getPropertyValue(ParallelismProperty.class).get(); + + final int mapSize = map.size(); + final int numOfNodes = (mapSize + 1) / 2; + Float previousDistance = 0F; + + for (int i = numOfNodes; i < mapSize; i++) { + final float currentDistance = Float.parseFloat(map.get(String.valueOf(i)).get(1)); + if (previousDistance != 0 && currentDistance > threshold * previousDistance + && srcParallelism * 2 / 3 >= mapSize - i + 1) { + final Integer targetNumberOfSets = mapSize - i; + final HashSet> setsOfExecutors = getTargetNumberOfExecutorSetsFrom(map, targetNumberOfSets); + + final CombineTransform intermediateCombineStreamTransform = + (CombineTransform) finalCombineStreamTransform.getIntermediateCombine().get(); + final OperatorVertex accumulatorVertex = new OperatorVertex(intermediateCombineStreamTransform); + + targetEdge.getDst().copyExecutionPropertiesTo(accumulatorVertex); + accumulatorVertex.setProperty(ParallelismProperty.of(srcParallelism * 2 / 3)); + accumulatorVertex.setProperty(ShuffleExecutorSetProperty.of(setsOfExecutors)); + + irdag.insert(accumulatorVertex, targetEdge); + break; + } + previousDistance = currentDistance; + } + } + + private static HashSet> getTargetNumberOfExecutorSetsFrom(final Map> map, + final Integer targetNumber) { + final HashSet> result = new HashSet<>(); + final Integer index = map.size() - targetNumber; + final List indicesToCheck = IntStream.range(0, index) + .map(i -> -i).sorted().map(i -> -i) + .mapToObj(String::valueOf) + .collect(Collectors.toList()); + + Arrays.asList(map.get(String.valueOf(index)).get(0).split("\\+")) + .forEach(key -> result.add(recursivelyExtractExecutorsFrom(map, key, indicesToCheck))); + + while (!indicesToCheck.isEmpty()) { + result.add(recursivelyExtractExecutorsFrom(map, indicesToCheck.get(0), indicesToCheck)); + } + + return result; + } + + private static HashSet recursivelyExtractExecutorsFrom(final Map> map, + final String key, + final List indicesToCheck) { + indicesToCheck.remove(key); + final HashSet result = new HashSet<>(); + final List indices = Arrays.asList(map.get(key).get(0).split("\\+")); + if (indices.size() == 1) { + result.add(indices.get(0)); + } else { + indices.forEach(index -> result.addAll(recursivelyExtractExecutorsFrom(map, index, indicesToCheck))); + } + return result; + } +} diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/IntermediateAccumulatorPolicy.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/IntermediateAccumulatorPolicy.java new file mode 100644 index 0000000000..4f9c3f09f6 --- /dev/null +++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/IntermediateAccumulatorPolicy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nemo.compiler.optimizer.policy; + +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.PipeTransferForAllEdgesPass; +import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.DefaultCompositePass; +import org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.IntermediateAccumulatorInsertionPass; +import org.apache.nemo.compiler.optimizer.pass.runtime.Message; + +/** + * A policy to perform intermediate data accumulation in shuffle edges (e.g. WAN networks). + */ +public final class IntermediateAccumulatorPolicy implements Policy { + public static final PolicyBuilder BUILDER = + new PolicyBuilder() + .registerCompileTimePass(new DefaultCompositePass()) + .registerCompileTimePass(new PipeTransferForAllEdgesPass()) + .registerCompileTimePass(new IntermediateAccumulatorInsertionPass()); + + private final Policy policy; + + /** + * Default constructor. + */ + public IntermediateAccumulatorPolicy() { + this.policy = BUILDER.build(); + } + + @Override + public IRDAG runCompileTimeOptimization(final IRDAG dag, final String dagDirectory) { + return this.policy.runCompileTimeOptimization(dag, dagDirectory); + } + + @Override + public IRDAG runRunTimeOptimizations(final IRDAG dag, final Message message) { + return this.policy.runRunTimeOptimizations(dag, message); + } +} From 9fe50a0c6addf9566238b1597d91ece73b01379e Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 16:24:33 +0900 Subject: [PATCH 6/8] [add] unit tests for ct pass --- .../PipeTransferForAllEdgesPass.java | 10 ++- .../optimizer/policy/PolicyBuilderTest.java | 6 ++ .../nemo/compiler/CompilerTestUtil.java | 21 ++++++ ...ermediateAccumulatorInsertionPassTest.java | 70 +++++++++++++++++++ 4 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPassTest.java diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java index cddbb53f21..85b0157d60 100644 --- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java +++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java @@ -19,6 +19,8 @@ package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating; import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty; +import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty; import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty; /** @@ -37,9 +39,11 @@ public PipeTransferForAllEdgesPass() { public IRDAG apply(final IRDAG dag) { dag.getVertices().forEach(vertex -> dag.getIncomingEdgesOf(vertex).stream() - .forEach(edge -> edge.setPropertyPermanently( - DataStoreProperty.of(DataStoreProperty.Value.PIPE))) - ); + .forEach(edge -> { + edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.PIPE)); + edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.PUSH)); + edge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.DISCARD)); + })); return dag; } } diff --git a/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java b/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java index 606b9851a5..d2bfc7f26b 100644 --- a/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java +++ b/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java @@ -45,6 +45,12 @@ public void testDataSkewPolicy() { assertEquals(1, DataSkewPolicy.BUILDER.getRunTimePasses().size()); } + @Test + public void testIntermediateAccumulatorPolicy() { + assertEquals(11, IntermediateAccumulatorPolicy.BUILDER.getCompileTimePasses().size()); + assertEquals(0, IntermediateAccumulatorPolicy.BUILDER.getRunTimePasses().size()); + } + @Test public void testShouldFailPolicy() { try { diff --git a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java index 3f5001a0b7..dd888b3853 100644 --- a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java +++ b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java @@ -143,4 +143,25 @@ public static IRDAG compileMLRDAG() throws Exception { .addUserArgs(input, numFeatures, numClasses, numIteration); return compileDAG(mlrArgBuilder.build()); } + + public static IRDAG compileEDGARDAG() throws Exception { + final String input = ROOT_DIR + "/examples/resources/inputs/test_input_windowed_wordcount"; + final String windowType = "fixed"; + final String inputType = "bounded"; + final String main = "org.apache.nemo.examples.beam.WindowedWordCount"; + final String output = ROOT_DIR + "/examples/resources/inputs/test_output"; + final String scheduler = "org.apache.nemo.runtime.master.scheduler.StreamingScheduler"; + final String resourceJson = ROOT_DIR + "/examples/resources/executors/beam_test_executor_resources.json"; + final String jobId = "testIntermediateAccumulatorInsertionPass"; + final String policy = "org.apache.nemo.compiler.optimizer.policy.StreamingPolicy"; + + final ArgBuilder edgarArgBuilder = new ArgBuilder() + .addScheduler(scheduler) + .addUserMain(main) + .addUserArgs(output, windowType, inputType, input) + .addResourceJson(resourceJson) + .addJobId(jobId) + .addOptimizationPolicy(policy); + return compileDAG(edgarArgBuilder.build()); + } } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPassTest.java new file mode 100644 index 0000000000..46ef847edb --- /dev/null +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPassTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping; + +import org.apache.nemo.client.JobLauncher; +import org.apache.nemo.common.ir.IRDAG; +import org.apache.nemo.common.ir.vertex.executionproperty.ShuffleExecutorSetProperty; +import org.apache.nemo.compiler.CompilerTestUtil; +import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.*; +import org.apache.nemo.compiler.optimizer.policy.Policy; +import org.apache.nemo.compiler.optimizer.policy.PolicyBuilder; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY; + +/** + * Test {@link IntermediateAccumulatorInsertionPass}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(JobLauncher.class) +public class IntermediateAccumulatorInsertionPassTest { + private IRDAG compiledDAG; + + @Before + public void setUp() throws Exception { + compiledDAG = CompilerTestUtil.compileEDGARDAG(); + } + + @Test + public void testIntermediateAccumulatorInsertionPass() { + final PolicyBuilder builder = new PolicyBuilder(); + builder.registerCompileTimePass(new DefaultParallelismPass(16, 2)) + .registerCompileTimePass(new DefaultEdgeEncoderPass()) + .registerCompileTimePass(new DefaultEdgeDecoderPass()) + .registerCompileTimePass(new DefaultDataStorePass()) + .registerCompileTimePass(new DefaultDataPersistencePass()) + .registerCompileTimePass(new DefaultScheduleGroupPass()) + .registerCompileTimePass(new CompressionPass()) + .registerCompileTimePass(new ResourceLocalityPass()) + .registerCompileTimePass(new ResourceSlotPass()) + .registerCompileTimePass(new PipeTransferForAllEdgesPass()) + .registerCompileTimePass(new IntermediateAccumulatorInsertionPass(true)); + final Policy policy = builder.build(); + compiledDAG = policy.runCompileTimeOptimization(compiledDAG, EMPTY_DAG_DIRECTORY); + assertTrue(compiledDAG.getTopologicalSort().stream() + .anyMatch(v -> v.getPropertyValue(ShuffleExecutorSetProperty.class).isPresent())); + assertTrue(compiledDAG.checkIntegrity().isPassed()); + } +} From c5d6f9cd1bdc3dfb06c86eb9304f735d727229e4 Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 16:32:36 +0900 Subject: [PATCH 7/8] [fix] add dependency --- compiler/optimizer/pom.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/compiler/optimizer/pom.xml b/compiler/optimizer/pom.xml index 49a546d86e..e75c3ca093 100644 --- a/compiler/optimizer/pom.xml +++ b/compiler/optimizer/pom.xml @@ -73,5 +73,17 @@ under the License. jackson-databind ${jackson.version} + + + org.apache.nemo + nemo-compiler-frontend-beam + ${project.version} + + + org.apache.nemo + nemo-compiler-frontend-beam + 0.4-SNAPSHOT + compile + From 32ebe2c58f3d914120dd886f3ea9472b93c0a445 Mon Sep 17 00:00:00 2001 From: Kangji Date: Sun, 15 Aug 2021 17:19:47 +0900 Subject: [PATCH 8/8] [fix] fixed labeldict path --- .../reshaping/IntermediateAccumulatorInsertionPass.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java index 10c11528f7..cba5f67f96 100644 --- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java +++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/IntermediateAccumulatorInsertionPass.java @@ -49,7 +49,7 @@ public class IntermediateAccumulatorInsertionPass extends ReshapingPass { */ public IntermediateAccumulatorInsertionPass() { super(IntermediateAccumulatorInsertionPass.class); - this.networkFilePath = Util.fetchProjectRootPath() + "/bin/labeldict.json"; + this.networkFilePath = Util.fetchProjectRootPath() + "/bin/network_profiling/labeldict.json"; } /**