diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java index 7f59b12b2b..191113a1dc 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java @@ -9,11 +9,13 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.tehuti.metrics.MetricsRepository; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public class AggRouterHttpRequestStats extends AbstractVeniceAggStoreStats { private final Map scatterGatherStatsMap = new VeniceConcurrentHashMap<>(); + private final boolean isStoreStatsEnabled; public AggRouterHttpRequestStats( String clusterName, @@ -38,6 +40,9 @@ public AggRouterHttpRequestStats( ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled) { super(cluster, metricsRepository, metadataRepository, isUnregisterMetricForDeletedStoreEnabled); + // Disable store level non-streaming multi get stats reporting because it's no longer used in clients. We still + // report to the total stats for visibility of potential old clients. + isStoreStatsEnabled = !RequestType.MULTI_GET.equals(requestType); /** * Use a setter function to bypass the restriction that the supertype constructor could not * touch member fields of current object. @@ -64,32 +69,38 @@ public ScatterGatherStats getScatterGatherStatsForStore(String storeName) { return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); } + private void recordStoreStats(String storeName, Consumer statsConsumer) { + if (isStoreStatsEnabled) { + statsConsumer.accept(getStoreStats(storeName)); + } + } + public void recordRequest(String storeName) { totalStats.recordIncomingRequest(); - getStoreStats(storeName).recordIncomingRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordIncomingRequest); } public void recordHealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) { totalStats.recordHealthyRequest(latency, responseStatus); - getStoreStats(storeName).recordHealthyRequest(latency, responseStatus); + recordStoreStats(storeName, stats -> stats.recordHealthyRequest(latency, responseStatus)); } public void recordUnhealthyRequest(String storeName, HttpResponseStatus responseStatus) { totalStats.recordUnhealthyRequest(responseStatus); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(responseStatus); + recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(responseStatus)); } } public void recordUnavailableReplicaStreamingRequest(String storeName) { totalStats.recordUnavailableReplicaStreamingRequest(); - getStoreStats(storeName).recordUnavailableReplicaStreamingRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableReplicaStreamingRequest); } public void recordUnhealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) { totalStats.recordUnhealthyRequest(latency, responseStatus); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(latency, responseStatus); + recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(latency, responseStatus)); } } @@ -101,12 +112,12 @@ public void recordUnhealthyRequest(String storeName, double latency, HttpRespons */ public void recordReadQuotaUsage(String storeName, int quotaUsage) { totalStats.recordReadQuotaUsage(quotaUsage); - getStoreStats(storeName).recordReadQuotaUsage(quotaUsage); + recordStoreStats(storeName, stats -> stats.recordReadQuotaUsage(quotaUsage)); } public void recordTardyRequest(String storeName, double latency, HttpResponseStatus responseStatus) { totalStats.recordTardyRequest(latency, responseStatus); - getStoreStats(storeName).recordTardyRequest(latency, responseStatus); + recordStoreStats(storeName, stats -> stats.recordTardyRequest(latency, responseStatus)); } /** @@ -123,76 +134,76 @@ public void recordThrottledRequest(String storeName, HttpResponseStatus httpResp public void recordThrottledRequest(String storeName, double latency, HttpResponseStatus httpResponseStatus) { totalStats.recordThrottledRequest(latency, httpResponseStatus); - getStoreStats(storeName).recordThrottledRequest(latency, httpResponseStatus); + recordStoreStats(storeName, stats -> stats.recordThrottledRequest(latency, httpResponseStatus)); } public void recordBadRequest(String storeName, HttpResponseStatus responseStatus) { totalStats.recordBadRequest(responseStatus); if (storeName != null) { - getStoreStats(storeName).recordBadRequest(responseStatus); + recordStoreStats(storeName, stats -> stats.recordBadRequest(responseStatus)); } } public void recordBadRequestKeyCount(String storeName, int keyCount) { totalStats.recordBadRequestKeyCount(keyCount); if (storeName != null) { - getStoreStats(storeName).recordBadRequestKeyCount(keyCount); + recordStoreStats(storeName, stats -> stats.recordBadRequestKeyCount(keyCount)); } } public void recordRequestThrottledByRouterCapacity(String storeName) { totalStats.recordRequestThrottledByRouterCapacity(); if (storeName != null) { - getStoreStats(storeName).recordRequestThrottledByRouterCapacity(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRequestThrottledByRouterCapacity); } } public void recordErrorRetryCount(String storeName) { totalStats.recordErrorRetryCount(); if (storeName != null) { - getStoreStats(storeName).recordErrorRetryCount(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryCount); } } public void recordFanoutRequestCount(String storeName, int count) { totalStats.recordFanoutRequestCount(count); - getStoreStats(storeName).recordFanoutRequestCount(count); + recordStoreStats(storeName, stats -> stats.recordFanoutRequestCount(count)); } public void recordLatency(String storeName, double latency) { totalStats.recordLatency(latency); if (storeName != null) { - getStoreStats(storeName).recordLatency(latency); + recordStoreStats(storeName, stats -> stats.recordLatency(latency)); } } public void recordResponseWaitingTime(String storeName, double waitingTime) { totalStats.recordResponseWaitingTime(waitingTime); - getStoreStats(storeName).recordResponseWaitingTime(waitingTime); + recordStoreStats(storeName, stats -> stats.recordResponseWaitingTime(waitingTime)); } public void recordRequestSize(String storeName, double keySize) { totalStats.recordRequestSize(keySize); - getStoreStats(storeName).recordRequestSize(keySize); + recordStoreStats(storeName, stats -> stats.recordRequestSize(keySize)); } public void recordCompressedResponseSize(String storeName, double compressedResponseSize) { totalStats.recordCompressedResponseSize(compressedResponseSize); - getStoreStats(storeName).recordCompressedResponseSize(compressedResponseSize); + recordStoreStats(storeName, stats -> stats.recordCompressedResponseSize(compressedResponseSize)); } public void recordResponseSize(String storeName, double valueSize) { totalStats.recordResponseSize(valueSize); - getStoreStats(storeName).recordResponseSize(valueSize); + recordStoreStats(storeName, stats -> stats.recordResponseSize(valueSize)); } public void recordDecompressionTime(String storeName, double decompressionTime) { totalStats.recordDecompressionTime(decompressionTime); - getStoreStats(storeName).recordDecompressionTime(decompressionTime); + recordStoreStats(storeName, stats -> stats.recordDecompressionTime(decompressionTime)); } public void recordQuota(String storeName, double quota) { - getStoreStats(storeName).recordQuota(quota); + recordStoreStats(storeName, stats -> stats.recordQuota(quota)); } public void recordTotalQuota(double totalQuota) { @@ -201,17 +212,17 @@ public void recordTotalQuota(double totalQuota) { public void recordFindUnhealthyHostRequest(String storeName) { totalStats.recordFindUnhealthyHostRequest(); - getStoreStats(storeName).recordFindUnhealthyHostRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordFindUnhealthyHostRequest); } public void recordResponse(String storeName) { totalStats.recordResponse(); - getStoreStats(storeName).recordResponse(); + recordStoreStats(storeName, RouterHttpRequestStats::recordResponse); } public void recordMetaStoreShadowRead(String storeName) { totalStats.recordMetaStoreShadowRead(); - getStoreStats(storeName).recordMetaStoreShadowRead(); + recordStoreStats(storeName, RouterHttpRequestStats::recordMetaStoreShadowRead); } private class AggScatterGatherStats extends ScatterGatherStats { @@ -251,47 +262,47 @@ public long getTotalRetriesError() { public void recordKeyNum(String storeName, int keyNum) { totalStats.recordKeyNum(keyNum); - getStoreStats(storeName).recordKeyNum(keyNum); + recordStoreStats(storeName, stats -> stats.recordKeyNum(keyNum)); } public void recordRequestUsage(String storeName, int usage) { totalStats.recordRequestUsage(usage); - getStoreStats(storeName).recordRequestUsage(usage); + recordStoreStats(storeName, stats -> stats.recordRequestUsage(usage)); } public void recordMultiGetFallback(String storeName, int keyCount) { totalStats.recordMultiGetFallback(keyCount); - getStoreStats(storeName).recordMultiGetFallback(keyCount); + recordStoreStats(storeName, stats -> stats.recordMultiGetFallback(keyCount)); } public void recordRequestParsingLatency(String storeName, double latency) { totalStats.recordRequestParsingLatency(latency); - getStoreStats(storeName).recordRequestParsingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestParsingLatency(latency)); } public void recordRequestRoutingLatency(String storeName, double latency) { totalStats.recordRequestRoutingLatency(latency); - getStoreStats(storeName).recordRequestRoutingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestRoutingLatency(latency)); } public void recordUnavailableRequest(String storeName) { totalStats.recordUnavailableRequest(); - getStoreStats(storeName).recordUnavailableRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableRequest); } public void recordDelayConstraintAbortedRetryRequest(String storeName) { totalStats.recordDelayConstraintAbortedRetryRequest(); - getStoreStats(storeName).recordDelayConstraintAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest); } public void recordSlowRouteAbortedRetryRequest(String storeName) { totalStats.recordSlowRouteAbortedRetryRequest(); - getStoreStats(storeName).recordSlowRouteAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordSlowRouteAbortedRetryRequest); } public void recordRetryRouteLimitAbortedRetryRequest(String storeName) { totalStats.recordRetryRouteLimitAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest); } public void recordKeySize(String storeName, long keySize) { @@ -300,26 +311,26 @@ public void recordKeySize(String storeName, long keySize) { public void recordAllowedRetryRequest(String storeName) { totalStats.recordAllowedRetryRequest(); - getStoreStats(storeName).recordAllowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordAllowedRetryRequest); } public void recordDisallowedRetryRequest(String storeName) { totalStats.recordDisallowedRetryRequest(); - getStoreStats(storeName).recordDisallowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDisallowedRetryRequest); } public void recordNoAvailableReplicaAbortedRetryRequest(String storeName) { totalStats.recordNoAvailableReplicaAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest); } public void recordErrorRetryAttemptTriggeredByPendingRequestCheck(String storeName) { totalStats.recordErrorRetryAttemptTriggeredByPendingRequestCheck(); - getStoreStats(storeName).recordErrorRetryAttemptTriggeredByPendingRequestCheck(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryAttemptTriggeredByPendingRequestCheck); } public void recordRetryDelay(String storeName, double delay) { totalStats.recordRetryDelay(delay); - getStoreStats(storeName).recordRetryDelay(delay); + recordStoreStats(storeName, stats -> stats.recordRetryDelay(delay)); } } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java index 333b37ca33..dc385e6c92 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java @@ -7,6 +7,9 @@ import com.linkedin.venice.router.stats.AggRouterHttpRequestStats; import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.tehuti.TehutiException; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeSuite; @@ -87,4 +90,39 @@ public void testProfilingMetrics() { Assert.assertEquals((int) reporter.query(".total--compute_response_size.3thPercentile").value(), 3); Assert.assertEquals((int) reporter.query(".total--compute_response_size.4thPercentile").value(), 4); } + + @Test + public void testDisableMultiGetStoreMetrics() { + String clusterName = "test-cluster"; + AggRouterHttpRequestStats multiGetStats = + new AggRouterHttpRequestStats(clusterName, metricsRepository, RequestType.MULTI_GET, storeMetadataRepository, true); + AggRouterHttpRequestStats streamingMultiGetStats = new AggRouterHttpRequestStats( + clusterName, + metricsRepository, + RequestType.MULTI_GET_STREAMING, + storeMetadataRepository, + true); + String storeName = Utils.getUniqueString("test-store"); + multiGetStats.recordRequest(storeName); + streamingMultiGetStats.recordRequest(storeName); + multiGetStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK); + streamingMultiGetStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK); + // Total stats should exist for streaming and non-streaming multi-get + Assert.assertEquals((int) reporter.query(".total--multiget_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_healthy_request_latency.Max").value(), 10); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_healthy_request_latency.Max").value(), 10); + // Store level stats should only exist for streaming multi-get + Assert.assertEquals((int) reporter.query("." + storeName + "--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals( + (int) reporter.query("." + storeName + "--multiget_streaming_healthy_request_latency.Max").value(), + 10); + TehutiException exception = + Assert.expectThrows(TehutiException.class, () -> reporter.query("." + storeName + "--multiget_request.Count")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + exception = Assert.expectThrows( + TehutiException.class, + () -> reporter.query("." + storeName + "--multiget_healthy_request_latency.Max")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + } }