Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-332] Refactor RunTimePass #191

Merged
merged 77 commits into from
Feb 7, 2019
Merged
Show file tree
Hide file tree
Changes from 71 commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
d245546
init
Jan 21, 2019
2a8e8d8
use the irdag class
Jan 21, 2019
d22fc1d
chkpt
Jan 21, 2019
fea8405
optimize imports
Jan 21, 2019
1257851
largeshuffle done
Jan 21, 2019
ba034cc
chkpt
Jan 22, 2019
4319fb1
handle loops
Jan 22, 2019
7a80134
skew restructuring
Jan 22, 2019
c393d58
chkpt
Jan 22, 2019
55a5e6f
done
Jan 22, 2019
0c08d8b
chkpt
Jan 22, 2019
f894b97
refactor largeshuffle
Jan 22, 2019
1ea942d
save
Jan 22, 2019
4690d80
message done
Jan 22, 2019
35bf7e9
aggr done
Jan 22, 2019
17d8202
src code compiles
Jan 22, 2019
290be9f
return an irdag
Jan 22, 2019
182b887
tests almost
Jan 22, 2019
9cba450
all compiles
Jan 22, 2019
37b51d3
largeshuffle debugging done
Jan 22, 2019
409fc67
skew debugged
Jan 22, 2019
cd38af6
relay to stream
Jan 22, 2019
a5f22f0
unit tests pass
Jan 22, 2019
4ee3346
fix tests
Jan 22, 2019
d409b9b
merge
Jan 23, 2019
66cc885
checkstyle
Jan 23, 2019
2e03c47
minor change
Jan 23, 2019
8524111
runtime
Jan 23, 2019
17d4e04
chkpt
Jan 23, 2019
38961fe
remove
Jan 23, 2019
ee80087
remove dep
Jan 23, 2019
5cdd7eb
refactor
Jan 24, 2019
f00dd17
chkpt
Jan 24, 2019
256bfa5
update
Jan 24, 2019
19b6582
chkpt
Jan 24, 2019
296bd71
sanha's comment
Jan 24, 2019
8b9837d
nit
Jan 24, 2019
82102df
revert optimize() to apply()
Jan 24, 2019
a4cabaa
use irdag consistently
Jan 24, 2019
95b7328
checkstyle
Jan 24, 2019
544e423
clean up
Jan 24, 2019
276d899
fix boolean error
Jan 24, 2019
73108db
fix CompilerTestUtil to use IRDAG
Jan 24, 2019
3cab078
checkstyle
johnyangk Jan 24, 2019
32f3f49
fix unit test
johnyangk Jan 24, 2019
3be27df
merge
Jan 24, 2019
3fdf730
optimize to apply
Jan 24, 2019
321f8ce
chkpt
Jan 25, 2019
873cfa7
chkpt
Jan 25, 2019
e0e9a24
chkpt
Jan 25, 2019
d56a5b3
fix
Jan 28, 2019
8e44351
policy interface change
Jan 28, 2019
e0dd922
data plane
Jan 28, 2019
8168af2
chkpt
Jan 28, 2019
13a1e8a
proto
Jan 28, 2019
d97d4e1
plan rewriter
Jan 29, 2019
8b5b4c6
chkpt
Jan 29, 2019
8e1f84a
anti affinity
Jan 29, 2019
5eab3a0
revert to update from accumulate
Jan 29, 2019
14c9085
anti affinity
Jan 29, 2019
c8e54b7
compilation wip
Jan 29, 2019
7e26b7b
compiles
Jan 29, 2019
b16ddbc
unit tests
Jan 30, 2019
0b61bb9
checkstyle
Jan 30, 2019
679685c
tests pass
Jan 30, 2019
acf375f
itcase
Jan 30, 2019
11b0d36
address comment
Jan 31, 2019
5681ba4
merge
Feb 1, 2019
b365c9a
address comments
Feb 1, 2019
6ccd9c9
nit
Feb 1, 2019
5409e42
merge
Feb 1, 2019
0352023
remove min from parallelism
Feb 1, 2019
02db05a
address comments
Feb 1, 2019
9de6d91
chkpt
Feb 1, 2019
a6f678f
fix stageEdge bug
Feb 1, 2019
ba35ee2
provided to utility
Feb 1, 2019
bf806a3
nit
Feb 1, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/src/main/java/org/apache/nemo/client/JobLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.compiler.backend.nemo.NemoPlanRewriter;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.driver.NemoDriver;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageParameters;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.runtime.common.plan.PlanRewriter;
import org.apache.nemo.runtime.master.scheduler.Scheduler;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
Expand Down Expand Up @@ -320,6 +322,7 @@ private static Configuration getSchedulerConf(final Configuration jobConf)
final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
final Class schedulerImpl = ((Class<Scheduler>) Class.forName(classImplName));
jcb.bindImplementation(Scheduler.class, schedulerImpl);
jcb.bindImplementation(PlanRewriter.class, NemoPlanRewriter.class);
return jcb.build();
}

