Skip to content

Commit 9962e75

Browse files
author
zhouhengtong
committed
[FLINK-37607] Fix RpcEndpoint#MainThreadExecutor lost scheduling tasks when not running
1 parent 5e45679 commit 9962e75

File tree

2 files changed

+94
-21
lines changed

2 files changed

+94
-21
lines changed

flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.concurrent.TimeUnit;
5151
import java.util.concurrent.TimeoutException;
5252
import java.util.concurrent.atomic.AtomicReference;
53+
import java.util.function.Supplier;
5354

5455
import static org.apache.flink.util.Preconditions.checkNotNull;
5556

@@ -131,7 +132,7 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {
131132
* <p>IMPORTANT: the running state is not thread safe and can be used only in the main thread of
132133
* the rpc endpoint.
133134
*/
134-
private boolean isRunning;
135+
private CompletableFuture<Void> isRunningFuture;
135136

136137
/**
137138
* Initializes the RPC endpoint.
@@ -147,8 +148,14 @@ protected RpcEndpoint(
147148
this.rpcServer = rpcService.startServer(this, loggingContext);
148149
this.resourceRegistry = new CloseableRegistry();
149150

151+
this.isRunningFuture = new CompletableFuture<>();
150152
this.mainThreadExecutor =
151-
new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId);
153+
new MainThreadExecutor(
154+
rpcServer,
155+
this::validateRunsInMainThread,
156+
Executors.newSingleThreadScheduledExecutor(
157+
new ExecutorThreadFactory(endpointId + "-main-scheduler")),
158+
() -> isRunningFuture);
152159
registerResource(this.mainThreadExecutor);
153160
}
154161

@@ -187,7 +194,7 @@ public String getEndpointId() {
187194
*/
188195
protected boolean isRunning() {
189196
validateRunsInMainThread();
190-
return isRunning;
197+
return isRunningFuture.isDone();
191198
}
192199

193200
// ------------------------------------------------------------------------
@@ -210,7 +217,7 @@ public final void start() {
210217
*/
211218
public final void internalCallOnStart() throws Exception {
212219
validateRunsInMainThread();
213-
isRunning = true;
220+
isRunningFuture.complete(null);
214221
onStart();
215222
}
216223

@@ -253,7 +260,7 @@ public final CompletableFuture<Void> internalCallOnStop() {
253260
new RuntimeException("Close resource registry fail", e));
254261
}
255262
stopFuture = CompletableFuture.allOf(stopFuture, onStop());
256-
isRunning = false;
263+
isRunningFuture = new CompletableFuture<>();
257264
return stopFuture;
258265
}
259266

@@ -489,6 +496,13 @@ protected static class MainThreadExecutor implements ComponentMainThreadExecutor
489496
*/
490497
private final ScheduledExecutorService mainScheduledExecutor;
491498

