Skip to content

Commit

Permalink
fix: Fixed Span nesting for ReadWriteTransactionCallable by using p…
Browse files Browse the repository at this point in the history
…arent SpanContext instead of just parent Context (#1495)

* fix: Fixed the TraceUtil.startSpan method to use `SpanContext` for linking with the parent instead of `Context`.
- This fixes the hierarchy of Spans appearing in a transaction under a Run method.
- Tested using existing transaction test

* Fixed commit reordering and typos

* fix: lint errors

* fix: Refactored the ReadWriteTransactioncallable.call method to use startSpan idiomatically
- TraceUtil.startSpan needs more debugging
- return DefaultTracerProvider instance (no-op) when initializing DisabledTraceUtil - this fixes the unit tests in DatastoreTest.testRunInTransactionWithReadWriteOption

* feat: Added tracing for Transaction.RunQuery (#1499)

* feat: Added span for Transactional RunQuery
- tested

* fix: lint

* fix: patch apply issues

* fix: refactor using boolean flag

* fix: s/startSpan/startSpanWithParentContext
  • Loading branch information
jimit-j-shah authored Jun 27, 2024
1 parent 2a38147 commit 82a8d78
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -42,6 +42,11 @@
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -53,7 +58,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datastore {

Expand Down Expand Up @@ -106,15 +111,18 @@ static class ReadWriteTransactionCallable<T> implements Callable<T> {
private volatile TransactionOptions options;
private volatile Transaction transaction;

private final com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext;

ReadWriteTransactionCallable(
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
@Nonnull Context parentTraceContext) {
@Nullable com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
this.transaction = null;
this.parentSpanContext = parentSpanContext;
}

Datastore getDatastore() {
Expand All @@ -135,26 +143,57 @@ void setPrevTransactionId(ByteString transactionId) {
options = options.toBuilder().setReadWrite(readWrite).build();
}

private io.opentelemetry.api.trace.Span startSpanWithParentContext(
String spanName,
com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
datastore.getOptions().getTraceUtil();
SpanBuilder spanBuilder =
otelTraceUtil
.getTracer()
.spanBuilder(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN)
.setSpanKind(SpanKind.PRODUCER)
.setParent(
Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(
parentSpanContext.getSpanContext())));
return spanBuilder.startSpan();
}

@Override
public T call() throws DatastoreException {
com.google.cloud.datastore.telemetry.TraceUtil traceUtil =
datastore.getOptions().getTraceUtil();
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
traceUtil.startSpan(
// TODO Instead of using OTel Spans directly, TraceUtil.Span should be used here. However,
// the same code in startSpanInternal doesn't work when EnabledTraceUtil.StartSpan is called
// probably because of some thread-local caching that is getting lost. This needs more
// debugging. The code below works and is idiomatic but could be prettier and more consistent
// with the use of TraceUtil-provided framework.
io.opentelemetry.api.trace.Span span =
startSpanWithParentContext(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN,
datastore.getOptions().getTraceUtil().getCurrentContext());
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
parentSpanContext);
try (io.opentelemetry.context.Scope ignored = span.makeCurrent()) {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
} catch (Exception ex) {
transaction.rollback();
span.setStatus(StatusCode.ERROR, ex.getMessage());
span.recordException(
ex,
Attributes.builder()
.put("exception.message", ex.getMessage())
.put("exception.type", ex.getClass().getName())
.put("exception.stacktrace", Throwables.getStackTraceAsString(ex))
.build());
span.end();
throw DatastoreException.propagateUserException(ex);
} finally {
if (transaction.isActive()) {
transaction.rollback();
}
span.end();
if (options != null
&& options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) {
setPrevTransactionId(transaction.getTransactionId());
Expand All @@ -165,42 +204,30 @@ public T call() throws DatastoreException {

@Override
public <T> T runInTransaction(final TransactionCallable<T> callable) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
try {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, null, otelTraceUtil.getCurrentContext()),
this, callable, null, otelTraceUtil.getCurrentSpanContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

@Override
public <T> T runInTransaction(
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
try {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, transactionOptions, otelTraceUtil.getCurrentContext()),
this, callable, transactionOptions, otelTraceUtil.getCurrentSpanContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

Expand Down Expand Up @@ -258,11 +285,14 @@ public AggregationResults runAggregation(

com.google.datastore.v1.RunQueryResponse runQuery(
final com.google.datastore.v1.RunQueryRequest requestPb) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY);
ReadOptions readOptions = requestPb.getReadOptions();
span.setAttribute(
"isTransactional", readOptions.hasTransaction() || readOptions.hasNewTransaction());
boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction();
String spanName =
(isTransactional
? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY);
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName);
span.setAttribute("isTransactional", isTransactional);
span.setAttribute("readConsistency", readOptions.getReadConsistency().toString());

try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
Expand All @@ -275,7 +305,7 @@ com.google.datastore.v1.RunQueryResponse runQuery(
: TRANSACTION_OPERATION_EXCEPTION_HANDLER,
getOptions().getClock());
span.addEvent(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY + ": Completed",
spanName + ": Completed",
new ImmutableMap.Builder<String, Object>()
.put("Received", response.getBatch().getEntityResultsCount())
.put("More results", response.getBatch().getMoreResults().toString())
Expand Down Expand Up @@ -689,7 +719,7 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction(
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION,
otelTraceUtil.getCurrentContext());
otelTraceUtil.getCurrentSpanContext());
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.beginTransaction(requestPb),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -31,6 +35,13 @@
@InternalApi
public class DisabledTraceUtil implements TraceUtil {

static class SpanContext implements TraceUtil.SpanContext {
@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return null;
}
}

static class Span implements TraceUtil.Span {
@Override
public void end() {}
Expand Down Expand Up @@ -66,6 +77,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
return this;
}

public io.opentelemetry.api.trace.Span getSpan() {
return null;
}

@Override
public Scope makeCurrent() {
return new Scope();
Expand Down Expand Up @@ -96,7 +111,7 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
return new Span();
}

Expand All @@ -111,4 +126,15 @@ public TraceUtil.Span getCurrentSpan() {
public TraceUtil.Context getCurrentContext() {
return new Context();
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext();
}

@Override
public Tracer getTracer() {
return TracerProvider.noop().get(LIBRARY_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import com.google.common.base.Throwables;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -70,6 +73,19 @@ public ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> getChannelConfi
return null;
}

static class SpanContext implements TraceUtil.SpanContext {
private final io.opentelemetry.api.trace.SpanContext spanContext;

public SpanContext(io.opentelemetry.api.trace.SpanContext spanContext) {
this.spanContext = spanContext;
}

@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return this.spanContext;
}
}

static class Span implements TraceUtil.Span {
private final io.opentelemetry.api.trace.Span span;
private final String spanName;
Expand Down Expand Up @@ -181,6 +197,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
return this;
}

public io.opentelemetry.api.trace.Span getSpan() {
return this.span;
}

@Override
public Scope makeCurrent() {
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
Expand Down Expand Up @@ -286,13 +306,15 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {
assert (parent instanceof Context);
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
SpanBuilder spanBuilder =
tracer
.spanBuilder(spanName)
.setSpanKind(SpanKind.PRODUCER)
.setParent(((Context) parent).context);
.setParent(
io.opentelemetry.context.Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(parentSpanContext.getSpanContext())));
io.opentelemetry.api.trace.Span span =
addSettingsAttributesToCurrentSpan(spanBuilder).startSpan();
return new Span(span, spanName);
Expand All @@ -309,4 +331,15 @@ public TraceUtil.Span getCurrentSpan() {
public TraceUtil.Context getCurrentContext() {
return new Context(io.opentelemetry.context.Context.current());
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext(io.opentelemetry.api.trace.Span.current().getSpanContext());
}

@Override
public Tracer getTracer() {
return this.tracer;
}
}
Loading

0 comments on commit 82a8d78

Please sign in to comment.