Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce configurable percentiles for wavefront sink #222

Merged
merged 1 commit into from
Mar 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,25 @@ pub fn parse_args() -> Args {
// We read from CLI arguments
None => {
let wavefront = if args.is_present("wavefront") {
let percentiles = vec![("min".to_string(), 0.0),
("max".to_string(), 1.0),
("2".to_string(), 0.02),
("9".to_string(), 0.09),
("25".to_string(), 0.25),
("50".to_string(), 0.5),
("75".to_string(), 0.75),
("90".to_string(), 0.90),
("91".to_string(), 0.91),
("95".to_string(), 0.95),
("98".to_string(), 0.98),
("99".to_string(), 0.99),
("999".to_string(), 0.999)];
Some(WavefrontConfig {
port: u16::from_str(args.value_of("wavefront-port").unwrap()).unwrap(),
host: args.value_of("wavefront-host").unwrap().to_string(),
bin_width: 1,
config_path: "sinks.wavefront".to_string(),
percentiles: percentiles,
tags: Default::default(),
})
} else {
Expand Down Expand Up @@ -189,6 +203,44 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
};

let wavefront = if value.lookup("wavefront").or(value.lookup("sinks.wavefront")).is_some() {
let mut prcnt = Vec::new();
if let Some(tbl) = value.lookup("wavefront.percentiles")
.or(value.lookup("sinks.wavefront.percentiles"))
.and_then(|t| t.as_slice()) {
for opt_val in tbl.iter().map(|x| x.as_slice()) {
match opt_val {
Some(val) => {
let k: String =
val[0].as_str().expect("percentile name must be a string").to_string();
let v: f64 = val[1]
.as_str()
.expect("percentile value must be a string of a float")
.parse()
.expect("percentile value must be a float");
prcnt.push((k.clone(), v));
}
None => {}
}
}
}
let percentiles = if prcnt.is_empty() {
vec![("min".to_string(), 0.0),
("max".to_string(), 1.0),
("2".to_string(), 0.02),
("9".to_string(), 0.09),
("25".to_string(), 0.25),
("50".to_string(), 0.5),
("75".to_string(), 0.75),
("90".to_string(), 0.90),
("91".to_string(), 0.91),
("95".to_string(), 0.95),
("98".to_string(), 0.98),
("99".to_string(), 0.99),
("999".to_string(), 0.999)]
} else {
prcnt
};

Some(WavefrontConfig {
port: value.lookup("wavefront.port")
.or(value.lookup("sinks.wavefront.port"))
Expand All @@ -208,6 +260,7 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
.as_integer()
.expect("could not parse sinks.wavefront.bin_width"),
config_path: "sinks.wavefront".to_string(),
percentiles: percentiles,
tags: tags.clone(),
})
} else {
Expand Down Expand Up @@ -1037,6 +1090,33 @@ bin_width = 9
assert_eq!(wavefront.bin_width, 9);
}

#[test]
fn config_file_wavefront_percentile_specification() {
let config = r#"
[sinks]
[sinks.wavefront]
port = 3131
host = "example.com"
bin_width = 9
percentiles = [ ["min", "0.0"], ["max", "1.0"], ["median", "0.5"] ]
"#
.to_string();

let args = parse_config_file(config, 4);

assert!(args.wavefront.is_some());
let wavefront = args.wavefront.unwrap();
assert_eq!(wavefront.host, String::from("example.com"));
assert_eq!(wavefront.port, 3131);
assert_eq!(wavefront.bin_width, 9);

assert_eq!(wavefront.percentiles.len(), 3);
assert_eq!(wavefront.percentiles[0], ("min".to_string(), 0.0));
assert_eq!(wavefront.percentiles[1], ("max".to_string(), 1.0));
assert_eq!(wavefront.percentiles[2], ("median".to_string(), 0.5));
}


#[test]
fn config_file_influxdb() {
let config = r#"
Expand Down
33 changes: 19 additions & 14 deletions src/sink/wavefront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct Wavefront {
port: u16,
aggrs: Buckets,
delivery_attempts: u32,
percentiles: Vec<(String, f64)>,
pub stats: String,
}

Expand All @@ -23,6 +24,7 @@ pub struct WavefrontConfig {
pub host: String,
pub port: u16,
pub config_path: String,
pub percentiles: Vec<(String, f64)>,
pub tags: TagMap,
}

Expand Down Expand Up @@ -63,6 +65,7 @@ impl Wavefront {
port: config.port,
aggrs: Buckets::new(config.bin_width),
delivery_attempts: 0,
percentiles: config.percentiles,
stats: String::with_capacity(8_192),
}
}
Expand Down Expand Up @@ -95,20 +98,8 @@ impl Wavefront {
}
AggregationMethod::Summarize => {
fmt_tags(&value.tags, &mut tag_buf);
for tup in &[("min", 0.0),
("max", 1.0),
("2", 0.02),
("9", 0.09),
("25", 0.25),
("50", 0.5),
("75", 0.75),
("90", 0.90),
("91", 0.91),
("95", 0.95),
("98", 0.98),
("99", 0.99),
("999", 0.999)] {
let stat: &str = tup.0;
for tup in self.percentiles.iter() {
let ref stat: String = tup.0;
let quant: f64 = tup.1;
self.stats.push_str(&value.name);
self.stats.push_str(".");
Expand Down Expand Up @@ -216,12 +207,26 @@ mod test {
fn test_format_wavefront() {
let mut tags = TagMap::default();
tags.insert("source".into(), "test-src".into());
let percentiles = vec![("min".to_string(), 0.0),
("max".to_string(), 1.0),
("2".to_string(), 0.02),
("9".to_string(), 0.09),
("25".to_string(), 0.25),
("50".to_string(), 0.5),
("75".to_string(), 0.75),
("90".to_string(), 0.90),
("91".to_string(), 0.91),
("95".to_string(), 0.95),
("98".to_string(), 0.98),
("99".to_string(), 0.99),
("999".to_string(), 0.999)];
let config = WavefrontConfig {
bin_width: 1,
host: "127.0.0.1".to_string(),
port: 1987,
config_path: "sinks.wavefront".to_string(),
tags: tags.clone(),
percentiles: percentiles,
};
let mut wavefront = Wavefront::new(config);
let dt_0 = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 00).timestamp();
Expand Down