Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ public class JobManagerOptions {
text("'legacy': legacy scheduler"),
text("'ng': new generation scheduler"))
.build());
/**
* Config parameter controlling whether partitions should already be released during the job execution.
*/
@Documentation.ExcludeFromDocumentation("User normally should not be expected to deactivate this feature. " +
"We aim at removing this flag eventually.")
public static final ConfigOption<Boolean> PARTITION_RELEASE_DURING_JOB_EXECUTION =
key("jobmanager.partition.release-during-job-execution")
.defaultValue(true)
.withDescription("Controls whether partitions should already be released during the job execution.");

@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand All @@ -66,6 +70,11 @@
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.SharedStateRegistry;
Expand Down Expand Up @@ -250,6 +259,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
/** The total number of vertices currently in the execution graph. */
private int numVerticesTotal;

private final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory;

private PartitionReleaseStrategy partitionReleaseStrategy;

private SchedulingTopology schedulingTopology;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about initializing schedulingTopology and partitionReleaseStrategy in the constructor, then we could make them final, and no need to maintain class-level partitionReleaseStrategyFactory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not possible because the JobGraph is attached to the ExecutionGraph in an extra step. ExecutionGraphToSchedulingTopologyAdapter adapts the ExecutionGraph eagerly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have not looked through the details in ExecutionGraphToSchedulingTopologyAdapter before, only saw it relies ExecutionGraph, actually it relies some infos after attaching JobGraph.


// ------ Configuration of the Execution -------

/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
Expand Down Expand Up @@ -413,6 +428,7 @@ public ExecutionGraph(
userClassLoader,
blobWriter,
allocationTimeout,
new NotReleasingPartitionReleaseStrategy.Factory(),
NettyShuffleMaster.INSTANCE,
true,
new PartitionTrackerImpl(
Expand All @@ -433,6 +449,7 @@ public ExecutionGraph(
ClassLoader userClassLoader,
BlobWriter blobWriter,
Time allocationTimeout,
PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
ShuffleMaster<?> shuffleMaster,
boolean forcePartitionReleaseOnConsumption,
PartitionTracker partitionTracker) throws IOException {
Expand Down Expand Up @@ -464,6 +481,8 @@ public ExecutionGraph(
this.rpcTimeout = checkNotNull(rpcTimeout);
this.allocationTimeout = checkNotNull(allocationTimeout);

this.partitionReleaseStrategyFactory = checkNotNull(partitionReleaseStrategyFactory);

this.restartStrategy = restartStrategy;
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());

Expand Down Expand Up @@ -913,6 +932,11 @@ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobExcepti
}

failoverStrategy.notifyNewVertices(newExecJobVertices);

schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
schedulingTopology,
new DefaultFailoverTopology(this));
}

