Skip to content

Commit

Permalink
feat: trace instrumentation for queries and transactions. (#1592)
Browse files Browse the repository at this point in the history
* feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.

* feat: Add com.google.cloud.firestore.telemetry package.

* fix: Remove OpenCensus tracing code.

* feat: tracing for aggregate queries, bulkwriter, partition queries, and listDocuments.

* feat: trace instrumentation for DocumentReference methods.

* feat: trace instrumentation for queries and transactions.

* test: Adding first e2e client-tracing test w/ Custom Root Span (#1621)

* test: Adding first e2e client-tracing test w/ Custom Root Span

* Roll back E2E tests commit.

* Address feedback.

* Address feedback (better event log message).

* Address feedback.

---------

Co-authored-by: Jimit J Shah <57637300+jimit-j-shah@users.noreply.github.com>
  • Loading branch information
ehsannas and jimit-j-shah authored Mar 27, 2024
1 parent cf03252 commit 7b8c405
Show file tree
Hide file tree
Showing 5 changed files with 518 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.firestore.v1.BatchGetDocumentsRequest;
Expand All @@ -38,6 +39,7 @@
import com.google.protobuf.ByteString;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -84,6 +86,12 @@ class FirestoreImpl implements Firestore, FirestoreRpcContext<FirestoreImpl> {
ResourcePath.create(DatabaseRootName.of(options.getProjectId(), options.getDatabaseId()));
}

/**
 * Returns the {@link TraceUtil} of this Firestore instance, as obtained from the instance's
 * options.
 */
@Nonnull
private TraceUtil getTraceUtil() {
  final TraceUtil traceUtil = getOptions().getTraceUtil();
  return traceUtil;
}

/** Lazy-load the Firestore's default BulkWriter. */
private BulkWriter getBulkWriter() {
if (bulkWriterInstance == null) {
Expand Down Expand Up @@ -212,21 +220,43 @@ void getAll(
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId,
final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {
// To reduce the size of traces, we only register one event for every 100 responses
// that we receive from the server.
final int NUM_RESPONSES_PER_TRACE_EVENT = 100;

ResponseObserver<BatchGetDocumentsResponse> responseObserver =
new ResponseObserver<BatchGetDocumentsResponse>() {
int numResponses = 0;
boolean hasCompleted = false;

@Override
public void onStart(StreamController streamController) {
  // Record a "Start" event on the current span, tagged with the length of the
  // documentReferences array this stream was opened for. The stream controller itself is
  // not used.
  getTraceUtil()
      .currentSpan()
      .addEvent(
          TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + ": Start",
          Collections.singletonMap("numDocuments", documentReferences.length));
}

@Override
public void onResponse(BatchGetDocumentsResponse response) {
DocumentReference documentReference;
DocumentSnapshot documentSnapshot;

numResponses++;
if (numResponses == 1) {
getTraceUtil()
.currentSpan()
.addEvent(TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + ": First response received");
} else if (numResponses % NUM_RESPONSES_PER_TRACE_EVENT == 0) {
getTraceUtil()
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS
+ ": Received "
+ numResponses
+ " responses");
}

switch (response.getResultCase()) {
case FOUND:
Expand Down Expand Up @@ -261,13 +291,22 @@ public void onResponse(BatchGetDocumentsResponse response) {

// Stream failure callback: ends the current span with the error first, then forwards the
// same error to the caller-supplied apiStreamObserver.
// NOTE(review): this ends whatever span is current when the callback runs; confirm that it is
// the span the caller of getAll() expects to be closed here.
@Override
public void onError(Throwable throwable) {
getTraceUtil().currentSpan().end(throwable);
apiStreamObserver.onError(throwable);
}

@Override
public void onComplete() {
  // Only the first completion signal is processed; any later call is a no-op.
  if (!hasCompleted) {
    hasCompleted = true;

    // Summarize the stream on the current span: the response count appears both in the
    // event name and as the "numResponses" attribute.
    final String completionEvent =
        TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS
            + ": Completed with "
            + numResponses
            + " responses.";
    getTraceUtil()
        .currentSpan()
        .addEvent(completionEvent, Collections.singletonMap("numResponses", numResponses));

    apiStreamObserver.onCompleted();
  }
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.Query.QueryOptions.Builder;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.bundle.BundledQuery;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.Document;
Expand Down Expand Up @@ -1491,7 +1494,8 @@ public void onCompleted() {
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
/* transactionId= */ null,
/* readTime= */ null);
/* readTime= */ null,
/* isRetryRequestWithCursor= */ false);
}

/**
Expand Down Expand Up @@ -1627,7 +1631,13 @@ private void internalStream(
final QuerySnapshotObserver documentObserver,
final long startTimeNanos,
@Nullable final ByteString transactionId,
@Nullable final Timestamp readTime) {
@Nullable final Timestamp readTime,
final boolean isRetryRequestWithCursor) {
TraceUtil traceUtil = getFirestore().getOptions().getTraceUtil();
// To reduce the size of traces, we only register one event for every 100 responses
// that we receive from the server.
final int NUM_RESPONSES_PER_TRACE_EVENT = 100;

RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
request.setStructuredQuery(buildQuery()).setParent(options.getParentPath().toString());

Expand All @@ -1638,11 +1648,22 @@ private void internalStream(
request.setReadTime(readTime.toProto());
}

traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY,
new ImmutableMap.Builder<String, Object>()
.put("isTransactional", transactionId != null)
.put("isRetryRequestWithCursor", isRetryRequestWithCursor)
.build());

final AtomicReference<QueryDocumentSnapshot> lastReceivedDocument = new AtomicReference<>();

ResponseObserver<RunQueryResponse> observer =
new ResponseObserver<RunQueryResponse>() {
Timestamp readTime;
boolean firstResponse = false;
int numDocuments = 0;

// The stream's `onComplete()` could be called more than once,
// this flag makes sure only the first one is actually processed.
Expand All @@ -1653,7 +1674,21 @@ public void onStart(StreamController streamController) {}

@Override
public void onResponse(RunQueryResponse response) {
if (!firstResponse) {
firstResponse = true;
traceUtil.currentSpan().addEvent(TraceUtil.SPAN_NAME_RUN_QUERY + ": First Response");
}
if (response.hasDocument()) {
numDocuments++;
if (numDocuments % NUM_RESPONSES_PER_TRACE_EVENT == 0) {
traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY
+ ": Received "
+ numDocuments
+ " documents");
}
Document document = response.getDocument();
QueryDocumentSnapshot documentSnapshot =
QueryDocumentSnapshot.fromDocument(
Expand All @@ -1667,6 +1702,9 @@ public void onResponse(RunQueryResponse response) {
}

if (response.getDone()) {
traceUtil
.currentSpan()
.addEvent(TraceUtil.SPAN_NAME_RUN_QUERY + ": Received RunQueryResponse.Done");
onComplete();
}
}
Expand All @@ -1675,15 +1713,27 @@ public void onResponse(RunQueryResponse response) {
public void onError(Throwable throwable) {
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
if (shouldRetry(cursor, throwable)) {
traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY + ": Retryable Error",
Collections.singletonMap("error.message", throwable.getMessage()));

Query.this
.startAfter(cursor)
.internalStream(
documentObserver,
startTimeNanos,
/* transactionId= */ null,
options.getRequireConsistency() ? cursor.getReadTime() : null);
options.getRequireConsistency() ? cursor.getReadTime() : null,
/* isRetryRequestWithCursor= */ true);

} else {
traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY + ": Error",
Collections.singletonMap("error.message", throwable.getMessage()));
documentObserver.onError(throwable);
}
}
Expand All @@ -1692,6 +1742,11 @@ public void onError(Throwable throwable) {
public void onComplete() {
  // Guard against repeated completion signals: only the first one is acted upon.
  if (!hasCompleted) {
    hasCompleted = true;

    // Log completion on the current span together with the number of documents received.
    final String completedEvent = TraceUtil.SPAN_NAME_RUN_QUERY + ": Completed";
    traceUtil
        .currentSpan()
        .addEvent(completedEvent, Collections.singletonMap("numDocuments", numDocuments));

    documentObserver.onCompleted(readTime);
  }
}

Expand Down Expand Up @@ -1746,40 +1801,54 @@ public ListenerRegistration addSnapshotListener(
}

ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId) {
final SettableApiFuture<QuerySnapshot> result = SettableApiFuture.create();

internalStream(
new QuerySnapshotObserver() {
final List<QueryDocumentSnapshot> documentSnapshots = new ArrayList<>();

@Override
public void onNext(QueryDocumentSnapshot documentSnapshot) {
documentSnapshots.add(documentSnapshot);
}

@Override
public void onError(Throwable throwable) {
result.setException(throwable);
}
TraceUtil.Span span =
getFirestore()
.getOptions()
.getTraceUtil()
.startSpan(
transactionId == null
? TraceUtil.SPAN_NAME_QUERY_GET
: TraceUtil.SPAN_NAME_TRANSACTION_GET_QUERY);
try (Scope ignored = span.makeCurrent()) {
final SettableApiFuture<QuerySnapshot> result = SettableApiFuture.create();
internalStream(
new QuerySnapshotObserver() {
final List<QueryDocumentSnapshot> documentSnapshots = new ArrayList<>();

@Override
public void onNext(QueryDocumentSnapshot documentSnapshot) {
documentSnapshots.add(documentSnapshot);
}

@Override
public void onCompleted() {
// The results for limitToLast queries need to be flipped since we reversed the
// ordering constraints before sending the query to the backend.
List<QueryDocumentSnapshot> resultView =
LimitType.Last.equals(Query.this.options.getLimitType())
? reverse(documentSnapshots)
: documentSnapshots;
QuerySnapshot querySnapshot =
QuerySnapshot.withDocuments(Query.this, this.getReadTime(), resultView);
result.set(querySnapshot);
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null);
@Override
public void onError(Throwable throwable) {
result.setException(throwable);
}

return result;
@Override
public void onCompleted() {
// The results for limitToLast queries need to be flipped since we reversed the
// ordering constraints before sending the query to the backend.
List<QueryDocumentSnapshot> resultView =
LimitType.Last.equals(Query.this.options.getLimitType())
? reverse(documentSnapshots)
: documentSnapshots;
QuerySnapshot querySnapshot =
QuerySnapshot.withDocuments(Query.this, this.getReadTime(), resultView);
result.set(querySnapshot);
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null,
/* isRetryRequestWithCursor= */ false);

span.endAtFuture(result);
return result;
} catch (Exception error) {
span.end(error);
throw error;
}
}

Comparator<QueryDocumentSnapshot> comparator() {
Expand Down
Loading

0 comments on commit 7b8c405

Please sign in to comment.