Skip to content

Commit

Permalink
aggregate grpc queued and batch flow controlled latencies
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Apr 18, 2023
1 parent 5a081a5 commit 9313db4
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 20 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigtable-stats/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
<method>void record(java.lang.String, java.lang.String, java.lang.String, java.lang.String)</method>
</difference>
<!-- Internal API is updated -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
<method>void putBatchRequestThrottled(long)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,8 @@ public void putGfeMissingHeaders(long connectivityErrors) {
attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
}

public void putBatchRequestThrottled(long throttledTimeMs) {
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
}

public void putGrpcChannelQueuedLatencies(long blockedTime) {
attemptMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, blockedTime);
public void putClientBlockingLatencies(long clientBlockingLatency) {
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, clientBlockingLatency);
}

private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public void testStreamingOperation() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);
recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency);
recorderWrapper.putClientBlockingLatencies(throttlingLatency);

recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER);
Expand Down Expand Up @@ -291,8 +290,7 @@ public void testUnaryOperations() throws InterruptedException {
recorderWrapper.putGfeLatencies(serverLatency);
recorderWrapper.putGfeMissingHeaders(connectivityErrorCount);
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);
recorderWrapper.putGrpcChannelQueuedLatencies(throttlingLatency);
recorderWrapper.putClientBlockingLatencies(throttlingLatency);

recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class BuiltinMetricsTracer extends BigtableTracer {
private String zone = "global";
private String cluster = "unspecified";

private AtomicLong totalClientBlockingTime = new AtomicLong(0);

@VisibleForTesting
BuiltinMetricsTracer(
OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
Expand Down Expand Up @@ -219,12 +221,12 @@ public void setLocations(String zone, String cluster) {

@Override
public void batchRequestThrottled(long throttledTimeMs) {
recorder.putBatchRequestThrottled(throttledTimeMs);
totalClientBlockingTime.addAndGet(throttledTimeMs);
}

@Override
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
recorder.putGrpcChannelQueuedLatencies(queuedTimeMs);
totalClientBlockingTime.addAndGet(queuedTimeMs);
}

@Override
Expand Down Expand Up @@ -271,6 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}
}

recorder.putClientBlockingLatencies(totalClientBlockingTime.get());

// Patch the status until it's fixed in gax. When an attempt failed,
// it'll throw a ServerStreamingAttemptException. Unwrap the exception
// so it could get processed by extractStatus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ public void testBatchBlockingLatencies() throws InterruptedException {

int expectedNumRequests = 6 / batchElementCount;
ArgumentCaptor<Long> throttledTime = ArgumentCaptor.forClass(Long.class);
verify(statsRecorderWrapper, times(expectedNumRequests))
.putBatchRequestThrottled(throttledTime.capture());
verify(statsRecorderWrapper, timeout(1000).times(expectedNumRequests))
.putClientBlockingLatencies(throttledTime.capture());

// Adding the first 2 elements should not get throttled since the batch is empty
assertThat(throttledTime.getAllValues().get(0)).isEqualTo(0);
Expand All @@ -493,7 +493,7 @@ public void testBatchBlockingLatencies() throws InterruptedException {
}

@Test
public void testBlockedOnChannelServerStreamLatencies() throws InterruptedException {
public void testQueuedOnChannelServerStreamLatencies() throws InterruptedException {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
Expand All @@ -505,14 +505,14 @@ public void testBlockedOnChannelServerStreamLatencies() throws InterruptedExcept

ArgumentCaptor<Long> blockedTime = ArgumentCaptor.forClass(Long.class);

verify(statsRecorderWrapper, times(fakeService.attemptCounter.get()))
.putGrpcChannelQueuedLatencies(blockedTime.capture());
verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get()))
.putClientBlockingLatencies(blockedTime.capture());

assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
}

@Test
public void testBlockedOnChannelUnaryLatencies() throws InterruptedException {
public void testQueuedOnChannelUnaryLatencies() throws InterruptedException {
when(mockFactory.newTracer(any(), any(), any()))
.thenReturn(
new BuiltinMetricsTracer(
Expand All @@ -521,8 +521,8 @@ public void testBlockedOnChannelUnaryLatencies() throws InterruptedException {

ArgumentCaptor<Long> blockedTime = ArgumentCaptor.forClass(Long.class);

verify(statsRecorderWrapper, times(fakeService.attemptCounter.get()))
.putGrpcChannelQueuedLatencies(blockedTime.capture());
verify(statsRecorderWrapper, timeout(1000).times(fakeService.attemptCounter.get()))
.putClientBlockingLatencies(blockedTime.capture());

assertThat(blockedTime.getAllValues().get(1)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
assertThat(blockedTime.getAllValues().get(2)).isAtLeast(CHANNEL_BLOCKING_LATENCY);
Expand Down

0 comments on commit 9313db4

Please sign in to comment.