Skip to content

Commit

Permalink
Improve logging for connection cleanup (#428)
Browse files Browse the repository at this point in the history
* initial commit

* fix

* fmt
  • Loading branch information
zainkabani authored May 12, 2023
1 parent 7326069 commit 0907f1b
Showing 1 changed file with 53 additions and 11 deletions.
64 changes: 53 additions & 11 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,48 @@ impl StreamInner {
}
}

#[derive(Copy, Clone)]
struct CleanupState {
/// If server connection requires DISCARD ALL before checkin because of set statement
needs_cleanup_set: bool,

/// If server connection requires DISCARD ALL before checkin because of prepare statement
needs_cleanup_prepare: bool,
}

impl CleanupState {
fn new() -> Self {
CleanupState {
needs_cleanup_set: false,
needs_cleanup_prepare: false,
}
}

fn needs_cleanup(&self) -> bool {
self.needs_cleanup_set || self.needs_cleanup_prepare
}

fn set_true(&mut self) {
self.needs_cleanup_set = true;
self.needs_cleanup_prepare = true;
}

fn reset(&mut self) {
self.needs_cleanup_set = false;
self.needs_cleanup_prepare = false;
}
}

impl std::fmt::Display for CleanupState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SET: {}, PREPARE: {}",
self.needs_cleanup_set, self.needs_cleanup_prepare
)
}
}

/// Server state.
pub struct Server {
/// Server host, e.g. localhost,
Expand Down Expand Up @@ -131,8 +173,8 @@ pub struct Server {
/// Is the server broken? We'll remote it from the pool if so.
bad: bool,

/// If server connection requires a DISCARD ALL before checkin
needs_cleanup: bool,
/// If server connection requires DISCARD ALL before checkin
cleanup_state: CleanupState,

/// Mapping of clients and servers used for query cancellation.
client_server_map: ClientServerMap,
Expand Down Expand Up @@ -630,7 +672,7 @@ impl Server {
in_transaction: false,
data_available: false,
bad: false,
needs_cleanup: false,
cleanup_state: CleanupState::new(),
client_server_map,
addr_set,
connected_at: chrono::offset::Utc::now().naive_utc(),
Expand Down Expand Up @@ -793,12 +835,12 @@ impl Server {
// This will reduce amount of discard statements sent
if !self.in_transaction {
debug!("Server connection marked for clean up");
self.needs_cleanup = true;
self.cleanup_state.needs_cleanup_set = true;
}
}
"PREPARE\0" => {
debug!("Server connection marked for clean up");
self.needs_cleanup = true;
self.cleanup_state.needs_cleanup_prepare = true;
}
_ => (),
}
Expand Down Expand Up @@ -960,11 +1002,11 @@ impl Server {
// to avoid leaking state between clients. For performance reasons we only
// send `DISCARD ALL` if we think the session is altered instead of just sending
// it before each checkin.
if self.needs_cleanup {
warn!("Server returned with session state altered, discarding state");
if self.cleanup_state.needs_cleanup() {
warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name);
self.query("DISCARD ALL").await?;
self.query("RESET ROLE").await?;
self.needs_cleanup = false;
self.cleanup_state.reset();
}

Ok(())
Expand All @@ -976,12 +1018,12 @@ impl Server {
self.application_name = name.to_string();
// We don't want `SET application_name` to mark the server connection
// as needing cleanup
let needs_cleanup_before = self.needs_cleanup;
let needs_cleanup_before = self.cleanup_state;

let result = Ok(self
.query(&format!("SET application_name = '{}'", name))
.await?);
self.needs_cleanup = needs_cleanup_before;
self.cleanup_state = needs_cleanup_before;
result
} else {
Ok(())
Expand All @@ -1006,7 +1048,7 @@ impl Server {

// Marks a connection as needing DISCARD ALL at checkin
pub fn mark_dirty(&mut self) {
self.needs_cleanup = true;
self.cleanup_state.set_true();
}

pub fn mirror_send(&mut self, bytes: &BytesMut) {
Expand Down

0 comments on commit 0907f1b

Please sign in to comment.