Expand Down
27 changes: 7 additions & 20 deletions common/src/main/java/org/apache/nemo/common/HashRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,21 @@
* Descriptor for hash range.
*/
public final class HashRange implements KeyRange<Integer> {
private static final HashRange ALL = new HashRange(0, Integer.MAX_VALUE, false);
private static final HashRange ALL = new HashRange(0, Integer.MAX_VALUE);
private final int rangeBeginInclusive;
private final int rangeEndExclusive;
private boolean isSkewed;

/**
* Private constructor.
* @param rangeBeginInclusive point at which the hash range starts (inclusive).
* @param rangeEndExclusive point at which the hash range ends (exclusive).
* @param isSkewed whether or not the range is skewed
*/
private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive, final boolean isSkewed) {
private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive) {
if (rangeBeginInclusive < 0 || rangeEndExclusive < 0) {
throw new RuntimeException("Each boundary value of the range have to be non-negative.");
}
this.rangeBeginInclusive = rangeBeginInclusive;
this.rangeEndExclusive = rangeEndExclusive;
this.isSkewed = isSkewed;
}

/**
Expand All @@ -54,11 +51,10 @@ public static HashRange all() {
/**
* @param rangeStartInclusive the start of the range (inclusive)
* @param rangeEndExclusive the end of the range (exclusive)
* @param isSkewed whether or not the range is skewed
* @return A hash range descriptor representing [{@code rangeBeginInclusive}, {@code rangeEndExclusive})
*/
public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive, final boolean isSkewed) {
return new HashRange(rangeStartInclusive, rangeEndExclusive, isSkewed);
public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive) {
return new HashRange(rangeStartInclusive, rangeEndExclusive);
}

