diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index 3a23ec55780..75dd1b96659 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -21,11 +21,41 @@ UTF-8 + + + + io.opentelemetry + opentelemetry-bom + 1.41.0 + pom + import + + + + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-sdk + + + com.google.cloud.opentelemetry + exporter-trace + 0.29.0 + com.google.cloud google-cloud-spanner + + com.google.cloud + google-cloud-trace + 2.47.0 + io.grpc grpc-api diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index 443a8faf238..a493aef0621 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; import com.google.api.gax.retrying.RetrySettings; @@ -70,15 +71,21 @@ import com.google.cloud.spanner.TimestampBound; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.Type; import com.google.cloud.spanner.Value; import com.google.cloud.spanner.encryption.CustomerManagedEncryption; import com.google.cloud.spanner.v1.stub.SpannerStubSettings; +import com.google.cloud.trace.v1.TraceServiceClient; +import com.google.cloud.trace.v1.TraceServiceSettings; import com.google.common.base.Function; import com.google.common.base.Joiner; import 
com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.devtools.cloudtrace.v1.GetTraceRequest; +import com.google.devtools.cloudtrace.v1.Trace; +import com.google.devtools.cloudtrace.v1.TraceSpan; import com.google.longrunning.Operation; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; @@ -152,6 +159,9 @@ import com.google.spanner.v1.TypeCode; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -332,24 +342,28 @@ public void startRWTransaction() throws Exception { // Try to commit return null; }; + io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current(); Runnable runnable = - () -> { - try { - runner = - optimistic - ? dbClient.readWriteTransaction(Options.optimisticLock()) - : dbClient.readWriteTransaction(); - LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); - runner.run(callable); - transactionSucceeded(runner.getCommitTimestamp().toProto()); - } catch (SpannerException e) { - LOGGER.log( - Level.WARNING, - String.format("Transaction runnable failed with exception %s\n", e.getMessage()), - e); - transactionFailed(e); - } - }; + context.wrap( + () -> { + try { + runner = + optimistic + ? 
dbClient.readWriteTransaction(Options.optimisticLock()) + : dbClient.readWriteTransaction(); + LOGGER.log( + Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); + runner.run(callable); + transactionSucceeded(runner.getCommitTimestamp().toProto()); + } catch (SpannerException e) { + LOGGER.log( + Level.WARNING, + String.format( + "Transaction runnable failed with exception %s\n", e.getMessage()), + e); + transactionFailed(e); + } + }); LOGGER.log( Level.INFO, String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed)); @@ -815,6 +829,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex .setHost(HOST_PREFIX + WorkerProxy.spannerPort) .setCredentials(credentials) .setChannelProvider(channelProvider) + .setEnableServerSideTracing(true) + .setOpenTelemetry(WorkerProxy.openTelemetrySdk) .setSessionPoolOption(sessionPoolOptions); SpannerStubSettings.Builder stubSettingsBuilder = @@ -838,6 +854,70 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex return optionsBuilder.build().getService(); } + private TraceServiceClient traceServiceClient; + + // Return the trace service client, create one if not exists. 
+  private synchronized TraceServiceClient getTraceServiceClient() throws IOException {
+    if (traceServiceClient != null) {
+      return traceServiceClient;
+    }
+    // Without a service key file use NoCredentials; otherwise load the key from that file.
+    Credentials credentials;
+    if (WorkerProxy.serviceKeyFile.isEmpty()) {
+      credentials = NoCredentials.getInstance();
+    } else {
+      credentials =
+          GoogleCredentials.fromStream(
+              new ByteArrayInputStream(
+                  FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))),
+              HTTP_TRANSPORT_FACTORY);
+    }
+
+    TransportChannelProvider transportChannelProvider =
+        CloudUtil.newCloudTraceChannelProviderHelper(WorkerProxy.CLOUD_TRACE_ENDPOINT, 443);
+
+    TraceServiceSettings traceServiceSettings =
+        TraceServiceSettings.newBuilder()
+            .setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT)
+            .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
+            .setTransportChannelProvider(transportChannelProvider)
+            .build();
+
+    traceServiceClient = TraceServiceClient.create(traceServiceSettings);
+    return traceServiceClient;
+  }
+
+  private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
+  private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";
+
+  /**
+   * Fetches trace {@code traceId} from Cloud Trace and checks it for Spanner server-side spans.
+   * @return {@code false} if a read-only/read-write transaction span exists without any span
+   *     named {@code "Spanner."...}, or if the lookup throws {@link IOException}; else true
+   */
+  public boolean verifyExportedEndToEndTrace(String traceId) {
+    try {
+      GetTraceRequest getTraceRequest =
+          GetTraceRequest.newBuilder()
+              .setProjectId(WorkerProxy.PROJECT_ID)
+              .setTraceId(traceId)
+              .build();
+      Trace trace = getTraceServiceClient().getTrace(getTraceRequest);
+      boolean hasTxnSpan = false;
+      boolean hasServerSpan = false;
+      for (TraceSpan span : trace.getSpansList()) {
+        // Names come from the RPC response, so compare by value; '==' only tests identity.
+        String name = span.getName();
+        hasTxnSpan |= READ_ONLY_TRANSACTION.equals(name) || READ_WRITE_TRANSACTION.equals(name);
+        hasServerSpan |= name.startsWith("Spanner.");
+      }
+      return !hasTxnSpan || hasServerSpan;
+    } catch (IOException e) {
+      LOGGER.log(Level.WARNING, "failed to verify end to end traces.", e);
+      return false;
+    }
+  }
+
   /** Handle actions. */
   public Status startHandlingRequest(
       SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) {
@@ -862,17 +942,20 @@ public Status startHandlingRequest(
       useMultiplexedSession = false;
     }
 
+    io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
     actionThreadPool.execute(
-        () -> {
-          Status status =
-              executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
-          if (!status.isOk()) {
-            LOGGER.log(
-                Level.WARNING,
-                String.format("Failed to execute action with error: %s\n%s", status, action));
-            executionContext.onError(status.getCause());
-          }
-        });
+        context.wrap(
+            () -> {
+              Status status =
+                  executeAction(
+                      outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
+              if (!status.isOk()) {
+                LOGGER.log(
+                    Level.WARNING,
+                    String.format("Failed to execute action with error: %s\n%s", status, action));
+                executionContext.onError(status.getCause());
+              }
+            }));
     return Status.OK;
   }
 
