diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index fb3f899b89b83..7bf5984619c3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -34,8 +34,10 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -75,6 +77,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.state.TaskLocalStateStore; @@ -86,6 +89,7 @@ import org.apache.flink.runtime.taskexecutor.exceptions.TaskException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; import org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager; import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider; @@ -204,6 +208,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { /** The heartbeat manager for resource manager in the task manager. */ private HeartbeatManager resourceManagerHeartbeatManager; + private final PartitionTable partitionTable; + // --------- resource manager -------- @Nullable @@ -220,6 +226,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final StackTraceSampleService stackTraceSampleService; + private Map>> taskResultPartitionCleanupFuturesPerJob = new HashMap<>(8); + public TaskExecutor( RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, @@ -229,7 +237,8 @@ public TaskExecutor( TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String metricQueryServiceAddress, BlobCacheService blobCacheService, - FatalErrorHandler fatalErrorHandler) { + FatalErrorHandler fatalErrorHandler, + PartitionTable partitionTable) { super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME)); @@ -240,6 +249,7 @@ public TaskExecutor( this.taskExecutorServices = checkNotNull(taskExecutorServices); this.haServices = checkNotNull(haServices); this.fatalErrorHandler = checkNotNull(fatalErrorHandler); + this.partitionTable = partitionTable; this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.blobCacheService = checkNotNull(blobCacheService); this.metricQueryServiceAddress = metricQueryServiceAddress; @@ -587,6 +597,7 @@ public CompletableFuture submitTask( if (taskAdded) { task.startTaskThread(); + setupResultPartitionBookkeeping(tdd, task.getTerminationFuture()); return CompletableFuture.completedFuture(Acknowledge.get()); } else { final String message = "TaskManager already contains a task for id " + @@ -600,6 +611,40 @@ public CompletableFuture submitTask( } } + private void setupResultPartitionBookkeeping(TaskDeploymentDescriptor tdd, CompletableFuture terminationFuture) { + final List partitionsRequiringRelease = tdd.getProducedPartitions().stream() + // partitions that are released on consumption don't require any explicit release call + .filter(d -> !d.isReleasedOnConsumption()) + .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor) + // partitions without local resources don't store anything on the TaskExecutor + .filter(d -> d.storesLocalResourcesOn().isPresent()) + .map(ShuffleDescriptor::getResultPartitionID) + .collect(Collectors.toList()); + + partitionTable.startTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease); + + final CompletableFuture taskTerminationWithResourceCleanupFuture = + terminationFuture.thenApplyAsync( + executionState -> { + if (executionState != ExecutionState.FINISHED) { + partitionTable.stopTrackingPartitions(tdd.getJobId(), partitionsRequiringRelease); + } + return executionState; + }, + getMainThreadExecutor()); + + taskResultPartitionCleanupFuturesPerJob.compute( + tdd.getJobId(), + (jobID, completableFutures) -> { + if (completableFutures == null) { + completableFutures = new ArrayList<>(4); + } + + completableFutures.add(taskTerminationWithResourceCleanupFuture); + return completableFutures; + }); + } + @Override public CompletableFuture cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { final Task task = taskSlotTable.getTask(executionAttemptID); @@ -666,7 +711,9 @@ public CompletableFuture updatePartitions( @Override public void releasePartitions(JobID jobId, Collection partitionIds) { try { + partitionTable.stopTrackingPartitions(jobId, partitionIds); shuffleEnvironment.releasePartitionsLocally(partitionIds); + closeJobManagerConnectionIfNoAllocatedResources(jobId); } catch (Throwable t) { // TODO: Do we still need this catch branch? onFatalError(t); @@ -1278,10 +1325,15 @@ private JobManagerConnection associateWithJobManager( private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException { checkNotNull(jobManagerConnection); + final JobID jobId = jobManagerConnection.getJobID(); + + // cleanup remaining partitions once all tasks for this job have completed + scheduleResultPartitionCleanup(jobId); + final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry(); if (kvStateRegistry != null) { - kvStateRegistry.unregisterListener(jobManagerConnection.getJobID()); + kvStateRegistry.unregisterListener(jobId); } final KvStateClientProxy kvStateClientProxy = kvStateService.getKvStateClientProxy(); @@ -1295,6 +1347,17 @@ private void disassociateFromJobManager(JobManagerConnection jobManagerConnectio jobManagerConnection.getLibraryCacheManager().shutdown(); } + private void scheduleResultPartitionCleanup(JobID jobId) { + final Collection> taskTerminationFutures = taskResultPartitionCleanupFuturesPerJob.remove(jobId); + if (taskTerminationFutures != null) { + FutureUtils.waitForAll(taskTerminationFutures) + .thenRunAsync(() -> { + Collection partitionsForJob = partitionTable.stopTrackingPartitions(jobId); + shuffleEnvironment.releasePartitionsLocally(partitionsForJob); + }, getMainThreadExecutor()); + } + } + private void registerQueryableState(JobID jobId, JobMasterGateway jobMasterGateway) { final KvStateServer kvStateServer = kvStateService.getKvStateServer(); final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry(); @@ -1404,20 +1467,7 @@ private void freeSlotInternal(AllocationID allocationId, Throwable cause) { } if (jobId != null) { - // check whether we still have allocated slots for the same job - if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) { - // we can remove the job from the job leader service - try { - jobLeaderService.removeJob(jobId); - } catch (Exception e) { - log.info("Could not remove job {} from JobLeaderService.", jobId, e); - } - - closeJobManagerConnection( - jobId, - new FlinkException("TaskExecutor " + getAddress() + - " has no more allocated slots for job " + jobId + '.')); - } + closeJobManagerConnectionIfNoAllocatedResources(jobId); } } } catch (SlotNotFoundException e) { @@ -1427,6 +1477,23 @@ private void freeSlotInternal(AllocationID allocationId, Throwable cause) { localStateStoresManager.releaseLocalStateForAllocationId(allocationId); } + private void closeJobManagerConnectionIfNoAllocatedResources(JobID jobId) { + // check whether we still have allocated slots for the same job + if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty() && !partitionTable.hasTrackedPartitions(jobId)) { + // we can remove the job from the job leader service + try { + jobLeaderService.removeJob(jobId); + } catch (Exception e) { + log.info("Could not remove job {} from JobLeaderService.", jobId, e); + } + + closeJobManagerConnection( + jobId, + new FlinkException("TaskExecutor " + getAddress() + + " has no more allocated slots for job " + jobId + '.')); + } + } + private void timeoutSlot(AllocationID allocationId, UUID ticket) { checkNotNull(allocationId); checkNotNull(ticket); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 2b4e0f2ddcd2e..23cb6bc5f690e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; import org.apache.flink.runtime.taskmanager.MemoryLogger; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -384,7 +385,8 @@ public static TaskExecutor startTaskManager( taskManagerMetricGroup.f0, metricQueryServiceAddress, blobCacheService, - fatalErrorHandler); + fatalErrorHandler, + new PartitionTable()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java new file mode 100644 index 0000000000000..8548024f12653 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTable.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.util.Preconditions; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Thread-safe Utility for tracking partitions. + */ +@ThreadSafe +public class PartitionTable { + + private final Map> trackedPartitionsPerJob = new ConcurrentHashMap<>(8); + + /** + * Returns whether any partitions are being tracked for the given job. + */ + public boolean hasTrackedPartitions(JobID jobId) { + return trackedPartitionsPerJob.containsKey(jobId); + } + + /** + * Starts the tracking of the given partition for the given job. + */ + public void startTrackingPartitions(JobID jobId, Collection newPartitionIds) { + Preconditions.checkNotNull(jobId); + Preconditions.checkNotNull(newPartitionIds); + + trackedPartitionsPerJob.compute(jobId, (ignored, partitionIds) -> { + if (partitionIds == null) { + partitionIds = new HashSet<>(8); + } + partitionIds.addAll(newPartitionIds); + return partitionIds; + }); + } + + /** + * Stops the tracking of all partition for the given job. + */ + public Collection stopTrackingPartitions(JobID jobId) { + Preconditions.checkNotNull(jobId); + + Set storedPartitions = trackedPartitionsPerJob.remove(jobId); + return storedPartitions == null + ? Collections.emptyList() + : storedPartitions; + } + + /** + * Stops the tracking of the given set of partitions for the given job. + */ + public void stopTrackingPartitions(JobID jobId, Collection partitionIds) { + Preconditions.checkNotNull(jobId); + Preconditions.checkNotNull(partitionIds); + + // If the JobID is unknown we do not fail here, in line with ShuffleEnvironment#releaseFinishedPartitions + trackedPartitionsPerJob.computeIfPresent( + jobId, + (key, resultPartitionIDS) -> { + resultPartitionIDS.removeAll(partitionIds); + return resultPartitionIDS.isEmpty() + ? null + : resultPartitionIDS; + }); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java index a30e31cc4045f..9fb83a7b53e8d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java @@ -18,7 +18,12 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; import org.hamcrest.Matchers; @@ -64,4 +69,19 @@ static void verifyCreateSubpartitionViewThrowsException( assertThat(partitionId, Matchers.is(notFound.getPartitionId())); } } + + public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ResultPartitionType partitionType) { + ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal(); + PartitionDescriptor partitionDescriptor = new PartitionDescriptor( + new IntermediateDataSetID(), + shuffleDescriptor.getResultPartitionID().getPartitionId(), + partitionType, + 1, + 0); + return new ResultPartitionDeploymentDescriptor( + partitionDescriptor, + shuffleDescriptor, + 1, + true); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 17f963bc5031b..2072a12bb4f35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -19,20 +19,15 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.shuffle.PartitionDescriptor; -import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.apache.flink.runtime.taskmanager.TaskActions; -import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; import org.junit.Assert; import org.junit.Test; @@ -188,7 +183,7 @@ private void testAddOnReleasedPartition(final ResultPartitionType partitionType) TaskActions taskActions = new NoOpTaskActions(); ResultPartition partition = createPartition(partitionType); ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate( - Collections.singleton(createPartitionDeploymentDescriptor(partitionType)), + Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)), new ResultPartitionWriter[] {partition}, taskActions, jobId, @@ -317,25 +312,10 @@ private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter( JobID jobId, ResultPartitionConsumableNotifier notifier) { return ConsumableNotifyingResultPartitionWriterDecorator.decorate( - Collections.singleton(createPartitionDeploymentDescriptor(partitionType)), + Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)), new ResultPartitionWriter[] {createPartition(partitionType)}, taskActions, jobId, notifier)[0]; } - - private ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ResultPartitionType partitionType) { - ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal(); - PartitionDescriptor partitionDescriptor = new PartitionDescriptor( - new IntermediateDataSetID(), - shuffleDescriptor.getResultPartitionID().getPartitionId(), - partitionType, - 1, - 0); - return new ResultPartitionDeploymentDescriptor( - partitionDescriptor, - shuffleDescriptor, - 1, - true); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java new file mode 100644 index 0000000000000..9ceacadb1428e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; +import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; +import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; +import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.TriConsumer; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the partition-lifecycle logic in the {@link TaskExecutor}. + */ +public class TaskExecutorPartitionLifecycleTest extends TestLogger { + + private static final Time timeout = Time.seconds(10L); + + private static final TestingRpcService RPC = new TestingRpcService(); + + private final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + private final SettableLeaderRetrievalService jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(); + private final JobID jobId = new JobID(); + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Before + public void setup() { + haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); + haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); + } + + @After + public void shutdown() { + RPC.clearGateways(); + } + + @AfterClass + public static void shutdownClass() throws ExecutionException, InterruptedException { + RPC.stopService().get(); + } + + @Test + public void testConnectionTerminationAfterExternalRelease() throws Exception { + final CompletableFuture disconnectFuture = new CompletableFuture<>(); + final JobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() + .setDisconnectTaskManagerFunction(resourceID -> { + disconnectFuture.complete(null); + return CompletableFuture.completedFuture(Acknowledge.get()); + }).build(); + + final JobManagerConnection jobManagerConnection = TaskSubmissionTestEnvironment.createJobManagerConnection( + jobId, jobMasterGateway, RPC, new NoOpTaskManagerActions(), timeout); + + final JobManagerTable jobManagerTable = new JobManagerTable(); + jobManagerTable.put(jobId, jobManagerConnection); + + final TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment(); + + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setJobManagerTable(jobManagerTable) + .setShuffleEnvironment(shuffleEnvironment) + .setTaskSlotTable(createTaskSlotTable()) + .build(); + + final PartitionTable partitionTable = new PartitionTable(); + final ResultPartitionID resultPartitionId = new ResultPartitionID(); + + final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTable); + + try { + taskExecutor.start(); + taskExecutor.waitUntilStarted(); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + // baseline, jobmanager was added in test setup + runInTaskExecutorThreadAndWait(taskExecutor, () -> assertTrue(jobManagerTable.contains(jobId))); + + runInTaskExecutorThreadAndWait(taskExecutor, () -> partitionTable.startTrackingPartitions(jobId, Collections.singletonList(resultPartitionId))); + + final CompletableFuture> firstReleasePartitionsCallFuture = new CompletableFuture<>(); + runInTaskExecutorThreadAndWait(taskExecutor, () -> shuffleEnvironment.releasePartitionsLocallyFuture = firstReleasePartitionsCallFuture); + + taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(new ResultPartitionID())); + + // at this point we only know that the TE has entered releasePartitions; we cannot be certain whether it + // has already checked whether it should disconnect or not + firstReleasePartitionsCallFuture.get(); + + // connection should be kept alive since the table still contains partitions + // once this returns we know that the TE has exited releasePartitions and associated connection checks + runInTaskExecutorThreadAndWait(taskExecutor, () -> assertTrue(jobManagerTable.contains(jobId))); + + final CompletableFuture> secondReleasePartitionsCallFuture = new CompletableFuture<>(); + runInTaskExecutorThreadAndWait(taskExecutor, () -> shuffleEnvironment.releasePartitionsLocallyFuture = secondReleasePartitionsCallFuture); + + // the TM should check whether partitions are still stored, and afterwards terminate the connection + taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(resultPartitionId)); + + disconnectFuture.get(); + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + @Test + public void testPartitionReleaseAfterDisconnect() throws Exception { + testPartitionRelease( + (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")), + true); + } + + @Test + public void testPartitionReleaseAfterReleaseCall() throws Exception { + testPartitionRelease( + (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)), + true); + } + + @Test + public void testPartitionReleaseAfterShutdown() throws Exception { + // don't do any explicit release action, so that the partition must be cleaned up on shutdown + testPartitionRelease( + (jobId, partitionId, taskExecutorGateway) -> { }, + false); + } + + private void testPartitionRelease( + TriConsumer releaseAction, + boolean waitForRelease) throws Exception { + + final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor = + PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING); + final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId(); + + final TaskDeploymentDescriptor taskDeploymentDescriptor = + TaskExecutorSubmissionTest.createTaskDeploymentDescriptor( + jobId, + "job", + eid1, + new SerializedValue<>(new ExecutionConfig()), + "Sender", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + TestingInvokable.class.getName(), + Collections.singletonList(taskResultPartitionDescriptor), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + 0); + + final TaskSlotTable taskSlotTable = createTaskSlotTable(); + + final TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( + false, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + + final TestingShuffleEnvironment shuffleEnvironment = new TestingShuffleEnvironment(); + + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() + .setTaskSlotTable(taskSlotTable) + .setTaskStateManager(localStateStoresManager) + .setShuffleEnvironment(shuffleEnvironment) + .build(); + + final CompletableFuture taskFinishedFuture = new CompletableFuture<>(); + + final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() + .setRegisterTaskManagerFunction((s, location) -> CompletableFuture.completedFuture(new JMTMRegistrationSuccess(ResourceID.generate()))) + .setOfferSlotsFunction((resourceID, slotOffers) -> CompletableFuture.completedFuture(slotOffers)) + .setUpdateTaskExecutionStateFunction(taskExecutionState -> { + if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) { + taskFinishedFuture.complete(null); + } + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .build(); + + final PartitionTable partitionTable = new PartitionTable(); + + final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTable); + + final CompletableFuture initialSlotReportFuture = new CompletableFuture<>(); + + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + testingResourceManagerGateway.setSendSlotReportFunction(resourceIDInstanceIDSlotReportTuple3 -> { + initialSlotReportFuture.complete(resourceIDInstanceIDSlotReportTuple3.f2); + return CompletableFuture.completedFuture(Acknowledge.get()); + }); + testingResourceManagerGateway.setRegisterTaskExecutorFunction(input -> CompletableFuture.completedFuture( + new TaskExecutorRegistrationSuccess( + new InstanceID(), + testingResourceManagerGateway.getOwnResourceId(), + new ClusterInformation("blobServerHost", 55555)))); + + try { + taskExecutor.start(); + taskExecutor.waitUntilStarted(); + + final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class); + + final String jobMasterAddress = "jm"; + RPC.registerGateway(jobMasterAddress, jobMasterGateway); + RPC.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + + // inform the task manager about the job leader + taskManagerServices.getJobLeaderService().addJob(jobId, jobMasterAddress); + jobManagerLeaderRetriever.notifyListener(jobMasterAddress, UUID.randomUUID()); + resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + + final Optional slotStatusOptional = StreamSupport.stream(initialSlotReportFuture.get().spliterator(), false) + .findAny(); + + assertTrue(slotStatusOptional.isPresent()); + + final SlotStatus slotStatus = slotStatusOptional.get(); + + while (true) { + try { + taskExecutorGateway.requestSlot( + slotStatus.getSlotID(), + jobId, + taskDeploymentDescriptor.getAllocationId(), + jobMasterAddress, + testingResourceManagerGateway.getFencingToken(), + timeout + ).get(); + break; + } catch (Exception e) { + // the proper establishment of the RM connection is tracked + // asynchronously, so we have to poll here until it went through + // until then, slot requests will fail with an exception + Thread.sleep(50); + } + } + + TestingInvokable.sync = new BlockerSync(); + + taskExecutorGateway.submitTask(taskDeploymentDescriptor, jobMasterGateway.getFencingToken(), timeout) + .get(); + + TestingInvokable.sync.awaitBlocker(); + + // the task is still running => the partition is in in-progress + runInTaskExecutorThreadAndWait( + taskExecutor, + () -> assertTrue(partitionTable.hasTrackedPartitions(jobId))); + + TestingInvokable.sync.releaseBlocker(); + taskFinishedFuture.get(timeout.getSize(), timeout.getUnit()); + + // the task is finished => the partition should be finished now + runInTaskExecutorThreadAndWait( + taskExecutor, + () -> assertTrue(partitionTable.hasTrackedPartitions(jobId))); + + final CompletableFuture> releasePartitionsFuture = new CompletableFuture<>(); + runInTaskExecutorThreadAndWait( + taskExecutor, + () -> shuffleEnvironment.releasePartitionsLocallyFuture = releasePartitionsFuture); + + releaseAction.accept( + jobId, + taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID(), + taskExecutorGateway); + + if (waitForRelease) { + Collection resultPartitionIDS = releasePartitionsFuture.get(); + assertThat(resultPartitionIDS, contains(taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID())); + } + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + + // the shutdown of the backing shuffle environment releases all partitions + // the book-keeping is not aware of this + assertTrue(shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty()); + } + + /** + * Test invokable which completes the given future when executed. + */ + public static class TestingInvokable extends AbstractInvokable { + + static BlockerSync sync; + + public TestingInvokable(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + sync.block(); + } + } + + private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, PartitionTable partitionTable) throws IOException { + return new TestingTaskExecutor( + RPC, + TaskManagerConfiguration.fromConfiguration(new Configuration()), + haServices, + taskManagerServices, + new HeartbeatServices(10_000L, 30_000L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + null, + new BlobCacheService( + new Configuration(), + new VoidBlobStore(), + null), + new TestingFatalErrorHandler(), + partitionTable); + } + + private static TaskSlotTable createTaskSlotTable() { + return new TaskSlotTable( + Collections.singletonList(ResourceProfile.UNKNOWN), + new TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds())); + + } + + private static void runInTaskExecutorThreadAndWait(TaskExecutor taskExecutor, Runnable runnable) throws ExecutionException, InterruptedException { + taskExecutor.getRpcService().scheduleRunnable( + runnable, + 0, + TimeUnit.SECONDS + ).get(); + } + + private static class TestingShuffleEnvironment implements ShuffleEnvironment { + + private final ShuffleEnvironment backingShuffleEnvironment = + new NettyShuffleEnvironmentBuilder().build(); + + CompletableFuture> releasePartitionsLocallyFuture = null; + + @Override + public int start() throws IOException { + return backingShuffleEnvironment.start(); + } + + @Override + public ShuffleIOOwnerContext createShuffleIOOwnerContext(String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) { + return backingShuffleEnvironment.createShuffleIOOwnerContext(ownerName, executionAttemptID, parentGroup); + } + + @Override + public Collection createResultPartitionWriters(ShuffleIOOwnerContext ownerContext, Collection resultPartitionDeploymentDescriptors) { + return backingShuffleEnvironment.createResultPartitionWriters(ownerContext, resultPartitionDeploymentDescriptors); + } + + @Override + public void releasePartitionsLocally(Collection partitionIds) { + backingShuffleEnvironment.releasePartitionsLocally(partitionIds); + if (releasePartitionsLocallyFuture != null) { + releasePartitionsLocallyFuture.complete(partitionIds); + } + } + + @Override + public Collection getPartitionsOccupyingLocalResources() { + return backingShuffleEnvironment.getPartitionsOccupyingLocalResources(); + } + + @Override + public Collection createInputGates(ShuffleIOOwnerContext ownerContext, PartitionProducerStateProvider partitionProducerStateProvider, Collection inputGateDeploymentDescriptors) { + return backingShuffleEnvironment.createInputGates(ownerContext, partitionProducerStateProvider, inputGateDeploymentDescriptors); + } + + @Override + public boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo partitionInfo) throws IOException, InterruptedException { + return backingShuffleEnvironment.updatePartitionInfo(consumerID, partitionInfo); + } + + @Override + public void close() throws Exception { + backingShuffleEnvironment.close(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java index 5511d1bd45b46..be0e295b5309d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java @@ -775,7 +775,7 @@ private TaskDeploymentDescriptor createTestTaskDeploymentDescriptor( 0); } - private static TaskDeploymentDescriptor createTaskDeploymentDescriptor( + static TaskDeploymentDescriptor createTaskDeploymentDescriptor( JobID jobId, String jobName, ExecutionAttemptID executionAttemptId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index c163c839bac49..33113f812de51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -87,6 +87,7 @@ import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -1866,7 +1867,8 @@ private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, - testingFatalErrorHandler); + testingFatalErrorHandler, + new PartitionTable()); } private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) { @@ -1883,7 +1885,8 @@ private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskMa UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, dummyBlobCacheService, - testingFatalErrorHandler); + testingFatalErrorHandler, + new PartitionTable()); } private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index a802c71c0aa43..a9b6f92d7fa6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -205,11 +206,12 @@ private TestingTaskExecutor createTaskExecutor(TaskManagerServices taskManagerSe UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, blobCacheService, - testingFatalErrorHandler + testingFatalErrorHandler, + new PartitionTable() ); } - private static JobManagerConnection createJobManagerConnection(JobID jobId, JobMasterGateway jobMasterGateway, RpcService testingRpcService, TaskManagerActions taskManagerActions, Time timeout) { + static JobManagerConnection createJobManagerConnection(JobID jobId, JobMasterGateway jobMasterGateway, RpcService testingRpcService, TaskManagerActions taskManagerActions, Time timeout) { final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class); when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java index c8e26643e7e77..74d2e084e3d31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; import javax.annotation.Nullable; @@ -44,7 +45,8 @@ public TestingTaskExecutor( TaskManagerMetricGroup taskManagerMetricGroup, @Nullable String metricQueryServiceAddress, BlobCacheService blobCacheService, - FatalErrorHandler fatalErrorHandler) { + FatalErrorHandler fatalErrorHandler, + PartitionTable partitionTable) { super( rpcService, taskManagerConfiguration, @@ -54,7 +56,8 @@ public TestingTaskExecutor( taskManagerMetricGroup, metricQueryServiceAddress, blobCacheService, - fatalErrorHandler); + fatalErrorHandler, + partitionTable); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java new file mode 100644 index 0000000000000..c00517e420996 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/partition/PartitionTableTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; + +import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertTrue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertFalse; + +/** + * Tests for the {@link PartitionTable}. + */ +public class PartitionTableTest extends TestLogger { + + private static final JobID JOB_ID = new JobID(); + private static final ResultPartitionID PARTITION_ID = new ResultPartitionID(); + + @Test + public void testEmptyTable() { + final PartitionTable table = new PartitionTable(); + + // an empty table should always return an empty collection + Collection partitionsForNonExistingJob = table.stopTrackingPartitions(JOB_ID); + assertNotNull(partitionsForNonExistingJob); + assertThat(partitionsForNonExistingJob, empty()); + + assertFalse(table.hasTrackedPartitions(JOB_ID)); + } + + @Test + public void testStartTrackingPartition() { + final PartitionTable table = new PartitionTable(); + + table.startTrackingPartitions(JOB_ID, Collections.singletonList(PARTITION_ID)); + + assertTrue(table.hasTrackedPartitions(JOB_ID)); + } + + @Test + public void testStopTrackingAllPartitions() { + final PartitionTable table = new PartitionTable(); + + table.startTrackingPartitions(JOB_ID, Collections.singletonList(PARTITION_ID)); + + Collection storedPartitions = table.stopTrackingPartitions(JOB_ID); + assertThat(storedPartitions, contains(PARTITION_ID)); + assertFalse(table.hasTrackedPartitions(JOB_ID)); + } + + @Test + public void testStopTrackingPartitions() { + final ResultPartitionID partitionId2 = new ResultPartitionID(); + final PartitionTable table = new PartitionTable(); + + table.startTrackingPartitions(JOB_ID, Collections.singletonList(PARTITION_ID)); + table.startTrackingPartitions(JOB_ID, Collections.singletonList(partitionId2)); + + table.stopTrackingPartitions(JOB_ID, Collections.singletonList(partitionId2)); + assertTrue(table.hasTrackedPartitions(JOB_ID)); + + Collection storedPartitions = table.stopTrackingPartitions(JOB_ID); + assertThat(storedPartitions, contains(PARTITION_ID)); + assertFalse(table.hasTrackedPartitions(JOB_ID)); + } +}