Skip to content

Commit

Permalink
Trying a custom executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Jul 5, 2017
1 parent 40332a5 commit e5c3fb8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 6 deletions.
11 changes: 8 additions & 3 deletions lib/shoryuken/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ def stop
private

def executor
Concurrent.global_io_executor
@executor ||= Concurrent::FixedThreadPool.new(pool_size, max_queue: pool_size)
end

def start_managers
@managers.each do |manager|
Concurrent::Promise.execute { manager.start }.rescue do |ex|
Concurrent::Promise.execute(executor: executor) { manager.start }.rescue do |ex|
log_manager_failure(ex)
start_soft_shutdown
end
Expand Down Expand Up @@ -85,12 +85,17 @@ def stop_callback
fire_event(:shutdown, true)
end

def pool_size
Shoryuken.groups.sum { |(_group, options)| options[:concurrency] }
end

def create_managers
Shoryuken.groups.map do |group, options|
Shoryuken::Manager.new(
Shoryuken::Fetcher.new(group),
Shoryuken.polling_strategy(group).new(options[:queues]),
options[:concurrency]
options[:concurrency],
executor: executor
)
end
end
Expand Down
7 changes: 4 additions & 3 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ class Manager

BATCH_LIMIT = 10
# See https://github.com/phstc/shoryuken/issues/348#issuecomment-292847028
MIN_DISPATCH_INTERVAL = 0.5
MIN_DISPATCH_INTERVAL = 0.1

def initialize(fetcher, polling_strategy, concurrency)
def initialize(fetcher, polling_strategy, concurrency, executor)
@fetcher = fetcher
@polling_strategy = polling_strategy
@max_processors = concurrency
@busy_processors = Concurrent::AtomicFixnum.new(0)
@done = Concurrent::AtomicBoolean.new(false)
@executor = executor
end

def start
Expand Down Expand Up @@ -68,7 +69,7 @@ def assign(queue_name, sqs_msg)

@busy_processors.increment

Concurrent::Promise.execute {
Concurrent::Promise.execute(executor: @executor) {
Processor.new(queue_name, sqs_msg).process
}.then { processor_done }.rescue { processor_done }
end
Expand Down

0 comments on commit e5c3fb8

Please sign in to comment.