@@ -883,7 +966,12 @@ private Status executeAction(
       String dbPath,
       boolean useMultiplexedSession,
       ExecutionFlowContext
executionContext) { - + Tracer tracer = + WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0"); + String spanName = String.format("performaction_%s", action.getActionCase().toString()); + LOGGER.log(Level.INFO, String.format("spanName: %s", spanName)); + Span span = tracer.spanBuilder(spanName).startSpan(); + Scope scope = span.makeCurrent(); try { if (action.hasAdmin()) { return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender); @@ -956,11 +1044,15 @@ private Status executeAction( ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action))); } } catch (Exception e) { + span.recordException(e); LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage()); return outcomeSender.finishWithError( toStatus( SpannerExceptionFactory.newSpannerException( ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage()))); + } finally { + scope.close(); + span.end(); } } diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java index d2e7d9b19d1..a42c3797c15 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java @@ -26,6 +26,10 @@ import com.google.spanner.executor.v1.SpannerOptions; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Scope; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,9 +44,16 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP // Ratio of operations to use multiplexed sessions. 
private final double multiplexedSessionOperationsRatio;
 
+  // Cloud Trace verifications done so far. NOTE(review): plain int — confirm callbacks never race.
+  private int cloudTraceCheckCount;
+
+  // Maximum checks allowed to verify end to end traces using Cloud Trace APIs.
+  private static final int MAX_CLOUD_TRACE_CHECK_LIMIT = 20;
+
   public CloudExecutorImpl(
       boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) {
     clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector);
+    this.cloudTraceCheckCount = 0;
     this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio;
   }
 
