Skip to content

Commit

Permalink
[router] remove store level metrics for non-streaming multiget
Browse files Browse the repository at this point in the history
The store-level metrics in AggRouterHttpRequestStats are registered lazily.
Add a flag in AggRouterHttpRequestStats to disable store-level metric emission
for the non-streaming multiget request type. This is cleaner and less intrusive
than adding checks and handling elsewhere because AggRouterHttpRequestStats and
the triggers to the corresponding record functions are shared across different
request types (SINGLE_GET, MULTI_GET, MULTI_GET_STREAMING, COMPUTE, etc.).

We will keep the total stats for non-streaming multiget to give us visibility
in case any legacy clients start to make non-streaming multiget requests.

Fixed a bug where recordNoAvailableReplicaAbortedRetryRequest was calling the
wrong method for the store metric recording.
  • Loading branch information
xunyin8 committed Dec 7, 2024
1 parent e7c5286 commit 2fa4d9e
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;


public class AggRouterHttpRequestStats extends AbstractVeniceAggStoreStats<RouterHttpRequestStats> {
private final Map<String, ScatterGatherStats> scatterGatherStatsMap = new VeniceConcurrentHashMap<>();
private final boolean isStoreStatsEnabled;

public AggRouterHttpRequestStats(
String clusterName,
Expand All @@ -38,6 +40,9 @@ public AggRouterHttpRequestStats(
ReadOnlyStoreRepository metadataRepository,
boolean isUnregisterMetricForDeletedStoreEnabled) {
super(cluster, metricsRepository, metadataRepository, isUnregisterMetricForDeletedStoreEnabled);
// Disable store level non-streaming multi get stats reporting because it's no longer used in clients. We still
// report to the total stats for visibility of potential old clients.
isStoreStatsEnabled = !RequestType.MULTI_GET.equals(requestType);
/**
* Use a setter function to bypass the restriction that the supertype constructor could not
* touch member fields of current object.
Expand All @@ -64,32 +69,38 @@ public ScatterGatherStats getScatterGatherStatsForStore(String storeName) {
return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats());
}

/**
 * Applies {@code statsConsumer} to the stats object of {@code storeName}, but only when
 * store-level reporting is enabled for this aggregator. When it is disabled the method returns
 * without calling {@code getStoreStats}, so no store-level stats object is looked up.
 *
 * @param storeName name of the store whose stats should be updated
 * @param statsConsumer the recording action to run against the store's stats
 */
private void recordStoreStats(String storeName, Consumer<RouterHttpRequestStats> statsConsumer) {
  if (!isStoreStatsEnabled) {
    return;
  }
  RouterHttpRequestStats storeStats = getStoreStats(storeName);
  statsConsumer.accept(storeStats);
}

/**
 * Records one incoming request in the total stats and, if store-level stats are enabled, in the
 * stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordRequest(String storeName) {
  totalStats.recordIncomingRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordIncomingRequest);
}

/**
 * Records a healthy request in the total stats and, if store-level stats are enabled, in the
 * stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param latency latency value forwarded to the stats
 * @param responseStatus HTTP status forwarded to the stats
 */
public void recordHealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) {
  totalStats.recordHealthyRequest(latency, responseStatus);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordHealthyRequest(latency, responseStatus));
}

/**
 * Records an unhealthy request (without latency) in the total stats and, when a store name is
 * available and store-level stats are enabled, in the stats of that store.
 *
 * @param storeName name of the store the request targets; may be {@code null}, in which case
 *        only the total stats are updated
 * @param responseStatus HTTP status forwarded to the stats
 */
public void recordUnhealthyRequest(String storeName, HttpResponseStatus responseStatus) {
  totalStats.recordUnhealthyRequest(responseStatus);
  if (storeName != null) {
    // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
    recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(responseStatus));
  }
}

