diff --git a/lib/graphiti/configuration.rb b/lib/graphiti/configuration.rb index be7b4f19..d1a69098 100644 --- a/lib/graphiti/configuration.rb +++ b/lib/graphiti/configuration.rb @@ -8,6 +8,20 @@ class Configuration # Defaults to false OR if classes are cached (Rails-only) attr_accessor :concurrency + # This number must be considered in accordance with the database + # connection pool size configured in `database.yml`. The connection + # pool should be large enough to accommodate both the foreground + # threads (ie. web server or job worker threads) and background + # threads. For each process, Graphiti will create one global + # executor that uses this many threads to sideload resources + # asynchronously. Thus, the pool size should be at least + # `thread_count + concurrency_max_threads + 1`. For example, if your + # web server has a maximum of 3 threads, and + # `concurrency_max_threads` is set to 4, then your pool size should + # be at least 8. + # @return [Integer] Maximum number of threads to use when fetching sideloads concurrently + attr_accessor :concurrency_max_threads + attr_accessor :respond_to attr_accessor :context_for_endpoint attr_accessor :links_on_demand @@ -26,6 +40,7 @@ class Configuration def initialize @raise_on_missing_sideload = true @concurrency = false + @concurrency_max_threads = 4 @respond_to = [:json, :jsonapi, :xml] @links_on_demand = false @pagination_links_on_demand = false diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index af1f6ed0..fcb0731f 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -2,6 +2,23 @@ module Graphiti class Scope attr_accessor :object, :unpaginated_object attr_reader :pagination + + @thread_pool_executor_mutex = Mutex.new + + def self.thread_pool_executor + return @thread_pool_executor if @thread_pool_executor + + concurrency = Graphiti.config.concurrency_max_threads || 4 + @thread_pool_executor_mutex.synchronize do + @thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 0, + max_threads: concurrency, + max_queue: concurrency * 4, + fallback_policy: :caller_runs + ) + end + end + def initialize(object, resource, query, opts = {}) @object = object @resource = resource @@ -49,7 +66,7 @@ def resolve_sideloads(results) @resource.adapter.close if concurrent } if concurrent - promises << Concurrent::Promise.execute(&resolve_sideload) + promises << Concurrent::Promise.execute(executor: self.class.thread_pool_executor, &resolve_sideload) else resolve_sideload.call end diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index 22acf1f4..42013fbe 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -150,6 +150,21 @@ end end + describe "#concurrency_max_threads" do + include_context "with config", :concurrency_max_threads + + it "defaults" do + expect(Graphiti.config.concurrency_max_threads).to eq(4) + end + + it "is overridable" do + Graphiti.configure do |c| + c.concurrency_max_threads = 1 + end + expect(Graphiti.config.concurrency_max_threads).to eq(1) + end + end + describe "#raise_on_missing_sideload" do include_context "with config", :raise_on_missing_sideload