Skip to content

Commit

Permalink
Introduce system for garbage collecting connection managers
Browse files Browse the repository at this point in the history
Introduces a system for garbage collecting connection managers in an
attempt to solve #850.

Previously, the number of connection managers (and by extension the
number of connections that they were holding) would stay stable if a
program used a stable number of threads. However, if threads were used
disposably, the number of active connection managers out there could
continue to grow unchecked, and each of those could be holding one or
more dead connections which are no longer open, but still holding a file
descriptor waiting to be unlinked in disposed of by Ruby's GC.

This PR introduces a connection manager garbage collector that runs
periodically whenever a new connection manager is created. Connection
managers get a timestamp to indicate when they were last used, and the
GC runs through each one and prunes any that haven't seen use within a
certain threshold (currently, 120 seconds). This should have the effect
of removing connection managers as they're not needed anymore, and thus
resolving the socket leakage seen in #850.

I had to make a couple implementation tweaks to get this working
correctly. Namely:

* The `StripeClient` class now tracks thread contexts instead of
  connection managers. This is so that when we're disposing of a
  connection manager, we can set `default_connection_manager` on its
  parent thread context to `nil` so that it's not still tracking a
  connection manager that we're trying to get rid of.

* `StripeClient` instances can still be instantiated as before, but no
  longer internalize a reference to their own connection manager,
  instead falling back to the one in the current thread context. The
  rationale is that when trying to dispose of a connection manager, we'd
  also have to dispose of its reference in any outstanding
  `StripeClient` instances that might still be tracking it, and that
  starts to get a little unwieldy. I've left `#connection_manager` in
  place for backwards compatibility, but marked it as deprecated.
  • Loading branch information
brandur committed Sep 20, 2019
1 parent 1f80da4 commit d350f67
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 54 deletions.
53 changes: 35 additions & 18 deletions lib/stripe/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,35 @@ module Stripe
#
# Note that this class in itself is *not* thread safe. We expect it to be
# instantiated once per thread.
#
# Note also that this class doesn't currently clean up after itself because
# it expects to only ever have a few connections (unless `.clear` is called
# manually). It'd be possible to tank memory by constantly changing the value
# of `Stripe.api_base` or the like. A possible improvement might be to detect
# and prune old connections whenever a request is executed.
class ConnectionManager
# Timestamp indicating when the connection manager last made a request.
# This is used by `StripeClient` to determine whether a connection manager
# should be garbage collected or not.
attr_reader :last_used

def initialize
@active_connections = {}
@last_used = Time.now

# A connection manager may be accessed across threads as one thread makes
# requests on it while another is trying to clear it (either because it's
# trying to garbage collect the manager or trying to clear it because a
# configuration setting has changed). That's probably thread-safe already
# because of Ruby's GIL, but just in case the library's running on JRuby
# or the like, use a mutex to synchronize access in this connection
# manager.
@mutex = Mutex.new
end

# Finishes any active connections by closing their TCP connection and
# clears them from internal tracking.
def clear
@active_connections.each do |_, connection|
connection.finish
@mutex.synchronize do
@active_connections.each do |_, connection|
connection.finish
end
@active_connections = {}
end
@active_connections = {}
end

# Gets a connection for a given URI. This is for internal use only as it's
Expand All @@ -35,17 +46,19 @@ def clear
#
# `uri` is expected to be a string.
def connection_for(uri)
u = URI.parse(uri)
connection = @active_connections[[u.host, u.port]]
@mutex.synchronize do
u = URI.parse(uri)
connection = @active_connections[[u.host, u.port]]

if connection.nil?
connection = create_connection(u)
connection.start
if connection.nil?
connection = create_connection(u)
connection.start

@active_connections[[u.host, u.port]] = connection
end
@active_connections[[u.host, u.port]] = connection
end

connection
connection
end
end

# Executes an HTTP request to the given URI with the given method. Also
Expand All @@ -65,6 +78,8 @@ def execute_request(method, uri, body: nil, headers: nil, query: nil)
raise ArgumentError, "query should be a string" \
if query && !query.is_a?(String)

@last_used = Time.now

connection = connection_for(uri)

u = URI.parse(uri)
Expand All @@ -74,7 +89,9 @@ def execute_request(method, uri, body: nil, headers: nil, query: nil)
u.path
end

connection.send_request(method.to_s.upcase, path, body, headers)
@mutex.synchronize do
connection.send_request(method.to_s.upcase, path, body, headers)
end
end

#
Expand Down
108 changes: 86 additions & 22 deletions lib/stripe/stripe_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ module Stripe
# recover both a resource a call returns as well as a response object that
# contains information on the HTTP call.
class StripeClient
# A set of all known connection managers across all threads and a mutex to
# A set of all known thread contexts across all threads and a mutex to
# synchronize global access to them.
@all_connection_managers = []
@all_connection_managers_mutex = Mutex.new
@thread_contexts_with_connection_managers = []
@thread_contexts_with_connection_managers_mutex = Mutex.new
@last_connection_manager_gc = Time.now

attr_accessor :connection_manager

# Initializes a new `StripeClient`. Expects a `ConnectionManager` object,
# and uses a default connection manager unless one is passed.
def initialize(connection_manager = nil)
self.connection_manager = connection_manager ||
self.class.default_connection_manager
# Initializes a new `StripeClient`.
#
# Takes a connection manager object for backwards compatibility only, and
# that use is DEPRECATED.
def initialize(_connection_manager = nil)
@system_profiler = SystemProfiler.new
@last_request_metrics = nil
end
Expand All @@ -42,26 +41,34 @@ def self.clear_all_connection_managers
# Just a quick path for when configuration is being set for the first
# time before any connections have been opened. There is technically some
# potential for thread raciness here, but not in a practical sense.
return if @all_connection_managers.empty?

