diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index 52acb77d8..0fefc3168 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -25,9 +25,9 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.datastore.execution.AggregationQueryExecutor; import com.google.cloud.datastore.spi.v1.DatastoreRpc; -import com.google.cloud.datastore.telemetry.TraceUtil.Context; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -42,6 +42,11 @@ import io.opencensus.common.Scope; import io.opencensus.trace.Span; import io.opencensus.trace.Status; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Context; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,7 +58,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; final class DatastoreImpl extends BaseService implements Datastore { @@ -106,15 +111,18 @@ static class ReadWriteTransactionCallable implements Callable { private volatile TransactionOptions options; private volatile Transaction transaction; + private final com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext; + ReadWriteTransactionCallable( Datastore datastore, TransactionCallable callable, TransactionOptions options, - @Nonnull Context parentTraceContext) { + @Nullable com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) { this.datastore = datastore; this.callable = callable; this.options = options; this.transaction = null; + this.parentSpanContext = parentSpanContext; } Datastore getDatastore() { @@ -135,26 +143,57 @@ void setPrevTransactionId(ByteString transactionId) { options = options.toBuilder().setReadWrite(readWrite).build(); } + private io.opentelemetry.api.trace.Span startSpanWithParentContext( + String spanName, + com.google.cloud.datastore.telemetry.TraceUtil.SpanContext parentSpanContext) { + com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil = + datastore.getOptions().getTraceUtil(); + SpanBuilder spanBuilder = + otelTraceUtil + .getTracer() + .spanBuilder(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN) + .setSpanKind(SpanKind.PRODUCER) + .setParent( + Context.current() + .with( + io.opentelemetry.api.trace.Span.wrap( + parentSpanContext.getSpanContext()))); + return spanBuilder.startSpan(); + } + @Override public T call() throws DatastoreException { - com.google.cloud.datastore.telemetry.TraceUtil traceUtil = - datastore.getOptions().getTraceUtil(); - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - traceUtil.startSpan( + // TODO Instead of using OTel Spans directly, TraceUtil.Span should be used here. However, + // the same code in startSpanInternal doesn't work when EnabledTraceUtil.StartSpan is called + // probably because of some thread-local caching that is getting lost. This needs more + // debugging. The code below works and is idiomatic but could be prettier and more consistent + // with the use of TraceUtil-provided framework. + io.opentelemetry.api.trace.Span span = + startSpanWithParentContext( com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN, - datastore.getOptions().getTraceUtil().getCurrentContext()); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + parentSpanContext); + try (io.opentelemetry.context.Scope ignored = span.makeCurrent()) { transaction = datastore.newTransaction(options); T value = callable.run(transaction); transaction.commit(); return value; } catch (Exception ex) { transaction.rollback(); + span.setStatus(StatusCode.ERROR, ex.getMessage()); + span.recordException( + ex, + Attributes.builder() + .put("exception.message", ex.getMessage()) + .put("exception.type", ex.getClass().getName()) + .put("exception.stacktrace", Throwables.getStackTraceAsString(ex)) + .build()); + span.end(); throw DatastoreException.propagateUserException(ex); } finally { if (transaction.isActive()) { transaction.rollback(); } + span.end(); if (options != null && options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) { setPrevTransactionId(transaction.getTransactionId()); @@ -165,42 +204,30 @@ public T call() throws DatastoreException { @Override public T runInTransaction(final TransactionCallable callable) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan( - com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + try { return RetryHelper.runWithRetries( new ReadWriteTransactionCallable( - this, callable, null, otelTraceUtil.getCurrentContext()), + this, callable, null, otelTraceUtil.getCurrentSpanContext()), retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.end(e); throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); } } @Override public T runInTransaction( final TransactionCallable callable, TransactionOptions transactionOptions) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan( - com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + try { return RetryHelper.runWithRetries( new ReadWriteTransactionCallable( - this, callable, transactionOptions, otelTraceUtil.getCurrentContext()), + this, callable, transactionOptions, otelTraceUtil.getCurrentSpanContext()), retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.end(e); throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); } } @@ -258,11 +285,14 @@ public AggregationResults runAggregation( com.google.datastore.v1.RunQueryResponse runQuery( final com.google.datastore.v1.RunQueryRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY); ReadOptions readOptions = requestPb.getReadOptions(); - span.setAttribute( - "isTransactional", readOptions.hasTransaction() || readOptions.hasNewTransaction()); + boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); + String spanName = + (isTransactional + ? com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY + : com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + span.setAttribute("isTransactional", isTransactional); span.setAttribute("readConsistency", readOptions.getReadConsistency().toString()); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { @@ -275,7 +305,7 @@ com.google.datastore.v1.RunQueryResponse runQuery( : TRANSACTION_OPERATION_EXCEPTION_HANDLER, getOptions().getClock()); span.addEvent( - com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY + ": Completed", + spanName + ": Completed", new ImmutableMap.Builder() .put("Received", response.getBatch().getEntityResultsCount()) .put("More results", response.getBatch().getMoreResults().toString()) @@ -689,7 +719,7 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction( com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan( com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION, - otelTraceUtil.getCurrentContext()); + otelTraceUtil.getCurrentSpanContext()); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) { return RetryHelper.runWithRetries( () -> datastoreRpc.beginTransaction(requestPb), diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java index 2a42081a1..6ba0fd81c 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java @@ -19,7 +19,11 @@ import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.InternalApi; +import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext; import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -31,6 +35,13 @@ @InternalApi public class DisabledTraceUtil implements TraceUtil { + static class SpanContext implements TraceUtil.SpanContext { + @Override + public io.opentelemetry.api.trace.SpanContext getSpanContext() { + return null; + } + } + static class Span implements TraceUtil.Span { @Override public void end() {} @@ -66,6 +77,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) { return this; } + public io.opentelemetry.api.trace.Span getSpan() { + return null; + } + @Override public Scope makeCurrent() { return new Scope(); @@ -96,7 +111,7 @@ public Span startSpan(String spanName) { } @Override - public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) { + public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) { return new Span(); } @@ -111,4 +126,15 @@ public TraceUtil.Span getCurrentSpan() { public TraceUtil.Context getCurrentContext() { return new Context(); } + + @Nonnull + @Override + public TraceUtil.SpanContext getCurrentSpanContext() { + return new SpanContext(); + } + + @Override + public Tracer getTracer() { + return TracerProvider.noop().get(LIBRARY_NAME); + } } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java index 50f89369a..438395cb1 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java @@ -22,16 +22,19 @@ import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; import com.google.cloud.datastore.DatastoreOptions; +import com.google.cloud.datastore.telemetry.TraceUtil.SpanContext; import com.google.common.base.Throwables; import io.grpc.ManagedChannelBuilder; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -70,6 +73,19 @@ public ApiFunction getChannelConfi return null; } + static class SpanContext implements TraceUtil.SpanContext { + private final io.opentelemetry.api.trace.SpanContext spanContext; + + public SpanContext(io.opentelemetry.api.trace.SpanContext spanContext) { + this.spanContext = spanContext; + } + + @Override + public io.opentelemetry.api.trace.SpanContext getSpanContext() { + return this.spanContext; + } + } + static class Span implements TraceUtil.Span { private final io.opentelemetry.api.trace.Span span; private final String spanName; @@ -181,6 +197,10 @@ public TraceUtil.Span setAttribute(String key, boolean value) { return this; } + public io.opentelemetry.api.trace.Span getSpan() { + return this.span; + } + @Override public Scope makeCurrent() { try (io.opentelemetry.context.Scope scope = span.makeCurrent()) { @@ -286,13 +306,15 @@ public Span startSpan(String spanName) { } @Override - public TraceUtil.Span startSpan(String spanName, TraceUtil.Context parent) { - assert (parent instanceof Context); + public TraceUtil.Span startSpan(String spanName, TraceUtil.SpanContext parentSpanContext) { SpanBuilder spanBuilder = tracer .spanBuilder(spanName) .setSpanKind(SpanKind.PRODUCER) - .setParent(((Context) parent).context); + .setParent( + io.opentelemetry.context.Context.current() + .with( + io.opentelemetry.api.trace.Span.wrap(parentSpanContext.getSpanContext()))); io.opentelemetry.api.trace.Span span = addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); return new Span(span, spanName); @@ -309,4 +331,15 @@ public TraceUtil.Span getCurrentSpan() { public TraceUtil.Context getCurrentContext() { return new Context(io.opentelemetry.context.Context.current()); } + + @Nonnull + @Override + public TraceUtil.SpanContext getCurrentSpanContext() { + return new SpanContext(io.opentelemetry.api.trace.Span.current().getSpanContext()); + } + + @Override + public Tracer getTracer() { + return this.tracer; + } } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java index 1e5226126..dd1dcf29e 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java @@ -21,6 +21,8 @@ import com.google.api.core.InternalExtensionOnly; import com.google.cloud.datastore.DatastoreOptions; import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -37,10 +39,11 @@ public interface TraceUtil { static final String SPAN_NAME_COMMIT = "Commit"; static final String SPAN_NAME_RUN_QUERY = "RunQuery"; static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery"; - static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.run"; + static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.Run"; static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin"; static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup"; static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit"; + static final String SPAN_NAME_TRANSACTION_RUN_QUERY = "Transaction.RunQuery"; static final String SPAN_NAME_ROLLBACK = "Transaction.Rollback"; static final String SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY = "Transaction.RunAggregationQuery"; @@ -78,6 +81,11 @@ static TraceUtil getInstance(@Nonnull DatastoreOptions datastoreOptions) { @Nullable ApiFunction getChannelConfigurator(); + /** Represents a trace span's context */ + interface SpanContext { + io.opentelemetry.api.trace.SpanContext getSpanContext(); + } + /** Represents a trace span. */ interface Span { /** Adds the given event to this span. */ @@ -95,6 +103,8 @@ interface Span { /** Adds the given attribute to this span. */ Span setAttribute(String key, boolean value); + io.opentelemetry.api.trace.Span getSpan(); + /** Marks this span as the current span. */ Scope makeCurrent(); @@ -129,10 +139,10 @@ interface Scope extends AutoCloseable { Span startSpan(String spanName); /** - * Starts a new span with the given name and the given context as its parent, sets it as the - * current span, and returns it. + * Starts a new span with the given name and the span represented by the parentSpanContext as its + * parents, sets it as the current span and returns it. */ - Span startSpan(String spanName, Context parent); + Span startSpan(String spanName, SpanContext parentSpanContext); /** Returns the current span. */ @Nonnull @@ -141,4 +151,11 @@ interface Scope extends AutoCloseable { /** Returns the current Context. */ @Nonnull Context getCurrentContext(); + + /** Returns the current SpanContext */ + @Nonnull + SpanContext getCurrentSpanContext(); + + /** Returns the current OpenTelemetry Tracer when OpenTelemetry SDK is provided. */ + Tracer getTracer(); } diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java index bf7635266..34c7ea858 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java @@ -27,6 +27,7 @@ import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP; import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY; import static com.google.common.truth.Truth.assertThat; import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; import static org.junit.Assert.assertEquals; @@ -815,8 +816,49 @@ public void transactionalLookupTest() throws Exception { Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); } + @Test + public void transactionQueryTest() throws Exception { + // Set up + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + assertNotNull(customSpanContext); + + // Test + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Transaction transaction = datastore.newTransaction(); + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + QueryResults queryResults = transaction.run(query); + transaction.commit(); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace( + customSpanContext.getTraceId(), + /*numExpectedSpans=*/ 3, + Arrays.asList( + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_TRANSACTION_RUN_QUERY), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); + } + @Test public void runInTransactionQueryTest() throws Exception { + // Set up Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); List entityList = new ArrayList<>(); @@ -851,14 +893,13 @@ public void runInTransactionQueryTest() throws Exception { customSpanContext.getTraceId(), /*numExpectedSpans=*/ 4, Arrays.asList( - Collections.singletonList(SPAN_NAME_TRANSACTION_RUN), - Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), - Collections.singletonList(SPAN_NAME_RUN_QUERY), - Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); + Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_BEGIN_TRANSACTION), + Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_RUN_QUERY), + Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_TRANSACTION_COMMIT))); } @Test - public void runInTransactionAggregationQueryTest() throws Exception {} + public void transactionRunQueryTest() throws Exception {} @Test public void readWriteTransactionTraceTest() throws Exception {} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java index a24f55597..89c91b3a7 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java @@ -38,7 +38,7 @@ public void usesDisabledSpan() { assertThat(traceUtil.getCurrentSpan() instanceof DisabledTraceUtil.Span).isTrue(); assertThat(traceUtil.startSpan("foo") instanceof DisabledTraceUtil.Span).isTrue(); assertThat( - traceUtil.startSpan("foo", traceUtil.getCurrentContext()) + traceUtil.startSpan("foo", traceUtil.getCurrentSpanContext()) instanceof DisabledTraceUtil.Span) .isTrue(); } diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java index 2497672d9..a3620bbc2 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java @@ -92,7 +92,7 @@ public void usesEnabledSpan() { assertThat(traceUtil.getCurrentSpan() instanceof EnabledTraceUtil.Span).isTrue(); assertThat(traceUtil.startSpan("foo") != null).isTrue(); assertThat( - traceUtil.startSpan("foo", traceUtil.getCurrentContext()) + traceUtil.startSpan("foo", traceUtil.getCurrentSpanContext()) instanceof EnabledTraceUtil.Span) .isTrue(); }