Skip to content

Commit

Permalink
Spanner: use WatchdogProvider (#3923)
Browse files Browse the repository at this point in the history
* change to watchdog provider
  • Loading branch information
yihanzhen authored Nov 9, 2018
1 parent c43e377 commit a7bd8a5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 440 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFunction;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.core.InstantiatingExecutorProvider;
Expand All @@ -29,12 +30,14 @@
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.InstantiatingWatchdogProvider;
import com.google.api.gax.rpc.OperationCallable;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.api.pathtemplate.PathTemplate;
import com.google.cloud.ServiceOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
Expand All @@ -54,6 +57,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
import com.google.protobuf.Empty;
Expand Down Expand Up @@ -101,8 +105,10 @@
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
public class GapicSpannerRpc implements SpannerRpc {
Expand All @@ -113,13 +119,25 @@ public class GapicSpannerRpc implements SpannerRpc {
PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}");
private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes
private static final String PROPERTY_TIMEOUT_SECONDS =
"com.google.cloud.spanner.watchdogTimeoutSeconds";
private static final String PROPERTY_PERIOD_SECONDS =
"com.google.cloud.spanner.watchdogPeriodSeconds";
private static final int DEFAULT_TIMEOUT_SECONDS = 30 * 60;
private static final int DEFAULT_PERIOD_SECONDS = 10;

private final SpannerStub spannerStub;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStub databaseAdminStub;
private final String projectId;
private final String projectName;
private final SpannerMetadataProvider metadataProvider;
private final Duration waitTimeout =
systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS);
private final Duration idleTimeout =
systemProperty(PROPERTY_TIMEOUT_SECONDS, DEFAULT_TIMEOUT_SECONDS);
private final Duration checkInterval =
systemProperty(PROPERTY_PERIOD_SECONDS, DEFAULT_PERIOD_SECONDS);

public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
Expand Down Expand Up @@ -173,6 +191,17 @@ public GapicSpannerRpc(SpannerOptions options) {
CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

WatchdogProvider watchdogProvider =
InstantiatingWatchdogProvider.create()
.withExecutor(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Cloud-Spanner-WatchdogProvider-%d")
.build()))
.withCheckInterval(checkInterval)
.withClock(NanoClock.getDefaultClock());

// Disabling retry for now because spanner handles retry in SpannerImpl.
// We will finally want to improve gax but for smooth transitioning we
// preserve the retry in SpannerImpl
Expand All @@ -184,6 +213,7 @@ public GapicSpannerRpc(SpannerOptions options) {
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
Expand All @@ -199,6 +229,7 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
Expand All @@ -213,6 +244,7 @@ public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
Expand Down Expand Up @@ -533,7 +565,7 @@ private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
return context;
return context.withStreamWaitTimeout(waitTimeout).withStreamIdleTimeout(idleTimeout);
}

public void shutdown() {
Expand Down Expand Up @@ -582,4 +614,9 @@ StreamController getController() {
return Preconditions.checkNotNull(this.controller);
}
}

private static Duration systemProperty(String name, int defaultValue) {
String stringValue = System.getProperty(name, "");
return Duration.ofSeconds(stringValue.isEmpty() ? defaultValue : Integer.parseInt(stringValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ public class SpannerInterceptorProvider implements GrpcInterceptorProvider {
private static final List<ClientInterceptor> defaultInterceptors =
ImmutableList.of(
new SpannerErrorInterceptor(),
new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER),
WatchdogInterceptor.newDefaultWatchdogInterceptor());
new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER));

private final List<ClientInterceptor> clientInterceptors;

Expand Down

This file was deleted.

Loading

0 comments on commit a7bd8a5

Please sign in to comment.