diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index 552eac11ba5fa..828cc265ace2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -74,23 +74,6 @@ public abstract class RetryingRegistration targetType, - String targetAddress, - F fencingToken) { - this( - log, - rpcService, - targetName, - targetType, - targetAddress, - fencingToken, - RetryingRegistrationConfiguration.defaultConfiguration()); - } - public RetryingRegistration( Logger log, RpcService rpcService, @@ -154,28 +137,28 @@ public void startRegistration() { } try { - // trigger resolution of the resource manager address to a callable gateway - final CompletableFuture resourceManagerFuture; + // trigger resolution of the target address to a callable gateway + final CompletableFuture rpcGatewayFuture; if (FencedRpcGateway.class.isAssignableFrom(targetType)) { - resourceManagerFuture = (CompletableFuture) rpcService.connect( + rpcGatewayFuture = (CompletableFuture) rpcService.connect( targetAddress, fencingToken, targetType.asSubclass(FencedRpcGateway.class)); } else { - resourceManagerFuture = rpcService.connect(targetAddress, targetType); + rpcGatewayFuture = rpcService.connect(targetAddress, targetType); } // upon success, start the registration attempts - CompletableFuture resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync( - (G result) -> { + CompletableFuture rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync( + (G rpcGateway) -> { log.info("Resolved {} address, beginning registration", targetName); - register(result, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis()); + register(rpcGateway, 1, 
retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis()); }, rpcService.getExecutor()); // upon failure, retry, unless this is cancelled - resourceManagerAcceptFuture.whenCompleteAsync( + rpcGatewayAcceptFuture.whenCompleteAsync( (Void v, Throwable failure) -> { if (failure != null && !canceled) { final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java index ccf0acde05f2d..dfeaeb555c954 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java @@ -25,7 +25,7 @@ import java.util.concurrent.TimeoutException; /** - * Utility base class for testing gateways + * Utility base class for testing gateways. */ public abstract class TestingGatewayBase implements RpcGateway { @@ -81,7 +81,7 @@ public CompletableFuture futureWithTimeout(long timeoutMillis) { } // ------------------------------------------------------------------------ - + private static final class FutureTimeout implements Runnable { private final CompletableFuture promise; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index 85c57075cd9ad..f42f09ccf1df2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -34,36 +34,36 @@ * An RPC Service implementation for testing. This RPC service acts as a replacement for * the regular RPC service for cases where tests need to return prepared mock gateways instead of * proper RPC gateways. - * + * *
<p>The TestingRpcService can be used for example in the following fashion, * using Mockito for mocks and verification: - * + * *
<pre>{@code
  * TestingRpcService rpc = new TestingRpcService();
  *
  * ResourceManagerGateway testGateway = mock(ResourceManagerGateway.class);
  * rpc.registerGateway("myAddress", testGateway);
- * 
+ *
  * MyComponentToTest component = new MyComponentToTest();
  * component.triggerSomethingThatCallsTheGateway();
- * 
+ *
  * verify(testGateway, timeout(1000)).theTestMethod(any(UUID.class), anyString());
  * }</pre>
