From 5440c22364074c108450c3a748a6a17d5f1dddda Mon Sep 17 00:00:00 2001 From: Jimit Shah <57637300+jimit-j-shah@users.noreply.github.com> Date: Thu, 19 Sep 2024 14:56:26 -0700 Subject: [PATCH] feat: Introducing Tracing with OpenTelemetry API #1537 (#1576) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Adding TraceUtil interface and its implementation to enable Tracing controls via DatastoreOptions (#1431) * Adding EnabledTraceUtil, DisabledTraceUtil and TraceUtilTest * Annotating DatastoreOpenTelemetryOptions to be transient as they're not serializable * Adding google-auth-library-credentials dependency due to https://github.com/googleapis/java-datastore/actions/runs/8944472794/job/24571458116?pr=1431 * feat: Adding Lookup RPC OpenTelemetry Tracing (#1437) * feat: Adding Lookup RPC OpenTelemetry Tracing - Removed OpenCensus Tracing - Added E2E tests with Global and Local OTel SDK - Moved OTel SDK setup to RemoteDatastoreHelper - Fixed pom to depend on BOM for all shared dependencies * feat: Adding Commit RPC Trace Instrumentation (#1440) - Added end-to-end test for Datastore operationsput, add, update and delete. - Updated E2E Test to use the namespace correctly for efficient clean-up of test data * feat: RunQuery trace instrumentation (#1441) * feat: RunQuery trace instrumentation * feat: RunAggregationQuery instrumentation (#1447) * feat: RunQuery trace instrumentation * Formatting * Formatting * Refactor: s/RUNQUERY/RUN_QUERY * feat: RunAggregationQuery Trace Instrumentation * Build: retiring test assertions for OpenCensus spans - will be replacing this in hermetic integration tests for OpenTelemetry using in-memory span exports (in addition to ITE2ETraceTest.java). * Formatting * Fixing @Test annotation missed after merge * Formatting * feat: RunQuery trace instrumentation * Formatting * Formatting * Refactor: s/RUNQUERY/RUN_QUERY * feat: RunAggregationQuery Trace Instrumentation * Build: retiring test assertions for OpenCensus spans - will be replacing this in hermetic integration tests for OpenTelemetry using in-memory span exports (in addition to ITE2ETraceTest.java). * Formatting * Fixing @Test annotation missed after merge * Formatting * feat: Add Transaction tracing test: transactionalLookupTest * test: Transaction test for RunInTransaction - need to fix trace instrumentation for RunIn.. * Adding transaction span names * TransactionLookupTest * feat: support for transactional operations - tested using newTransaction() and runInTransaction() * Revert "feat: support for transactional operations" This reverts commit 10341c0b97cbc2025f9f928ce8cb09d5c036a5b3. * feat: support for transactional operations (#1468) * feat: support for transactional operations - tested using newTransaction() and runInTransaction() * feat: Allocateid tracing (#1488) * feat: Adding tracing for AllocateIds RPC * formatting * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot * feat: Add tracing for ReserveIds operation (#1490) - added end-to-end test * fix: Fixed Span nesting for `ReadWriteTransactionCallable` by using parent SpanContext instead of just parent Context (#1495) * fix: Fixed the TraceUtil.startSpan method to use `SpanContext` for linking with the parent instead of `Context`. - This fixes the hierarchy of Spans appearing in a transaction under a Run method. - Tested using existing transaction test * Fixed commit reordering and typos * fix: lint errors * fix: Refactored the ReadWriteTransactioncallable.call method to use startSpan idiomatically - TraceUtil.startSpan needs more debugging - return DefaultTracerProvider instance (no-op) when initializing DisabledTraceUtil - this fixes the unit tests in DatastoreTest.testRunInTransactionWithReadWriteOption * feat: Added tracing for Transaction.RunQuery (#1499) * feat: Added span for Transactional RunQuery - tested * fix: lint * fix: patch apply issues * fix: refactor using boolean flag * fix: s/startSpan/startSpanWithParentContext * test: Additional Transaction Testing and cleanup OpenCensus usage (#1505) * test: newTransactionReadWriteTraceTest * fix: test literal * feat: Added tests for transaction cases * fix: Cleanup OpenCensus dead code * fix: updating version from 2.20.1 -> 2.21.0 * fix: reverting version from 2.21.0 -> 2.20.1 * fix: Adding an exception to the clirr-maven-plugin for an internal API parameter change from com.google.cloud.datastore.TraceUtil -> com.google.cloud.datastore.telemetry.TraceUtil * fix: Fixing the differenceType in clirr exception * fix: add an exception for removal of an internal class (com.google.cloud.datastore.TraceUtil) * fix: fixing incomplete difference details for type 7005 * fix: Fixing `to` of the difference to be the entire signature * fix: typo * test: Adding ITTracingTest to verify events and span attributes (whic… (#1514) * test: Adding ITTracingTest to verify events and span attributes (which are not verified in ITE2ETracingTest) due to TraceClient API limitations. - This test uses InMemorySpanExporter to read the generated Otel span data by the test process to verify generated span data as it were before exporting to a backend. None of the span data is exported to a durable backend. - This test is still an E2E test as it requires a project to send RPCs to. * fix: fixing compilation error due to missing pom dependency. * test: Test for AllocateId and ReserveId rpcs * test: Commit/Put/Update/Delete tests * test: Added fixes and test for RunQuery event * test: Additional Transaction tests and AggregationQuery test (#1518) * test: ReadWrite Transaction test * test: Added test for Transactional RunQuery and Transaction Rollback * test: runInTransaction API tracing test - Fixed setting of common span attributes to spans in runInTransaction - Removed some gRPC related channel attributes that are not present in this Datastore version, yet. * fix: Undelete gRPC upgrade docs * fix: Undo merge mistakes * fix: Updating span event strings (#1539) * fix: Fixing user-facing span names in line with go/firestore-client-trace-catalog * fix: updating bom dependency version to fix https://github.com/googleapis/java-datastore/actions/runs/10256441634/job/28375496112?pr=1539 * Fix: typo in test causing integration test failure (#1556) https://btx.cloud.google.com/invocations/c11a2e8b-4494-4ddc-a77e-cf2bcbcf5254/targets/cloud-devrel%2Fclient-libraries%2Fjava%2Fjava-datastore%2Fpresubmit%2Fintegration;config=default/log * fix: opentelemetry-sdk should only be used as a Test Dependency * fix: Update opentelemetry.version - this also fixes the tests failing in https://github.com/googleapis/java-datastore/actions/runs/10891578591/job/30222786908 * fix: Replacing attribute key values w/ constants * fix: opentelemetry.version to fix RequireUpperBoundDeps check https://github.com/googleapis/java-datastore/actions/runs/10892403348/job/30225154043?pr=1576 * fix: Create Span hierarchy using parent Span (#1580) * fix: Replace use of TraceUtil.SpanContext w/ TraceUtil.Context * fix: Fixing how span hierarchy is created across threads - using Span instead of Context * fix: cleaning up startSpan(spanName, parentContext) variant * fix: add TracedReadWriteTransactionCallable to bifurcate tracing enabled/disabled paths for the Transaction callback. - This change implements the idiomatic way to express nested spans as described in https://opentelemetry.io/docs/languages/java/instrumentation/#create-nested-spans * fix: cleanup * fix: cleanup * fix: cleanup * fix: formatting and import refactoring * chore: generate libraries at Thu Sep 19 18:35:54 UTC 2024 --------- Co-authored-by: Owl Bot Co-authored-by: cloud-java-bot --- README.md | 2 +- .../clirr-ignored-differences.xml | 12 + google-cloud-datastore/pom.xml | 86 +- .../google/cloud/datastore/DatastoreImpl.java | 289 +++-- .../DatastoreOpenTelemetryOptions.java | 97 ++ .../cloud/datastore/DatastoreOptions.java | 40 + .../RetryAndTraceDatastoreRpcDecorator.java | 30 +- .../com/google/cloud/datastore/TraceUtil.java | 76 -- .../cloud/datastore/TransactionImpl.java | 5 + .../datastore/spi/v1/HttpDatastoreRpc.java | 5 +- .../telemetry/DisabledTraceUtil.java | 131 +++ .../datastore/telemetry/EnabledTraceUtil.java | 323 ++++++ .../cloud/datastore/telemetry/TraceUtil.java | 167 +++ .../testing/RemoteDatastoreHelper.java | 29 +- .../cloud/datastore/DatastoreOptionsTest.java | 14 + .../google/cloud/datastore/DatastoreTest.java | 2 + ...etryAndTraceDatastoreRpcDecoratorTest.java | 84 -- .../cloud/datastore/it/ITE2ETracingTest.java | 1009 +++++++++++++++++ .../cloud/datastore/it/ITTracingTest.java | 849 ++++++++++++++ .../telemetry/DisabledTraceUtilTest.java | 54 + .../telemetry/EnabledTraceUtilTest.java | 106 ++ .../datastore/telemetry/TraceUtilTest.java | 62 + 22 files changed, 3216 insertions(+), 256 deletions(-) create mode 100644 google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java delete mode 100644 google-cloud-datastore/src/main/java/com/google/cloud/datastore/TraceUtil.java create mode 100644 google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java create mode 100644 google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java create mode 100644 google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java delete mode 100644 google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java create mode 100644 google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java create mode 100644 google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITTracingTest.java create mode 100644 google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java create mode 100644 google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java create mode 100644 google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/TraceUtilTest.java diff --git a/README.md b/README.md index e5e9c1afe..d8b43bd0a 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ If you are using Maven without the BOM, add this to your dependencies: com.google.cloud google-cloud-datastore - 2.21.2 + 2.21.3 ``` diff --git a/google-cloud-datastore/clirr-ignored-differences.xml b/google-cloud-datastore/clirr-ignored-differences.xml index 1620fd752..847ba4f4c 100644 --- a/google-cloud-datastore/clirr-ignored-differences.xml +++ b/google-cloud-datastore/clirr-ignored-differences.xml @@ -55,4 +55,16 @@ java.lang.Object execute(com.google.cloud.datastore.Query, com.google.cloud.datastore.ReadOption[]) 7004 + + com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator + RetryAndTraceDatastoreRpcDecorator(com.google.cloud.datastore.spi.v1.DatastoreRpc, com.google.cloud.datastore.TraceUtil, com.google.api.gax.retrying.RetrySettings, com.google.cloud.datastore.DatastoreOptions) + RetryAndTraceDatastoreRpcDecorator(com.google.cloud.datastore.spi.v1.DatastoreRpc, com.google.cloud.datastore.telemetry.TraceUtil, com.google.api.gax.retrying.RetrySettings, com.google.cloud.datastore.DatastoreOptions) + 7005 + + + + + com/google/cloud/datastore/TraceUtil + 8001 + diff --git a/google-cloud-datastore/pom.xml b/google-cloud-datastore/pom.xml index 595f04ba4..5137728ea 100644 --- a/google-cloud-datastore/pom.xml +++ b/google-cloud-datastore/pom.xml @@ -16,7 +16,20 @@ google-cloud-datastore + 1.42.0 + + + + + com.google.cloud + gapic-libraries-bom + 1.37.0 + pom + import + + + com.google.api.grpc @@ -38,6 +51,10 @@ com.google.cloud.datastore datastore-v1-proto-client + + com.google.auth + google-auth-library-credentials + io.grpc grpc-api @@ -111,6 +128,19 @@ jsr305 + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-context + ${opentelemetry.version} + + + ${project.groupId} @@ -118,7 +148,6 @@ test-jar test - com.google.guava guava-testlib @@ -160,6 +189,61 @@ 1.4.4 test + + com.google.testparameterinjector + test-parameter-injector + 1.16 + test + + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-common + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk-testing + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-sdk-trace + ${opentelemetry.version} + test + + + io.opentelemetry + opentelemetry-semconv + 1.1.0-alpha + test + + + + + com.google.cloud.opentelemetry + exporter-trace + 0.15.0 + test + + + com.google.cloud + google-cloud-trace + test + + + com.google.api.grpc + proto-google-cloud-trace-v1 + test + + diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index a3bfb3796..2bf8389fd 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -16,6 +16,26 @@ package com.google.cloud.datastore; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_DEFERRED; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_DOCUMENT_COUNT; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_MISSING; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_MORE_RESULTS; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_READ_CONSISTENCY; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_RECEIVED; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_TRANSACTIONAL; +import static com.google.cloud.datastore.telemetry.TraceUtil.ATTRIBUTES_KEY_TRANSACTION_ID; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_ALLOCATE_IDS; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RESERVE_IDS; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_ROLLBACK; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY; + import com.google.api.core.BetaApi; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.BaseService; @@ -25,20 +45,23 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.datastore.execution.AggregationQueryExecutor; import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.TraceUtil; +import com.google.cloud.datastore.telemetry.TraceUtil.Scope; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.datastore.v1.CommitResponse; import com.google.datastore.v1.ExplainOptions; import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.ReserveIdsRequest; +import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.TransactionOptions; import com.google.protobuf.ByteString; -import io.opencensus.common.Scope; -import io.opencensus.trace.Span; -import io.opencensus.trace.Status; +import io.opentelemetry.context.Context; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -50,6 +73,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import javax.annotation.Nullable; final class DatastoreImpl extends BaseService implements Datastore { @@ -59,7 +83,9 @@ final class DatastoreImpl extends BaseService implements Datas TransactionExceptionHandler.build(); private static final ExceptionHandler TRANSACTION_OPERATION_EXCEPTION_HANDLER = TransactionOperationExceptionHandler.build(); - private final TraceUtil traceUtil = TraceUtil.getInstance(); + + private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil = + getOptions().getTraceUtil(); private final ReadOptionProtoPreparer readOptionProtoPreparer; private final AggregationQueryExecutor aggregationQueryExecutor; @@ -73,7 +99,8 @@ final class DatastoreImpl extends BaseService implements Datas readOptionProtoPreparer = new ReadOptionProtoPreparer(); aggregationQueryExecutor = new AggregationQueryExecutor( - new RetryAndTraceDatastoreRpcDecorator(datastoreRpc, traceUtil, retrySettings, options), + new RetryAndTraceDatastoreRpcDecorator( + datastoreRpc, otelTraceUtil, retrySettings, options), options); } @@ -92,8 +119,68 @@ public Transaction newTransaction() { return new TransactionImpl(this); } - static class ReadWriteTransactionCallable implements Callable { + static class TracedReadWriteTransactionCallable implements Callable { + private final Datastore datastore; + private final TransactionCallable callable; + private volatile TransactionOptions options; + private volatile Transaction transaction; + + private final TraceUtil.Span parentSpan; + + TracedReadWriteTransactionCallable( + Datastore datastore, + TransactionCallable callable, + TransactionOptions options, + @Nullable com.google.cloud.datastore.telemetry.TraceUtil.Span parentSpan) { + this.datastore = datastore; + this.callable = callable; + this.options = options; + this.transaction = null; + this.parentSpan = parentSpan; + } + + Datastore getDatastore() { + return datastore; + } + + TransactionOptions getOptions() { + return options; + } + + Transaction getTransaction() { + return transaction; + } + + void setPrevTransactionId(ByteString transactionId) { + TransactionOptions.ReadWrite readWrite = + TransactionOptions.ReadWrite.newBuilder().setPreviousTransaction(transactionId).build(); + options = options.toBuilder().setReadWrite(readWrite).build(); + } + + @Override + public T call() throws DatastoreException { + try (io.opentelemetry.context.Scope ignored = + Context.current().with(parentSpan.getSpan()).makeCurrent()) { + transaction = datastore.newTransaction(options); + T value = callable.run(transaction); + transaction.commit(); + return value; + } catch (Exception ex) { + transaction.rollback(); + throw DatastoreException.propagateUserException(ex); + } finally { + if (transaction.isActive()) { + transaction.rollback(); + } + if (options != null + && options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) { + setPrevTransactionId(transaction.getTransactionId()); + } + } + } + } + static class ReadWriteTransactionCallable implements Callable { private final Datastore datastore; private final TransactionCallable callable; private volatile TransactionOptions options; @@ -127,8 +214,8 @@ void setPrevTransactionId(ByteString transactionId) { @Override public T call() throws DatastoreException { - transaction = datastore.newTransaction(options); try { + transaction = datastore.newTransaction(options); T value = callable.run(transaction); transaction.commit(); return value; @@ -149,36 +236,47 @@ public T call() throws DatastoreException { @Override public T runInTransaction(final TransactionCallable callable) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN); + Callable transactionCallable = + (getOptions().getOpenTelemetryOptions().isEnabled() + ? new TracedReadWriteTransactionCallable( + this, callable, /*transactionOptions=*/ null, span) + : new ReadWriteTransactionCallable(this, callable, /*transactionOptions=*/ null)); + try (Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new ReadWriteTransactionCallable(this, callable, null), + transactionCallable, retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @Override public T runInTransaction( final TransactionCallable callable, TransactionOptions transactionOptions) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_TRANSACTION); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN); + + Callable transactionCallable = + (getOptions().getOpenTelemetryOptions().isEnabled() + ? new TracedReadWriteTransactionCallable(this, callable, transactionOptions, span) + : new ReadWriteTransactionCallable(this, callable, transactionOptions)); + + try (Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new ReadWriteTransactionCallable(this, callable, transactionOptions), + transactionCallable, retrySettings, TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -236,20 +334,39 @@ public AggregationResults runAggregation( com.google.datastore.v1.RunQueryResponse runQuery( final com.google.datastore.v1.RunQueryRequest requestPb) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_RUNQUERY); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { - return RetryHelper.runWithRetries( - () -> datastoreRpc.runQuery(requestPb), - retrySettings, - requestPb.getReadOptions().getTransaction().isEmpty() - ? EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); + ReadOptions readOptions = requestPb.getReadOptions(); + boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); + String spanName = (isTransactional ? SPAN_NAME_TRANSACTION_RUN_QUERY : SPAN_NAME_RUN_QUERY); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + RunQueryResponse response = + RetryHelper.runWithRetries( + () -> datastoreRpc.runQuery(requestPb), + retrySettings, + requestPb.getReadOptions().getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER, + getOptions().getClock()); + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getBatch().getEntityResultsCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put(ATTRIBUTES_KEY_READ_CONSISTENCY, readOptions.getReadConsistency().toString()) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + (isTransactional + ? requestPb.getReadOptions().getTransaction().toStringUtf8() + : "")) + .put(ATTRIBUTES_KEY_MORE_RESULTS, response.getBatch().getMoreResults().toString()) + .build()); + return response; } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -291,8 +408,9 @@ public List allocateId(IncompleteKey... keys) { private com.google.datastore.v1.AllocateIdsResponse allocateIds( final com.google.datastore.v1.AllocateIdsRequest requestPb) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_ALLOCATEIDS); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan(SPAN_NAME_ALLOCATE_IDS); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( new Callable() { @Override @@ -304,10 +422,10 @@ public com.google.datastore.v1.AllocateIdsResponse call() throws DatastoreExcept EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -450,14 +568,27 @@ protected Entity computeNext() { com.google.datastore.v1.LookupResponse lookup( final com.google.datastore.v1.LookupRequest requestPb) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_LOOKUP); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + ReadOptions readOptions = requestPb.getReadOptions(); + boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); + String spanName = (isTransactional ? SPAN_NAME_TRANSACTION_LOOKUP : SPAN_NAME_LOOKUP); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new Callable() { - @Override - public com.google.datastore.v1.LookupResponse call() throws DatastoreException { - return datastoreRpc.lookup(requestPb); - } + () -> { + com.google.datastore.v1.LookupResponse response = datastoreRpc.lookup(requestPb); + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_RECEIVED, response.getFoundCount()) + .put(ATTRIBUTES_KEY_MISSING, response.getMissingCount()) + .put(ATTRIBUTES_KEY_DEFERRED, response.getDeferredCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + isTransactional ? readOptions.getTransaction().toStringUtf8() : "") + .build()); + return response; }, retrySettings, requestPb.getReadOptions().getTransaction().isEmpty() @@ -465,10 +596,10 @@ public com.google.datastore.v1.LookupResponse call() throws DatastoreException { : TRANSACTION_OPERATION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -492,8 +623,9 @@ public List reserveIds(Key... keys) { com.google.datastore.v1.ReserveIdsResponse reserveIds( final com.google.datastore.v1.ReserveIdsRequest requestPb) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_RESERVEIDS); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan(SPAN_NAME_RESERVE_IDS); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( new Callable() { @Override @@ -505,10 +637,10 @@ public com.google.datastore.v1.ReserveIdsResponse call() throws DatastoreExcepti EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -602,20 +734,34 @@ private com.google.datastore.v1.CommitResponse commitMutation( com.google.datastore.v1.CommitResponse commit( final com.google.datastore.v1.CommitRequest requestPb) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_COMMIT); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { - return RetryHelper.runWithRetries( - () -> datastoreRpc.commit(requestPb), - retrySettings, - requestPb.getTransaction().isEmpty() - ? EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); + final boolean isTransactional = + requestPb.hasTransaction() || requestPb.hasSingleUseTransaction(); + final String spanName = isTransactional ? SPAN_NAME_TRANSACTION_COMMIT : SPAN_NAME_COMMIT; + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + CommitResponse response = + RetryHelper.runWithRetries( + () -> datastoreRpc.commit(requestPb), + retrySettings, + requestPb.getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER, + getOptions().getClock()); + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getMutationResultsCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + isTransactional ? requestPb.getTransaction().toStringUtf8() : "") + .build()); + return response; } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -626,24 +772,19 @@ ByteString requestTransactionId( com.google.datastore.v1.BeginTransactionResponse beginTransaction( final com.google.datastore.v1.BeginTransactionRequest requestPb) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_BEGINTRANSACTION); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan(SPAN_NAME_BEGIN_TRANSACTION); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) { return RetryHelper.runWithRetries( - new Callable() { - @Override - public com.google.datastore.v1.BeginTransactionResponse call() - throws DatastoreException { - return datastoreRpc.beginTransaction(requestPb); - } - }, + () -> datastoreRpc.beginTransaction(requestPb), retrySettings, EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } @@ -657,8 +798,9 @@ void rollbackTransaction(ByteString transaction) { } void rollback(final com.google.datastore.v1.RollbackRequest requestPb) { - Span span = traceUtil.startSpan(TraceUtil.SPAN_NAME_ROLLBACK); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = + otelTraceUtil.startSpan(SPAN_NAME_ROLLBACK); + try (Scope scope = span.makeCurrent()) { RetryHelper.runWithRetries( new Callable() { @Override @@ -670,11 +812,16 @@ public Void call() throws DatastoreException { retrySettings, EXCEPTION_HANDLER, getOptions().getClock()); + span.addEvent( + SPAN_NAME_ROLLBACK, + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_TRANSACTION_ID, requestPb.getTransaction().toStringUtf8()) + .build()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java new file mode 100644 index 000000000..ac266562e --- /dev/null +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOpenTelemetryOptions.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore; + +import io.opentelemetry.api.OpenTelemetry; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +public class DatastoreOpenTelemetryOptions { + private final boolean enabled; + private final @Nullable OpenTelemetry openTelemetry; + + DatastoreOpenTelemetryOptions(Builder builder) { + this.enabled = builder.enabled; + this.openTelemetry = builder.openTelemetry; + } + + public boolean isEnabled() { + return enabled; + } + + @Nullable + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + @Nonnull + public DatastoreOpenTelemetryOptions.Builder toBuilder() { + return new DatastoreOpenTelemetryOptions.Builder(this); + } + + @Nonnull + public static DatastoreOpenTelemetryOptions.Builder newBuilder() { + return new DatastoreOpenTelemetryOptions.Builder(); + } + + public static class Builder { + + private boolean enabled; + + @Nullable private OpenTelemetry openTelemetry; + + private Builder() { + enabled = false; + openTelemetry = null; + } + + private Builder(DatastoreOpenTelemetryOptions options) { + this.enabled = options.enabled; + this.openTelemetry = options.openTelemetry; + } + + @Nonnull + public DatastoreOpenTelemetryOptions build() { + return new DatastoreOpenTelemetryOptions(this); + } + + /** + * Sets whether tracing should be enabled. + * + * @param enabled Whether tracing should be enabled. + */ + @Nonnull + public DatastoreOpenTelemetryOptions.Builder setTracingEnabled(boolean enabled) { + this.enabled = enabled; + return this; + } + + /** + * Sets the {@link OpenTelemetry} to use with this Datastore instance. If telemetry collection + * is enabled, but an `OpenTelemetry` is not provided, the Datastore SDK will attempt to use the + * `GlobalOpenTelemetry`. + * + * @param openTelemetry The OpenTelemetry that should be used by this Datastore instance. + */ + @Nonnull + public DatastoreOpenTelemetryOptions.Builder setOpenTelemetry( + @Nonnull OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + return this; + } + } +} diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java index 8437c3e22..cef40eedd 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java @@ -18,6 +18,7 @@ import static com.google.cloud.datastore.Validator.validateNamespace; +import com.google.api.core.BetaApi; import com.google.cloud.ServiceDefaults; import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceRpc; @@ -31,6 +32,8 @@ import java.lang.reflect.Method; import java.util.Objects; import java.util.Set; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; public class DatastoreOptions extends ServiceOptions { @@ -43,6 +46,9 @@ public class DatastoreOptions extends ServiceOptions { private String namespace; private String databaseId; + @Nullable private DatastoreOpenTelemetryOptions openTelemetryOptions = null; + private Builder() {} private Builder(DatastoreOptions options) { super(options); namespace = options.namespace; databaseId = options.databaseId; + this.openTelemetryOptions = options.openTelemetryOptions; } @Override @@ -100,10 +120,30 @@ public Builder setDatabaseId(String databaseId) { this.databaseId = databaseId; return this; } + + /** + * Sets the {@link DatastoreOpenTelemetryOptions} to be used for this Firestore instance. + * + * @param openTelemetryOptions The `DatastoreOpenTelemetryOptions` to use. + */ + @BetaApi + @Nonnull + public Builder setOpenTelemetryOptions( + @Nonnull DatastoreOpenTelemetryOptions openTelemetryOptions) { + this.openTelemetryOptions = openTelemetryOptions; + return this; + } } private DatastoreOptions(Builder builder) { super(DatastoreFactory.class, DatastoreRpcFactory.class, builder, new DatastoreDefaults()); + + this.openTelemetryOptions = + builder.openTelemetryOptions != null + ? builder.openTelemetryOptions + : DatastoreOpenTelemetryOptions.newBuilder().build(); + this.traceUtil = com.google.cloud.datastore.telemetry.TraceUtil.getInstance(this); + namespace = MoreObjects.firstNonNull(builder.namespace, defaultNamespace()); databaseId = MoreObjects.firstNonNull(builder.databaseId, DEFAULT_DATABASE_ID); } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index c4a85caab..e1ea6e8dd 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -16,13 +16,13 @@ package com.google.cloud.datastore; import static com.google.cloud.BaseService.EXCEPTION_HANDLER; -import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; import com.google.api.core.InternalApi; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.datastore.v1.AllocateIdsRequest; import com.google.datastore.v1.AllocateIdsResponse; import com.google.datastore.v1.BeginTransactionRequest; @@ -31,6 +31,7 @@ import com.google.datastore.v1.CommitResponse; import com.google.datastore.v1.LookupRequest; import com.google.datastore.v1.LookupResponse; +import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.ReserveIdsRequest; import com.google.datastore.v1.ReserveIdsResponse; import com.google.datastore.v1.RollbackRequest; @@ -39,9 +40,6 @@ import com.google.datastore.v1.RunAggregationQueryResponse; import com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; -import io.opencensus.common.Scope; -import io.opencensus.trace.Span; -import io.opencensus.trace.Status; import java.util.concurrent.Callable; /** @@ -52,19 +50,19 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc { private final DatastoreRpc datastoreRpc; - private final TraceUtil traceUtil; + private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil; private final RetrySettings retrySettings; private final DatastoreOptions datastoreOptions; public RetryAndTraceDatastoreRpcDecorator( DatastoreRpc datastoreRpc, - TraceUtil traceUtil, + TraceUtil otelTraceUtil, RetrySettings retrySettings, DatastoreOptions datastoreOptions) { this.datastoreRpc = datastoreRpc; - this.traceUtil = traceUtil; this.retrySettings = retrySettings; this.datastoreOptions = datastoreOptions; + this.otelTraceUtil = otelTraceUtil; } @Override @@ -105,20 +103,26 @@ public RunQueryResponse runQuery(RunQueryRequest request) { @Override public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryRequest request) { - return invokeRpc( - () -> datastoreRpc.runAggregationQuery(request), SPAN_NAME_RUN_AGGREGATION_QUERY); + ReadOptions readOptions = request.getReadOptions(); + boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); + String spanName = + (isTransactional + ? com.google.cloud.datastore.telemetry.TraceUtil + .SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY + : com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY); + return invokeRpc(() -> datastoreRpc.runAggregationQuery(request), spanName); } public O invokeRpc(Callable block, String startSpan) { - Span span = traceUtil.startSpan(startSpan); - try (Scope scope = traceUtil.getTracer().withSpan(span)) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); + try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); } catch (RetryHelperException e) { - span.setStatus(Status.UNKNOWN.withDescription(e.getMessage())); + span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); + span.end(); } } } diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TraceUtil.java deleted file mode 100644 index 57525d15d..000000000 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TraceUtil.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.datastore; - -import com.google.cloud.datastore.spi.v1.HttpDatastoreRpc; -import io.opencensus.trace.EndSpanOptions; -import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; -import io.opencensus.trace.Tracing; - -/** - * Helper class for tracing utility. It is used for instrumenting {@link HttpDatastoreRpc} with - * OpenCensus APIs. - * - *

