Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[router] remove store level metrics for non-streaming multiget #1306

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;


public class AggRouterHttpRequestStats extends AbstractVeniceAggStoreStats<RouterHttpRequestStats> {
private final Map<String, ScatterGatherStats> scatterGatherStatsMap = new VeniceConcurrentHashMap<>();
private final boolean isStoreStatsEnabled;

public AggRouterHttpRequestStats(
String clusterName,
Expand All @@ -38,6 +40,9 @@ public AggRouterHttpRequestStats(
ReadOnlyStoreRepository metadataRepository,
boolean isUnregisterMetricForDeletedStoreEnabled) {
super(cluster, metricsRepository, metadataRepository, isUnregisterMetricForDeletedStoreEnabled);
// Disable store level non-streaming multi get stats reporting because it's no longer used in clients. We still
// report to the total stats for visibility of potential old clients.
isStoreStatsEnabled = !RequestType.MULTI_GET.equals(requestType);
xunyin8 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Use a setter function to bypass the restriction that the supertype constructor could not
* touch member fields of current object.
Expand All @@ -64,32 +69,38 @@ public ScatterGatherStats getScatterGatherStatsForStore(String storeName) {
return scatterGatherStatsMap.computeIfAbsent(storeName, k -> new ScatterGatherStats());
}

private void recordStoreStats(String storeName, Consumer<RouterHttpRequestStats> statsConsumer) {
if (isStoreStatsEnabled) {
statsConsumer.accept(getStoreStats(storeName));
}
}

public void recordRequest(String storeName) {
totalStats.recordIncomingRequest();
getStoreStats(storeName).recordIncomingRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordIncomingRequest);
}

public void recordHealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) {
totalStats.recordHealthyRequest(latency, responseStatus);
getStoreStats(storeName).recordHealthyRequest(latency, responseStatus);
recordStoreStats(storeName, stats -> stats.recordHealthyRequest(latency, responseStatus));
}

public void recordUnhealthyRequest(String storeName, HttpResponseStatus responseStatus) {
totalStats.recordUnhealthyRequest(responseStatus);
if (storeName != null) {
getStoreStats(storeName).recordUnhealthyRequest(responseStatus);
recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(responseStatus));
}
}

public void recordUnavailableReplicaStreamingRequest(String storeName) {
totalStats.recordUnavailableReplicaStreamingRequest();
getStoreStats(storeName).recordUnavailableReplicaStreamingRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableReplicaStreamingRequest);
}

public void recordUnhealthyRequest(String storeName, double latency, HttpResponseStatus responseStatus) {
totalStats.recordUnhealthyRequest(latency, responseStatus);
if (storeName != null) {
getStoreStats(storeName).recordUnhealthyRequest(latency, responseStatus);
recordStoreStats(storeName, stats -> stats.recordUnhealthyRequest(latency, responseStatus));
}
}

Expand All @@ -101,12 +112,12 @@ public void recordUnhealthyRequest(String storeName, double latency, HttpRespons
*/
public void recordReadQuotaUsage(String storeName, int quotaUsage) {
totalStats.recordReadQuotaUsage(quotaUsage);
getStoreStats(storeName).recordReadQuotaUsage(quotaUsage);
recordStoreStats(storeName, stats -> stats.recordReadQuotaUsage(quotaUsage));
}

public void recordTardyRequest(String storeName, double latency, HttpResponseStatus responseStatus) {
totalStats.recordTardyRequest(latency, responseStatus);
getStoreStats(storeName).recordTardyRequest(latency, responseStatus);
recordStoreStats(storeName, stats -> stats.recordTardyRequest(latency, responseStatus));
}

/**
Expand All @@ -118,81 +129,81 @@ public void recordTardyRequest(String storeName, double latency, HttpResponseSta
*/
public void recordThrottledRequest(String storeName, HttpResponseStatus httpResponseStatus) {
totalStats.recordThrottledRequest(httpResponseStatus);
getStoreStats(storeName).recordThrottledRequest(httpResponseStatus);
recordStoreStats(storeName, stats -> stats.recordThrottledRequest(httpResponseStatus));
}

public void recordThrottledRequest(String storeName, double latency, HttpResponseStatus httpResponseStatus) {
totalStats.recordThrottledRequest(latency, httpResponseStatus);
getStoreStats(storeName).recordThrottledRequest(latency, httpResponseStatus);
recordStoreStats(storeName, stats -> stats.recordThrottledRequest(latency, httpResponseStatus));
}

public void recordBadRequest(String storeName, HttpResponseStatus responseStatus) {
totalStats.recordBadRequest(responseStatus);
if (storeName != null) {
getStoreStats(storeName).recordBadRequest(responseStatus);
recordStoreStats(storeName, stats -> stats.recordBadRequest(responseStatus));
}
}

public void recordBadRequestKeyCount(String storeName, int keyCount) {
totalStats.recordBadRequestKeyCount(keyCount);
if (storeName != null) {
getStoreStats(storeName).recordBadRequestKeyCount(keyCount);
recordStoreStats(storeName, stats -> stats.recordBadRequestKeyCount(keyCount));
}
}

public void recordRequestThrottledByRouterCapacity(String storeName) {
totalStats.recordRequestThrottledByRouterCapacity();
if (storeName != null) {
getStoreStats(storeName).recordRequestThrottledByRouterCapacity();
recordStoreStats(storeName, RouterHttpRequestStats::recordRequestThrottledByRouterCapacity);
}
}

public void recordErrorRetryCount(String storeName) {
totalStats.recordErrorRetryCount();
if (storeName != null) {
getStoreStats(storeName).recordErrorRetryCount();
recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryCount);
}
}

