diff --git a/google-cloud-datastore/pom.xml b/google-cloud-datastore/pom.xml index 19e045ccb..3a768c4a4 100644 --- a/google-cloud-datastore/pom.xml +++ b/google-cloud-datastore/pom.xml @@ -147,6 +147,10 @@ opentelemetry-context ${opentelemetry.version} + + io.opentelemetry.instrumentation + opentelemetry-grpc-1.6 + diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java index 1ea79298c..242ce3b01 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java @@ -18,6 +18,7 @@ import static com.google.cloud.datastore.Validator.validateNamespace; +import com.google.api.core.ApiFunction; import com.google.api.core.BetaApi; import com.google.api.gax.grpc.ChannelPoolSettings; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; @@ -35,6 +36,7 @@ import com.google.cloud.http.HttpTransportOptions; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; +import io.grpc.ManagedChannelBuilder; import java.io.IOException; import java.lang.reflect.Method; import java.util.Objects; @@ -222,20 +224,37 @@ private DatastoreOptions(Builder builder) { throw new IllegalArgumentException( "Only gRPC transport allows setting of channel provider or credentials provider"); } else if (getTransportOptions() instanceof GrpcTransportOptions) { - // For grpc transport options, configure default gRPC Connection pool with minChannelCount = 1 - // and maxChannelCount = 4 - this.channelProvider = - builder.channelProvider != null - ? builder.channelProvider - : GrpcTransportOptions.setUpChannelProvider( + if (builder.channelProvider == null) { + /* + The default gRPC connection pool is configured with a minimum of 1 channel. + The maximum channel count automatically defaults to 200 (Defined in gax-grpc). + */ + ChannelPoolSettings datastoreChannelPoolSettings = + ChannelPoolSettings.builder() + .setInitialChannelCount(INIT_CHANNEL_COUNT) + .setMinChannelCount(MIN_CHANNEL_COUNT) + .build(); + + ApiFunction channelConfigurator = + this.traceUtil.getChannelConfigurator(); + if (channelConfigurator == null) { + this.channelProvider = + GrpcTransportOptions.setUpChannelProvider( DatastoreSettings.defaultGrpcTransportProviderBuilder() - .setChannelPoolSettings( - ChannelPoolSettings.builder() - .setInitialChannelCount(INIT_CHANNEL_COUNT) - .setMinChannelCount(MIN_CHANNEL_COUNT) - .setMaxChannelCount(MAX_CHANNEL_COUNT) - .build()), + .setChannelPoolSettings(datastoreChannelPoolSettings), this); + } else { + // Intercept the grpc channel calls to add telemetry info. + this.channelProvider = + GrpcTransportOptions.setUpChannelProvider( + DatastoreSettings.defaultGrpcTransportProviderBuilder() + .setChannelPoolSettings(datastoreChannelPoolSettings) + .setChannelConfigurator(channelConfigurator), + this); + } + } else { + this.channelProvider = builder.channelProvider; + } } } @@ -256,6 +275,7 @@ protected String getDefaultProject() { } private static class DatastoreDefaults implements ServiceDefaults { + private final TransportOptions TRANSPORT_OPTIONS = getDefaultTransportOptionsBuilder().build(); @Override diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/GrpcDatastoreRpc.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/GrpcDatastoreRpc.java index ea9043bb9..f9e3a34f3 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/GrpcDatastoreRpc.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/GrpcDatastoreRpc.java @@ -75,7 +75,7 @@ public GrpcDatastoreRpc(DatastoreOptions datastoreOptions) throws IOException { ? getClientContextForEmulator(datastoreOptions) : getClientContext(datastoreOptions); - /* For grpc transport options, configure default gRPC Connection pool with minChannelCount = 1 and maxChannelCount = 4 */ + /* For grpc transport options, configure default gRPC Connection pool with minChannelCount = 1 */ DatastoreStubSettings datastoreStubSettings = DatastoreStubSettings.newBuilder(clientContext) .applyToAllUnaryMethods(retrySettingSetter(datastoreOptions)) @@ -85,7 +85,6 @@ public GrpcDatastoreRpc(DatastoreOptions datastoreOptions) throws IOException { ChannelPoolSettings.builder() .setInitialChannelCount(DatastoreOptions.INIT_CHANNEL_COUNT) .setMinChannelCount(DatastoreOptions.MIN_CHANNEL_COUNT) - .setMaxChannelCount(DatastoreOptions.MAX_CHANNEL_COUNT) .build()) .build()) .build(); diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java index 40fc7308e..711e94020 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java @@ -35,6 +35,8 @@ import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; +import io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -66,14 +68,30 @@ public OpenTelemetry getOpenTelemetry() { return openTelemetry; } + // The gRPC channel configurator that intercepts gRPC calls for tracing purposes. + public class OpenTelemetryGrpcChannelConfigurator + implements ApiFunction { + + @Override + public ManagedChannelBuilder apply(ManagedChannelBuilder managedChannelBuilder) { + GrpcTelemetry grpcTelemetry = GrpcTelemetry.create(getOpenTelemetry()); + return managedChannelBuilder.intercept(grpcTelemetry.newClientInterceptor()); + } + } + @Override @Nullable public ApiFunction getChannelConfigurator() { - // TODO(jimit) Update this to return a gRPC Channel Configurator after gRPC upgrade. - return null; + // Note: using `==` rather than `.equals` since OpenTelemetry has only 1 static instance of + // `TracerProvider.noop`. + if (openTelemetry.getTracerProvider() == TracerProvider.noop()) { + return null; + } + return new OpenTelemetryGrpcChannelConfigurator(); } static class Span implements TraceUtil.Span { + private final io.opentelemetry.api.trace.Span span; private final String spanName; @@ -198,6 +216,7 @@ public Scope makeCurrent() { } static class Scope implements TraceUtil.Scope { + private final io.opentelemetry.context.Scope scope; Scope(io.opentelemetry.context.Scope scope) { @@ -211,6 +230,7 @@ public void close() { } static class Context implements TraceUtil.Context { + private final io.opentelemetry.context.Context context; Context(io.opentelemetry.context.Context context) { diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java index c6a26d05d..0c25c3b6c 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java @@ -41,6 +41,7 @@ public class DatastoreOptionsTest { private static final String PROJECT_ID = "project-id"; private static final String DATABASE_ID = "database-id"; private static final int PORT = 8080; + private static final int DEFAULT_MAX_CHANNEL_COUNT = 200; private DatastoreRpcFactory datastoreRpcFactory; private DatastoreRpc datastoreRpc; private DatastoreOptions.Builder options; @@ -119,7 +120,7 @@ public void testGrpcDefaultChannelConfigurations() { .getChannelPoolSettings(); assertEquals(channelPoolSettings.getInitialChannelCount(), DatastoreOptions.INIT_CHANNEL_COUNT); assertEquals(channelPoolSettings.getMinChannelCount(), DatastoreOptions.MIN_CHANNEL_COUNT); - assertEquals(channelPoolSettings.getMaxChannelCount(), DatastoreOptions.MAX_CHANNEL_COUNT); + assertEquals(channelPoolSettings.getMaxChannelCount(), DEFAULT_MAX_CHANNEL_COUNT); } @Test diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java index 50d7b6820..f076c0966 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java @@ -66,12 +66,12 @@ public void usesOpenTelemetryFromOptions() { @Test public void usesGlobalOpenTelemetryIfOpenTelemetryInstanceNotProvided() { OpenTelemetrySdk ignored = OpenTelemetrySdk.builder().buildAndRegisterGlobal(); - DatastoreOptions firestoreOptions = + DatastoreOptions datastoreOptions = getBaseOptions() .setOpenTelemetryOptions( DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()) .build(); - EnabledTraceUtil traceUtil = new EnabledTraceUtil(firestoreOptions); + EnabledTraceUtil traceUtil = new EnabledTraceUtil(datastoreOptions); assertThat(traceUtil.getOpenTelemetry()).isEqualTo(GlobalOpenTelemetry.get()); } @@ -80,6 +80,34 @@ public void enabledTraceUtilProvidesChannelConfigurator() { assertThat(newEnabledTraceUtil().getChannelConfigurator()).isNull(); } + @Test + public void openTelemetryInstanceRegistersGrpcChannelConfigurator() { + OpenTelemetrySdk myOpenTelemetrySdk = OpenTelemetrySdk.builder().build(); + DatastoreOptions firestoreOptions = + getBaseOptions() + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder() + .setTracingEnabled(true) + .setOpenTelemetry(myOpenTelemetrySdk) + .build()) + .build(); + EnabledTraceUtil traceUtil = new EnabledTraceUtil(firestoreOptions); + assertThat(traceUtil.getChannelConfigurator()).isNotNull(); + } + + @Test + public void globalOpenTelemetryRegistersGrpcChannelConfigurator() { + + OpenTelemetrySdk.builder().buildAndRegisterGlobal(); + DatastoreOptions datastoreOptions = + getBaseOptions() + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()) + .build(); + EnabledTraceUtil traceUtil = new EnabledTraceUtil(datastoreOptions); + assertThat(traceUtil.getChannelConfigurator()).isNotNull(); + } + @Test public void usesEnabledContext() { assertThat(newEnabledTraceUtil().getCurrentContext() instanceof EnabledTraceUtil.Context)