Skip to content

Better handling extended protocol messages in the event of busy pool #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 29 additions & 21 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,19 +600,32 @@ where
message_result = read_message(&mut self.read) => message_result?
};

// Avoid taking a server if the client just wants to disconnect.
if message[0] as char == 'X' {
debug!("Client disconnecting");
return Ok(());
}

// Handle admin database queries.
if self.admin {
debug!("Handling admin command");
handle_admin(&mut self.write, message, self.client_server_map.clone()).await?;
continue;
}

match message[0] as char {
// Buffer extended protocol messages even if we do not have
// a server connection yet. Hopefully, when we get the S message
// we'll be able to allocate a connection. Also, clients do not expect
// the server to respond to these messages so even if we were not able to
// allocate a connection, we wouldn't be able to send back an error message
// to the client so we buffer them and defer the decision to error out or not
// to when we get the S message
'P' | 'B' | 'D' | 'E' => {
self.buffer.put(&message[..]);
continue;
}
'X' => {
debug!("Client disconnecting");
return Ok(());
}
_ => (),
}

// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
Expand Down Expand Up @@ -714,22 +727,17 @@ where
conn
}
Err(err) => {
// Clients do not expect to get SystemError followed by ReadyForQuery in the middle
// of extended protocol submission. So we will hold off on sending the actual error
// message to the client until we get 'S' message
match message[0] as char {
'P' | 'B' | 'E' | 'D' => (),
_ => {
error_response(
&mut self.write,
"could not get connection from the pool",
)
.await?;
}
};

// Client is attempting to get results from the server,
// but we were unable to grab a connection from the pool
// We'll send back an error message and clean the extended
// protocol buffer
if message[0] as char == 'S' {
error!("Got Sync message but failed to get a connection from the pool");
self.buffer.clear();
}
error_response(&mut self.write, "could not get connection from the pool")
.await?;
error!("Could not get connection from pool: {:?}", err);

continue;
}
};
Expand Down