public void recordFanoutRequestCount(String storeName, int count) {
totalStats.recordFanoutRequestCount(count);
getStoreStats(storeName).recordFanoutRequestCount(count);
recordStoreStats(storeName, stats -> stats.recordFanoutRequestCount(count));
}

public void recordLatency(String storeName, double latency) {
totalStats.recordLatency(latency);
if (storeName != null) {
getStoreStats(storeName).recordLatency(latency);
recordStoreStats(storeName, stats -> stats.recordLatency(latency));
}
}

public void recordResponseWaitingTime(String storeName, double waitingTime) {
totalStats.recordResponseWaitingTime(waitingTime);
getStoreStats(storeName).recordResponseWaitingTime(waitingTime);
recordStoreStats(storeName, stats -> stats.recordResponseWaitingTime(waitingTime));
}

public void recordRequestSize(String storeName, double keySize) {
totalStats.recordRequestSize(keySize);
getStoreStats(storeName).recordRequestSize(keySize);
recordStoreStats(storeName, stats -> stats.recordRequestSize(keySize));
}

public void recordCompressedResponseSize(String storeName, double compressedResponseSize) {
totalStats.recordCompressedResponseSize(compressedResponseSize);
getStoreStats(storeName).recordCompressedResponseSize(compressedResponseSize);
recordStoreStats(storeName, stats -> stats.recordCompressedResponseSize(compressedResponseSize));
}

public void recordResponseSize(String storeName, double valueSize) {
totalStats.recordResponseSize(valueSize);
getStoreStats(storeName).recordResponseSize(valueSize);
recordStoreStats(storeName, stats -> stats.recordResponseSize(valueSize));
}

public void recordDecompressionTime(String storeName, double decompressionTime) {
totalStats.recordDecompressionTime(decompressionTime);
getStoreStats(storeName).recordDecompressionTime(decompressionTime);
recordStoreStats(storeName, stats -> stats.recordDecompressionTime(decompressionTime));
}

public void recordQuota(String storeName, double quota) {
getStoreStats(storeName).recordQuota(quota);
recordStoreStats(storeName, stats -> stats.recordQuota(quota));
}

public void recordTotalQuota(double totalQuota) {
Expand All @@ -201,17 +212,17 @@ public void recordTotalQuota(double totalQuota) {

public void recordFindUnhealthyHostRequest(String storeName) {
totalStats.recordFindUnhealthyHostRequest();
getStoreStats(storeName).recordFindUnhealthyHostRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordFindUnhealthyHostRequest);
}

public void recordResponse(String storeName) {
totalStats.recordResponse();
getStoreStats(storeName).recordResponse();
recordStoreStats(storeName, RouterHttpRequestStats::recordResponse);
}

public void recordMetaStoreShadowRead(String storeName) {
totalStats.recordMetaStoreShadowRead();
getStoreStats(storeName).recordMetaStoreShadowRead();
recordStoreStats(storeName, RouterHttpRequestStats::recordMetaStoreShadowRead);
}

private class AggScatterGatherStats extends ScatterGatherStats {
Expand Down Expand Up @@ -251,47 +262,47 @@ public long getTotalRetriesError() {

public void recordKeyNum(String storeName, int keyNum) {
totalStats.recordKeyNum(keyNum);
getStoreStats(storeName).recordKeyNum(keyNum);
recordStoreStats(storeName, stats -> stats.recordKeyNum(keyNum));
}

public void recordRequestUsage(String storeName, int usage) {
totalStats.recordRequestUsage(usage);
getStoreStats(storeName).recordRequestUsage(usage);
recordStoreStats(storeName, stats -> stats.recordRequestUsage(usage));
}

public void recordMultiGetFallback(String storeName, int keyCount) {
totalStats.recordMultiGetFallback(keyCount);
getStoreStats(storeName).recordMultiGetFallback(keyCount);
recordStoreStats(storeName, stats -> stats.recordMultiGetFallback(keyCount));
}

public void recordRequestParsingLatency(String storeName, double latency) {
totalStats.recordRequestParsingLatency(latency);
getStoreStats(storeName).recordRequestParsingLatency(latency);
recordStoreStats(storeName, stats -> stats.recordRequestParsingLatency(latency));
}

public void recordRequestRoutingLatency(String storeName, double latency) {
totalStats.recordRequestRoutingLatency(latency);
getStoreStats(storeName).recordRequestRoutingLatency(latency);
recordStoreStats(storeName, stats -> stats.recordRequestRoutingLatency(latency));
}

public void recordUnavailableRequest(String storeName) {
totalStats.recordUnavailableRequest();
getStoreStats(storeName).recordUnavailableRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordUnavailableRequest);
}

public void recordDelayConstraintAbortedRetryRequest(String storeName) {
totalStats.recordDelayConstraintAbortedRetryRequest();
getStoreStats(storeName).recordDelayConstraintAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordDelayConstraintAbortedRetryRequest);
}

public void recordSlowRouteAbortedRetryRequest(String storeName) {
totalStats.recordSlowRouteAbortedRetryRequest();
getStoreStats(storeName).recordSlowRouteAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordSlowRouteAbortedRetryRequest);
}