/**
 * Records an unavailable-replica streaming request in the total stats and, if store-level stats
 * are enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordUnavailableReplicaStreamingRequest(String storeName) {
  totalStats.recordUnavailableReplicaStreamingRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableReplicaStreamingRequest);
}

/**
 * Records an unhealthy request with its latency in the total stats and, when a store name is
 * available and store-level stats are enabled, in the stats of that store.
 *
 * @param storeName name of the store the request targets; may be {@code null}, in which case
 *        only the total stats are updated
 * @param latency latency value forwarded to the stats
 * @param responseStatus HTTP status forwarded to the stats
 */
public void recordUnhealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) {
  totalStats.recordUnhealthyRequest(latency, responseStatus);
  if (storeName != null) {
    // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
    recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(latency, responseStatus));
  }
}

Expand All @@ -101,12 +112,12 @@ public void recordUnhealthyRequest(String storeName, double latency, HttpRespons
*/
public void recordReadQuotaUsage(String storeName, int quotaUsage) {
  // Always reflected in the total stats.
  totalStats.recordReadQuotaUsage(quotaUsage);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordReadQuotaUsage(quotaUsage));
}

/**
 * Records a tardy request in the total stats and, if store-level stats are enabled, in the stats
 * of the given store.
 *
 * @param storeName name of the store the request targets
 * @param latency latency value forwarded to the stats
 * @param responseStatus HTTP status forwarded to the stats
 */
public void recordTardyRequest(String storeName, double latency, HttpResponseStatus responseStatus) {
  totalStats.recordTardyRequest(latency, responseStatus);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordTardyRequest(latency, responseStatus));
}

/**
Expand All @@ -123,76 +134,76 @@ public void recordThrottledRequest(String storeName, HttpResponseStatus httpResp

/**
 * Records a throttled request with its latency in the total stats and, if store-level stats are
 * enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param latency latency value forwarded to the stats
 * @param httpResponseStatus HTTP status forwarded to the stats
 */
public void recordThrottledRequest(String storeName, double latency, HttpResponseStatus httpResponseStatus) {
  totalStats.recordThrottledRequest(latency, httpResponseStatus);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordThrottledRequest(latency, httpResponseStatus));
}

/**
 * Records a bad request in the total stats and, when a store name is available and store-level
 * stats are enabled, in the stats of that store.
 *
 * @param storeName name of the store the request targets; may be {@code null}, in which case
 *        only the total stats are updated
 * @param responseStatus HTTP status forwarded to the stats
 */
public void recordBadRequest(String storeName, HttpResponseStatus responseStatus) {
  totalStats.recordBadRequest(responseStatus);
  if (storeName != null) {
    // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
    recordStoreStats(storeName, stats -> stats.recordBadRequest(responseStatus));
  }
}

/**
 * Records the key count of a bad request in the total stats and, when a store name is available
 * and store-level stats are enabled, in the stats of that store.
 *
 * @param storeName name of the store the request targets; may be {@code null}, in which case
 *        only the total stats are updated
 * @param keyCount key count forwarded to the stats
 */
public void recordBadRequestKeyCount(String storeName, int keyCount) {
  totalStats.recordBadRequestKeyCount(keyCount);
  if (storeName != null) {
    // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
    recordStoreStats(storeName, stats -> stats.recordBadRequestKeyCount(keyCount));
  }
}

/**
 * Records a request throttled by router capacity in the total stats and, when a store name is
 * available and store-level stats are enabled, in the stats of that store.
 *
 * @param storeName name of the store the request targets; may be {@code null}, in which case
 *        only the total stats are updated
 */
public void recordRequestThrottledByRouterCapacity(String storeName) {
  totalStats.recordRequestThrottledByRouterCapacity();
  if (storeName != null) {
    // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
    recordStoreStats(storeName, RouterHttpRequestStats::recordRequestThrottledByRouterCapacity);
  }
}

/**
 * Records an error retry in the total stats and, when a store name is available and store-level
 * stats are enabled, in the stats of that store.
 *
 * @param storeName name of the store the request targets; may be {@code null}, in which case
 *        only the total stats are updated
 */
public void recordErrorRetryCount(String storeName) {
  totalStats.recordErrorRetryCount();
  if (storeName != null) {
    // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
    recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryCount);
  }
}