TraceUtil instances are created by the {@link TraceUtil#getInstance()} method. - */ -public class TraceUtil { - private final Tracer tracer = Tracing.getTracer(); - private static final TraceUtil traceUtil = new TraceUtil(); - static final String SPAN_NAME_ALLOCATEIDS = "CloudDatastoreOperation.allocateIds"; - static final String SPAN_NAME_TRANSACTION = "CloudDatastoreOperation.readWriteTransaction"; - static final String SPAN_NAME_BEGINTRANSACTION = "CloudDatastoreOperation.beginTransaction"; - static final String SPAN_NAME_COMMIT = "CloudDatastoreOperation.commit"; - static final String SPAN_NAME_LOOKUP = "CloudDatastoreOperation.lookup"; - static final String SPAN_NAME_RESERVEIDS = "CloudDatastoreOperation.reserveIds"; - static final String SPAN_NAME_ROLLBACK = "CloudDatastoreOperation.rollback"; - static final String SPAN_NAME_RUNQUERY = "CloudDatastoreOperation.runQuery"; - static final String SPAN_NAME_RUN_AGGREGATION_QUERY = - "CloudDatastoreOperation.runAggregationQuery"; - static final EndSpanOptions END_SPAN_OPTIONS = - EndSpanOptions.builder().setSampleToLocalSpanStore(true).build(); - - /** - * Starts a new span. - * - * @param spanName The name of the returned Span. - * @return The newly created {@link Span}. - */ - protected Span startSpan(String spanName) { - return tracer.spanBuilder(spanName).startSpan(); - } - - /** - * Return the global {@link Tracer}. - * - * @return The global {@link Tracer}. - */ - public Tracer getTracer() { - return tracer; - } - - /** - * Return TraceUtil Object. - * - * @return An instance of {@link TraceUtil} - */ - public static TraceUtil getInstance() { - return traceUtil; - } - - private TraceUtil() {} -} diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java index f08a908ec..e730db81f 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/TransactionImpl.java @@ -20,6 +20,7 @@ import com.google.api.core.BetaApi; import com.google.cloud.datastore.models.ExplainOptions; +import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.common.collect.ImmutableList; import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.TransactionOptions; @@ -28,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import javax.annotation.Nonnull; final class TransactionImpl extends BaseDatastoreBatchWriter implements Transaction { @@ -37,6 +39,8 @@ final class TransactionImpl extends BaseDatastoreBatchWriter implements Transact private final ReadOptionProtoPreparer readOptionProtoPreparer; + @Nonnull private final TraceUtil traceUtil; + static class ResponseImpl implements Transaction.Response { private final com.google.datastore.v1.CommitResponse response; @@ -78,6 +82,7 @@ public List getGeneratedKeys() { transactionId = datastore.requestTransactionId(requestPb); this.readOptionProtoPreparer = new ReadOptionProtoPreparer(); + this.traceUtil = datastore.getOptions().getTraceUtil(); } @Override diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java index fd3cdc658..d927a2d7f 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/spi/v1/HttpDatastoreRpc.java @@ -21,7 +21,6 @@ import com.google.api.client.http.HttpTransport; import com.google.cloud.datastore.DatastoreException; import com.google.cloud.datastore.DatastoreOptions; -import com.google.cloud.datastore.TraceUtil; import com.google.cloud.http.CensusHttpModule; import com.google.cloud.http.HttpTransportOptions; import com.google.datastore.v1.AllocateIdsRequest; @@ -40,6 +39,7 @@ import com.google.datastore.v1.RunAggregationQueryResponse; import com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; +import io.opencensus.trace.Tracing; import java.io.IOException; import java.net.InetAddress; import java.net.URL; @@ -80,8 +80,7 @@ public HttpDatastoreRpc(DatastoreOptions options) { private HttpRequestInitializer getHttpRequestInitializer( final DatastoreOptions options, HttpTransportOptions httpTransportOptions) { // Open Census initialization - CensusHttpModule censusHttpModule = - new CensusHttpModule(TraceUtil.getInstance().getTracer(), true); + CensusHttpModule censusHttpModule = new CensusHttpModule(Tracing.getTracer(), true); final HttpRequestInitializer censusHttpModuleHttpRequestInitializer = censusHttpModule.getHttpRequestInitializer( httpTransportOptions.getHttpRequestInitializer(options)); diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java new file mode 100644 index 000000000..ebb630515 --- /dev/null +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/DisabledTraceUtil.java @@ -0,0 +1,131 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalApi; +import com.google.cloud.datastore.telemetry.TraceUtil.Context; +import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Tracing utility implementation, used to stub out tracing instrumentation when tracing is + * disabled. + */ +@InternalApi +public class DisabledTraceUtil implements TraceUtil { + static class Span implements TraceUtil.Span { + @Override + public void end() {} + + @Override + public void end(Throwable error) {} + + @Override + public void endAtFuture(ApiFuture futureValue) {} + + @Override + public TraceUtil.Span addEvent(String name) { + return this; + } + + @Override + public TraceUtil.Span addEvent(String name, Map attributes) { + return this; + } + + @Override + public TraceUtil.Span setAttribute(String key, int value) { + return this; + } + + @Override + public TraceUtil.Span setAttribute(String key, String value) { + return this; + } + + @Override + public TraceUtil.Span setAttribute(String key, boolean value) { + return this; + } + + public io.opentelemetry.api.trace.Span getSpan() { + return null; + } + + @Override + public Scope makeCurrent() { + return new Scope(); + } + } + + static class Context implements TraceUtil.Context { + @Override + public Scope makeCurrent() { + return new Scope(); + } + } + + static class Scope implements TraceUtil.Scope { + @Override + public void close() {} + } + + @Nullable + @Override + public ApiFunction getChannelConfigurator() { + return null; + } + + @Override + public Span startSpan(String spanName) { + return new Span(); + } + + @Override + public TraceUtil.Span startSpan(String spanName, TraceUtil.Span parentSpan) { + return new Span(); + } + + public SpanBuilder addSettingsAttributesToCurrentSpan(SpanBuilder spanBuilder) { + return getTracer().spanBuilder("TRACING_DISABLED_NO_OP"); + } + + @Nonnull + @Override + public TraceUtil.Span getCurrentSpan() { + return new Span(); + } + + @Nonnull + @Override + public TraceUtil.Context getCurrentContext() { + return new Context(); + } + + @Override + public Tracer getTracer() { + return TracerProvider.noop().get(LIBRARY_NAME); + } +} diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java new file mode 100644 index 000000000..40fc7308e --- /dev/null +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java @@ -0,0 +1,323 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.cloud.datastore.telemetry.TraceUtil.Context; +import com.google.cloud.datastore.telemetry.TraceUtil.Scope; +import com.google.cloud.datastore.telemetry.TraceUtil.Span; +import com.google.common.base.Throwables; +import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Tracing utility implementation, used to stub out tracing instrumentation when tracing is enabled. + */ +@InternalApi +public class EnabledTraceUtil implements TraceUtil { + private final Tracer tracer; + private final OpenTelemetry openTelemetry; + private final DatastoreOptions datastoreOptions; + + EnabledTraceUtil(DatastoreOptions datastoreOptions) { + OpenTelemetry openTelemetry = datastoreOptions.getOpenTelemetryOptions().getOpenTelemetry(); + + // If tracing is enabled, but an OpenTelemetry instance is not provided, fall back + // to using GlobalOpenTelemetry. + if (openTelemetry == null) { + openTelemetry = GlobalOpenTelemetry.get(); + } + + this.datastoreOptions = datastoreOptions; + this.openTelemetry = openTelemetry; + this.tracer = openTelemetry.getTracer(LIBRARY_NAME); + } + + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + @Override + @Nullable + public ApiFunction getChannelConfigurator() { + // TODO(jimit) Update this to return a gRPC Channel Configurator after gRPC upgrade. + return null; + } + + static class Span implements TraceUtil.Span { + private final io.opentelemetry.api.trace.Span span; + private final String spanName; + + public Span(io.opentelemetry.api.trace.Span span, String spanName) { + this.span = span; + this.spanName = spanName; + } + + @Override + public io.opentelemetry.api.trace.Span getSpan() { + return this.span; + } + + /** Ends this span. */ + @Override + public void end() { + span.end(); + } + + /** Ends this span in an error. */ + @Override + public void end(Throwable error) { + span.setStatus(StatusCode.ERROR, error.getMessage()); + span.recordException( + error, + Attributes.builder() + .put("exception.message", error.getMessage()) + .put("exception.type", error.getClass().getName()) + .put("exception.stacktrace", Throwables.getStackTraceAsString(error)) + .build()); + span.end(); + } + + /** + * If an operation ends in the future, its relevant span should end _after_ the future has been + * completed. This method "appends" the span completion code at the completion of the given + * future. In order for telemetry info to be recorded, the future returned by this method should + * be completed. + */ + @Override + public void endAtFuture(ApiFuture futureValue) { + io.opentelemetry.context.Context asyncContext = io.opentelemetry.context.Context.current(); + ApiFutures.addCallback( + futureValue, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable t) { + try (io.opentelemetry.context.Scope scope = asyncContext.makeCurrent()) { + span.addEvent(spanName + " failed."); + end(t); + } + } + + @Override + public void onSuccess(T result) { + try (io.opentelemetry.context.Scope scope = asyncContext.makeCurrent()) { + span.addEvent(spanName + " succeeded."); + end(); + } + } + }); + } + + /** Adds the given event to this span. */ + @Override + public TraceUtil.Span addEvent(String name) { + span.addEvent(name); + return this; + } + + @Override + public TraceUtil.Span addEvent(String name, Map attributes) { + AttributesBuilder attributesBuilder = Attributes.builder(); + attributes.forEach( + (key, value) -> { + if (value instanceof Integer) { + attributesBuilder.put(key, (int) value); + } else if (value instanceof Long) { + attributesBuilder.put(key, (long) value); + } else if (value instanceof Double) { + attributesBuilder.put(key, (double) value); + } else if (value instanceof Float) { + attributesBuilder.put(key, (float) value); + } else if (value instanceof Boolean) { + attributesBuilder.put(key, (boolean) value); + } else if (value instanceof String) { + attributesBuilder.put(key, (String) value); + } else { + // OpenTelemetry APIs do not support any other type. + throw new IllegalArgumentException( + "Unknown attribute type:" + value.getClass().getSimpleName()); + } + }); + span.addEvent(name, attributesBuilder.build()); + return this; + } + + @Override + public TraceUtil.Span setAttribute(String key, int value) { + span.setAttribute(ATTRIBUTE_SERVICE_PREFIX + key, value); + return this; + } + + @Override + public TraceUtil.Span setAttribute(String key, String value) { + span.setAttribute(ATTRIBUTE_SERVICE_PREFIX + key, value); + return this; + } + + @Override + public TraceUtil.Span setAttribute(String key, boolean value) { + span.setAttribute(ATTRIBUTE_SERVICE_PREFIX + key, value); + return this; + } + + @Override + public Scope makeCurrent() { + try (io.opentelemetry.context.Scope scope = span.makeCurrent()) { + return new Scope(scope); + } + } + } + + static class Scope implements TraceUtil.Scope { + private final io.opentelemetry.context.Scope scope; + + Scope(io.opentelemetry.context.Scope scope) { + this.scope = scope; + } + + @Override + public void close() { + scope.close(); + } + } + + static class Context implements TraceUtil.Context { + private final io.opentelemetry.context.Context context; + + Context(io.opentelemetry.context.Context context) { + this.context = context; + } + + @Override + public Scope makeCurrent() { + try (io.opentelemetry.context.Scope scope = context.makeCurrent()) { + return new Scope(scope); + } + } + } + + /** Applies the current Datastore instance settings as attributes to the current Span */ + @Override + public SpanBuilder addSettingsAttributesToCurrentSpan(SpanBuilder spanBuilder) { + spanBuilder = + spanBuilder.setAllAttributes( + Attributes.builder() + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.databaseId", + datastoreOptions.getDatabaseId()) + .put(ATTRIBUTE_SERVICE_PREFIX + "settings.host", datastoreOptions.getHost()) + .build()); + + if (datastoreOptions.getCredentials() != null) { + spanBuilder = + spanBuilder.setAttribute( + ATTRIBUTE_SERVICE_PREFIX + "settings.credentials.authenticationType", + datastoreOptions.getCredentials().getAuthenticationType()); + } + + if (datastoreOptions.getRetrySettings() != null) { + spanBuilder = + spanBuilder.setAllAttributes( + Attributes.builder() + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.initialRetryDelay", + datastoreOptions.getRetrySettings().getInitialRetryDelay().toString()) + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.maxRetryDelay", + datastoreOptions.getRetrySettings().getMaxRetryDelay().toString()) + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.retryDelayMultiplier", + String.valueOf(datastoreOptions.getRetrySettings().getRetryDelayMultiplier())) + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.maxAttempts", + String.valueOf(datastoreOptions.getRetrySettings().getMaxAttempts())) + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.initialRpcTimeout", + datastoreOptions.getRetrySettings().getInitialRpcTimeout().toString()) + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.maxRpcTimeout", + datastoreOptions.getRetrySettings().getMaxRpcTimeout().toString()) + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.rpcTimeoutMultiplier", + String.valueOf(datastoreOptions.getRetrySettings().getRpcTimeoutMultiplier())) + .put( + ATTRIBUTE_SERVICE_PREFIX + "settings.retrySettings.totalTimeout", + datastoreOptions.getRetrySettings().getTotalTimeout().toString()) + .build()); + } + + // Add the memory utilization of the client at the time this trace was collected. + long totalMemory = Runtime.getRuntime().totalMemory(); + long freeMemory = Runtime.getRuntime().freeMemory(); + double memoryUtilization = ((double) (totalMemory - freeMemory)) / totalMemory; + spanBuilder.setAttribute( + ATTRIBUTE_SERVICE_PREFIX + "memoryUtilization", + String.format("%.2f", memoryUtilization * 100) + "%"); + + return spanBuilder; + } + + @Override + public Span startSpan(String spanName) { + SpanBuilder spanBuilder = tracer.spanBuilder(spanName).setSpanKind(SpanKind.PRODUCER); + io.opentelemetry.api.trace.Span span = + addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(); + return new Span(span, spanName); + } + + @Override + public TraceUtil.Span startSpan(String spanName, TraceUtil.Span parentSpan) { + SpanBuilder spanBuilder = + tracer + .spanBuilder(spanName) + .setSpanKind(SpanKind.PRODUCER) + .setParent(io.opentelemetry.context.Context.current().with(parentSpan.getSpan())); + return new Span(addSettingsAttributesToCurrentSpan(spanBuilder).startSpan(), spanName); + } + + @Nonnull + @Override + public TraceUtil.Span getCurrentSpan() { + return new Span(io.opentelemetry.api.trace.Span.current(), ""); + } + + @Nonnull + @Override + public TraceUtil.Context getCurrentContext() { + return new Context(io.opentelemetry.context.Context.current()); + } + + @Override + public Tracer getTracer() { + return this.tracer; + } +} diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java new file mode 100644 index 000000000..fd616a733 --- /dev/null +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TraceUtil.java @@ -0,0 +1,167 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.InternalExtensionOnly; +import com.google.cloud.datastore.DatastoreOptions; +import io.grpc.ManagedChannelBuilder; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.Tracer; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** Utility interface to manage OpenTelemetry tracing instrumentation based on the configuration. */ +@InternalExtensionOnly +public interface TraceUtil { + static final String ATTRIBUTE_SERVICE_PREFIX = "gcp.datastore."; + static final String ENABLE_TRACING_ENV_VAR = "DATASTORE_ENABLE_TRACING"; + static final String LIBRARY_NAME = "com.google.cloud.datastore"; + static final String SPAN_NAME_LOOKUP = "Lookup"; + static final String SPAN_NAME_ALLOCATE_IDS = "AllocateIds"; + static final String SPAN_NAME_RESERVE_IDS = "ReserveIds"; + static final String SPAN_NAME_COMMIT = "Commit"; + static final String SPAN_NAME_RUN_QUERY = "RunQuery"; + static final String SPAN_NAME_RUN_AGGREGATION_QUERY = "RunAggregationQuery"; + static final String SPAN_NAME_TRANSACTION_RUN = "Transaction.Run"; + static final String SPAN_NAME_BEGIN_TRANSACTION = "Transaction.Begin"; + static final String SPAN_NAME_TRANSACTION_LOOKUP = "Transaction.Lookup"; + static final String SPAN_NAME_TRANSACTION_COMMIT = "Transaction.Commit"; + static final String SPAN_NAME_TRANSACTION_RUN_QUERY = "Transaction.RunQuery"; + static final String SPAN_NAME_ROLLBACK = "Transaction.Rollback"; + static final String SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY = + "Transaction.RunAggregationQuery"; + static final String ATTRIBUTES_KEY_DOCUMENT_COUNT = "doc_count"; + static final String ATTRIBUTES_KEY_TRANSACTIONAL = "transactional"; + static final String ATTRIBUTES_KEY_TRANSACTION_ID = "transaction_id"; + static final String ATTRIBUTES_KEY_READ_CONSISTENCY = "read_consistency"; + static final String ATTRIBUTES_KEY_RECEIVED = "Received"; + static final String ATTRIBUTES_KEY_MISSING = "Missing"; + static final String ATTRIBUTES_KEY_DEFERRED = "Deferred"; + static final String ATTRIBUTES_KEY_MORE_RESULTS = "mor_results"; + + /** + * Creates and returns an instance of the TraceUtil class. + * + * @param datastoreOptions The DatastoreOptions object that is requesting an instance of + * TraceUtil. + * @return An instance of the TraceUtil class. + */ + static TraceUtil getInstance(@Nonnull DatastoreOptions datastoreOptions) { + boolean createEnabledInstance = datastoreOptions.getOpenTelemetryOptions().isEnabled(); + + // The environment variable can override options to enable/disable telemetry collection. + String enableTracingEnvVar = System.getenv(ENABLE_TRACING_ENV_VAR); + if (enableTracingEnvVar != null) { + if (enableTracingEnvVar.equalsIgnoreCase("true") + || enableTracingEnvVar.equalsIgnoreCase("on")) { + createEnabledInstance = true; + } + if (enableTracingEnvVar.equalsIgnoreCase("false") + || enableTracingEnvVar.equalsIgnoreCase("off")) { + createEnabledInstance = false; + } + } + + if (createEnabledInstance) { + return new EnabledTraceUtil(datastoreOptions); + } else { + return new DisabledTraceUtil(); + } + } + + /** Returns a channel configurator for gRPC, or {@code null} if tracing is disabled. */ + @Nullable + ApiFunction getChannelConfigurator(); + + /** Represents a trace span. */ + interface Span { + /** Adds the given event to this span. */ + Span addEvent(String name); + + /** Adds the given event with the given attributes to this span. */ + Span addEvent(String name, Map attributes); + + /** Adds the given attribute to this span. */ + Span setAttribute(String key, int value); + + /** Adds the given attribute to this span. */ + Span setAttribute(String key, String value); + + /** Adds the given attribute to this span. */ + Span setAttribute(String key, boolean value); + + io.opentelemetry.api.trace.Span getSpan(); + + /** Marks this span as the current span. */ + Scope makeCurrent(); + + /** Ends this span. */ + void end(); + + /** Ends this span in an error. */ + void end(Throwable error); + + /** + * If an operation ends in the future, its relevant span should end _after_ the future has been + * completed. This method "appends" the span completion code at the completion of the given + * future. In order for telemetry info to be recorded, the future returned by this method should + * be completed. + */ + void endAtFuture(ApiFuture futureValue); + } + + /** Represents a trace context. */ + interface Context { + /** Makes this context the current context. */ + Scope makeCurrent(); + } + + /** Represents a trace scope. */ + interface Scope extends AutoCloseable { + /** Closes the current scope. */ + void close(); + } + + /** Starts a new span with the given name, sets it as the current span, and returns it. */ + Span startSpan(String spanName); + + /** + * Starts a new span with the given name and the span represented by the parentSpan as its parent, + * sets it as the current span and returns it. + */ + Span startSpan(String spanName, Span parentSpan); + + /** + * Adds common SpanAttributes to the current span, useful when hand-creating a new Span without + * using the TraceUtil.Span interface. + */ + SpanBuilder addSettingsAttributesToCurrentSpan(SpanBuilder spanBuilder); + + /** Returns the current span. */ + @Nonnull + Span getCurrentSpan(); + + /** Returns the current Context. */ + @Nonnull + Context getCurrentContext(); + + /** Returns the current OpenTelemetry Tracer when OpenTelemetry SDK is provided. */ + Tracer getTracer(); +} diff --git a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/testing/RemoteDatastoreHelper.java b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/testing/RemoteDatastoreHelper.java index 596ce96d8..6167cedca 100644 --- a/google-cloud-datastore/src/main/java/com/google/cloud/datastore/testing/RemoteDatastoreHelper.java +++ b/google-cloud-datastore/src/main/java/com/google/cloud/datastore/testing/RemoteDatastoreHelper.java @@ -19,13 +19,16 @@ import com.google.api.core.InternalApi; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import com.google.cloud.datastore.DatastoreOptions; import com.google.cloud.datastore.Key; import com.google.cloud.datastore.Query; import com.google.cloud.datastore.QueryResults; import com.google.cloud.datastore.StructuredQuery; import com.google.cloud.http.HttpTransportOptions; +import io.opentelemetry.sdk.OpenTelemetrySdk; import java.util.UUID; +import javax.annotation.Nullable; import org.threeten.bp.Duration; /** @@ -38,13 +41,13 @@ * RetrySettings#getTotalTimeout()} is {@code 120000} and {@link * RetrySettings#getInitialRetryDelay()} is {@code 250}. {@link * HttpTransportOptions#getConnectTimeout()} and {@link HttpTransportOptions#getReadTimeout()} are - * both both set to {@code 60000}. + * both both set to {@code 60000}. If an OpenTelemetrySdk object is passed in, OpenTelemetry Trace + * collection will be enabled for the Client application. * *

Internal testing use only */ @InternalApi public class RemoteDatastoreHelper { - private final DatastoreOptions options; private final Datastore datastore; private final String namespace; @@ -78,18 +81,30 @@ public static RemoteDatastoreHelper create() { } /** Creates a {@code RemoteStorageHelper} object. */ - public static RemoteDatastoreHelper create(String databaseId) { + public static RemoteDatastoreHelper create( + String databaseId, @Nullable OpenTelemetrySdk openTelemetrySdk) { HttpTransportOptions transportOptions = DatastoreOptions.getDefaultHttpTransportOptions(); transportOptions = transportOptions.toBuilder().setConnectTimeout(60000).setReadTimeout(60000).build(); - DatastoreOptions datastoreOption = + DatastoreOptions.Builder datastoreOptionBuilder = DatastoreOptions.newBuilder() .setDatabaseId(databaseId) .setNamespace(UUID.randomUUID().toString()) .setRetrySettings(retrySettings()) - .setTransportOptions(transportOptions) - .build(); - return new RemoteDatastoreHelper(datastoreOption); + .setTransportOptions(transportOptions); + + if (openTelemetrySdk != null) { + datastoreOptionBuilder.setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder() + .setOpenTelemetry(openTelemetrySdk) + .setTracingEnabled(true) + .build()); + } + return new RemoteDatastoreHelper(datastoreOptionBuilder.build()); + } + + public static RemoteDatastoreHelper create(String databaseId) { + return create(databaseId, /*openTelemetrySdk=*/ null); } private static RetrySettings retrySettings() { diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java index a545580e2..85703f739 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreOptionsTest.java @@ -70,6 +70,20 @@ public void testHost() { assertEquals("http://localhost:" + PORT, options.build().getHost()); } + @Test + public void testOpenTelemetryOptionsEnabled() { + options.setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()); + assertTrue(options.build().getOpenTelemetryOptions().isEnabled()); + } + + @Test + public void testOpenTelemetryOptionsDisabled() { + options.setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(false).build()); + assertTrue(!options.build().getOpenTelemetryOptions().isEnabled()); + } + @Test public void testNamespace() { assertTrue(options.build().getNamespace().isEmpty()); diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java index cd768f986..d88e1d1ec 100644 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreTest.java @@ -176,6 +176,8 @@ public static void beforeClass() throws IOException, InterruptedException { public void setUp() { rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class); rpcMock = EasyMock.createStrictMock(DatastoreRpc.class); + DatastoreOpenTelemetryOptions.Builder otelOptionsBuilder = + DatastoreOpenTelemetryOptions.newBuilder(); rpcMockOptions = options .toBuilder() diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java deleted file mode 100644 index b86355afa..000000000 --- a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecoratorTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.datastore; - -import static com.google.cloud.datastore.TraceUtil.END_SPAN_OPTIONS; -import static com.google.cloud.datastore.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; -import static com.google.common.truth.Truth.assertThat; -import static com.google.rpc.Code.UNAVAILABLE; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.createStrictMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; - -import com.google.api.gax.retrying.RetrySettings; -import com.google.cloud.datastore.spi.v1.DatastoreRpc; -import com.google.datastore.v1.RunAggregationQueryRequest; -import com.google.datastore.v1.RunAggregationQueryResponse; -import io.opencensus.trace.Span; -import io.opencensus.trace.Tracer; -import org.junit.Before; -import org.junit.Test; - -public class RetryAndTraceDatastoreRpcDecoratorTest { - - public static final int MAX_ATTEMPTS = 3; - private DatastoreRpc mockDatastoreRpc; - private TraceUtil mockTraceUtil; - private DatastoreOptions datastoreOptions = - DatastoreOptions.newBuilder().setProjectId("project-id").build(); - private RetrySettings retrySettings = - RetrySettings.newBuilder().setMaxAttempts(MAX_ATTEMPTS).build(); - - private RetryAndTraceDatastoreRpcDecorator datastoreRpcDecorator; - - @Before - public void setUp() throws Exception { - mockDatastoreRpc = createStrictMock(DatastoreRpc.class); - mockTraceUtil = createStrictMock(TraceUtil.class); - datastoreRpcDecorator = - new RetryAndTraceDatastoreRpcDecorator( - mockDatastoreRpc, mockTraceUtil, retrySettings, datastoreOptions); - } - - @Test - public void testRunAggregationQuery() { - Span mockSpan = createStrictMock(Span.class); - RunAggregationQueryRequest aggregationQueryRequest = - RunAggregationQueryRequest.getDefaultInstance(); - RunAggregationQueryResponse aggregationQueryResponse = - RunAggregationQueryResponse.getDefaultInstance(); - - expect(mockDatastoreRpc.runAggregationQuery(aggregationQueryRequest)) - .andThrow( - new DatastoreException( - UNAVAILABLE.getNumber(), "API not accessible currently", UNAVAILABLE.name())) - .times(2) - .andReturn(aggregationQueryResponse); - expect(mockTraceUtil.startSpan(SPAN_NAME_RUN_AGGREGATION_QUERY)).andReturn(mockSpan); - expect(mockTraceUtil.getTracer()).andReturn(createNiceMock(Tracer.class)); - mockSpan.end(END_SPAN_OPTIONS); - - replay(mockDatastoreRpc, mockTraceUtil, mockSpan); - - RunAggregationQueryResponse actualAggregationQueryResponse = - datastoreRpcDecorator.runAggregationQuery(aggregationQueryRequest); - - assertThat(actualAggregationQueryResponse).isSameInstanceAs(aggregationQueryResponse); - verify(mockDatastoreRpc, mockTraceUtil, mockSpan); - } -} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java new file mode 100644 index 000000000..bee54a1f0 --- /dev/null +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITE2ETracingTest.java @@ -0,0 +1,1009 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.it; + +import static com.google.cloud.datastore.aggregation.Aggregation.count; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_ALLOCATE_IDS; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_BEGIN_TRANSACTION; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_COMMIT; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_LOOKUP; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RESERVE_IDS; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_ROLLBACK; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_QUERY; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_COMMIT; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_LOOKUP; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN; +import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY; +import static com.google.common.truth.Truth.assertThat; +import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.datastore.AggregationQuery; +import com.google.cloud.datastore.AggregationResult; +import com.google.cloud.datastore.AggregationResults; +import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.IncompleteKey; +import com.google.cloud.datastore.Key; +import com.google.cloud.datastore.KeyFactory; +import com.google.cloud.datastore.Query; +import com.google.cloud.datastore.QueryResults; +import com.google.cloud.datastore.ReadOption; +import com.google.cloud.datastore.StructuredQuery; +import com.google.cloud.datastore.StructuredQuery.PropertyFilter; +import com.google.cloud.datastore.Transaction; +import com.google.cloud.datastore.testing.RemoteDatastoreHelper; +import com.google.cloud.opentelemetry.trace.TraceConfiguration; +import com.google.cloud.opentelemetry.trace.TraceExporter; +import com.google.cloud.trace.v1.TraceServiceClient; +import com.google.common.base.Preconditions; +import com.google.devtools.cloudtrace.v1.Trace; +import com.google.devtools.cloudtrace.v1.TraceSpan; +import com.google.testing.junit.testparameterinjector.TestParameter; +import com.google.testing.junit.testparameterinjector.TestParameterInjector; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +// This End-to-End test verifies Client-side Tracing Functionality instrumented using the +// OpenTelemetry API. +// The test depends on the following external APIs/Services: +// 1. Java OpenTelemetry SDK +// 2. Cloud Trace Exporter +// 3. TraceServiceClient from Cloud Trace API v1. +// +// Permissions required to run this test (https://cloud.google.com/trace/docs/iam#trace-roles): +// 1. gcloud auth application-default login must be run with the test user. +// 2. To write traces, test user must have one of roles/cloudtrace.[admin|agent|user] roles. +// 3. To read traces, test user must have one of roles/cloudtrace.[admin|user] roles. +// +// Each test-case has the following workflow: +// 1. OpenTelemetry SDK is initialized with Cloud Trace Exporter and 100% Trace Sampling +// 2. On initialization, Datastore client is provided the OpenTelemetry SDK object from (1) +// 3. A custom TraceID is generated and injected using a custom SpanContext +// 4. Datastore operations are run inside a root TraceSpan created using the custom SpanContext from +// (3). +// 5. Traces are read-back using TraceServiceClient and verified against expected Call Stacks. +@RunWith(TestParameterInjector.class) +public class ITE2ETracingTest { + + protected boolean isUsingGlobalOpenTelemetrySDK() { + return useGlobalOpenTelemetrySDK; + } + + protected String datastoreNamedDatabase() { + return datastoreNamedDatabase; + } + + // Helper class to track call-stacks in a trace + protected static class TraceContainer { + + // Maps Span ID to TraceSpan + private final Map idSpanMap; + + // Maps Parent Span ID to a list of Child SpanIDs, useful for top-down traversal + private final Map> parentChildIdMap; + + // Tracks the Root Span ID + private long rootId; + + public TraceContainer(String rootSpanName, Trace trace) { + idSpanMap = new TreeMap<>(); + parentChildIdMap = new TreeMap<>(); + for (TraceSpan span : trace.getSpansList()) { + long spanId = span.getSpanId(); + idSpanMap.put(spanId, span); + if (rootSpanName.equals(span.getName())) { + rootId = span.getSpanId(); + } + + // Add self as a child of the parent span + if (!parentChildIdMap.containsKey(span.getParentSpanId())) { + parentChildIdMap.put(span.getParentSpanId(), new ArrayList<>()); + } + parentChildIdMap.get(span.getParentSpanId()).add(spanId); + } + } + + String spanName(long spanId) { + return idSpanMap.get(spanId).getName(); + } + + List childSpans(long spanId) { + return parentChildIdMap.get(spanId); + } + + // This method only works for matching call stacks with traces which have children of distinct + // type at all levels. This is good enough as the intention is to validate if the e2e path is + // WAI - the intention is not to validate Cloud Trace's correctness w.r.t. durability of all + // kinds of traces. + boolean containsCallStack(String... callStack) throws RuntimeException { + List expectedCallStack = Arrays.asList(callStack); + if (expectedCallStack.isEmpty()) { + throw new RuntimeException("Input callStack is empty"); + } + return dfsContainsCallStack(rootId, expectedCallStack); + } + + // Depth-first check for call stack in the trace + private boolean dfsContainsCallStack(long spanId, List expectedCallStack) { + logger.info( + "span=" + + spanName(spanId) + + ", expectedCallStack[0]=" + + (expectedCallStack.isEmpty() ? "null" : expectedCallStack.get(0))); + if (expectedCallStack.isEmpty()) { + return false; + } + if (spanName(spanId).equals(expectedCallStack.get(0))) { + // Recursion termination + if (childSpans(spanId) == null) { + logger.info("No more children for " + spanName(spanId)); + return expectedCallStack.size() <= 1; + } else { + // Examine the child spans + for (Long childSpan : childSpans(spanId)) { + int callStackListSize = expectedCallStack.size(); + logger.info( + "childSpan=" + + spanName(childSpan) + + ", expectedCallStackSize=" + + callStackListSize); + if (dfsContainsCallStack( + childSpan, + expectedCallStack.subList( + /*fromIndexInclusive=*/ 1, /*toIndexExclusive*/ callStackListSize))) { + return true; + } + } + } + } else { + logger.info(spanName(spanId) + " didn't match " + expectedCallStack.get(0)); + } + return false; + } + } + + private static final Logger logger = Logger.getLogger(ITE2ETracingTest.class.getName()); + + private static final String RUN_AGGREGATION_QUERY_RPC_NAME = "RunAggregationQuery"; + + private static final String RUN_QUERY_RPC_NAME = "RunQuery"; + + private static final int NUM_TRACE_ID_BYTES = 32; + + private static final int NUM_SPAN_ID_BYTES = 16; + + private static final int GET_TRACE_RETRY_COUNT = 60; + + private static final int GET_TRACE_RETRY_BACKOFF_MILLIS = 1000; + + private static final int TRACE_FORCE_FLUSH_MILLIS = 5000; + + private static final int TRACE_PROVIDER_SHUTDOWN_MILLIS = 1000; + + private static Key KEY1; + + private static Key KEY2; + + private static Key KEY3; + + private static Key KEY4; + + // Random int generator for trace ID and span ID + private static Random random; + + private static TraceExporter traceExporter; + + // Required for reading back traces from Cloud Trace for validation + private static TraceServiceClient traceClient_v1; + + // Custom SpanContext for each test, required for TraceID injection + private static SpanContext customSpanContext; + + // Trace read back from Cloud Trace using traceClient_v1 for verification + private static Trace retrievedTrace; + + private static String rootSpanName; + private static Tracer tracer; + + // Required to set custom-root span + private static OpenTelemetrySdk openTelemetrySdk; + + private static String projectId; + + private static DatastoreOptions options; + + private static Datastore datastore; + + private static RemoteDatastoreHelper remoteDatastoreHelper; + + @TestParameter boolean useGlobalOpenTelemetrySDK; + + @TestParameter({ + /*(default)*/ + "", + "test-db" + }) + String datastoreNamedDatabase; + + @BeforeClass + public static void setup() throws IOException { + projectId = DatastoreOptions.getDefaultProjectId(); + traceExporter = + TraceExporter.createWithConfiguration( + TraceConfiguration.builder().setProjectId(projectId).build()); + traceClient_v1 = TraceServiceClient.create(); + random = new Random(); + } + + @Before + public void before() throws Exception { + // Set up OTel SDK + Resource resource = + Resource.getDefault().merge(Resource.builder().put(SERVICE_NAME, "Sparky").build()); + + if (isUsingGlobalOpenTelemetrySDK()) { + openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(BatchSpanProcessor.builder(traceExporter).build()) + .setSampler(Sampler.alwaysOn()) + .build()) + .buildAndRegisterGlobal(); + } else { + openTelemetrySdk = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(BatchSpanProcessor.builder(traceExporter).build()) + .setSampler(Sampler.alwaysOn()) + .build()) + .build(); + } + + // Initialize the Datastore DB w/ the OTel SDK. Ideally we'd do this is the @BeforeAll method + // but because gRPC traces need to be deterministically force-flushed for every test + String namedDb = datastoreNamedDatabase(); + logger.log(Level.INFO, "Integration test using named database " + namedDb); + remoteDatastoreHelper = RemoteDatastoreHelper.create(namedDb, openTelemetrySdk); + options = remoteDatastoreHelper.getOptions(); + datastore = options.getService(); + + Preconditions.checkNotNull( + datastore, + "Error instantiating Datastore. Check that the service account credentials " + + "were properly set."); + + String projectId = options.getProjectId(); + String kind1 = "kind1"; + KEY1 = + Key.newBuilder(projectId, kind1, "key1", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY2 = + Key.newBuilder(projectId, kind1, "key2", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY3 = + Key.newBuilder(projectId, kind1, "key3", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY4 = + Key.newBuilder(projectId, kind1, "key4", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + // Set up the tracer for custom TraceID injection + rootSpanName = + String.format("%s%d", this.getClass().getSimpleName(), System.currentTimeMillis()); + if (isUsingGlobalOpenTelemetrySDK()) { + tracer = GlobalOpenTelemetry.getTracer(rootSpanName); + } else { + tracer = + datastore + .getOptions() + .getOpenTelemetryOptions() + .getOpenTelemetry() + .getTracer(rootSpanName); + } + + // Get up a new SpanContext (ergo TraceId) for each test + customSpanContext = getNewSpanContext(); + assertNotNull(customSpanContext); + assertNull(retrievedTrace); + } + + @After + public void after() throws Exception { + if (isUsingGlobalOpenTelemetrySDK()) { + GlobalOpenTelemetry.resetForTest(); + } + remoteDatastoreHelper.deleteNamespace(); + rootSpanName = null; + tracer = null; + retrievedTrace = null; + customSpanContext = null; + openTelemetrySdk = null; + } + + @AfterClass + public static void teardown() throws Exception { + traceClient_v1.close(); + } + + // Generates a random hex string of length `numBytes` + private String generateRandomHexString(int numBytes) { + StringBuilder newTraceId = new StringBuilder(); + while (newTraceId.length() < numBytes) { + newTraceId.append(Integer.toHexString(random.nextInt())); + } + return newTraceId.substring(0, numBytes); + } + + protected String generateNewTraceId() { + return generateRandomHexString(NUM_TRACE_ID_BYTES); + } + + // Generates a random 16-byte hex string + protected String generateNewSpanId() { + return generateRandomHexString(NUM_SPAN_ID_BYTES); + } + + // Generates a new SpanContext w/ random traceId,spanId + protected SpanContext getNewSpanContext() { + String traceId = generateNewTraceId(); + String spanId = generateNewSpanId(); + logger.info("traceId=" + traceId + ", spanId=" + spanId); + + return SpanContext.create(traceId, spanId, TraceFlags.getSampled(), TraceState.getDefault()); + } + + protected Span getNewRootSpanWithContext() { + // Execute the DB operation in the context of the custom root span. + return tracer + .spanBuilder(rootSpanName) + .setParent(Context.root().with(Span.wrap(customSpanContext))) + .startSpan(); + } + + protected void waitForTracesToComplete() throws Exception { + logger.info("Flushing traces..."); + CompletableResultCode completableResultCode = + openTelemetrySdk.getSdkTracerProvider().forceFlush(); + completableResultCode.join(TRACE_FORCE_FLUSH_MILLIS, TimeUnit.MILLISECONDS); + } + + // Validates `retrievedTrace`. Cloud Trace indexes traces w/ eventual consistency, even when + // indexing traceId, therefore the test must retry a few times before the complete trace is + // available. + // For Transaction traces, there may be more spans than in the trace than specified in + // `callStack`. So `numExpectedSpans` is the expected total number of spans (and not just the + // spans in `callStack`) + protected void fetchAndValidateTrace( + String traceId, int numExpectedSpans, List> callStackList) + throws InterruptedException { + // Large enough count to accommodate eventually consistent Cloud Trace backend + int numRetries = GET_TRACE_RETRY_COUNT; + // Account for rootSpanName + numExpectedSpans++; + + // Fetch traces + do { + try { + retrievedTrace = traceClient_v1.getTrace(projectId, traceId); + assertEquals(traceId, retrievedTrace.getTraceId()); + + logger.info( + "expectedSpanCount=" + + numExpectedSpans + + ", retrievedSpanCount=" + + retrievedTrace.getSpansCount()); + } catch (NotFoundException notFound) { + logger.info("Trace not found, retrying in " + GET_TRACE_RETRY_BACKOFF_MILLIS + " ms"); + } catch (IndexOutOfBoundsException outOfBoundsException) { + logger.info("Call stack not found in trace. Retrying."); + } + if (retrievedTrace == null || numExpectedSpans != retrievedTrace.getSpansCount()) { + Thread.sleep(GET_TRACE_RETRY_BACKOFF_MILLIS); + } + } while (numRetries-- > 0 + && (retrievedTrace == null || numExpectedSpans != retrievedTrace.getSpansCount())); + + if (retrievedTrace == null || numExpectedSpans != retrievedTrace.getSpansCount()) { + throw new RuntimeException( + "Expected number of spans: " + + numExpectedSpans + + ", Actual number of spans: " + + (retrievedTrace != null + ? retrievedTrace.getSpansList().toString() + : "Trace NOT_FOUND")); + } + + TraceContainer traceContainer = new TraceContainer(rootSpanName, retrievedTrace); + + for (List callStack : callStackList) { + // Update all call stacks to be rooted at rootSpanName + ArrayList expectedCallStack = new ArrayList<>(callStack); + + // numExpectedSpans should account for rootSpanName (not passed in callStackList) + expectedCallStack.add(0, rootSpanName); + + // *May be* the full trace was returned + logger.info("Checking if TraceContainer contains the callStack"); + String[] expectedCallList = new String[expectedCallStack.size()]; + if (!traceContainer.containsCallStack(expectedCallStack.toArray(expectedCallList))) { + throw new RuntimeException( + "Expected spans: " + + Arrays.toString(expectedCallList) + + ", Actual spans: " + + (retrievedTrace != null + ? retrievedTrace.getSpansList().toString() + : "Trace NOT_FOUND")); + } + logger.severe("CallStack not found in TraceContainer."); + } + } + + // Validates `retrievedTrace`. Cloud Trace indexes traces w/ eventual consistency, even when + // indexing traceId, therefore the test must retry a few times before the complete trace is + // available. + // For Non-Transaction traces, there is a 1:1 ratio of spans in `spanNames` and in the trace. + protected void fetchAndValidateTrace(String traceId, String... spanNames) + throws InterruptedException { + fetchAndValidateTrace(traceId, spanNames.length, Arrays.asList(Arrays.asList(spanNames))); + } + + @Test + public void traceContainerTest() throws Exception { + // Make sure the test has a new SpanContext (and TraceId for injection) + assertNotNull(customSpanContext); + + // Inject new trace ID + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Entity entity = datastore.get(KEY1); + assertNull(entity); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + Trace traceResp = null; + int expectedSpanCount = 2; + + int numRetries = GET_TRACE_RETRY_COUNT; + do { + try { + traceResp = traceClient_v1.getTrace(projectId, customSpanContext.getTraceId()); + if (traceResp.getSpansCount() == expectedSpanCount) { + logger.info("Success: Got " + expectedSpanCount + " spans."); + break; + } + } catch (NotFoundException notFoundException) { + Thread.sleep(GET_TRACE_RETRY_BACKOFF_MILLIS); + logger.info("Trace not found, retrying in " + GET_TRACE_RETRY_BACKOFF_MILLIS + " ms"); + } + logger.info( + "Trace Found. The trace did not contain " + + expectedSpanCount + + " spans. Going to retry."); + numRetries--; + } while (numRetries > 0); + + // Make sure we got as many spans as we expected. + assertNotNull(traceResp); + assertEquals(expectedSpanCount, traceResp.getSpansCount()); + + TraceContainer traceCont = new TraceContainer(rootSpanName, traceResp); + + // Contains exact path + assertTrue(traceCont.containsCallStack(rootSpanName, SPAN_NAME_LOOKUP)); + + // Top-level mismatch + assertFalse(traceCont.containsCallStack(SPAN_NAME_LOOKUP, RUN_QUERY_RPC_NAME)); + + // Leaf-level mismatch/missing + assertFalse( + traceCont.containsCallStack( + rootSpanName, SPAN_NAME_LOOKUP, RUN_AGGREGATION_QUERY_RPC_NAME)); + } + + @Test + public void lookupTraceTest() throws Exception { + // Make sure the test has a new SpanContext (and TraceId for injection) + assertNotNull(customSpanContext); + + // Inject new trace ID + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Entity entity = datastore.get(KEY1); + assertNull(entity); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_LOOKUP); + } + + @Test + public void allocateIdsTraceTest() throws Exception { + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + String kind1 = "kind1"; + KeyFactory keyFactory = datastore.newKeyFactory().setKind(kind1); + IncompleteKey pk1 = keyFactory.newKey(); + Key key1 = datastore.allocateId(pk1); + assertEquals(key1.getProjectId(), pk1.getProjectId()); + assertEquals(key1.getNamespace(), pk1.getNamespace()); + assertEquals(key1.getAncestors(), pk1.getAncestors()); + assertEquals(key1.getKind(), pk1.getKind()); + assertTrue(key1.hasId()); + assertFalse(key1.hasName()); + assertEquals(Key.newBuilder(pk1, key1.getId()).build(), key1); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_ALLOCATE_IDS); + } + + @Test + public void reserveIdsTraceTest() throws Exception { + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + KeyFactory keyFactory = datastore.newKeyFactory().setKind("MyKind"); + Key key1 = keyFactory.newKey(10); + Key key2 = keyFactory.newKey("name"); + List keyList = datastore.reserveIds(key1, key2); + assertEquals(2, keyList.size()); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RESERVE_IDS); + } + + @Test + public void commitTraceTest() throws Exception { + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + + Entity entity1 = Entity.newBuilder(KEY1).set("test_key", "test_value").build(); + try (Scope ignored = rootSpan.makeCurrent()) { + Entity response = datastore.add(entity1); + assertEquals(entity1, response); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_COMMIT); + } + + @Test + public void putTraceTest() throws Exception { + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + + Entity entity1 = Entity.newBuilder(KEY1).set("test_key", "test_value").build(); + try (Scope ignored = rootSpan.makeCurrent()) { + Entity response = datastore.put(entity1); + assertEquals(entity1, response); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_COMMIT); + } + + @Test + public void updateTraceTest() throws Exception { + assertNotNull(customSpanContext); + + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Entity entity1_update = + Entity.newBuilder(entity1).set("test_field", "new_test_value1").build(); + Entity entity2_update = + Entity.newBuilder(entity2).set("test_field", "new_test_value1").build(); + datastore.update(entity1_update, entity2_update); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_COMMIT); + } + + @Test + public void deleteTraceTest() throws Exception { + assertNotNull(customSpanContext); + + Entity entity1 = Entity.newBuilder(KEY1).set("test_key", "test_value").build(); + Entity response = datastore.put(entity1); + assertEquals(entity1, response); + + Span rootSpan = getNewRootSpanWithContext(); + + try (Scope ignored = rootSpan.makeCurrent()) { + datastore.delete(entity1.getKey()); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_COMMIT); + } + + @Test + public void runQueryTraceTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + QueryResults queryResults = datastore.run(query); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_QUERY); + } + + @Test + public void runAggregationQueryTraceTest() throws Exception { + Entity entity1 = + Entity.newBuilder(KEY1) + .set("pepper_name", "jalapeno") + .set("max_scoville_level", 10000) + .build(); + Entity entity2 = + Entity.newBuilder(KEY2) + .set("pepper_name", "serrano") + .set("max_scoville_level", 25000) + .build(); + Entity entity3 = + Entity.newBuilder(KEY3) + .set("pepper_name", "habanero") + .set("max_scoville_level", 350000) + .build(); + Entity entity4 = + Entity.newBuilder(KEY4) + .set("pepper_name", "ghost") + .set("max_scoville_level", 1500000) + .build(); + + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + entityList.add(entity3); + entityList.add(entity4); + + List response = datastore.add(entity1, entity2, entity3, entity4); + assertEquals(entityList, response); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + PropertyFilter mediumSpicyFilters = PropertyFilter.lt("max_scoville_level", 100000); + StructuredQuery mediumSpicyQuery = + Query.newEntityQueryBuilder() + .setKind(KEY1.getKind()) + .setFilter(mediumSpicyFilters) + .build(); + AggregationQuery countSpicyPeppers = + Query.newAggregationQueryBuilder() + .addAggregation(count().as("count")) + .over(mediumSpicyQuery) + .build(); + AggregationResults results = datastore.runAggregation(countSpicyPeppers); + assertThat(results.size()).isEqualTo(1); + AggregationResult result = results.get(0); + assertThat(result.getLong("count")).isEqualTo(2L); + } finally { + rootSpan.end(); + } + + waitForTracesToComplete(); + + fetchAndValidateTrace(customSpanContext.getTraceId(), SPAN_NAME_RUN_AGGREGATION_QUERY); + } + + @Test + public void newTransactionReadTest() throws Exception { + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Transaction transaction = datastore.newTransaction(); + Entity entity = datastore.get(KEY1, ReadOption.transactionId(transaction.getTransactionId())); + transaction.commit(); + assertNull(entity); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace( + customSpanContext.getTraceId(), + /*numExpectedSpans=*/ 3, + Arrays.asList( + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); + } + + @Test + public void newTransactionQueryTest() throws Exception { + // Set up + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + assertNotNull(customSpanContext); + + // Test + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Transaction transaction = datastore.newTransaction(); + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + QueryResults queryResults = transaction.run(query); + transaction.commit(); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace( + customSpanContext.getTraceId(), + /*numExpectedSpans=*/ 3, + Arrays.asList( + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_TRANSACTION_RUN_QUERY), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); + } + + @Test + public void newTransactionReadWriteTraceTest() throws Exception { + // Set up + Entity entity1 = Entity.newBuilder(KEY1).set("pepper_type", "jalapeno").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("pepper_type", "habanero").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + String simplified_spice_level = "not_spicy"; + Entity entity1update = + Entity.newBuilder(entity1).set("spice_level", simplified_spice_level).build(); + + assertNotNull(customSpanContext); + + // Test + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Transaction transaction = datastore.newTransaction(); + entity1 = transaction.get(KEY1); + switch (entity1.getString("pepper_type")) { + case "jalapeno": + simplified_spice_level = "mild"; + break; + + case "habanero": + simplified_spice_level = "hot"; + break; + } + transaction.update(entity1update); + transaction.delete(KEY2); + transaction.commit(); + assertFalse(transaction.isActive()); + } finally { + rootSpan.end(); + } + + waitForTracesToComplete(); + + List list = datastore.fetch(KEY1, KEY2); + assertEquals(list.get(0), entity1update); + assertNull(list.get(1)); + + fetchAndValidateTrace( + customSpanContext.getTraceId(), + /*numExpectedSpans=*/ 3, + Arrays.asList( + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP), + Collections.singletonList(SPAN_NAME_TRANSACTION_COMMIT))); + } + + @Test + public void newTransactionRollbackTest() throws Exception { + // Set up + Entity entity1 = Entity.newBuilder(KEY1).set("pepper_type", "jalapeno").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("pepper_type", "habanero").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + String simplified_spice_level = "not_spicy"; + Entity entity1update = + Entity.newBuilder(entity1).set("spice_level", simplified_spice_level).build(); + + assertNotNull(customSpanContext); + + // Test + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + Transaction transaction = datastore.newTransaction(); + entity1 = transaction.get(KEY1); + switch (entity1.getString("pepper_type")) { + case "jalapeno": + simplified_spice_level = "mild"; + break; + + case "habanero": + simplified_spice_level = "hot"; + break; + } + transaction.update(entity1update); + transaction.delete(KEY2); + transaction.rollback(); + assertFalse(transaction.isActive()); + } finally { + rootSpan.end(); + } + + waitForTracesToComplete(); + + List list = datastore.fetch(KEY1, KEY2); + assertEquals(list.get(0), entity1); + assertEquals(list.get(1), entity2); + + fetchAndValidateTrace( + customSpanContext.getTraceId(), + /*numExpectedSpans=*/ 3, + Arrays.asList( + Collections.singletonList(SPAN_NAME_BEGIN_TRANSACTION), + Collections.singletonList(SPAN_NAME_TRANSACTION_LOOKUP), + Collections.singletonList(SPAN_NAME_ROLLBACK))); + } + + @Test + public void runInTransactionQueryTest() throws Exception { + // Set up + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + assertNotNull(customSpanContext); + + Span rootSpan = getNewRootSpanWithContext(); + try (Scope ignored = rootSpan.makeCurrent()) { + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + Datastore.TransactionCallable callable = + transaction -> { + QueryResults queryResults = datastore.run(query); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + return true; + }; + datastore.runInTransaction(callable); + } finally { + rootSpan.end(); + } + waitForTracesToComplete(); + + fetchAndValidateTrace( + customSpanContext.getTraceId(), + /*numExpectedSpans=*/ 4, + Arrays.asList( + Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_BEGIN_TRANSACTION), + Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_RUN_QUERY), + Arrays.asList(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_TRANSACTION_COMMIT))); + } +} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITTracingTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITTracingTest.java new file mode 100644 index 000000000..aefb51352 --- /dev/null +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITTracingTest.java @@ -0,0 +1,849 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.it; + +import static com.google.cloud.datastore.aggregation.Aggregation.count; +import static com.google.cloud.datastore.telemetry.TraceUtil.*; +import static com.google.common.truth.Truth.assertThat; +import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.datastore.AggregationQuery; +import com.google.cloud.datastore.AggregationResult; +import com.google.cloud.datastore.AggregationResults; +import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.IncompleteKey; +import com.google.cloud.datastore.Key; +import com.google.cloud.datastore.KeyFactory; +import com.google.cloud.datastore.Query; +import com.google.cloud.datastore.QueryResults; +import com.google.cloud.datastore.ReadOption; +import com.google.cloud.datastore.StructuredQuery; +import com.google.cloud.datastore.StructuredQuery.PropertyFilter; +import com.google.cloud.datastore.Transaction; +import com.google.cloud.datastore.testing.RemoteDatastoreHelper; +import com.google.common.base.Preconditions; +import com.google.testing.junit.testparameterinjector.TestParameter; +import com.google.testing.junit.testparameterinjector.TestParameterInjector; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.OpenTelemetrySdkBuilder; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.EventData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; + +@RunWith(TestParameterInjector.class) +public class ITTracingTest { + protected boolean isUsingGlobalOpenTelemetrySDK() { + return useGlobalOpenTelemetrySDK; + } + + protected String datastoreNamedDatabase() { + return datastoreNamedDatabase; + } + + private static final Logger logger = + Logger.getLogger(com.google.cloud.datastore.it.ITTracingTest.class.getName()); + + private static final int TRACE_FORCE_FLUSH_MILLIS = 1000; + private static final int TRACE_PROVIDER_SHUTDOWN_MILLIS = 1000; + private static final int IN_MEMORY_SPAN_EXPORTER_DELAY_MILLIS = 50; + private static final String SERVICE = "google.datastore.v1.Datastore/"; + + private static Key KEY1; + + private static Key KEY2; + + private static Key KEY3; + + private static Key KEY4; + + private static OpenTelemetrySdk openTelemetrySdk; + + // We use an InMemorySpanExporter for testing which keeps all generated trace spans + // in memory so that we can check their correctness. + protected InMemorySpanExporter inMemorySpanExporter; + private static DatastoreOptions options; + + protected Datastore datastore; + private static RemoteDatastoreHelper remoteDatastoreHelper; + + @TestParameter boolean useGlobalOpenTelemetrySDK; + + @TestParameter({ + /*(default)*/ + "", + "test-db" + }) + String datastoreNamedDatabase; + + Map spanNameToSpanId = new HashMap<>(); + Map spanIdToParentSpanId = new HashMap<>(); + Map spanNameToSpanData = new HashMap<>(); + + @Rule public TestName testName = new TestName(); + + @Before + public void before() { + inMemorySpanExporter = InMemorySpanExporter.create(); + + Resource resource = + Resource.getDefault().merge(Resource.builder().put(SERVICE_NAME, "Sparky").build()); + SpanProcessor inMemorySpanProcessor = SimpleSpanProcessor.create(inMemorySpanExporter); + DatastoreOptions.Builder optionsBuilder = DatastoreOptions.newBuilder(); + DatastoreOpenTelemetryOptions.Builder otelOptionsBuilder = + DatastoreOpenTelemetryOptions.newBuilder(); + OpenTelemetrySdkBuilder openTelemetrySdkBuilder = + OpenTelemetrySdk.builder() + .setTracerProvider( + SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(inMemorySpanProcessor) + .setSampler(Sampler.alwaysOn()) + .build()); + + if (isUsingGlobalOpenTelemetrySDK()) { + GlobalOpenTelemetry.resetForTest(); + openTelemetrySdk = openTelemetrySdkBuilder.buildAndRegisterGlobal(); + optionsBuilder.setOpenTelemetryOptions(otelOptionsBuilder.setTracingEnabled(true).build()); + } else { + openTelemetrySdk = openTelemetrySdkBuilder.build(); + optionsBuilder.setOpenTelemetryOptions( + otelOptionsBuilder.setTracingEnabled(true).setOpenTelemetry(openTelemetrySdk).build()); + } + + String namedDb = datastoreNamedDatabase(); + logger.log(Level.INFO, "Integration test using named database " + namedDb); + remoteDatastoreHelper = RemoteDatastoreHelper.create(namedDb, openTelemetrySdk); + options = remoteDatastoreHelper.getOptions(); + datastore = options.getService(); + + Preconditions.checkNotNull( + datastore, + "Error instantiating Datastore. Check that the service account credentials " + + "were properly set."); + + String projectId = options.getProjectId(); + String kind1 = "kind1"; + KEY1 = + Key.newBuilder(projectId, kind1, "key1", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY2 = + Key.newBuilder(projectId, kind1, "key2", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY3 = + Key.newBuilder(projectId, kind1, "key3", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + KEY4 = + Key.newBuilder(projectId, kind1, "key4", options.getDatabaseId()) + .setNamespace(options.getNamespace()) + .build(); + cleanupTestSpanContext(); + } + + @After + public void after() throws Exception { + if (isUsingGlobalOpenTelemetrySDK()) { + GlobalOpenTelemetry.resetForTest(); + } + remoteDatastoreHelper.deleteNamespace(); + inMemorySpanExporter.reset(); + CompletableResultCode completableResultCode = + openTelemetrySdk.getSdkTracerProvider().shutdown(); + completableResultCode.join(TRACE_PROVIDER_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS); + openTelemetrySdk = null; + } + + @AfterClass + public static void teardown() {} + + void waitForTracesToComplete() throws Exception { + // The same way that querying the Cloud Trace backend may not give us the + // full trace on the first try, querying the in-memory traces may not result + // in the full trace immediately. Note that performing the `flush` is not + // enough. This doesn't pose an issue in practice, but can make tests flaky. + // Therefore, we're adding a delay to make sure we avoid any flakiness. + inMemorySpanExporter.flush().join(IN_MEMORY_SPAN_EXPORTER_DELAY_MILLIS, TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS.sleep(IN_MEMORY_SPAN_EXPORTER_DELAY_MILLIS); + + CompletableResultCode completableResultCode = + openTelemetrySdk.getSdkTracerProvider().forceFlush(); + completableResultCode.join(TRACE_FORCE_FLUSH_MILLIS, TimeUnit.MILLISECONDS); + } + + // Prepares all the spans in memory for inspection. + List prepareSpans() throws Exception { + waitForTracesToComplete(); + List spans = inMemorySpanExporter.getFinishedSpanItems(); + buildSpanMaps(spans); + printSpans(); + return spans; + } + + void buildSpanMaps(List spans) { + for (SpanData spanData : spans) { + spanNameToSpanData.put(spanData.getName(), spanData); + spanNameToSpanId.put(spanData.getName(), spanData.getSpanId()); + spanIdToParentSpanId.put(spanData.getSpanId(), spanData.getParentSpanId()); + } + } + + // Returns the SpanData object for the span with the given name. + // Returns null if no span with the given name exists. + @Nullable + SpanData getSpanByName(String spanName) { + return spanNameToSpanData.get(spanName); + } + + // Returns the SpanData object for the gRPC span with the given RPC name. + // Returns null if no such span exists. + @Nullable + SpanData getGrpcSpanByName(String rpcName) { + return getSpanByName(SERVICE + rpcName); + } + + String grpcSpanName(String rpcName) { + return SERVICE + rpcName; + } + + void assertSameTrace(SpanData... spans) { + if (spans.length > 1) { + String traceId = spans[0].getTraceId(); + for (SpanData spanData : spans) { + assertEquals(traceId, spanData.getTraceId()); + } + } + } + + // Helper to see the spans in standard output while developing tests + void printSpans() { + for (SpanData spanData : spanNameToSpanData.values()) { + logger.log( + Level.FINE, + String.format( + "SPAN ID:%s, ParentID:%s, KIND:%s, TRACE ID:%s, NAME:%s, ATTRIBUTES:%s, EVENTS:%s\n", + spanData.getSpanId(), + spanData.getParentSpanId(), + spanData.getKind(), + spanData.getTraceId(), + spanData.getName(), + spanData.getAttributes().toString(), + spanData.getEvents().toString())); + } + } + + // Asserts that the span hierarchy exists for the given span names. The hierarchy starts with the + // root span, followed + // by the child span, grandchild span, and so on. It also asserts that all the given spans belong + // to the same trace, + // and that datastore-generated spans contain the expected datastore attributes. + void assertSpanHierarchy(String... spanNamesHierarchy) { + List spanNames = Arrays.asList(spanNamesHierarchy); + + for (int i = 0; i + 1 < spanNames.size(); ++i) { + String parentSpanName = spanNames.get(i); + String childSpanName = spanNames.get(i + 1); + SpanData parentSpan = getSpanByName(parentSpanName); + SpanData childSpan = getSpanByName(childSpanName); + assertNotNull(parentSpan); + assertNotNull(childSpan); + assertEquals(childSpan.getParentSpanId(), parentSpan.getSpanId()); + assertSameTrace(childSpan, parentSpan); + // gRPC spans do not have datastore attributes. + if (!parentSpanName.startsWith(SERVICE)) { + assertHasExpectedAttributes(parentSpan); + } + if (!childSpanName.startsWith(SERVICE)) { + assertHasExpectedAttributes(childSpan); + } + } + } + + void assertHasExpectedAttributes(SpanData spanData, String... additionalExpectedAttributes) { + // All datastore-generated spans have the settings attributes. + List expectedAttributes = + Arrays.asList( + "gcp.datastore.memoryUtilization", + "gcp.datastore.settings.host", + "gcp.datastore.settings.databaseId", + "gcp.datastore.settings.retrySettings.maxRpcTimeout", + "gcp.datastore.settings.retrySettings.retryDelayMultiplier", + "gcp.datastore.settings.retrySettings.initialRetryDelay", + "gcp.datastore.settings.credentials.authenticationType", + "gcp.datastore.settings.retrySettings.maxAttempts", + "gcp.datastore.settings.retrySettings.maxRetryDelay", + "gcp.datastore.settings.retrySettings.rpcTimeoutMultiplier", + "gcp.datastore.settings.retrySettings.totalTimeout", + "gcp.datastore.settings.retrySettings.initialRpcTimeout"); + + expectedAttributes.addAll(Arrays.asList(additionalExpectedAttributes)); + + Attributes spanAttributes = spanData.getAttributes(); + for (String expectedAttribute : expectedAttributes) { + assertNotNull(spanAttributes.get(AttributeKey.stringKey(expectedAttribute))); + } + } + + // Returns true if and only if the given span data contains an event with the given name and the + // given expected + // attributes. + boolean hasEvent(SpanData spanData, String eventName, @Nullable Attributes expectedAttributes) { + if (spanData == null) { + return false; + } + + logger.log( + Level.INFO, + String.format( + "Checking if span named '%s' (ID='%s') contains an event named '%s'", + spanData.getName(), spanData.getSpanId(), eventName)); + + List events = spanData.getEvents(); + for (EventData event : events) { + if (event.getName().equals(eventName)) { + if (expectedAttributes == null) { + return true; + } + + // Make sure attributes also match. + Attributes eventAttributes = event.getAttributes(); + return expectedAttributes.equals(eventAttributes); + } + } + return false; + } + + void cleanupTestSpanContext() { + inMemorySpanExporter.reset(); + spanNameToSpanId.clear(); + spanIdToParentSpanId.clear(); + spanNameToSpanData.clear(); + } + + // This is a POJO used for testing APIs that take a POJO. + public static class Pojo { + public int bar; + + public Pojo() { + bar = 0; + } + + public Pojo(int bar) { + this.bar = bar; + } + + public int getBar() { + return bar; + } + + public void setBar(int bar) { + this.bar = bar; + } + } + + @Test + public void lookupTraceTest() throws Exception { + Entity entity = datastore.get(KEY1); + assertNull(entity); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_LOOKUP); + SpanData span = getSpanByName(SPAN_NAME_LOOKUP); + assertTrue( + hasEvent( + span, + SPAN_NAME_LOOKUP + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_RECEIVED, 0) + .put(ATTRIBUTES_KEY_MISSING, 1) + .put(ATTRIBUTES_KEY_DEFERRED, 0) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, false) + .put(ATTRIBUTES_KEY_TRANSACTION_ID, "") + .build())); + } + + @Test + public void allocateIdsTraceTest() throws Exception { + String kind1 = "kind1"; + KeyFactory keyFactory = datastore.newKeyFactory().setKind(kind1); + IncompleteKey pk1 = keyFactory.newKey(); + Key key1 = datastore.allocateId(pk1); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_ALLOCATE_IDS); + } + + @Test + public void reserveIdsTraceTest() throws Exception { + KeyFactory keyFactory = datastore.newKeyFactory().setKind("MyKind"); + Key key1 = keyFactory.newKey(10); + Key key2 = keyFactory.newKey("name"); + List keyList = datastore.reserveIds(key1, key2); + assertEquals(2, keyList.size()); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_RESERVE_IDS); + } + + @Test + public void commitTraceTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_key", "test_value").build(); + Entity response = datastore.add(entity1); + assertEquals(entity1, response); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_COMMIT); + } + + @Test + public void putTraceTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_key", "test_value").build(); + Entity response = datastore.put(entity1); + assertEquals(entity1, response); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_COMMIT); + } + + @Test + public void updateTraceTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_COMMIT); + + SpanData spanData = getSpanByName(SPAN_NAME_COMMIT); + assertTrue( + hasEvent( + spanData, + SPAN_NAME_COMMIT + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.size()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, false) + .put(ATTRIBUTES_KEY_TRANSACTION_ID, "") + .build())); + + // Clean Up test span context to verify update spans + cleanupTestSpanContext(); + + Entity entity1_update = Entity.newBuilder(entity1).set("test_field", "new_test_value1").build(); + Entity entity2_update = Entity.newBuilder(entity2).set("test_field", "new_test_value1").build(); + datastore.update(entity1_update, entity2_update); + + waitForTracesToComplete(); + + spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_COMMIT); + } + + @Test + public void deleteTraceTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_key", "test_value").build(); + Entity response = datastore.put(entity1); + assertEquals(entity1, response); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_COMMIT); + + SpanData spanData = getSpanByName(SPAN_NAME_COMMIT); + assertTrue( + hasEvent( + spanData, + SPAN_NAME_COMMIT + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, 1) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, false) + .put(ATTRIBUTES_KEY_TRANSACTION_ID, "") + .build())); + + // Clean Up test span context to verify update spans + cleanupTestSpanContext(); + + datastore.delete(entity1.getKey()); + + waitForTracesToComplete(); + + spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_COMMIT); + + spanData = getSpanByName(SPAN_NAME_COMMIT); + assertTrue( + hasEvent( + spanData, + SPAN_NAME_COMMIT + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, 1) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, false) + .put(ATTRIBUTES_KEY_TRANSACTION_ID, "") + .build())); + } + + @Test + public void runQueryTraceTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + // Clean Up test span context to verify RunQuery spans + cleanupTestSpanContext(); + + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + QueryResults queryResults = datastore.run(query); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_RUN_QUERY); + + SpanData span = getSpanByName(SPAN_NAME_RUN_QUERY); + assertTrue( + hasEvent( + span, + SPAN_NAME_RUN_QUERY + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, 1) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, false) + .put(ATTRIBUTES_KEY_READ_CONSISTENCY, "READ_CONSISTENCY_UNSPECIFIED") + .put(ATTRIBUTES_KEY_MORE_RESULTS, "NO_MORE_RESULTS") + .put(ATTRIBUTES_KEY_TRANSACTION_ID, "") + .build())); + } + + @Test + public void runAggregationQueryTraceTest() throws Exception { + Entity entity1 = + Entity.newBuilder(KEY1) + .set("pepper_name", "jalapeno") + .set("max_scoville_level", 10000) + .build(); + Entity entity2 = + Entity.newBuilder(KEY2) + .set("pepper_name", "serrano") + .set("max_scoville_level", 25000) + .build(); + Entity entity3 = + Entity.newBuilder(KEY3) + .set("pepper_name", "habanero") + .set("max_scoville_level", 350000) + .build(); + Entity entity4 = + Entity.newBuilder(KEY4) + .set("pepper_name", "ghost") + .set("max_scoville_level", 1500000) + .build(); + + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + entityList.add(entity3); + entityList.add(entity4); + + List response = datastore.add(entity1, entity2, entity3, entity4); + assertEquals(entityList, response); + + // Clean Up test span context to verify RunAggregationQuery spans + cleanupTestSpanContext(); + + PropertyFilter mediumSpicyFilters = PropertyFilter.lt("max_scoville_level", 100000); + StructuredQuery mediumSpicyQuery = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(mediumSpicyFilters).build(); + AggregationQuery countSpicyPeppers = + Query.newAggregationQueryBuilder() + .addAggregation(count().as("count")) + .over(mediumSpicyQuery) + .build(); + AggregationResults results = datastore.runAggregation(countSpicyPeppers); + assertThat(results.size()).isEqualTo(1); + AggregationResult result = results.get(0); + assertThat(result.getLong("count")).isEqualTo(2L); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(1, spans.size()); + assertSpanHierarchy(SPAN_NAME_RUN_AGGREGATION_QUERY); + } + + @Test + public void newTransactionReadWriteTraceTest() throws Exception { + // Transaction.Begin + Transaction transaction = datastore.newTransaction(); + + // Transaction.Lookup + Entity entity = datastore.get(KEY1, ReadOption.transactionId(transaction.getTransactionId())); + assertNull(entity); + + Entity updatedEntity = Entity.newBuilder(KEY1).set("test_field", "new_test_value1").build(); + transaction.put(updatedEntity); + + // Transaction.Commit + transaction.commit(); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(3, spans.size()); + + assertSpanHierarchy(SPAN_NAME_BEGIN_TRANSACTION); + assertSpanHierarchy(SPAN_NAME_TRANSACTION_LOOKUP); + SpanData span = getSpanByName(SPAN_NAME_TRANSACTION_LOOKUP); + assertTrue( + hasEvent( + span, + SPAN_NAME_TRANSACTION_LOOKUP + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DEFERRED, 0) + .put(ATTRIBUTES_KEY_MISSING, 1) + .put(ATTRIBUTES_KEY_RECEIVED, 0) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, true) + .put(ATTRIBUTES_KEY_TRANSACTION_ID, transaction.getTransactionId().toStringUtf8()) + .build())); + + assertSpanHierarchy(SPAN_NAME_TRANSACTION_COMMIT); + span = getSpanByName(SPAN_NAME_TRANSACTION_COMMIT); + assertTrue( + hasEvent( + span, + SPAN_NAME_TRANSACTION_COMMIT + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, 1) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, true) + .put(ATTRIBUTES_KEY_TRANSACTION_ID, transaction.getTransactionId().toStringUtf8()) + .build())); + } + + @Test + public void newTransactionQueryTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + // Clean Up test span context to verify Transaction RunQuery spans + cleanupTestSpanContext(); + + Transaction transaction = datastore.newTransaction(); + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + QueryResults queryResults = transaction.run(query); + transaction.commit(); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(3, spans.size()); + + assertSpanHierarchy(SPAN_NAME_BEGIN_TRANSACTION); + assertSpanHierarchy(SPAN_NAME_TRANSACTION_RUN_QUERY); + assertSpanHierarchy(SPAN_NAME_TRANSACTION_COMMIT); + SpanData span = getSpanByName(SPAN_NAME_TRANSACTION_RUN_QUERY); + assertTrue( + hasEvent( + span, + SPAN_NAME_TRANSACTION_RUN_QUERY + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, 1) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, true) + .put(ATTRIBUTES_KEY_READ_CONSISTENCY, "READ_CONSISTENCY_UNSPECIFIED") + .put(ATTRIBUTES_KEY_MORE_RESULTS, "NO_MORE_RESULTS") + .put(ATTRIBUTES_KEY_TRANSACTION_ID, transaction.getTransactionId().toStringUtf8()) + .build())); + } + + @Test + public void newTransactionRollbackTest() throws Exception { + Entity entity1 = Entity.newBuilder(KEY1).set("pepper_type", "jalapeno").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("pepper_type", "habanero").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + // Clean Up test span context to verify Transaction Rollback spans + cleanupTestSpanContext(); + + String simplified_spice_level = "not_spicy"; + Entity entity1update = + Entity.newBuilder(entity1).set("spice_level", simplified_spice_level).build(); + Transaction transaction = datastore.newTransaction(); + entity1 = transaction.get(KEY1); + switch (entity1.getString("pepper_type")) { + case "jalapeno": + simplified_spice_level = "mild"; + break; + + case "habanero": + simplified_spice_level = "hot"; + break; + } + transaction.update(entity1update); + transaction.delete(KEY2); + transaction.rollback(); + assertFalse(transaction.isActive()); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(3, spans.size()); + + assertSpanHierarchy(SPAN_NAME_BEGIN_TRANSACTION); + assertSpanHierarchy(SPAN_NAME_TRANSACTION_LOOKUP); + SpanData span = getSpanByName(SPAN_NAME_TRANSACTION_LOOKUP); + assertTrue( + hasEvent( + span, + SPAN_NAME_TRANSACTION_LOOKUP + " complete.", + Attributes.builder() + .put(ATTRIBUTES_KEY_DEFERRED, 0) + .put(ATTRIBUTES_KEY_MISSING, 0) + .put(ATTRIBUTES_KEY_RECEIVED, 1) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, true) + .put(ATTRIBUTES_KEY_TRANSACTION_ID, transaction.getTransactionId().toStringUtf8()) + .build())); + + assertSpanHierarchy(SPAN_NAME_ROLLBACK); + span = getSpanByName(SPAN_NAME_ROLLBACK); + assertTrue( + hasEvent( + span, + SPAN_NAME_ROLLBACK, + Attributes.builder() + .put(ATTRIBUTES_KEY_TRANSACTION_ID, transaction.getTransactionId().toStringUtf8()) + .build())); + } + + @Test + public void runInTransactionQueryTest() throws Exception { + // Set up + Entity entity1 = Entity.newBuilder(KEY1).set("test_field", "test_value1").build(); + Entity entity2 = Entity.newBuilder(KEY2).set("test_field", "test_value2").build(); + List entityList = new ArrayList<>(); + entityList.add(entity1); + entityList.add(entity2); + + List response = datastore.add(entity1, entity2); + assertEquals(entityList, response); + + // Clean Up test span context to verify Transaction Rollback spans + cleanupTestSpanContext(); + + PropertyFilter filter = PropertyFilter.eq("test_field", entity1.getValue("test_field")); + Query query = + Query.newEntityQueryBuilder().setKind(KEY1.getKind()).setFilter(filter).build(); + Datastore.TransactionCallable callable = + transaction -> { + QueryResults queryResults = datastore.run(query); + assertTrue(queryResults.hasNext()); + assertEquals(entity1, queryResults.next()); + assertFalse(queryResults.hasNext()); + return true; + }; + datastore.runInTransaction(callable); + + waitForTracesToComplete(); + + List spans = prepareSpans(); + assertEquals(4, spans.size()); + + // Since the runInTransaction method runs the TransactionCallable opaquely in a transaction + // there is no way for the API user to know the transaction ID, so we will not validate it here. + assertSpanHierarchy(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_BEGIN_TRANSACTION); + assertSpanHierarchy(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_RUN_QUERY); + assertSpanHierarchy(SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_TRANSACTION_COMMIT); + } +} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java new file mode 100644 index 000000000..c80ef9353 --- /dev/null +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/DisabledTraceUtilTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.datastore.telemetry; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.Test; + +public class DisabledTraceUtilTest { + @Test + public void disabledTraceUtilDoesNotProvideChannelConfigurator() { + DisabledTraceUtil traceUtil = new DisabledTraceUtil(); + assertThat(traceUtil.getChannelConfigurator()).isNull(); + } + + @Test + public void usesDisabledContext() { + DisabledTraceUtil traceUtil = new DisabledTraceUtil(); + assertThat(traceUtil.getCurrentContext() instanceof DisabledTraceUtil.Context).isTrue(); + } + + @Test + public void usesDisabledSpan() { + DisabledTraceUtil traceUtil = new DisabledTraceUtil(); + assertThat(traceUtil.getCurrentSpan() instanceof DisabledTraceUtil.Span).isTrue(); + assertThat(traceUtil.startSpan("foo") instanceof DisabledTraceUtil.Span).isTrue(); + assertThat( + traceUtil.startSpan("foo", traceUtil.getCurrentSpan()) + instanceof DisabledTraceUtil.Span) + .isTrue(); + } + + @Test + public void usesDisabledScope() { + DisabledTraceUtil traceUtil = new DisabledTraceUtil(); + assertThat(traceUtil.getCurrentContext().makeCurrent() instanceof DisabledTraceUtil.Scope) + .isTrue(); + assertThat(traceUtil.getCurrentSpan().makeCurrent() instanceof DisabledTraceUtil.Scope) + .isTrue(); + } +} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java new file mode 100644 index 000000000..50d7b6820 --- /dev/null +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/EnabledTraceUtilTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.datastore.telemetry; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.NoCredentials; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; +import com.google.cloud.datastore.DatastoreOptions; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import org.junit.Before; +import org.junit.Test; + +public class EnabledTraceUtilTest { + @Before + public void setUp() { + GlobalOpenTelemetry.resetForTest(); + } + + DatastoreOptions.Builder getBaseOptions() { + return DatastoreOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()); + } + + DatastoreOptions getTracingEnabledOptions() { + return getBaseOptions() + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()) + .build(); + } + + EnabledTraceUtil newEnabledTraceUtil() { + return new EnabledTraceUtil(getTracingEnabledOptions()); + } + + @Test + public void usesOpenTelemetryFromOptions() { + OpenTelemetrySdk myOpenTelemetrySdk = OpenTelemetrySdk.builder().build(); + DatastoreOptions firestoreOptions = + getBaseOptions() + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder() + .setTracingEnabled(true) + .setOpenTelemetry(myOpenTelemetrySdk) + .build()) + .build(); + EnabledTraceUtil traceUtil = new EnabledTraceUtil(firestoreOptions); + assertThat(traceUtil.getOpenTelemetry()).isEqualTo(myOpenTelemetrySdk); + } + + @Test + public void usesGlobalOpenTelemetryIfOpenTelemetryInstanceNotProvided() { + OpenTelemetrySdk ignored = OpenTelemetrySdk.builder().buildAndRegisterGlobal(); + DatastoreOptions firestoreOptions = + getBaseOptions() + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()) + .build(); + EnabledTraceUtil traceUtil = new EnabledTraceUtil(firestoreOptions); + assertThat(traceUtil.getOpenTelemetry()).isEqualTo(GlobalOpenTelemetry.get()); + } + + @Test + public void enabledTraceUtilProvidesChannelConfigurator() { + assertThat(newEnabledTraceUtil().getChannelConfigurator()).isNull(); + } + + @Test + public void usesEnabledContext() { + assertThat(newEnabledTraceUtil().getCurrentContext() instanceof EnabledTraceUtil.Context) + .isTrue(); + } + + @Test + public void usesEnabledSpan() { + EnabledTraceUtil traceUtil = newEnabledTraceUtil(); + assertThat(traceUtil.getCurrentSpan() instanceof EnabledTraceUtil.Span).isTrue(); + assertThat(traceUtil.startSpan("foo") != null).isTrue(); + assertThat( + traceUtil.startSpan("foo", traceUtil.getCurrentSpan()) instanceof EnabledTraceUtil.Span) + .isTrue(); + } + + @Test + public void usesEnabledScope() { + EnabledTraceUtil traceUtil = newEnabledTraceUtil(); + assertThat(traceUtil.getCurrentContext().makeCurrent() instanceof EnabledTraceUtil.Scope) + .isTrue(); + assertThat(traceUtil.getCurrentSpan().makeCurrent() instanceof EnabledTraceUtil.Scope).isTrue(); + } +} diff --git a/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/TraceUtilTest.java b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/TraceUtilTest.java new file mode 100644 index 000000000..f1cce8006 --- /dev/null +++ b/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/TraceUtilTest.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.datastore.telemetry; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.NoCredentials; +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; +import com.google.cloud.datastore.DatastoreOptions; +import org.junit.Test; + +public class TraceUtilTest { + @Test + public void defaultOptionsUseDisabledTraceUtil() { + TraceUtil traceUtil = + TraceUtil.getInstance( + DatastoreOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .build()); + assertThat(traceUtil instanceof DisabledTraceUtil).isTrue(); + } + + @Test + public void tracingDisabledOptionsUseDisabledTraceUtil() { + TraceUtil traceUtil = + TraceUtil.getInstance( + DatastoreOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(false).build()) + .build()); + assertThat(traceUtil instanceof DisabledTraceUtil).isTrue(); + } + + @Test + public void tracingEnabledOptionsUseEnabledTraceUtil() { + TraceUtil traceUtil = + TraceUtil.getInstance( + DatastoreOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build()) + .build()); + assertThat(traceUtil instanceof EnabledTraceUtil).isTrue(); + } +}