Skip to content

Commit

Permalink
Merge pull request #2 from Kangji/472-ct-pass
Browse files Browse the repository at this point in the history
implemented compile time pass
  • Loading branch information
Kangji authored Aug 15, 2021
2 parents fb1ca4f + 32ebe2c commit 4f137a5
Show file tree
Hide file tree
Showing 14 changed files with 475 additions and 8 deletions.
34 changes: 34 additions & 0 deletions common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.LoopVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
Expand Down Expand Up @@ -799,6 +800,39 @@ public void insert(final TaskSizeSplitterVertex toInsert) {
modifiedDAG = builder.build();
}

/**
 * Inserts the given vertex in the middle of the given edge.
 *
 * The DAG is rebuilt from scratch: every existing vertex is re-added and every edge other than
 * {@code targetEdge} is reconnected as-is. {@code targetEdge} itself is replaced by two new edges,
 * a PARTIAL_SHUFFLE edge from its source to {@code accumulatorVertex} and a SHUFFLE edge from
 * {@code accumulatorVertex} to its destination.
 *
 * @param accumulatorVertex the vertex to place on the edge.
 * @param targetEdge the edge to split; it is matched by reference identity.
 */
public void insert(final OperatorVertex accumulatorVertex, final IREdge targetEdge) {
  final DAGBuilder<IRVertex, IREdge> rebuilt = new DAGBuilder<>();
  rebuilt.addVertex(accumulatorVertex);

  modifiedDAG.topologicalDo(vertex -> {
    rebuilt.addVertex(vertex);

    modifiedDAG.getIncomingEdgesOf(vertex).forEach(incoming -> {
      if (incoming != targetEdge) {
        // Untouched edge: carry it over unchanged.
        rebuilt.connectVertices(incoming);
        return;
      }

      // First half: original source -> accumulator vertex.
      final IREdge intoAccumulator = new IREdge(
        CommunicationPatternProperty.Value.PARTIAL_SHUFFLE, incoming.getSrc(), accumulatorVertex);
      incoming.copyExecutionPropertiesTo(intoAccumulator);
      // Re-set the pattern after copying (presumably the copy carries over the original edge's pattern).
      intoAccumulator.setProperty(
        CommunicationPatternProperty.of(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE));

      // Second half: accumulator vertex -> original destination.
      final IREdge outOfAccumulator = new IREdge(
        CommunicationPatternProperty.Value.SHUFFLE, accumulatorVertex, incoming.getDst());
      incoming.copyExecutionPropertiesTo(outOfAccumulator);

      rebuilt.connectVertices(intoAccumulator);
      rebuilt.connectVertices(outOfAccumulator);
    });
  });

  modifiedDAG = rebuilt.build();
}

/**
* Reshape unsafely, without guarantees on preserving application semantics.
* TODO #330: Refactor Unsafe Reshaping Passes
Expand Down
31 changes: 28 additions & 3 deletions common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private IRDAGChecker() {
addLoopVertexCheckers();
addScheduleGroupCheckers();
addCacheCheckers();
addIntermediateAccumulatorVertexCheckers();
}

/**
Expand Down Expand Up @@ -284,23 +285,25 @@ void addShuffleEdgeCheckers() {
final NeighborChecker shuffleChecker = ((v, inEdges, outEdges) -> {
for (final IREdge inEdge : inEdges) {
if (CommunicationPatternProperty.Value.SHUFFLE
.equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())
|| CommunicationPatternProperty.Value.PARTIAL_SHUFFLE
.equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
// Shuffle edges must have the following properties
if (!inEdge.getPropertyValue(KeyExtractorProperty.class).isPresent()
|| !inEdge.getPropertyValue(KeyEncoderProperty.class).isPresent()
|| !inEdge.getPropertyValue(KeyDecoderProperty.class).isPresent()) {
return failure("Shuffle edge does not have a Key-related property: " + inEdge.getId());
return failure("(Partial)Shuffle edge does not have a Key-related property: " + inEdge.getId());
}
} else {
// Non-shuffle edges must not have the following properties
final Optional<Pair<PartitionerProperty.Type, Integer>> partitioner =
inEdge.getPropertyValue(PartitionerProperty.class);
if (partitioner.isPresent() && partitioner.get().left().equals(PartitionerProperty.Type.HASH)) {
return failure("Only shuffle can have the hash partitioner",
return failure("Only (partial)shuffle can have the hash partitioner",
inEdge, CommunicationPatternProperty.class, PartitionerProperty.class);
}
if (inEdge.getPropertyValue(PartitionSetProperty.class).isPresent()) {
return failure("Only shuffle can select partition sets",
return failure("Only (partial)shuffle can select partition sets",
inEdge, CommunicationPatternProperty.class, PartitionSetProperty.class);
}
}
Expand Down Expand Up @@ -486,6 +489,28 @@ void addEncodingCompressionCheckers() {
singleEdgeCheckerList.add(compressAndDecompress);
}

/**
 * Registers the neighbor checker that ties {@code ShuffleExecutorSetProperty} to PARTIAL_SHUFFLE in-edges.
 *
 * A vertex carrying the property must have exactly one incoming and one outgoing edge, its incoming edge
 * must be PARTIAL_SHUFFLE, and - when its parallelism is set - the parallelism must not be smaller than
 * the number of executor sets. A vertex without the property must not have a PARTIAL_SHUFFLE incoming edge.
 */
void addIntermediateAccumulatorVertexCheckers() {
  final NeighborChecker shuffleExecutorSet = ((v, inEdges, outEdges) -> {
    if (v.getPropertyValue(ShuffleExecutorSetProperty.class).isPresent()) {
      if (inEdges.size() != 1 || outEdges.size() != 1 || inEdges.stream().anyMatch(e ->
        !e.getPropertyValue(CommunicationPatternProperty.class).get()
          .equals(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE))) {
        return failure("Only intermediate accumulator vertex can have shuffle executor set property", v);
      } else if (v.getPropertyValue(ParallelismProperty.class).isPresent()
        && v.getPropertyValue(ParallelismProperty.class).get()
        < v.getPropertyValue(ShuffleExecutorSetProperty.class).get().size()) {
        // The presence guard matters: without it a vertex whose parallelism has not been set yet
        // makes the checker throw NoSuchElementException instead of returning a check result.
        return failure("Parallelism must be greater or equal to the number of shuffle executor set", v);
      }
    } else {
      // No executor sets on this vertex, so none of its in-edges may be PARTIAL_SHUFFLE.
      if (inEdges.stream().anyMatch(e -> e.getPropertyValue(CommunicationPatternProperty.class).get()
        .equals(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE))) {
        return failure("Intermediate accumulator vertex must have shuffle executor set property", v);
      }
    }
    return success();
  });
  neighborCheckerList.add(shuffleExecutorSet);
}

/**
* Group outgoing edges by the additional output tag property.
* @param outEdges the outedges to group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static CommunicationPatternProperty of(final Value value) {
/**
 * Possible values of the communication pattern of an edge.
 *
 * PARTIAL_SHUFFLE is listed after SHUFFLE so the positions of the pre-existing constants do not change.
 */
public enum Value {
  ONE_TO_ONE,
  BROADCAST,
  SHUFFLE,
  PARTIAL_SHUFFLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static ExecutionPropertyMap<EdgeExecutionProperty> of(
map.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
map.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
switch (commPattern) {
case PARTIAL_SHUFFLE:
case SHUFFLE:
map.put(DataFlowProperty.of(DataFlowProperty.Value.PULL));
map.put(PartitionerProperty.of(PartitionerProperty.Type.HASH));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.nemo.common.ir.vertex.executionproperty;

import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;

import java.util.ArrayList;
import java.util.HashSet;

/**
 * Vertex execution property holding a list of sets of node names; while shuffling, the scheduling of the
 * tasks of the vertex is limited to these nodes.
 */
public final class ShuffleExecutorSetProperty extends VertexExecutionProperty<ArrayList<HashSet<String>>> {

  /**
   * Private constructor; instances are created through {@link #of(HashSet)}.
   *
   * @param executorSets value of the execution property.
   */
  private ShuffleExecutorSetProperty(final ArrayList<HashSet<String>> executorSets) {
    super(executorSets);
  }

  /**
   * Static factory for {@link ShuffleExecutorSetProperty}.
   *
   * The given sets are copied into a new list; the inner sets themselves are not copied.
   *
   * @param setsOfExecutors the sets of executors to schedule the tasks of the vertex on.
   *                        Leave empty to make it effectless.
   * @return the new execution property
   */
  public static ShuffleExecutorSetProperty of(final HashSet<HashSet<String>> setsOfExecutors) {
    final ArrayList<HashSet<String>> asList = new ArrayList<>(setsOfExecutors.size());
    asList.addAll(setsOfExecutors);
    return new ShuffleExecutorSetProperty(asList);
  }
}
9 changes: 9 additions & 0 deletions common/src/test/java/org/apache/nemo/common/ir/IRDAGTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@ public void testSplitterVertex() {
mustPass();
}

/**
 * Inserts an operator vertex with an empty shuffle executor set and a parallelism of 5 on the
 * shuffle edge, then runs the mustPass() check on the resulting DAG.
 */
@Test
public void testAccumulatorVertex() {
  final HashSet<HashSet<String>> noExecutorSets = new HashSet<>();
  final OperatorVertex accumulator = new OperatorVertex(new EmptyComponents.EmptyTransform("iav"));
  accumulator.setProperty(ShuffleExecutorSetProperty.of(noExecutorSets));
  accumulator.setProperty(ParallelismProperty.of(5));

  irdag.insert(accumulator, shuffleEdge);

  mustPass();
}

private MessageAggregatorVertex insertNewTriggerVertex(final IRDAG dag, final IREdge edgeToGetStatisticsOf) {
final MessageGeneratorVertex mb = new MessageGeneratorVertex<>((l, r) -> null);
final MessageAggregatorVertex ma = new MessageAggregatorVertex<>(() -> new Object(), (l, r) -> null);
Expand Down
12 changes: 12 additions & 0 deletions compiler/optimizer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,17 @@ under the License.
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- for optimizing frontend-specific components -->
<!-- Declared once: the version follows the project version and the scope is Maven's default (compile). -->
<dependency>
  <groupId>org.apache.nemo</groupId>
  <artifactId>nemo-compiler-frontend-beam</artifactId>
  <version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;

import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;

/**
Expand All @@ -37,9 +39,11 @@ public PipeTransferForAllEdgesPass() {
public IRDAG apply(final IRDAG dag) {
dag.getVertices().forEach(vertex ->
dag.getIncomingEdgesOf(vertex).stream()
.forEach(edge -> edge.setPropertyPermanently(
DataStoreProperty.of(DataStoreProperty.Value.PIPE)))
);
.forEach(edge -> {
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.PIPE));
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.PUSH));
edge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.DISCARD));
}));
return dag;
}
}
Loading

0 comments on commit 4f137a5

Please sign in to comment.