Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,29 @@ public synchronized MutableQuantiles newQuantiles(String name, String desc,
return ret;
}

/**
* Create a mutable inverse metric that estimates inverse quantiles of a stream of values
* @param name of the metric
* @param desc metric description
* @param sampleName of the metric (e.g., "Ops")
* @param valueName of the metric (e.g., "Rate")
* @param interval rollover interval of estimator in seconds
* @return a new inverse quantile estimator object
* @throws MetricsException if interval is not a positive integer
*/
public synchronized MutableQuantiles newInverseQuantiles(String name, String desc,
String sampleName, String valueName, int interval) {
checkMetricName(name);
if (interval <= 0) {
throw new MetricsException("Interval should be positive. Value passed" +
" is: " + interval);
}
MutableQuantiles ret =
new MutableInverseQuantiles(name, desc, sampleName, valueName, interval);
metricsMap.put(name, ret);
return ret;
}

/**
* Create a mutable metric with stats
* @param name of the metric
Expand Down Expand Up @@ -278,7 +301,7 @@ public MutableRate newRate(String name, String description) {
}

/**
* Create a mutable rate metric (for throughput measurement)
* Create a mutable rate metric (for throughput measurement).
* @param name of the metric
* @param desc description
* @param extended produce extended stat (stdev/min/max etc.) if true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.lib;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.SampleQuantiles;
import java.text.DecimalFormat;
import static org.apache.hadoop.metrics2.lib.Interns.info;

/**
* Watches a stream of long values, maintaining online estimates of specific
* quantiles with provably low error bounds. Inverse quantiles are meant for
* highly accurate low-percentile (e.g. 1st, 5th) metrics.
* InverseQuantiles are used for metrics where higher the value better it is.
* ( eg: data transfer rate ).
* The 1st percentile here corresponds to the 99th inverse percentile metric,
* 5th percentile to 95th and so on.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class MutableInverseQuantiles extends MutableQuantiles{

static class InversePercentile extends Quantile {
InversePercentile(double inversePercentile) {
super(inversePercentile/100, inversePercentile/1000);
}
}

@VisibleForTesting
public static final Quantile[] INVERSE_QUANTILES = {new InversePercentile(50),
new InversePercentile(25), new InversePercentile(10),
new InversePercentile(5), new InversePercentile(1)};

/**
* Instantiates a new {@link MutableInverseQuantiles} for a metric that rolls itself
* over on the specified time interval.
*
* @param name of the metric
* @param description long-form textual description of the metric
* @param sampleName type of items in the stream (e.g., "Ops")
* @param valueName type of the values
* @param intervalSecs rollover interval (in seconds) of the estimator
*/
public MutableInverseQuantiles(String name, String description, String sampleName,
String valueName, int intervalSecs) {
super(name, description, sampleName, valueName, intervalSecs);
}

/**
* Sets quantileInfo and estimator.
*
* @param ucName capitalized name of the metric
* @param uvName capitalized type of the values
* @param desc uncapitalized long-form textual description of the metric
* @param lvName uncapitalized type of the values
* @param df Number formatter for inverse percentile value
*/
void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat df) {
// Construct the MetricsInfos for inverse quantiles, converting to inverse percentiles
setQuantileInfos(INVERSE_QUANTILES.length);
for (int i = 0; i < INVERSE_QUANTILES.length; i++) {
double inversePercentile = 100 * (1 - INVERSE_QUANTILES[i].quantile);
String nameTemplate = ucName + df.format(inversePercentile) + "thInversePercentile" + uvName;
String descTemplate = df.format(inversePercentile) + " inverse percentile " + lvName
+ " with " + getInterval() + " second interval for " + desc;
addQuantileInfo(i, info(nameTemplate, descTemplate));
}

setEstimator(new SampleQuantiles(INVERSE_QUANTILES));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.hadoop.metrics2.lib.Interns.info;

import java.text.DecimalFormat;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -52,9 +53,10 @@ public class MutableQuantiles extends MutableMetric {
new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };

private final MetricsInfo numInfo;
private final MetricsInfo[] quantileInfos;
private final int interval;
private MetricsInfo numInfo;
private MetricsInfo[] quantileInfos;
private int intervalSecs;
private static DecimalFormat decimalFormat = new DecimalFormat("###.####");

private QuantileEstimator estimator;
private long previousCount = 0;
Expand Down Expand Up @@ -91,26 +93,39 @@ public MutableQuantiles(String name, String description, String sampleName,
String lsName = StringUtils.uncapitalize(sampleName);
String lvName = StringUtils.uncapitalize(valueName);

numInfo = info(ucName + "Num" + usName, String.format(
"Number of %s for %s with %ds interval", lsName, desc, interval));
setInterval(interval);
setNumInfo(info(ucName + "Num" + usName, String.format(
"Number of %s for %s with %ds interval", lsName, desc, interval)));
scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
interval, interval, TimeUnit.SECONDS);
setQuantiles(ucName, uvName, desc, lvName, decimalFormat);
}

