Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-332] Refactor RunTimePass #191

Merged
merged 77 commits into from
Feb 7, 2019
Merged
Show file tree
Hide file tree
Changes from 65 commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
d245546
init
Jan 21, 2019
2a8e8d8
use the irdag class
Jan 21, 2019
d22fc1d
chkpt
Jan 21, 2019
fea8405
optimize imports
Jan 21, 2019
1257851
largeshuffle done
Jan 21, 2019
ba034cc
chkpt
Jan 22, 2019
4319fb1
handle loops
Jan 22, 2019
7a80134
skew restructuring
Jan 22, 2019
c393d58
chkpt
Jan 22, 2019
55a5e6f
done
Jan 22, 2019
0c08d8b
chkpt
Jan 22, 2019
f894b97
refactor largeshuffle
Jan 22, 2019
1ea942d
save
Jan 22, 2019
4690d80
message done
Jan 22, 2019
35bf7e9
aggr done
Jan 22, 2019
17d8202
src code compiles
Jan 22, 2019
290be9f
return an irdag
Jan 22, 2019
182b887
tests almost
Jan 22, 2019
9cba450
all compiles
Jan 22, 2019
37b51d3
largeshuffle debugging done
Jan 22, 2019
409fc67
skew debugged
Jan 22, 2019
cd38af6
relay to stream
Jan 22, 2019
a5f22f0
unit tests pass
Jan 22, 2019
4ee3346
fix tests
Jan 22, 2019
d409b9b
merge
Jan 23, 2019
66cc885
checkstyle
Jan 23, 2019
2e03c47
minor change
Jan 23, 2019
8524111
runtime
Jan 23, 2019
17d4e04
chkpt
Jan 23, 2019
38961fe
remove
Jan 23, 2019
ee80087
remove dep
Jan 23, 2019
5cdd7eb
refactor
Jan 24, 2019
f00dd17
chkpt
Jan 24, 2019
256bfa5
update
Jan 24, 2019
19b6582
chkpt
Jan 24, 2019
296bd71
sanha's comment
Jan 24, 2019
8b9837d
nit
Jan 24, 2019
82102df
revert optimize() to apply()
Jan 24, 2019
a4cabaa
use irdag consistently
Jan 24, 2019
95b7328
checkstyle
Jan 24, 2019
544e423
clean up
Jan 24, 2019
276d899
fix boolean error
Jan 24, 2019
73108db
fix CompilerTestUtil to use IRDAG
Jan 24, 2019
3cab078
checkstyle
johnyangk Jan 24, 2019
32f3f49
fix unit test
johnyangk Jan 24, 2019
3be27df
merge
Jan 24, 2019
3fdf730
optimize to apply
Jan 24, 2019
321f8ce
chkpt
Jan 25, 2019
873cfa7
chkpt
Jan 25, 2019
e0e9a24
chkpt
Jan 25, 2019
d56a5b3
fix
Jan 28, 2019
8e44351
policy interface change
Jan 28, 2019
e0dd922
data plane
Jan 28, 2019
8168af2
chkpt
Jan 28, 2019
13a1e8a
proto
Jan 28, 2019
d97d4e1
plan rewriter
Jan 29, 2019
8b5b4c6
chkpt
Jan 29, 2019
8e1f84a
anti affinity
Jan 29, 2019
5eab3a0
revert to update from accumulate
Jan 29, 2019
14c9085
anti affinity
Jan 29, 2019
c8e54b7
compilation wip
Jan 29, 2019
7e26b7b
compiles
Jan 29, 2019
b16ddbc
unit tests
Jan 30, 2019
0b61bb9
checkstyle
Jan 30, 2019
679685c
tests pass
Jan 30, 2019
acf375f
itcase
Jan 30, 2019
11b0d36
address comment
Jan 31, 2019
5681ba4
merge
Feb 1, 2019
b365c9a
address comments
Feb 1, 2019
6ccd9c9
nit
Feb 1, 2019
5409e42
merge
Feb 1, 2019
0352023
remove min from parallelism
Feb 1, 2019
02db05a
address comments
Feb 1, 2019
9de6d91
chkpt
Feb 1, 2019
a6f678f
fix stageEdge bug
Feb 1, 2019
ba35ee2
provided to utility
Feb 1, 2019
bf806a3
nit
Feb 1, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions client/src/main/java/org/apache/nemo/client/JobLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.compiler.backend.nemo.NemoPlanRewriter;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.driver.NemoDriver;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageParameters;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.runtime.common.plan.PlanRewriter;
import org.apache.nemo.runtime.master.scheduler.Scheduler;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
Expand Down Expand Up @@ -206,15 +208,15 @@ public static void shutdown() {
* @param dag the application DAG.
*/
// When modifying the signature of this method, see CompilerTestUtil#compileDAG and make corresponding changes
public static void launchDAG(final DAG dag) {
public static void launchDAG(final IRDAG dag) {
launchDAG(dag, Collections.emptyMap(), "");
}

/**
* @param dag the application DAG.
* @param jobId job ID.
*/
public static void launchDAG(final DAG dag, final String jobId) {
public static void launchDAG(final IRDAG dag, final String jobId) {
launchDAG(dag, Collections.emptyMap(), jobId);
}

Expand All @@ -223,7 +225,9 @@ public static void launchDAG(final DAG dag, final String jobId) {
* @param broadcastVariables broadcast variables (can be empty).
* @param jobId job ID.
*/
public static void launchDAG(final DAG dag, final Map<Serializable, Object> broadcastVariables, final String jobId) {
public static void launchDAG(final IRDAG dag,
final Map<Serializable, Object> broadcastVariables,
final String jobId) {
// launch driver if it hasn't been already
if (driverReadyLatch == null) {
try {
Expand Down Expand Up @@ -318,6 +322,7 @@ private static Configuration getSchedulerConf(final Configuration jobConf)
final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
final Class schedulerImpl = ((Class<Scheduler>) Class.forName(classImplName));
jcb.bindImplementation(Scheduler.class, schedulerImpl);
jcb.bindImplementation(PlanRewriter.class, NemoPlanRewriter.class);
return jcb.build();
}

Expand Down
27 changes: 7 additions & 20 deletions common/src/main/java/org/apache/nemo/common/HashRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,21 @@
* Descriptor for hash range.
*/
public final class HashRange implements KeyRange<Integer> {
private static final HashRange ALL = new HashRange(0, Integer.MAX_VALUE, false);
private static final HashRange ALL = new HashRange(0, Integer.MAX_VALUE);
private final int rangeBeginInclusive;
private final int rangeEndExclusive;
private boolean isSkewed;

/**
* Private constructor.
* @param rangeBeginInclusive point at which the hash range starts (inclusive).
* @param rangeEndExclusive point at which the hash range ends (exclusive).
* @param isSkewed whether or not the range is skewed
*/
private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive, final boolean isSkewed) {
private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive) {
if (rangeBeginInclusive < 0 || rangeEndExclusive < 0) {
throw new RuntimeException("Each boundary value of the range have to be non-negative.");
}
this.rangeBeginInclusive = rangeBeginInclusive;
this.rangeEndExclusive = rangeEndExclusive;
this.isSkewed = isSkewed;
}

/**
Expand All @@ -54,11 +51,10 @@ public static HashRange all() {
/**
* @param rangeStartInclusive the start of the range (inclusive)
* @param rangeEndExclusive the end of the range (exclusive)
* @param isSkewed whether or not the range is skewed
* @return A hash range descriptor representing [{@code rangeBeginInclusive}, {@code rangeEndExclusive})
*/
public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive, final boolean isSkewed) {
return new HashRange(rangeStartInclusive, rangeEndExclusive, isSkewed);
public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive) {
return new HashRange(rangeStartInclusive, rangeEndExclusive);
}

/**
Expand Down Expand Up @@ -114,8 +110,7 @@ public boolean equals(final Object o) {
}
final HashRange hashRange = (HashRange) o;
if (rangeBeginInclusive != hashRange.rangeBeginInclusive
|| rangeEndExclusive != hashRange.rangeEndExclusive
|| isSkewed != hashRange.isSkewed) {
|| rangeEndExclusive != hashRange.rangeEndExclusive) {
return false;
}
return true;
Expand All @@ -127,16 +122,8 @@ public boolean equals(final Object o) {
@Override
public int hashCode() {
return Arrays.hashCode(new Object[] {
rangeBeginInclusive,
rangeEndExclusive,
isSkewed,
rangeBeginInclusive,
rangeEndExclusive,
});
}

/**
* @return whether or not the range is skewed.
*/
public boolean isSkewed() {
return isSkewed;
}
}
2 changes: 1 addition & 1 deletion common/src/main/java/org/apache/nemo/common/KeyRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public interface KeyRange<K extends Serializable> extends Serializable {

/**
/**
* @return whether this instance represents the entire range or not.
*/
boolean isAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ public Decoder<Pair<A, B>> create(final InputStream inputStream) throws IOExcept
return new PairDecoder<>(inputStream, leftDecoderFactory, rightDecoderFactory);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Pair(");
sb.append(leftDecoderFactory.toString());
sb.append(", ");
sb.append(rightDecoderFactory.toString());
sb.append(")");
return sb.toString();
}

/**
* PairDecoder.
* @param <T1> type for the left coder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ public Encoder<Pair<A, B>> create(final OutputStream outputStream) throws IOExce
return new PairEncoder<>(outputStream, leftEncoderFactory, rightEncoderFactory);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Pair(");
sb.append(leftEncoderFactory.toString());
sb.append(", ");
sb.append(rightEncoderFactory.toString());
sb.append(")");
return sb.toString();
}

/**
* PairEncoder.
* @param <T1> type for the left coder.
Expand Down
Loading