/**
 * Records a fanout request count in the total stats and, if store-level stats are enabled, in
 * the stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param count fanout request count forwarded to the stats
 */
public void recordFanoutRequestCount(String storeName, int count) {
  totalStats.recordFanoutRequestCount(count);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordFanoutRequestCount(count));
}

/**
 * Records a latency sample in the total stats and, when a store name is available and
 * store-level stats are enabled, in the stats of that store.
 *
 * @param storeName name of the store the request targets; may be {@code null}, in which case
 *        only the total stats are updated
 * @param latency latency value forwarded to the stats
 */
public void recordLatency(String storeName, double latency) {
  totalStats.recordLatency(latency);
  if (storeName != null) {
    // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
    recordStoreStats(storeName, stats -> stats.recordLatency(latency));
  }
}

/**
 * Records a response waiting time in the total stats and, if store-level stats are enabled, in
 * the stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param waitingTime waiting time value forwarded to the stats
 */
public void recordResponseWaitingTime(String storeName, double waitingTime) {
  totalStats.recordResponseWaitingTime(waitingTime);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordResponseWaitingTime(waitingTime));
}

/**
 * Records a request size in the total stats and, if store-level stats are enabled, in the stats
 * of the given store.
 *
 * @param storeName name of the store the request targets
 * @param keySize size value forwarded to the stats
 */
public void recordRequestSize(String storeName, double keySize) {
  totalStats.recordRequestSize(keySize);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordRequestSize(keySize));
}

/**
 * Records a compressed response size in the total stats and, if store-level stats are enabled,
 * in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param compressedResponseSize size value forwarded to the stats
 */
public void recordCompressedResponseSize(String storeName, double compressedResponseSize) {
  totalStats.recordCompressedResponseSize(compressedResponseSize);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordCompressedResponseSize(compressedResponseSize));
}

/**
 * Records a response size in the total stats and, if store-level stats are enabled, in the stats
 * of the given store.
 *
 * @param storeName name of the store the request targets
 * @param valueSize size value forwarded to the stats
 */
public void recordResponseSize(String storeName, double valueSize) {
  totalStats.recordResponseSize(valueSize);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordResponseSize(valueSize));
}

/**
 * Records a decompression time in the total stats and, if store-level stats are enabled, in the
 * stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param decompressionTime time value forwarded to the stats
 */
public void recordDecompressionTime(String storeName, double decompressionTime) {
  totalStats.recordDecompressionTime(decompressionTime);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordDecompressionTime(decompressionTime));
}

/**
 * Records a quota value in the stats of the given store, if store-level stats are enabled. This
 * method does not touch the total stats.
 *
 * @param storeName name of the store the quota belongs to
 * @param quota quota value forwarded to the store's stats
 */
public void recordQuota(String storeName, double quota) {
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordQuota(quota));
}

public void recordTotalQuota(double totalQuota) {
Expand All @@ -201,17 +212,17 @@ public void recordTotalQuota(double totalQuota) {

/**
 * Records a find-unhealthy-host request in the total stats and, if store-level stats are
 * enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordFindUnhealthyHostRequest(String storeName) {
  totalStats.recordFindUnhealthyHostRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordFindUnhealthyHostRequest);
}

/**
 * Records a response in the total stats and, if store-level stats are enabled, in the stats of
 * the given store.
 *
 * @param storeName name of the store the response belongs to
 */
public void recordResponse(String storeName) {
  totalStats.recordResponse();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordResponse);
}

/**
 * Records a meta store shadow read in the total stats and, if store-level stats are enabled, in
 * the stats of the given store.
 *
 * @param storeName name of the store the read targets
 */
public void recordMetaStoreShadowRead(String storeName) {
  totalStats.recordMetaStoreShadowRead();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordMetaStoreShadowRead);
}

