forked from mastodon/mastodon
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add request pool to improve delivery performance (mastodon#10353)
* Add request pool to improve delivery performance Fix mastodon#7909 * Ensure connection is closed when exception interrupts execution * Remove Timeout#timeout from socket connection * Fix infinite retrial loop on HTTP::ConnectionError * Close sockets on failure, reduce idle time to 90 seconds * Add MAX_REQUEST_POOL_SIZE option to limit concurrent connections to the same server * Use a shared pool size, 512 by default, to stay below open file limit * Add some tests * Add more tests * Reduce MAX_IDLE_TIME from 90 to 30 seconds, reap every 30 seconds * Use a shared pool that returns preferred connection but re-purposes other ones when needed * Fix wrong connection being returned on subsequent calls within the same thread * Reduce mutex calls on flushes from 2 to 1 and add test for reaping
- Loading branch information
Showing
10 changed files
with
488 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,3 +148,4 @@ group :production do | |
end | ||
|
||
gem 'concurrent-ruby', require: false | ||
gem 'connection_pool', require: false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'connection_pool' | ||
require_relative './shared_timed_stack' | ||
|
||
class ConnectionPool::SharedConnectionPool < ConnectionPool | ||
def initialize(options = {}, &block) | ||
super(options, &block) | ||
|
||
@available = ConnectionPool::SharedTimedStack.new(@size, &block) | ||
end | ||
|
||
delegate :size, :flush, to: :@available | ||
|
||
def with(preferred_tag, options = {}) | ||
Thread.handle_interrupt(Exception => :never) do | ||
conn = checkout(preferred_tag, options) | ||
|
||
begin | ||
Thread.handle_interrupt(Exception => :immediate) do | ||
yield conn | ||
end | ||
ensure | ||
checkin(preferred_tag) | ||
end | ||
end | ||
end | ||
|
||
def checkout(preferred_tag, options = {}) | ||
if ::Thread.current[key(preferred_tag)] | ||
::Thread.current[key_count(preferred_tag)] += 1 | ||
::Thread.current[key(preferred_tag)] | ||
else | ||
::Thread.current[key_count(preferred_tag)] = 1 | ||
::Thread.current[key(preferred_tag)] = @available.pop(preferred_tag, options[:timeout] || @timeout) | ||
end | ||
end | ||
|
||
def checkin(preferred_tag) | ||
if ::Thread.current[key(preferred_tag)] | ||
if ::Thread.current[key_count(preferred_tag)] == 1 | ||
@available.push(::Thread.current[key(preferred_tag)]) | ||
::Thread.current[key(preferred_tag)] = nil | ||
else | ||
::Thread.current[key_count(preferred_tag)] -= 1 | ||
end | ||
else | ||
raise ConnectionPool::Error, 'no connections are checked out' | ||
end | ||
|
||
nil | ||
end | ||
|
||
private | ||
|
||
def key(tag) | ||
:"#{@key}-#{tag}" | ||
end | ||
|
||
def key_count(tag) | ||
:"#{@key_count}-#{tag}" | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
# frozen_string_literal: true | ||
|
||
class ConnectionPool::SharedTimedStack | ||
def initialize(max = 0, &block) | ||
@create_block = block | ||
@max = max | ||
@created = 0 | ||
@queue = [] | ||
@tagged_queue = Hash.new { |hash, key| hash[key] = [] } | ||
@mutex = Mutex.new | ||
@resource = ConditionVariable.new | ||
end | ||
|
||
def push(connection) | ||
@mutex.synchronize do | ||
store_connection(connection) | ||
@resource.broadcast | ||
end | ||
end | ||
|
||
alias << push | ||
|
||
def pop(preferred_tag, timeout = 5.0) | ||
deadline = current_time + timeout | ||
|
||
@mutex.synchronize do | ||
loop do | ||
return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty? | ||
|
||
connection = try_create(preferred_tag) | ||
return connection if connection | ||
|
||
to_wait = deadline - current_time | ||
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 | ||
|
||
@resource.wait(@mutex, to_wait) | ||
end | ||
end | ||
end | ||
|
||
def empty? | ||
size.zero? | ||
end | ||
|
||
def size | ||
@mutex.synchronize do | ||
@queue.size | ||
end | ||
end | ||
|
||
def flush | ||
@mutex.synchronize do | ||
@queue.delete_if do |connection| | ||
delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME) | ||
|
||
if delete | ||
@tagged_queue[connection.site].delete(connection) | ||
connection.close | ||
@created -= 1 | ||
end | ||
|
||
delete | ||
end | ||
end | ||
end | ||
|
||
private | ||
|
||
def try_create(preferred_tag) | ||
if @created == @max && !@queue.empty? | ||
throw_away_connection = @queue.pop | ||
@tagged_queue[throw_away_connection.site].delete(throw_away_connection) | ||
@create_block.call(preferred_tag) | ||
elsif @created != @max | ||
connection = @create_block.call(preferred_tag) | ||
@created += 1 | ||
connection | ||
end | ||
end | ||
|
||
def fetch_preferred_connection(preferred_tag) | ||
connection = @tagged_queue[preferred_tag].pop | ||
@queue.delete(connection) | ||
connection | ||
end | ||
|
||
def current_time | ||
Process.clock_gettime(Process::CLOCK_MONOTONIC) | ||
end | ||
|
||
def store_connection(connection) | ||
@tagged_queue[connection.site].push(connection) | ||
@queue.push(connection) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.