From 596d420adc8d7e75ad6d78d2827f9ca91ff49dc2 Mon Sep 17 00:00:00 2001 From: iMacTia Date: Wed, 24 Jul 2019 18:08:35 +0100 Subject: [PATCH] WHAT --- faraday.gemspec | 1 + lib/faraday/adapter.rb | 6 ++ .../adapter/concerns/connection_pooling.rb | 38 +++++++++++ lib/faraday/adapter/patron.rb | 67 +++++++++++-------- spec/faraday/adapter/patron_spec.rb | 2 +- .../support/shared_examples/request_method.rb | 19 ++++++ 6 files changed, 104 insertions(+), 29 deletions(-) create mode 100644 lib/faraday/adapter/concerns/connection_pooling.rb diff --git a/faraday.gemspec b/faraday.gemspec index b684baac0..42a7e7679 100644 --- a/faraday.gemspec +++ b/faraday.gemspec @@ -18,6 +18,7 @@ Gem::Specification.new do |spec| spec.required_ruby_version = '>= 2.3' + spec.add_dependency 'connection_pool', '~> 2.2' spec.add_dependency 'multipart-post', '>= 1.2', '< 3' spec.require_paths = %w[lib spec/external_adapters] diff --git a/lib/faraday/adapter.rb b/lib/faraday/adapter.rb index 6a3bb7d5c..676e1713e 100644 --- a/lib/faraday/adapter.rb +++ b/lib/faraday/adapter.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require 'faraday/adapter/concerns/connection_pooling' + module Faraday # Base class for all Faraday adapters. Adapters are # responsible for fulfilling a Faraday request. @@ -40,10 +42,14 @@ def inherited(subclass) extend Parallelism self.supports_parallel = false + include ConnectionPooling + self.supports_pooling = false + def initialize(_app = nil, opts = {}, &block) @app = ->(env) { env.response } @connection_options = opts @config_block = block + initialize_pool(opts[:pool] || {}) if self.class.supports_pooling end def call(env) diff --git a/lib/faraday/adapter/concerns/connection_pooling.rb b/lib/faraday/adapter/concerns/connection_pooling.rb new file mode 100644 index 000000000..4851301b7 --- /dev/null +++ b/lib/faraday/adapter/concerns/connection_pooling.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +# This module marks an Adapter as supporting connection pooling. +module ConnectionPooling + def self.included(base) + base.extend(ClassMethods) + end + + # Class methods injected into the class including this module. + module ClassMethods + attr_accessor :supports_pooling + + def inherited(subclass) + super + subclass.supports_pooling = supports_pooling + end + end + + attr_reader :pool + + MISSING_CONNECTION_ERROR = 'You need to define a `connection` method' \ + 'in order to support connection pooling!' + + # Initializes the connection pool. + # + # @param opts [Hash] the options to pass to `ConnectionPool` initializer. + def initialize_pool(opts = {}) + ensure_connection_defined! + @pool = ConnectionPool.new(opts, &method(:connection)) + end + + # Checks if `connection` method exists and raises an error otherwise. + def ensure_connection_defined! + return if self.class.method_defined?(:connection) + + raise NoMethodError, MISSING_CONNECTION_ERROR + end +end diff --git a/lib/faraday/adapter/patron.rb b/lib/faraday/adapter/patron.rb index 3020dcf4b..0927de2d7 100644 --- a/lib/faraday/adapter/patron.rb +++ b/lib/faraday/adapter/patron.rb @@ -6,39 +6,19 @@ class Adapter class Patron < Faraday::Adapter dependency 'patron' + self.supports_pooling = true + def connection + ::Patron::Session.new + end + def call(env) super # TODO: support streaming requests env[:body] = env[:body].read if env[:body].respond_to? :read - session = ::Patron::Session.new - @config_block&.call(session) - if (env[:url].scheme == 'https') && env[:ssl] - configure_ssl(session, env[:ssl]) - end - - if (req = env[:request]) - if req[:timeout] - session.timeout = session.connect_timeout = req[:timeout] - end - session.connect_timeout = req[:open_timeout] if req[:open_timeout] - - if (proxy = req[:proxy]) - proxy_uri = proxy[:uri].dup - proxy_uri.user = proxy[:user] && - Utils.escape(proxy[:user]).gsub('+', '%20') - proxy_uri.password = proxy[:password] && - Utils.escape(proxy[:password]).gsub('+', '%20') - session.proxy = proxy_uri.to_s - end - end - - response = begin - data = env[:body] ? env[:body].to_s : nil - session.request(env[:method], env[:url].to_s, - env[:request_headers], data: data) - rescue Errno::ECONNREFUSED, ::Patron::ConnectionFailed - raise Faraday::ConnectionFailed, $ERROR_INFO + response = pool.with do |session| + @config_block&.call(session) + perform_request(session, env) end if (req = env[:request]).stream_response? @@ -83,6 +63,37 @@ def call(env) end end + def perform_request(session, env) + if (env[:url].scheme == 'https') && env[:ssl] + configure_ssl(session, env[:ssl]) + end + + if (req = env[:request]) + if req[:timeout] + session.timeout = session.connect_timeout = req[:timeout] + end + session.connect_timeout = req[:open_timeout] if req[:open_timeout] + + if (proxy = req[:proxy]) + proxy_uri = proxy[:uri].dup + proxy_uri.user = proxy[:user] && + Utils.escape(proxy[:user]).gsub('+', '%20') + proxy_uri.password = proxy[:password] && + Utils.escape(proxy[:password]).gsub('+', '%20') + session.proxy = proxy_uri.to_s + end + end + + begin + data = env[:body] ? env[:body].to_s : nil + puts "start request... #{pool.available}" + session.request(env[:method], env[:url].to_s, + env[:request_headers], data: data) + rescue Errno::ECONNREFUSED, ::Patron::ConnectionFailed + raise Faraday::ConnectionFailed, $ERROR_INFO + end + end + def configure_ssl(session, ssl) if ssl.fetch(:verify, true) session.cacert = ssl[:ca_file] diff --git a/spec/faraday/adapter/patron_spec.rb b/spec/faraday/adapter/patron_spec.rb index 812fd1a06..d60fca9ee 100644 --- a/spec/faraday/adapter/patron_spec.rb +++ b/spec/faraday/adapter/patron_spec.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true RSpec.describe Faraday::Adapter::Patron do - features :reason_phrase_parse + features :reason_phrase_parse, :pooling it_behaves_like 'an adapter' diff --git a/spec/support/shared_examples/request_method.rb b/spec/support/shared_examples/request_method.rb index 8e2828a21..412b7f2f6 100644 --- a/spec/support/shared_examples/request_method.rb +++ b/spec/support/shared_examples/request_method.rb @@ -231,4 +231,23 @@ expect { conn.public_send(http_method, '/') }.to raise_error(Faraday::ProxyAuthError) end + + on_feature :pooling do + it 'uses a connection_pool internally' do + pool = nil + allow_any_instance_of(described_class).to receive(:pool).and_wrap_original do |m, *args| + pool ||= m.call(*args) + end + + request_stub.to_return do |req| + nested = (req.headers['Nested'] || 1).to_i + return if nested >= pool.size + expect(pool.available).to eq(pool.size - nested) + conn.public_send(http_method, '/', nil, nested: nested + 1) + end + + conn.public_send(http_method, '/') + request_stub.disable + end + end end