Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixed Span nesting for ReadWriteTransactionCallable by using parent SpanContext instead of just parent Context #1495

Merged
merged 6 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -42,6 +42,11 @@
import io.opencensus.common.Scope;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -53,7 +58,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class DatastoreImpl extends BaseService<DatastoreOptions> implements Datastore {

Expand Down Expand Up @@ -106,15 +111,18 @@ static class ReadWriteTransactionCallable<T> implements Callable<T> {
private volatile TransactionOptions options;
private volatile Transaction transaction;

private final com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext;

ReadWriteTransactionCallable(
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
@Nonnull Context parentTraceContext) {
@Nullable com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
this.transaction = null;
this.parentSpanContext = parentSpanContext;
}

Datastore getDatastore() {
Expand All @@ -135,26 +143,57 @@ void setPrevTransactionId(ByteString transactionId) {
options = options.toBuilder().setReadWrite(readWrite).build();
}

private io.opentelemetry.api.trace.Span startSpanInternal(
jimit-j-shah marked this conversation as resolved.
Show resolved Hide resolved
String spanName,
com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
datastore.getOptions().getTraceUtil();
SpanBuilder spanBuilder =
otelTraceUtil
.getTracer()
.spanBuilder(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN)
.setSpanKind(SpanKind.PRODUCER)
.setParent(
Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(
parentSpanContext.getSpanContext())));
return spanBuilder.startSpan();
}

@Override
public T call() throws DatastoreException {
com.google.cloud.datastore.telemetry.TraceUtil traceUtil =
datastore.getOptions().getTraceUtil();
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
traceUtil.startSpan(
// TODO Instead of using OTel Spans directly, TraceUtil.Span should be used here. However,
// the same code in startSpanInternal doesn't work when EnabledTraceUtil.StartSpan is called
// probably because of some thread-local caching that is getting lost. This needs more
// debugging. The code below works and is idiomatic but could be prettier and more consistent
// with the use of TraceUtil-provided framework.
io.opentelemetry.api.trace.Span span =
startSpanInternal(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN,
datastore.getOptions().getTraceUtil().getCurrentContext());
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
parentSpanContext);
try (io.opentelemetry.context.Scope ignored = span.makeCurrent()) {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
} catch (Exception ex) {
transaction.rollback();
span.setStatus(StatusCode.ERROR, ex.getMessage());
span.recordException(
ex,
Attributes.builder()
.put("exception.message", ex.getMessage())
.put("exception.type", ex.getClass().getName())
.put("exception.stacktrace", Throwables.getStackTraceAsString(ex))
.build());
span.end();
throw DatastoreException.propagateUserException(ex);
} finally {
if (transaction.isActive()) {
transaction.rollback();
}
span.end();
if (options != null
&& options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) {
setPrevTransactionId(transaction.getTransactionId());
Expand All @@ -165,42 +204,30 @@ public T call() throws DatastoreException {

@Override
public <T> T runInTransaction(final TransactionCallable<T> callable) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
cindy-peng marked this conversation as resolved.
Show resolved Hide resolved
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
try {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, null, otelTraceUtil.getCurrentContext()),
this, callable, null, otelTraceUtil.getCurrentSpanContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

@Override
public <T> T runInTransaction(
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
try {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, transactionOptions, otelTraceUtil.getCurrentContext()),
this, callable, transactionOptions, otelTraceUtil.getCurrentSpanContext()),
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

Expand Down Expand Up @@ -687,7 +714,7 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction(
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION,
otelTraceUtil.getCurrentContext());
otelTraceUtil.getCurrentSpanContext());
jimit-j-shah marked this conversation as resolved.
Show resolved Hide resolved
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.beginTransaction(requestPb),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.TracerProvider;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -31,6 +35,13 @@
@InternalApi
public class DisabledTraceUtil implements TraceUtil {

static class SpanContext implements TraceUtil.SpanContext {
@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return null;
}
}

static class Span implements TraceUtil.Span {
@Override
public void end() {}
Expand Down Expand Up @@ -66,6 +77,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
return this;
}

public io.opentelemetry.api.trace.Span getSpan() {
return null;
}

@Override
public Scope makeCurrent() {
return new Scope();
Expand Down Expand Up @@ -96,7 +111,7 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
return new Span();
}

Expand All @@ -111,4 +126,15 @@ public TraceUtil.Span getCurrentSpan() {
public TraceUtil.Context getCurrentContext() {
return new Context();
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext();
}

@Override
public Tracer getTracer() {
return TracerProvider.noop().get(LIBRARY_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import com.google.common.base.Throwables;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -70,6 +73,19 @@ public ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> getChannelConfi
return null;
}

static class SpanContext implements TraceUtil.SpanContext {
private final io.opentelemetry.api.trace.SpanContext spanContext;

public SpanContext(io.opentelemetry.api.trace.SpanContext spanContext) {
this.spanContext = spanContext;
}

@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return this.spanContext;
}
}

static class Span implements TraceUtil.Span {
private final io.opentelemetry.api.trace.Span span;
private final String spanName;
Expand Down Expand Up @@ -181,6 +197,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
return this;
}

public io.opentelemetry.api.trace.Span getSpan() {
return this.span;
}

@Override
public Scope makeCurrent() {
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
Expand Down Expand Up @@ -286,13 +306,15 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) {
assert (parent instanceof Context);
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
SpanBuilder spanBuilder =
tracer
.spanBuilder(spanName)
.setSpanKind(SpanKind.PRODUCER)
.setParent(((Context) parent).context);
.setParent(
io.opentelemetry.context.Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(parentSpanContext.getSpanContext())));
io.opentelemetry.api.trace.Span span =
addSettingsAttributesToCurrentSpan(spanBuilder).startSpan();
return new Span(span, spanName);
Expand All @@ -309,4 +331,15 @@ public TraceUtil.Span getCurrentSpan() {
public TraceUtil.Context getCurrentContext() {
return new Context(io.opentelemetry.context.Context.current());
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext(io.opentelemetry.api.trace.Span.current().getSpanContext());
}

@Override
public Tracer getTracer() {
return this.tracer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.datastore.DatastoreOptions;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -36,7 +38,7 @@ public interface TraceUtil {
static final String SPAN_NAME_COMMIT = "Commit";
static final String SPAN_NAME_RUN_QUERY = "RunQuery";
static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery";
static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.run";
static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.Run";
static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin";
static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup";
static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit";
Expand Down Expand Up @@ -77,6 +79,11 @@ static TraceUtil getInstance(@Nonnull DatastoreOptions datastoreOptions) {
@Nullable
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> getChannelConfigurator();

/** Represents a trace span's context */
interface SpanContext {
io.opentelemetry.api.trace.SpanContext getSpanContext();
}

/** Represents a trace span. */
interface Span {
/** Adds the given event to this span. */
Expand All @@ -94,6 +101,8 @@ interface Span {
/** Adds the given attribute to this span. */
Span setAttribute(String key, boolean value);

io.opentelemetry.api.trace.Span getSpan();

/** Marks this span as the current span. */
Scope makeCurrent();

Expand Down Expand Up @@ -128,10 +137,10 @@ interface Scope extends AutoCloseable {
Span startSpan(String spanName);

/**
* Starts a new span with the given name and the given context as its parent, sets it as the
* current span, and returns it.
* Starts a new span with the given name and the span represented by the parentSpanContext as its
* parents, sets it as the current span and returns it.
*/
Span startSpan(String spanName, Context parent);
Span startSpan(String spanName, SpanContext parentSpanContext);

/** Returns the current span. */
@Nonnull
Expand All @@ -140,4 +149,11 @@ interface Scope extends AutoCloseable {
/** Returns the current Context. */
@Nonnull
Context getCurrentContext();

/** Returns the current SpanContext */
@Nonnull
SpanContext getCurrentSpanContext();

/** Returns the current OpenTelemetry Tracer when OpenTelemetry SDK is provided. */
Tracer getTracer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -831,10 +831,9 @@ public void runInTransactionQueryTest() throws Exception {
customSpanContext.getTraceId(),
/*numExpectedSpans=*/ 4,
Arrays.asList(
Collections.singletonList(SPAN_NAME_TRANSACTION_RUN),
Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION),
Collections.singletonList(SPAN_NAME_RUN_QUERY),
Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT)));
Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_BEGIN_TRANSACTION),
Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_RUN_QUERY),
Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_TRANSACTION_COMMIT)));
}

@Test
Expand Down
Loading
Loading