From 9dbf4c61164eff2e90bd9a25da48d7b0e9a0f097 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Mon, 6 Mar 2017 09:18:47 -0800 Subject: [PATCH] Introduce configurable percentiles for wavefront sink (#222) This commit makes it possible for the end-user to specify what percentiles will be shipped out in the wavefront sink. The config for doing so is... goofy. This problem is discussed in #221. This commit resolves #218. Signed-off-by: Brian L. Troutwine --- src/config.rs | 80 +++++++++++++++++++++++++++++++++++++++++++ src/sink/wavefront.rs | 33 ++++++++++-------- 2 files changed, 99 insertions(+), 14 deletions(-) diff --git a/src/config.rs b/src/config.rs index 691bd6d2..7ed9f2d8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -80,11 +80,25 @@ pub fn parse_args() -> Args { // We read from CLI arguments None => { let wavefront = if args.is_present("wavefront") { + let percentiles = vec![("min".to_string(), 0.0), + ("max".to_string(), 1.0), + ("2".to_string(), 0.02), + ("9".to_string(), 0.09), + ("25".to_string(), 0.25), + ("50".to_string(), 0.5), + ("75".to_string(), 0.75), + ("90".to_string(), 0.90), + ("91".to_string(), 0.91), + ("95".to_string(), 0.95), + ("98".to_string(), 0.98), + ("99".to_string(), 0.99), + ("999".to_string(), 0.999)]; Some(WavefrontConfig { port: u16::from_str(args.value_of("wavefront-port").unwrap()).unwrap(), host: args.value_of("wavefront-host").unwrap().to_string(), bin_width: 1, config_path: "sinks.wavefront".to_string(), + percentiles: percentiles, tags: Default::default(), }) } else { @@ -189,6 +203,44 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { }; let wavefront = if value.lookup("wavefront").or(value.lookup("sinks.wavefront")).is_some() { + let mut prcnt = Vec::new(); + if let Some(tbl) = value.lookup("wavefront.percentiles") + .or(value.lookup("sinks.wavefront.percentiles")) + .and_then(|t| t.as_slice()) { + for opt_val in tbl.iter().map(|x| x.as_slice()) { + match opt_val { + Some(val) => { + let k: String = + val[0].as_str().expect("percentile name must be a string").to_string(); + let v: f64 = val[1] + .as_str() + .expect("percentile value must be a string of a float") + .parse() + .expect("percentile value must be a float"); + prcnt.push((k.clone(), v)); + } + None => {} + } + } + } + let percentiles = if prcnt.is_empty() { + vec![("min".to_string(), 0.0), + ("max".to_string(), 1.0), + ("2".to_string(), 0.02), + ("9".to_string(), 0.09), + ("25".to_string(), 0.25), + ("50".to_string(), 0.5), + ("75".to_string(), 0.75), + ("90".to_string(), 0.90), + ("91".to_string(), 0.91), + ("95".to_string(), 0.95), + ("98".to_string(), 0.98), + ("99".to_string(), 0.99), + ("999".to_string(), 0.999)] + } else { + prcnt + }; + Some(WavefrontConfig { port: value.lookup("wavefront.port") .or(value.lookup("sinks.wavefront.port")) @@ -208,6 +260,7 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .as_integer() .expect("could not parse sinks.wavefront.bin_width"), config_path: "sinks.wavefront".to_string(), + percentiles: percentiles, tags: tags.clone(), }) } else { @@ -1037,6 +1090,33 @@ bin_width = 9 assert_eq!(wavefront.bin_width, 9); } + #[test] + fn config_file_wavefront_percentile_specification() { + let config = r#" +[sinks] + [sinks.wavefront] + port = 3131 + host = "example.com" + bin_width = 9 + percentiles = [ ["min", "0.0"], ["max", "1.0"], ["median", "0.5"] ] +"# + .to_string(); + + let args = parse_config_file(config, 4); + + assert!(args.wavefront.is_some()); + let wavefront = args.wavefront.unwrap(); + assert_eq!(wavefront.host, String::from("example.com")); + assert_eq!(wavefront.port, 3131); + assert_eq!(wavefront.bin_width, 9); + + assert_eq!(wavefront.percentiles.len(), 3); + assert_eq!(wavefront.percentiles[0], ("min".to_string(), 0.0)); + assert_eq!(wavefront.percentiles[1], ("max".to_string(), 1.0)); + assert_eq!(wavefront.percentiles[2], ("median".to_string(), 0.5)); + } + + #[test] fn config_file_influxdb() { let config = r#" diff --git a/src/sink/wavefront.rs b/src/sink/wavefront.rs index cf81d9e0..ab270719 100644 --- a/src/sink/wavefront.rs +++ b/src/sink/wavefront.rs @@ -14,6 +14,7 @@ pub struct Wavefront { port: u16, aggrs: Buckets, delivery_attempts: u32, + percentiles: Vec<(String, f64)>, pub stats: String, } @@ -23,6 +24,7 @@ pub struct WavefrontConfig { pub host: String, pub port: u16, pub config_path: String, + pub percentiles: Vec<(String, f64)>, pub tags: TagMap, } @@ -63,6 +65,7 @@ impl Wavefront { port: config.port, aggrs: Buckets::new(config.bin_width), delivery_attempts: 0, + percentiles: config.percentiles, stats: String::with_capacity(8_192), } } @@ -95,20 +98,8 @@ impl Wavefront { } AggregationMethod::Summarize => { fmt_tags(&value.tags, &mut tag_buf); - for tup in &[("min", 0.0), - ("max", 1.0), - ("2", 0.02), - ("9", 0.09), - ("25", 0.25), - ("50", 0.5), - ("75", 0.75), - ("90", 0.90), - ("91", 0.91), - ("95", 0.95), - ("98", 0.98), - ("99", 0.99), - ("999", 0.999)] { - let stat: &str = tup.0; + for tup in self.percentiles.iter() { + let ref stat: String = tup.0; let quant: f64 = tup.1; self.stats.push_str(&value.name); self.stats.push_str("."); @@ -216,12 +207,26 @@ mod test { fn test_format_wavefront() { let mut tags = TagMap::default(); tags.insert("source".into(), "test-src".into()); + let percentiles = vec![("min".to_string(), 0.0), + ("max".to_string(), 1.0), + ("2".to_string(), 0.02), + ("9".to_string(), 0.09), + ("25".to_string(), 0.25), + ("50".to_string(), 0.5), + ("75".to_string(), 0.75), + ("90".to_string(), 0.90), + ("91".to_string(), 0.91), + ("95".to_string(), 0.95), + ("98".to_string(), 0.98), + ("99".to_string(), 0.99), + ("999".to_string(), 0.999)]; let config = WavefrontConfig { bin_width: 1, host: "127.0.0.1".to_string(), port: 1987, config_path: "sinks.wavefront".to_string(), tags: tags.clone(), + percentiles: percentiles, }; let mut wavefront = Wavefront::new(config); let dt_0 = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 00).timestamp();