From 7b8c40583583295c2115e3d8515d7e82e9148cfa Mon Sep 17 00:00:00 2001 From: Ehsan Date: Wed, 27 Mar 2024 09:49:45 -0700 Subject: [PATCH] feat: trace instrumentation for queries and transactions. (#1592) * feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions. * feat: Add com.google.cloud.firestore.telemetry package. * fix: Remove OpenCensus tracing code. * feat: tracing for aggregate queries, bulkwriter, partition queries, and listDocuments. * feat: trace instrumentation for DocumentReference methods. * feat: trace instrumentation for queries and transactions. * test: Adding first e2e client-tracing test w/ Custom Root Span (#1621) * test: Adding first e2e client-tracing test w/ Custom Root Span * Roll back E2E tests commit. * Address feedback. * Address feedback (better event log message). * Address feedback. --------- Co-authored-by: Jimit J Shah <57637300+jimit-j-shah@users.noreply.github.com> --- .../google/cloud/firestore/FirestoreImpl.java | 41 +++- .../com/google/cloud/firestore/Query.java | 139 ++++++++--- .../google/cloud/firestore/Transaction.java | 152 +++++++++--- .../cloud/firestore/TransactionRunner.java | 48 +++- .../cloud/firestore/it/ITTracingTest.java | 222 ++++++++++++++++++ 5 files changed, 518 insertions(+), 84 deletions(-) diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index b8018cd42..b4165abac 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -30,6 +30,7 @@ import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.Timestamp; import com.google.cloud.firestore.spi.v1.FirestoreRpc; +import com.google.cloud.firestore.telemetry.TraceUtil; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.firestore.v1.BatchGetDocumentsRequest; @@ -38,6 +39,7 @@ import com.google.protobuf.ByteString; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -84,6 +86,12 @@ class FirestoreImpl implements Firestore, FirestoreRpcContext { ResourcePath.create(DatabaseRootName.of(options.getProjectId(), options.getDatabaseId())); } + /** Gets the TraceUtil object associated with this Firestore instance. */ + @Nonnull + private TraceUtil getTraceUtil() { + return getOptions().getTraceUtil(); + } + /** Lazy-load the Firestore's default BulkWriter. */ private BulkWriter getBulkWriter() { if (bulkWriterInstance == null) { @@ -212,6 +220,9 @@ void getAll( @Nullable FieldMask fieldMask, @Nullable ByteString transactionId, final ApiStreamObserver apiStreamObserver) { + // To reduce the size of traces, we only register one event for every 100 responses + // that we receive from the server. + final int NUM_RESPONSES_PER_TRACE_EVENT = 100; ResponseObserver responseObserver = new ResponseObserver() { @@ -219,7 +230,13 @@ void getAll( boolean hasCompleted = false; @Override - public void onStart(StreamController streamController) {} + public void onStart(StreamController streamController) { + getTraceUtil() + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + ": Start", + Collections.singletonMap("numDocuments", documentReferences.length)); + } @Override public void onResponse(BatchGetDocumentsResponse response) { @@ -227,6 +244,19 @@ public void onResponse(BatchGetDocumentsResponse response) { DocumentSnapshot documentSnapshot; numResponses++; + if (numResponses == 1) { + getTraceUtil() + .currentSpan() + .addEvent(TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + ": First response received"); + } else if (numResponses % NUM_RESPONSES_PER_TRACE_EVENT == 0) { + getTraceUtil() + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + + ": Received " + + numResponses + + " responses"); + } switch (response.getResultCase()) { case FOUND: @@ -261,6 +291,7 @@ public void onResponse(BatchGetDocumentsResponse response) { @Override public void onError(Throwable throwable) { + getTraceUtil().currentSpan().end(throwable); apiStreamObserver.onError(throwable); } @@ -268,6 +299,14 @@ public void onError(Throwable throwable) { public void onComplete() { if (hasCompleted) return; hasCompleted = true; + getTraceUtil() + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + + ": Completed with " + + numResponses + + " responses.", + Collections.singletonMap("numResponses", numResponses)); apiStreamObserver.onCompleted(); } }; diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java index e47f85f95..ceea15f61 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java @@ -38,9 +38,12 @@ import com.google.auto.value.AutoValue; import com.google.cloud.Timestamp; import com.google.cloud.firestore.Query.QueryOptions.Builder; +import com.google.cloud.firestore.telemetry.TraceUtil; +import com.google.cloud.firestore.telemetry.TraceUtil.Scope; import com.google.cloud.firestore.v1.FirestoreSettings; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.firestore.bundle.BundledQuery; import com.google.firestore.v1.Cursor; import com.google.firestore.v1.Document; @@ -1491,7 +1494,8 @@ public void onCompleted() { }, /* startTimeNanos= */ rpcContext.getClock().nanoTime(), /* transactionId= */ null, - /* readTime= */ null); + /* readTime= */ null, + /* isRetryRequestWithCursor= */ false); } /** @@ -1627,7 +1631,13 @@ private void internalStream( final QuerySnapshotObserver documentObserver, final long startTimeNanos, @Nullable final ByteString transactionId, - @Nullable final Timestamp readTime) { + @Nullable final Timestamp readTime, + final boolean isRetryRequestWithCursor) { + TraceUtil traceUtil = getFirestore().getOptions().getTraceUtil(); + // To reduce the size of traces, we only register one event for every 100 responses + // that we receive from the server. + final int NUM_RESPONSES_PER_TRACE_EVENT = 100; + RunQueryRequest.Builder request = RunQueryRequest.newBuilder(); request.setStructuredQuery(buildQuery()).setParent(options.getParentPath().toString()); @@ -1638,11 +1648,22 @@ private void internalStream( request.setReadTime(readTime.toProto()); } + traceUtil + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_RUN_QUERY, + new ImmutableMap.Builder() + .put("isTransactional", transactionId != null) + .put("isRetryRequestWithCursor", isRetryRequestWithCursor) + .build()); + final AtomicReference lastReceivedDocument = new AtomicReference<>(); ResponseObserver observer = new ResponseObserver() { Timestamp readTime; + boolean firstResponse = false; + int numDocuments = 0; // The stream's `onComplete()` could be called more than once, // this flag makes sure only the first one is actually processed. @@ -1653,7 +1674,21 @@ public void onStart(StreamController streamController) {} @Override public void onResponse(RunQueryResponse response) { + if (!firstResponse) { + firstResponse = true; + traceUtil.currentSpan().addEvent(TraceUtil.SPAN_NAME_RUN_QUERY + ": First Response"); + } if (response.hasDocument()) { + numDocuments++; + if (numDocuments % NUM_RESPONSES_PER_TRACE_EVENT == 0) { + traceUtil + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_RUN_QUERY + + ": Received " + + numDocuments + + " documents"); + } Document document = response.getDocument(); QueryDocumentSnapshot documentSnapshot = QueryDocumentSnapshot.fromDocument( @@ -1667,6 +1702,9 @@ public void onResponse(RunQueryResponse response) { } if (response.getDone()) { + traceUtil + .currentSpan() + .addEvent(TraceUtil.SPAN_NAME_RUN_QUERY + ": Received RunQueryResponse.Done"); onComplete(); } } @@ -1675,15 +1713,27 @@ public void onResponse(RunQueryResponse response) { public void onError(Throwable throwable) { QueryDocumentSnapshot cursor = lastReceivedDocument.get(); if (shouldRetry(cursor, throwable)) { + traceUtil + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_RUN_QUERY + ": Retryable Error", + Collections.singletonMap("error.message", throwable.getMessage())); + Query.this .startAfter(cursor) .internalStream( documentObserver, startTimeNanos, /* transactionId= */ null, - options.getRequireConsistency() ? cursor.getReadTime() : null); + options.getRequireConsistency() ? cursor.getReadTime() : null, + /* isRetryRequestWithCursor= */ true); } else { + traceUtil + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_RUN_QUERY + ": Error", + Collections.singletonMap("error.message", throwable.getMessage())); documentObserver.onError(throwable); } } @@ -1692,6 +1742,11 @@ public void onError(Throwable throwable) { public void onComplete() { if (hasCompleted) return; hasCompleted = true; + traceUtil + .currentSpan() + .addEvent( + TraceUtil.SPAN_NAME_RUN_QUERY + ": Completed", + Collections.singletonMap("numDocuments", numDocuments)); documentObserver.onCompleted(readTime); } @@ -1746,40 +1801,54 @@ public ListenerRegistration addSnapshotListener( } ApiFuture get(@Nullable ByteString transactionId) { - final SettableApiFuture result = SettableApiFuture.create(); - - internalStream( - new QuerySnapshotObserver() { - final List documentSnapshots = new ArrayList<>(); - - @Override - public void onNext(QueryDocumentSnapshot documentSnapshot) { - documentSnapshots.add(documentSnapshot); - } - - @Override - public void onError(Throwable throwable) { - result.setException(throwable); - } + TraceUtil.Span span = + getFirestore() + .getOptions() + .getTraceUtil() + .startSpan( + transactionId == null + ? TraceUtil.SPAN_NAME_QUERY_GET + : TraceUtil.SPAN_NAME_TRANSACTION_GET_QUERY); + try (Scope ignored = span.makeCurrent()) { + final SettableApiFuture result = SettableApiFuture.create(); + internalStream( + new QuerySnapshotObserver() { + final List documentSnapshots = new ArrayList<>(); + + @Override + public void onNext(QueryDocumentSnapshot documentSnapshot) { + documentSnapshots.add(documentSnapshot); + } - @Override - public void onCompleted() { - // The results for limitToLast queries need to be flipped since we reversed the - // ordering constraints before sending the query to the backend. - List resultView = - LimitType.Last.equals(Query.this.options.getLimitType()) - ? reverse(documentSnapshots) - : documentSnapshots; - QuerySnapshot querySnapshot = - QuerySnapshot.withDocuments(Query.this, this.getReadTime(), resultView); - result.set(querySnapshot); - } - }, - /* startTimeNanos= */ rpcContext.getClock().nanoTime(), - transactionId, - /* readTime= */ null); + @Override + public void onError(Throwable throwable) { + result.setException(throwable); + } - return result; + @Override + public void onCompleted() { + // The results for limitToLast queries need to be flipped since we reversed the + // ordering constraints before sending the query to the backend. + List resultView = + LimitType.Last.equals(Query.this.options.getLimitType()) + ? reverse(documentSnapshots) + : documentSnapshots; + QuerySnapshot querySnapshot = + QuerySnapshot.withDocuments(Query.this, this.getReadTime(), resultView); + result.set(querySnapshot); + } + }, + /* startTimeNanos= */ rpcContext.getClock().nanoTime(), + transactionId, + /* readTime= */ null, + /* isRetryRequestWithCursor= */ false); + + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } Comparator comparator() { diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java index 1a394e954..2190ac3f1 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java @@ -19,6 +19,9 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.firestore.TransactionOptions.TransactionOptionsType; +import com.google.cloud.firestore.telemetry.TraceUtil; +import com.google.cloud.firestore.telemetry.TraceUtil.Context; +import com.google.cloud.firestore.telemetry.TraceUtil.Scope; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import com.google.firestore.v1.BeginTransactionRequest; @@ -64,6 +67,7 @@ public interface AsyncFunction { private final TransactionOptions transactionOptions; private ByteString transactionId; + @Nonnull private final Context transactionTraceContext; Transaction( FirestoreImpl firestore, @@ -72,6 +76,12 @@ public interface AsyncFunction { super(firestore); this.transactionOptions = transactionOptions; this.transactionId = previousTransaction != null ? previousTransaction.transactionId : null; + this.transactionTraceContext = firestore.getOptions().getTraceUtil().currentContext(); + } + + @Nonnull + private TraceUtil getTraceUtil() { + return firestore.getOptions().getTraceUtil(); } public boolean hasTransactionId() { @@ -84,49 +94,74 @@ Transaction wrapResult(int writeIndex) { /** Starts a transaction and obtains the transaction id. */ ApiFuture begin() { - BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder(); - beginTransaction.setDatabase(firestore.getDatabaseName()); - - if (TransactionOptionsType.READ_WRITE.equals(transactionOptions.getType()) - && transactionId != null) { - beginTransaction.getOptionsBuilder().getReadWriteBuilder().setRetryTransaction(transactionId); - } else if (TransactionOptionsType.READ_ONLY.equals(transactionOptions.getType())) { - final ReadOnly.Builder readOnlyBuilder = ReadOnly.newBuilder(); - if (transactionOptions.getReadTime() != null) { - readOnlyBuilder.setReadTime(transactionOptions.getReadTime()); + TraceUtil.Span span = + getTraceUtil().startSpan(TraceUtil.SPAN_NAME_TRANSACTION_BEGIN, transactionTraceContext); + try (Scope ignored = span.makeCurrent()) { + BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder(); + beginTransaction.setDatabase(firestore.getDatabaseName()); + + if (TransactionOptionsType.READ_WRITE.equals(transactionOptions.getType()) + && transactionId != null) { + beginTransaction + .getOptionsBuilder() + .getReadWriteBuilder() + .setRetryTransaction(transactionId); + } else if (TransactionOptionsType.READ_ONLY.equals(transactionOptions.getType())) { + final ReadOnly.Builder readOnlyBuilder = ReadOnly.newBuilder(); + if (transactionOptions.getReadTime() != null) { + readOnlyBuilder.setReadTime(transactionOptions.getReadTime()); + } + beginTransaction.getOptionsBuilder().setReadOnly(readOnlyBuilder); } - beginTransaction.getOptionsBuilder().setReadOnly(readOnlyBuilder); - } - ApiFuture transactionBeginFuture = - firestore.sendRequest( - beginTransaction.build(), firestore.getClient().beginTransactionCallable()); - - return ApiFutures.transform( - transactionBeginFuture, - beginTransactionResponse -> { - transactionId = beginTransactionResponse.getTransaction(); - return null; - }, - MoreExecutors.directExecutor()); + ApiFuture transactionBeginFuture = + firestore.sendRequest( + beginTransaction.build(), firestore.getClient().beginTransactionCallable()); + + ApiFuture result = + ApiFutures.transform( + transactionBeginFuture, + beginTransactionResponse -> { + transactionId = beginTransactionResponse.getTransaction(); + return null; + }, + MoreExecutors.directExecutor()); + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } /** Commits a transaction. */ ApiFuture> commit() { - return super.commit(transactionId); + try (Scope ignored = transactionTraceContext.makeCurrent()) { + return super.commit(transactionId); + } } /** Rolls a transaction back and releases all read locks. */ ApiFuture rollback() { - RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder(); - reqBuilder.setTransaction(transactionId); - reqBuilder.setDatabase(firestore.getDatabaseName()); + TraceUtil.Span span = + getTraceUtil().startSpan(TraceUtil.SPAN_NAME_TRANSACTION_ROLLBACK, transactionTraceContext); + try (Scope ignored = span.makeCurrent()) { + RollbackRequest.Builder reqBuilder = RollbackRequest.newBuilder(); + reqBuilder.setTransaction(transactionId); + reqBuilder.setDatabase(firestore.getDatabaseName()); - ApiFuture rollbackFuture = - firestore.sendRequest(reqBuilder.build(), firestore.getClient().rollbackCallable()); + ApiFuture rollbackFuture = + firestore.sendRequest(reqBuilder.build(), firestore.getClient().rollbackCallable()); - return ApiFutures.transform( - rollbackFuture, beginTransactionResponse -> null, MoreExecutors.directExecutor()); + ApiFuture result = + ApiFutures.transform( + rollbackFuture, rollbackResponse -> null, MoreExecutors.directExecutor()); + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } /** @@ -138,10 +173,23 @@ ApiFuture rollback() { @Nonnull public ApiFuture get(@Nonnull DocumentReference documentRef) { Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - return ApiFutures.transform( - firestore.getAll(new DocumentReference[] {documentRef}, /*fieldMask=*/ null, transactionId), - snapshots -> snapshots.isEmpty() ? null : snapshots.get(0), - MoreExecutors.directExecutor()); + + TraceUtil.Span span = + getTraceUtil() + .startSpan(TraceUtil.SPAN_NAME_TRANSACTION_GET_DOCUMENT, transactionTraceContext); + try (Scope ignored = span.makeCurrent()) { + ApiFuture result = + ApiFutures.transform( + firestore.getAll( + new DocumentReference[] {documentRef}, /*fieldMask=*/ null, transactionId), + snapshots -> snapshots.isEmpty() ? null : snapshots.get(0), + MoreExecutors.directExecutor()); + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } /** @@ -155,7 +203,18 @@ public ApiFuture> getAll( @Nonnull DocumentReference... documentReferences) { Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - return firestore.getAll(documentReferences, /*fieldMask=*/ null, transactionId); + TraceUtil.Span span = + getTraceUtil() + .startSpan(TraceUtil.SPAN_NAME_TRANSACTION_GET_DOCUMENTS, transactionTraceContext); + try (Scope ignored = span.makeCurrent()) { + ApiFuture> result = + firestore.getAll(documentReferences, /*fieldMask=*/ null, transactionId); + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } /** @@ -171,7 +230,18 @@ public ApiFuture> getAll( @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask) { Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - return firestore.getAll(documentReferences, fieldMask, transactionId); + TraceUtil.Span span = + getTraceUtil() + .startSpan(TraceUtil.SPAN_NAME_TRANSACTION_GET_DOCUMENTS, transactionTraceContext); + try (Scope ignored = span.makeCurrent()) { + ApiFuture> result = + firestore.getAll(documentReferences, fieldMask, transactionId); + span.endAtFuture(result); + return result; + } catch (Exception error) { + span.end(error); + throw error; + } } /** @@ -184,7 +254,9 @@ public ApiFuture> getAll( public ApiFuture get(@Nonnull Query query) { Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - return query.get(transactionId); + try (Scope ignored = transactionTraceContext.makeCurrent()) { + return query.get(transactionId); + } } /** @@ -197,6 +269,8 @@ public ApiFuture get(@Nonnull Query query) { public ApiFuture get(@Nonnull AggregateQuery query) { Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - return query.get(transactionId); + try (Scope ignored = transactionTraceContext.makeCurrent()) { + return query.get(transactionId); + } } } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java index c4da87b62..a8bbbf05b 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java @@ -24,11 +24,15 @@ import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.TimedAttemptSettings; import com.google.api.gax.rpc.ApiException; +import com.google.cloud.firestore.telemetry.TraceUtil; +import com.google.cloud.firestore.telemetry.TraceUtil.Scope; +import com.google.cloud.firestore.telemetry.TraceUtil.Span; import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Context; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; /** * Implements backoff and retry semantics for Firestore transactions. @@ -51,6 +55,7 @@ class TransactionRunner { private TimedAttemptSettings nextBackoffAttempt; private Transaction transaction; private int attemptsRemaining; + private Span runTransactionSpan; /** * @param firestore The active Firestore instance @@ -79,17 +84,34 @@ class TransactionRunner { this.nextBackoffAttempt = backoffAlgorithm.createFirstAttempt(); } + @Nonnull + private TraceUtil getTraceUtil() { + return firestore.getOptions().getTraceUtil(); + } + ApiFuture run() { - this.transaction = new Transaction(firestore, transactionOptions, this.transaction); + runTransactionSpan = getTraceUtil().startSpan(TraceUtil.SPAN_NAME_TRANSACTION_RUN); + runTransactionSpan.setAttribute("transactionType", transactionOptions.getType().name()); + runTransactionSpan.setAttribute("numAttemptsAllowed", transactionOptions.getNumberOfAttempts()); + runTransactionSpan.setAttribute("attemptsRemaining", attemptsRemaining); + try (Scope ignored = runTransactionSpan.makeCurrent()) { + this.transaction = new Transaction(firestore, transactionOptions, this.transaction); - --attemptsRemaining; + --attemptsRemaining; - return ApiFutures.catchingAsync( - ApiFutures.transformAsync( - maybeRollback(), this::rollbackCallback, MoreExecutors.directExecutor()), - Throwable.class, - this::restartTransactionCallback, - MoreExecutors.directExecutor()); + ApiFuture result = + ApiFutures.catchingAsync( + ApiFutures.transformAsync( + maybeRollback(), this::rollbackCallback, MoreExecutors.directExecutor()), + Throwable.class, + this::restartTransactionCallback, + MoreExecutors.directExecutor()); + runTransactionSpan.endAtFuture(result); + return result; + } catch (Exception error) { + runTransactionSpan.end(error); + throw error; + } } private ApiFuture maybeRollback() { @@ -181,6 +203,9 @@ private ApiFuture restartTransactionCallback(Throwable throwable) { ApiException apiException = (ApiException) throwable; if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) { if (attemptsRemaining > 0) { + getTraceUtil() + .currentSpan() + .addEvent("Initiating transaction retry. Attempts remaining: " + attemptsRemaining); return run(); } else { final FirestoreException firestoreException = @@ -228,8 +253,13 @@ private ApiFuture rollbackAndReject(final Throwable throwable) { transaction .rollback() .addListener( - () -> failedTransaction.setException(throwable), MoreExecutors.directExecutor()); + () -> { + runTransactionSpan.end(throwable); + failedTransaction.setException(throwable); + }, + MoreExecutors.directExecutor()); } else { + runTransactionSpan.end(throwable); failedTransaction.setException(throwable); } diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITTracingTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITTracingTest.java index f2bff02b6..1107e97f1 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITTracingTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITTracingTest.java @@ -25,17 +25,21 @@ import com.google.cloud.firestore.BulkWriter; import com.google.cloud.firestore.BulkWriterOptions; import com.google.cloud.firestore.CollectionGroup; +import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.FieldMask; import com.google.cloud.firestore.FieldPath; import com.google.cloud.firestore.Firestore; import com.google.cloud.firestore.FirestoreOpenTelemetryOptions; import com.google.cloud.firestore.FirestoreOptions; import com.google.cloud.firestore.Precondition; +import com.google.cloud.firestore.Query; import com.google.cloud.firestore.SetOptions; +import com.google.cloud.firestore.WriteBatch; import com.google.common.base.Preconditions; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; @@ -76,6 +80,10 @@ public class ITTracingTest { private static final String LIST_DOCUMENTS_RPC_NAME = "ListDocuments"; private static final String LIST_COLLECTIONS_RPC_NAME = "ListCollectionIds"; private static final String BATCH_WRITE_RPC_NAME = "BatchWrite"; + private static final String RUN_QUERY_RPC_NAME = "RunQuery"; + private static final String RUN_AGGREGATION_QUERY_RPC_NAME = "RunAggregationQuery"; + private static final String BEGIN_TRANSACTION_RPC_NAME = "BeginTransaction"; + private static final String ROLLBACK_RPC_NAME = "Rollback"; // We use an InMemorySpanExporter for testing which keeps all generated trace spans // in memory so that we can check their correctness. @@ -261,6 +269,29 @@ void assertHasExpectedAttributes(SpanData spanData, String... additionalExpected } } + // Returns true if an only if the given span data contains an event with the given name and the + // given expected + // attributes. + boolean hasEvent(SpanData spanData, String eventName, @Nullable Attributes expectedAttributes) { + if (spanData == null) { + return false; + } + + List events = spanData.getEvents(); + for (EventData event : events) { + if (event.getName().equals(eventName)) { + if (expectedAttributes == null) { + return true; + } + + // Make sure attributes also match. + Attributes eventAttributes = event.getAttributes(); + return expectedAttributes.equals(eventAttributes); + } + } + return false; + } + // This is a POJO used for testing APIs that take a POJO. static class Pojo { public int bar; @@ -518,4 +549,195 @@ public void docListCollections() throws Exception { assertSpanHierarchy( SPAN_NAME_DOC_REF_LIST_COLLECTIONS, grpcSpanName(LIST_COLLECTIONS_RPC_NAME)); } + + @Test + public void getAll() throws Exception { + DocumentReference docRef0 = firestore.collection("col").document(); + DocumentReference docRef1 = firestore.collection("col").document(); + DocumentReference[] docs = {docRef0, docRef1}; + firestore.getAll(docs).get(); + List spans = prepareSpans(); + assertEquals(1, spans.size()); + SpanData span = getSpanByName(grpcSpanName(BATCH_GET_DOCUMENTS_RPC_NAME)); + assertTrue(hasEvent(span, "BatchGetDocuments: First response received", null)); + assertTrue( + hasEvent( + span, + "BatchGetDocuments: Completed with 2 responses.", + Attributes.builder().put("numResponses", 2).build())); + } + + @Test + public void queryGet() throws Exception { + firestore.collection("col").whereEqualTo("foo", "my_non_existent_value").get().get(); + List spans = prepareSpans(); + assertEquals(2, spans.size()); + assertSpanHierarchy(SPAN_NAME_QUERY_GET, grpcSpanName(RUN_QUERY_RPC_NAME)); + SpanData span = getSpanByName(SPAN_NAME_QUERY_GET); + assertTrue( + hasEvent( + span, + "RunQuery", + Attributes.builder() + .put("isRetryRequestWithCursor", false) + .put("transactional", false) + .build())); + assertTrue( + hasEvent(span, "RunQuery: Completed", Attributes.builder().put("numDocuments", 0).build())); + } + + @Test + public void transaction() throws Exception { + firestore + .runTransaction( + transaction -> { + Query q = firestore.collection("col").whereGreaterThan("bla", ""); + DocumentReference d = firestore.collection("col").document("foo"); + DocumentReference[] docList = {d, d}; + // Document Query. + transaction.get(q).get(); + + // Aggregation Query. + transaction.get(q.count()); + + // Get multiple documents. + transaction.getAll(d, d).get(); + + // Commit 2 documents. + transaction.set( + firestore.collection("foo").document("bar"), + Collections.singletonMap("foo", "bar")); + transaction.set( + firestore.collection("foo").document("bar2"), + Collections.singletonMap("foo2", "bar2")); + return 0; + }) + .get(); + + /* + Transaction.Run + |_ Transaction.Begin + |_ Transaction.Get.Query + |_ Transaction.Get.AggregateQuery + |_ Transaction.Get.Documents + |_ Transaction.Get.Documents + |_ Transaction.Get.Commit + */ + List spans = prepareSpans(); + assertEquals(11, spans.size()); + assertSpanHierarchy( + SPAN_NAME_TRANSACTION_RUN, + SPAN_NAME_TRANSACTION_BEGIN, + grpcSpanName(BEGIN_TRANSACTION_RPC_NAME)); + assertSpanHierarchy( + SPAN_NAME_TRANSACTION_RUN, + SPAN_NAME_TRANSACTION_GET_QUERY, + grpcSpanName(RUN_QUERY_RPC_NAME)); + assertSpanHierarchy( + SPAN_NAME_TRANSACTION_RUN, + SPAN_NAME_TRANSACTION_GET_AGGREGATION_QUERY, + grpcSpanName(RUN_AGGREGATION_QUERY_RPC_NAME)); + assertSpanHierarchy( + SPAN_NAME_TRANSACTION_RUN, + SPAN_NAME_TRANSACTION_GET_DOCUMENTS, + grpcSpanName(BATCH_GET_DOCUMENTS_RPC_NAME)); + assertSpanHierarchy( + SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_TRANSACTION_COMMIT, grpcSpanName(COMMIT_RPC_NAME)); + + Attributes commitAttributes = getSpanByName(SPAN_NAME_TRANSACTION_COMMIT).getAttributes(); + assertEquals( + 2L, commitAttributes.get(AttributeKey.longKey("gcp.firestore.numDocuments")).longValue()); + + Attributes runTransactionAttributes = getSpanByName(SPAN_NAME_TRANSACTION_RUN).getAttributes(); + assertEquals( + 5L, + runTransactionAttributes + .get(AttributeKey.longKey("gcp.firestore.numAttemptsAllowed")) + .longValue()); + assertEquals( + 5L, + runTransactionAttributes + .get(AttributeKey.longKey("gcp.firestore.attemptsRemaining")) + .longValue()); + assertEquals( + "READ_WRITE", + runTransactionAttributes.get(AttributeKey.stringKey("gcp.firestore.transactionType"))); + } + + @Test + public void transactionRollback() throws Exception { + String myErrorMessage = "My error message."; + try { + firestore + .runTransaction( + transaction -> { + if (true) { + throw (new Exception(myErrorMessage)); + } + return 0; + }) + .get(); + } catch (Exception e) { + // Catch and move on. + } + + /* + Transaction.Run + |_ Transaction.Begin + |_ Transaction.Rollback + */ + List spans = prepareSpans(); + assertEquals(5, spans.size()); + assertSpanHierarchy( + SPAN_NAME_TRANSACTION_RUN, + SPAN_NAME_TRANSACTION_BEGIN, + grpcSpanName(BEGIN_TRANSACTION_RPC_NAME)); + assertSpanHierarchy( + SPAN_NAME_TRANSACTION_RUN, SPAN_NAME_TRANSACTION_ROLLBACK, grpcSpanName(ROLLBACK_RPC_NAME)); + + SpanData runTransactionSpanData = getSpanByName(SPAN_NAME_TRANSACTION_RUN); + assertEquals(StatusCode.ERROR, runTransactionSpanData.getStatus().getStatusCode()); + assertEquals(1, runTransactionSpanData.getEvents().size()); + assertEquals( + myErrorMessage, + runTransactionSpanData + .getEvents() + .get(0) + .getAttributes() + .get(AttributeKey.stringKey("exception.message"))); + assertEquals( + "java.lang.Exception", + runTransactionSpanData + .getEvents() + .get(0) + .getAttributes() + .get(AttributeKey.stringKey("exception.type"))); + assertTrue( + runTransactionSpanData + .getEvents() + .get(0) + .getAttributes() + .get(AttributeKey.stringKey("exception.stacktrace")) + .startsWith("java.lang.Exception: My error message.")); + } + + @Test + public void writeBatch() throws Exception { + WriteBatch batch = firestore.batch(); + DocumentReference docRef = firestore.collection("foo").document(); + batch.create(docRef, Collections.singletonMap("foo", "bar")); + batch.update(docRef, Collections.singletonMap("foo", "bar")); + batch.delete(docRef); + batch.commit().get(); + + List spans = prepareSpans(); + assertEquals(2, spans.size()); + assertSpanHierarchy(SPAN_NAME_BATCH_COMMIT, grpcSpanName(COMMIT_RPC_NAME)); + assertEquals( + 3L, + getSpanByName(SPAN_NAME_BATCH_COMMIT) + .getAttributes() + .get(AttributeKey.longKey("gcp.firestore.numDocuments")) + .longValue()); + } }