Skip to content

Fix missing statistics #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ async fn main() {
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));

// Statistics reporting.
let (tx, rx) = mpsc::channel(100);
let (tx, rx) = mpsc::channel(100_000);
REPORTER.store(Arc::new(Reporter::new(tx.clone())));

// Connection pool that allows to query all shards and replicas.
Expand Down
52 changes: 36 additions & 16 deletions src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use arc_swap::ArcSwap;
/// Statistics and reporting.
use log::info;
use log::{error, info, trace};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::pool::get_number_of_addresses;
Expand Down Expand Up @@ -43,7 +44,7 @@ enum EventName {

/// Event data sent to the collector
/// from clients and servers.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Event {
/// The name of the event being reported.
name: EventName,
Expand Down Expand Up @@ -79,6 +80,25 @@ impl Reporter {
Reporter { tx: tx }
}

/// Send statistics to the task keeping track of stats.
fn send(&self, event: Event) {
let name = event.name;
let result = self.tx.try_send(event);

match result {
Ok(_) => trace!(
"{:?} event reported successfully, capacity: {}",
name,
self.tx.capacity()
),

Err(err) => match err {
TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name),
TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name),
},
};
}

/// Report a query executed by a client against
/// a server identified by the `address_id`.
pub fn query(&self, process_id: i32, address_id: usize) {
Expand All @@ -89,7 +109,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event);
}

/// Report a transaction executed by a client against
Expand All @@ -102,7 +122,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Report data sent to a server identified by `address_id`.
Expand All @@ -115,7 +135,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Report data received from a server identified by `address_id`.
Expand All @@ -128,7 +148,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Time spent waiting to get a healthy connection from the pool
Expand All @@ -142,7 +162,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a client identified by `process_id` waiting for a connection
Expand All @@ -155,7 +175,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a client identified by `process_id` is done waiting for a connection
Expand All @@ -168,7 +188,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a client identified by `process_id` is done querying the server
Expand All @@ -181,7 +201,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a client identified by `process_id` is disconecting from the pooler.
Expand All @@ -194,7 +214,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a server connection identified by `process_id` for
Expand All @@ -208,7 +228,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a server connection identified by `process_id` for
Expand All @@ -222,7 +242,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a server connection identified by `process_id` for
Expand All @@ -236,7 +256,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a server connection identified by `process_id` for
Expand All @@ -250,7 +270,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}

/// Reports a server connection identified by `process_id` is disconecting from the pooler.
Expand All @@ -263,7 +283,7 @@ impl Reporter {
address_id: address_id,
};

let _ = self.tx.try_send(event);
self.send(event)
}
}

Expand Down