Merge remote-tracking branch 'origin/master' into HEAD
Sergey Noskov committed Nov 12, 2018
2 parents 142e077 + ac45f7d commit 3064869
Showing 4 changed files with 141 additions and 67 deletions.
86 changes: 80 additions & 6 deletions config.toml
@@ -1,4 +1,6 @@
# This is an example config showing all the options, filled with default values
# This is an example config showing all the possible options
# Required options are filled with default values
# Non-required options are commented out, with their default values shown in the comments

verbosity = "warn"

@@ -8,16 +10,22 @@ n-threads = 4
# Number of aggregating and counting threads, use 0 (not recommended) to use all CPU cores
w-threads = 4

# Queue size for a single counting thread before a packet is dropped
# Queue size for a single counting thread before a task is dropped
task-queue-size = 1024

# If the server should become leader from its very start
start-as-leader = true

# How often to gather own stats, in ms. Use 0 to disable (stats are still gathered and printed to the log,
# but not included in the metric dump)
stats-interval = 10000

# Prefix for sending own stats
stats-prefix = "resources.monitoring.bioyino"

# What consensus to use: "consul", "internal" or "none"
consensus = "none"
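# A hypothetical example (not the default): to use the built-in Raft implementation
# configured in the [raft] section below, this would presumably be set to:
# consensus = "internal"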

[metrics]
# Should we provide metrics that update more than update-counter-threshold times during the aggregation interval
count-updates = true
@@ -31,6 +39,22 @@ update-counter-suffix = ""
# Minimal update counter to be reported
update-counter-threshold = 200

# Aggregation can be done in two ways, giving a tradeoff between aggregation speed and
# the probability of losing metrics:
# 1. In a specially spawned thread (fast-aggregation = false). In this case only one thread will be used to do all the counting.
# This is slow, but worker threads will keep parsing, decreasing the probability of metric drops.
# 2. In worker threads (fast-aggregation = true). In this case metrics will be aggregated in the worker threads, so
# any heavy counting could potentially block a task and cause some unparsed buffers to be skipped
# fast-aggregation = true
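# A hypothetical example (illustrative, not the default): a deployment that favors
# fewer metric drops over aggregation speed would presumably set:
# fast-aggregation = false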

# Process buffers from different hosts separately; this gives a better guarantee of parsing
# metrics from different hosts correctly. It can behave badly if lots of metrics are received from a single host; set
# it to false if you have such a use case
# consistent-parsing = true

# Log all buffers being dropped due to parsing errors. Can be very spammy.
# log-parse-errors = false

[carbon]

# IP and port of the carbon-protocol backend to send aggregated data to
@@ -57,9 +81,12 @@ send-retries = 30
# Address:port to listen for metrics at
listen = "127.0.0.1:8125"

# Address and port for replication/command server to listen on
# Address and port for replication server to listen on
peer-listen = "127.0.0.1:8136"

# Address and port for management server to listen on
mgmt-listen = "127.0.0.1:8137"

# UDP buffer size for a single packet. Needs to be around the MTU. A packet's bytes beyond this value
# may be lost
bufsize = 1500
@@ -68,23 +95,70 @@ bufsize = 1500
multimessage = false

# Number of multimessage packets to receive at once when in multimessage mode.
# Note that this setting is per thread, so in reality one can only see metrics
# after receiving at least mm-packets * n-threads datagrams
mm-packets = 100
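# A hypothetical worked example using the defaults above (mm-packets = 100,
# n-threads = 4): no metrics would be visible until at least
# 100 * 4 = 400 datagrams have been received.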

# Do multimessage operations in async mode.
# This means recvmmsg will receive 0..mm-packets datagrams instead of waiting for exactly mm-packets
mm-async = false

# Multimessage mode assumes an early return on timeout, but ONLY when a packet is
# received after the timeout has expired.
# Basically, this should only be changed in very rare and specific cases.
# 0 means this value will be equal to buffer-flush-time
# mm-timeout = 0

# To avoid packets staying in the queue forever, this option can be used to flush the
# incoming data buffer, forcing it to be sent even if it's not full
buffer-flush-time = 0

# Same as buffer-flush-time, but triggers on buffer length. Please note that multimessage
# mode can only consider the timer, so this check is only possible once every mm-packets packets.
# A zero value means automatic management, depending on the memory allocator's internal logic,
# which in tests was found to reach 30MB.
# In multimessage mode, if this value is lower than mm-packets * bufsize, it will be set to that value.
buffer-flush-length = 65536
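# A hypothetical worked example using the defaults above (mm-packets = 100,
# bufsize = 1500): in multimessage mode 65536 < 100 * 1500 = 150000, so the
# effective flush length would presumably be raised to 150000.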

# Number of green threads for single-message mode
greens = 4

# Socket pool size for single-message mode
snum = 4
async-sockets = 4

# List of nodes to replicate metrics to
nodes = []

# Interval to send snapshots to nodes, ms
snapshot-interval = 1000

# Settings for internal Raft
[raft]
# Defer the start of raft consensus to avoid the node becoming leader too early.
# Such a situation is very likely when restarting the current leader node
# and means losing metrics in most cases
#start-delay = 0

# Timeouts tuned according to the Raft paper and typical network latency.
# Better not to change them if unsure
#heartbeat-timeout = 250
#election-timeout-min = 500
#election-timeout-max = 750

# The name of the current node is taken from the hostname by default.
# After that, all hostnames are resolved using DNS. If the node name cannot
# be resolved through DNS for some reason, it can be specified in the this-node
# parameter, in a format similar to the one in the node list.
# this-node = <empty>
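# A hypothetical example (illustrative hostname and port, not a default):
# this-node = "bioyino1.example.com:8138"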

# A map of the other raft nodes. Keys are in the form hostname:port or IP:port,
# values are integers
nodes = {}
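# A hypothetical example (illustrative hostnames, ports and ids, not defaults):
# nodes = { "bioyino1.example.com:8138" = 1, "bioyino2.example.com:8138" = 2, "bioyino3.example.com:8138" = 3 }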

[consul]
# Start in disabled leader finding mode
start-disabled = false
# Start in disabled leader-finding mode. This only works while consul is bootstrapping.
# Can be helpful when there is a danger of the agent being inaccessible.
start-as = "disabled"

# Consul agent address
agent = "127.0.0.1:8500"
74 changes: 37 additions & 37 deletions src/main.rs
@@ -141,7 +141,7 @@ pub enum ConsensusKind {

lazy_static! {
pub static ref CONSENSUS_STATE: Mutex<ConsensusState> =
{ Mutex::new(ConsensusState::Disabled) };
{ Mutex::new(ConsensusState::Disabled) };
}

pub static IS_LEADER: AtomicBool = ATOMIC_BOOL_INIT;
@@ -169,34 +169,34 @@ fn main() {
nodes,
snapshot_interval,
},
raft,
consul:
Consul {
start_as: consul_start_as,
agent,
session_ttl: consul_session_ttl,
renew_time: consul_renew_time,
key_name: consul_key,
},
metrics:
Metrics {
// max_metrics,
mut count_updates,
update_counter_prefix,
update_counter_suffix,
update_counter_threshold,
fast_aggregation,
consistent_parsing: _,
log_parse_errors: _,
},
carbon,
n_threads,
w_threads,
stats_interval: s_interval,
task_queue_size,
start_as_leader,
stats_prefix,
consensus,
raft,
consul:
Consul {
start_as: consul_start_as,
agent,
session_ttl: consul_session_ttl,
renew_time: consul_renew_time,
key_name: consul_key,
},
metrics:
Metrics {
// max_metrics,
mut count_updates,
update_counter_prefix,
update_counter_suffix,
update_counter_threshold,
fast_aggregation,
consistent_parsing: _,
log_parse_errors: _,
},
carbon,
n_threads,
w_threads,
stats_interval: s_interval,
task_queue_size,
start_as_leader,
stats_prefix,
consensus,
} = system;

let verbosity = Level::from_str(&verbosity).expect("bad verbosity");
@@ -265,7 +265,7 @@ fn main() {
runner.run(task);
Ok(runner)
}).map(|_| ())
.map_err(|_| ());
.map_err(|_| ());
// let future = rx.for_each(|task: Task| ok(runner.run(task)));
runtime.block_on(future).expect("worker thread failed");
}).expect("starting counting worker thread");
@@ -288,11 +288,11 @@ fn main() {
nodes.clone(),
Duration::from_millis(snapshot_interval as u64),
&chans,
).into_future()
.map_err(move |e| {
PEER_ERRORS.fetch_add(1, Ordering::Relaxed);
info!(snap_err_log, "error sending snapshot";"error"=>format!("{}", e));
});
).into_future()
.map_err(move |e| {
PEER_ERRORS.fetch_add(1, Ordering::Relaxed);
info!(snap_err_log, "error sending snapshot";"error"=>format!("{}", e));
});
runtime.spawn(snapshot);

// settings safe for asap restart
@@ -520,7 +520,7 @@ fn main() {
mm_async,
mm_timeout,
flush_flags.clone(),
);
);
} else {
start_async_udp(
log,
@@ -532,7 +532,7 @@ fn main() {
async_sockets,
bufsize,
flush_flags.clone(),
);
);
}

runtime
28 changes: 14 additions & 14 deletions src/task.rs
@@ -201,7 +201,7 @@ impl TaskRunner {
if rest.len() == 0 {
return cutlen;
}
continue
continue;
}

if let Ok(metric) = Metric::<Float>::new(value, mtype, None, sampling) {
@@ -226,12 +226,12 @@ impl TaskRunner {
if let Some(pos) = cut_bad(log.clone(), &mut buf) {
// on success increase cutlen to cutting position plus \n
// and try to parse next part
cutlen += pos+1;
cutlen += pos + 1;
input = input.split_at(pos + 1).1;
if input.len() != 0{
if input.len() != 0 {
continue;
} else {
return cutlen
return cutlen;
}
} else {
// failure means we have a buffer full of some bad data
@@ -294,16 +294,16 @@ pub fn aggregate_task(data: AggregateData) {
let name = buf.take().freeze();
(name, value)
}).chain(upd)
.map(|data| {
spawn(
response
.clone()
.send(data)
.map_err(|_| {
AGG_ERRORS.fetch_add(1, Ordering::Relaxed);
}).map(|_| ()),
.map(|data| {
spawn(
response
.clone()
.send(data)
.map_err(|_| {
AGG_ERRORS.fetch_add(1, Ordering::Relaxed);
}).map(|_| ()),
);
}).last();
}).last();
}

#[cfg(test)]
@@ -318,7 +318,7 @@ mod tests {
let mut data = BytesMut::new();
data.extend_from_slice(
b"trash\ngorets1:+1000|g\nTRASH\ngorets2:-1000|g|@0.5\nMORETrasH\nFUUU",
);
);

let mut config = System::default();
config.metrics.log_parse_errors = true;
20 changes: 10 additions & 10 deletions src/udp.rs
@@ -35,7 +35,7 @@ pub(crate) fn start_sync_udp(
mm_async: bool,
mm_timeout: u64,
flush_flags: Arc<Vec<AtomicBool>>,
) {
) {
info!(log, "multimessage enabled, starting in sync UDP mode"; "socket-is-blocking"=>!mm_async, "packets"=>mm_packets);

// It is crucial for recvmmsg to have one socket per many threads
@@ -132,7 +132,7 @@ pub(crate) fn start_sync_udp(
let mut chunk = iovec {
iov_base: recv_buffer[i * rowsize..i * rowsize + rowsize].as_mut_ptr()
as *mut c_void,
iov_len: rowsize,
iov_len: rowsize,
};
chunks.push(chunk);
// put the result to mheaders
@@ -167,7 +167,7 @@ pub(crate) fn start_sync_udp(
} else {
null_mut()
},
)
)
};

if res == 0 {
@@ -182,9 +182,9 @@ pub(crate) fn start_sync_udp(
INGRESS.fetch_add(mlen, Ordering::Relaxed);

// create address entry in messagemap
let mut entry = bufmap.entry(addrs[i]).or_insert(
BytesMut::with_capacity(mlen),
);
let mut entry = bufmap
.entry(addrs[i])
.or_insert(BytesMut::with_capacity(mlen));

// check we can fit the buffer
if entry.remaining_mut() < mlen + 1 {
@@ -231,7 +231,7 @@ pub(crate) fn start_sync_udp(
DROPS.fetch_add(
messages as usize,
Ordering::Relaxed,
);
);
}).unwrap_or(());
}).last();
}
@@ -261,7 +261,7 @@ pub(crate) fn start_async_udp(
async_sockets: usize,
bufsize: usize,
flush_flags: Arc<Vec<AtomicBool>>,
) {
) {
info!(log, "multimessage is disabled, starting in async UDP mode");

// Create a pool of listener sockets
@@ -301,7 +301,7 @@ pub(crate) fn start_async_udp(
let socket = socket.try_clone().expect("cloning socket");
let socket =
UdpSocket::from_std(socket, &::tokio::reactor::Handle::current())
.expect("adding socket to event loop");
.expect("adding socket to event loop");

let server = StatsdServer::new(
socket,
Expand All @@ -314,7 +314,7 @@ pub(crate) fn start_async_udp(
readbuf,
flush_flags.clone(),
i,
);
);

runtime.spawn(server.into_future());
}
