Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: trace instrumentation for queries and transactions. #1592

Merged
merged 12 commits into from
Mar 27, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.firestore.v1.BatchGetDocumentsRequest;
Expand All @@ -38,6 +39,7 @@
import com.google.protobuf.ByteString;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -84,6 +86,12 @@ class FirestoreImpl implements Firestore, FirestoreRpcContext<FirestoreImpl> {
ResourcePath.create(DatabaseRootName.of(options.getProjectId(), options.getDatabaseId()));
}

/** Gets the TraceUtil object associated with this Firestore instance. */
@Nonnull
private TraceUtil getTraceUtil() {
return getOptions().getTraceUtil();
}

/** Lazy-load the Firestore's default BulkWriter. */
private BulkWriter getBulkWriter() {
if (bulkWriterInstance == null) {
Expand Down Expand Up @@ -212,21 +220,43 @@ void getAll(
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId,
final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {
// To reduce the size of traces, we only register one event for every 100 responses
// that we receive from the server.
final int NUM_RESPONSES_PER_TRACE_EVENT = 100;

ResponseObserver<BatchGetDocumentsResponse> responseObserver =
new ResponseObserver<BatchGetDocumentsResponse>() {
int numResponses = 0;
boolean hasCompleted = false;

@Override
public void onStart(StreamController streamController) {}
public void onStart(StreamController streamController) {
getTraceUtil()
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + ": Start",
Collections.singletonMap("numDocuments", documentReferences.length));
}

@Override
public void onResponse(BatchGetDocumentsResponse response) {
DocumentReference documentReference;
DocumentSnapshot documentSnapshot;

numResponses++;
if (numResponses == 1) {
getTraceUtil()
.currentSpan()
.addEvent(TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS + ": First response received");
} else if (numResponses % NUM_RESPONSES_PER_TRACE_EVENT == 0) {
getTraceUtil()
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS
+ ": Received "
+ numResponses
+ " responses");
}

switch (response.getResultCase()) {
case FOUND:
Expand Down Expand Up @@ -261,13 +291,22 @@ public void onResponse(BatchGetDocumentsResponse response) {

@Override
public void onError(Throwable throwable) {
getTraceUtil().currentSpan().end(throwable);
apiStreamObserver.onError(throwable);
}

@Override
public void onComplete() {
if (hasCompleted) return;
hasCompleted = true;
getTraceUtil()
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_BATCH_GET_DOCUMENTS
+ ": Completed with "
+ numResponses
+ " responses.",
Collections.singletonMap("numResponses", numResponses));
apiStreamObserver.onCompleted();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.Query.QueryOptions.Builder;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.bundle.BundledQuery;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.Document;
Expand Down Expand Up @@ -1491,7 +1494,8 @@ public void onCompleted() {
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
/* transactionId= */ null,
/* readTime= */ null);
/* readTime= */ null,
/* isRetryRequestWithCursor= */ false);
}

/**
Expand Down Expand Up @@ -1627,7 +1631,13 @@ private void internalStream(
final QuerySnapshotObserver documentObserver,
final long startTimeNanos,
@Nullable final ByteString transactionId,
@Nullable final Timestamp readTime) {
@Nullable final Timestamp readTime,
final boolean isRetryRequestWithCursor) {
TraceUtil traceUtil = getFirestore().getOptions().getTraceUtil();
// To reduce the size of traces, we only register one event for every 100 responses
// that we receive from the server.
final int NUM_RESPONSES_PER_TRACE_EVENT = 100;

RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
request.setStructuredQuery(buildQuery()).setParent(options.getParentPath().toString());

Expand All @@ -1638,11 +1648,22 @@ private void internalStream(
request.setReadTime(readTime.toProto());
}

traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY,
new ImmutableMap.Builder<String, Object>()
.put("isTransactional", transactionId != null)
.put("isRetryRequestWithCursor", isRetryRequestWithCursor)
.build());

final AtomicReference<QueryDocumentSnapshot> lastReceivedDocument = new AtomicReference<>();

ResponseObserver<RunQueryResponse> observer =
new ResponseObserver<RunQueryResponse>() {
Timestamp readTime;
boolean firstResponse = false;
int numDocuments = 0;

// The stream's `onComplete()` could be called more than once,
// this flag makes sure only the first one is actually processed.
Expand All @@ -1653,7 +1674,21 @@ public void onStart(StreamController streamController) {}

@Override
public void onResponse(RunQueryResponse response) {
if (!firstResponse) {
firstResponse = true;
traceUtil.currentSpan().addEvent(TraceUtil.SPAN_NAME_RUN_QUERY + ": First Response");
}
if (response.hasDocument()) {
numDocuments++;
if (numDocuments % NUM_RESPONSES_PER_TRACE_EVENT == 0) {
traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY
+ ": Received "
+ numDocuments
+ " documents");
}
Document document = response.getDocument();
QueryDocumentSnapshot documentSnapshot =
QueryDocumentSnapshot.fromDocument(
Expand All @@ -1667,6 +1702,9 @@ public void onResponse(RunQueryResponse response) {
}

if (response.getDone()) {
traceUtil
.currentSpan()
.addEvent(TraceUtil.SPAN_NAME_RUN_QUERY + ": Received RunQueryResponse.Done");
onComplete();
}
}
Expand All @@ -1675,15 +1713,27 @@ public void onResponse(RunQueryResponse response) {
public void onError(Throwable throwable) {
QueryDocumentSnapshot cursor = lastReceivedDocument.get();
if (shouldRetry(cursor, throwable)) {
traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY + ": Retryable Error",
Collections.singletonMap("error.message", throwable.getMessage()));

Query.this
.startAfter(cursor)
.internalStream(
documentObserver,
startTimeNanos,
/* transactionId= */ null,
options.getRequireConsistency() ? cursor.getReadTime() : null);
options.getRequireConsistency() ? cursor.getReadTime() : null,
/* isRetryRequestWithCursor= */ true);

} else {
traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY + ": Error",
Collections.singletonMap("error.message", throwable.getMessage()));
documentObserver.onError(throwable);
}
}
Expand All @@ -1692,6 +1742,11 @@ public void onError(Throwable throwable) {
public void onComplete() {
if (hasCompleted) return;
hasCompleted = true;
traceUtil
.currentSpan()
.addEvent(
TraceUtil.SPAN_NAME_RUN_QUERY + ": Completed",
Collections.singletonMap("numDocuments", numDocuments));
documentObserver.onCompleted(readTime);
}

Expand Down Expand Up @@ -1746,40 +1801,54 @@ public ListenerRegistration addSnapshotListener(
}

ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId) {
final SettableApiFuture<QuerySnapshot> result = SettableApiFuture.create();

internalStream(
new QuerySnapshotObserver() {
final List<QueryDocumentSnapshot> documentSnapshots = new ArrayList<>();

@Override
public void onNext(QueryDocumentSnapshot documentSnapshot) {
documentSnapshots.add(documentSnapshot);
}

@Override
public void onError(Throwable throwable) {
result.setException(throwable);
}
TraceUtil.Span span =
getFirestore()
.getOptions()
.getTraceUtil()
.startSpan(
transactionId == null
? TraceUtil.SPAN_NAME_QUERY_GET
: TraceUtil.SPAN_NAME_TRANSACTION_GET_QUERY);
try (Scope ignored = span.makeCurrent()) {
final SettableApiFuture<QuerySnapshot> result = SettableApiFuture.create();
internalStream(
new QuerySnapshotObserver() {
final List<QueryDocumentSnapshot> documentSnapshots = new ArrayList<>();

@Override
public void onNext(QueryDocumentSnapshot documentSnapshot) {
documentSnapshots.add(documentSnapshot);
}

@Override
public void onCompleted() {
// The results for limitToLast queries need to be flipped since we reversed the
// ordering constraints before sending the query to the backend.
List<QueryDocumentSnapshot> resultView =
LimitType.Last.equals(Query.this.options.getLimitType())
? reverse(documentSnapshots)
: documentSnapshots;
QuerySnapshot querySnapshot =
QuerySnapshot.withDocuments(Query.this, this.getReadTime(), resultView);
result.set(querySnapshot);
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null);
@Override
public void onError(Throwable throwable) {
result.setException(throwable);
}

return result;
@Override
public void onCompleted() {
// The results for limitToLast queries need to be flipped since we reversed the
// ordering constraints before sending the query to the backend.
List<QueryDocumentSnapshot> resultView =
LimitType.Last.equals(Query.this.options.getLimitType())
? reverse(documentSnapshots)
: documentSnapshots;
QuerySnapshot querySnapshot =
QuerySnapshot.withDocuments(Query.this, this.getReadTime(), resultView);
result.set(querySnapshot);
}
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null,
/* isRetryRequestWithCursor= */ false);

span.endAtFuture(result);
return result;
} catch (Exception error) {
span.end(error);
throw error;
}
}

Comparator<QueryDocumentSnapshot> comparator() {
Expand Down
Loading
Loading