Skip to content

Commit

Permalink
Merge pull request #14 from ResultadosDigitais/core-requeue-items
Browse files Browse the repository at this point in the history
Improve resilience
  • Loading branch information
Nando Sousa authored Jun 28, 2018
2 parents 0f0babb + ffa5d73 commit de033cd
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 18 deletions.
29 changes: 29 additions & 0 deletions lib/upperkut/batch_execution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
require_relative 'logging'

module Upperkut
class BatchExecution
def initialize(worker, logger = Upperkut::Logging.logger)
@worker = worker
@logger = logger
end

def execute
worker_instance = @worker.new
items = @worker.fetch_items.collect! do |item|
item['body']
end

worker_instance.perform(items.dup)
rescue Exception => ex
@worker.push_items(items)

@logger.info(
action: :requeue,
ex: ex,
item_size: items.size
)

raise ex
end
end
end
4 changes: 3 additions & 1 deletion lib/upperkut/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ def start
@logger.level = log_level
end

@options[:logger] = @logger

manager = Manager.new(@options)

@logger.info(@options)
Expand Down Expand Up @@ -75,7 +77,7 @@ def parse_options(args)
@options[:concurrency] = Integer(arg)
end
o.on('-l', '--log-level LEVEL', 'Log level') do |arg|
@options[:log_level] = arg
@options[:log_level] = arg.to_i
end
end.parse!(args)
end
Expand Down
17 changes: 15 additions & 2 deletions lib/upperkut/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@
module Upperkut
class Manager
attr_accessor :worker, :redis
attr_reader :stopped
attr_reader :stopped, :logger

def initialize(opts = {})
self.worker = opts.fetch(:worker).constantize
self.redis = worker.setup.redis
@concurrency = opts.fetch(:concurrency, 25)
@logger = opts.fetch(:logger, Upperkut::Logging.logger)

@stopped = false
@processors = []
end

def run
@concurrency.times do
@processors << Processor.new(self).run
processor = Processor.new(self)
@processors << processor
processor.run
end
end

Expand All @@ -27,5 +31,14 @@ def stop
def kill
@processors.each(&:kill)
end

def notify_killed_processor(processor)
@processors.delete(processor)
return if @stopped

processor = Processor.new(self)
@processors << processor
processor.run
end
end
end
25 changes: 17 additions & 8 deletions lib/upperkut/processor.rb
Original file line number Diff line number Diff line change
@@ -1,20 +1,34 @@
require_relative 'batch_execution'

module Upperkut
class Processor
def initialize(manager)
@manager = manager
@worker = @manager.worker
@logger = @manager.logger

@sleeping_time = 0
end

def run
@thread ||= Thread.new do
process
begin
process
rescue Exception => e
@logger.debug(
action: :processor_killed,
reason: e
)

@manager.notify_killed_processor(self)
end
end
end

def kill
return unless @thread
@thread.raise Upperkut::Shutdown
@thread.value # wait
end

private
Expand All @@ -28,6 +42,7 @@ def process
end

@sleeping_time += sleep(@worker.setup.polling_interval)
@logger.debug(sleeping_time: @sleeping_time)
end
end

Expand All @@ -43,13 +58,7 @@ def should_process?
end

def process_batch
@sleeping_time = 0
@worker.new.process
rescue Exception => ex
# Add to retry_queue
# if retry_limit is reached
# send to dead
raise ex
BatchExecution.new(@worker, @logger).execute
end
end
end
7 changes: 0 additions & 7 deletions lib/upperkut/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ def self.included(base)
base.extend(ClassMethods)
end

def process
items = self.class.fetch_items.collect! do |item|
item['body']
end
perform(items)
end

module ClassMethods
extend Forwardable

Expand Down
33 changes: 33 additions & 0 deletions spec/upperkut/batch_execution_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'spec_helper'
require 'upperkut/batch_execution'

module Upperkut
RSpec.describe BatchExecution do
class DummyWorker
include Worker

def perform(_items); end
end

after do
DummyWorker.clear
end

let(:worker) { DummyWorker }

context 'when something goes wrong while processing' do
it 'requeue_item' do
allow_any_instance_of(worker).to receive(:perform).and_raise

item = {'id' => '1', 'event' => 'open'}
worker.push_items(item)

expect(worker.size).to eq 1

execution = BatchExecution.new(worker)
expect { execution.execute }.to raise_error
expect(worker.size).to eq 1
end
end
end
end

0 comments on commit de033cd

Please sign in to comment.