@@ -50,9 +61,20 @@ public CloudExecutorImpl(
   @Override
   public StreamObserver executeActionAsync(
       StreamObserver responseObserver) {
+    // Create a top-level OpenTelemetry span for streaming request.
+    Tracer tracer =
+        WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0");
+    Span span = tracer.spanBuilder("java_systest_execute_actions_stream").setNoParent().startSpan();
+    Scope scope = span.makeCurrent();
+
+    final String traceId = span.getSpanContext().getTraceId();
+    final boolean isSampled = span.getSpanContext().getTraceFlags().isSampled();
+    AtomicBoolean requestHasReadOrQueryAction = new AtomicBoolean(false);
+
     CloudClientExecutor.ExecutionFlowContext executionContext =
         clientExecutor.new ExecutionFlowContext(responseObserver);
     return new StreamObserver() {
+
       @Override
       public void onNext(SpannerAsyncActionRequest request) {
         LOGGER.log(Level.INFO, String.format("Receiving request: \n%s", request));
@@ -86,6 +108,11 @@ public void onNext(SpannerAsyncActionRequest request) {
               Level.INFO,
               String.format("Updated request to set multiplexed session flag: \n%s", request));
         }
+        // Compare by value; '==' on Strings only matches when both sides are the same instance.
+        String actionName = request.getAction().getActionCase().toString();
+        if ("READ".equals(actionName) || "QUERY".equals(actionName)) {
+          requestHasReadOrQueryAction.set(true);
+        }
         Status status = clientExecutor.startHandlingRequest(request, executionContext);
         if (!status.isOk()) {
           LOGGER.log(
@@ -104,9 +131,30 @@ public void onError(Throwable t) {
 
       @Override
       public void onCompleted() {
+        if (isSampled
+            && cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT
+            && requestHasReadOrQueryAction.get()) {
+          cloudTraceCheckCount++;
+          if (!clientExecutor.verifyExportedEndToEndTrace(traceId)) {
+            // No cause was attached to this Status, so getCause() would be null; send an exception.
+            executionContext.onError(
+                Status.INTERNAL
+                    .withDescription(
+                        String.format(
+                            "failed to verify end to end trace for trace_id: %s", traceId))
+                    .asRuntimeException());
+            executionContext.cleanup();
+            // Close the scope and end the stream span on the failure path too.
+            scope.close();
+            span.end();
+            return;
+          }
+        }
         LOGGER.log(Level.INFO, "Client called Done, half closed");
         executionContext.cleanup();
         responseObserver.onCompleted();
+        scope.close();
+        span.end();
       }
     };
   }
diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java
index 30a4d98a354..96338c3b7ea 100644
--- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java
+++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudUtil.java
@@ -22,6 +22,7 @@
 import com.google.api.gax.rpc.FixedTransportChannelProvider;
 import com.google.api.gax.rpc.TransportChannel;
 import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.spanner.spi.v1.TraceContextInterceptor;
 import com.google.common.net.HostAndPort;
 import io.grpc.ManagedChannelBuilder;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
@@ -51,7 +52,7 @@ public class CloudUtil {
   public static TransportChannelProvider newChannelProviderHelper(int port) {
     NettyChannelBuilder builder =
         (NettyChannelBuilder)
-            getChannelBuilderForTestGFE("localhost", port, WorkerProxy.cert, TEST_HOST_IN_CERT)
+            getChannelBuilder("localhost", port, WorkerProxy.cert, TEST_HOST_IN_CERT)
                 .maxInboundMessageSize(100 * 1024 * 1024 /* 100 MB */);
     if (WorkerProxy.usePlainTextChannel) {
       builder.usePlaintext();
@@ -64,7 +65,23 @@ public static
TransportChannelProvider newChannelProviderHelper(int port) {
     return FixedTransportChannelProvider.create(channel);
   }
 
-  public static ManagedChannelBuilder getChannelBuilderForTestGFE(
+  /**
+   * Creates a fixed provider over a channel from getChannelBuilder(host, port, rootCert, "").
+   * NOTE(review): callers pass CLOUD_TRACE_ENDPOINT (already ends in ":443") as host — confirm.
+   */
+  public static TransportChannelProvider newCloudTraceChannelProviderHelper(String host, int port) {
+    NettyChannelBuilder nettyBuilder =
+        (NettyChannelBuilder) getChannelBuilder(host, port, WorkerProxy.rootCert, "");
+    nettyBuilder.maxInboundMessageSize(100 * 1024 * 1024 /* 100 MB */);
+    if (WorkerProxy.usePlainTextChannel) {
+      nettyBuilder.usePlaintext();
+    }
+    nettyBuilder.maxInboundMetadataSize(GRPC_MAX_HEADER_LIST_SIZE_BYTES);
+    return FixedTransportChannelProvider.create(
+        GrpcTransportChannel.newBuilder().setManagedChannel(nettyBuilder.build()).build());
+  }
+
+  public static ManagedChannelBuilder getChannelBuilder(
       String host, int sslPort, String certPath, String hostInCert) {
     SslContext sslContext;
     try {
@@ -91,6 +108,7 @@ public static ManagedChannelBuilder getChannelBuilderForTestGFE(
       return channelBuilder
           .overrideAuthority(hostInCert)
           .sslContext(sslContext)
+          .intercept(new TraceContextInterceptor(WorkerProxy.openTelemetrySdk))
           .negotiationType(NegotiationType.TLS);
     } catch (Throwable t) {
       throw new RuntimeException(t);
diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
index 61034754f20..098b7a92a6d 100644
--- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
+++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java
@@ -16,12 +16,34 @@
 
 package com.google.cloud.executor.spanner;
 
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.auth.Credentials;
+import
com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.trace.v2.stub.TraceServiceStub; +import com.google.cloud.trace.v2.stub.TraceServiceStubSettings; + import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.protobuf.services.ProtoReflectionService; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,6 +52,7 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.FileUtils; /** * Worker proxy for Java API. 
This is the main entry of the Java client proxy on cloud Spanner Java
@@ -42,6 +65,7 @@ public class WorkerProxy {
   private static final String OPTION_SPANNER_PORT = "spanner_port";
   private static final String OPTION_PROXY_PORT = "proxy_port";
   private static final String OPTION_CERTIFICATE = "cert";
+  private static final String OPTION_ROOT_CERTIFICATE = "root_cert";
   private static final String OPTION_SERVICE_KEY_FILE = "service_key_file";
   private static final String OPTION_USE_PLAIN_TEXT_CHANNEL = "use_plain_text_channel";
   private static final String OPTION_ENABLE_GRPC_FAULT_INJECTOR = "enable_grpc_fault_injector";
@@ -51,17 +75,65 @@ public class WorkerProxy {
   public static int spannerPort = 0;
   public static int proxyPort = 0;
   public static String cert = "";
+  public static String rootCert = "";
   public static String serviceKeyFile = "";
   public static double multiplexedSessionOperationsRatio = 0.0;
   public static boolean usePlainTextChannel = false;
   public static boolean enableGrpcFaultInjector = false;
+  public static OpenTelemetrySdk openTelemetrySdk;
 
   public static CommandLine commandLine;
 
+  public static final String PROJECT_ID = "spanner-cloud-systest";
+  public static final String CLOUD_TRACE_ENDPOINT = "staging-cloudtrace.sandbox.googleapis.com:443";
+
   private static final int MIN_PORT = 0, MAX_PORT = 65535;
   private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0;
 
+  /** Builds the OpenTelemetry SDK with a Cloud Trace exporter and 1% parent-based sampling. */
+  public static OpenTelemetrySdk setupOpenTelemetrySdk() throws Exception {
+    // --service_key_file is optional, so serviceKeyFile may still be "": use NoCredentials then.
+    Credentials credentials =
+        serviceKeyFile.isEmpty()
+            ? com.google.cloud.NoCredentials.getInstance()
+            : GoogleCredentials.fromStream(
+                new ByteArrayInputStream(FileUtils.readFileToByteArray(new File(serviceKeyFile))),
+                (HttpTransportFactory) NetHttpTransport::new);
+
+    // Create Cloud Trace Service Stub.
+    TraceServiceStub traceServiceStub =
+        TraceServiceStubSettings.newBuilder()
+            .setEndpoint(CLOUD_TRACE_ENDPOINT)
+            .setTransportChannelProvider(
+                CloudUtil.newCloudTraceChannelProviderHelper(CLOUD_TRACE_ENDPOINT, 443))
+            .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
+            .build()
+            .createStub();
+
+    // OpenTelemetry configuration.
+    SpanExporter spanExporter =
+        TraceExporter.createWithConfiguration(
+            TraceConfiguration.builder()
+                .setProjectId(PROJECT_ID)
+                .setCredentials(credentials)
+                .setTraceServiceStub(traceServiceStub)
+                .build());
+    return OpenTelemetrySdk.builder()
+        .setTracerProvider(
+            SdkTracerProvider.builder()
+                .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
+                .setResource(Resource.getDefault())
+                .setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(0.01)))
+                .build())
+        .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+        .build();
+  }
+
   public static void main(String[] args) throws Exception {
+    // Enable OpenTelemetry metrics and traces before injecting Opentelemetry.
+    SpannerOptions.enableOpenTelemetryMetrics();
+    SpannerOptions.enableOpenTelemetryTraces();
+
     commandLine = buildOptions(args);
 
     if (!commandLine.hasOption(OPTION_SPANNER_PORT)) {
@@ -92,6 +164,14 @@ public static void main(String[] args) throws Exception {
           "Certificate need to be assigned in order to start worker proxy.");
     }
     cert = commandLine.getOptionValue(OPTION_CERTIFICATE);
+
+    if (!commandLine.hasOption(OPTION_ROOT_CERTIFICATE)) {
+      throw SpannerExceptionFactory.newSpannerException(
+          ErrorCode.INVALID_ARGUMENT,
+          "Root certificate need to be assigned in order to start worker proxy.");
+    }
+    rootCert = commandLine.getOptionValue(OPTION_ROOT_CERTIFICATE);
+
     if (commandLine.hasOption(OPTION_SERVICE_KEY_FILE)) {
       serviceKeyFile = commandLine.getOptionValue(OPTION_SERVICE_KEY_FILE);
     }
@@ -117,6 +197,8 @@ public static void main(String[] args) throws Exception {
                 + MAX_RATIO);
       }
     }
+    // Setup the OpenTelemetry for tracing.
+ openTelemetrySdk = setupOpenTelemetrySdk(); Server server; while (true) { @@ -151,6 +233,8 @@ private static CommandLine buildOptions(String[] args) { options.addOption(null, OPTION_PROXY_PORT, true, "Proxy port to start worker proxy on."); options.addOption( null, OPTION_CERTIFICATE, true, "Certificate used to connect to Spanner GFE."); + options.addOption( + null, OPTION_ROOT_CERTIFICATE, true, "Root certificate used for calls to Cloud Trace API"); options.addOption( null, OPTION_SERVICE_KEY_FILE, true, "Service key file used to set authentication."); options.addOption(