Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reap functionality #187

Merged
merged 4 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,34 @@ cp.with { |conn| conn.get('some-count') }

Like `shutdown`, this will block until all connections are checked in and closed.

## Reap

You can reap idle connections in the ConnectionPool instance to close connections that were created but have not been used for a certain amount of time. This can be useful to run periodically in a separate thread especially if keeping the connection open is resource intensive.

You can specify how many seconds the connections have to be idle for them to be reaped.
Defaults to 60 seconds.

```ruby
cp = ConnectionPool.new { Redis.new }
cp.reap(300) { |conn| conn.close } # Reaps connections that have been idle for 300 seconds (5 minutes).
```

### Reaper Thread

You can start your own reaper thread to reap idle connections in the ConnectionPool instance on a regular interval.

```ruby
cp = ConnectionPool.new { Redis.new }

# Start a reaper thread to reap connections that have been idle for 300 seconds (5 minutes).
Thread.new do
loop do
cp.reap(300) { |conn| conn.close }
sleep 300
end
end
```

## Current State

There are several methods that return information about a pool.
Expand All @@ -109,11 +137,15 @@ There are several methods that return information about a pool.
cp = ConnectionPool.new(size: 10) { Redis.new }
cp.size # => 10
cp.available # => 10
cp.idle # => 0

cp.with do |conn|
cp.size # => 10
cp.available # => 9
cp.idle # => 0
end

