diff --git a/src/stats.rs b/src/stats.rs index 6de784f5..ce076d2d 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -113,6 +113,7 @@ impl Collector { for stats in server_stats.values() { if !stats.check_address_stat_average_is_updated_status() { stats.address_stats().update_averages(); + stats.address_stats().reset_current_counts(); stats.set_address_stat_average_is_updated_status(true); } } diff --git a/src/stats/address.rs b/src/stats/address.rs index 89e4ebe7..a0486445 100644 --- a/src/stats/address.rs +++ b/src/stats/address.rs @@ -1,35 +1,26 @@ use std::sync::atomic::*; use std::sync::Arc; +#[derive(Debug, Clone, Default)] +struct AddressStatFields { + xact_count: Arc, + query_count: Arc, + bytes_received: Arc, + bytes_sent: Arc, + xact_time: Arc, + query_time: Arc, + wait_time: Arc, + errors: Arc, +} + /// Internal address stats #[derive(Debug, Clone, Default)] pub struct AddressStats { - pub total_xact_count: Arc, - pub total_query_count: Arc, - pub total_received: Arc, - pub total_sent: Arc, - pub total_xact_time: Arc, - pub total_query_time: Arc, - pub total_wait_time: Arc, - pub total_errors: Arc, - - pub old_total_xact_count: Arc, - pub old_total_query_count: Arc, - pub old_total_received: Arc, - pub old_total_sent: Arc, - pub old_total_xact_time: Arc, - pub old_total_query_time: Arc, - pub old_total_wait_time: Arc, - pub old_total_errors: Arc, - - pub avg_query_count: Arc, - pub avg_query_time: Arc, - pub avg_recv: Arc, - pub avg_sent: Arc, - pub avg_errors: Arc, - pub avg_xact_time: Arc, - pub avg_xact_count: Arc, - pub avg_wait_time: Arc, + total: AddressStatFields, + + current: AddressStatFields, + + averages: AddressStatFields, // Determines if the averages have been updated since the last time they were reported pub averages_updated: Arc, @@ -43,67 +34,67 @@ impl IntoIterator for AddressStats { vec![ ( "total_xact_count".to_string(), - self.total_xact_count.load(Ordering::Relaxed), + self.total.xact_count.load(Ordering::Relaxed), ), ( "total_query_count".to_string(), - self.total_query_count.load(Ordering::Relaxed), + self.total.query_count.load(Ordering::Relaxed), ), ( "total_received".to_string(), - self.total_received.load(Ordering::Relaxed), + self.total.bytes_received.load(Ordering::Relaxed), ), ( "total_sent".to_string(), - self.total_sent.load(Ordering::Relaxed), + self.total.bytes_sent.load(Ordering::Relaxed), ), ( "total_xact_time".to_string(), - self.total_xact_time.load(Ordering::Relaxed), + self.total.xact_time.load(Ordering::Relaxed), ), ( "total_query_time".to_string(), - self.total_query_time.load(Ordering::Relaxed), + self.total.query_time.load(Ordering::Relaxed), ), ( "total_wait_time".to_string(), - self.total_wait_time.load(Ordering::Relaxed), + self.total.wait_time.load(Ordering::Relaxed), ), ( "total_errors".to_string(), - self.total_errors.load(Ordering::Relaxed), + self.total.errors.load(Ordering::Relaxed), ), ( "avg_xact_count".to_string(), - self.avg_xact_count.load(Ordering::Relaxed), + self.averages.xact_count.load(Ordering::Relaxed), ), ( "avg_query_count".to_string(), - self.avg_query_count.load(Ordering::Relaxed), + self.averages.query_count.load(Ordering::Relaxed), ), ( "avg_recv".to_string(), - self.avg_recv.load(Ordering::Relaxed), + self.averages.bytes_received.load(Ordering::Relaxed), ), ( "avg_sent".to_string(), - self.avg_sent.load(Ordering::Relaxed), + self.averages.bytes_sent.load(Ordering::Relaxed), ), ( "avg_errors".to_string(), - self.avg_errors.load(Ordering::Relaxed), + self.averages.errors.load(Ordering::Relaxed), ), ( "avg_xact_time".to_string(), - self.avg_xact_time.load(Ordering::Relaxed), + self.averages.xact_time.load(Ordering::Relaxed), ), ( "avg_query_time".to_string(), - self.avg_query_time.load(Ordering::Relaxed), + self.averages.query_time.load(Ordering::Relaxed), ), ( "avg_wait_time".to_string(), - self.avg_wait_time.load(Ordering::Relaxed), + self.averages.wait_time.load(Ordering::Relaxed), ), ] .into_iter() @@ -111,21 +102,120 @@ impl IntoIterator for AddressStats { } impl AddressStats { + pub fn xact_count_add(&self) { + self.total.xact_count.fetch_add(1, Ordering::Relaxed); + self.current.xact_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn query_count_add(&self) { + self.total.query_count.fetch_add(1, Ordering::Relaxed); + self.current.query_count.fetch_add(1, Ordering::Relaxed); + } + + pub fn bytes_received_add(&self, bytes: u64) { + self.total + .bytes_received + .fetch_add(bytes, Ordering::Relaxed); + self.current + .bytes_received + .fetch_add(bytes, Ordering::Relaxed); + } + + pub fn bytes_sent_add(&self, bytes: u64) { + self.total.bytes_sent.fetch_add(bytes, Ordering::Relaxed); + self.current.bytes_sent.fetch_add(bytes, Ordering::Relaxed); + } + + pub fn xact_time_add(&self, time: u64) { + self.total.xact_time.fetch_add(time, Ordering::Relaxed); + self.current.xact_time.fetch_add(time, Ordering::Relaxed); + } + + pub fn query_time_add(&self, time: u64) { + self.total.query_time.fetch_add(time, Ordering::Relaxed); + self.current.query_time.fetch_add(time, Ordering::Relaxed); + } + + pub fn wait_time_add(&self, time: u64) { + self.total.wait_time.fetch_add(time, Ordering::Relaxed); + self.current.wait_time.fetch_add(time, Ordering::Relaxed); + } + pub fn error(&self) { - self.total_errors.fetch_add(1, Ordering::Relaxed); + self.total.errors.fetch_add(1, Ordering::Relaxed); + self.current.errors.fetch_add(1, Ordering::Relaxed); } pub fn update_averages(&self) { - let (totals, averages, old_totals) = self.fields_iterators(); - for (total, average, old_total) in itertools::izip!(totals, averages, old_totals) { - let total_value = total.load(Ordering::Relaxed); - let old_total_value = old_total.load(Ordering::Relaxed); - average.store( - (total_value - old_total_value) / (crate::stats::STAT_PERIOD / 1_000), - Ordering::Relaxed, - ); // Avg / second - old_total.store(total_value, Ordering::Relaxed); + let stat_period_per_second = crate::stats::STAT_PERIOD / 1_000; + + // xact_count + let current_xact_count = self.current.xact_count.load(Ordering::Relaxed); + let current_xact_time = self.current.xact_time.load(Ordering::Relaxed); + self.averages.xact_count.store( + current_xact_count / stat_period_per_second, + Ordering::Relaxed, + ); + if current_xact_count == 0 { + self.averages.xact_time.store(0, Ordering::Relaxed); + } else { + self.averages + .xact_time + .store(current_xact_time / current_xact_count, Ordering::Relaxed); + } + + // query_count + let current_query_count = self.current.query_count.load(Ordering::Relaxed); + let current_query_time = self.current.query_time.load(Ordering::Relaxed); + self.averages.query_count.store( + current_query_count / stat_period_per_second, + Ordering::Relaxed, + ); + if current_query_count == 0 { + self.averages.query_time.store(0, Ordering::Relaxed); + } else { + self.averages + .query_time + .store(current_query_time / current_query_count, Ordering::Relaxed); } + + // bytes_received + let current_bytes_received = self.current.bytes_received.load(Ordering::Relaxed); + self.averages.bytes_received.store( + current_bytes_received / stat_period_per_second, + Ordering::Relaxed, + ); + + // bytes_sent + let current_bytes_sent = self.current.bytes_sent.load(Ordering::Relaxed); + self.averages.bytes_sent.store( + current_bytes_sent / stat_period_per_second, + Ordering::Relaxed, + ); + + // wait_time + let current_wait_time = self.current.wait_time.load(Ordering::Relaxed); + self.averages.wait_time.store( + current_wait_time / stat_period_per_second, + Ordering::Relaxed, + ); + + // errors + let current_errors = self.current.errors.load(Ordering::Relaxed); + self.averages + .errors + .store(current_errors / stat_period_per_second, Ordering::Relaxed); + } + + pub fn reset_current_counts(&self) { + self.current.xact_count.store(0, Ordering::Relaxed); + self.current.xact_time.store(0, Ordering::Relaxed); + self.current.query_count.store(0, Ordering::Relaxed); + self.current.query_time.store(0, Ordering::Relaxed); + self.current.bytes_received.store(0, Ordering::Relaxed); + self.current.bytes_sent.store(0, Ordering::Relaxed); + self.current.wait_time.store(0, Ordering::Relaxed); + self.current.errors.store(0, Ordering::Relaxed); } pub fn populate_row(&self, row: &mut Vec) { @@ -133,43 +223,4 @@ impl AddressStats { row.push(value.to_string()); } } - - fn fields_iterators( - &self, - ) -> ( - Vec>, - Vec>, - Vec>, - ) { - let mut totals: Vec> = Vec::new(); - let mut averages: Vec> = Vec::new(); - let mut old_totals: Vec> = Vec::new(); - - totals.push(self.total_xact_count.clone()); - old_totals.push(self.old_total_xact_count.clone()); - averages.push(self.avg_xact_count.clone()); - totals.push(self.total_query_count.clone()); - old_totals.push(self.old_total_query_count.clone()); - averages.push(self.avg_query_count.clone()); - totals.push(self.total_received.clone()); - old_totals.push(self.old_total_received.clone()); - averages.push(self.avg_recv.clone()); - totals.push(self.total_sent.clone()); - old_totals.push(self.old_total_sent.clone()); - averages.push(self.avg_sent.clone()); - totals.push(self.total_xact_time.clone()); - old_totals.push(self.old_total_xact_time.clone()); - averages.push(self.avg_xact_time.clone()); - totals.push(self.total_query_time.clone()); - old_totals.push(self.old_total_query_time.clone()); - averages.push(self.avg_query_time.clone()); - totals.push(self.total_wait_time.clone()); - old_totals.push(self.old_total_wait_time.clone()); - averages.push(self.avg_wait_time.clone()); - totals.push(self.total_errors.clone()); - old_totals.push(self.old_total_errors.clone()); - averages.push(self.avg_errors.clone()); - - (totals, averages, old_totals) - } } diff --git a/src/stats/server.rs b/src/stats/server.rs index 399e585f..a327fa34 100644 --- a/src/stats/server.rs +++ b/src/stats/server.rs @@ -177,12 +177,9 @@ impl ServerStats { } pub fn checkout_time(&self, microseconds: u64, application_name: String) { - // Update server stats and address aggergation stats + // Update server stats and address aggregation stats self.set_application(application_name); - self.address - .stats - .total_wait_time - .fetch_add(microseconds, Ordering::Relaxed); + self.address.stats.wait_time_add(microseconds); self.pool_stats .maxwait .fetch_max(microseconds, Ordering::Relaxed); @@ -191,13 +188,8 @@ impl ServerStats { /// Report a query executed by a client against a server pub fn query(&self, milliseconds: u64, application_name: &str) { self.set_application(application_name.to_string()); - let address_stats = self.address_stats(); - address_stats - .total_query_count - .fetch_add(1, Ordering::Relaxed); - address_stats - .total_query_time - .fetch_add(milliseconds, Ordering::Relaxed); + self.address.stats.query_count_add(); + self.address.stats.query_time_add(milliseconds); } /// Report a transaction executed by a client a server @@ -208,29 +200,20 @@ impl ServerStats { self.set_application(application_name.to_string()); self.transaction_count.fetch_add(1, Ordering::Relaxed); - self.address - .stats - .total_xact_count - .fetch_add(1, Ordering::Relaxed); + self.address.stats.xact_count_add(); } /// Report data sent to a server pub fn data_sent(&self, amount_bytes: usize) { self.bytes_sent .fetch_add(amount_bytes as u64, Ordering::Relaxed); - self.address - .stats - .total_sent - .fetch_add(amount_bytes as u64, Ordering::Relaxed); + self.address.stats.bytes_sent_add(amount_bytes as u64); } /// Report data received from a server pub fn data_received(&self, amount_bytes: usize) { self.bytes_received .fetch_add(amount_bytes as u64, Ordering::Relaxed); - self.address - .stats - .total_received - .fetch_add(amount_bytes as u64, Ordering::Relaxed); + self.address.stats.bytes_received_add(amount_bytes as u64); } } diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index e054b45e..f93b1a6c 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -27,7 +27,7 @@ results = admin_conn.async_exec("SHOW STATS")[0] admin_conn.close expect(results["total_query_time"].to_i).to be_within(200).of(750) - expect(results["avg_query_time"].to_i).to be_within(20).of(50) + expect(results["avg_query_time"].to_i).to be_within(50).of(250) expect(results["total_wait_time"].to_i).to_not eq(0) expect(results["avg_wait_time"].to_i).to_not eq(0)