From af1287323b6b35fe1cc48ae0c49a073fdcbe11a4 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Sun, 15 Jan 2017 10:23:36 -0800 Subject: [PATCH] Introduce more lints into cernan crate root (#190) Enabling lints is something that was not done early in the project and now there's a backlog of work to be done. You'll note that the missing_docs lint is still disabled and this represents the bulk of the work that needs doing. Signed-off-by: Brian L. Troutwine --- examples/configs/basic.toml | 2 +- src/config.rs | 4 ---- src/filter/programmable_filter.rs | 27 +++++++++++++++------------ src/lib.rs | 22 ++++++++++++++++++++++ src/metric/telemetry.rs | 4 +--- src/protocols/native.rs | 1 + src/protocols/prometheus.rs | 1 + src/sink/console.rs | 29 +++++++++++++++++++++++++++++ src/sink/mod.rs | 5 +++++ 9 files changed, 75 insertions(+), 20 deletions(-) diff --git a/examples/configs/basic.toml b/examples/configs/basic.toml index 971a6177..056391fa 100644 --- a/examples/configs/basic.toml +++ b/examples/configs/basic.toml @@ -12,7 +12,7 @@ source = "cernan" port = 8125 forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"] - [sources.native_server] + [sources.native] ip = "127.0.0.1" port = 1972 forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"] diff --git a/src/config.rs b/src/config.rs index 34f56188..df0943c5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -181,7 +181,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .or(value.lookup("sinks.console.bin_width")) .unwrap_or(&Value::Integer(1)) .as_integer() - .map(|i| i as i64) .unwrap(), config_path: "sinks.console".to_string(), }) @@ -207,7 +206,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .or(value.lookup("sinks.wavefront.bin_width")) .unwrap_or(&Value::Integer(1)) .as_integer() - .map(|i| i as i64) .unwrap(), config_path: "sinks.wavefront".to_string(), tags: tags.clone(), @@ -234,7 +232,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .or(value.lookup("sinks.influxdb.bin_width")) .unwrap_or(&Value::Integer(1)) .as_integer() - .map(|i| i as i64) .unwrap(), config_path: "sinks.influxdb".to_string(), tags: tags.clone(), @@ -261,7 +258,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args { .or(value.lookup("sinks.prometheus.bin_width")) .unwrap_or(&Value::Integer(1)) .as_integer() - .map(|i| i as i64) .unwrap(), config_path: "sinks.prometheus".to_string(), }) diff --git a/src/filter/programmable_filter.rs b/src/filter/programmable_filter.rs index 9f172f45..7a2c3cdd 100644 --- a/src/filter/programmable_filter.rs +++ b/src/filter/programmable_filter.rs @@ -57,7 +57,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_metric_name(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len()); + let idx = idx(state.to_integer(2), (*pyld).metrics.len()); state.push_string(&(*pyld).metrics[idx].name); 1 } @@ -66,7 +66,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_set_metric_name(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len()); + let idx = idx(state.to_integer(2), (*pyld).metrics.len()); (*pyld).metrics[idx].name = state.check_string(3).into(); 0 } @@ -75,7 +75,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_push_metric(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let val = state.to_number(3) as f64; + let val = state.to_number(3); match state.to_str(2) { Some(name) => { let m = metric::Telemetry::new(name, val) @@ -110,7 +110,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_metric_value(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len()); + let idx = idx(state.to_integer(2), (*pyld).metrics.len()); match (*pyld).metrics[idx].value() { Some(v) => { state.push_number(v); @@ -126,7 +126,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_log_tag_value(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).logs.len()); + let idx = idx(state.to_integer(2), (*pyld).logs.len()); match state.to_str(3).map(|k| k.to_owned()) { Some(key) => { match (*pyld).logs[idx].tags.get(&key) { @@ -150,7 +150,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_metric_tag_value(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len()); + let idx = idx(state.to_integer(2), (*pyld).metrics.len()); match state.to_str(3).map(|k| k.to_owned()) { Some(key) => { match (*pyld).metrics[idx].tags.get(&key) { @@ -174,7 +174,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_metric_set_tag(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len()); + let idx = idx(state.to_integer(2), (*pyld).metrics.len()); match state.to_str(3).map(|k| k.to_owned()) { Some(key) => { match state.to_str(4).map(|v| v.to_owned()) { @@ -206,7 +206,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_log_set_tag(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).logs.len()); + let idx = idx(state.to_integer(2), (*pyld).logs.len()); match state.to_str(3).map(|k| k.to_owned()) { Some(key) => { match state.to_str(4).map(|v| v.to_owned()) { @@ -238,7 +238,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_metric_remove_tag(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len()); + let idx = idx(state.to_integer(2), (*pyld).metrics.len()); match state.to_str(3).map(|k| k.to_owned()) { Some(key) => { match sync::Arc::make_mut(&mut (*pyld).metrics[idx].tags).remove(&key) { @@ -262,7 +262,7 @@ impl<'a> Payload<'a> { unsafe extern "C" fn lua_log_remove_tag(L: *mut lua_State) -> c_int { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; - let idx = idx(state.to_integer(2) as i64, (*pyld).logs.len()); + let idx = idx(state.to_integer(2), (*pyld).logs.len()); match state.to_str(3).map(|k| k.to_owned()) { Some(key) => { match (*pyld).logs[idx].tags.remove(&key) { @@ -287,7 +287,7 @@ impl<'a> Payload<'a> { let mut state = State::from_ptr(L); let pyld = state.to_userdata(1) as *mut Payload; let prcnt = state.to_number(2); - let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len()); + let idx = idx(state.to_integer(2), (*pyld).metrics.len()); match (*pyld).metrics[idx].query(prcnt) { Some(v) => { state.push_number(v); @@ -370,7 +370,10 @@ impl ProgrammableFilter { } impl filter::Filter for ProgrammableFilter { - fn process(&mut self, event: metric::Event, res: &mut Vec) -> Result<(), filter::FilterError> { + fn process(&mut self, + event: metric::Event, + res: &mut Vec) + -> Result<(), filter::FilterError> { match event { metric::Event::Telemetry(mut m) => { self.state.get_global("process_metric"); diff --git a/src/lib.rs b/src/lib.rs index 50d122a4..44783bb1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,25 @@ +//! Cernan is a telemetry and logging aggregation server. It exposes multiple +//! interfaces for ingestion and can emit to mutiple aggregation sources while +//! doing in-flight manipulation of data. Cernan has minimal CPU and memory +//! requirements and is intended to service bursty telemetry _without_ load +//! shedding. Cernan aims to be _reliable_ and _convenient_ to use, both for +//! application engineers and operations staff. +//! +//! Why you might choose to use cernan: +//! +//! * You need to ingest telemetry from multiple protocols. +//! * You need to multiplex telemetry over aggregation services. +//! * You want to convert log lines into telemetry. +//! * You want to convert telemetry into log lines. +//! * You want to transform telemetry or log lines in-flight. +//! +//! If you'd like to learn more, please do have a look in +//! our [wiki](https://github.com/postmates/cernan/wiki/). +#![deny(trivial_numeric_casts, + // missing_docs, + unstable_features, + unused_import_braces, +)] extern crate bincode; extern crate byteorder; extern crate chrono; diff --git a/src/metric/telemetry.rs b/src/metric/telemetry.rs index 7779e119..97c6f1bb 100644 --- a/src/metric/telemetry.rs +++ b/src/metric/telemetry.rs @@ -420,9 +420,7 @@ impl Value { fn query(&self, query: f64) -> Option<(usize, f64)> { match self.kind { - ValueKind::Single => { - Some((1 as usize, self.single.expect("NOT SINGLE IN METRICVALUE QUERY"))) - } + ValueKind::Single => Some((1, self.single.expect("NOT SINGLE IN METRICVALUE QUERY"))), ValueKind::Many => { match self.many { Some(ref ckms) => ckms.query(query), diff --git a/src/protocols/native.rs b/src/protocols/native.rs index 35738286..c909c8e4 100644 --- a/src/protocols/native.rs +++ b/src/protocols/native.rs @@ -9,6 +9,7 @@ #![allow(box_pointers)] #![allow(dead_code)] +#![allow(missing_docs)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] #![allow(non_upper_case_globals)] diff --git a/src/protocols/prometheus.rs b/src/protocols/prometheus.rs index 742c975b..34df1ce1 100644 --- a/src/protocols/prometheus.rs +++ b/src/protocols/prometheus.rs @@ -9,6 +9,7 @@ #![allow(box_pointers)] #![allow(dead_code)] +#![allow(missing_docs)] #![allow(non_camel_case_types)] #![allow(non_snake_case)] #![allow(non_upper_case_globals)] diff --git a/src/sink/console.rs b/src/sink/console.rs index 77205447..90e8ee4f 100644 --- a/src/sink/console.rs +++ b/src/sink/console.rs @@ -1,26 +1,55 @@ +//! TODO use buckets::Buckets; use chrono; use metric::{AggregationMethod, LogLine, Telemetry}; use sink::{Sink, Valve}; use std::sync; +/// The 'console' sink exists for development convenience. The sink will +/// aggregate according to [buckets](../buckets/struct.Buckets.html) method and +/// print each `flush-interval` to stdout. pub struct Console { aggrs: Buckets, } impl Console { + /// Create a new Console + /// + /// # Examples + /// + /// ``` + /// use cernan::sink::{Console, ConsoleConfig}; + /// let config = ConsoleConfig { config_path: "sinks.console".to_string(), bin_width: 2 }; + /// let c = Console::new(config); + /// ``` pub fn new(config: ConsoleConfig) -> Console { Console { aggrs: Buckets::new(config.bin_width) } } } +/// The configuration struct for Console. There's not a whole lot to configure +/// here, independent of other sinks, but Console does do aggregations and that +/// requires knowing what the user wants for `bin_width`. #[derive(Debug)] pub struct ConsoleConfig { + /// The sink's unique name in the routing topology. pub config_path: String, + /// Sets the bin width for Console's underlying + /// [bucket](../buckets/struct.Bucket.html). pub bin_width: i64, } impl ConsoleConfig { + /// Convenience method to create a ConsoleConfig with `bin_width` equal to + /// 1. + /// + /// # Examples + /// + /// ``` + /// use cernan::sink::ConsoleConfig; + /// let config = ConsoleConfig::new("sinks.console".to_string()); + /// assert_eq!(1, config.bin_width); + /// ``` pub fn new(config_path: String) -> ConsoleConfig { ConsoleConfig { config_path: config_path, diff --git a/src/sink/mod.rs b/src/sink/mod.rs index d2f89dd3..f880516e 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -1,3 +1,8 @@ +//! A 'sink' is a final destination for telemetry and log lines. That is, a +//! 'sink' is that which is at the end of a `source -> filter -> filter -> +//! ... -> sink` chain. The sink has no obligations with regard to the telemetry +//! and log lines it receives, other than to receive them. Individual sinks make +//! different choices. use hopper; use metric::{Event, LogLine, Telemetry}; use std::sync;