public void scheduleForExecution() throws JobException {
Expand Down Expand Up @@ -1605,36 +1629,9 @@ public boolean updateState(TaskExecutionState state) {

if (attempt != null) {
try {
Map<String, Accumulator<?, ?>> accumulators;

switch (state.getExecutionState()) {
case RUNNING:
return attempt.switchToRunning();

case FINISHED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFinished(accumulators, state.getIOMetrics());
return true;

case CANCELED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.completeCancelling(accumulators, state.getIOMetrics());
return true;

case FAILED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
return true;

default:
// we mark as failed and return false, which triggers the TaskManager
// to remove the task
attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
return false;
}
final boolean stateUpdated = updateStateInternal(state, attempt);
maybeReleasePartitions(attempt);
return stateUpdated;
}
catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
Expand All @@ -1649,6 +1646,77 @@ public boolean updateState(TaskExecutionState state) {
}
}

private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) {
Map<String, Accumulator<?, ?>> accumulators;

switch (state.getExecutionState()) {
case RUNNING:
return attempt.switchToRunning();

case FINISHED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFinished(accumulators, state.getIOMetrics());
return true;

case CANCELED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.completeCancelling(accumulators, state.getIOMetrics());
return true;

case FAILED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
return true;

default:
// we mark as failed and return false, which triggers the TaskManager
// to remove the task
attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
return false;
}
}

private void maybeReleasePartitions(final Execution attempt) {
final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();

if (attempt.getState() == ExecutionState.FINISHED) {
final List<IntermediateResultPartitionID> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
releasePartitions(releasablePartitions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be easy for constructing ResultPartitionID here, because we already have attempt info in parameters, then we could avoid the complex logic in following createResultPartitionId.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attempt here is the finishing vertex. However, we need the producer of the partition that is incident to the pipelined region.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I missed it. The current attempt is consumer, and we need the correspond producer attempt for the partition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still felt the following createResultPartitionId method seems complex because of involving in many components, I tried to find a way for constructing it easily.

Another options is that IntermediateResultPartition could get ResultPartitionID directly via internal IntermediateResultPartitionID and producer ExecutionVertex. Further SchedulingResultPartition could also provide the getter method for ResultPartitionID via constructing from IntermediateResultPartition. I am not sure whether it is worth doing so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not worth it at the moment because it is not even clear whether it is a good thing to use both SchedulingTopology and FailoverTopology in the release strategy.

Copy link
Contributor

@zhijiangW zhijiangW Jun 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might only need SchedulingTopology which was already used in below createResultPartitionId method. It might be like this in createResultPartitionId.

SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartition);
ResultPartitionID resultPartitonId = schedulingResultPartition.getResultPartitionId();

If we add the ResultPartitionID info in the constructor of DefaultSchedulingResultPartition.

} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}

private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) {
if (releasablePartitions.size() > 0) {
final List<ResultPartitionID> partitionIds = releasablePartitions.stream()
.map(this::createResultPartitionId)
.collect(Collectors.toList());

partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
}
}

private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
final SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer();
final ExecutionVertexID producerId = producer.getId();
final JobVertexID jobVertexId = producerId.getJobVertexId();
final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId);
checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId);

final ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
final int subtaskIndex = producerId.getSubtaskIndex();
checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %d for job vertex %s", subtaskIndex, jobVertexId);

final ExecutionVertex taskVertex = taskVertices[subtaskIndex];
final Execution execution = taskVertex.getCurrentExecutionAttempt();
return new ResultPartitionID(resultPartitionId, execution.getAttemptId());
}

/**
* Deserializes accumulators from a task state update.
*
Expand Down Expand Up @@ -1835,4 +1903,8 @@ ShuffleMaster<?> getShuffleMaster() {
public PartitionTracker getPartitionTracker() {
return partitionTracker;
}

PartitionReleaseStrategy getPartitionReleaseStrategy() {
return partitionReleaseStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
Expand Down Expand Up @@ -117,6 +119,9 @@ public static ExecutionGraph buildGraph(
final int maxPriorAttemptsHistoryLength =
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);

final boolean forcePartitionReleaseOnConsumption =
jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);

Expand All @@ -136,6 +141,7 @@ public static ExecutionGraph buildGraph(
classLoader,
blobWriter,
allocationTimeout,
partitionReleaseStrategyFactory,
shuffleMaster,
forcePartitionReleaseOnConsumption,
partitionTracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -83,6 +84,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi

private final int subTaskIndex;

private final ExecutionVertexID executionVertexId;

private final EvictingBoundedList<ArchivedExecution> priorExecutions;

private final Time timeout;
Expand Down Expand Up @@ -142,6 +145,7 @@ public ExecutionVertex(

this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);
this.taskNameWithSubtask = String.format("%s (%d/%d)",
jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());

Expand Down Expand Up @@ -228,6 +232,10 @@ public int getParallelSubtaskIndex() {
return this.subTaskIndex;
}

public ExecutionVertexID getID() {
return executionVertexId;
}

public int getNumberOfInputs() {
return this.inputEdges.length;
}
Expand Down Expand Up @@ -593,6 +601,7 @@ public Execution resetForNewExecution(final long timestamp, final long originati
if (oldState.isTerminal()) {
if (oldState == FINISHED) {
oldExecution.stopTrackingAndReleasePartitions();
getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
}

priorExecutions.add(oldExecution.archive());
Expand Down
Loading