Health check delay #118

Merged: 10 commits, Aug 11, 2022
3 changes: 3 additions & 0 deletions .circleci/pgcat.toml
@@ -20,6 +20,9 @@ connect_timeout = 100
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 100

# How long to keep a connection available for immediate re-use without running a health check query on it (ms)
healthcheck_delay = 30000

# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 5000

2 changes: 2 additions & 0 deletions README.md
@@ -48,6 +48,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
| `healthcheck_delay` | How long to keep a connection available for immediate re-use without running a health check query on it (milliseconds). | `30000` |
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
| | | |
| **`user`** | | |
@@ -252,6 +253,7 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process or by qu
| `connect_timeout` | yes |
| `healthcheck_timeout` | no |
| `shutdown_timeout` | no |
| `healthcheck_delay` | no |
| `ban_time` | no |
| `user` | yes |
| `shards` | yes |
3 changes: 3 additions & 0 deletions examples/docker/pgcat.toml
@@ -20,6 +20,9 @@ connect_timeout = 5000
# How much time to give `SELECT 1` health check query to return with a result (ms).
healthcheck_timeout = 1000

# How long to keep a connection available for immediate re-use without running a health check query on it (ms)
healthcheck_delay = 30000

# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 60000

3 changes: 3 additions & 0 deletions pgcat.toml
@@ -20,6 +20,9 @@ connect_timeout = 5000
# How much time to give the health check query to return with a result (ms).
healthcheck_timeout = 1000

# How long to keep a connection available for immediate re-use without running a health check query on it (ms)
healthcheck_delay = 30000

# How much time to give clients during shutdown before forcibly killing client connections (ms).
shutdown_timeout = 60000

131 changes: 104 additions & 27 deletions src/client.rs
@@ -7,11 +7,11 @@ use tokio::net::TcpStream;
use tokio::sync::broadcast::Receiver;

use crate::admin::{generate_server_info_for_admin, handle_admin};
use crate::config::get_config;
use crate::config::{get_config, Address};
use crate::constants::*;
use crate::errors::Error;
use crate::messages::*;
use crate::pool::{get_pool, ClientServerMap};
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
use crate::query_router::{Command, QueryRouter};
use crate::server::Server;
use crate::stats::{get_reporter, Reporter};
@@ -246,7 +246,7 @@ where
}
}

