Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Create Span hierarchy using parent Span #1580

Merged
merged 7 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_DEFERRED;
import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_DOCUMENT_COUNT;
import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_EXCEPTION_MESSAGE;
import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_EXCEPTION_STACKTRACE;
import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_EXCEPTION_TYPE;
import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_MISSING;
import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_MORE_RESULTS;
import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_READ_CONSISTENCY;
Expand All @@ -37,10 +34,10 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.TraceUtil;
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -53,10 +50,6 @@
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.TransactionOptions;
import com.google.protobuf.ByteString;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -115,25 +108,24 @@ public Transaction newTransaction() {
return new TransactionImpl(this);
}

static class ReadWriteTransactionCallable<T> implements Callable<T> {

static class TracedReadWriteTransactionCallable<T> implements Callable<T> {
private final Datastore datastore;
private final TransactionCallable<T> callable;
private volatile TransactionOptions options;
private volatile Transaction transaction;

private final com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext;
private final TraceUtil.Span parentSpan;

ReadWriteTransactionCallable(
TracedReadWriteTransactionCallable(
Datastore datastore,
TransactionCallable<T> callable,
TransactionOptions options,
@Nullable com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
@Nullable com.google.cloud.datastore.telemetry.TraceUtil.Span parentSpan) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
this.transaction = null;
this.parentSpanContext = parentSpanContext;
this.parentSpan = parentSpan;
}

Datastore getDatastore() {
Expand All @@ -154,57 +146,75 @@ void setPrevTransactionId(ByteString transactionId) {
options = options.toBuilder().setReadWrite(readWrite).build();
}

private io.opentelemetry.api.trace.Span startSpanWithParentContext(
String spanName,
com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) {
com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
datastore.getOptions().getTraceUtil();
SpanBuilder spanBuilder =
otelTraceUtil
.getTracer()
.spanBuilder(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN)
.setSpanKind(SpanKind.PRODUCER)
.setParent(
Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(
parentSpanContext.getSpanContext())));
return otelTraceUtil.addSettingsAttributesToCurrentSpan(spanBuilder).startSpan();
@Override
public T call() throws DatastoreException {
try (io.opentelemetry.context.Scope ignored =
Context.current().with(parentSpan.getSpan()).makeCurrent()) {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
} catch (Exception ex) {
transaction.rollback();
throw DatastoreException.propagateUserException(ex);
} finally {
if (transaction.isActive()) {
transaction.rollback();
}
if (options != null
&& options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) {
setPrevTransactionId(transaction.getTransactionId());
}
}
}
}

static class ReadWriteTransactionCallable<T> implements Callable<T> {
private final Datastore datastore;
private final TransactionCallable<T> callable;
private volatile TransactionOptions options;
private volatile Transaction transaction;

ReadWriteTransactionCallable(
Datastore datastore, TransactionCallable<T> callable, TransactionOptions options) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
this.transaction = null;
}

Datastore getDatastore() {
return datastore;
}

TransactionOptions getOptions() {
return options;
}

Transaction getTransaction() {
return transaction;
}

void setPrevTransactionId(ByteString transactionId) {
TransactionOptions.ReadWrite readWrite =
TransactionOptions.ReadWrite.newBuilder().setPreviousTransaction(transactionId).build();
options = options.toBuilder().setReadWrite(readWrite).build();
}

@Override
public T call() throws DatastoreException {
// TODO Instead of using OTel Spans directly, TraceUtil.Span should be used here. However,
// the same code in startSpanInternal doesn't work when EnabledTraceUtil.StartSpan is called
// probably because of some thread-local caching that is getting lost. This needs more
// debugging. The code below works and is idiomatic but could be prettier and more consistent
// with the use of TraceUtil-provided framework.
io.opentelemetry.api.trace.Span span =
startSpanWithParentContext(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN,
parentSpanContext);
try (io.opentelemetry.context.Scope ignored = span.makeCurrent()) {
try {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
return value;
} catch (Exception ex) {
transaction.rollback();
span.setStatus(StatusCode.ERROR, ex.getMessage());
span.recordException(
ex,
Attributes.builder()
.put(ATTRIBUTES_KEY_EXCEPTION_MESSAGE, ex.getMessage())
.put(ATTRIBUTES_KEY_EXCEPTION_TYPE, ex.getClass().getName())
.put(ATTRIBUTES_KEY_EXCEPTION_STACKTRACE, Throwables.getStackTraceAsString(ex))
.build());
span.end();
throw DatastoreException.propagateUserException(ex);
} finally {
if (transaction.isActive()) {
transaction.rollback();
}
span.end();
if (options != null
&& options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) {
setPrevTransactionId(transaction.getTransactionId());
Expand All @@ -215,30 +225,51 @@ public T call() throws DatastoreException {

@Override
public <T> T runInTransaction(final TransactionCallable<T> callable) {
try {
TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);
Callable<T> transactionCallable =
(getOptions().getOpenTelemetryOptions().isEnabled()
? new TracedReadWriteTransactionCallable<T>(
this, callable, /*transactionOptions=*/ null, span)
: new ReadWriteTransactionCallable<T>(this, callable, /*transactionOptions=*/ null));
try (Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, null, otelTraceUtil.getCurrentSpanContext()),
transactionCallable,
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

@Override
public <T> T runInTransaction(
final TransactionCallable<T> callable, TransactionOptions transactionOptions) {
try {
TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN);

Callable<T> transactionCallable =
(getOptions().getOpenTelemetryOptions().isEnabled()
? new TracedReadWriteTransactionCallable<T>(this, callable, transactionOptions, span)
: new ReadWriteTransactionCallable<T>(this, callable, transactionOptions));

try (Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
new ReadWriteTransactionCallable<T>(
this, callable, transactionOptions, otelTraceUtil.getCurrentSpanContext()),
transactionCallable,
retrySettings,
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
span.end();
}
}

Expand Down Expand Up @@ -747,8 +778,7 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction(
final com.google.datastore.v1.BeginTransactionRequest requestPb) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span =
otelTraceUtil.startSpan(
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION,
otelTraceUtil.getCurrentSpanContext());
com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) {
return RetryHelper.runWithRetries(
() -> datastoreRpc.beginTransaction(requestPb),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
Expand All @@ -35,14 +35,6 @@
*/
@InternalApi
public class DisabledTraceUtil implements TraceUtil {

static class SpanContext implements TraceUtil.SpanContext {
@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return null;
}
}

static class Span implements TraceUtil.Span {
@Override
public void end() {}
Expand Down Expand Up @@ -112,7 +104,7 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
public TraceUtil.Span startSpan(String spanName, TraceUtil.Span parentSpan) {
return new Span();
}

Expand All @@ -132,12 +124,6 @@ public TraceUtil.Context getCurrentContext() {
return new Context();
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext();
}

@Override
public Tracer getTracer() {
return TracerProvider.noop().get(LIBRARY_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.cloud.datastore.DatastoreOptions;
import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext;
import com.google.cloud.datastore.telemetry.TraceUtil.Context;
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
import com.google.cloud.datastore.telemetry.TraceUtil.Span;
import com.google.common.base.Throwables;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -73,19 +73,6 @@ public ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> getChannelConfi
return null;
}

static class SpanContext implements TraceUtil.SpanContext {
private final io.opentelemetry.api.trace.SpanContext spanContext;

public SpanContext(io.opentelemetry.api.trace.SpanContext spanContext) {
this.spanContext = spanContext;
}

@Override
public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return this.spanContext;
}
}

static class Span implements TraceUtil.Span {
private final io.opentelemetry.api.trace.Span span;
private final String spanName;
Expand All @@ -95,6 +82,11 @@ public Span(io.opentelemetry.api.trace.Span span, String spanName) {
this.spanName = spanName;
}

@Override
public io.opentelemetry.api.trace.Span getSpan() {
return this.span;
}

/** Ends this span. */
@Override
public void end() {
Expand Down Expand Up @@ -197,10 +189,6 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
return this;
}

public io.opentelemetry.api.trace.Span getSpan() {
return this.span;
}

@Override
public Scope makeCurrent() {
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
Expand Down Expand Up @@ -307,18 +295,13 @@ public Span startSpan(String spanName) {
}

@Override
public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) {
public TraceUtil.Span startSpan(String spanName, TraceUtil.Span parentSpan) {
SpanBuilder spanBuilder =
tracer
.spanBuilder(spanName)
.setSpanKind(SpanKind.PRODUCER)
.setParent(
io.opentelemetry.context.Context.current()
.with(
io.opentelemetry.api.trace.Span.wrap(parentSpanContext.getSpanContext())));
io.opentelemetry.api.trace.Span span =
addSettingsAttributesToCurrentSpan(spanBuilder).startSpan();
return new Span(span, spanName);
.setParent(io.opentelemetry.context.Context.current().with(parentSpan.getSpan()));
return new Span(addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(), spanName);
}

@Nonnull
Expand All @@ -333,12 +316,6 @@ public TraceUtil.Context getCurrentContext() {
return new Context(io.opentelemetry.context.Context.current());
}

@Nonnull
@Override
public TraceUtil.SpanContext getCurrentSpanContext() {
return new SpanContext(io.opentelemetry.api.trace.Span.current().getSpanContext());
}

@Override
public Tracer getTracer() {
return this.tracer;
Expand Down
Loading
Loading