From 0da660ed32f2f93832835b9ddd14be31dd8b70ed Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Mon, 9 Dec 2024 16:25:46 -0800 Subject: [PATCH] [router] remove store level metrics for non-streaming multiget (#1306) * [router] remove store level metrics for non-streaming multiget The store level metrics in AggRouterHttpRequestStats are registered lazily. Add a flag in AggRouterHttpRequestStats to disable store level metric emission for non-streaming multiget request type. This is cleaner and the less intrusive than adding checks and handling elsewhere because AggRouterHttpRequestStats and the triggers to corresponding record functions are shared across different request types (SINGLE_GET, MULTI_GET, MULTI_GET_STREAMING, COMPUTE, etc...). We will keep the total stats for non-streaming multiget to give us visibility in case any legacy clients start to make non-streaming multiget requests. Fixed a bug where recordNoAvailableReplicaAbortedRetryRequest was calling the wrong method for the store metric recording. --- .../stats/AggRouterHttpRequestStats.java | 87 +++++++++++-------- .../router/AggRouterHttpRequestStatsTest.java | 42 +++++++++ 2 files changed, 91 insertions(+), 38 deletions(-) diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java index 7f59b12b2b3..e36f8683e33 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/stats/AggRouterHttpRequestStats.java @@ -9,11 +9,13 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.tehuti.metrics.MetricsRepository; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public class AggRouterHttpRequestStats extends AbstractVeniceAggStoreStats { private final Map scatterGatherStatsMap = new VeniceConcurrentHashMap<>(); + private final boolean isStoreStatsEnabled; public AggRouterHttpRequestStats( String clusterName, @@ -38,6 +40,9 @@ public AggRouterHttpRequestStats( ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled) { super(cluster, metricsRepository, metadataRepository, isUnregisterMetricForDeletedStoreEnabled); + // Disable store level non-streaming multi get stats reporting because it's no longer used in clients. We still + // report to the total stats for visibility of potential old clients. + isStoreStatsEnabled = !RequestType.MULTI_GET.equals(requestType); /** * Use a setter function to bypass the restriction that the supertype constructor could not * touch member fields of current object. @@ -64,32 +69,38 @@ public ScatterGatherStats getScatterGatherStatsForStore(String storeName) { return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats()); } + private void recordStoreStats(String storeName, Consumer statsConsumer) { + if (isStoreStatsEnabled) { + statsConsumer.accept(getStoreStats(storeName)); + } + } + public void recordRequest(String storeName) { totalStats.recordIncomingRequest(); - getStoreStats(storeName).recordIncomingRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordIncomingRequest); } public void recordHealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) { totalStats.recordHealthyRequest(latency, responseStatus); - getStoreStats(storeName).recordHealthyRequest(latency, responseStatus); + recordStoreStats(storeName, stats -> stats.recordHealthyRequest(latency, responseStatus)); } public void recordUnhealthyRequest(String storeName, HttpResponseStatus responseStatus) { totalStats.recordUnhealthyRequest(responseStatus); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(responseStatus); + recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(responseStatus)); } } public void recordUnavailableReplicaStreamingRequest(String storeName) { totalStats.recordUnavailableReplicaStreamingRequest(); - getStoreStats(storeName).recordUnavailableReplicaStreamingRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableReplicaStreamingRequest); } public void recordUnhealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) { totalStats.recordUnhealthyRequest(latency, responseStatus); if (storeName != null) { - getStoreStats(storeName).recordUnhealthyRequest(latency, responseStatus); + recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(latency, responseStatus)); } } @@ -101,12 +112,12 @@ public void recordUnhealthyRequest(String storeName, double latency, HttpRespons */ public void recordReadQuotaUsage(String storeName, int quotaUsage) { totalStats.recordReadQuotaUsage(quotaUsage); - getStoreStats(storeName).recordReadQuotaUsage(quotaUsage); + recordStoreStats(storeName, stats -> stats.recordReadQuotaUsage(quotaUsage)); } public void recordTardyRequest(String storeName, double latency, HttpResponseStatus responseStatus) { totalStats.recordTardyRequest(latency, responseStatus); - getStoreStats(storeName).recordTardyRequest(latency, responseStatus); + recordStoreStats(storeName, stats -> stats.recordTardyRequest(latency, responseStatus)); } /** @@ -118,81 +129,81 @@ public void recordTardyRequest(String storeName, double latency, HttpResponseSta */ public void recordThrottledRequest(String storeName, HttpResponseStatus httpResponseStatus) { totalStats.recordThrottledRequest(httpResponseStatus); - getStoreStats(storeName).recordThrottledRequest(httpResponseStatus); + recordStoreStats(storeName, stats -> stats.recordThrottledRequest(httpResponseStatus)); } public void recordThrottledRequest(String storeName, double latency, HttpResponseStatus httpResponseStatus) { totalStats.recordThrottledRequest(latency, httpResponseStatus); - getStoreStats(storeName).recordThrottledRequest(latency, httpResponseStatus); + recordStoreStats(storeName, stats -> stats.recordThrottledRequest(latency, httpResponseStatus)); } public void recordBadRequest(String storeName, HttpResponseStatus responseStatus) { totalStats.recordBadRequest(responseStatus); if (storeName != null) { - getStoreStats(storeName).recordBadRequest(responseStatus); + recordStoreStats(storeName, stats -> stats.recordBadRequest(responseStatus)); } } public void recordBadRequestKeyCount(String storeName, int keyCount) { totalStats.recordBadRequestKeyCount(keyCount); if (storeName != null) { - getStoreStats(storeName).recordBadRequestKeyCount(keyCount); + recordStoreStats(storeName, stats -> stats.recordBadRequestKeyCount(keyCount)); } } public void recordRequestThrottledByRouterCapacity(String storeName) { totalStats.recordRequestThrottledByRouterCapacity(); if (storeName != null) { - getStoreStats(storeName).recordRequestThrottledByRouterCapacity(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRequestThrottledByRouterCapacity); } } public void recordErrorRetryCount(String storeName) { totalStats.recordErrorRetryCount(); if (storeName != null) { - getStoreStats(storeName).recordErrorRetryCount(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryCount); } } public void recordFanoutRequestCount(String storeName, int count) { totalStats.recordFanoutRequestCount(count); - getStoreStats(storeName).recordFanoutRequestCount(count); + recordStoreStats(storeName, stats -> stats.recordFanoutRequestCount(count)); } public void recordLatency(String storeName, double latency) { totalStats.recordLatency(latency); if (storeName != null) { - getStoreStats(storeName).recordLatency(latency); + recordStoreStats(storeName, stats -> stats.recordLatency(latency)); } } public void recordResponseWaitingTime(String storeName, double waitingTime) { totalStats.recordResponseWaitingTime(waitingTime); - getStoreStats(storeName).recordResponseWaitingTime(waitingTime); + recordStoreStats(storeName, stats -> stats.recordResponseWaitingTime(waitingTime)); } public void recordRequestSize(String storeName, double keySize) { totalStats.recordRequestSize(keySize); - getStoreStats(storeName).recordRequestSize(keySize); + recordStoreStats(storeName, stats -> stats.recordRequestSize(keySize)); } public void recordCompressedResponseSize(String storeName, double compressedResponseSize) { totalStats.recordCompressedResponseSize(compressedResponseSize); - getStoreStats(storeName).recordCompressedResponseSize(compressedResponseSize); + recordStoreStats(storeName, stats -> stats.recordCompressedResponseSize(compressedResponseSize)); } public void recordResponseSize(String storeName, double valueSize) { totalStats.recordResponseSize(valueSize); - getStoreStats(storeName).recordResponseSize(valueSize); + recordStoreStats(storeName, stats -> stats.recordResponseSize(valueSize)); } public void recordDecompressionTime(String storeName, double decompressionTime) { totalStats.recordDecompressionTime(decompressionTime); - getStoreStats(storeName).recordDecompressionTime(decompressionTime); + recordStoreStats(storeName, stats -> stats.recordDecompressionTime(decompressionTime)); } public void recordQuota(String storeName, double quota) { - getStoreStats(storeName).recordQuota(quota); + recordStoreStats(storeName, stats -> stats.recordQuota(quota)); } public void recordTotalQuota(double totalQuota) { @@ -201,17 +212,17 @@ public void recordTotalQuota(double totalQuota) { public void recordFindUnhealthyHostRequest(String storeName) { totalStats.recordFindUnhealthyHostRequest(); - getStoreStats(storeName).recordFindUnhealthyHostRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordFindUnhealthyHostRequest); } public void recordResponse(String storeName) { totalStats.recordResponse(); - getStoreStats(storeName).recordResponse(); + recordStoreStats(storeName, RouterHttpRequestStats::recordResponse); } public void recordMetaStoreShadowRead(String storeName) { totalStats.recordMetaStoreShadowRead(); - getStoreStats(storeName).recordMetaStoreShadowRead(); + recordStoreStats(storeName, RouterHttpRequestStats::recordMetaStoreShadowRead); } private class AggScatterGatherStats extends ScatterGatherStats { @@ -251,47 +262,47 @@ public long getTotalRetriesError() { public void recordKeyNum(String storeName, int keyNum) { totalStats.recordKeyNum(keyNum); - getStoreStats(storeName).recordKeyNum(keyNum); + recordStoreStats(storeName, stats -> stats.recordKeyNum(keyNum)); } public void recordRequestUsage(String storeName, int usage) { totalStats.recordRequestUsage(usage); - getStoreStats(storeName).recordRequestUsage(usage); + recordStoreStats(storeName, stats -> stats.recordRequestUsage(usage)); } public void recordMultiGetFallback(String storeName, int keyCount) { totalStats.recordMultiGetFallback(keyCount); - getStoreStats(storeName).recordMultiGetFallback(keyCount); + recordStoreStats(storeName, stats -> stats.recordMultiGetFallback(keyCount)); } public void recordRequestParsingLatency(String storeName, double latency) { totalStats.recordRequestParsingLatency(latency); - getStoreStats(storeName).recordRequestParsingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestParsingLatency(latency)); } public void recordRequestRoutingLatency(String storeName, double latency) { totalStats.recordRequestRoutingLatency(latency); - getStoreStats(storeName).recordRequestRoutingLatency(latency); + recordStoreStats(storeName, stats -> stats.recordRequestRoutingLatency(latency)); } public void recordUnavailableRequest(String storeName) { totalStats.recordUnavailableRequest(); - getStoreStats(storeName).recordUnavailableRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableRequest); } public void recordDelayConstraintAbortedRetryRequest(String storeName) { totalStats.recordDelayConstraintAbortedRetryRequest(); - getStoreStats(storeName).recordDelayConstraintAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest); } public void recordSlowRouteAbortedRetryRequest(String storeName) { totalStats.recordSlowRouteAbortedRetryRequest(); - getStoreStats(storeName).recordSlowRouteAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordSlowRouteAbortedRetryRequest); } public void recordRetryRouteLimitAbortedRetryRequest(String storeName) { totalStats.recordRetryRouteLimitAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest); } public void recordKeySize(String storeName, long keySize) { @@ -300,26 +311,26 @@ public void recordKeySize(String storeName, long keySize) { public void recordAllowedRetryRequest(String storeName) { totalStats.recordAllowedRetryRequest(); - getStoreStats(storeName).recordAllowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordAllowedRetryRequest); } public void recordDisallowedRetryRequest(String storeName) { totalStats.recordDisallowedRetryRequest(); - getStoreStats(storeName).recordDisallowedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordDisallowedRetryRequest); } public void recordNoAvailableReplicaAbortedRetryRequest(String storeName) { totalStats.recordNoAvailableReplicaAbortedRetryRequest(); - getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest(); + recordStoreStats(storeName, RouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest); } public void recordErrorRetryAttemptTriggeredByPendingRequestCheck(String storeName) { totalStats.recordErrorRetryAttemptTriggeredByPendingRequestCheck(); - getStoreStats(storeName).recordErrorRetryAttemptTriggeredByPendingRequestCheck(); + recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryAttemptTriggeredByPendingRequestCheck); } public void recordRetryDelay(String storeName, double delay) { totalStats.recordRetryDelay(delay); - getStoreStats(storeName).recordRetryDelay(delay); + recordStoreStats(storeName, stats -> stats.recordRetryDelay(delay)); } } diff --git a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java index 333b37ca33d..0a05181ad78 100644 --- a/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java +++ b/services/venice-router/src/test/java/com/linkedin/venice/router/AggRouterHttpRequestStatsTest.java @@ -7,6 +7,9 @@ import com.linkedin.venice.router.stats.AggRouterHttpRequestStats; import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.tehuti.MockTehutiReporter; +import com.linkedin.venice.utils.Utils; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.tehuti.TehutiException; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeSuite; @@ -87,4 +90,43 @@ public void testProfilingMetrics() { Assert.assertEquals((int) reporter.query(".total--compute_response_size.3thPercentile").value(), 3); Assert.assertEquals((int) reporter.query(".total--compute_response_size.4thPercentile").value(), 4); } + + @Test + public void testDisableMultiGetStoreMetrics() { + String clusterName = "test-cluster"; + AggRouterHttpRequestStats multiGetStats = new AggRouterHttpRequestStats( + clusterName, + metricsRepository, + RequestType.MULTI_GET, + storeMetadataRepository, + true); + AggRouterHttpRequestStats streamingMultiGetStats = new AggRouterHttpRequestStats( + clusterName, + metricsRepository, + RequestType.MULTI_GET_STREAMING, + storeMetadataRepository, + true); + String storeName = Utils.getUniqueString("test-store"); + multiGetStats.recordRequest(storeName); + streamingMultiGetStats.recordRequest(storeName); + multiGetStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK); + streamingMultiGetStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK); + // Total stats should exist for streaming and non-streaming multi-get + Assert.assertEquals((int) reporter.query(".total--multiget_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals((int) reporter.query(".total--multiget_healthy_request_latency.Max").value(), 10); + Assert.assertEquals((int) reporter.query(".total--multiget_streaming_healthy_request_latency.Max").value(), 10); + // Store level stats should only exist for streaming multi-get + Assert.assertEquals((int) reporter.query("." + storeName + "--multiget_streaming_request.Count").value(), 1); + Assert.assertEquals( + (int) reporter.query("." + storeName + "--multiget_streaming_healthy_request_latency.Max").value(), + 10); + TehutiException exception = + Assert.expectThrows(TehutiException.class, () -> reporter.query("." + storeName + "--multiget_request.Count")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + exception = Assert.expectThrows( + TehutiException.class, + () -> reporter.query("." + storeName + "--multiget_healthy_request_latency.Max")); + Assert.assertTrue(exception.getMessage().contains("does not exist")); + } }