Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

472 ct pass #2

Merged
merged 8 commits into from
Aug 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.LoopVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
Expand Down Expand Up @@ -799,6 +800,39 @@ public void insert(final TaskSizeSplitterVertex toInsert) {
modifiedDAG = builder.build();
}

/**
 * Inserts the given vertex in the middle of the given edge.
 *
 * The DAG is rebuilt from scratch: the target edge is replaced by one edge from its source to
 * {@code accumulatorVertex} and one edge from {@code accumulatorVertex} to its destination,
 * while every other edge is carried over unchanged. If no edge of the DAG is the target edge,
 * the vertex is still added, but without any connection.
 *
 * @param accumulatorVertex the vertex to insert.
 * @param targetEdge the edge to split; it is matched by reference identity.
 */
public void insert(final OperatorVertex accumulatorVertex, final IREdge targetEdge) {
  // Rebuild the whole DAG, this time with the new vertex in it.
  final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
  dagBuilder.addVertex(accumulatorVertex);

  modifiedDAG.topologicalDo(vertex -> {
    dagBuilder.addVertex(vertex);

    for (final IREdge incoming : modifiedDAG.getIncomingEdgesOf(vertex)) {
      if (incoming != targetEdge) {
        // Not the edge being split: keep the connection as before.
        dagBuilder.connectVertices(incoming);
        continue;
      }

      // First half: original source -> accumulator vertex.
      final IREdge intoAccumulator = new IREdge(
        CommunicationPatternProperty.Value.PARTIAL_SHUFFLE, incoming.getSrc(), accumulatorVertex);
      incoming.copyExecutionPropertiesTo(intoAccumulator);
      // The pattern is set again after the copy.
      // NOTE(review): presumably because the copy overwrites it with the target edge's pattern - confirm.
      intoAccumulator.setProperty(
        CommunicationPatternProperty.of(CommunicationPatternProperty.Value.PARTIAL_SHUFFLE));

      // Second half: accumulator vertex -> original destination.
      final IREdge outOfAccumulator = new IREdge(
        CommunicationPatternProperty.Value.SHUFFLE, accumulatorVertex, incoming.getDst());
      incoming.copyExecutionPropertiesTo(outOfAccumulator);

      // Wire both halves into the new DAG.
      dagBuilder.connectVertices(intoAccumulator);
      dagBuilder.connectVertices(outOfAccumulator);
    }
  });

  modifiedDAG = dagBuilder.build();
}

/**
* Reshape unsafely, without guarantees on preserving application semantics.
* TODO #330: Refactor Unsafe Reshaping Passes
Expand Down
31 changes: 28 additions & 3 deletions common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private IRDAGChecker() {
addLoopVertexCheckers();
addScheduleGroupCheckers();
addCacheCheckers();
addIntermediateAccumulatorVertexCheckers();
}

/**
Expand Down Expand Up @@ -284,23 +285,25 @@ void addShuffleEdgeCheckers() {
final NeighborChecker shuffleChecker = ((v, inEdges, outEdges) -> {
for (final IREdge inEdge : inEdges) {
if (CommunicationPatternProperty.Value.SHUFFLE
.equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())
|| CommunicationPatternProperty.Value.PARTIAL_SHUFFLE
.equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
// Shuffle edges must have the following properties
if (!inEdge.getPropertyValue(KeyExtractorProperty.class).isPresent()
|| !inEdge.getPropertyValue(KeyEncoderProperty.class).isPresent()
|| !inEdge.getPropertyValue(KeyDecoderProperty.class).isPresent()) {
return failure("Shuffle edge does not have a Key-related property: " + inEdge.getId());
return failure("(Partial)Shuffle edge does not have a Key-related property: " + inEdge.getId());
}
} else {
// Non-shuffle edges must not have the following properties
final Optional<Pair<PartitionerProperty.Type, Integer>> partitioner =
inEdge.getPropertyValue(PartitionerProperty.class);
if (partitioner.isPresent() && partitioner.get().left().equals(PartitionerProperty.Type.HASH)) {
return failure("Only shuffle can have the hash partitioner",
return failure("Only (partial)shuffle can have the hash partitioner",
inEdge, CommunicationPatternProperty.class, PartitionerProperty.class);
}
if (inEdge.getPropertyValue(PartitionSetProperty.class).isPresent()) {
return failure("Only shuffle can select partition sets",
return failure("Only (partial)shuffle can select partition sets",
inEdge, CommunicationPatternProperty.class, PartitionSetProperty.class);
}
}
Expand Down Expand Up @@ -486,6 +489,28 @@ void addEncodingCompressionCheckers() {
singleEdgeCheckerList.add(compressAndDecompress);
}

/**
 * Registers a neighbor checker that ties the shuffle executor set property of a vertex
 * to PARTIAL_SHUFFLE incoming edges:
 * a vertex with the property must have exactly one incoming edge and one outgoing edge, all incoming
 * edges must be PARTIAL_SHUFFLE, and its parallelism must not be smaller than the number of executor sets;
 * a vertex without the property must not have any PARTIAL_SHUFFLE incoming edge.
 */
