Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplifications and performance improvements #200

Merged
merged 7 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions spec/database_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,6 @@ describe DB::Database do
end
end

it "should close pool statements when closing db" do
stmt = uninitialized DB::PoolStatement
with_dummy do |db|
stmt = db.build("query1")
end
stmt.closed?.should be_true
end

it "should not reconnect if connection is lost and retry_attempts=0" do
DummyDriver::DummyConnection.clear_connections
DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=0" do |db|
Expand Down Expand Up @@ -187,6 +179,25 @@ describe DB::Database do
end
end

it "should not checkout multiple connections if there is a statement error" do
with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=10&retry_attempts=10" do |db|
expect_raises DB::Error do
db.exec("syntax error")
end
DummyDriver::DummyConnection.connections.size.should eq(1)
end
end

it "should attempt all retries if connection is lost" do
with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=10" do |db|
expect_raises DB::PoolRetryAttemptsExceeded do
db.exec("raise ConnectionLost")
end
# 1 initial + 10 retries
DummyDriver::DummyConnection.connections.size.should eq(11)
end
end

describe "prepared_statements connection option" do
it "defaults to true" do
with_dummy "dummy://localhost:1027" do |db|
Expand Down Expand Up @@ -239,24 +250,6 @@ describe DB::Database do
end
end

describe "prepared_statements_cache connection option" do
it "should reuse prepared statements if true" do
with_dummy "dummy://localhost:1027?prepared_statements=true&prepared_statements_cache=true" do |db|
stmt1 = db.build("the query")
stmt2 = db.build("the query")
stmt1.object_id.should eq(stmt2.object_id)
end
end

it "should not reuse prepared statements if false" do
with_dummy "dummy://localhost:1027?prepared_statements=true&prepared_statements_cache=false" do |db|
stmt1 = db.build("the query")
stmt2 = db.build("the query")
stmt1.object_id.should_not eq(stmt2.object_id)
end
end
end

describe "unprepared statements in pool" do
it "creating statements should not create new connections" do
with_dummy "dummy://localhost:1027?initial_pool_size=1" do |db|
Expand Down
42 changes: 37 additions & 5 deletions spec/dummy_driver.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
require "spec"
require "../src/db"

class DummyDriver < DB::Driver
Expand All @@ -17,20 +16,37 @@ class DummyDriver < DB::Driver
end

class DummyConnection < DB::Connection
@@connections = [] of DummyConnection
@@connections_count = Atomic(Int32).new(0)

def initialize(options : DB::Connection::Options)
super(options)
Fiber.yield
@@connections_count.add(1)
@connected = true
@@connections ||= [] of DummyConnection
@@connections.not_nil! << self
{% unless flag?(:preview_mt) %}
# @@connections is only used in single-threaded mode in specs
# for benchmarks we want to avoid the overhead of synchronizing this array
@@connections << self
{% end %}
end

def self.connections_count
@@connections_count.get
end

def self.connections
@@connections.not_nil!
{% if flag?(:preview_mt) %}
raise "DummyConnection.connections is only available in single-threaded mode"
{% end %}
@@connections
end

def self.clear_connections
@@connections.try &.clear
{% if flag?(:preview_mt) %}
raise "DummyConnection.clear_connections is only available in single-threaded mode"
{% end %}
@@connections.clear
end

def build_prepared_statement(query) : DB::Statement
Expand Down Expand Up @@ -117,17 +133,31 @@ class DummyDriver < DB::Driver
end

class DummyStatement < DB::Statement
@@statements_count = Atomic(Int32).new(0)
@@statements_exec_count = Atomic(Int32).new(0)
property params

def initialize(connection, command : String, @prepared : Bool)
@params = Hash(Int32 | String, DB::Any | Array(DB::Any)).new
super(connection, command)
@@statements_count.add(1)
raise DB::Error.new(command) if command == "syntax error"
raise DB::ConnectionLost.new(connection) if command == "raise ConnectionLost"
end

