From 684b8ac0321d349ac5231ad4b204adb2fa236c5c Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Thu, 30 Nov 2023 11:26:02 -0300 Subject: [PATCH 1/7] Add pool_concurrency_test manual spec Add MT connection count without Mutex --- spec/dummy_driver.cr | 41 +++++++++++++++-- spec/manual/pool_concurrency_test.cr | 68 ++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 5 deletions(-) create mode 100644 spec/manual/pool_concurrency_test.cr diff --git a/spec/dummy_driver.cr b/spec/dummy_driver.cr index da6417c8..8d0f7e57 100644 --- a/spec/dummy_driver.cr +++ b/spec/dummy_driver.cr @@ -1,4 +1,3 @@ -require "spec" require "../src/db" class DummyDriver < DB::Driver @@ -17,20 +16,37 @@ class DummyDriver < DB::Driver end class DummyConnection < DB::Connection + @@connections = [] of DummyConnection + @@connections_count = Atomic(Int32).new(0) + def initialize(options : DB::Connection::Options) super(options) Fiber.yield + @@connections_count.add(1) @connected = true - @@connections ||= [] of DummyConnection - @@connections.not_nil! << self + {% unless flag?(:preview_mt) %} + # @@connections is only used in single-threaded mode in specs + # for benchmarks we want to avoid the overhead of synchronizing this array + @@connections << self + {% end %} + end + + def self.connections_count + @@connections_count.get end def self.connections - @@connections.not_nil! + {% if flag?(:preview_mt) %} + raise "DummyConnection.connections is only available in single-threaded mode" + {% end %} + @@connections end def self.clear_connections - @@connections.try &.clear + {% if flag?(:preview_mt) %} + raise "DummyConnection.clear_connections is only available in single-threaded mode" + {% end %} + @@connections.clear end def build_prepared_statement(query) : DB::Statement @@ -117,17 +133,30 @@ class DummyDriver < DB::Driver end class DummyStatement < DB::Statement + @@statements_count = Atomic(Int32).new(0) + @@statements_exec_count = Atomic(Int32).new(0) property params def initialize(connection, command : String, @prepared : Bool) @params = Hash(Int32 | String, DB::Any | Array(DB::Any)).new super(connection, command) + @@statements_count.add(1) raise DB::Error.new(command) if command == "syntax error" end + def self.statements_count + @@statements_count.get + end + + def self.statements_exec_count + @@statements_exec_count.get + end + protected def perform_query(args : Enumerable) : DB::ResultSet assert_not_closed! + @@statements_exec_count.add(1) + Fiber.yield @connection.as(DummyConnection).check set_params args @@ -137,6 +166,8 @@ class DummyDriver < DB::Driver protected def perform_exec(args : Enumerable) : DB::ExecResult assert_not_closed! + @@statements_exec_count.add(1) + @connection.as(DummyConnection).check set_params args raise DB::Error.new("forced exception due to query") if command == "raise" diff --git a/spec/manual/pool_concurrency_test.cr b/spec/manual/pool_concurrency_test.cr new file mode 100644 index 00000000..84bdb289 --- /dev/null +++ b/spec/manual/pool_concurrency_test.cr @@ -0,0 +1,68 @@ +# This file is to be executed as: +# +# % crystal run --release [-Dpreview_mt] ./spec/manual/pool_concurrency_test.cr -- --options="max_pool_size=5" --duration=30 --concurrency=4 +# +# + +require "option_parser" +require "../dummy_driver" +require "../../src/db" + +options = "" +duration = 3 +concurrency = 4 + +OptionParser.parse do |parser| + parser.banner = "Usage: pool_concurrency_test [arguments]" + parser.on("-o", "--options=VALUE", "Connection string options") { |v| options = v } + parser.on("-d", "--duration=SECONDS", "Specifies the duration in seconds") { |v| duration = v.to_i } + parser.on("-c", "--concurrency=VALUE", "Specifies the concurrent requests to perform") { |v| concurrency = v.to_i } + parser.on("-h", "--help", "Show this help") do + puts parser + exit + end + parser.invalid_option do |flag| + STDERR.puts "ERROR: #{flag} is not a valid option." + STDERR.puts parser + exit(1) + end +end + +multi_threaded = {% if flag?(:preview_mt) %} ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || 4 {% else %} false {% end %} +release = {% if flag?(:release) %} true {% else %} false {% end %} + +if !release + puts "WARNING: This should be run in release mode." +end + +db = DB.open "dummy://host?#{options}" + +start_time = Time.monotonic + +puts "Starting test for #{duration} seconds..." + +concurrency.times do + spawn do + loop do + db.scalar "1" + Fiber.yield + end + end +end + +sleep duration.seconds + +end_time = Time.monotonic + +puts " Options : #{options}" +puts " Duration (sec) : #{duration} (actual #{end_time - start_time})" +puts " Concurrency : #{concurrency}" +puts " Multi Threaded : #{multi_threaded ? "Yes (#{multi_threaded})" : "No"}" +puts "Total Connections : #{DummyDriver::DummyConnection.connections_count}" +puts " Total Statements : #{DummyDriver::DummyStatement.statements_count}" +puts " Total Queries : #{DummyDriver::DummyStatement.statements_exec_count}" +puts "Throughtput (q/s) : #{DummyDriver::DummyStatement.statements_exec_count / duration}" + +if !release + puts "WARNING: This should be run in release mode." +end From cd107ca635e22c072cafd0e16c695da6a0a18304 Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Thu, 30 Nov 2023 11:13:58 -0300 Subject: [PATCH 2/7] Drop checkout_some, simpler pool_prepared statement --- src/db/pool.cr | 21 ------------ src/db/pool_prepared_statement.cr | 56 ++----------------------------- 2 files changed, 2 insertions(+), 75 deletions(-) diff --git a/src/db/pool.cr b/src/db/pool.cr index 757e4e35..aeef2d67 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -158,27 +158,6 @@ module DB end end - # ``` - # selected, is_candidate = pool.checkout_some(candidates) - # ``` - # `selected` be a resource from the `candidates` list and `is_candidate` == `true` - # or `selected` will be a new resource and `is_candidate` == `false` - def checkout_some(candidates : Enumerable(WeakRef(T))) : {T, Bool} - sync do - candidates.each do |ref| - resource = ref.value - if resource && is_available?(resource) - @idle.delete resource - resource.before_checkout - return {resource, true} - end - end - end - - resource = checkout - {resource, candidates.any? { |ref| ref.value == resource }} - end - def release(resource : T) : Nil idle_pushed = false diff --git a/src/db/pool_prepared_statement.cr b/src/db/pool_prepared_statement.cr index 388b3b2e..164ae756 100644 --- a/src/db/pool_prepared_statement.cr +++ b/src/db/pool_prepared_statement.cr @@ -5,74 +5,22 @@ module DB # # See `PoolStatement` class PoolPreparedStatement < PoolStatement - # connections where the statement was prepared - @connections = Set(WeakRef(Connection)).new - @mutex = Mutex.new - def initialize(db : Database, query : String) super - # Prepares a statement on some connection - # otherwise the preparation is delayed until the first execution. - # After the first initialization the connection must be released - # it will be checked out when executing it. - - # This only happens if the db is configured to use prepared statements cache. - # Without that there is no reference to the already prepared statement we can - # take advantage of. - if db.prepared_statements_cache? - statement_with_retry &.release_connection - end - - # TODO use a round-robin selection in the pool so multiple sequentially - # initialized statements are assigned to different connections. end protected def do_close - @mutex.synchronize do - # TODO close all statements on all connections. - # currently statements are closed when the connection is closed. - - # WHAT-IF the connection is busy? Should each statement be able to - # deallocate itself when the connection is free. - @connections.clear - end end # builds a statement over a real connection - # the connection is registered in `@connections` private def build_statement : Statement - clean_connections - - conn, existing = @mutex.synchronize do - @db.checkout_some(@connections) - end - + conn = @db.pool.checkout begin - stmt = conn.prepared.build(@query) + conn.prepared.build(@query) rescue ex conn.release raise ex end - if !existing && @db.prepared_statements_cache? - @mutex.synchronize do - @connections << WeakRef.new(conn) - end - end - stmt - end - - private def clean_connections - return unless @db.prepared_statements_cache? - - @mutex.synchronize do - # remove disposed or closed connections - @connections.each do |ref| - conn = ref.value - if !conn || conn.closed? - @connections.delete ref - end - end - end end end end From aea76a6392b7690e161c0c8146ff2729e966ab59 Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Fri, 1 Dec 2023 23:51:01 -0300 Subject: [PATCH 3/7] Make pool statement a struct --- spec/database_spec.cr | 26 -------------------------- src/db/database.cr | 13 ------------- src/db/pool_prepared_statement.cr | 2 +- src/db/pool_statement.cr | 2 +- src/db/pool_unprepared_statement.cr | 2 +- src/db/statement.cr | 9 ++++----- 6 files changed, 7 insertions(+), 47 deletions(-) diff --git a/spec/database_spec.cr b/spec/database_spec.cr index a9ccef88..74373aff 100644 --- a/spec/database_spec.cr +++ b/spec/database_spec.cr @@ -57,14 +57,6 @@ describe DB::Database do end end - it "should close pool statements when closing db" do - stmt = uninitialized DB::PoolStatement - with_dummy do |db| - stmt = db.build("query1") - end - stmt.closed?.should be_true - end - it "should not reconnect if connection is lost and retry_attempts=0" do DummyDriver::DummyConnection.clear_connections DB.open "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=0" do |db| @@ -239,24 +231,6 @@ describe DB::Database do end end - describe "prepared_statements_cache connection option" do - it "should reuse prepared statements if true" do - with_dummy "dummy://localhost:1027?prepared_statements=true&prepared_statements_cache=true" do |db| - stmt1 = db.build("the query") - stmt2 = db.build("the query") - stmt1.object_id.should eq(stmt2.object_id) - end - end - - it "should not reuse prepared statements if false" do - with_dummy "dummy://localhost:1027?prepared_statements=true&prepared_statements_cache=false" do |db| - stmt1 = db.build("the query") - stmt2 = db.build("the query") - stmt1.object_id.should_not eq(stmt2.object_id) - end - end - end - describe "unprepared statements in pool" do it "creating statements should not create new connections" do with_dummy "dummy://localhost:1027?initial_pool_size=1" do |db| diff --git a/src/db/database.cr b/src/db/database.cr index 79741d51..16e1e7b6 100644 --- a/src/db/database.cr +++ b/src/db/database.cr @@ -38,7 +38,6 @@ module DB @connection_options : Connection::Options @pool : Pool(Connection) @setup_connection : Connection -> Nil - @statements_cache = StringKeyCache(PoolPreparedStatement).new # Initialize a database with the specified options and connection factory. # This covers more advanced use cases that might not be supported by an URI connection string such as tunneling connection. @@ -81,9 +80,6 @@ module DB # Closes all connection to the database. def close - @statements_cache.each_value &.close - @statements_cache.clear - @pool.close end @@ -99,15 +95,6 @@ module DB # :nodoc: def fetch_or_build_prepared_statement(query) : PoolStatement - if @connection_options.prepared_statements_cache - @statements_cache.fetch(query) { build_prepared_statement(query) } - else - build_prepared_statement(query) - end - end - - # :nodoc: - def build_prepared_statement(query) : PoolStatement PoolPreparedStatement.new(self, query) end diff --git a/src/db/pool_prepared_statement.cr b/src/db/pool_prepared_statement.cr index 164ae756..41564cd3 100644 --- a/src/db/pool_prepared_statement.cr +++ b/src/db/pool_prepared_statement.cr @@ -4,7 +4,7 @@ module DB # The execution of the statement is retried according to the pool configuration. # # See `PoolStatement` - class PoolPreparedStatement < PoolStatement + struct PoolPreparedStatement < PoolStatement def initialize(db : Database, query : String) super end diff --git a/src/db/pool_statement.cr b/src/db/pool_statement.cr index f0fc2b2f..20a8214b 100644 --- a/src/db/pool_statement.cr +++ b/src/db/pool_statement.cr @@ -3,7 +3,7 @@ module DB # a statement from the DB needs to be able to represent a statement in any # of the connections of the pool. Otherwise the user will need to deal with # actual connections in some point. - abstract class PoolStatement + abstract struct PoolStatement include StatementMethods def initialize(@db : Database, @query : String) diff --git a/src/db/pool_unprepared_statement.cr b/src/db/pool_unprepared_statement.cr index c58fafd9..7ba42e70 100644 --- a/src/db/pool_unprepared_statement.cr +++ b/src/db/pool_unprepared_statement.cr @@ -4,7 +4,7 @@ module DB # The execution of the statement is retried according to the pool configuration. # # See `PoolStatement` - class PoolUnpreparedStatement < PoolStatement + struct PoolUnpreparedStatement < PoolStatement def initialize(db : Database, query : String) super end diff --git a/src/db/statement.cr b/src/db/statement.cr index 6602e6d7..b3b32f6b 100644 --- a/src/db/statement.cr +++ b/src/db/statement.cr @@ -2,11 +2,6 @@ module DB # Common interface for connection based statements # and for connection pool statements. module StatementMethods - include Disposable - - protected def do_close - end - # See `QueryMethods#scalar` def scalar(*args_, args : Array? = nil) query(*args_, args: args) do |rs| @@ -47,6 +42,10 @@ module DB # 6. `#do_close` is called to release the statement resources. abstract class Statement include StatementMethods + include Disposable + + protected def do_close + end # :nodoc: getter connection From 7a19a5da872bfeb22234fe10a23c447870b1095b Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Fri, 1 Dec 2023 23:55:06 -0300 Subject: [PATCH 4/7] Drop StringKeyCache mutex The StringKeyCache is now only used inside a connection. It's assumed that connections are not used concurrently with multiple queries. --- src/db/string_key_cache.cr | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/db/string_key_cache.cr b/src/db/string_key_cache.cr index ef77d67d..f2cae629 100644 --- a/src/db/string_key_cache.cr +++ b/src/db/string_key_cache.cr @@ -1,28 +1,21 @@ module DB class StringKeyCache(T) @cache = {} of String => T - @mutex = Mutex.new def fetch(key : String) : T - @mutex.synchronize do - value = @cache.fetch(key, nil) - value = @cache[key] = yield unless value - value - end + value = @cache.fetch(key, nil) + value = @cache[key] = yield unless value + value end def each_value - @mutex.synchronize do - @cache.each do |_, value| - yield value - end + @cache.each do |_, value| + yield value end end def clear - @mutex.synchronize do - @cache.clear - end + @cache.clear end end end From 0780916845bd9800f1b6b03500cc4d11e9f9005d Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Sat, 2 Dec 2023 00:02:31 -0300 Subject: [PATCH 5/7] Drop do_close in pool statements --- src/db/pool_prepared_statement.cr | 3 --- src/db/pool_unprepared_statement.cr | 4 ---- 2 files changed, 7 deletions(-) diff --git a/src/db/pool_prepared_statement.cr b/src/db/pool_prepared_statement.cr index 41564cd3..dea14ca8 100644 --- a/src/db/pool_prepared_statement.cr +++ b/src/db/pool_prepared_statement.cr @@ -9,9 +9,6 @@ module DB super end - protected def do_close - end - # builds a statement over a real connection private def build_statement : Statement conn = @db.pool.checkout diff --git a/src/db/pool_unprepared_statement.cr b/src/db/pool_unprepared_statement.cr index 7ba42e70..fcf56d88 100644 --- a/src/db/pool_unprepared_statement.cr +++ b/src/db/pool_unprepared_statement.cr @@ -9,10 +9,6 @@ module DB super end - protected def do_close - # unprepared statements do not need to be release in each connection - end - # builds a statement over a real connection private def build_statement : Statement conn = @db.pool.checkout From 9d2e601f53873380318d76b47b25df88636204be Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Sat, 2 Dec 2023 22:18:15 -0300 Subject: [PATCH 6/7] Add specs and update comment --- spec/database_spec.cr | 19 +++++++++++++++++++ spec/dummy_driver.cr | 1 + src/db/pool.cr | 2 -- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/spec/database_spec.cr b/spec/database_spec.cr index 74373aff..38f963b8 100644 --- a/spec/database_spec.cr +++ b/spec/database_spec.cr @@ -179,6 +179,25 @@ describe DB::Database do end end + it "should not checkout multiple connections if there is a statement error" do + with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=10&retry_attempts=10" do |db| + expect_raises DB::Error do + db.exec("syntax error") + end + DummyDriver::DummyConnection.connections.size.should eq(1) + end + end + + it "should attempt all retries if connection is lost" do + with_dummy "dummy://localhost:1027?initial_pool_size=1&max_pool_size=1&retry_attempts=10" do |db| + expect_raises DB::PoolRetryAttemptsExceeded do + db.exec("raise ConnectionLost") + end + # 1 initial + 10 retries + DummyDriver::DummyConnection.connections.size.should eq(11) + end + end + describe "prepared_statements connection option" do it "defaults to true" do with_dummy "dummy://localhost:1027" do |db| diff --git a/spec/dummy_driver.cr b/spec/dummy_driver.cr index 8d0f7e57..84ab5b79 100644 --- a/spec/dummy_driver.cr +++ b/spec/dummy_driver.cr @@ -142,6 +142,7 @@ class DummyDriver < DB::Driver super(connection, command) @@statements_count.add(1) raise DB::Error.new(command) if command == "syntax error" + raise DB::ConnectionLost.new(connection) if command == "raise ConnectionLost" end def self.statements_count diff --git a/src/db/pool.cr b/src/db/pool.cr index aeef2d67..378e3035 100644 --- a/src/db/pool.cr +++ b/src/db/pool.cr @@ -206,8 +206,6 @@ module DB # if the connection is lost it will be closed by # the exception to release resources # we still need to remove it from the known pool. - # Closed connection will be evicted from statement cache - # in PoolPreparedStatement#clean_connections sync { delete(e.resource) } rescue e : PoolResourceRefused # a ConnectionRefused means a new connection From ca51cad5476ebbbebd997151b2cf83946b83729c Mon Sep 17 00:00:00 2001 From: "Brian J. Cardiff" Date: Sun, 3 Dec 2023 20:07:58 -0300 Subject: [PATCH 7/7] Fix typo --- spec/manual/pool_concurrency_test.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/manual/pool_concurrency_test.cr b/spec/manual/pool_concurrency_test.cr index 84bdb289..a36f96be 100644 --- a/spec/manual/pool_concurrency_test.cr +++ b/spec/manual/pool_concurrency_test.cr @@ -61,7 +61,7 @@ puts " Multi Threaded : #{multi_threaded ? "Yes (#{multi_threaded})" : "No"}" puts "Total Connections : #{DummyDriver::DummyConnection.connections_count}" puts " Total Statements : #{DummyDriver::DummyStatement.statements_count}" puts " Total Queries : #{DummyDriver::DummyStatement.statements_exec_count}" -puts "Throughtput (q/s) : #{DummyDriver::DummyStatement.statements_exec_count / duration}" +puts " Throughput (q/s) : #{DummyDriver::DummyStatement.statements_exec_count / duration}" if !release puts "WARNING: This should be run in release mode."