private class AggScatterGatherStats extends ScatterGatherStats {
Expand Down Expand Up @@ -251,47 +262,47 @@ public long getTotalRetriesError() {

/**
 * Records a key count in the total stats and, if store-level stats are enabled, in the stats of
 * the given store.
 *
 * @param storeName name of the store the request targets
 * @param keyNum key count forwarded to the stats
 */
public void recordKeyNum(String storeName, int keyNum) {
  totalStats.recordKeyNum(keyNum);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordKeyNum(keyNum));
}

/**
 * Records a request usage value in the total stats and, if store-level stats are enabled, in the
 * stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param usage usage value forwarded to the stats
 */
public void recordRequestUsage(String storeName, int usage) {
  totalStats.recordRequestUsage(usage);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordRequestUsage(usage));
}

/**
 * Records a multi-get fallback in the total stats and, if store-level stats are enabled, in the
 * stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param keyCount key count forwarded to the stats
 */
public void recordMultiGetFallback(String storeName, int keyCount) {
  totalStats.recordMultiGetFallback(keyCount);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordMultiGetFallback(keyCount));
}

/**
 * Records a request parsing latency in the total stats and, if store-level stats are enabled, in
 * the stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param latency latency value forwarded to the stats
 */
public void recordRequestParsingLatency(String storeName, double latency) {
  totalStats.recordRequestParsingLatency(latency);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordRequestParsingLatency(latency));
}

/**
 * Records a request routing latency in the total stats and, if store-level stats are enabled, in
 * the stats of the given store.
 *
 * @param storeName name of the store the request targets
 * @param latency latency value forwarded to the stats
 */
public void recordRequestRoutingLatency(String storeName, double latency) {
  totalStats.recordRequestRoutingLatency(latency);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordRequestRoutingLatency(latency));
}

/**
 * Records an unavailable request in the total stats and, if store-level stats are enabled, in
 * the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordUnavailableRequest(String storeName) {
  totalStats.recordUnavailableRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableRequest);
}

/**
 * Records a retry request aborted by the delay constraint in the total stats and, if store-level
 * stats are enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordDelayConstraintAbortedRetryRequest(String storeName) {
  totalStats.recordDelayConstraintAbortedRetryRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest);
}

/**
 * Records a retry request aborted because of a slow route in the total stats and, if store-level
 * stats are enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordSlowRouteAbortedRetryRequest(String storeName) {
  totalStats.recordSlowRouteAbortedRetryRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordSlowRouteAbortedRetryRequest);
}

/**
 * Records a retry request aborted by the retry route limit in the total stats and, if
 * store-level stats are enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordRetryRouteLimitAbortedRetryRequest(String storeName) {
  totalStats.recordRetryRouteLimitAbortedRetryRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest);
}

public void recordKeySize(String storeName, long keySize) {
Expand All @@ -300,26 +311,26 @@ public void recordKeySize(String storeName, long keySize) {

/**
 * Records an allowed retry request in the total stats and, if store-level stats are enabled, in
 * the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordAllowedRetryRequest(String storeName) {
  totalStats.recordAllowedRetryRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordAllowedRetryRequest);
}

/**
 * Records a disallowed retry request in the total stats and, if store-level stats are enabled,
 * in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordDisallowedRetryRequest(String storeName) {
  totalStats.recordDisallowedRetryRequest();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordDisallowedRetryRequest);
}

/**
 * Records a retry request aborted because no replica was available in the total stats and, if
 * store-level stats are enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordNoAvailableReplicaAbortedRetryRequest(String storeName) {
  totalStats.recordNoAvailableReplicaAbortedRetryRequest();
  // The store-level metric must be the same one as the total metric above; it must not be
  // recordRetryRouteLimitAbortedRetryRequest. Recording goes through recordStoreStats so it
  // honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest);
}

/**
 * Records an error retry attempt triggered by the pending request check in the total stats and,
 * if store-level stats are enabled, in the stats of the given store.
 *
 * @param storeName name of the store the request targets
 */
public void recordErrorRetryAttemptTriggeredByPendingRequestCheck(String storeName) {
  totalStats.recordErrorRetryAttemptTriggeredByPendingRequestCheck();
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryAttemptTriggeredByPendingRequestCheck);
}

