diff --git a/README.md b/README.md index 2a212be..804929c 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,34 @@ cp.with { |conn| conn.get('some-count') } Like `shutdown`, this will block until all connections are checked in and closed. +## Reap + +You can reap idle connections in the ConnectionPool instance to close connections that were created but have not been used for a certain amount of time. This can be useful to run periodically in a separate thread especially if keeping the connection open is resource intensive. + +You can specify how many seconds the connections have to be idle for them to be reaped. +Defaults to 60 seconds. + +```ruby +cp = ConnectionPool.new { Redis.new } +cp.reap(300) { |conn| conn.close } # Reaps connections that have been idle for 300 seconds (5 minutes). +``` + +### Reaper Thread + +You can start your own reaper thread to reap idle connections in the ConnectionPool instance on a regular interval. + +```ruby +cp = ConnectionPool.new { Redis.new } + +# Start a reaper thread to reap connections that have been idle for 300 seconds (5 minutes). +Thread.new do + loop do + cp.reap(300) { |conn| conn.close } + sleep 300 + end +end +``` + ## Current State There are several methods that return information about a pool. @@ -109,11 +137,15 @@ There are several methods that return information about a pool. cp = ConnectionPool.new(size: 10) { Redis.new } cp.size # => 10 cp.available # => 10 +cp.idle # => 0 cp.with do |conn| cp.size # => 10 cp.available # => 9 + cp.idle # => 0 end + +cp.idle # => 1 ``` Notes diff --git a/lib/connection_pool.rb b/lib/connection_pool.rb index 1a75222..06122ff 100644 --- a/lib/connection_pool.rb +++ b/lib/connection_pool.rb @@ -160,6 +160,12 @@ def reload(&block) @available.shutdown(reload: true, &block) end + ## Reaps idle connections that have been idle for over +idle_seconds+. + # +idle_seconds+ defaults to 60. + def reap(idle_seconds = 60, &block) + @available.reap(idle_seconds, &block) + end + # Size of this connection pool attr_reader :size # Automatically drop all connections after fork @@ -169,6 +175,11 @@ def reload(&block) def available @available.length end + + # Number of pool entries created and idle in the pool. + def idle + @available.idle + end end require_relative "connection_pool/timed_stack" diff --git a/lib/connection_pool/timed_stack.rb b/lib/connection_pool/timed_stack.rb index 8bad99e..e67af8a 100644 --- a/lib/connection_pool/timed_stack.rb +++ b/lib/connection_pool/timed_stack.rb @@ -98,6 +98,19 @@ def shutdown(reload: false, &block) end end + ## + # Reaps connections that were checked in more than +idle_seconds+ ago. + def reap(idle_seconds, &block) + raise ArgumentError, "reap must receive a block" unless block + raise ArgumentError, "idle_seconds must be a number" unless idle_seconds.is_a?(Numeric) + + @mutex.synchronize do + reap_start_time = current_time + + reap_idle_connections(idle_seconds, reap_start_time, &block) + end + end + ## # Returns +true+ if there are no available connections. @@ -112,6 +125,12 @@ def length @max - @created + @que.length end + ## + # The number of connections created and available on the stack. + def idle + @que.length + end + private def current_time @@ -133,7 +152,7 @@ def connection_stored?(options = nil) # This method must return a connection from the stack. def fetch_connection(options = nil) - @que.pop + @que.pop&.first end ## @@ -149,13 +168,38 @@ def shutdown_connections(options = nil) @created = 0 end + ## + # This is an extension point for TimedStack and is called with a mutex. + # + # This method iterates over the connections in the stack and reaps the oldest idle connections one at a time until + # the first connection is not idle. This requires that the stack is kept in order of checked in time (oldest first). + + def reap_idle_connections(idle_seconds, reap_start_time, &reap_block) + while idle_connections?(idle_seconds, reap_start_time) + conn, _last_checked_out = @que.shift + reap_block.call(conn) + end + end + + ## + # This is an extension point for TimedStack and is called with a mutex. + # + # Returns true if the first connection in the stack has been idle for more than idle_seconds + + def idle_connections?(idle_seconds, reap_start_time) + if connection_stored? + _conn, last_checked_out = @que.first + reap_start_time - last_checked_out > idle_seconds + end + end + ## # This is an extension point for TimedStack and is called with a mutex. # # This method must return +obj+ to the stack. def store_connection(obj, options = nil) - @que.push obj + @que.push [obj, current_time] end ## diff --git a/test/test_connection_pool.rb b/test/test_connection_pool.rb index cb4931d..c1539d3 100644 --- a/test/test_connection_pool.rb +++ b/test/test_connection_pool.rb @@ -31,7 +31,7 @@ def do_something_with_block end def respond_to?(method_id, *args) - method_id == :do_magic || super(method_id, *args) + method_id == :do_magic || super end end @@ -467,6 +467,74 @@ def test_shutdown_is_executed_for_all_connections_in_wrapped_pool assert_equal [["shutdown"]] * 3, recorders.map { |r| r.calls } end + def test_reap_removes_idle_connections + recorders = [] + pool = ConnectionPool.new(size: 1) do + Recorder.new.tap { |r| recorders << r } + end + + pool.with { |conn| conn } + + assert_equal 1, pool.idle + + pool.reap(0) { |recorder| recorder.do_work("reap") } + + assert_equal 0, pool.idle + assert_equal [["reap"]], recorders.map(&:calls) + end + + def test_reap_removes_all_idle_connections + recorders = [] + pool = ConnectionPool.new(size: 3) do + Recorder.new.tap { |r| recorders << r } + end + threads = use_pool(pool, 3) + kill_threads(threads) + + assert_equal 3, pool.idle + + pool.reap(0) { |recorder| recorder.do_work("reap") } + + assert_equal 0, pool.idle + assert_equal [["reap"]] * 3, recorders.map(&:calls) + end + + def test_reap_does_not_remove_connections_if_outside_idle_time + pool = ConnectionPool.new(size: 1) { Object.new } + + pool.with { |conn| conn } + + pool.reap(1000) { |conn| flunk "should not reap active connection" } + end + + def test_idle_returns_number_of_idle_connections + pool = ConnectionPool.new(size: 1) { Object.new } + + assert_equal 0, pool.idle + + pool.checkout + + assert_equal 0, pool.idle + + pool.checkin + + assert_equal 1, pool.idle + end + + def test_idle_with_multiple_connections + pool = ConnectionPool.new(size: 3) { Object.new } + + assert_equal 0, pool.idle + + threads = use_pool(pool, 3) + + assert_equal 0, pool.idle + + kill_threads(threads) + + assert_equal 3, pool.idle + end + def test_wrapper_wrapped_pool wrapper = ConnectionPool::Wrapper.new { NetworkConnection.new } assert_equal ConnectionPool, wrapper.wrapped_pool.class diff --git a/test/test_connection_pool_timed_stack.rb b/test/test_connection_pool_timed_stack.rb index a75ec78..64037fd 100644 --- a/test/test_connection_pool_timed_stack.rb +++ b/test/test_connection_pool_timed_stack.rb @@ -33,6 +33,20 @@ def test_length assert_equal 1, stack.length end + def test_idle + stack = ConnectionPool::TimedStack.new(1) { Object.new } + + assert_equal 0, stack.idle + + popped = stack.pop + + assert_equal 0, stack.idle + + stack.push popped + + assert_equal 1, stack.idle + end + def test_object_creation_fails stack = ConnectionPool::TimedStack.new(2) { raise "failure" } @@ -147,4 +161,121 @@ def test_shutdown refute_empty called assert_empty @stack end + + def test_reap + @stack.push Object.new + + called = [] + + @stack.reap(0) do |object| + called << object + end + + refute_empty called + assert_empty @stack + end + + def test_reap_large_idle_seconds + @stack.push Object.new + + called = [] + + @stack.reap(100) do |object| + called << object + end + + assert_empty called + refute_empty @stack + end + + def test_reap_no_block + assert_raises(ArgumentError) do + @stack.reap(0) + end + end + + def test_reap_non_numeric_idle_seconds + assert_raises(ArgumentError) do + @stack.reap("0") { |object| object } + end + end + + def test_reap_with_multiple_connections + stack = ConnectionPool::TimedStack.new(2) { Object.new } + stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + conn1 = stack.pop + conn2 = stack.pop + + stack.stub :current_time, stubbed_time do + stack.push conn1 + end + + stack.stub :current_time, stubbed_time + 1 do + stack.push conn2 + end + + called = [] + + stack.stub :current_time, stubbed_time + 2 do + stack.reap(1.5) do |object| + called << object + end + end + + assert_equal [conn1], called + refute_empty stack + assert_equal 1, stack.idle + end + + def test_reap_with_multiple_connections_and_zero_idle_seconds + stack = ConnectionPool::TimedStack.new(2) { Object.new } + stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + conn1 = stack.pop + conn2 = stack.pop + + stack.stub :current_time, stubbed_time do + stack.push conn1 + end + + stack.stub :current_time, stubbed_time + 1 do + stack.push conn2 + end + + called = [] + + stack.stub :current_time, stubbed_time + 2 do + stack.reap(0) do |object| + called << object + end + end + + assert_equal [conn1, conn2], called + assert_empty stack + end + + def test_reap_with_multiple_connections_and_idle_seconds_outside_range + stack = ConnectionPool::TimedStack.new(2) { Object.new } + stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + conn1 = stack.pop + conn2 = stack.pop + + stack.stub :current_time, stubbed_time do + stack.push conn1 + end + + stack.stub :current_time, stubbed_time + 1 do + stack.push conn2 + end + + called = [] + + stack.stub :current_time, stubbed_time + 2 do + stack.reap(3) do |object| + called << object + end + end + + assert_empty called + assert_equal 2, stack.idle + end end