def self.statements_count
@@statements_count.get
end

def self.statements_exec_count
@@statements_exec_count.get
end

protected def perform_query(args : Enumerable) : DB::ResultSet
assert_not_closed!

@@statements_exec_count.add(1)

Fiber.yield
@connection.as(DummyConnection).check
set_params args
Expand All @@ -137,6 +167,8 @@ class DummyDriver < DB::Driver
protected def perform_exec(args : Enumerable) : DB::ExecResult
assert_not_closed!

@@statements_exec_count.add(1)

@connection.as(DummyConnection).check
set_params args
raise DB::Error.new("forced exception due to query") if command == "raise"
Expand Down
68 changes: 68 additions & 0 deletions spec/manual/pool_concurrency_test.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# This file is to be executed as:
#
# % crystal run --release [-Dpreview_mt] ./spec/manual/pool_concurrency_test.cr -- --options="max_pool_size=5" --duration=30 --concurrency=4
#
#

require "option_parser"
require "../dummy_driver"
require "../../src/db"

options = ""
duration = 3
concurrency = 4

OptionParser.parse do |parser|
parser.banner = "Usage: pool_concurrency_test [arguments]"
parser.on("-o", "--options=VALUE", "Connection string options") { |v| options = v }
parser.on("-d", "--duration=SECONDS", "Specifies the duration in seconds") { |v| duration = v.to_i }
parser.on("-c", "--concurrency=VALUE", "Specifies the concurrent requests to perform") { |v| concurrency = v.to_i }
parser.on("-h", "--help", "Show this help") do
puts parser
exit
end
parser.invalid_option do |flag|
STDERR.puts "ERROR: #{flag} is not a valid option."
STDERR.puts parser
exit(1)
end
end

multi_threaded = {% if flag?(:preview_mt) %} ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || 4 {% else %} false {% end %}
release = {% if flag?(:release) %} true {% else %} false {% end %}

if !release
puts "WARNING: This should be run in release mode."
end

db = DB.open "dummy://host?#{options}"

start_time = Time.monotonic

puts "Starting test for #{duration} seconds..."

concurrency.times do
spawn do
loop do
db.scalar "1"
Fiber.yield
end
end
end

sleep duration.seconds

end_time = Time.monotonic

puts " Options : #{options}"
puts " Duration (sec) : #{duration} (actual #{end_time - start_time})"
puts " Concurrency : #{concurrency}"
puts " Multi Threaded : #{multi_threaded ? "Yes (#{multi_threaded})" : "No"}"
puts "Total Connections : #{DummyDriver::DummyConnection.connections_count}"
puts " Total Statements : #{DummyDriver::DummyStatement.statements_count}"
puts " Total Queries : #{DummyDriver::DummyStatement.statements_exec_count}"
puts " Throughput (q/s) : #{DummyDriver::DummyStatement.statements_exec_count / duration}"

if !release
puts "WARNING: This should be run in release mode."
end
13 changes: 0 additions & 13 deletions src/db/database.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ module DB
@connection_options : Connection::Options
@pool : Pool(Connection)
@setup_connection : Connection -> Nil
@statements_cache = StringKeyCache(PoolPreparedStatement).new

# Initialize a database with the specified options and connection factory.
# This covers more advanced use cases that might not be supported by an URI connection string such as tunneling connection.
Expand Down Expand Up @@ -81,9 +80,6 @@ module DB

# Closes all connection to the database.
def close
@statements_cache.each_value &.close
@statements_cache.clear

@pool.close
end

Expand All @@ -99,15 +95,6 @@ module DB

# :nodoc:
def fetch_or_build_prepared_statement(query) : PoolStatement
if @connection_options.prepared_statements_cache
@statements_cache.fetch(query) { build_prepared_statement(query) }
else
build_prepared_statement(query)
end
end

# :nodoc:
def build_prepared_statement(query) : PoolStatement
PoolPreparedStatement.new(self, query)
end

Expand Down
23 changes: 0 additions & 23 deletions src/db/pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -158,27 +158,6 @@ module DB
end
end

# ```
# selected, is_candidate = pool.checkout_some(candidates)
# ```
# `selected` be a resource from the `candidates` list and `is_candidate` == `true`
# or `selected` will be a new resource and `is_candidate` == `false`
def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool}
sync do
candidates.each do |ref|
resource = ref.value
if resource && is_available?(resource)
@idle.delete resource
resource.before_checkout
return {resource, true}
end
end
end

