Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add changes in Spanner executor for testing end to end tracing #3264

Merged
merged 13 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions google-cloud-spanner-executor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,40 @@
</properties>

<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-common</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-trace</artifactId>
<version>0.32.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-spanner</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-trace</artifactId>
<version>2.52.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
Expand Down Expand Up @@ -94,6 +124,11 @@
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-spanner-executor-v1</artifactId>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-trace-v1</artifactId>
<version>2.52.0</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-spanner-executor-v1</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.spanner.TransactionRunner.TransactionCallable;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.paging.Page;
import com.google.api.gax.retrying.RetrySettings;
Expand Down Expand Up @@ -70,15 +71,21 @@
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.encryption.CustomerManagedEncryption;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.cloud.trace.v1.TraceServiceClient;
import com.google.cloud.trace.v1.TraceServiceSettings;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.cloudtrace.v1.GetTraceRequest;
import com.google.devtools.cloudtrace.v1.Trace;
import com.google.devtools.cloudtrace.v1.TraceSpan;
import com.google.longrunning.Operation;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
Expand Down Expand Up @@ -152,6 +159,9 @@
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
Expand All @@ -166,7 +176,9 @@
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -332,24 +344,28 @@ public void startRWTransaction() throws Exception {
// Try to commit
return null;
};
io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
Runnable runnable =
() -> {
try {
runner =
optimistic
? dbClient.readWriteTransaction(Options.optimisticLock())
: dbClient.readWriteTransaction();
LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
transactionSucceeded(runner.getCommitTimestamp().toProto());
} catch (SpannerException e) {
LOGGER.log(
Level.WARNING,
String.format("Transaction runnable failed with exception %s\n", e.getMessage()),
e);
transactionFailed(e);
}
};
context.wrap(
() -> {
try {
runner =
optimistic
? dbClient.readWriteTransaction(Options.optimisticLock())
: dbClient.readWriteTransaction();
LOGGER.log(
Level.INFO, String.format("Ready to run callable %s\n", transactionSeed));
runner.run(callable);
transactionSucceeded(runner.getCommitTimestamp().toProto());
} catch (SpannerException e) {
LOGGER.log(
Level.WARNING,
String.format(
"Transaction runnable failed with exception %s\n", e.getMessage()),
e);
transactionFailed(e);
}
});
LOGGER.log(
Level.INFO,
String.format("Callable and Runnable created, ready to execute %s\n", transactionSeed));
Expand Down Expand Up @@ -753,6 +769,11 @@ public synchronized void closeBatchTxn() throws SpannerException {
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("action-pool-%d").build());

// Thread pool to verify end to end traces.
private static final ExecutorService endToEndTracesThreadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("end-to-end-traces-pool-%d").build());

private synchronized Spanner getClientWithTimeout(
long timeoutSeconds, boolean useMultiplexedSession) throws IOException {
if (clientWithTimeout != null) {
Expand Down Expand Up @@ -818,6 +839,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
.setHost(HOST_PREFIX + WorkerProxy.spannerPort)
.setCredentials(credentials)
.setChannelProvider(channelProvider)
.setEnableEndToEndTracing(true)
.setOpenTelemetry(WorkerProxy.openTelemetrySdk)
.setSessionPoolOption(sessionPoolOptions);

SpannerStubSettings.Builder stubSettingsBuilder =
Expand All @@ -841,6 +864,88 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
return optionsBuilder.build().getService();
}

private TraceServiceClient traceServiceClient;

// Return the trace service client, create one if not exists.
private synchronized TraceServiceClient getTraceServiceClient() throws IOException {
if (traceServiceClient != null) {
return traceServiceClient;
}
// Create a trace service client
Credentials credentials;
if (WorkerProxy.serviceKeyFile.isEmpty()) {
credentials = NoCredentials.getInstance();
} else {
credentials =
GoogleCredentials.fromStream(
new ByteArrayInputStream(
FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))),
HTTP_TRANSPORT_FACTORY);
}

