Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
Expand Down Expand Up @@ -688,7 +689,7 @@ else if (current == RUNNING || current == DEPLOYING) {
else if (current == FINISHED || current == FAILED) {
// nothing to do any more. finished failed before it could be cancelled.
// in any case, the task is removed from the TaskManager already
sendFailIntermediateResultPartitionsRpcCall();
sendReleaseIntermediateResultPartitionsRpcCall();

return;
}
Expand Down Expand Up @@ -721,7 +722,7 @@ public CompletableFuture<?> suspend() {
break;
case FINISHED:
case FAILED:
sendFailIntermediateResultPartitionsRpcCall();
sendReleaseIntermediateResultPartitionsRpcCall();
break;
case CANCELED:
break;
Expand Down Expand Up @@ -1202,14 +1203,23 @@ private void sendCancelRpcCall(int numberRetries) {
}
}

private void sendFailIntermediateResultPartitionsRpcCall() {
private void sendReleaseIntermediateResultPartitionsRpcCall() {
LOG.info("Discarding the results produced by task execution {}.", attemptId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to put this log statement on the first line of this method.

final LogicalSlot slot = assignedResource;

if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

// TODO For some tests this could be a problem when querying too early if all resources were released
taskManagerGateway.failPartition(attemptId);
Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values();
Collection<ResultPartitionID> partitionIds = new ArrayList<>(partitions.size());
for (IntermediateResultPartition partition : partitions) {
partitionIds.add(new ResultPartitionID(partition.getPartitionId(), attemptId));
}

if (!partitionIds.isEmpty()) {
// TODO For some tests this could be a problem when querying too early if all resources were released
taskManagerGateway.releasePartitions(partitionIds);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
Expand All @@ -38,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -207,6 +209,17 @@ public void setupInputGate(SingleInputGate gate) throws IOException {
}
}

/**
 * Releases every intermediate result partition in the given batch.
 *
 * <p>Each id is handed to the result partition manager with no failure cause.
 *
 * @param partitionIds ids of the partitions to release
 */
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
	partitionIds.forEach(id -> resultPartitionManager.releasePartition(id, null));
}

public void start() throws IOException {
synchronized (lock) {
Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void close() {
}

public void fail(@Nullable Throwable throwable) {
partitionManager.releasePartitionsProducedBy(partitionId.getProducerId(), throwable);
partitionManager.releasePartition(partitionId, throwable);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,11 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

import org.apache.flink.shaded.guava18.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava18.com.google.common.collect.Table;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkState;
Expand All @@ -41,19 +35,15 @@ public class ResultPartitionManager implements ResultPartitionProvider {

private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionManager.class);

public final Table<ExecutionAttemptID, IntermediateResultPartitionID, ResultPartition>
registeredPartitions = HashBasedTable.create();
private final Map<ResultPartitionID, ResultPartition> registeredPartitions = new HashMap<>(16);

private boolean isShutdown;

public void registerResultPartition(ResultPartition partition) throws IOException {
public void registerResultPartition(ResultPartition partition) {
synchronized (registeredPartitions) {
checkState(!isShutdown, "Result partition manager already shut down.");

ResultPartitionID partitionId = partition.getPartitionId();

ResultPartition previous = registeredPartitions.put(
partitionId.getProducerId(), partitionId.getPartitionId(), partition);
ResultPartition previous = registeredPartitions.put(partition.getPartitionId(), partition);

if (previous != null) {
throw new IllegalStateException("Result partition already registered.");
Expand All @@ -70,8 +60,7 @@ public ResultSubpartitionView createSubpartitionView(
BufferAvailabilityListener availabilityListener) throws IOException {

synchronized (registeredPartitions) {
final ResultPartition partition = registeredPartitions.get(partitionId.getProducerId(),
partitionId.getPartitionId());
final ResultPartition partition = registeredPartitions.get(partitionId);

if (partition == null) {
throw new PartitionNotFoundException(partitionId);
Expand All @@ -83,26 +72,14 @@ public ResultSubpartitionView createSubpartitionView(
}
}

public void releasePartitionsProducedBy(ExecutionAttemptID executionId) {
releasePartitionsProducedBy(executionId, null);
}

public void releasePartitionsProducedBy(ExecutionAttemptID executionId, Throwable cause) {
public void releasePartition(ResultPartitionID partitionId, Throwable cause) {
synchronized (registeredPartitions) {
final Map<IntermediateResultPartitionID, ResultPartition> partitions =
registeredPartitions.row(executionId);

for (ResultPartition partition : partitions.values()) {
partition.release(cause);
ResultPartition resultPartition = registeredPartitions.remove(partitionId);
if (resultPartition != null) {
resultPartition.release(cause);
LOG.debug("Released partition {} produced by {}.",
partitionId.getPartitionId(), partitionId.getProducerId());
}

for (IntermediateResultPartitionID partitionId : ImmutableList
.copyOf(partitions.keySet())) {

registeredPartitions.remove(executionId, partitionId);
}

LOG.debug("Released all partitions produced by {}.", executionId);
}
}

Expand Down Expand Up @@ -134,10 +111,7 @@ void onConsumedPartition(ResultPartition partition) {
LOG.debug("Received consume notification from {}.", partition);

synchronized (registeredPartitions) {
ResultPartitionID partitionId = partition.getPartitionId();

previous = registeredPartitions.remove(partitionId.getProducerId(),
partitionId.getPartitionId());
previous = registeredPartitions.remove(partition.getPartitionId());
}

// Release the partition if it was successfully removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.rpc.RpcTimeout;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -98,11 +100,11 @@ CompletableFuture<Acknowledge> updatePartitions(
Time timeout);

/**
* Fail all intermediate result partitions of the given task.
* Batch release intermediate result partitions.
*
* @param executionAttemptID identifying the task
* @param partitionIds partition ids to release
*/
void failPartition(ExecutionAttemptID executionAttemptID);
void releasePartitions(Collection<ResultPartitionID> partitionIds);

/**
* Notify the given task about a completed checkpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -86,8 +88,8 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {
taskExecutorGateway.failPartition(executionAttemptID);
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
taskExecutorGateway.releasePartitions(partitionIds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
Expand Down Expand Up @@ -649,11 +650,9 @@ public CompletableFuture<Acknowledge> updatePartitions(
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {
log.info("Discarding the results produced by task execution {}.", executionAttemptID);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure whether we should retain the previous log statement for tracing purposes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it; the problem is that previously this happened per task. Now the method receives a list of partitions without any assumption that they all belong to a single task. The JM will still log the release in sendReleaseIntermediateResultPartitionsRpcCall. If more verbose output is needed, the debug level enables per-partition logging in ResultPartitionManager.releasePartition on the TM.


public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
try {
networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
networkEnvironment.releasePartitions(partitionIds);
} catch (Throwable t) {
// TODO: Do we still need this catch branch?
onFatalError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.types.SerializableOptional;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -99,11 +101,11 @@ CompletableFuture<Acknowledge> updatePartitions(
@RpcTimeout Time timeout);

/**
* Fail all intermediate result partitions of the given task.
* Batch release intermediate result partitions.
*
* @param executionAttemptID identifying the task
* @param partitionIds partition ids to release
*/
void failPartition(ExecutionAttemptID executionAttemptID);
void releasePartitions(Collection<ResultPartitionID> partitionIds);

/**
* Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -95,7 +97,8 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {}
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
}

@Override
public void notifyCheckpointComplete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.Preconditions;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -123,7 +125,7 @@ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID execut
}

@Override
public void failPartition(ExecutionAttemptID executionAttemptID) {
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
// noop
}

Expand Down