Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Work stealing in Nemo (alternative implementation) #321

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c162cc4
modify input reader
hwarim-hyun Aug 5, 2021
09aa84a
change task id
hwarim-hyun Aug 12, 2021
7abba44
ready for test
hwarim-hyun Aug 13, 2021
6fc86e5
debug
hwarim-hyun Aug 13, 2021
f2c579c
logs for debugging
hwarim-hyun Aug 13, 2021
58f5b50
logs
hwarim-hyun Aug 13, 2021
729de29
logs
hwarim-hyun Aug 13, 2021
aa82d76
logs
hwarim-hyun Aug 13, 2021
9e7c736
backup
hwarim-hyun Aug 22, 2021
eaae64f
add work stealing policy
hwarim-hyun Aug 22, 2021
5c41fc7
backup
hwarim-hyun Aug 22, 2021
8e9fb54
add work stealing compile time pass
hwarim-hyun Aug 22, 2021
07ff1f9
checkstyle and work stealing sub split pass
hwarim-hyun Aug 22, 2021
e25355a
change block output writer
hwarim-hyun Aug 22, 2021
87f65c1
modify block input reader to support SPLIT and MERGE
hwarim-hyun Aug 22, 2021
c992465
update block input reader: still need to implement retry
hwarim-hyun Aug 22, 2021
b348f72
add retry
hwarim-hyun Aug 22, 2021
b3b42d1
debug
hwarim-hyun Aug 23, 2021
c0fe4c9
ready for remote test
hwarim-hyun Aug 23, 2021
d09d27e
working, but <duplicated channel> happens with memory leak and OOM
hwarim-hyun Aug 23, 2021
cfdf87b
this may be the solution for OOM
hwarim-hyun Aug 28, 2021
2e097da
delete unnecessary logs
hwarim-hyun Aug 28, 2021
cbd6ed5
Merge branch 'master' into follow_hurricane
hwarim-hyun Aug 28, 2021
b007798
delete logs and organize new passes to a composite pass
hwarim-hyun Aug 28, 2021
df25f9d
Merge branch 'follow_hurricane' of http://github.com/snuspl/incubator…
hwarim-hyun Aug 28, 2021
18ce735
tidy annotations
hwarim-hyun Aug 28, 2021
9667326
checkstyles
hwarim-hyun Aug 28, 2021
463a201
add tests
hwarim-hyun Aug 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
*/
public class OperatorVertex extends IRVertex {
private final Transform transform;
private final String transformFullName;

/**
* Constructor of OperatorVertex.
Expand All @@ -36,6 +37,12 @@ public class OperatorVertex extends IRVertex {
public OperatorVertex(final Transform t) {
super();
this.transform = t;
this.transformFullName = "";
}

public OperatorVertex(final Transform t, final String transformFullName) {
this.transform = t;
this.transformFullName = transformFullName;
}

/**
Expand All @@ -46,6 +53,7 @@ public OperatorVertex(final Transform t) {
private OperatorVertex(final OperatorVertex that) {
super(that);
this.transform = that.transform;
this.transformFullName = that.transformFullName;
}

@Override
Expand All @@ -60,6 +68,10 @@ public final Transform getTransform() {
return transform;
}

public final String getTransformFullName() {
return transformFullName;
}

@Override
public final ObjectNode getPropertiesAsJsonNode() {
final ObjectNode node = getIRVertexPropertiesAsJsonNode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common.ir.vertex.executionproperty;

import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;

/**
* Marks Work Stealing Strategy of the vertex.
*
* Currently, there are three types:
* SPLIT : vertex which is the subject of work stealing
* MERGE : vertex which merges the effect of work stealing
* DEFAULT : vertex which is not the subject of work stealing
*/
public class WorkStealingStateProperty extends VertexExecutionProperty<String> {
/**
* Default constructor.
*
* @param value value of the VertexExecutionProperty.
*/
public WorkStealingStateProperty(final String value) {
super(value);
}

/**
* Static method exposing the constructor.
*
* @param value value of the new execution property.
* @return the newly created execution property.
*/
public static WorkStealingStateProperty of(final String value) {
return new WorkStealingStateProperty(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common.ir.vertex.executionproperty;

import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;

/**
* Property to store the sub-split number of work stealing tasks.
*/
public class WorkStealingSubSplitProperty extends VertexExecutionProperty<Integer> {
/**
* Default constructor.
*
* @param value value of the VertexExecutionProperty.
*/
public WorkStealingSubSplitProperty(final Integer value) {
super(value);
}

/**
* Static method exposing the constructor.
*
* @param value value of the new execution property.
* @return the newly created execution property.
*/
public static WorkStealingSubSplitProperty of(final Integer value) {
return new WorkStealingSubSplitProperty(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ private static void parDoMultiOutputTranslator(final PipelineTranslationContext
private static void groupByKeyTranslator(final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode,
final GroupByKey<?, ?> transform) {
final IRVertex vertex = new OperatorVertex(createGBKTransform(ctx, beamNode));
final String fullName = beamNode.getFullName();
final IRVertex vertex = new OperatorVertex(createGBKTransform(ctx, beamNode), fullName);
ctx.addVertex(vertex);
beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
Expand Down Expand Up @@ -324,7 +325,8 @@ private static void createPCollectionViewTranslator(final PipelineTranslationCon
private static void flattenTranslator(final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode,
final Flatten.PCollections<?> transform) {
final IRVertex vertex = new OperatorVertex(new FlattenTransform());
final String fullName = beamNode.getFullName();
final IRVertex vertex = new OperatorVertex(new FlattenTransform(), fullName);
ctx.addVertex(vertex);
beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
Expand All @@ -350,6 +352,7 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato
final PTransform<?, ?> transform) {

final Combine.PerKey perKey = (Combine.PerKey) transform;
final String fullName = beamNode.getFullName();

// If there's any side inputs, translate each primitive transforms in this composite transform one by one.
if (!perKey.getSideInputs().isEmpty()) {
Expand Down Expand Up @@ -382,8 +385,8 @@ private static Pipeline.PipelineVisitor.CompositeBehavior combinePerKeyTranslato
// Choose between batch processing and stream processing based on window type and boundedness of data
if (isMainInputBounded(beamNode, ctx.getPipeline()) && isGlobalWindow(beamNode, ctx.getPipeline())) {
// Batch processing, using CombinePartialTransform and CombineFinalTransform
partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn), fullName);
finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn), fullName);
} else {
// Stream data processing, using GBKTransform
final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;

import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.Edge;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.WorkStealingStateProperty;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
import org.apache.nemo.runtime.common.plan.StagePartitioner;

import java.util.*;

/**
* Optimization pass for annotating {@link WorkStealingStateProperty}.
*/
@Annotates(WorkStealingStateProperty.class)
@Requires(CommunicationPatternProperty.class)
public final class WorkStealingStatePass extends AnnotatingPass {
private static final String SPLIT_STRATEGY = "SPLIT";
private static final String MERGE_STRATEGY = "MERGE";
private static final String DEFAULT_STRATEGY = "DEFAULT";

private final StagePartitioner stagePartitioner = new StagePartitioner();

public WorkStealingStatePass() {
super(WorkStealingStatePass.class);
}

@Override
public IRDAG apply(final IRDAG irdag) {
irdag.topologicalDo(irVertex -> {
final boolean notConnectedToO2OEdge = irdag.getIncomingEdgesOf(irVertex).stream()
.map(edge -> edge.getPropertyValue(CommunicationPatternProperty.class).get())
.noneMatch(property -> property.equals(CommunicationPatternProperty.Value.ONE_TO_ONE));
if (irVertex instanceof OperatorVertex && notConnectedToO2OEdge) {
Transform transform = ((OperatorVertex) irVertex).getTransform();
String transformFullName = ((OperatorVertex) irVertex).getTransformFullName();
if (transform.toString().contains("work stealing") || transformFullName.contains("work stealing")) {
irVertex.setProperty(WorkStealingStateProperty.of(SPLIT_STRATEGY));
} else if (transform.toString().contains("merge") || transformFullName.contains("merge")) {
irVertex.setProperty(WorkStealingStateProperty.of(MERGE_STRATEGY));
} else {
irVertex.setProperty(WorkStealingStateProperty.of(DEFAULT_STRATEGY));
}
} else {
irVertex.setProperty(WorkStealingStateProperty.of(DEFAULT_STRATEGY));
}
});
return tidyWorkStealingAnnotation(irdag);
}


/**
* Tidy annotated dag.
* Cleanup conditions:
* - The number of SPLIT annotations and MERGE annotations should be equal
* - SPLIT and MERGE should not be together in one stage, but needs to be in adjacent stage.
* - For now, nested work stealing optimizations are not provided. If detected, leave only the
* innermost pair.
*
* @param irdag irdag to cleanup.
* @return cleaned irdag.
*/
private IRDAG tidyWorkStealingAnnotation(final IRDAG irdag) {
String splitVertexId = null;

final List<Pair<String, String>> splitMergePairs = new ArrayList<>();
final Set<String> pairedVertices = new HashSet<>();
final Map<Integer, Set<IRVertex>> stageIdToStageVertices = new HashMap<>();

// Make SPLIT - MERGE vertex pair.
for (IRVertex vertex : irdag.getTopologicalSort()) {
if (vertex.getPropertyValue(WorkStealingStateProperty.class).get().equals(SPLIT_STRATEGY)) {
if (splitVertexId != null) {
// nested SPLIT vertex detected: delete the prior one.
irdag.getVertexById(splitVertexId).setProperty(WorkStealingStateProperty.of(DEFAULT_STRATEGY));
}
splitVertexId = vertex.getId();
} else if (vertex.getPropertyValue(WorkStealingStateProperty.class).get().equals(MERGE_STRATEGY)) {
if (splitVertexId != null) {
splitMergePairs.add(Pair.of(splitVertexId, vertex.getId()));
pairedVertices.add(splitVertexId);
pairedVertices.add(vertex.getId());
splitVertexId = null;
} else {
// no corresponding SPLIT vertex: delete
vertex.setProperty(WorkStealingStateProperty.of(DEFAULT_STRATEGY));
}
}
}

final Map<IRVertex, Integer> vertexToStageId = stagePartitioner.apply(irdag);

for (Pair<String, String> splitMergePair : splitMergePairs) {
IRVertex splitVertex = irdag.getVertexById(splitMergePair.left());
IRVertex mergeVertex = irdag.getVertexById(splitMergePair.right());

if (vertexToStageId.get(splitVertex) >= vertexToStageId.get(mergeVertex)
|| irdag.getIncomingEdgesOf(mergeVertex).stream()
.map(Edge::getSrc)
.map(vertexToStageId::get)
.noneMatch(stageId -> stageId.equals(vertexToStageId.get(splitVertex)))) {
// split vertex is descendent of merge vertex or they are in the same stage,
// or they are not in adjacent stages
splitVertex.setProperty(WorkStealingStateProperty.of(DEFAULT_STRATEGY));
mergeVertex.setProperty(WorkStealingStateProperty.of(DEFAULT_STRATEGY));
pairedVertices.remove(splitVertex.getId());
pairedVertices.remove(mergeVertex.getId());
}
}

irdag.topologicalDo(vertex -> {
if (!vertex.getPropertyValue(WorkStealingStateProperty.class)
.orElse(DEFAULT_STRATEGY).equals(DEFAULT_STRATEGY)) {
if (!pairedVertices.contains(vertex.getId())) {
vertex.setProperty(WorkStealingStateProperty.of(DEFAULT_STRATEGY));
}
}
});

// update execution property of other vertices in same stage.
vertexToStageId.forEach((vertex, stageId) -> {
if (!stageIdToStageVertices.containsKey(stageId)) {
stageIdToStageVertices.put(stageId, new HashSet<>());
}
stageIdToStageVertices.get(stageId).add(vertex);
});

for (String vertexId : pairedVertices) {
IRVertex vertex = irdag.getVertexById(vertexId);
Set<IRVertex> stageVertices = stageIdToStageVertices.get(vertexToStageId.get(vertex));
String strategy = vertex.getPropertyValue(WorkStealingStateProperty.class)
.orElse(DEFAULT_STRATEGY);
for (IRVertex stageVertex : stageVertices) {
stageVertex.setProperty(WorkStealingStateProperty.of(strategy));
}
}

return irdag;
}
}
Loading