Skip to content

Commit

Permalink
Fix the rate window size calculation for edge cases (#12184)
Browse files Browse the repository at this point in the history
## Problem
Implementation of connection creation rate quotas in Kafka is dependent on two configurations:
[quota.window.num](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.num) AND [quota.window.size.seconds](https://kafka.apache.org/documentation.html#brokerconfigs_quota.window.size.seconds)

The minimum possible values of these configuration is 1 as per the documentation. However, when we set 1 as the configuration value, we can hit a situation where rate is calculated as NaN (and hence, leads to exceptions). This specific scenario occurs when an event is recorded at the start of a sample window.

## Solution
This patch fixes this edge case by ensuring that the windowSize over which Rate is calculated is at least 1ms (even if it is calculated at the start of the sample window).

## Test
Added a unit test which fails before the patch and passes after the patch

Reviewers: Ismael Juma <ismael@juma.me.uk>, David Mao <dmao@confluent.io>
  • Loading branch information
divijvaidya authored Aug 10, 2022
1 parent 3494d6e commit f17928e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ public long windowSize(MetricConfig config, long now) {
if (numFullWindows < minFullWindows)
totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();

return totalElapsedTimeMs;
// If window size is being calculated at the exact beginning of the window with no prior samples, the window size
// will result in a value of 0. Calculation of rate over a window is size 0 is undefined, hence, we assume the
// minimum window size to be at least 1ms.
return Math.max(totalElapsedTimeMs, 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
Expand Down Expand Up @@ -607,15 +608,15 @@ public void testRateWindowing() throws Exception {
// Sleep for half the window.
time.sleep(cfg.timeWindowMs() / 2);

// prior to any time passing
double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
// prior to any time passing, elapsedSecs = sampleWindowSize * (total samples - half of final sample)
double elapsedSecs = MetricsUtils.convert(cfg.timeWindowMs(), TimeUnit.SECONDS) * (cfg.samples() - 0.5);

KafkaMetric rateMetric = metrics.metrics().get(rateMetricName);
KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName);
assertEquals(sum / elapsedSecs, (Double) rateMetric.metricValue(), EPS, "Rate(0...2) = 2.666");
assertEquals(count / elapsedSecs, (Double) countRateMetric.metricValue(), EPS, "Count rate(0...2) = 0.02666");
assertEquals(elapsedSecs,
((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS, "Elapsed Time = 75 seconds");
MetricsUtils.convert(((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()), TimeUnit.SECONDS), EPS, "Elapsed Time = 75 seconds");
assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics.stats;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

public class RateTest {
private static final double EPS = 0.000001;
private Rate r;
private Time timeClock;

@BeforeEach
public void setup() {
r = new Rate();
timeClock = new MockTime();
}

// Tests the scenario where the recording and measurement is done before the window for first sample finishes
// with no prior samples retained.
@ParameterizedTest
@CsvSource({"1,1", "1,11", "11,1", "11,11"})
public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowSizeSec) {
final MetricConfig config = new MetricConfig().samples(numSample).timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS);
final double sampleValue = 50.0;
// record at beginning of the window
r.record(config, sampleValue, timeClock.milliseconds());
// forward time till almost the end of window
final long measurementTime = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1;
timeClock.sleep(measurementTime);
// calculate rate at almost the end of window
final double observedRate = r.measure(config, timeClock.milliseconds());
assertFalse(Double.isNaN(observedRate));

// In a scenario where sufficient number of samples is not available yet, the rate calculation algorithm assumes
// presence of N-1 (where N = numSample) prior samples with sample values of 0. Hence, the window size for rate
// calculation accounts for N-1 prior samples
final int dummyPriorSamplesAssumedByAlgorithm = numSample - 1;
final double windowSize = MetricsUtils.convert(measurementTime, TimeUnit.SECONDS) + (dummyPriorSamplesAssumedByAlgorithm * sampleWindowSizeSec);
double expectedRatePerSec = sampleValue / windowSize;
assertEquals(expectedRatePerSec, observedRate, EPS);
}
}

0 comments on commit f17928e

Please sign in to comment.