Skip to content

Client md5 auth and clean up scram #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ function start_pgcat() {

# Setup the database with shards and user
PGPASSWORD=postgres psql -e -h 127.0.0.1 -p 5432 -U postgres -f tests/sharding/query_routing_setup.sql

PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard0 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i
Expand All @@ -30,26 +31,28 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica

start_pgcat "info"

export PGPASSWORD=sharding_user

# pgbench test
pgbench -i -h 127.0.0.1 -p 6432
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended
pgbench -U sharding_user -i -h 127.0.0.1 -p 6432
pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol simple -f tests/pgbench/simple.sql
pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol extended

# COPY TO STDOUT test
psql -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'COPY (SELECT * FROM pgbench_accounts LIMIT 15) TO STDOUT;' > /dev/null

# Query cancellation test
(psql -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(5)' || true) &
(psql -U sharding_user -h 127.0.0.1 -p 6432 -c 'SELECT pg_sleep(5)' || true) &
killall psql -s SIGINT

# Sharding insert
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_insert.sql
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_insert.sql

# Sharding select
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /dev/null
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_select.sql > /dev/null

# Replica/primary selection & more sharding tests
psql -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -f tests/sharding/query_routing_test_primary_replica.sql > /dev/null

#
# ActiveRecord tests
Expand All @@ -61,15 +64,15 @@ cd tests/ruby && \
cd ../..

# Admin tests
psql -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
psql -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
(! psql -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)
psql -U sharding_user -e -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW STATS' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'RELOAD' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW CONFIG' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW DATABASES' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW LISTS' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW POOLS' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c 'SHOW VERSION' > /dev/null
psql -U sharding_user -h 127.0.0.1 -p 6432 -d pgbouncer -c "SET client_encoding TO 'utf8'" > /dev/null # will ignore
(! psql -U sharding_user -e -h 127.0.0.1 -p 6432 -d random_db -c 'SHOW STATS' > /dev/null)

# Start PgCat in debug to demonstrate failover better
start_pgcat "trace"
Expand All @@ -79,7 +82,7 @@ toxiproxy-cli toxic add -t latency -a latency=300 postgres_replica
sleep 1

# Note the failover in the logs
timeout 5 psql -e -h 127.0.0.1 -p 6432 <<-EOF
timeout 5 psql -U sharding_user -e -h 127.0.0.1 -p 6432 <<-EOF
SELECT 1;
SELECT 1;
SELECT 1;
Expand All @@ -97,7 +100,7 @@ sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' pgcat.toml
kill -SIGHUP $(pgrep pgcat)

# Prepared statements that will only work in session mode
pgbench -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol prepared
pgbench -U sharding_user -h 127.0.0.1 -p 6432 -t 500 -c 2 --protocol prepared

# Attempt clean shut down
killall pgcat -s SIGINT
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgcat"
version = "0.2.0-beta1"
version = "0.2.1-beta1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
45 changes: 41 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ impl Client {
server_info: BytesMut,
stats: Reporter,
) -> Result<Client, Error> {
let config = get_config();
let config = get_config().clone();
let transaction_mode = config.general.pool_mode.starts_with("t");
drop(config);
// drop(config);
loop {
trace!("Waiting for StartupMessage");

Expand Down Expand Up @@ -108,14 +108,51 @@ impl Client {
// Regular startup message.
PROTOCOL_VERSION_NUMBER => {
trace!("Got StartupMessage");

// TODO: perform actual auth.
let parameters = parse_startup(bytes.clone())?;

// Generate random backend ID and secret key
let process_id: i32 = rand::random();
let secret_key: i32 = rand::random();

// Perform MD5 authentication.
// TODO: Add SASL support.
let salt = md5_challenge(&mut stream).await?;

let code = match stream.read_u8().await {
Ok(p) => p,
Err(_) => return Err(Error::SocketError),
};

// PasswordMessage
if code as char != 'p' {
debug!("Expected p, got {}", code as char);
return Err(Error::ProtocolSyncError);
}

let len = match stream.read_i32().await {
Ok(len) => len,
Err(_) => return Err(Error::SocketError),
};

let mut password_response = vec![0u8; (len - 4) as usize];

match stream.read_exact(&mut password_response).await {
Ok(_) => (),
Err(_) => return Err(Error::SocketError),
};

// Compare server and client hashes.
let password_hash =
md5_hash_password(&config.user.name, &config.user.password, &salt);

if password_hash != password_response {
debug!("Password authentication failed");
wrong_password(&mut stream, &config.user.name).await?;
return Err(Error::ClientError);
}

debug!("Password authentication successful");

auth_ok(&mut stream).await?;
write_all(&mut stream, server_info).await?;
backend_key_data(&mut stream, process_id, secret_key).await?;
Expand Down
1 change: 1 addition & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ pub enum Error {
ServerError,
BadConfig,
AllServersDown,
ClientError,
}
76 changes: 68 additions & 8 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ pub async fn auth_ok(stream: &mut TcpStream) -> Result<(), Error> {
Ok(write_all(stream, auth_ok).await?)
}

/// Generate md5 password challenge.
pub async fn md5_challenge(stream: &mut TcpStream) -> Result<[u8; 4], Error> {
// let mut rng = rand::thread_rng();
let salt: [u8; 4] = [
rand::random(),
rand::random(),
rand::random(),
rand::random(),
];

let mut res = BytesMut::new();
res.put_u8(b'R');
res.put_i32(12);
res.put_i32(5); // MD5
res.put_slice(&salt[..]);

write_all(stream, res).await?;
Ok(salt)
}

/// Give the client the process_id and secret we generated
/// used in query cancellation.
pub async fn backend_key_data(
Expand Down Expand Up @@ -160,14 +180,8 @@ pub fn parse_startup(bytes: BytesMut) -> Result<HashMap<String, String>, Error>
Ok(result)
}

/// Send password challenge response to the server.
/// This is the MD5 challenge.
pub async fn md5_password(
stream: &mut TcpStream,
user: &str,
password: &str,
salt: &[u8],
) -> Result<(), Error> {
/// Create md5 password hash given a salt.
pub fn md5_hash_password(user: &str, password: &str, salt: &[u8]) -> Vec<u8> {
let mut md5 = Md5::new();

// First pass
Expand All @@ -186,6 +200,19 @@ pub async fn md5_password(
.collect::<Vec<u8>>();
password.push(0);

password
}

/// Send password challenge response to the server.
/// This is the MD5 challenge.
pub async fn md5_password(
stream: &mut TcpStream,
user: &str,
password: &str,
salt: &[u8],
) -> Result<(), Error> {
let password = md5_hash_password(user, password, salt);

let mut message = BytesMut::with_capacity(password.len() as usize + 5);

message.put_u8(b'p');
Expand Down Expand Up @@ -264,6 +291,39 @@ pub async fn error_response(stream: &mut OwnedWriteHalf, message: &str) -> Resul
Ok(write_all_half(stream, res).await?)
}

pub async fn wrong_password(stream: &mut TcpStream, user: &str) -> Result<(), Error> {
let mut error = BytesMut::new();

// Error level
error.put_u8(b'S');
error.put_slice(&b"FATAL\0"[..]);

// Error level (non-translatable)
error.put_u8(b'V');
error.put_slice(&b"FATAL\0"[..]);

// Error code: not sure how much this matters.
error.put_u8(b'C');
error.put_slice(&b"28P01\0"[..]); // system_error, see Appendix A.

// The short error message.
error.put_u8(b'M');
error.put_slice(&format!("password authentication failed for user \"{}\"\0", user).as_bytes());

// No more fields follow.
error.put_u8(0);

// Compose the two message reply.
let mut res = BytesMut::new();

res.put_u8(b'E');
res.put_i32(error.len() as i32 + 4);

res.put(error);

write_all(stream, res).await
}

/// Respond to a SHOW SHARD command.
pub async fn show_response(
stream: &mut OwnedWriteHalf,
Expand Down
Loading