Skip to content

Avoid sending Z packet in the middle of extended protocol packet sequence if we fail to get connection form pool #137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 23, 2022
Merged
9 changes: 6 additions & 3 deletions .circleci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard1 -i
PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -U sharding_user shard2 -i

# Install Toxiproxy to simulate a downed/slow database
wget -O toxiproxy-2.1.4.deb https://github.com/Shopify/toxiproxy/releases/download/v2.1.4/toxiproxy_2.1.4_amd64.deb
sudo dpkg -i toxiproxy-2.1.4.deb
wget -O toxiproxy-2.4.0.deb https://github.com/Shopify/toxiproxy/releases/download/v2.4.0/toxiproxy_2.4.0_linux_$(dpkg --print-architecture).deb
sudo dpkg -i toxiproxy-2.4.0.deb
Comment on lines +22 to +23
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes it possible to run this test script on any architecture


# Start Toxiproxy
toxiproxy-server &
Expand Down Expand Up @@ -129,11 +129,14 @@ toxiproxy-cli toxic remove --toxicName latency_downstream postgres_replica
start_pgcat "info"

# Test session mode (and config reload)
sed -i 's/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml
sed -i '0,/simple_db/s/pool_mode = "transaction"/pool_mode = "session"/' .circleci/pgcat.toml

# Reload config test
kill -SIGHUP $(pgrep pgcat)

# Revert settings after reload. Makes test runs idempotent
sed -i '0,/simple_db/s/pool_mode = "session"/pool_mode = "transaction"/' .circleci/pgcat.toml

sleep 1

# Prepared statements that will only work in session mode
Expand Down
17 changes: 14 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,9 +644,20 @@ where
conn
}
Err(err) => {
error!("Could not get connection from pool: {:?}", err);
error_response(&mut self.write, "could not get connection from the pool")
.await?;
// Clients do not expect to get SystemError followed by ReadyForQuery in the middle
// of extended protocol submission. So we will hold off on sending the actual error
// message to the client until we get 'S' message
match message[0] as char {
'P' | 'B' | 'E' | 'D' => (),
_ => {
error!("Could not get connection from pool: {:?}", err);
error_response(
&mut self.write,
"could not get connection from the pool",
)
.await?;
}
}
continue;
}
};
Expand Down
110 changes: 86 additions & 24 deletions tests/ruby/tests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,89 @@
require 'toml'

$stdout.sync = true
$stderr.sync = true

class ConfigEditor
def initialize
@original_config_text = File.read('../../.circleci/pgcat.toml')
text_to_load = @original_config_text.gsub("5432", "\"5432\"")

@original_configs = TOML.load(text_to_load)
end

def original_configs
TOML.load(TOML::Generator.new(@original_configs).body)
end

def with_modified_configs(new_configs)
text_to_write = TOML::Generator.new(new_configs).body
text_to_write = text_to_write.gsub("\"5432\"", "5432")
File.write('../../.circleci/pgcat.toml', text_to_write)
yield
ensure
File.write('../../.circleci/pgcat.toml', @original_config_text)
end
end

def with_captured_stdout_stderr
sout = STDOUT.clone
serr = STDERR.clone
STDOUT.reopen("/tmp/out.txt", "w+")
STDERR.reopen("/tmp/err.txt", "w+")
STDOUT.sync = true
STDERR.sync = true
yield
return File.read('/tmp/out.txt'), File.read('/tmp/err.txt')
ensure
STDOUT.reopen(sout)
STDERR.reopen(serr)
end


def test_extended_protocol_pooler_errors
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")

conf_editor = ConfigEditor.new
new_configs = conf_editor.original_configs

# shorter timeouts
new_configs["general"]["connect_timeout"] = 500
new_configs["general"]["ban_time"] = 1
new_configs["general"]["shutdown_timeout"] = 1
new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
new_configs["pools"]["sharded_db"]["users"]["1"]["pool_size"] = 1

conf_editor.with_modified_configs(new_configs) { admin_conn.async_exec("RELOAD") }

conn_str = "postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db"
10.times do
Thread.new do
conn = PG::connect(conn_str)
conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
ensure
conn&.close
end
end

sleep(0.5)
conn_under_test = PG::connect(conn_str)
stdout, stderr = with_captured_stdout_stderr do
5.times do |i|
conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
sleep 1
end
end

raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
puts "Pool checkout errors not breaking clients passed"
ensure
sleep 1
admin_conn.async_exec("RELOAD") # Reset state
conn_under_test&.close
end

test_extended_protocol_pooler_errors

# Uncomment these two to see all queries.
# ActiveRecord.verbose_query_logs = true
Expand Down Expand Up @@ -144,30 +227,6 @@ def test_server_parameters
end


class ConfigEditor
def initialize
@original_config_text = File.read('../../.circleci/pgcat.toml')
text_to_load = @original_config_text.gsub("5432", "\"5432\"")

@original_configs = TOML.load(text_to_load)
end

def original_configs
TOML.load(TOML::Generator.new(@original_configs).body)
end

def with_modified_configs(new_configs)
text_to_write = TOML::Generator.new(new_configs).body
text_to_write = text_to_write.gsub("\"5432\"", "5432")
File.write('../../.circleci/pgcat.toml', text_to_write)
yield
ensure
File.write('../../.circleci/pgcat.toml', @original_config_text)
end

end


def test_reload_pool_recycling
admin_conn = PG::connect("postgres://admin_user:admin_pass@127.0.0.1:6432/pgcat")
server_conn = PG::connect("postgres://sharding_user:sharding_user@127.0.0.1:6432/sharded_db?application_name=testing_pgcat")
Expand Down Expand Up @@ -201,3 +260,6 @@ def test_reload_pool_recycling
end

test_reload_pool_recycling