Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

import java.io.Serializable;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -92,6 +95,16 @@ public boolean sendScheduleOrUpdateConsumersMessage() {
return sendScheduleOrUpdateConsumersMessage;
}

/**
* Returns whether to release the partition after having been fully consumed once.
*
* <p>Indicates whether the shuffle service should automatically release all partition resources after
* the first full consumption has been acknowledged. This kind of partition does not need to be explicitly released
* by {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
* and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
*
* @return whether to release the partition after having been fully consumed once.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has the same information with above Returns whether to release the partition after having been fully consumed once., so might be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to keep jdoc format consistent.

*/
public boolean isReleasedOnConsumption() {
return releasedOnConsumption;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public Optional<InputGate> getInputGate(InputGateID id) {
}

@Override
public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
public void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds) {
for (ResultPartitionID partitionId : partitionIds) {
resultPartitionManager.releasePartition(partitionId, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Optional;

/**
* Default implementation of {@link ShuffleDescriptor} for {@link NettyShuffleMaster}.
Expand Down Expand Up @@ -57,6 +58,11 @@ public ResultPartitionID getResultPartitionID() {
return resultPartitionID;
}

@Override
public Optional<ResourceID> storesLocalResourcesOn() {
return Optional.of(producerLocation);
}

public boolean isLocalTo(ResourceID consumerLocation) {
return producerLocation.equals(consumerLocation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
}

@Override
public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
}

private static NettyShuffleDescriptor.PartitionConnectionInfo createConnectionInfo(
ProducerDescriptor producerDescriptor,
int connectionIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;

/**
* Interface for shuffle deployment descriptor of result partition resource.
Expand Down Expand Up @@ -50,4 +54,17 @@ public interface ShuffleDescriptor extends Serializable {
default boolean isUnknown() {
return false;
}

/**
* Returns the location of the producing task executor if the partition occupies local resources there.
*
* <p>Indicates that this partition occupies local resources in the producing task executor. Such partition requires
* that the task executor is running and being connected to be able to consume the produced data. This is mostly
* relevant for the batch jobs and blocking result partitions which should outlive the producer lifetime and
* be released externally: {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false}.
* {@link ShuffleEnvironment#releasePartitionsLocally(Collection)} can be used to release such kind of partitions locally.
*
* @return the resource id of the producing task executor if the partition occupies local resources there
*/
Optional<ResourceID> storesLocalResourcesOn();
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,20 @@
* {@link ShuffleEnvironment#createResultPartitionWriters}. The created writers are grouped per owner.
* The owner is responsible for the writers' lifecycle from the moment of creation.
*
* <p>Partitions are released in the following cases:
* <p>Partitions are fully released in the following cases:
* <ol>
* <li>{@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called
* if the production has failed.
* </li>
* <li>{@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called
* if the production is done. The actual release can take some time depending on implementation details,
* e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly
* or the partition is later released by {@link ShuffleEnvironment#releasePartitions(Collection)}.</li>
* <li>{@link ShuffleEnvironment#releasePartitions(Collection)} is called outside of the producer thread,
* e.g. to manage the lifecycle of BLOCKING result partitions which can outlive their producers.</li>
* <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true} and
* {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called when the production is done.
* The actual release can take some time depending on implementation details,
* e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly.</li>
* <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false} and
* {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)},
* if it occupies any producer local resources ({@link ShuffleDescriptor#storesLocalResourcesOn()}),
* are called outside of the producer thread, e.g. to manage the lifecycle of BLOCKING result partitions
* which can outlive their producers.</li>
* </ol>
* The partitions, which currently still occupy local resources, can be queried with
* {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
Expand Down Expand Up @@ -115,9 +118,13 @@ Collection<P> createResultPartitionWriters(
/**
* Release local resources occupied by the given partitions.
*
* <p>This is called for partitions which occupy resources locally
* (can be checked by {@link ShuffleDescriptor#storesLocalResourcesOn()}).
* This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
*
* @param partitionIds identifying the partitions to be released
*/
void releasePartitions(Collection<ResultPartitionID> partitionIds);
void releasePartitionsLocally(Collection<ResultPartitionID> partitionIds);

/**
* Report partitions which still occupy some resources locally.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -41,4 +44,17 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
CompletableFuture<T> registerPartitionWithProducer(
PartitionDescriptor partitionDescriptor,
ProducerDescriptor producerDescriptor);

/**
* Release any external resources occupied by the given partition.
*
* <p>This call triggers release of any resources which are occupied by the given partition in the external systems
* outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions.
* This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
* The producer local resources are managed by {@link ShuffleDescriptor#storesLocalResourcesOn()} and
* {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
*
* @param shuffleDescriptor shuffle descriptor of the result partition to release externally.
*/
void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

import java.util.Optional;

/**
* Unknown {@link ShuffleDescriptor} for which the producer has not been deployed yet.
*
Expand Down Expand Up @@ -48,4 +51,9 @@ public ResultPartitionID getResultPartitionID() {
public boolean isUnknown() {
return true;
}

@Override
public Optional<ResourceID> storesLocalResourcesOn() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public CompletableFuture<Acknowledge> updatePartitions(
@Override
public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
try {
shuffleEnvironment.releasePartitions(partitionIds);
shuffleEnvironment.releasePartitionsLocally(partitionIds);
} catch (Throwable t) {
// TODO: Do we still need this catch branch?
onFatalError(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -109,7 +110,7 @@ public void testReleasedOnConsumptionFlag() {
for (ResultPartitionType partitionType : ResultPartitionType.values()) {
ResultPartitionDeploymentDescriptor partitionDescriptor = new ResultPartitionDeploymentDescriptor(
new PartitionDescriptor(resultId, partitionId, partitionType, numberOfSubpartitions, connectionIndex),
ResultPartitionID::new,
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be a separate hotfix commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The target change is not possible without this small change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I misunderstood before.

1,
true
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -64,7 +65,7 @@ private static void testForceConsumptionOnRelease(boolean forceConsumptionOnRele
ResultPartitionType.BLOCKING,
1,
0),
ResultPartitionID::new,
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

1,
true
);
Expand Down