/**
* Sets quantileInfo and estimator.
*
* @param ucName capitalized name of the metric
* @param uvName capitalized type of the values
* @param desc uncapitalized long-form textual description of the metric
* @param lvName uncapitalized type of the values
* @param pDecimalFormat Number formatter for percentile value
*/
void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat pDecimalFormat) {
// Construct the MetricsInfos for the quantiles, converting to percentiles
quantileInfos = new MetricsInfo[quantiles.length];
String nameTemplate = ucName + "%dthPercentile" + uvName;
String descTemplate = "%d percentile " + lvName + " with " + interval
+ " second interval for " + desc;
setQuantileInfos(quantiles.length);
for (int i = 0; i < quantiles.length; i++) {
int percentile = (int) (100 * quantiles[i].quantile);
quantileInfos[i] = info(String.format(nameTemplate, percentile),
String.format(descTemplate, percentile));
double percentile = 100 * quantiles[i].quantile;
String nameTemplate = ucName + pDecimalFormat.format(percentile) + "thPercentile" + uvName;
String descTemplate = pDecimalFormat.format(percentile) + " percentile " + lvName
+ " with " + getInterval() + " second interval for " + desc;
addQuantileInfo(i, info(nameTemplate, descTemplate));
}

estimator = new SampleQuantiles(quantiles);

this.interval = interval;
scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
interval, interval, TimeUnit.SECONDS);
setEstimator(new SampleQuantiles(quantiles));
}

public MutableQuantiles() {}

