Skip to content

Commit

Permalink
Introduce configurable percentiles for wavefront sink (#222)
Browse files Browse the repository at this point in the history
This commit makes it possible for the end-user to specify what
percentiles will be shipped out in the wavefront sink. The config
for doing so is... goofy. This problem is discussed in #221.

This commit resolves #218.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Mar 6, 2017
1 parent 301d104 commit 9dbf4c6
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 14 deletions.
80 changes: 80 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,25 @@ pub fn parse_args() -> Args {
// We read from CLI arguments
None => {
let wavefront = if args.is_present("wavefront") {
let percentiles = vec![("min".to_string(), 0.0),
("max".to_string(), 1.0),
("2".to_string(), 0.02),
("9".to_string(), 0.09),
("25".to_string(), 0.25),
("50".to_string(), 0.5),
("75".to_string(), 0.75),
("90".to_string(), 0.90),
("91".to_string(), 0.91),
("95".to_string(), 0.95),
("98".to_string(), 0.98),
("99".to_string(), 0.99),
("999".to_string(), 0.999)];
Some(WavefrontConfig {
port: u16::from_str(args.value_of("wavefront-port").unwrap()).unwrap(),
host: args.value_of("wavefront-host").unwrap().to_string(),
bin_width: 1,
config_path: "sinks.wavefront".to_string(),
percentiles: percentiles,
tags: Default::default(),
})
} else {
Expand Down Expand Up @@ -189,6 +203,44 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
};

let wavefront = if value.lookup("wavefront").or(value.lookup("sinks.wavefront")).is_some() {
let mut prcnt = Vec::new();
if let Some(tbl) = value.lookup("wavefront.percentiles")
.or(value.lookup("sinks.wavefront.percentiles"))
.and_then(|t| t.as_slice()) {
for opt_val in tbl.iter().map(|x| x.as_slice()) {
match opt_val {
Some(val) => {
let k: String =
val[0].as_str().expect("percentile name must be a string").to_string();
let v: f64 = val[1]
.as_str()
.expect("percentile value must be a string of a float")
.parse()
.expect("percentile value must be a float");
prcnt.push((k.clone(), v));
}
None => {}
}
}
}
let percentiles = if prcnt.is_empty() {
vec![("min".to_string(), 0.0),
("max".to_string(), 1.0),
("2".to_string(), 0.02),
("9".to_string(), 0.09),
("25".to_string(), 0.25),
("50".to_string(), 0.5),
("75".to_string(), 0.75),
("90".to_string(), 0.90),
("91".to_string(), 0.91),
("95".to_string(), 0.95),
("98".to_string(), 0.98),
("99".to_string(), 0.99),
("999".to_string(), 0.999)]
} else {
prcnt
};

Some(WavefrontConfig {
port: value.lookup("wavefront.port")
.or(value.lookup("sinks.wavefront.port"))
Expand All @@ -208,6 +260,7 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
.as_integer()
.expect("could not parse sinks.wavefront.bin_width"),
config_path: "sinks.wavefront".to_string(),
percentiles: percentiles,
tags: tags.clone(),
})
} else {
Expand Down Expand Up @@ -1037,6 +1090,33 @@ bin_width = 9
assert_eq!(wavefront.bin_width, 9);
}

#[test]
fn config_file_wavefront_percentile_specification() {
let config = r#"
[sinks]
[sinks.wavefront]
port = 3131
host = "example.com"
bin_width = 9
percentiles = [ ["min", "0.0"], ["max", "1.0"], ["median", "0.5"] ]
"#
.to_string();

let args = parse_config_file(config, 4);

assert!(args.wavefront.is_some());
let wavefront = args.wavefront.unwrap();
assert_eq!(wavefront.host, String::from("example.com"));
assert_eq!(wavefront.port, 3131);
assert_eq!(wavefront.bin_width, 9);

assert_eq!(wavefront.percentiles.len(), 3);
assert_eq!(wavefront.percentiles[0], ("min".to_string(), 0.0));
assert_eq!(wavefront.percentiles[1], ("max".to_string(), 1.0));
assert_eq!(wavefront.percentiles[2], ("median".to_string(), 0.5));
}


#[test]
fn config_file_influxdb() {
let config = r#"
Expand Down
33 changes: 19 additions & 14 deletions src/sink/wavefront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct Wavefront {
port: u16,
aggrs: Buckets,
delivery_attempts: u32,
percentiles: Vec<(String, f64)>,
pub stats: String,
}

Expand All @@ -23,6 +24,7 @@ pub struct WavefrontConfig {
pub host: String,
pub port: u16,
pub config_path: String,
pub percentiles: Vec<(String, f64)>,
pub tags: TagMap,
}

Expand Down Expand Up @@ -63,6 +65,7 @@ impl Wavefront {
port: config.port,
aggrs: Buckets::new(config.bin_width),
delivery_attempts: 0,
percentiles: config.percentiles,
stats: String::with_capacity(8_192),
}
}
Expand Down Expand Up @@ -95,20 +98,8 @@ impl Wavefront {
}
AggregationMethod::Summarize => {
fmt_tags(&value.tags, &mut tag_buf);
for tup in &[("min", 0.0),
("max", 1.0),
("2", 0.02),
("9", 0.09),
("25", 0.25),
("50", 0.5),
("75", 0.75),
("90", 0.90),
("91", 0.91),
("95", 0.95),
("98", 0.98),
("99", 0.99),
("999", 0.999)] {
let stat: &str = tup.0;
for tup in self.percentiles.iter() {
let ref stat: String = tup.0;
let quant: f64 = tup.1;
self.stats.push_str(&value.name);
self.stats.push_str(".");
Expand Down Expand Up @@ -216,12 +207,26 @@ mod test {
fn test_format_wavefront() {
let mut tags = TagMap::default();
tags.insert("source".into(), "test-src".into());
let percentiles = vec![("min".to_string(), 0.0),
("max".to_string(), 1.0),
("2".to_string(), 0.02),
("9".to_string(), 0.09),
("25".to_string(), 0.25),
("50".to_string(), 0.5),
("75".to_string(), 0.75),
("90".to_string(), 0.90),
("91".to_string(), 0.91),
("95".to_string(), 0.95),
("98".to_string(), 0.98),
("99".to_string(), 0.99),
("999".to_string(), 0.999)];
let config = WavefrontConfig {
bin_width: 1,
host: "127.0.0.1".to_string(),
port: 1987,
config_path: "sinks.wavefront".to_string(),
tags: tags.clone(),
percentiles: percentiles,
};
let mut wavefront = Wavefront::new(config);
let dt_0 = UTC.ymd(1990, 6, 12).and_hms_milli(9, 10, 11, 00).timestamp();
Expand Down

0 comments on commit 9dbf4c6

Please sign in to comment.