499+
/**
500+
* The future indicate the gateway whether is running, NOTICE: can't change the state of
501+
* future.
502+
*/
503+
private final Supplier<CompletableFuture<Void>> getRunningFuture;
504+
505+
@VisibleForTesting
492506
MainThreadExecutor(
493507
MainThreadExecutable gateway, Runnable mainThreadCheck, String endpointId) {
494508
this(
@@ -503,9 +517,22 @@ protected static class MainThreadExecutor implements ComponentMainThreadExecutor
503517
MainThreadExecutable gateway,
504518
Runnable mainThreadCheck,
505519
ScheduledExecutorService mainScheduledExecutor) {
520+
this(
521+
gateway,
522+
mainThreadCheck,
523+
mainScheduledExecutor,
524+
() -> CompletableFuture.completedFuture(null));
525+
}
526+
527+
MainThreadExecutor(
528+
MainThreadExecutable gateway,
529+
Runnable mainThreadCheck,
530+
ScheduledExecutorService mainScheduledExecutor,
531+
Supplier<CompletableFuture<Void>> getRunningFuture) {
506532
this.gateway = Preconditions.checkNotNull(gateway);
507533
this.mainThreadCheck = Preconditions.checkNotNull(mainThreadCheck);
508534
this.mainScheduledExecutor = mainScheduledExecutor;
535+
this.getRunningFuture = getRunningFuture;
509536
}
510537

511538
@Override
@@ -526,14 +553,21 @@ public void execute(@Nonnull Runnable command) {
526553
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
527554
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
528555
FutureTask<Void> ft = new FutureTask<>(command, null);
529-
if (mainScheduledExecutor.isShutdown()) {
530-
log.warn(
531-
"The scheduled executor service is shutdown and ignores the command {}",
532-
command);
533-
} else {
534-
mainScheduledExecutor.schedule(
535-
() -> gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS);
536-
}
556+
getRunningFuture
557+
.get()
558+
.thenAccept(
559+
ignore -> {
560+
if (mainScheduledExecutor.isShutdown()) {
561+
log.warn(
562+
"The scheduled executor service is shutdown and ignores the command {}",
563+
command);
564+
} else {
565+
mainScheduledExecutor.schedule(
566+
() -> gateway.runAsync(ft),
567+
delayMillis,
568+
TimeUnit.MILLISECONDS);
569+
}
570+
});
537571
return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
538572
}
539573

@@ -551,14 +585,21 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
551585
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
552586
final long delayMillis = TimeUnit.MILLISECONDS.convert(delay, unit);
553587
FutureTask<V> ft = new FutureTask<>(callable);
554-
if (mainScheduledExecutor.isShutdown()) {
555-
log.warn(
556-
"The scheduled executor service is shutdown and ignores the callable {}",
557-
callable);
558-
} else {
559-
mainScheduledExecutor.schedule(
560-
() -> gateway.runAsync(ft), delayMillis, TimeUnit.MILLISECONDS);
561-
}
588+
getRunningFuture
589+
.get()
590+
.thenAccept(
591+
ignore -> {
592+
if (mainScheduledExecutor.isShutdown()) {
593+
log.warn(
594+
"The scheduled executor service is shutdown and ignores the callable {}",
595+
callable);
596+
} else {
597+
mainScheduledExecutor.schedule(
598+
() -> gateway.runAsync(ft),
599+
delayMillis,
600+
TimeUnit.MILLISECONDS);
601+
}
602+
});
562603
return new ScheduledFutureAdapter<>(ft, delayMillis, TimeUnit.MILLISECONDS);
563604
}
564605

flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040

4141
import static org.assertj.core.api.Assertions.assertThat;
4242
import static org.assertj.core.api.Assertions.assertThatThrownBy;
43+
import static org.junit.jupiter.api.Assertions.assertFalse;
44+
import static org.junit.jupiter.api.Assertions.assertThrows;
4345

4446
/** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor scheduling command. */
4547
class RpcEndpointTest {
@@ -472,6 +474,36 @@ void testCallAsyncTimeout() throws InterruptedException, ExecutionException, Tim
472474
}
473475
}
474476

477+
/** Test schedule task when the RPC is running. */
478+
@Test
479+
public void testScheduleTaskAfterStart() throws Exception {
480+
final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
481+
final CompletableFuture<Void> taskCompletedFuture = new CompletableFuture<>();
482+
try {
483+
final Duration expectedDelay = Duration.ofSeconds(0);
484+
ScheduledFuture<?> future =
485+
endpoint.getMainThreadExecutor()
486+
.schedule(
487+
() -> taskCompletedFuture.complete(null),
488+
expectedDelay.toMillis(),
489+
TimeUnit.MILLISECONDS);
490+
assertThrows(
491+
TimeoutException.class,
492+
() ->
493+
taskCompletedFuture.get(
494+
expectedDelay.toMillis() + 3000L, TimeUnit.MILLISECONDS));
495+
assertFalse(taskCompletedFuture.isDone());
496+
assertFalse(future.isDone());
497+
498+
endpoint.start();
499+
500+
taskCompletedFuture.get(1000L, TimeUnit.MILLISECONDS);
501+
} finally {
502+
RpcUtils.terminateRpcEndpoint(endpoint);
503+
endpoint.validateResourceClosed();
504+
}
505+
}
506+
475507
private static class TestMainThreadExecutable implements MainThreadExecutable {
476508

477509
private final Consumer<Runnable> scheduleRunAsyncConsumer;

0 commit comments

Comments
 (0)