@Override
public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
if (all || changed()) {
Expand All @@ -133,8 +148,50 @@ public synchronized void add(long value) {
estimator.insert(value);
}

public int getInterval() {
return interval;
/**
* Set info about the metrics.
*
* @param pNumInfo info about the metrics.
*/
public synchronized void setNumInfo(MetricsInfo pNumInfo) {
this.numInfo = pNumInfo;
}

/**
* Initialize quantileInfos array.
*
* @param length of the quantileInfos array.
*/
public synchronized void setQuantileInfos(int length) {
this.quantileInfos = new MetricsInfo[length];
}

/**
* Add entry to quantileInfos array.
*
* @param i array index.
* @param info info to be added to quantileInfos array.
*/
public synchronized void addQuantileInfo(int i, MetricsInfo info) {
this.quantileInfos[i] = info;
}

/**
* Set the rollover interval (in seconds) of the estimator.
*
* @param pIntervalSecs of the estimator.
*/
public synchronized void setInterval(int pIntervalSecs) {
this.intervalSecs = pIntervalSecs;
}

/**
* Get the rollover interval (in seconds) of the estimator.
*
* @return intervalSecs of the estimator.
*/
public synchronized int getInterval() {
return intervalSecs;
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.metrics2.lib.MutableInverseQuantiles;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -36,6 +37,7 @@ public class TestSampleQuantiles {
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };

SampleQuantiles estimator;
final static int NUM_REPEATS = 10;

@Before
public void init() {
Expand Down Expand Up @@ -91,28 +93,70 @@ public void testClear() throws IOException {
@Test
public void testQuantileError() throws IOException {
final int count = 100000;
Random r = new Random(0xDEADDEAD);
Long[] values = new Long[count];
Random rnd = new Random(0xDEADDEAD);
int[] values = new int[count];
for (int i = 0; i < count; i++) {
values[i] = (long) (i + 1);
values[i] = i + 1;
}
// Do 10 shuffle/insert/check cycles
for (int i = 0; i < 10; i++) {
System.out.println("Starting run " + i);
Collections.shuffle(Arrays.asList(values), r);

// Repeat shuffle/insert/check cycles 10 times
for (int i = 0; i < NUM_REPEATS; i++) {

// Shuffle
Collections.shuffle(Arrays.asList(values), rnd);
estimator.clear();
for (int j = 0; j < count; j++) {
estimator.insert(values[j]);

// Insert
for (int value : values) {
estimator.insert(value);
}
Map<Quantile, Long> snapshot;
snapshot = estimator.snapshot();

// Check
for (Quantile q : quantiles) {
long actual = (long) (q.quantile * count);
long error = (long) (q.error * count);
long estimate = snapshot.get(q);
System.out
.println(String.format("Expected %d with error %d, estimated %d",
actual, error, estimate));
assertThat(estimate <= actual + error).isTrue();
assertThat(estimate >= actual - error).isTrue();
}
}
}

/**
* Correctness test that checks that absolute error of the estimate for inverse quantiles
* is within specified error bounds for some randomly permuted streams of items.
*/
@Test
public void testInverseQuantiles() throws IOException {
SampleQuantiles inverseQuantilesEstimator =
new SampleQuantiles(MutableInverseQuantiles.INVERSE_QUANTILES);
final int count = 100000;
Random rnd = new Random(0xDEADDEAD);
int[] values = new int[count];
for (int i = 0; i < count; i++) {
values[i] = i + 1;
}

// Repeat shuffle/insert/check cycles 10 times
for (int i = 0; i < NUM_REPEATS; i++) {
// Shuffle
Collections.shuffle(Arrays.asList(values), rnd);
inverseQuantilesEstimator.clear();

// Insert
for (int value : values) {
inverseQuantilesEstimator.insert(value);
}
Map<Quantile, Long> snapshot;
snapshot = inverseQuantilesEstimator.snapshot();

// Check
for (Quantile q : MutableInverseQuantiles.INVERSE_QUANTILES) {
long actual = (long) (q.quantile * count);
long error = (long) (q.error * count);
long estimate = snapshot.get(q);
assertThat(estimate <= actual + error).isTrue();
assertThat(estimate >= actual - error).isTrue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,13 +392,34 @@ public static void assertQuantileGauges(String prefix,
*/
public static void assertQuantileGauges(String prefix,
MetricsRecordBuilder rb, String valueName) {
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0l));
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
for (Quantile q : MutableQuantiles.quantiles) {
String nameTemplate = prefix + "%dthPercentile" + valueName;
int percentile = (int) (100 * q.quantile);
verify(rb).addGauge(
eqName(info(String.format(nameTemplate, percentile), "")),
geq(0l));
geq(0L));
}
}

/**
* Asserts that the NumOps and inverse quantiles for a metric have been changed at
* some point to a non-zero value, for the specified value name of the
* metrics (e.g., "Rate").
*
* @param prefix of the metric
* @param rb MetricsRecordBuilder with the metric
* @param valueName the value name for the metric
*/
public static void assertInverseQuantileGauges(String prefix,
MetricsRecordBuilder rb, String valueName) {
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
for (Quantile q : MutableQuantiles.quantiles) {
String nameTemplate = prefix + "%dthInversePercentile" + valueName;
int percentile = (int) (100 * q.quantile);
verify(rb).addGauge(
eqName(info(String.format(nameTemplate, percentile), "")),
geq(0L));
}
}

Expand Down
Loading