Skip to content

Commit

Permalink
fix: reset a measure map every time the stats are recorded (googleapis…
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf authored Aug 26, 2022
1 parent faa5b36 commit 1683365
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 24 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigtable-stats/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,10 @@
<method>*StatsRecorderWrapper*</method>
<to>*StatsRecorder*</to>
</difference>
<!-- Internal API is updated -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
<method>void record(java.lang.String, java.lang.String, java.lang.String, java.lang.String)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class StatsRecorderWrapper {
private final SpanName spanName;
private final Map<String, String> statsAttributes;

private MeasureMap measureMap;
private MeasureMap attemptMeasureMap;
private MeasureMap operationMeasureMap;

public StatsRecorderWrapper(
OperationType operationType,
Expand All @@ -54,10 +55,11 @@ public StatsRecorderWrapper(
this.parentContext = tagger.getCurrentTagContext();
this.statsAttributes = statsAttributes;

this.measureMap = statsRecorder.newMeasureMap();
this.attemptMeasureMap = statsRecorder.newMeasureMap();
this.operationMeasureMap = statsRecorder.newMeasureMap();
}

public void record(String status, String tableId, String zone, String cluster) {
public void recordOperation(String status, String tableId, String zone, String cluster) {
TagContextBuilder tagCtx =
newTagContextBuilder(tableId, zone, cluster)
.putLocal(BuiltinMeasureConstants.STATUS, TagValue.create(status));
Expand All @@ -66,39 +68,55 @@ public void record(String status, String tableId, String zone, String cluster) {
tagCtx.putLocal(
BuiltinMeasureConstants.STREAMING, TagValue.create(Boolean.toString(isStreaming)));

measureMap.record(tagCtx.build());
operationMeasureMap.record(tagCtx.build());
// Reinitialize a new map
operationMeasureMap = statsRecorder.newMeasureMap();
}

/**
 * Records the values currently held in {@code attemptMeasureMap} and then replaces that map with
 * a new one obtained from {@code statsRecorder}.
 *
 * <p>The values are recorded against a tag context built by {@code newTagContextBuilder} from
 * the table, zone and cluster, with the status and a streaming flag added as local tags.
 *
 * @param status value stored under the {@code STATUS} tag
 * @param tableId table identifier passed to {@code newTagContextBuilder}
 * @param zone zone passed to {@code newTagContextBuilder}
 * @param cluster cluster passed to {@code newTagContextBuilder}
 */
public void recordAttempt(String status, String tableId, String zone, String cluster) {
TagContextBuilder tagCtx =
newTagContextBuilder(tableId, zone, cluster)
.putLocal(BuiltinMeasureConstants.STATUS, TagValue.create(status));

// The STREAMING tag is "true" only for server-streaming operations, "false" otherwise.
boolean isStreaming = operationType == OperationType.ServerStreaming;
tagCtx.putLocal(
BuiltinMeasureConstants.STREAMING, TagValue.create(Boolean.toString(isStreaming)));

attemptMeasureMap.record(tagCtx.build());
// Swap in a new, empty map after every record() call.
// NOTE(review): presumably this keeps values from one attempt from being recorded again with the
// next attempt -- confirm against MeasureMap semantics.
attemptMeasureMap = statsRecorder.newMeasureMap();
}

public void putOperationLatencies(long operationLatency) {
measureMap.put(BuiltinMeasureConstants.OPERATION_LATENCIES, operationLatency);
operationMeasureMap.put(BuiltinMeasureConstants.OPERATION_LATENCIES, operationLatency);
}

public void putAttemptLatencies(long attemptLatency) {
measureMap.put(BuiltinMeasureConstants.ATTEMPT_LATENCIES, attemptLatency);
attemptMeasureMap.put(BuiltinMeasureConstants.ATTEMPT_LATENCIES, attemptLatency);
}

public void putRetryCount(int attemptCount) {
measureMap.put(BuiltinMeasureConstants.RETRY_COUNT, attemptCount);
operationMeasureMap.put(BuiltinMeasureConstants.RETRY_COUNT, attemptCount);
}

public void putApplicationLatencies(long applicationLatency) {
measureMap.put(BuiltinMeasureConstants.APPLICATION_LATENCIES, applicationLatency);
operationMeasureMap.put(BuiltinMeasureConstants.APPLICATION_LATENCIES, applicationLatency);
}

public void putFirstResponseLatencies(long firstResponseLatency) {
measureMap.put(BuiltinMeasureConstants.FIRST_RESPONSE_LATENCIES, firstResponseLatency);
operationMeasureMap.put(BuiltinMeasureConstants.FIRST_RESPONSE_LATENCIES, firstResponseLatency);
}

public void putGfeLatencies(long serverLatency) {
measureMap.put(BuiltinMeasureConstants.SERVER_LATENCIES, serverLatency);
attemptMeasureMap.put(BuiltinMeasureConstants.SERVER_LATENCIES, serverLatency);
}

public void putGfeMissingHeaders(long connectivityErrors) {
measureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
}

public void putBatchRequestThrottled(long throttledTimeMs) {
measureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
}

private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public void testStreamingOperation() throws InterruptedException {
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);

recorderWrapper.record("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER);

Thread.sleep(100);

Expand Down Expand Up @@ -291,7 +292,8 @@ public void testUnaryOperations() throws InterruptedException {
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);

recorderWrapper.record("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);

Thread.sleep(100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void recordOperationCompletion(@Nullable Throwable status) {
recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
}

recorder.record(Util.extractStatus(status), tableId, zone, cluster);
recorder.recordOperation(Util.extractStatus(status), tableId, zone, cluster);
}

private void recordAttemptCompletion(@Nullable Throwable status) {
Expand All @@ -257,6 +257,6 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}
}
recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
recorder.record(Util.extractStatus(status), tableId, zone, cluster);
recorder.recordAttempt(Util.extractStatus(status), tableId, zone, cluster);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,16 @@ public void testMutateRowAttempts() {
stub.mutateRowCallable()
.call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));

// record() will get called 4 times, 3 times for attempts and 1 for recording operation level
// metrics. Also set a timeout to reduce flakiness of this test. BasicRetryingFuture will set
// Set a timeout to reduce flakiness of this test. BasicRetryingFuture will set
// attempt succeeded and set the response which will call complete() in AbstractFuture which
// calls releaseWaiters(). onOperationComplete() is called in TracerFinisher which will be
// called after the mutateRow call is returned. So there's a race between when the call returns
// and when recordOperation() is called on operation completion.
verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get() + 1))
.record(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
assertThat(zone.getAllValues()).containsExactly("global", "global", ZONE, ZONE);
assertThat(cluster.getAllValues())
.containsExactly("unspecified", "unspecified", CLUSTER, CLUSTER);
assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "UNAVAILABLE", "OK", "OK");
verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get()))
.recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
assertThat(zone.getAllValues()).containsExactly("global", "global", ZONE);
assertThat(cluster.getAllValues()).containsExactly("unspecified", "unspecified", CLUSTER);
assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "UNAVAILABLE", "OK");
}

private static class FakeService extends BigtableGrpc.BigtableImplBase {
Expand Down

0 comments on commit 1683365

Please sign in to comment.