TraceServiceSettings traceServiceSettings =
TraceServiceSettings.newBuilder()
.setEndpoint(WorkerProxy.CLOUD_TRACE_ENDPOINT)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
.build();

traceServiceClient = TraceServiceClient.create(traceServiceSettings);
return traceServiceClient;
}

public Future<Boolean> getEndToEndTraceVerificationTask(String traceId) {
return endToEndTracesThreadPool.submit(
() -> {
try {
// Wait for 10 seconds before verifying to ensure traces are exported.
long sleepDuration = TimeUnit.SECONDS.toMillis(10);
LOGGER.log(
Level.INFO,
String.format(
"Sleeping for %d milliseconds before verifying end to end trace",
sleepDuration));
Thread.sleep(sleepDuration);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Handle interruption
LOGGER.log(Level.INFO, String.format("Thread interrupted."));
return false; // Return false if interrupted
}
return isExportedEndToEndTraceValid(traceId);
});
}

private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction";
private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction";

/* Returns whether a exported trace is valid. */
public boolean isExportedEndToEndTraceValid(String traceId) {
try {
GetTraceRequest getTraceRequest =
GetTraceRequest.newBuilder()
.setProjectId(WorkerProxy.PROJECT_ID)
.setTraceId(traceId)
.build();
Trace trace = getTraceServiceClient().getTrace(getTraceRequest);
boolean readWriteOrReadOnlyTxnPresent = false, spannerServerSideSpanPresent = false;
for (TraceSpan span : trace.getSpansList()) {
if (span.getName().contains(READ_ONLY_TRANSACTION)
|| span.getName().contains(READ_WRITE_TRANSACTION)) {
readWriteOrReadOnlyTxnPresent = true;
}
if (span.getName().startsWith("Spanner.")) {
spannerServerSideSpanPresent = true;
}
}
if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent) {
return false;
}
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failed to verify end to end trace.", e);
return false;
}
return true;
}

/** Handle actions. */
public Status startHandlingRequest(
SpannerAsyncActionRequest req, ExecutionFlowContext executionContext) {
Expand All @@ -865,17 +970,20 @@ public Status startHandlingRequest(
useMultiplexedSession = false;
}

io.opentelemetry.context.Context context = io.opentelemetry.context.Context.current();
actionThreadPool.execute(
() -> {
Status status =
executeAction(outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
if (!status.isOk()) {
LOGGER.log(
Level.WARNING,
String.format("Failed to execute action with error: %s\n%s", status, action));
executionContext.onError(status.getCause());
}
});
context.wrap(
() -> {
Status status =
executeAction(
outcomeSender, action, dbPath, useMultiplexedSession, executionContext);
if (!status.isOk()) {
LOGGER.log(
Level.WARNING,
String.format("Failed to execute action with error: %s\n%s", status, action));
executionContext.onError(status.getCause());
}
}));
return Status.OK;
}

Expand All @@ -886,7 +994,10 @@ private Status executeAction(
String dbPath,
boolean useMultiplexedSession,
ExecutionFlowContext executionContext) {

Tracer tracer = WorkerProxy.openTelemetrySdk.getTracer(CloudClientExecutor.class.getName());
String actionType = action.getActionCase().toString();
Span span = tracer.spanBuilder(String.format("performaction_%s", actionType)).startSpan();
Scope scope = span.makeCurrent();
try {
if (action.hasAdmin()) {
return executeAdminAction(useMultiplexedSession, action.getAdmin(), outcomeSender);
Expand Down Expand Up @@ -959,11 +1070,15 @@ private Status executeAction(
ErrorCode.UNIMPLEMENTED, "Not implemented yet: \n" + action)));
}
} catch (Exception e) {
span.recordException(e);
LOGGER.log(Level.WARNING, "Unexpected error: " + e.getMessage());
return outcomeSender.finishWithError(
toStatus(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
} finally {
scope.close();
span.end();
}
}

Expand Down
Loading
Loading