Skip to content

Commit

Permalink
Introduce more lints into cernan crate root (#190)
Browse files Browse the repository at this point in the history
Enabling lints is something that was not done early in the project
and now there's a backlog of work to be done. You'll note that the
missing_docs lint is still disabled and this represents the bulk of
the work that needs doing.

Signed-off-by: Brian L. Troutwine <blt@postmates.com>
  • Loading branch information
blt authored Jan 15, 2017
1 parent 442d977 commit af12873
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 20 deletions.
2 changes: 1 addition & 1 deletion examples/configs/basic.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ source = "cernan"
port = 8125
forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]

[sources.native_server]
[sources.native]
ip = "127.0.0.1"
port = 1972
forwards = ["sinks.console", "sinks.null", "sinks.influxdb", "sinks.prometheus"]
Expand Down
4 changes: 0 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
.or(value.lookup("sinks.console.bin_width"))
.unwrap_or(&Value::Integer(1))
.as_integer()
.map(|i| i as i64)
.unwrap(),
config_path: "sinks.console".to_string(),
})
Expand All @@ -207,7 +206,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
.or(value.lookup("sinks.wavefront.bin_width"))
.unwrap_or(&Value::Integer(1))
.as_integer()
.map(|i| i as i64)
.unwrap(),
config_path: "sinks.wavefront".to_string(),
tags: tags.clone(),
Expand All @@ -234,7 +232,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
.or(value.lookup("sinks.influxdb.bin_width"))
.unwrap_or(&Value::Integer(1))
.as_integer()
.map(|i| i as i64)
.unwrap(),
config_path: "sinks.influxdb".to_string(),
tags: tags.clone(),
Expand All @@ -261,7 +258,6 @@ pub fn parse_config_file(buffer: String, verbosity: u64) -> Args {
.or(value.lookup("sinks.prometheus.bin_width"))
.unwrap_or(&Value::Integer(1))
.as_integer()
.map(|i| i as i64)
.unwrap(),
config_path: "sinks.prometheus".to_string(),
})
Expand Down
27 changes: 15 additions & 12 deletions src/filter/programmable_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_metric_name(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len());
let idx = idx(state.to_integer(2), (*pyld).metrics.len());
state.push_string(&(*pyld).metrics[idx].name);
1
}
Expand All @@ -66,7 +66,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_set_metric_name(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len());
let idx = idx(state.to_integer(2), (*pyld).metrics.len());
(*pyld).metrics[idx].name = state.check_string(3).into();
0
}
Expand All @@ -75,7 +75,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_push_metric(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let val = state.to_number(3) as f64;
let val = state.to_number(3);
match state.to_str(2) {
Some(name) => {
let m = metric::Telemetry::new(name, val)
Expand Down Expand Up @@ -110,7 +110,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_metric_value(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len());
let idx = idx(state.to_integer(2), (*pyld).metrics.len());
match (*pyld).metrics[idx].value() {
Some(v) => {
state.push_number(v);
Expand All @@ -126,7 +126,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_log_tag_value(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).logs.len());
let idx = idx(state.to_integer(2), (*pyld).logs.len());
match state.to_str(3).map(|k| k.to_owned()) {
Some(key) => {
match (*pyld).logs[idx].tags.get(&key) {
Expand All @@ -150,7 +150,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_metric_tag_value(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len());
let idx = idx(state.to_integer(2), (*pyld).metrics.len());
match state.to_str(3).map(|k| k.to_owned()) {
Some(key) => {
match (*pyld).metrics[idx].tags.get(&key) {
Expand All @@ -174,7 +174,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_metric_set_tag(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len());
let idx = idx(state.to_integer(2), (*pyld).metrics.len());
match state.to_str(3).map(|k| k.to_owned()) {
Some(key) => {
match state.to_str(4).map(|v| v.to_owned()) {
Expand Down Expand Up @@ -206,7 +206,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_log_set_tag(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).logs.len());
let idx = idx(state.to_integer(2), (*pyld).logs.len());
match state.to_str(3).map(|k| k.to_owned()) {
Some(key) => {
match state.to_str(4).map(|v| v.to_owned()) {
Expand Down Expand Up @@ -238,7 +238,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_metric_remove_tag(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len());
let idx = idx(state.to_integer(2), (*pyld).metrics.len());
match state.to_str(3).map(|k| k.to_owned()) {
Some(key) => {
match sync::Arc::make_mut(&mut (*pyld).metrics[idx].tags).remove(&key) {
Expand All @@ -262,7 +262,7 @@ impl<'a> Payload<'a> {
unsafe extern "C" fn lua_log_remove_tag(L: *mut lua_State) -> c_int {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let idx = idx(state.to_integer(2) as i64, (*pyld).logs.len());
let idx = idx(state.to_integer(2), (*pyld).logs.len());
match state.to_str(3).map(|k| k.to_owned()) {
Some(key) => {
match (*pyld).logs[idx].tags.remove(&key) {
Expand All @@ -287,7 +287,7 @@ impl<'a> Payload<'a> {
let mut state = State::from_ptr(L);
let pyld = state.to_userdata(1) as *mut Payload;
let prcnt = state.to_number(2);
let idx = idx(state.to_integer(2) as i64, (*pyld).metrics.len());
let idx = idx(state.to_integer(2), (*pyld).metrics.len());
match (*pyld).metrics[idx].query(prcnt) {
Some(v) => {
state.push_number(v);
Expand Down Expand Up @@ -370,7 +370,10 @@ impl ProgrammableFilter {
}

impl filter::Filter for ProgrammableFilter {
fn process(&mut self, event: metric::Event, res: &mut Vec<metric::Event>) -> Result<(), filter::FilterError> {
fn process(&mut self,
event: metric::Event,
res: &mut Vec<metric::Event>)
-> Result<(), filter::FilterError> {
match event {
metric::Event::Telemetry(mut m) => {
self.state.get_global("process_metric");
Expand Down
22 changes: 22 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
//! Cernan is a telemetry and logging aggregation server. It exposes multiple
//! interfaces for ingestion and can emit to mutiple aggregation sources while
//! doing in-flight manipulation of data. Cernan has minimal CPU and memory
//! requirements and is intended to service bursty telemetry _without_ load
//! shedding. Cernan aims to be _reliable_ and _convenient_ to use, both for
//! application engineers and operations staff.
//!
//! Why you might choose to use cernan:
//!
//! * You need to ingest telemetry from multiple protocols.
//! * You need to multiplex telemetry over aggregation services.
//! * You want to convert log lines into telemetry.
//! * You want to convert telemetry into log lines.
//! * You want to transform telemetry or log lines in-flight.
//!
//! If you'd like to learn more, please do have a look in
//! our [wiki](https://github.com/postmates/cernan/wiki/).
#![deny(trivial_numeric_casts,
// missing_docs,
unstable_features,
unused_import_braces,
)]
extern crate bincode;
extern crate byteorder;
extern crate chrono;
Expand Down
4 changes: 1 addition & 3 deletions src/metric/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,7 @@ impl Value {

fn query(&self, query: f64) -> Option<(usize, f64)> {
match self.kind {
ValueKind::Single => {
Some((1 as usize, self.single.expect("NOT SINGLE IN METRICVALUE QUERY")))
}
ValueKind::Single => Some((1, self.single.expect("NOT SINGLE IN METRICVALUE QUERY"))),
ValueKind::Many => {
match self.many {
Some(ref ckms) => ckms.query(query),
Expand Down
1 change: 1 addition & 0 deletions src/protocols/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
Expand Down
1 change: 1 addition & 0 deletions src/protocols/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
Expand Down
29 changes: 29 additions & 0 deletions src/sink/console.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,55 @@
//! TODO
use buckets::Buckets;
use chrono;
use metric::{AggregationMethod, LogLine, Telemetry};
use sink::{Sink, Valve};
use std::sync;

/// The 'console' sink exists for development convenience. The sink will
/// aggregate according to [buckets](../buckets/struct.Buckets.html) method and
/// print each `flush-interval` to stdout.
pub struct Console {
aggrs: Buckets,
}

impl Console {
/// Create a new Console
///
/// # Examples
///
/// ```
/// use cernan::sink::{Console, ConsoleConfig};
/// let config = ConsoleConfig { config_path: "sinks.console".to_string(), bin_width: 2 };
/// let c = Console::new(config);
/// ```
pub fn new(config: ConsoleConfig) -> Console {
Console { aggrs: Buckets::new(config.bin_width) }
}
}

/// The configuration struct for Console. There's not a whole lot to configure
/// here, independent of other sinks, but Console does do aggregations and that
/// requires knowing what the user wants for `bin_width`.
#[derive(Debug)]
pub struct ConsoleConfig {
/// The sink's unique name in the routing topology.
pub config_path: String,
/// Sets the bin width for Console's underlying
/// [bucket](../buckets/struct.Bucket.html).
pub bin_width: i64,
}

impl ConsoleConfig {
/// Convenience method to create a ConsoleConfig with `bin_width` equal to
/// 1.
///
/// # Examples
///
/// ```
/// use cernan::sink::ConsoleConfig;
/// let config = ConsoleConfig::new("sinks.console".to_string());
/// assert_eq!(1, config.bin_width);
/// ```
pub fn new(config_path: String) -> ConsoleConfig {
ConsoleConfig {
config_path: config_path,
Expand Down
5 changes: 5 additions & 0 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! A 'sink' is a final destination for telemetry and log lines. That is, a
//! 'sink' is that which is at the end of a `source -> filter -> filter ->
//! ... -> sink` chain. The sink has no obligations with regard to the telemetry
//! and log lines it receives, other than to receive them. Individual sinks make
//! different choices.
use hopper;
use metric::{Event, LogLine, Telemetry};
use std::sync;
Expand Down

0 comments on commit af12873

Please sign in to comment.