Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13911: Fix the rate window size calculation for edge cases #12184

Merged
merged 1 commit into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ public long windowSize(MetricConfig config, long now) {
if (numFullWindows < minFullWindows)
totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();

return totalElapsedTimeMs;
// If the window size is calculated at the exact beginning of the window with no prior samples, the window size
// will result in a value of 0. Calculation of a rate over a window of size 0 is undefined; hence, we assume the
// minimum window size to be at least 1ms.
return Math.max(totalElapsedTimeMs, 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
Expand Down Expand Up @@ -607,15 +608,15 @@ public void testRateWindowing() throws Exception {
// Sleep for half the window.
time.sleep(cfg.timeWindowMs() / 2);

// prior to any time passing
double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
// prior to any time passing, elapsedSecs = sampleWindowSize * (total samples - half of final sample)
double elapsedSecs = MetricsUtils.convert(cfg.timeWindowMs(), TimeUnit.SECONDS) * (cfg.samples() - 0.5);

KafkaMetric rateMetric = metrics.metrics().get(rateMetricName);
KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName);
assertEquals(sum / elapsedSecs, (Double) rateMetric.metricValue(), EPS, "Rate(0...2) = 2.666");
assertEquals(count / elapsedSecs, (Double) countRateMetric.metricValue(), EPS, "Count rate(0...2) = 0.02666");
assertEquals(elapsedSecs,
((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS, "Elapsed Time = 75 seconds");
MetricsUtils.convert(((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()), TimeUnit.SECONDS), EPS, "Elapsed Time = 75 seconds");
assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

/**
 * Unit tests for {@link Rate}, driven by a {@link MockTime} clock so that the time of each
 * recording and measurement is controlled explicitly by the test.
 */
public class RateTest {
    /** Absolute tolerance used when comparing floating-point rates. */
    private static final double EPS = 0.000001;

    private Rate rate;
    private Time clock;

    /** Creates a fresh rate and mock clock before every test invocation. */
    @BeforeEach
    public void setup() {
        clock = new MockTime();
        rate = new Rate();
    }

    /**
     * Covers the case where a value is recorded and the rate is measured before the window of the
     * first sample has finished, with no prior samples retained.
     *
     * @param numSample           number of samples configured on the {@link MetricConfig}
     * @param sampleWindowSizeSec length of a single sample window, in seconds
     */
    @ParameterizedTest
    @CsvSource({"1,1", "1,11", "11,1", "11,11"})
    public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowSizeSec) {
        final double recordedValue = 50.0;
        final MetricConfig config = new MetricConfig()
            .samples(numSample)
            .timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS);

        // Record a single value at the very beginning of the window.
        rate.record(config, recordedValue, clock.milliseconds());

        // Advance the clock to one millisecond short of a full sample window.
        final long elapsedMs = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1;
        clock.sleep(elapsedMs);

        // Measure the rate at almost the end of the window; it must be a real number.
        final double observedRate = rate.measure(config, clock.milliseconds());
        assertFalse(Double.isNaN(observedRate));

        // When a sufficient number of samples is not available yet, the rate calculation algorithm
        // assumes the presence of N-1 (where N = numSample) prior samples with sample values of 0.
        // The expected window therefore spans the elapsed time plus N-1 full sample windows.
        final int assumedPriorSamples = numSample - 1;
        final double priorWindowsSec = assumedPriorSamples * sampleWindowSizeSec;
        final double windowSizeSec = MetricsUtils.convert(elapsedMs, TimeUnit.SECONDS) + priorWindowsSec;

        assertEquals(recordedValue / windowSizeSec, observedRate, EPS);
    }
}