void addIntermediateAccumulatorVertexCheckers() {
  final NeighborChecker shuffleExecutorSet = (v, inEdges, outEdges) -> {
    if (!v.getPropertyValue(ShuffleExecutorSetProperty.class).isPresent()) {
      // No executor sets on this vertex: none of its inputs may be a partial shuffle.
      for (final IREdge inEdge : inEdges) {
        if (CommunicationPatternProperty.Value.PARTIAL_SHUFFLE
          .equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
          return failure("Intermediate accumulator vertex must have shuffle executor set property", v);
        }
      }
      return success();
    }

    // The vertex carries executor sets: check its shape first, and only then its input edges.
    if (inEdges.size() != 1 || outEdges.size() != 1) {
      return failure("Only intermediate accumulator vertex can have shuffle executor set property", v);
    }
    for (final IREdge inEdge : inEdges) {
      if (!CommunicationPatternProperty.Value.PARTIAL_SHUFFLE
        .equals(inEdge.getPropertyValue(CommunicationPatternProperty.class).get())) {
        return failure("Only intermediate accumulator vertex can have shuffle executor set property", v);
      }
    }

    final int parallelism = v.getPropertyValue(ParallelismProperty.class).get();
    final int executorSetCount = v.getPropertyValue(ShuffleExecutorSetProperty.class).get().size();
    if (parallelism < executorSetCount) {
      return failure("Parallelism must be greater or equal to the number of shuffle executor set", v);
    }
    return success();
  };
  neighborCheckerList.add(shuffleExecutorSet);
}

/**
* Group outgoing edges by the additional output tag property.
* @param outEdges the outedges to group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static CommunicationPatternProperty of(final Value value) {
public enum Value {
  ONE_TO_ONE,
  BROADCAST,
  SHUFFLE,
  // NOTE(review): PARTIAL_SHUFFLE is treated like SHUFFLE by the shuffle-edge checker and by the
  // default edge property map; confirm that any other switch over this enum covers it as well.
  PARTIAL_SHUFFLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static ExecutionPropertyMap<EdgeExecutionProperty> of(
map.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
map.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
switch (commPattern) {
case PARTIAL_SHUFFLE:
case SHUFFLE:
map.put(DataFlowProperty.of(DataFlowProperty.Value.PULL));
map.put(PartitionerProperty.of(PartitionerProperty.Type.HASH));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.nemo.common.ir.vertex.executionproperty;

import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;

import java.util.ArrayList;
import java.util.HashSet;

/**
 * Vertex execution property whose value is a list of sets of node names,
 * used to limit the scheduling of the tasks of the vertex to those nodes while shuffling.
 */
public final class ShuffleExecutorSetProperty extends VertexExecutionProperty<ArrayList<HashSet<String>>> {

  /**
   * Static factory for {@link ShuffleExecutorSetProperty}.
   * The given sets are copied into a new list, in the iteration order of the outer set.
   *
   * @param setsOfExecutors the sets of executors to schedule the tasks of the vertex on.
   *                        Leave empty to make it effectless.
   * @return the new execution property
   */
  public static ShuffleExecutorSetProperty of(final HashSet<HashSet<String>> setsOfExecutors) {
    final ArrayList<HashSet<String>> executorSets = new ArrayList<>(setsOfExecutors);
    return new ShuffleExecutorSetProperty(executorSets);
  }

  /**
   * Private constructor; instances are obtained through {@link #of(HashSet)}.
   *
   * @param value value of the execution property.
   */
  private ShuffleExecutorSetProperty(final ArrayList<HashSet<String>> value) {
    super(value);
  }
}
9 changes: 9 additions & 0 deletions common/src/test/java/org/apache/nemo/common/ir/IRDAGTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@ public void testSplitterVertex() {
mustPass();
}

/**
 * Inserts an operator vertex that carries an empty shuffle executor set and a parallelism of 5
 * on the shuffle edge, then runs mustPass() on the result.
 */
@Test
public void testAccumulatorVertex() {
  final HashSet<HashSet<String>> noExecutorSets = new HashSet<>();
  final OperatorVertex accumulatorVertex = new OperatorVertex(new EmptyComponents.EmptyTransform("iav"));
  accumulatorVertex.setProperty(ShuffleExecutorSetProperty.of(noExecutorSets));
  accumulatorVertex.setProperty(ParallelismProperty.of(5));

  irdag.insert(accumulatorVertex, shuffleEdge);

  mustPass();
}

private MessageAggregatorVertex insertNewTriggerVertex(final IRDAG dag, final IREdge edgeToGetStatisticsOf) {
final MessageGeneratorVertex mb = new MessageGeneratorVertex<>((l, r) -> null);
final MessageAggregatorVertex ma = new MessageAggregatorVertex<>(() -> new Object(), (l, r) -> null);
Expand Down
12 changes: 12 additions & 0 deletions compiler/optimizer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,17 @@ under the License.
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- for optimizing frontend-specific components -->
<!-- Declared once; the version follows the project version and the scope is Maven's default (compile). -->
<dependency>
  <groupId>org.apache.nemo</groupId>
  <artifactId>nemo-compiler-frontend-beam</artifactId>
  <version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;

import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;

/**
Expand All @@ -37,9 +39,11 @@ public PipeTransferForAllEdgesPass() {
public IRDAG apply(final IRDAG dag) {
dag.getVertices().forEach(vertex ->
dag.getIncomingEdgesOf(vertex).stream()
.forEach(edge -> edge.setPropertyPermanently(
DataStoreProperty.of(DataStoreProperty.Value.PIPE)))
);
.forEach(edge -> {
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.PIPE));
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.PUSH));
edge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.DISCARD));
}));
return dag;
}
}
Loading