/**
 * Records a retry delay in the total stats and, if store-level stats are enabled, in the stats
 * of the given store.
 *
 * @param storeName name of the store the request targets
 * @param delay delay value forwarded to the stats
 */
public void recordRetryDelay(String storeName, double delay) {
  totalStats.recordRetryDelay(delay);
  // Store-level recording goes through recordStoreStats so it honors isStoreStatsEnabled.
  recordStoreStats(storeName, stats -> stats.recordRetryDelay(delay));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.tehuti.MockTehutiReporter;
import com.linkedin.venice.utils.Utils;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.tehuti.TehutiException;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeSuite;
Expand Down Expand Up @@ -87,4 +90,39 @@ public void testProfilingMetrics() {
Assert.assertEquals((int) reporter.query(".total--compute_response_size.3thPercentile").value(), 3);
Assert.assertEquals((int) reporter.query(".total--compute_response_size.4thPercentile").value(), 4);
}

/**
 * Verifies that an aggregator created for {@code RequestType.MULTI_GET} still reports total
 * metrics but registers no store-level metrics, while one created for
 * {@code RequestType.MULTI_GET_STREAMING} reports both.
 */
@Test
public void testDisableMultiGetStoreMetrics() {
  String clusterName = "test-cluster";
  AggRouterHttpRequestStats nonStreamingStats =
      new AggRouterHttpRequestStats(clusterName, metricsRepository, RequestType.MULTI_GET, storeMetadataRepository, true);
  AggRouterHttpRequestStats streamingStats = new AggRouterHttpRequestStats(
      clusterName,
      metricsRepository,
      RequestType.MULTI_GET_STREAMING,
      storeMetadataRepository,
      true);
  String storeName = Utils.getUniqueString("test-store");
  // Prefix shared by every store-level metric name queried below.
  String storeMetricPrefix = "." + storeName;

  // Record one request and one healthy response (latency 10) on each aggregator.
  nonStreamingStats.recordRequest(storeName);
  streamingStats.recordRequest(storeName);
  nonStreamingStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK);
  streamingStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK);

  // Total stats should exist for streaming and non-streaming multi-get
  int totalMultiGetCount = (int) reporter.query(".total--multiget_request.Count").value();
  Assert.assertEquals(totalMultiGetCount, 1);
  int totalStreamingCount = (int) reporter.query(".total--multiget_streaming_request.Count").value();
  Assert.assertEquals(totalStreamingCount, 1);
  int totalMultiGetMaxLatency = (int) reporter.query(".total--multiget_healthy_request_latency.Max").value();
  Assert.assertEquals(totalMultiGetMaxLatency, 10);
  int totalStreamingMaxLatency =
      (int) reporter.query(".total--multiget_streaming_healthy_request_latency.Max").value();
  Assert.assertEquals(totalStreamingMaxLatency, 10);

  // Store level stats should only exist for streaming multi-get
  int storeStreamingCount =
      (int) reporter.query(storeMetricPrefix + "--multiget_streaming_request.Count").value();
  Assert.assertEquals(storeStreamingCount, 1);
  int storeStreamingMaxLatency =
      (int) reporter.query(storeMetricPrefix + "--multiget_streaming_healthy_request_latency.Max").value();
  Assert.assertEquals(storeStreamingMaxLatency, 10);

  // Querying a store-level non-streaming multi-get metric must fail because it was never registered.
  TehutiException missingCount = Assert
      .expectThrows(TehutiException.class, () -> reporter.query(storeMetricPrefix + "--multiget_request.Count"));
  Assert.assertTrue(missingCount.getMessage().contains("does not exist"));
  TehutiException missingLatency = Assert.expectThrows(
      TehutiException.class,
      () -> reporter.query(storeMetricPrefix + "--multiget_healthy_request_latency.Max"));
  Assert.assertTrue(missingLatency.getMessage().contains("does not exist"));
}
}

0 comments on commit 2fa4d9e

Please sign in to comment.