Skip to content

Commit

Permalink
Add changes in Spanner executor for testing end to end tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
nareshz committed Aug 27, 2024
1 parent 0004919 commit fd42e1d
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 30 deletions.
30 changes: 30 additions & 0 deletions google-cloud-spanner-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,41 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<!-- Imports the OpenTelemetry BOM so that the io.opentelemetry dependencies declared in this
POM without an explicit version (opentelemetry-api, opentelemetry-sdk) take their versions
from it. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.41.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-trace</artifactId>
<version>0.29.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-trace</artifactId>
<version>2.47.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.TransactionRunner.TransactionCallable;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
import com.google.api.gax.retrying.RetrySettings;
Expand Down Expand Up @@ -70,15 +71,21 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.encryption.CustomerManagedEncryption;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.cloud.trace.v1.TraceServiceClient;
import com.google.cloud.trace.v1.TraceServiceSettings;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.cloudtrace.v1.GetTraceRequest;
import com.google.devtools.cloudtrace.v1.Trace;
import com.google.devtools.cloudtrace.v1.TraceSpan;
import com.google.longrunning.Operation;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
Expand Down Expand Up @@ -152,6 +159,9 @@
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -332,24 +342,28 @@ public void startRWTransaction() throws Exception {
// Try to commit
return null;
};
io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
Runnable runnable =
() -> {
try {
runner =
optimistic
? dbClient.readWriteTransaction(Options.optimisticLock())
: dbClient.readWriteTransaction();
LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
transactionSucceeded(runner.getCommitTimestamp().toProto());
} catch (SpannerException e) {
LOGGER.log(
Level.WARNING,
String.format("Transaction runnable failed with exception %s\n", e.getMessage()),
e);
transactionFailed(e);
}
};
context.wrap(
() -> {
try {
runner =
optimistic
? dbClient.readWriteTransaction(Options.optimisticLock())
: dbClient.readWriteTransaction();
LOGGER.log(
Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
transactionSucceeded(runner.getCommitTimestamp().toProto());
} catch (SpannerException e) {
LOGGER.log(
Level.WARNING,
String.format(
"Transaction runnable failed with exception %s\n", e.getMessage()),
e);
transactionFailed(e);
}
});
LOGGER.log(
Level.INFO,
String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed));
Expand Down Expand Up @@ -815,6 +829,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
.setHost(HOST_PREFIX + WorkerProxy.spannerPort)
.setCredentials(credentials)
.setChannelProvider(channelProvider)
.setEnableServerSideTracing(true)
.setOpenTelemetry(WorkerProxy.openTelemetrySdk)
.setSessionPoolOption(sessionPoolOptions);

SpannerStubSettings.Builder stubSettingsBuilder =
Expand All @@ -838,6 +854,70 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
return optionsBuilder.build().getService();
}

private TraceServiceClient traceServiceClient;

/**
 * Returns the Cloud Trace service client, creating and caching it on the first call.
 *
 * <p>Credentials are read from {@code WorkerProxy.serviceKeyFile} when that path is non-empty;
 * otherwise {@code NoCredentials} is used. The client talks to
 * {@code WorkerProxy.CLOUD_TRACE_ENDPOINT} through a channel provider built by {@code CloudUtil}.
 *
 * @return the cached (or newly created) trace service client
 * @throws IOException if the key file cannot be read or the client cannot be created
 */
private synchronized TraceServiceClient getTraceServiceClient() throws IOException {
  if (traceServiceClient == null) {
    // Pick credentials: the key file contents when configured, otherwise no credentials.
    final Credentials traceCredentials =
        WorkerProxy.serviceKeyFile.isEmpty()
            ? NoCredentials.getInstance()
            : GoogleCredentials.fromStream(
                new ByteArrayInputStream(
                    FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))),
                HTTP_TRANSPORT_FACTORY);

    final TransportChannelProvider channelProvider =
        CloudUtil.newCloudTraceChannelProviderHelper(WorkerProxy.CLOUD_TRACE_ENDPOINT, 443);

    final TraceServiceSettings.Builder settingsBuilder = TraceServiceSettings.newBuilder();
    settingsBuilder.setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT);
    settingsBuilder.setCredentialsProvider(FixedCredentialsProvider.create(traceCredentials));
    settingsBuilder.setTransportChannelProvider(channelProvider);

    // The field is only assigned once creation has succeeded, so a failed attempt is retried
    // on the next call.
    traceServiceClient = TraceServiceClient.create(settingsBuilder.build());
  }
  return traceServiceClient;
}

