From 3f23c8104332dc2878ce00cbf57499f6ab978363 Mon Sep 17 00:00:00 2001 From: Divij Vaidya Date: Thu, 19 May 2022 18:35:08 +0200 Subject: [PATCH] Fix the rate window size calculation for edge cases --- .../kafka/common/metrics/stats/Rate.java | 5 +- .../kafka/common/metrics/MetricsTest.java | 7 +- .../kafka/common/metrics/stats/RateTest.java | 67 +++++++++++++++++++ 3 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index c6b8574186a88..09b7c05c8f283 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -91,7 +91,10 @@ public long windowSize(MetricConfig config, long now) { if (numFullWindows < minFullWindows) totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs(); - return totalElapsedTimeMs; + // If window size is being calculated at the exact beginning of the window with no prior samples, the window size + // will result in a value of 0. Calculation of rate over a window is size 0 is undefined, hence, we assume the + // minimum window size to be at least 1ms. + return Math.max(totalElapsedTimeMs, 1); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 3dd114d9fd4b1..bc1fc5d9e5624 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.internals.MetricsUtils; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; @@ -607,15 +608,15 @@ public void testRateWindowing() throws Exception { // Sleep for half the window. time.sleep(cfg.timeWindowMs() / 2); - // prior to any time passing - double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0; + // prior to any time passing, elapsedSecs = sampleWindowSize * (total samples - half of final sample) + double elapsedSecs = MetricsUtils.convert(cfg.timeWindowMs(), TimeUnit.SECONDS) * (cfg.samples() - 0.5); KafkaMetric rateMetric = metrics.metrics().get(rateMetricName); KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName); assertEquals(sum / elapsedSecs, (Double) rateMetric.metricValue(), EPS, "Rate(0...2) = 2.666"); assertEquals(count / elapsedSecs, (Double) countRateMetric.metricValue(), EPS, "Count rate(0...2) = 0.02666"); assertEquals(elapsedSecs, - ((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS, "Elapsed Time = 75 seconds"); + MetricsUtils.convert(((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()), TimeUnit.SECONDS), EPS, "Elapsed Time = 75 seconds"); assertEquals(sum, (Double) totalMetric.metricValue(), EPS); assertEquals(count, (Double) countTotalMetric.metricValue(), EPS); diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java new file mode 100644 index 0000000000000..04c5ca1292f98 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics.stats; + +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.internals.MetricsUtils; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class RateTest { + private static final double EPS = 0.000001; + private Rate r; + private Time timeClock; + + @BeforeEach + public void setup() { + r = new Rate(); + timeClock = new MockTime(); + } + + // Tests the scenario where the recording and measurement is done before the window for first sample finishes + // with no prior samples retained. + @ParameterizedTest + @CsvSource({"1,1", "1,11", "11,1", "11,11"}) + public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowSizeSec) { + final MetricConfig config = new MetricConfig().samples(numSample).timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS); + final double sampleValue = 50.0; + // record at beginning of the window + r.record(config, sampleValue, timeClock.milliseconds()); + // forward time till almost the end of window + final long measurementTime = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1; + timeClock.sleep(measurementTime); + // calculate rate at almost the end of window + final double observedRate = r.measure(config, timeClock.milliseconds()); + assertFalse(Double.isNaN(observedRate)); + + // In a scenario where sufficient number of samples is not available yet, the rate calculation algorithm assumes + // presence of N-1 (where N = numSample) prior samples with sample values of 0. Hence, the window size for rate + // calculation accounts for N-1 prior samples + final int dummyPriorSamplesAssumedByAlgorithm = numSample - 1; + final double windowSize = MetricsUtils.convert(measurementTime, TimeUnit.SECONDS) + (dummyPriorSamplesAssumedByAlgorithm * sampleWindowSizeSec); + double expectedRatePerSec = sampleValue / windowSize; + assertEquals(expectedRatePerSec, observedRate, EPS); + } +}