Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,6 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc

// ------------------------------------------------------------------------

public RetryingRegistration(
Logger log,
RpcService rpcService,
String targetName,
Class<G> targetType,
String targetAddress,
F fencingToken) {
this(
log,
rpcService,
targetName,
targetType,
targetAddress,
fencingToken,
RetryingRegistrationConfiguration.defaultConfiguration());
}

public RetryingRegistration(
Logger log,
RpcService rpcService,
Expand Down Expand Up @@ -154,28 +137,28 @@ public void startRegistration() {
}

try {
// trigger resolution of the resource manager address to a callable gateway
final CompletableFuture<G> resourceManagerFuture;
// trigger resolution of the target address to a callable gateway
final CompletableFuture<G> rpcGatewayFuture;

if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
resourceManagerFuture = (CompletableFuture<G>) rpcService.connect(
rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
targetAddress,
fencingToken,
targetType.asSubclass(FencedRpcGateway.class));
} else {
resourceManagerFuture = rpcService.connect(targetAddress, targetType);
rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
}

// upon success, start the registration attempts
CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(
(G result) -> {
CompletableFuture<Void> rpcGatewayAcceptFuture = rpcGatewayFuture.thenAcceptAsync(
(G rpcGateway) -> {
log.info("Resolved {} address, beginning registration", targetName);
register(result, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
register(rpcGateway, 1, retryingRegistrationConfiguration.getInitialRegistrationTimeoutMillis());
},
rpcService.getExecutor());

// upon failure, retry, unless this is cancelled
resourceManagerAcceptFuture.whenCompleteAsync(
rpcGatewayAcceptFuture.whenCompleteAsync(
(Void v, Throwable failure) -> {
if (failure != null && !canceled) {
final Throwable strippedFailure = ExceptionUtils.stripCompletionException(failure);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good refactorings!

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.TimeoutException;

/**
* Utility base class for testing gateways
* Utility base class for testing gateways.
*/
public abstract class TestingGatewayBase implements RpcGateway {

Expand Down Expand Up @@ -81,7 +81,7 @@ public <T> CompletableFuture<T> futureWithTimeout(long timeoutMillis) {
}

// ------------------------------------------------------------------------

private static final class FutureTimeout implements Runnable {

private final CompletableFuture<?> promise;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,36 +34,36 @@
* An RPC Service implementation for testing. This RPC service acts as a replacement for
* the regular RPC service for cases where tests need to return prepared mock gateways instead of
* proper RPC gateways.
*
*
* <p>The TestingRpcService can be used for example in the following fashion,
* using <i>Mockito</i> for mocks and verification:
*
*
* <pre>{@code
* TestingRpcService rpc = new TestingRpcService();
*
* ResourceManagerGateway testGateway = mock(ResourceManagerGateway.class);
* rpc.registerGateway("myAddress", testGateway);
*
*
* MyComponentToTest component = new MyComponentToTest();
* component.triggerSomethingThatCallsTheGateway();
*
*
* verify(testGateway, timeout(1000)).theTestMethod(any(UUID.class), anyString());
* }</pre>
*/
public class TestingRpcService extends AkkaRpcService {

/** Map of pre-registered connections */
/** Map of pre-registered connections. */
private final ConcurrentHashMap<String, RpcGateway> registeredConnections;

/**
* Creates a new {@code TestingRpcService}.
* Creates a new {@code TestingRpcService}.
*/
public TestingRpcService() {
this(new Configuration());
}

/**
* Creates a new {@code TestingRpcService}, using the given configuration.
* Creates a new {@code TestingRpcService}, using the given configuration.
*/
public TestingRpcService(Configuration configuration) {
super(AkkaUtils.createLocalActorSystem(configuration),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
Expand Down Expand Up @@ -99,6 +98,7 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;

import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -316,10 +316,12 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
final UUID jmLeaderId = UUID.randomUUID();

final ResourceID jmResourceId = ResourceID.generate();
final CountDownLatch registrationAttempts = new CountDownLatch(2);
final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = new CompletableFuture<>();
final CompletableFuture<ResourceID> disconnectTaskManagerFuture = new CompletableFuture<>();
final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
.setRegisterTaskManagerFunction((s, taskManagerLocation) -> {
registrationAttempts.countDown();
taskManagerLocationFuture.complete(taskManagerLocation);
return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId));
})
Expand All @@ -331,10 +333,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
)
.build();

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -374,6 +373,9 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception {
final ResourceID resourceID = disconnectTaskManagerFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS);
assertThat(resourceID, equalTo(taskManagerLocation.getResourceID()));

assertTrue(
"The TaskExecutor should try to reconnect to the JM",
registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.SECONDS));
} finally {
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
Expand Down Expand Up @@ -421,10 +423,7 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception {

HeartbeatServices heartbeatServices = new HeartbeatServices(heartbeatInterval, heartbeatTimeout);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -455,9 +454,9 @@ public void testHeartbeatTimeoutWithResourceManager() throws Exception {
// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));

// the TaskExecutor should try to reconnect to the RM
registrationAttempts.await();

assertTrue(
"The TaskExecutor should try to reconnect to the RM",
registrationAttempts.await(timeout.toMilliseconds(), TimeUnit.SECONDS));
} finally {
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
Expand Down Expand Up @@ -512,10 +511,7 @@ public void testHeartbeatSlotReporting() throws Exception {

final TestingTaskSlotTable taskSlotTable = new TestingTaskSlotTable(new ArrayDeque<>(Arrays.asList(slotReport1, slotReport2)));

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -562,60 +558,35 @@ public void testHeartbeatSlotReporting() throws Exception {
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
final String resourceManagerAddress = "/resource/manager/address/one";
final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
final String dispatcherAddress = "localhost";
final String jobManagerAddress = "localhost";
final String webMonitorAddress = "localhost";

// register a mock resource manager gateway
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
anyString(), any(ResourceID.class), anyInt(), any(HardwareDescription.class), any(Time.class)))
.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
new InstanceID(), resourceManagerResourceId, new ClusterInformation("localhost", 1234))));

rpc.registerGateway(resourceManagerAddress, rmGateway);

StandaloneHaServices haServices = new StandaloneHaServices(
resourceManagerAddress,
dispatcherAddress,
jobManagerAddress,
webMonitorAddress);

final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
final SlotReport slotReport = new SlotReport();
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
final CountDownLatch taskManagerRegisteredLatch = new CountDownLatch(1);
testingResourceManagerGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(
ignored -> {
taskManagerRegisteredLatch.countDown();
return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
new InstanceID(), new ResourceID(resourceManagerAddress), new ClusterInformation("localhost", 1234)));
}
));

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
rpc.registerGateway(resourceManagerAddress, testingResourceManagerGateway);

final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
.setTaskSlotTable(taskSlotTable)
.setTaskStateManager(localStateStoresManager)
.build();

TaskExecutor taskManager = new TaskExecutor(
rpc,
taskManagerConfiguration,
haServices,
taskManagerServices,
HEARTBEAT_SERVICES,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
null,
dummyBlobCacheService,
testingFatalErrorHandler);
final TaskExecutor taskManager = createTaskExecutor(taskManagerServices);

try {
taskManager.start();
String taskManagerAddress = taskManager.getAddress();
resourceManagerLeaderRetriever.notifyListener(resourceManagerAddress, UUID.randomUUID());

verify(rmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
eq(taskManagerAddress), eq(taskManagerLocation.getResourceID()), anyInt(), any(HardwareDescription.class), any(Time.class));
}
finally {
assertTrue(taskManagerRegisteredLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
} finally {
RpcUtils.terminateRpcEndpoint(taskManager, timeout);
}
}
Expand Down Expand Up @@ -649,10 +620,7 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception {
final SlotReport slotReport = new SlotReport();
when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -771,10 +739,7 @@ public void testTaskSubmission() throws Exception {
when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setNetworkEnvironment(networkEnvironment)
Expand Down Expand Up @@ -858,10 +823,7 @@ public void testJobLeaderDetection() throws Exception {
final AllocationID allocationId = new AllocationID();
final SlotID slotId = new SlotID(taskManagerLocation.getResourceID(), 0);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -971,10 +933,7 @@ public void testSlotAcceptance() throws Exception {
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -1064,10 +1023,7 @@ public void testSubmitTaskBeforeAcceptSlot() throws Exception {

final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -1172,10 +1128,7 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate());
final JobManagerTable jobManagerTableMock = spy(new JobManagerTable());

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -1245,10 +1198,7 @@ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {

rpc.registerGateway(rmAddress, rmGateway);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -1302,10 +1252,7 @@ public void testRemoveJobFromJobLeaderService() throws Exception {
Collections.singleton(ResourceProfile.UNKNOWN),
timerService);

TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager();

final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
Expand Down Expand Up @@ -1790,6 +1737,13 @@ public void testDisconnectFromJobMasterWhenNewLeader() throws Exception {
}
}

private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() throws IOException {
return new TaskExecutorLocalStateStoresManager(
false,
new File[]{tmp.newFolder()},
Executors.directExecutor());
}

@Nonnull
private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) {
return new TaskExecutor(
Expand Down
Loading