/**
Expand Down Expand Up @@ -114,8 +110,7 @@ public boolean equals(final Object o) {
}
final HashRange hashRange = (HashRange) o;
if (rangeBeginInclusive != hashRange.rangeBeginInclusive
|| rangeEndExclusive != hashRange.rangeEndExclusive
|| isSkewed != hashRange.isSkewed) {
|| rangeEndExclusive != hashRange.rangeEndExclusive) {
return false;
}
return true;
Expand All @@ -127,16 +122,8 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Arrays.hashCode(new Object[] {
rangeBeginInclusive,
rangeEndExclusive,
isSkewed,
rangeBeginInclusive,
rangeEndExclusive,
});
}

/**
* @return whether or not the range is skewed.
*/
public boolean isSkewed() {
return isSkewed;
}
}
2 changes: 1 addition & 1 deletion common/src/main/java/org/apache/nemo/common/KeyRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public interface KeyRange<K extends Serializable> extends Serializable {

/**
/**
* @return whether this instance represents the entire range or not.
*/
boolean isAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.nemo.common.exception.CompileTimeOptimizationException;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import org.apache.nemo.common.ir.edge.executionproperty.MessageIdProperty;
import org.apache.nemo.common.ir.vertex.*;
import org.apache.nemo.common.exception.IllegalVertexOperationException;
import org.apache.nemo.common.ir.vertex.system.MessageAggregatorVertex;
Expand Down Expand Up @@ -260,7 +260,7 @@ private void sinkCheck() {
private void executionPropertyCheck() {
// DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
.filter(e -> e.getPropertyValue(MetricCollectionProperty.class).isPresent())
.filter(e -> e.getPropertyValue(MessageIdProperty.class).isPresent())
.filter(e -> !(e.getDst() instanceof OperatorVertex
&& e.getDst() instanceof MessageAggregatorVertex))
.filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
Expand Down
6 changes: 4 additions & 2 deletions common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.NotThreadSafe;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -54,6 +55,7 @@
* - Annotation: setProperty(), getPropertyValue() on each IRVertex/IREdge
* - Reshaping: insert(), delete() on the IRDAG
*/
@NotThreadSafe
public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
private static final Logger LOG = LoggerFactory.getLogger(IRDAG.class.getName());

Expand Down Expand Up @@ -207,7 +209,7 @@ public void insert(final MessageBarrierVertex messageBarrierVertex,
final IREdge edgeToOriginalDst =
new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
edge.copyExecutionPropertiesTo(edgeToOriginalDst);
edgeToOriginalDst.setPropertyPermanently(MetricCollectionProperty.of(currentMetricCollectionId));
edgeToOriginalDst.setPropertyPermanently(MessageIdProperty.of(currentMetricCollectionId));
builder.connectVertices(edgeToOriginalDst);
} else {
// NO MATCH, so simply connect vertices as before.
Expand Down Expand Up @@ -247,7 +249,7 @@ private IREdge edgeBetweenMessageVertices(final MessageBarrierVertex mbv,
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
newEdge.setPropertyPermanently(MetricCollectionProperty.of(currentMetricCollectionId));
newEdge.setPropertyPermanently(MessageIdProperty.of(currentMetricCollectionId));
final KeyExtractor pairKeyExtractor = (element) -> {
if (element instanceof Pair) {
return ((Pair) element).left();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;

/**
* MetricCollection ExecutionProperty that indicates the edge of which data metric will be collected.
* Edges with the same MessageId are subject to the same run-time optimization.
*/
public final class MetricCollectionProperty extends EdgeExecutionProperty<Integer> {
public final class MessageIdProperty extends EdgeExecutionProperty<Integer> {
/**
* Constructor.
* @param value value of the execution property.
*/
private MetricCollectionProperty(final Integer value) {
private MessageIdProperty(final Integer value) {
super(value);
}

Expand All @@ -37,7 +37,7 @@ private MetricCollectionProperty(final Integer value) {
* @param value value of the new execution property.
* @return the newly created execution property.
*/
public static MetricCollectionProperty of(final Integer value) {
return new MetricCollectionProperty(value);
public static MessageIdProperty of(final Integer value) {
return new MessageIdProperty(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common.ir.edge.executionproperty;

import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;

import java.util.ArrayList;

/**
* This property decides which partitions the tasks of the destination IRVertex should fetch.
* The position of a KeyRange in the list corresponds to the offset of the destination task.
*
* For example, in the following setup:
* Source IRVertex (Parallelism=2) - IREdge (Partitioner.Num=4) - Destination IRVertex (Parallelism=2)
*
* Setting PartitionSetProperty([0, 3), [3, 3)) on the IREdge with will enforce the following behaviors.
* - The first destination task fetches the first 3 partitions from each of the 2 data blocks
* - The second destination task fetches the last partitions from each of the 2 data blocks
*
* This property is useful for handling data skews.
* For example, if the size ratio of the 4 partitions in the above setup are (17%, 16%, 17%, 50%),
* then each of the destination task will evenly handle 50% of the load.
*/
public final class PartitionSetProperty extends EdgeExecutionProperty<ArrayList<KeyRange>> {
/**
* Constructor.
*
* @param value value of the execution property.
*/
private PartitionSetProperty(final ArrayList<KeyRange> value) {
super(value);
}

/**
* Static method exposing the constructor.
*
* @param value value of the new execution property.
* @return the newly created execution property.
*/
public static PartitionSetProperty of(final ArrayList<KeyRange> value) {
return new PartitionSetProperty(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,63 @@
*/
package org.apache.nemo.common.ir.edge.executionproperty;

import org.apache.nemo.common.Pair;
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;

/**
* Partitioner ExecutionProperty.
*/
public final class PartitionerProperty extends EdgeExecutionProperty<PartitionerProperty.Value> {
public final class PartitionerProperty
extends EdgeExecutionProperty<Pair<PartitionerProperty.Type, Integer>> {

// Lazily use the number of destination parallelism to minimize the number of partitions.
public static final int NUM_EQUAL_TO_DST_PARALLELISM = 0;

/**
* Constructor.
*
* @param value value of the execution property.
*/
private PartitionerProperty(final Value value) {
private PartitionerProperty(final Pair<Type, Integer> value) {
super(value);
}

/**
* Static method exposing the constructor.
*
* @param value value of the new execution property.
* @return the newly created execution property.
* @param type of the partitioner.
* @return the property.
*/
public static PartitionerProperty of(final Type type) {
return PartitionerProperty.of(type, NUM_EQUAL_TO_DST_PARALLELISM, true);
}

/**
* @param type of the partitioner.
* @param numOfPartitions to create.
* @return the property.
*/
public static PartitionerProperty of(final Type type, final int numOfPartitions) {
return PartitionerProperty.of(type, numOfPartitions, false);
}

/**
* @param type of the partitioner.
* @param numOfPartitions to create.
* @param auto if the number of partitions is auto.
* @return the property.
*/
public static PartitionerProperty of(final Value value) {
return new PartitionerProperty(value);
private static PartitionerProperty of(final Type type, final int numOfPartitions, final boolean auto) {
if (!auto && numOfPartitions <= 0) {
throw new IllegalArgumentException(String.valueOf(numOfPartitions));
}
return new PartitionerProperty(Pair.of(type, numOfPartitions));
}

/**
* Possible values of Partitioner ExecutionProperty.
* Partitioning types.
*/
public enum Value {
DataSkewHashPartitioner,
HashPartitioner,
IntactPartitioner,
DedicatedKeyPerElementPartitioner
public enum Type {
Hash,
Intact,
DedicatedKeyPerElement
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,14 @@ public final boolean equals(final Object o) {
public final int hashCode() {
return value != null ? value.hashCode() : 0;
}

@Override
public final String toString() {
final StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getSimpleName());
sb.append("(");
sb.append(value.toString());
sb.append(")");
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.MinParallelismProperty;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.builder.HashCodeBuilder;
Expand Down Expand Up @@ -74,20 +74,19 @@ public static ExecutionPropertyMap<EdgeExecutionProperty> of(
map.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
switch (commPattern) {
case Shuffle:
map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
map.put(PartitionerProperty.of(PartitionerProperty.Type.Hash));
map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
break;
case BroadCast:
map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
map.put(PartitionerProperty.of(PartitionerProperty.Type.Intact));
map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
break;
case OneToOne:
map.put(PartitionerProperty.of(PartitionerProperty.Value.IntactPartitioner));
map.put(PartitionerProperty.of(PartitionerProperty.Type.Intact));
map.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
break;
default:
map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
map.put(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
throw new IllegalStateException(commPattern.toString());
}
return map;
}
Expand All @@ -99,7 +98,7 @@ public static ExecutionPropertyMap<EdgeExecutionProperty> of(
*/
public static ExecutionPropertyMap<VertexExecutionProperty> of(final IRVertex irVertex) {
final ExecutionPropertyMap<VertexExecutionProperty> map = new ExecutionPropertyMap<>(irVertex.getId());
map.put(ParallelismProperty.of(1));
map.put(MinParallelismProperty.of(1));
map.put(ResourcePriorityProperty.of(ResourcePriorityProperty.NONE));
return map;
}
Expand Down
Loading