resource = checkout
{resource, candidates.any? { |ref| ref.value == resource }}
end

def release(resource : T) : Nil
idle_pushed = false

Expand Down Expand Up @@ -227,8 +206,6 @@ module DB
# if the connection is lost it will be closed by
# the exception to release resources
# we still need to remove it from the known pool.
# Closed connection will be evicted from statement cache
# in PoolPreparedStatement#clean_connections
sync { delete(e.resource) }
rescue e : PoolResourceRefused
# a ConnectionRefused means a new connection
Expand Down
61 changes: 3 additions & 58 deletions src/db/pool_prepared_statement.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,75 +4,20 @@ module DB
# The execution of the statement is retried according to the pool configuration.
#
# See `PoolStatement`
class PoolPreparedStatement < PoolStatement
# connections where the statement was prepared
@connections = Set(WeakRef(Connection)).new
@mutex = Mutex.new

struct PoolPreparedStatement < PoolStatement
def initialize(db : Database, query : String)
super
# Prepares a statement on some connection
# otherwise the preparation is delayed until the first execution.
# After the first initialization the connection must be released
# it will be checked out when executing it.

# This only happens if the db is configured to use prepared statements cache.
# Without that there is no reference to the already prepared statement we can
# take advantage of.
if db.prepared_statements_cache?
statement_with_retry &.release_connection
end

# TODO use a round-robin selection in the pool so multiple sequentially
# initialized statements are assigned to different connections.
end

protected def do_close
@mutex.synchronize do
# TODO close all statements on all connections.
# currently statements are closed when the connection is closed.

# WHAT-IF the connection is busy? Should each statement be able to
# deallocate itself when the connection is free.
@connections.clear
end
end

# builds a statement over a real connection
# the connection is registered in `@connections`
private def build_statement : Statement
clean_connections

conn, existing = @mutex.synchronize do
@db.checkout_some(@connections)
end

conn = @db.pool.checkout
begin
stmt = conn.prepared.build(@query)
conn.prepared.build(@query)
rescue ex
conn.release
raise ex
end
if !existing && @db.prepared_statements_cache?
@mutex.synchronize do
@connections << WeakRef.new(conn)
end
end
stmt
end

private def clean_connections
return unless @db.prepared_statements_cache?

@mutex.synchronize do
# remove disposed or closed connections
@connections.each do |ref|
conn = ref.value
if !conn || conn.closed?
@connections.delete ref
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion src/db/pool_statement.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module DB
# a statement from the DB needs to be able to represent a statement in any
# of the connections of the pool. Otherwise the user will need to deal with
# actual connections in some point.
abstract class PoolStatement
abstract struct PoolStatement
include StatementMethods

def initialize(@db : Database, @query : String)
Expand Down
6 changes: 1 addition & 5 deletions src/db/pool_unprepared_statement.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@ module DB
# The execution of the statement is retried according to the pool configuration.
#
# See `PoolStatement`
class PoolUnpreparedStatement < PoolStatement
struct PoolUnpreparedStatement < PoolStatement
def initialize(db : Database, query : String)
super
end

protected def do_close
# unprepared statements do not need to be release in each connection
end

# builds a statement over a real connection
private def build_statement : Statement
conn = @db.pool.checkout
Expand Down
Loading
Loading