Skip to content

Commit

Permalink
fix: mitigate gRPC stream connection issues (#1038)
Browse files Browse the repository at this point in the history
Mitigates hanging gRPC streams by detecting idle streams and reconnecting after a timeout (default 10min for partition assignment streams, 2min for all others).

The StreamIdleTimer is restarted when the client receives a response on the stream. The stream is reinitialized when the timeout expires. For publish and commit streams, the timeout will still expire even if there is no user activity.
  • Loading branch information
tmdiep authored Feb 4, 2022
1 parent a8a969e commit f3678b7
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 26 deletions.
9 changes: 9 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
</differences>
5 changes: 5 additions & 0 deletions google-cloud-pubsublite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
<artifactId>gson</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
<!-- Need testing utility classes for generated gRPC clients tests -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import com.google.cloud.pubsublite.proto.PartitionAssignmentAck;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Duration;

public class ConnectedAssignerImpl
extends SingleConnection<PartitionAssignmentRequest, PartitionAssignment, PartitionAssignment>
implements ConnectedAssigner {
private static final Duration STREAM_IDLE_TIMEOUT = Duration.ofMinutes(10);

private final CloseableMonitor monitor = new CloseableMonitor();

@GuardedBy("monitor.monitor")
Expand All @@ -38,7 +41,7 @@ private ConnectedAssignerImpl(
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory,
ResponseObserver<PartitionAssignment> clientStream,
PartitionAssignmentRequest initialRequest) {
super(streamFactory, clientStream, /*expectInitialResponse=*/ false);
super(streamFactory, clientStream, STREAM_IDLE_TIMEOUT, /*expectInitialResponse=*/ false);
initialize(initialRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;

public class ConnectedCommitterImpl
extends SingleConnection<
StreamingCommitCursorRequest, StreamingCommitCursorResponse, SequencedCommitCursorResponse>
implements ConnectedCommitter {
private final StreamingCommitCursorRequest initialRequest;

private ConnectedCommitterImpl(
@VisibleForTesting
ConnectedCommitterImpl(
StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory,
ResponseObserver<SequencedCommitCursorResponse> clientStream,
StreamingCommitCursorRequest initialRequest) {
super(streamFactory, clientStream);
StreamingCommitCursorRequest initialRequest,
Duration streamIdleTimeout) {
super(streamFactory, clientStream, streamIdleTimeout, /*expectInitialResponse=*/ true);
this.initialRequest = initialRequest;
initialize(initialRequest);
}
Expand All @@ -48,7 +52,8 @@ public ConnectedCommitter New(
StreamFactory<StreamingCommitCursorRequest, StreamingCommitCursorResponse> streamFactory,
ResponseObserver<SequencedCommitCursorResponse> clientStream,
StreamingCommitCursorRequest initialRequest) {
return new ConnectedCommitterImpl(streamFactory, clientStream, initialRequest);
return new ConnectedCommitterImpl(
streamFactory, clientStream, initialRequest, DEFAULT_STREAM_IDLE_TIMEOUT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ protected void doStop() {
if (completed) return;
completed = true;
logger.atFine().log("Terminating connection for %s", streamDescription());
currentConnection.close();
if (currentConnection != null) {
currentConnection.close();
}
} catch (Throwable t) {
logger.atWarning().withCause(t).log(
"Failed while terminating connection for %s", streamDescription());
Expand Down Expand Up @@ -180,7 +182,9 @@ public final void onError(Throwable t) {
Optional<Throwable> throwable = Optional.empty();
long backoffTime = 0;
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
currentConnection.close();
if (currentConnection != null) {
currentConnection.close();
}
backoffTime = nextRetryBackoffDuration;
nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis());
} catch (Throwable t2) {
Expand All @@ -197,7 +201,7 @@ public final void onError(Throwable t) {
logger.atFine().withCause(t).log(
"Stream disconnected attempting retry, after %s milliseconds for %s",
backoffTime, streamDescription());
ScheduledFuture<?> retry =
ScheduledFuture<?> unusedFuture =
SystemExecutors.getAlarmExecutor()
.schedule(() -> triggerReinitialize(statusOr.get()), backoffTime, MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Monitor.Guard;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Duration;

/**
* A SingleConnection handles the state for a stream with an initial connection request that may
Expand All @@ -38,9 +40,12 @@ public abstract class SingleConnection<StreamRequestT, StreamResponseT, ClientRe
implements ResponseObserver<StreamResponseT>, AutoCloseable {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

protected static final Duration DEFAULT_STREAM_IDLE_TIMEOUT = Duration.ofMinutes(2);

private final ClientStream<StreamRequestT> requestStream;
private final ResponseObserver<ClientResponseT> clientStream;
private final boolean expectInitial;
private final StreamIdleTimer streamIdleTimer;

private final CloseableMonitor connectionMonitor = new CloseableMonitor();

Expand All @@ -58,16 +63,18 @@ protected abstract void handleInitialResponse(StreamResponseT response)
protected SingleConnection(
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
ResponseObserver<ClientResponseT> clientStream,
Duration streamIdleTimeout,
boolean expectInitialResponse) {
this.clientStream = clientStream;
this.requestStream = streamFactory.New(this);
this.expectInitial = expectInitialResponse;
this.streamIdleTimer = new StreamIdleTimer(streamIdleTimeout, this::onStreamIdle);
this.requestStream = streamFactory.New(this);
}

protected SingleConnection(
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
ResponseObserver<ClientResponseT> clientStream) {
this(streamFactory, clientStream, /*expectInitialResponse=*/ true);
this(streamFactory, clientStream, DEFAULT_STREAM_IDLE_TIMEOUT, /*expectInitialResponse=*/ true);
}

protected void initialize(StreamRequestT initialRequest) {
Expand Down Expand Up @@ -122,20 +129,33 @@ protected boolean isCompleted() {
}
}

@Override
public void close() {
// Records the connection as completed and performs tear down, if not already completed. Returns
// whether the connection was already complete.
private boolean completeStream() {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) return;
if (completed) {
return true;
}
completed = true;
streamIdleTimer.close();
} catch (Exception e) {
log.atSevere().withCause(e).log("Error occurred while shutting down connection.");
}
return false;
}

@Override
public void close() {
if (completeStream()) {
return;
}
requestStream.closeSend();
clientStream.onComplete();
}

private void abort(CheckedApiException error) {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) return;
completed = true;
if (completeStream()) {
return;
}
requestStream.closeSendWithError(error.underlying);
clientStream.onError(error);
Expand All @@ -149,6 +169,7 @@ public void onStart(StreamController streamController) {}
public void onResponse(StreamResponseT response) {
boolean isFirst;
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
streamIdleTimer.restart();
if (completed) {
log.atFine().log("Received response on stream after completion: %s", response);
return;
Expand All @@ -169,21 +190,23 @@ public void onResponse(StreamResponseT response) {

@Override
public void onError(Throwable t) {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) return;
completed = true;
if (completeStream()) {
return;
}
clientStream.onError(t);
requestStream.closeSendWithError(t);
}

@Override
public void onComplete() {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) return;
completed = true;
if (completeStream()) {
return;
}
clientStream.onComplete();
requestStream.closeSend();
}

private void onStreamIdle() {
onError(new CheckedApiException("Detected idle stream.", Code.ABORTED));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import static com.google.common.collect.Comparators.min;

import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import java.time.Duration;
import java.util.concurrent.Future;
import javax.annotation.concurrent.GuardedBy;

/** An approximate timer used to detect idle streams. */
class StreamIdleTimer implements AutoCloseable {
/** Handles a timeout. */
interface Handler {
void onTimeout();
}

private static final long POLL_DIVISOR = 4;
private static final Duration MAX_POLL_INTERVAL = Duration.ofMinutes(1);

@VisibleForTesting
static Duration getDelay(Duration timeout) {
return min(MAX_POLL_INTERVAL, timeout.dividedBy(POLL_DIVISOR));
}

private final Duration timeout;
private final Handler handler;
private final Future<?> task;

@GuardedBy("this")
private final Stopwatch stopwatch;

/**
* Creates a started timer.
*
* @param timeout Call the handler after this duration has elapsed. The call may be delayed up to
* (timeout / POLL_DIVISOR) after the timeout duration.
* @param handler Called after the timeout has expired and the timer is running.
*/
StreamIdleTimer(Duration timeout, Handler handler) {
this(timeout, handler, Ticker.systemTicker(), AlarmFactory.create(getDelay(timeout)));
}

@VisibleForTesting
StreamIdleTimer(Duration timeout, Handler handler, Ticker ticker, AlarmFactory alarmFactory) {
this.timeout = timeout;
this.handler = handler;
this.stopwatch = Stopwatch.createStarted(ticker);
this.task = alarmFactory.newAlarm(this::onPoll);
}

@Override
public void close() throws Exception {
task.cancel(false);
}

/** Restart the timer from zero. */
public synchronized void restart() {
stopwatch.reset().start();
}

private synchronized void onPoll() {
if (stopwatch.elapsed().compareTo(timeout) > 0) {
SystemExecutors.getFuturesExecutor().execute(handler::onTimeout);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

public class SystemExecutors {
Expand All @@ -31,8 +32,13 @@ private static ThreadFactory newDaemonThreadFactory(String prefix) {
}

public static ScheduledExecutorService newDaemonExecutor(String prefix) {
return Executors.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()), newDaemonThreadFactory(prefix));
ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors()),
newDaemonThreadFactory(prefix));
// Remove scheduled tasks from the executor as soon as they are cancelled.
executor.setRemoveOnCancelPolicy(true);
return executor;
}

private static final Lazy<ScheduledExecutorService> ALARM_EXECUTOR =
Expand Down
Loading

0 comments on commit f3678b7

Please sign in to comment.