private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";

/**
 * Verifies the exported OpenTelemetry trace for the server side (end to end) tracing feature.
 *
 * <p>Fetches the trace with the given id for {@code WorkerProxy.PROJECT_ID} through the Cloud
 * Trace API and inspects its span names. The trace is rejected only when it contains a client
 * read-only or read-write transaction span but no span whose name starts with
 * {@code "Spanner."}.
 *
 * @param traceId id of the trace to fetch and inspect
 * @return {@code false} if a transaction span is present without any {@code "Spanner."} span, or
 *     if an {@link IOException} occurs while obtaining the trace client; {@code true} otherwise
 */
public boolean verifyExportedEndToEndTrace(String traceId) {
  try {
    GetTraceRequest getTraceRequest =
        GetTraceRequest.newBuilder()
            .setProjectId(WorkerProxy.PROJECT_ID)
            .setTraceId(traceId)
            .build();
    // NOTE(review): getTrace presumably throws unchecked API exceptions (e.g. when the trace
    // has not been exported yet); those are not caught here and propagate - confirm intended.
    Trace trace = getTraceServiceClient().getTrace(getTraceRequest);
    boolean readWriteOrReadOnlyTxnPresent = false;
    boolean spannerServerSideSpanPresent = false;
    for (TraceSpan span : trace.getSpansList()) {
      String spanName = span.getName();
      // Compare by value: names come from the API response, so reference equality (==)
      // against the constants would not match even when the text is identical.
      if (READ_ONLY_TRANSACTION.equals(spanName) || READ_WRITE_TRANSACTION.equals(spanName)) {
        readWriteOrReadOnlyTxnPresent = true;
      }
      if (spanName.startsWith("Spanner.")) {
        spannerServerSideSpanPresent = true;
      }
    }
    if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent) {
      return false;
    }
  } catch (IOException e) {
    LOGGER.log(Level.WARNING, "failed to verify end to end traces.", e);
    return false;
  }
  return true;
}

/** Handle actions. */
public Status startHandlingRequest(
SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) {
Expand All @@ -862,17 +942,20 @@ public Status startHandlingRequest(
useMultiplexedSession = false;
}

io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
actionThreadPool.execute(
() -> {
Status status =
executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
if (!status.isOk()) {
LOGGER.log(
Level.WARNING,
String.format("Failed to execute action with error: %s\n%s", status, action));
executionContext.onError(status.getCause());
}
});
context.wrap(
() -> {
Status status =
executeAction(
outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
if (!status.isOk()) {
LOGGER.log(
Level.WARNING,
String.format("Failed to execute action with error: %s\n%s", status, action));
executionContext.onError(status.getCause());
}
}));
return Status.OK;
}

