Skip to content

Commit

Permalink
[NEMO-395] Address SonarCloud issues for the scheduler package (apach…
Browse files Browse the repository at this point in the history
…e#220)

JIRA: [NEMO-395: Address SonarCloud issues for the scheduler package](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-395)

**Major changes:**
- Fixes all "Critical" issues
- Fixes most of the "Major" issues
  • Loading branch information
John Yang authored and alapha23 committed Aug 2, 2019
1 parent 614bc18 commit 40da9ad
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.nemo.runtime.master.scheduler;

import com.google.common.annotations.VisibleForTesting;
import org.apache.nemo.common.ir.executionproperty.AssociatedProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourceAntiAffinityProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
Expand All @@ -38,9 +37,8 @@
@DriverSide
@AssociatedProperty(ResourceAntiAffinityProperty.class)
public final class AntiAffinitySchedulingConstraint implements SchedulingConstraint {
@VisibleForTesting
@Inject
public AntiAffinitySchedulingConstraint() {
AntiAffinitySchedulingConstraint() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,15 @@ private void onTaskExecutionOnHold(final String executorId,
}

private int getMessageId(final Set<StageEdge> stageEdges) {
final Set<Integer> messageIds = stageEdges.stream()
.map(edge -> edge.getExecutionProperties().get(MessageIdEdgeProperty.class).get())
.findFirst().get();
// Here we simply use findFirst() for now...
// TODO #345: Simplify insert
final Set<Integer> messageIds = stageEdges.stream()
.map(edge -> edge.getExecutionProperties()
.get(MessageIdEdgeProperty.class)
.<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
.findFirst().<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException());
// Type casting is needed. See: https://stackoverflow.com/a/40865318

return messageIds.iterator().next();
}

Expand Down Expand Up @@ -281,7 +285,7 @@ public void onTaskStateReportFromExecutor(final String executorId,

@Override
public void onSpeculativeExecutionCheck() {
MutableBoolean isNumOfCloneChanged = new MutableBoolean(false);
MutableBoolean isNewCloneCreated = new MutableBoolean(false);

selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
Expand All @@ -290,40 +294,13 @@ public void onSpeculativeExecutionCheck() {
// Only if the ClonedSchedulingProperty is set...
stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stageId).toArray();

// Only after the fraction of the tasks are done...
// Delayed cloning (aggressive)
if (completedTaskTimes.length > 0
&& completedTaskTimes.length >= Math.round(stage.getTaskIndices().size() * fractionToWaitFor)) {
Arrays.sort(completedTaskTimes);
final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
final Map<String, Long> execTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stageId);
for (final Map.Entry<String, Long> entry : execTaskToTime.entrySet()) {

// Only if the running task is considered a 'straggler'....
final long runningTime = entry.getValue();
if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
final String taskId = entry.getKey();
final boolean isCloned = planStateManager.setNumOfClones(
stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
if (isCloned) {
LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+ "(median) {} (ms) * (multiplier) {}", taskId, runningTime, completedTaskTimes.length,
medianTime, medianTimeMultiplier);
}
isNumOfCloneChanged.setValue(isCloned);
}
}
}
isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
}
});
});
});

if (isNumOfCloneChanged.booleanValue()) {
if (isNewCloneCreated.booleanValue()) {
doSchedule(); // Do schedule the new clone.
}
}
Expand Down Expand Up @@ -447,6 +424,60 @@ private List<Task> selectSchedulableTasks(final Stage stageToSchedule) {
return tasks;
}

////////////////////////////////////////////////////////////////////// Task cloning methods.

/**
* @return true if a new clone is created.
* false otherwise.
*/
private boolean doSpeculativeExecution(final Stage stage, final ClonedSchedulingProperty.CloneConf cloneConf) {
final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stage.getId()).toArray();

// Only after the fraction of the tasks are done...
// Delayed cloning (aggressive)
if (completedTaskTimes.length > 0
&& completedTaskTimes.length >= Math.round(stage.getTaskIndices().size() * fractionToWaitFor)) {
// Only if the running task is considered a 'straggler'....
Arrays.sort(completedTaskTimes);
final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
final Map<String, Long> executingTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stage.getId());

return modifyStageNumCloneUsingMedianTime(
stage.getId(), completedTaskTimes.length, medianTime, medianTimeMultiplier, executingTaskToTime);
} else {
return false;
}
}

/**
* @return true if the number of clones for the stage is modified.
* false otherwise.
*/
private boolean modifyStageNumCloneUsingMedianTime(final String stageId,
final long numCompletedTasks,
final long medianTime,
final double medianTimeMultiplier,
final Map<String, Long> executingTaskToTime) {
for (final Map.Entry<String, Long> entry : executingTaskToTime.entrySet()) {
final long runningTime = entry.getValue();
if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
final String taskId = entry.getKey();
final boolean isNumCloneModified = planStateManager
.setNumOfClones(stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
if (isNumCloneModified) {
LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+ "(median) {} (ms) * (multiplier) {}", taskId, runningTime, numCompletedTasks,
medianTime, medianTimeMultiplier);
return true;
}
}
}

return false;
}

////////////////////////////////////////////////////////////////////// Task state change handlers

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ private List<String> getIntermediateDataLocations(final Task task) {
.map(handler -> {
try {
return handler.getLocationFuture().get();
} catch (InterruptedException | ExecutionException e) {
} catch (final ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
private NodeShareSchedulingConstraint() {
}

private String getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
private Optional<String> getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
final List<String> nodeNames = new ArrayList<>(propertyValue.keySet());
Collections.sort(nodeNames, Comparator.naturalOrder());
int index = taskIndex;
for (final String nodeName : nodeNames) {
if (index >= propertyValue.get(nodeName)) {
index -= propertyValue.get(nodeName);
} else {
return nodeName;
return Optional.of(nodeName);
}
}
throw new IllegalStateException("Detected excessive parallelism which ResourceSiteProperty does not cover");

return Optional.empty();
}

@Override
Expand All @@ -58,11 +59,17 @@ public boolean testSchedulability(final ExecutorRepresenter executor, final Task
if (propertyValue.isEmpty()) {
return true;
}
try {
return executor.getNodeName().equals(
getNodeName(propertyValue, RuntimeIdManager.getIndexFromTaskId(task.getTaskId())));
} catch (final IllegalStateException e) {
throw new RuntimeException(String.format("Cannot schedule %s", task.getTaskId(), e));

final String executorNodeName = executor.getNodeName();
final Optional<String> taskNodeName =
getNodeName(propertyValue, RuntimeIdManager.getIndexFromTaskId(task.getTaskId()));

if (!taskNodeName.isPresent()) {
throw new IllegalStateException(
String.format("Detected excessive parallelism which ResourceSiteProperty does not cover: %s",
task.getTaskId()));
} else {
return executorNodeName.equals(taskNodeName.get());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void signal() {
void await() {
lock.lock();
try {
if (!hasDelayedSignal) {
while (!hasDelayedSignal) { // to handle spurious wakeups
condition.await();
}
hasDelayedSignal = false;
Expand Down

0 comments on commit 40da9ad

Please sign in to comment.