/// Handle TLS connection negotation.
/// Handle TLS connection negotiation.
pub async fn startup_tls(
stream: TcpStream,
client_server_map: ClientServerMap,
@@ -259,14 +259,14 @@
let mut stream = match tls.acceptor.accept(stream).await {
Ok(stream) => stream,

// TLS negotitation failed.
// TLS negotiation failed.
Err(err) => {
error!("TLS negotiation failed: {:?}", err);
return Err(Error::TlsError);
}
};

// TLS negotitation successful.
// TLS negotiation successful.
// Continue with regular startup using encrypted connection.
match get_startup::<TlsStream<TcpStream>>(&mut stream).await {
// Got good startup message, proceeding like normal except we
@@ -540,21 +540,21 @@ where
// Get a pool instance referenced by the most up-to-date
// pointer. This ensures we always read the latest config
// when starting a query.
let mut pool =
match get_pool(self.target_pool_name.clone(), self.target_user_name.clone()) {
Some(pool) => pool,
None => {
error_response(
&mut self.write,
&format!(
"No pool configured for database: {:?}, user: {:?}",
self.target_pool_name, self.target_user_name
),
)
.await?;
return Err(Error::ClientError);
}
};
let pool = match get_pool(self.target_pool_name.clone(), self.target_user_name.clone())
{
Some(pool) => pool,
None => {
error_response(
&mut self.write,
&format!(
"No pool configured for database: {:?}, user: {:?}",
self.target_pool_name, self.target_user_name
),
)
.await?;
return Err(Error::ClientError);
}
};
query_router.update_pool_settings(pool.settings.clone());
let current_shard = query_router.shard();

@@ -731,12 +731,26 @@
'Q' => {
debug!("Sending query to server");

server.send(original).await?;
self.send_server_message(
server,
original,
&address,
query_router.shard(),
&pool,
)
.await?;

// Read all data the server has to offer, which can be multiple messages
// buffered in 8196-byte chunks.
loop {
let response = server.recv().await?;
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;

// Send server reply to the client.
match write_all_half(&mut self.write, response).await {
@@ -816,14 +830,28 @@

self.buffer.put(&original[..]);

server.send(self.buffer.clone()).await?;
self.send_server_message(
server,
self.buffer.clone(),
&address,
query_router.shard(),
&pool,
)
.await?;

self.buffer.clear();

// Read all data the server has to offer, which can be multiple messages
// buffered in 8196-byte chunks.
loop {
let response = server.recv().await?;
let response = self
.receive_server_message(
server,
&address,
query_router.shard(),
&pool,
)
.await?;

match write_all_half(&mut self.write, response).await {
Ok(_) => (),
@@ -857,15 +885,31 @@
'd' => {
// Forward the data to the server,
// don't buffer it since it can be rather large.
server.send(original).await?;
self.send_server_message(
server,
original,
&address,
query_router.shard(),
&pool,
)
.await?;
}

// CopyDone or CopyFail
// Copy is done, successfully or not.
'c' | 'f' => {
server.send(original).await?;
self.send_server_message(
server,
original,
&address,
query_router.shard(),
&pool,
)
.await?;

let response = server.recv().await?;
let response = self
.receive_server_message(server, &address, query_router.shard(), &pool)
.await?;

match write_all_half(&mut self.write, response).await {
Ok(_) => (),
@@ -907,6 +951,39 @@ where
let mut guard = self.client_server_map.lock();
guard.remove(&(self.process_id, self.secret_key));
}

async fn send_server_message(
&self,
server: &mut Server,
message: BytesMut,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<(), Error> {
match server.send(message).await {
Ok(_) => Ok(()),
Err(err) => {
pool.ban(address, shard, self.process_id);
Err(err)
}
}
}

async fn receive_server_message(
&self,
server: &mut Server,
address: &Address,
shard: usize,
pool: &ConnectionPool,
) -> Result<BytesMut, Error> {
match server.recv().await {
Ok(message) => Ok(message),
Err(err) => {
pool.ban(address, shard, self.process_id);
Err(err)
}
}
}
}

impl<S, T> Drop for Client<S, T> {
7 changes: 7 additions & 0 deletions src/config.rs
@@ -121,6 +121,7 @@ pub struct General {
pub connect_timeout: u64,
pub healthcheck_timeout: u64,
pub shutdown_timeout: u64,
pub healthcheck_delay: u64,
pub ban_time: i64,
pub autoreload: bool,
pub tls_certificate: Option<String>,
@@ -138,6 +139,7 @@ impl Default for General {
connect_timeout: 5000,
healthcheck_timeout: 1000,
shutdown_timeout: 60000,
healthcheck_delay: 30000,
ban_time: 60,
autoreload: false,
tls_certificate: None,
@@ -281,6 +283,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
"shutdown_timeout".to_string(),
config.general.shutdown_timeout.to_string(),
),
(
"healthcheck_delay".to_string(),
config.general.healthcheck_delay.to_string(),
),
("ban_time".to_string(), config.general.ban_time.to_string()),
];

@@ -299,6 +305,7 @@ impl Config {
);
info!("Connection timeout: {}ms", self.general.connect_timeout);
info!("Shutdown timeout: {}ms", self.general.shutdown_timeout);
info!("Healthcheck delay: {}ms", self.general.healthcheck_delay);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);
36 changes: 24 additions & 12 deletions src/pool.rs
@@ -251,7 +251,7 @@ impl ConnectionPool {

/// Get a connection from the pool.
pub async fn get(
&mut self,
&self,
shard: usize, // shard number
role: Option<Role>, // primary or replica
process_id: i32, // client id
@@ -283,6 +283,9 @@
return Err(Error::BadConfig);
}

let healthcheck_timeout = get_config().general.healthcheck_timeout;
let healthcheck_delay = get_config().general.healthcheck_delay as u128;

while allowed_attempts > 0 {
// Round-robin replicas.
round_robin += 1;
@@ -312,7 +315,7 @@
Ok(conn) => conn,
Err(err) => {
error!("Banning replica {}, error: {:?}", index, err);
self.ban(address, shard);
self.ban(address, shard, process_id);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
@@ -322,8 +325,19 @@

// Check if this server is alive with a health check.
let server = &mut *conn;
let healthcheck_timeout = get_config().general.healthcheck_timeout;

// elapsed() returns an error only if the timestamp is ahead of the current system time, which should never happen here
let require_healthcheck =
server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay;

if !require_healthcheck {
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.stats.server_idle(conn.process_id(), address.id);
return Ok((conn, address.clone()));
}

debug!("Running health check for replica {}, {:?}", index, address);
self.stats.server_tested(server.process_id(), address.id);

match tokio::time::timeout(
@@ -348,10 +362,7 @@
// Don't leave a bad connection in the pool.
server.mark_bad();

self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.ban(address, shard, process_id);
Contributor: Could you add a debug! a little bit above so we know that the health check is still being performed? It's nice to trace the execution of things when debugging. I tend to organize the log levels as follows:

  • info: good to know under normal operations, e.g. client connected, client disconnected, etc.
  • warn: need to know, something might be wrong.
  • debug: good to know if I'm trying to understand what's really happening underneath.
  • trace: I need to see almost every interaction and byte going through because I think something is terribly wrong.

Contributor Author: Right above it there's already an error! log which says the replica is being banned. Does that work?

Contributor: I was referring to the health check being issued, not the ban, but GitHub doesn't let me comment on arbitrary file lines, I think.

Contributor: Now that we don't always health check, I think it's important to log when we do.

continue;
}
},
@@ -362,10 +373,7 @@
// Don't leave a bad connection in the pool.
server.mark_bad();

self.ban(address, shard);
self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
self.ban(address, shard, process_id);
continue;
}
}
@@ -377,7 +385,11 @@
/// Ban an address (i.e. replica). It no longer will serve
/// traffic for any new transactions. Existing transactions on that replica
/// will finish successfully or error out to the clients.
pub fn ban(&self, address: &Address, shard: usize) {
pub fn ban(&self, address: &Address, shard: usize, process_id: i32) {
Contributor: It's hard to explain why ban would need process_id until you read the function code. Maybe it's better to move the stats handling to the caller?

self.stats.client_disconnecting(process_id, address.id);
self.stats
.checkout_time(Instant::now().elapsed().as_micros(), process_id, address.id);

error!("Banning {:?}", address);
let now = chrono::offset::Utc::now().naive_utc();
let mut guard = self.banlist.write();