Expand All @@ -883,7 +966,12 @@ private Status executeAction(
String dbPath,
boolean useMultiplexedSession,
ExecutionFlowContext executionContext) {

Tracer tracer =
WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0");
String spanName = String.format("performaction_%s", action.getActionCase().toString());
LOGGER.log(Level.INFO, String.format("spanName: %s", spanName));
Span span = tracer.spanBuilder(spanName).startSpan();
Scope scope = span.makeCurrent();
try {
if (action.hasAdmin()) {
return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender);
Expand Down Expand Up @@ -956,11 +1044,15 @@ private Status executeAction(
ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action)));
}
} catch (Exception e) {
span.recordException(e);
LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage());
return outcomeSender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
} finally {
scope.close();
span.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
import com.google.spanner.executor.v1.SpannerOptions;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -40,19 +44,37 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP
// Ratio of operations to use multiplexed sessions.
private final double multiplexedSessionOperationsRatio;

// Count of checks performed to verify end to end traces using Cloud Trace APIs.
private int cloudTraceCheckCount;

// Maximum checks allowed to verify end to end traces using Cloud Trace APIs.
private static final int MAX_CLOUD_TRACE_CHECK_LIMIT = 20;

/**
 * Creates the executor service implementation.
 *
 * @param enableGrpcFaultInjector passed through to the {@code CloudClientExecutor} constructor
 * @param multiplexedSessionOperationsRatio ratio of operations to use multiplexed sessions
 */
public CloudExecutorImpl(
    boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) {
  this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio;
  // No Cloud Trace verification has been performed yet.
  this.cloudTraceCheckCount = 0;
  this.clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector);
}

/** Execute SpannerAsync action requests. */
@Override
public StreamObserver<SpannerAsyncActionRequest> executeActionAsync(
StreamObserver<SpannerAsyncActionResponse> responseObserver) {
// Create a top-level OpenTelemetry span for streaming request.
Tracer tracer =
WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName(), "0.1.0");
Span span = tracer.spanBuilder("java_systest_execute_actions_stream").setNoParent().startSpan();
Scope scope = span.makeCurrent();

final String traceId = span.getSpanContext().getTraceId();
final boolean isSampled = span.getSpanContext().getTraceFlags().isSampled();
AtomicBoolean requestHasReadOrQueryAction = new AtomicBoolean(false);

CloudClientExecutor.ExecutionFlowContext executionContext =
clientExecutor.new ExecutionFlowContext(responseObserver);
return new StreamObserver<SpannerAsyncActionRequest>() {

@Override
public void onNext(SpannerAsyncActionRequest request) {
LOGGER.log(Level.INFO, String.format("Receiving request: \n%s", request));
Expand Down Expand Up @@ -86,6 +108,11 @@ public void onNext(SpannerAsyncActionRequest request) {
Level.INFO,
String.format("Updated request to set multiplexed session flag: \n%s", request));
}
String actionName = request.getAction().getActionCase().toString();
if (actionName == "READ" || actionName == "QUERY") {
requestHasReadOrQueryAction.set(true);
}

Status status = clientExecutor.startHandlingRequest(request, executionContext);
if (!status.isOk()) {
LOGGER.log(
Expand All @@ -104,9 +131,26 @@ public void onError(Throwable t) {

/**
 * Handles the client half-closing the request stream.
 *
 * <p>When the stream's span is sampled, the stream contained a read or query action and fewer
 * than {@code MAX_CLOUD_TRACE_CHECK_LIMIT} checks have been made, the exported trace is
 * verified first. A failed verification is reported through {@code executionContext.onError}
 * and the stream is not completed normally. In every case the stream-level scope is closed and
 * the span is ended before this method returns.
 */
@Override
public void onCompleted() {
  try {
    if (isSampled
        && cloudTraceCheckCount < MAX_CLOUD_TRACE_CHECK_LIMIT
        && requestHasReadOrQueryAction.get()) {
      cloudTraceCheckCount++;
      if (!clientExecutor.verifyExportedEndToEndTrace(traceId)) {
        // Status.getCause() is null here because no cause was attached, so convert the
        // status itself into an exception that carries the code and description.
        executionContext.onError(
            Status.INTERNAL
                .withDescription(
                    String.format(
                        "failed to verify end to end trace for trace_id: %s", traceId))
                .asRuntimeException());
        executionContext.cleanup();
        return;
      }
    }
    LOGGER.log(Level.INFO, "Client called Done, half closed");
    executionContext.cleanup();
    responseObserver.onCompleted();
  } finally {
    // Runs on the early-return failure path too, so the scope and span are never leaked.
    scope.close();
    span.end();
  }
}
};
}
Expand Down
Loading

0 comments on commit fd42e1d

Please sign in to comment.