*/ public class TestingRpcService extends AkkaRpcService { - /** Map of pre-registered connections */ + /** Map of pre-registered connections. */ private final ConcurrentHashMap registeredConnections; /** - * Creates a new {@code TestingRpcService}. + * Creates a new {@code TestingRpcService}. */ public TestingRpcService() { this(new Configuration()); } /** - * Creates a new {@code TestingRpcService}, using the given configuration. + * Creates a new {@code TestingRpcService}, using the given configuration. */ public TestingRpcService(Configuration configuration) { super(AkkaUtils.createLocalActorSystem(configuration), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e9ce0d6da5fc0..5340fa288770b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -50,7 +50,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -99,6 +98,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; import org.junit.After; import org.junit.Before; @@ -316,10 +316,12 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { final UUID jmLeaderId = UUID.randomUUID(); final ResourceID jmResourceId = ResourceID.generate(); + final 
CountDownLatch registrationAttempts = new CountDownLatch(2); final CompletableFuture taskManagerLocationFuture = new CompletableFuture<>(); final CompletableFuture disconnectTaskManagerFuture = new CompletableFuture<>(); final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() .setRegisterTaskManagerFunction((s, taskManagerLocation) -> { + registrationAttempts.countDown(); taskManagerLocationFuture.complete(taskManagerLocation); return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId)); }) @@ -331,10 +333,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { ) .build(); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -374,6 +373,9 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { final ResourceID resourceID = disconnectTaskManagerFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS); assertThat(resourceID, equalTo(taskManagerLocation.getResourceID())); + assertTrue( + "The TaskExecutor should try to reconnect to the JM", + registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); } finally { RpcUtils.terminateRpcEndpoint(taskManager, timeout); } @@ -421,10 +423,7 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception { HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = 
createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -455,9 +454,9 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception { // heartbeat timeout should trigger disconnect TaskManager from ResourceManager assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID())); - // the TaskExecutor should try to reconnect to the RM - registrationAttempts.await(); - + assertTrue( + "The TaskExecutor should try to reconnect to the RM", + registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); } finally { RpcUtils.terminateRpcEndpoint(taskManager, timeout); } @@ -512,10 +511,7 @@ public void testHeartbeatSlotReporting() throws Exception { final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2))); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -562,60 +558,35 @@ public void testHeartbeatSlotReporting() throws Exception { @Test public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception { final String resourceManagerAddress = "/resource/manager/address/one"; - final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress); - final String dispatcherAddress = "localhost"; - final String jobManagerAddress = "localhost"; - final String webMonitorAddress = "localhost"; - - // register a mock resource manager gateway - ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); - 
when(rmGateway.registerTaskExecutor( - anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess( - new InstanceID(), resourceManagerResourceId, new ClusterInformation("localhost", 1234)))); - rpc.registerGateway(resourceManagerAddress, rmGateway); - - StandaloneHaServices haServices = new StandaloneHaServices( - resourceManagerAddress, - dispatcherAddress, - jobManagerAddress, - webMonitorAddress); - - final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class); - final SlotReport slotReport = new SlotReport(); - when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final CountDownLatch taskManagerRegisteredLatch = new CountDownLatch(1); + testingResourceManagerGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction( + ignored -> { + taskManagerRegisteredLatch.countDown(); + return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess( + new InstanceID(), new ResourceID(resourceManagerAddress), new ClusterInformation("localhost", 1234))); + } + )); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + rpc.registerGateway(resourceManagerAddress, testingResourceManagerGateway); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setTaskSlotTable(taskSlotTable) .setTaskStateManager(localStateStoresManager) .build(); - TaskExecutor taskManager = new 
TaskExecutor( - rpc, - taskManagerConfiguration, - haServices, - taskManagerServices, - HEARTBEAT_SERVICES, - UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - null, - dummyBlobCacheService, - testingFatalErrorHandler); + final TaskExecutor taskManager = createTaskExecutor(taskManagerServices); try { taskManager.start(); - String taskManagerAddress = taskManager.getAddress(); + resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, UUID.randomUUID()); - verify(rmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor( - eq(taskManagerAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class)); - } - finally { + assertTrue(taskManagerRegisteredLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS)); + } finally { RpcUtils.terminateRpcEndpoint(taskManager, timeout); } } @@ -649,10 +620,7 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { final SlotReport slotReport = new SlotReport(); when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -771,10 +739,7 @@ public void testTaskSubmission() throws Exception { when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class)); when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final 
TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setNetworkEnvironment(networkEnvironment) @@ -858,10 +823,7 @@ public void testJobLeaderDetection() throws Exception { final AllocationID allocationId = new AllocationID(); final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -971,10 +933,7 @@ public void testSlotAcceptance() throws Exception { rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); rpc.registerGateway(jobManagerAddress, jobMasterGateway); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -1064,10 +1023,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception { final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -1172,10 +1128,7 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception { final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate()); final JobManagerTable jobManagerTableMock = spy(new JobManagerTable()); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -1245,10 +1198,7 @@ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception { rpc.registerGateway(rmAddress, rmGateway); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -1302,10 +1252,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception { Collections.singleton(ResourceProfile.UNKNOWN), timerService); - TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( - false, - new File[]{tmp.newFolder()}, - Executors.directExecutor()); + final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) @@ -1790,6 +1737,13 @@ public void testDisconnectFromJobMasterWhenNewLeader() throws Exception { } } + 
private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() throws IOException { + return new TaskExecutorLocalStateStoresManager( + false, + new File[]{tmp.newFolder()}, + Executors.directExecutor()); + } + @Nonnull private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) { return new TaskExecutor( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java new file mode 100644 index 0000000000000..698f2b3501635 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.registration.RegisteredRpcConnection; +import org.apache.flink.runtime.registration.RegistrationConnectionListener; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link TaskExecutorToResourceManagerConnection}. 
+ */ +public class TaskExecutorToResourceManagerConnectionTest extends TestLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorToResourceManagerConnectionTest.class); + + private static final int TEST_TIMEOUT_MILLIS = 10000; + + private static final String RESOURCE_MANAGER_ADDRESS = "localhost"; + + private static final ResourceManagerId RESOURCE_MANAGER_ID = ResourceManagerId.generate(); + + private static final String TASK_MANAGER_ADDRESS = "localhost"; + + private static final ResourceID TASK_MANAGER_RESOURCE_ID = ResourceID.generate(); + + private static final int TASK_MANAGER_DATA_PORT = 12345; + + private static final HardwareDescription TASK_MANAGER_HARDWARE_DESCRIPTION = HardwareDescription.extractFromSystem(Long.MAX_VALUE); + + private TestingRpcService rpcService; + + private TestingResourceManagerGateway testingResourceManagerGateway; + + private CompletableFuture registrationSuccessFuture; + + @Test + public void testResourceManagerRegistration() throws Exception { + final TaskExecutorToResourceManagerConnection resourceManagerRegistration = createTaskExecutorToResourceManagerConnection(); + + testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple -> { + final String actualAddress = tuple.f0; + final ResourceID actualResourceId = tuple.f1; + final Integer actualDataPort = tuple.f2; + final HardwareDescription actualHardwareDescription = tuple.f3; + + assertThat(actualAddress, is(equalTo(TASK_MANAGER_ADDRESS))); + assertThat(actualResourceId, is(equalTo(TASK_MANAGER_RESOURCE_ID))); + assertThat(actualDataPort, is(equalTo(TASK_MANAGER_DATA_PORT))); + assertThat(actualHardwareDescription, is(equalTo(TASK_MANAGER_HARDWARE_DESCRIPTION))); + + return CompletableFuture.completedFuture(successfulRegistration()); + }); + + resourceManagerRegistration.start(); + registrationSuccessFuture.get(TEST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } + + private TaskExecutorToResourceManagerConnection 
createTaskExecutorToResourceManagerConnection() { + return new TaskExecutorToResourceManagerConnection( + LOGGER, + rpcService, + TASK_MANAGER_ADDRESS, + TASK_MANAGER_RESOURCE_ID, + RetryingRegistrationConfiguration.defaultConfiguration(), + TASK_MANAGER_DATA_PORT, + TASK_MANAGER_HARDWARE_DESCRIPTION, + RESOURCE_MANAGER_ADDRESS, + RESOURCE_MANAGER_ID, + Executors.directExecutor(), + new TestRegistrationConnectionListener<>()); + } + + private static TaskExecutorRegistrationSuccess successfulRegistration() { + return new TaskExecutorRegistrationSuccess( + new InstanceID(), + ResourceID.generate(), + new ClusterInformation("blobServerHost", 55555)); + } + + @Before + public void setUp() { + rpcService = new TestingRpcService(); + + testingResourceManagerGateway = new TestingResourceManagerGateway(); + rpcService.registerGateway(RESOURCE_MANAGER_ADDRESS, testingResourceManagerGateway); + + registrationSuccessFuture = new CompletableFuture<>(); + } + + @After + public void tearDown() throws Exception { + rpcService.stopService().get(TEST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } + + private class TestRegistrationConnectionListener, S extends RegistrationResponse.Success> + implements RegistrationConnectionListener { + + @Override + public void onRegistrationSuccess(final T connection, final S success) { + registrationSuccessFuture.complete(null); + } + + @Override + public void onRegistrationFailure(final Throwable failure) { + registrationSuccessFuture.completeExceptionally(failure); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java deleted file mode 100644 index 7ee0921646af3..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ /dev/null @@ -1,694 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.InvalidActorNameException; -import akka.actor.Terminated; -import akka.testkit.JavaTestKit; -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.blob.VoidBlobStore; -import org.apache.flink.runtime.clusterframework.FlinkResourceManager; -import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; -import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; -import org.apache.flink.runtime.messages.RegistrationMessages.AcknowledgeRegistration; -import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager; -import org.apache.flink.runtime.messages.RegistrationMessages.RefuseRegistration; -import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.util.TestLogger; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Matchers; -import scala.Option; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import java.io.IOException; -import java.util.Arrays; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActor; -import static org.apache.flink.runtime.testingUtils.TestingUtils.createTaskManager; -import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGatewaysGracefully; -import static org.apache.flink.runtime.testingUtils.TestingUtils.stopActorGracefully; -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * The tests in this class verify the behavior of the TaskManager - * when connecting to the JobManager, and when the JobManager - * is unreachable. 
- */ -public class TaskManagerRegistrationTest extends TestLogger { - - // use one actor system throughout all tests - private static ActorSystem actorSystem; - - private static Configuration config; - - private static FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS); - - private TestingHighAvailabilityServices highAvailabilityServices; - - @BeforeClass - public static void startActorSystem() { - config = new Configuration(); - config.setString(AkkaOptions.ASK_TIMEOUT, "5 s"); - config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms"); - config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2 s"); - config.setInteger(AkkaOptions.WATCH_THRESHOLD, 2); - - actorSystem = AkkaUtils.createLocalActorSystem(config); - } - - @AfterClass - public static void shutdownActorSystem() { - if (actorSystem != null) { - actorSystem.terminate(); - } - } - - @Before - public void setupTest() { - highAvailabilityServices = new TestingHighAvailabilityServices(); - } - - @After - public void tearDownTest() throws Exception { - highAvailabilityServices.closeAndCleanupAllData(); - highAvailabilityServices = null; - } - - /** - * A test that verifies that two TaskManagers correctly register at the - * JobManager. - */ - @Test - public void testSimpleRegistration() throws Exception { - new JavaTestKit(actorSystem) {{ - - ActorGateway jobManager = null; - ActorGateway taskManager1 = null; - ActorGateway taskManager2 = null; - ActorGateway resourceManager = null; - - EmbeddedHaServices embeddedHaServices = null; - - try { - embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor()); - - // a simple JobManager - jobManager = TestingUtils.createJobManager( - actorSystem, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - config, - embeddedHaServices); - - resourceManager = new AkkaActorGateway( - startResourceManager(config, embeddedHaServices), - jobManager.leaderSessionID()); - - // start two TaskManagers. 
it will automatically try to register - taskManager1 = createTaskManager( - actorSystem, - embeddedHaServices, - config, - true, - false); - - taskManager2 = createTaskManager( - actorSystem, - embeddedHaServices, - config, - true, - false); - - // check that the TaskManagers are registered - Future responseFuture1 = taskManager1.ask( - TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), - timeout); - - Future responseFuture2 = taskManager2.ask( - TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), - timeout); - - Object response1 = Await.result(responseFuture1, timeout); - Object response2 = Await.result(responseFuture2, timeout); - - assertTrue(response1 instanceof TaskManagerMessages.RegisteredAtJobManager); - assertTrue(response2 instanceof TaskManagerMessages.RegisteredAtJobManager); - - // check that the JobManager has 2 TaskManagers registered - Future numTaskManagersFuture = jobManager.ask( - JobManagerMessages.getRequestNumberRegisteredTaskManager(), - timeout); - - Integer count = (Integer) Await.result(numTaskManagersFuture, timeout); - assertEquals(2, count.intValue()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager)); - - embeddedHaServices.closeAndCleanupAllData(); - } - }}; - } - - /** - * A test that verifies that two TaskManagers correctly register at the - * JobManager. - */ - @Test - public void testDelayedRegistration() throws Exception { - new JavaTestKit(actorSystem) {{ - ActorGateway jobManager = null; - ActorGateway taskManager = null; - - FiniteDuration delayedTimeout = timeout.$times(3L); - - final EmbeddedHaServices embeddedHaServices = new EmbeddedHaServices(Executors.directExecutor()); - - try { - // start a TaskManager that tries to register at the JobManager before the JobManager is - // available. 
we give it the regular JobManager akka URL - taskManager = createTaskManager( - actorSystem, - embeddedHaServices, - new Configuration(), - true, - false); - - // let it try for a bit - Thread.sleep(6000L); - - // now start the JobManager, with the regular akka URL - jobManager = TestingUtils.createJobManager( - actorSystem, - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - new Configuration(), - embeddedHaServices); - - // check that the TaskManagers are registered - Future responseFuture = taskManager.ask( - TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), - delayedTimeout); - - Object response = Await.result(responseFuture, delayedTimeout); - - assertTrue(response instanceof TaskManagerMessages.RegisteredAtJobManager); - } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager)); - - embeddedHaServices.closeAndCleanupAllData(); - } - }}; - } - - /** - * Tests that the TaskManager shuts down when it cannot register at the - * JobManager within the given maximum duration. - * - * Unfortunately, this test does not give good error messages. - * (I have not figured out how to get any better message out of the - * Akka TestKit than "ask timeout exception".) - * - * Anyways: An "ask timeout exception" here means that the TaskManager - * did not shut down after its registration timeout expired. 
- */ - @Test - public void testShutdownAfterRegistrationDurationExpired() { - new JavaTestKit(actorSystem) {{ - - ActorGateway taskManager = null; - - try { - // registration timeout of 1 second - Configuration tmConfig = new Configuration(); - tmConfig.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "500 ms"); - - highAvailabilityServices.setJobMasterLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID, - // Give a non-existent job manager address to the task manager - new SettableLeaderRetrievalService( - "foobar", - HighAvailabilityServices.DEFAULT_LEADER_ID)); - - // start the taskManager actor - taskManager = createTaskManager( - actorSystem, - highAvailabilityServices, - tmConfig, - true, - false); - - // make sure it terminates in time, since it cannot register at a JobManager - watch(taskManager.actor()); - - final ActorGateway tm = taskManager; - - new Within(timeout) { - - @Override - protected void run() { - expectTerminated(tm.actor()); - } - }; - } - catch (Throwable e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - stopActorGracefully(taskManager); - } - }}; - } - - /** - * Make sure that the TaskManager keeps trying to register, even after - * registration attempts have been refused. 
- */ - @Test - public void testTaskManagerResumesConnectAfterRefusedRegistration() { - new JavaTestKit(actorSystem) {{ - ActorGateway jm = null; - ActorGateway taskManager =null; - try { - jm = TestingUtils.createForwardingActor( - actorSystem, - getTestActor(), - HighAvailabilityServices.DEFAULT_LEADER_ID, - Option.empty()); - final ActorGateway jmGateway = jm; - - FiniteDuration refusedRegistrationPause = new FiniteDuration(500, TimeUnit.MILLISECONDS); - Configuration tmConfig = new Configuration(config); - tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause.toString()); - - highAvailabilityServices.setJobMasterLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID, - new SettableLeaderRetrievalService( - jm.path(), - HighAvailabilityServices.DEFAULT_LEADER_ID)); - - // we make the test actor (the test kit) the JobManager to intercept - // the messages - taskManager = createTaskManager( - actorSystem, - highAvailabilityServices, - tmConfig, - true, - false); - - final ActorGateway taskManagerGateway = taskManager; - - // check and decline initial registration - new Within(timeout) { - - @Override - protected void run() { - // the TaskManager should try to register - expectMsgClass(RegisterTaskManager.class); - - // we decline the registration - taskManagerGateway.tell( - new RefuseRegistration(new Exception("test reason")), - jmGateway); - } - }; - - - - // the TaskManager should wait a bit an retry... - FiniteDuration maxDelay = (FiniteDuration) refusedRegistrationPause.$times(3.0); - new Within(maxDelay) { - - @Override - protected void run() { - expectMsgClass(RegisterTaskManager.class); - } - }; - } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager, jm)); - } - }}; - } - - /** - * Tests that the TaskManager does not send an excessive amount of registration messages to - * the job manager if its registration was rejected. 
- */ - @Test - public void testTaskManagerNoExcessiveRegistrationMessages() throws Exception { - new JavaTestKit(actorSystem) {{ - ActorGateway jm = null; - ActorGateway taskManager =null; - try { - FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS); - - jm = TestingUtils.createForwardingActor( - actorSystem, - getTestActor(), - HighAvailabilityServices.DEFAULT_LEADER_ID, - Option.empty()); - - highAvailabilityServices.setJobMasterLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID, - new SettableLeaderRetrievalService( - jm.path(), - HighAvailabilityServices.DEFAULT_LEADER_ID)); - - final ActorGateway jmGateway = jm; - - long refusedRegistrationPause = 500; - long initialRegistrationPause = 100; - long maxDelay = 30000; - - Configuration tmConfig = new Configuration(config); - tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause + " ms"); - tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, initialRegistrationPause + " ms"); - - // we make the test actor (the test kit) the JobManager to intercept - // the messages - taskManager = createTaskManager( - actorSystem, - highAvailabilityServices, - tmConfig, - true, - false); - - final ActorGateway taskManagerGateway = taskManager; - - final Deadline deadline = timeout.fromNow(); - - try { - while (deadline.hasTimeLeft()) { - // the TaskManager should try to register - expectMsgClass(deadline.timeLeft(), RegisterTaskManager.class); - - // we decline the registration - taskManagerGateway.tell( - new RefuseRegistration(new Exception("test reason")), - jmGateway); - } - } catch (AssertionError error) { - // ignore since it simply means that we have used up all our time - } - - RegisterTaskManager[] registerTaskManagerMessages = new ReceiveWhile(RegisterTaskManager.class, timeout) { - @Override - protected RegisterTaskManager match(Object msg) throws Exception { - if (msg instanceof RegisterTaskManager) { - return (RegisterTaskManager) msg; - } 
else { - throw noMatch(); - } - } - }.get(); - - int maxExponent = (int) Math.floor(Math.log(((double) maxDelay / initialRegistrationPause + 1))/Math.log(2)); - int exponent = (int) Math.ceil(Math.log(((double) timeout.toMillis() / initialRegistrationPause + 1))/Math.log(2)); - - int exp = Math.min(maxExponent, exponent); - - long difference = timeout.toMillis() - (initialRegistrationPause * (1 << exp)); - - int numberRegisterTaskManagerMessages = exp; - - if (difference > 0) { - numberRegisterTaskManagerMessages += Math.ceil((double) difference / maxDelay); - } - - int maxExpectedNumberOfRegisterTaskManagerMessages = numberRegisterTaskManagerMessages * 2; - - assertTrue("The number of RegisterTaskManager messages #" - + registerTaskManagerMessages.length - + " should be less than #" - + maxExpectedNumberOfRegisterTaskManagerMessages, - registerTaskManagerMessages.length <= maxExpectedNumberOfRegisterTaskManagerMessages); - } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManager, jm)); - } - }}; - } - - /** - * Validate that the TaskManager attempts to re-connect after it lost the connection - * to the JobManager. 
- */ - @Test - public void testTaskManagerResumesConnectAfterJobManagerFailure() { - new JavaTestKit(actorSystem) {{ - ActorGateway fakeJobManager1Gateway = null; - ActorGateway fakeJobManager2Gateway = null; - ActorGateway taskManagerGateway = null; - - final String JOB_MANAGER_NAME = "ForwardingJobManager"; - - try { - fakeJobManager1Gateway = TestingUtils.createForwardingActor( - actorSystem, - getTestActor(), - HighAvailabilityServices.DEFAULT_LEADER_ID, - Option.apply(JOB_MANAGER_NAME)); - final ActorGateway fakeJM1Gateway = fakeJobManager1Gateway; - - SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService( - fakeJM1Gateway.path(), - HighAvailabilityServices.DEFAULT_LEADER_ID); - - highAvailabilityServices.setJobMasterLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID, - settableLeaderRetrievalService); - - // we make the test actor (the test kit) the JobManager to intercept - // the messages - taskManagerGateway = createTaskManager( - actorSystem, - highAvailabilityServices, - config, - true, - false); - - final ActorGateway tm = taskManagerGateway; - - // validate initial registration - new Within(timeout) { - - @Override - protected void run() { - // the TaskManager should try to register - expectMsgClass(RegisterTaskManager.class); - - // we accept the registration - tm.tell( - new AcknowledgeRegistration( - new InstanceID(), - 45234), - fakeJM1Gateway); - } - }; - - // kill the first forwarding JobManager - watch(fakeJobManager1Gateway.actor()); - stopActor(fakeJobManager1Gateway.actor()); - - final ActorGateway gateway = fakeJobManager1Gateway; - - new Within(timeout) { - - @Override - protected void run() { - Object message = null; - - // we might also receive RegisterTaskManager and Heartbeat messages which - // are queued up in the testing actor's mailbox - while(!(message instanceof Terminated)) { - message = receiveOne(timeout); - } - - Terminated terminatedMessage = (Terminated) message; - 
assertEquals(gateway.actor(), terminatedMessage.actor()); - } - }; - - fakeJobManager1Gateway = null; - - // now start the second fake JobManager and expect that - // the TaskManager registers again - // the second fake JM needs to have the same actor URL - - // since we cannot reliably wait until the actor is unregistered (name is - // available again) we loop with multiple tries for 20 seconds - long deadline = 20000000000L + System.nanoTime(); - do { - try { - fakeJobManager2Gateway = TestingUtils.createForwardingActor( - actorSystem, - getTestActor(), - HighAvailabilityServices.DEFAULT_LEADER_ID, - Option.apply(JOB_MANAGER_NAME)); - } catch (InvalidActorNameException e) { - // wait and retry - Thread.sleep(100); - } - } while (fakeJobManager2Gateway == null && System.nanoTime() < deadline); - - final ActorGateway fakeJM2GatewayClosure = fakeJobManager2Gateway; - - // expect the next registration - new Within(timeout) { - - @Override - protected void run() { - expectMsgClass(RegisterTaskManager.class); - - // we accept the registration - tm.tell( - new AcknowledgeRegistration( - new InstanceID(), - 45234), - fakeJM2GatewayClosure); - } - }; - } - catch (Throwable e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { - stopActorGatewaysGracefully(Arrays.asList(taskManagerGateway, fakeJobManager2Gateway)); - } - }}; - } - - @Test - public void testCheckForValidRegistrationSessionIDs() throws IOException { - new JavaTestKit(actorSystem) {{ - - ActorGateway taskManagerGateway = null; - - final UUID falseLeaderSessionID = UUID.randomUUID(); - final UUID trueLeaderSessionID = UUID.randomUUID(); - - HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class); - when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID))) - .thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID)); - 
when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new VoidBlobStore()); - - try { - // we make the test actor (the test kit) the JobManager to intercept - // the messages - taskManagerGateway = createTaskManager( - actorSystem, - mockedHighAvailabilityServices, - config, - true, - false); - - final ActorRef taskManager = taskManagerGateway.actor(); - - new Within(timeout) { - - @Override - protected void run() { - taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), getTestActor()); - - // the TaskManager should try to register - - LeaderSessionMessage lsm = expectMsgClass(LeaderSessionMessage.class); - - assertTrue(lsm.leaderSessionID().equals(trueLeaderSessionID)); - assertTrue(lsm.message() instanceof RegisterTaskManager); - - final ActorRef tm = getLastSender(); - - // This AcknowledgeRegistration message should be discarded because the - // registration session ID is wrong - tm.tell( - new LeaderSessionMessage( - falseLeaderSessionID, - new AcknowledgeRegistration( - new InstanceID(), - 1)), - getTestActor()); - - // Valid AcknowledgeRegistration message - tm.tell( - new LeaderSessionMessage( - trueLeaderSessionID, - new AcknowledgeRegistration( - new InstanceID(), - 1)), - getTestActor()); - - Object message = null; - - while(!(message instanceof TaskManagerMessages.RegisteredAtJobManager)) { - message = receiveOne(TestingUtils.TESTING_DURATION()); - } - - tm.tell(JobManagerMessages.getRequestLeaderSessionID(), getTestActor()); - - expectMsgEquals(new JobManagerMessages.ResponseLeaderSessionID(trueLeaderSessionID)); - } - }; - } finally { - stopActorGracefully(taskManagerGateway); - } - }}; - } - - // -------------------------------------------------------------------------------------------- - // Utility Functions - // -------------------------------------------------------------------------------------------- - - private static ActorRef startResourceManager(Configuration config, HighAvailabilityServices 
highAvailabilityServices) { - return FlinkResourceManager.startResourceManagerActors( - config, - actorSystem, - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), - StandaloneResourceManager.class); - } -}