cp.idle # => 1
```

Notes
Expand Down
11 changes: 11 additions & 0 deletions lib/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def reload(&block)
@available.shutdown(reload: true, &block)
end

## Reaps idle connections that have been idle for over +idle_seconds+.
# +idle_seconds+ defaults to 60.
def reap(idle_seconds = 60, &block)
@available.reap(idle_seconds, &block)
end

# Size of this connection pool
attr_reader :size
# Automatically drop all connections after fork
Expand All @@ -169,6 +175,11 @@ def reload(&block)
def available
@available.length
end

# Number of pool entries created and idle in the pool.
def idle
@available.idle
end
end

require_relative "connection_pool/timed_stack"
Expand Down
48 changes: 46 additions & 2 deletions lib/connection_pool/timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,19 @@ def shutdown(reload: false, &block)
end
end

##
# Reaps connections that were checked in more than +idle_seconds+ ago.
def reap(idle_seconds, &block)
raise ArgumentError, "reap must receive a block" unless block
raise ArgumentError, "idle_seconds must be a number" unless idle_seconds.is_a?(Numeric)

@mutex.synchronize do
reap_start_time = current_time

reap_idle_connections(idle_seconds, reap_start_time, &block)
end
end

##
# Returns +true+ if there are no available connections.

Expand All @@ -112,6 +125,12 @@ def length
@max - @created + @que.length
end

##
# The number of connections created and available on the stack.
def idle
@que.length
end

private

def current_time
Expand All @@ -133,7 +152,7 @@ def connection_stored?(options = nil)
# This method must return a connection from the stack.

def fetch_connection(options = nil)
@que.pop
@que.pop&.first
end

##
Expand All @@ -149,13 +168,38 @@ def shutdown_connections(options = nil)
@created = 0
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method iterates over the connections in the stack and reaps the oldest idle connections one at a time until
# the first connection is not idle. This requires that the stack is kept in order of checked in time (oldest first).

def reap_idle_connections(idle_seconds, reap_start_time, &reap_block)
while idle_connections?(idle_seconds, reap_start_time)
conn, _last_checked_out = @que.shift
reap_block.call(conn)
Comment on lines +179 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this also need to decrement @created?

end
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# Returns true if the first connection in the stack has been idle for more than idle_seconds

def idle_connections?(idle_seconds, reap_start_time)
if connection_stored?
_conn, last_checked_out = @que.first
reap_start_time - last_checked_out > idle_seconds
end
end

##
# This is an extension point for TimedStack and is called with a mutex.
#
# This method must return +obj+ to the stack.

def store_connection(obj, options = nil)
@que.push obj
@que.push [obj, current_time]
end

##
Expand Down
70 changes: 69 additions & 1 deletion test/test_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def do_something_with_block
end

def respond_to?(method_id, *args)
method_id == :do_magic || super(method_id, *args)
method_id == :do_magic || super
end
end

Expand Down Expand Up @@ -467,6 +467,74 @@ def test_shutdown_is_executed_for_all_connections_in_wrapped_pool
assert_equal [["shutdown"]] * 3, recorders.map { |r| r.calls }
end

def test_reap_removes_idle_connections
recorders = []
pool = ConnectionPool.new(size: 1) do
Recorder.new.tap { |r| recorders << r }
end

pool.with { |conn| conn }

assert_equal 1, pool.idle

pool.reap(0) { |recorder| recorder.do_work("reap") }

assert_equal 0, pool.idle
assert_equal [["reap"]], recorders.map(&:calls)
end

def test_reap_removes_all_idle_connections
recorders = []
pool = ConnectionPool.new(size: 3) do
Recorder.new.tap { |r| recorders << r }
end
threads = use_pool(pool, 3)
kill_threads(threads)

assert_equal 3, pool.idle

pool.reap(0) { |recorder| recorder.do_work("reap") }

assert_equal 0, pool.idle
assert_equal [["reap"]] * 3, recorders.map(&:calls)
end

def test_reap_does_not_remove_connections_if_outside_idle_time
pool = ConnectionPool.new(size: 1) { Object.new }

pool.with { |conn| conn }

pool.reap(1000) { |conn| flunk "should not reap active connection" }
end

def test_idle_returns_number_of_idle_connections
pool = ConnectionPool.new(size: 1) { Object.new }

assert_equal 0, pool.idle

pool.checkout

assert_equal 0, pool.idle

pool.checkin

assert_equal 1, pool.idle
end

def test_idle_with_multiple_connections
pool = ConnectionPool.new(size: 3) { Object.new }

assert_equal 0, pool.idle

threads = use_pool(pool, 3)

assert_equal 0, pool.idle

kill_threads(threads)

assert_equal 3, pool.idle
end

def test_wrapper_wrapped_pool
wrapper = ConnectionPool::Wrapper.new { NetworkConnection.new }
assert_equal ConnectionPool, wrapper.wrapped_pool.class
Expand Down
131 changes: 131 additions & 0 deletions test/test_connection_pool_timed_stack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ def test_length
assert_equal 1, stack.length
end

def test_idle
stack = ConnectionPool::TimedStack.new(1) { Object.new }

assert_equal 0, stack.idle

popped = stack.pop

assert_equal 0, stack.idle

stack.push popped

assert_equal 1, stack.idle
end

def test_object_creation_fails
stack = ConnectionPool::TimedStack.new(2) { raise "failure" }

Expand Down Expand Up @@ -147,4 +161,121 @@ def test_shutdown
refute_empty called
assert_empty @stack
end

def test_reap
@stack.push Object.new

called = []

@stack.reap(0) do |object|
called << object
end

refute_empty called
assert_empty @stack
end

def test_reap_large_idle_seconds
@stack.push Object.new

called = []

@stack.reap(100) do |object|
called << object
end

assert_empty called
refute_empty @stack
end

def test_reap_no_block
assert_raises(ArgumentError) do
@stack.reap(0)
end
end

def test_reap_non_numeric_idle_seconds
assert_raises(ArgumentError) do
@stack.reap("0") { |object| object }
end
end

def test_reap_with_multiple_connections
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(1.5) do |object|
called << object
end
end

assert_equal [conn1], called
refute_empty stack
assert_equal 1, stack.idle
end

def test_reap_with_multiple_connections_and_zero_idle_seconds
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(0) do |object|
called << object
end
end

assert_equal [conn1, conn2], called
assert_empty stack
end

def test_reap_with_multiple_connections_and_idle_seconds_outside_range
stack = ConnectionPool::TimedStack.new(2) { Object.new }
stubbed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
conn1 = stack.pop
conn2 = stack.pop

stack.stub :current_time, stubbed_time do
stack.push conn1
end

stack.stub :current_time, stubbed_time + 1 do
stack.push conn2
end

called = []

stack.stub :current_time, stubbed_time + 2 do
stack.reap(3) do |object|
called << object
end
end

assert_empty called
assert_equal 2, stack.idle
end
end
Loading