public void recordRetryRouteLimitAbortedRetryRequest(String storeName) {
totalStats.recordRetryRouteLimitAbortedRetryRequest();
getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordRetryRouteLimitAbortedRetryRequest);
}

public void recordKeySize(String storeName, long keySize) {
Expand All @@ -300,26 +311,26 @@ public void recordKeySize(String storeName, long keySize) {

public void recordAllowedRetryRequest(String storeName) {
totalStats.recordAllowedRetryRequest();
getStoreStats(storeName).recordAllowedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordAllowedRetryRequest);
}

public void recordDisallowedRetryRequest(String storeName) {
totalStats.recordDisallowedRetryRequest();
getStoreStats(storeName).recordDisallowedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordDisallowedRetryRequest);
}

public void recordNoAvailableReplicaAbortedRetryRequest(String storeName) {
totalStats.recordNoAvailableReplicaAbortedRetryRequest();
getStoreStats(storeName).recordRetryRouteLimitAbortedRetryRequest();
recordStoreStats(storeName, RouterHttpRequestStats::recordNoAvailableReplicaAbortedRetryRequest);
}

public void recordErrorRetryAttemptTriggeredByPendingRequestCheck(String storeName) {
totalStats.recordErrorRetryAttemptTriggeredByPendingRequestCheck();
getStoreStats(storeName).recordErrorRetryAttemptTriggeredByPendingRequestCheck();
recordStoreStats(storeName, RouterHttpRequestStats::recordErrorRetryAttemptTriggeredByPendingRequestCheck);
}

public void recordRetryDelay(String storeName, double delay) {
totalStats.recordRetryDelay(delay);
getStoreStats(storeName).recordRetryDelay(delay);
recordStoreStats(storeName, stats -> stats.recordRetryDelay(delay));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.tehuti.MockTehutiReporter;
import com.linkedin.venice.utils.Utils;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.tehuti.TehutiException;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeSuite;
Expand Down Expand Up @@ -87,4 +90,43 @@ public void testProfilingMetrics() {
Assert.assertEquals((int) reporter.query(".total--compute_response_size.3thPercentile").value(), 3);
Assert.assertEquals((int) reporter.query(".total--compute_response_size.4thPercentile").value(), 4);
}

@Test
public void testDisableMultiGetStoreMetrics() {
String clusterName = "test-cluster";
AggRouterHttpRequestStats multiGetStats = new AggRouterHttpRequestStats(
clusterName,
metricsRepository,
RequestType.MULTI_GET,
storeMetadataRepository,
true);
AggRouterHttpRequestStats streamingMultiGetStats = new AggRouterHttpRequestStats(
clusterName,
metricsRepository,
RequestType.MULTI_GET_STREAMING,
storeMetadataRepository,
true);
String storeName = Utils.getUniqueString("test-store");
multiGetStats.recordRequest(storeName);
streamingMultiGetStats.recordRequest(storeName);
multiGetStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK);
streamingMultiGetStats.recordHealthyRequest(storeName, 10, HttpResponseStatus.OK);
// Total stats should exist for streaming and non-streaming multi-get
Assert.assertEquals((int) reporter.query(".total--multiget_request.Count").value(), 1);
Assert.assertEquals((int) reporter.query(".total--multiget_streaming_request.Count").value(), 1);
Assert.assertEquals((int) reporter.query(".total--multiget_healthy_request_latency.Max").value(), 10);
Assert.assertEquals((int) reporter.query(".total--multiget_streaming_healthy_request_latency.Max").value(), 10);
// Store level stats should only exist for streaming multi-get
Assert.assertEquals((int) reporter.query("." + storeName + "--multiget_streaming_request.Count").value(), 1);
Assert.assertEquals(
(int) reporter.query("." + storeName + "--multiget_streaming_healthy_request_latency.Max").value(),
10);
TehutiException exception =
Assert.expectThrows(TehutiException.class, () -> reporter.query("." + storeName + "--multiget_request.Count"));
Assert.assertTrue(exception.getMessage().contains("does not exist"));
exception = Assert.expectThrows(
TehutiException.class,
() -> reporter.query("." + storeName + "--multiget_healthy_request_latency.Max"));
Assert.assertTrue(exception.getMessage().contains("does not exist"));
}
}
Loading