@all_connection_managers_mutex.synchronize do
@all_connection_managers.each(&:clear)
return if @thread_contexts_with_connection_managers.empty?

@thread_contexts_with_connection_managers_mutex.synchronize do
@thread_contexts_with_connection_managers.each do |thread_context|
# Note that the thread context itself is not destroyed, but we clear
# its connection manager and remove our reference to it. If it ever
# makes a new request we'll give it a new connection manager and
# it'll go back into `@thread_contexts_with_connection_managers`.
thread_context.default_connection_manager.clear
thread_context.default_connection_manager = nil
end
@thread_contexts_with_connection_managers.clear
end
end

# A default client for the current thread.
def self.default_client
current_thread_context.default_client ||=
StripeClient.new(default_connection_manager)
current_thread_context.default_client ||= StripeClient.new
end

# A default connection manager for the current thread.
def self.default_connection_manager
current_thread_context.default_connection_manager ||= begin
connection_manager = ConnectionManager.new

@all_connection_managers_mutex.synchronize do
@all_connection_managers << connection_manager
@thread_contexts_with_connection_managers_mutex.synchronize do
maybe_gc_connection_managers
@thread_contexts_with_connection_managers << current_thread_context
end

connection_manager
Expand Down Expand Up @@ -131,6 +138,15 @@ def self.sleep_time(num_retries)
sleep_seconds
end

# Gets the connection manager in use for the current `StripeClient`.
#
# This method is DEPRECATED and for backwards compatibility only.
def connection_manager
self.class.default_connection_manager
end
extend Gem::Deprecate
deprecate :connection_manager, :none, 2020, 9

# Executes the API call within the given block. Usage looks like:
#
# client = StripeClient.new
Expand Down Expand Up @@ -207,10 +223,10 @@ def execute_request(method, path,
context.query = query

http_resp = execute_request_with_rescues(method, api_base, context) do
connection_manager.execute_request(method, url,
body: body,
headers: headers,
query: query)
self.class.default_connection_manager.execute_request(method, url,
body: body,
headers: headers,
query: query)
end

begin
Expand All @@ -233,6 +249,14 @@ def execute_request(method, path,
# private
#

# How often to check (in seconds) for connection managers that haven't been
# used in a long time and which should be garbage collected.
CONNECTION_MANAGER_GC_PERIOD = 60

# Time (in seconds) that a connection manager has not been used before it's
# eligible for garbage collection.
CONNECTION_MANAGER_GC_LAST_USED_EXPIRY = 120

ERROR_MESSAGE_CONNECTION =
"Unexpected error communicating when trying to connect to " \
"Stripe (%s). You may be seeing this message because your DNS is not " \
Expand Down Expand Up @@ -320,6 +344,46 @@ def self.current_thread_context
Thread.current[:stripe_client__internal_use_only] ||= ThreadContext.new
end

# Garbage collects connection managers that haven't been used in some time,
# with the idea being that we want to remove old connection managers that
# belong to dead threads and the like.
#
# Prefixed with `maybe_` because garbage collection will only run
# periodically so that we're not constantly engaged in busy work. If
# connection managers live a little passed their useful age it's not
# harmful, so it's not necessary to get them right away.
#
# For testability, returns `nil` if it didn't run and the number of
# connection managers that were garbage collected otherwise.
#
# IMPORTANT: This method is not thread-safe and expects to be called inside
# a lock on `@thread_contexts_with_connection_managers_mutex`.
#
# For internal use only. Does not provide a stable API and may be broken
# with future non-major changes.
def self.maybe_gc_connection_managers
if @last_connection_manager_gc + CONNECTION_MANAGER_GC_PERIOD > Time.now
return nil
end

last_used_threshold = Time.now - CONNECTION_MANAGER_GC_LAST_USED_EXPIRY

pruned_thread_contexts = []
@thread_contexts_with_connection_managers.each do |thread_context|
connection_manager = thread_context.default_connection_manager
next if connection_manager.last_used > last_used_threshold

connection_manager.clear
thread_context.default_connection_manager = nil
pruned_thread_contexts << thread_context
end

@thread_contexts_with_connection_managers -= pruned_thread_contexts
@last_connection_manager_gc = Time.now

pruned_thread_contexts.count
end

private def api_url(url = "", api_base = nil)
(api_base || Stripe.api_base) + url
end
Expand Down
28 changes: 28 additions & 0 deletions test/stripe/connection_manager_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ class ConnectionManagerTest < Test::Unit::TestCase
@manager = Stripe::ConnectionManager.new
end

context "#initialize" do
should "set #last_used to current time" do
t = Time.new(2019)
Timecop.freeze(t) do
assert_equal t, Stripe::ConnectionManager.new.last_used
end
end
end

context "#clear" do
should "clear any active connections" do
stub_request(:post, "#{Stripe.api_base}/path")
Expand Down Expand Up @@ -133,6 +142,25 @@ class ConnectionManagerTest < Test::Unit::TestCase
end
assert_equal e.message, "query should be a string"
end

should "set #last_used to current time" do
stub_request(:post, "#{Stripe.api_base}/path")
.to_return(body: JSON.generate(object: "account"))

t = Time.new(2019)

# Make sure the connection manager is initialized at a different time
# than the one we're going to measure at because `#last_used` is also
# set by the constructor.
manager = Timecop.freeze(t) do
Stripe::ConnectionManager.new
end

Timecop.freeze(t + 1) do
manager.execute_request(:post, "#{Stripe.api_base}/path")
assert_equal t + 1, manager.last_used
end
end
end
end
end
Loading

0 comments on commit d350f67

Please sign in to comment.