-
-
Notifications
You must be signed in to change notification settings - Fork 289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deliver Messages Async #588
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# ActiveJob docs: http://edgeguides.rubyonrails.org/active_job_basics.html | ||
# Example adapters ref: https://github.com/rails/rails/tree/master/activejob/lib/active_job/queue_adapters | ||
module ActiveJob | ||
module QueueAdapters | ||
# == Shoryuken concurrent adapter for Active Job | ||
# | ||
# This adapter sends messages asynchronously (ie non-blocking) and allows | ||
# the caller to set up handlers for both success and failure | ||
# | ||
# To use this adapter, set up as: | ||
# | ||
# adapter = ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter.new | ||
# adapter.success_handler = ->(response, job, options) { StatsD.increment(job.class.name + ".success") } | ||
# adapter.error_handler = ->(err, job, options) { StatsD.increment(job.class.name + ".failure") } | ||
# | ||
# config.active_job.queue_adapter = adapter | ||
class ShoryukenConcurrentSendAdapter < ShoryukenAdapter | ||
attr_accessor :error_handler | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm open to moving this as constructor requirement but the advantage to leaving this as an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@allcentury Would change at runtime be thread-safe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @phstc - probably not, I'm also naively thinking of my own use case, where a k8s cron task is run. I can change this in the task though so I think using dependency injection (like your added commit does) is the way to go. |
||
attr_accessor :success_handler | ||
|
||
def initialize | ||
self.error_handler = lambda { |error, job, _options| | ||
Shoryuken.logger.warn("Failed to enqueue job: #{job.inspect} due to error: #{error}") | ||
} | ||
self.success_handler = ->(_send_message_response, _job, _options) { nil } | ||
end | ||
|
||
def enqueue(job, options = {}) | ||
send_concurrently(job, options) { |f_job, f_options| super(f_job, f_options) } | ||
end | ||
|
||
private | ||
|
||
def send_concurrently(job, options) | ||
Concurrent::Promises | ||
.future(job, options) { |f_job, f_options| [ yield(f_job, f_options), f_job, f_options ] } | ||
.then { |send_message_response, f_job, f_options| success_handler.call(send_message_response, f_job, f_options) } | ||
.rescue(job, options) { |err, f_job, f_options| error_handler.call(err, f_job, f_options) } | ||
end | ||
end | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
RSpec.shared_examples "active_job_adapters" do | ||
let(:job) { double 'Job', id: '123', queue_name: 'queue' } | ||
let(:fifo) { false } | ||
let(:queue) { double 'Queue', fifo?: fifo } | ||
|
||
before do | ||
allow(Shoryuken::Client).to receive(:queues).with(job.queue_name).and_return(queue) | ||
allow(job).to receive(:serialize).and_return( | ||
'job_class' => 'Worker', | ||
'job_id' => job.id, | ||
'queue_name' => job.queue_name, | ||
'arguments' => nil, | ||
'locale' => nil | ||
) | ||
end | ||
|
||
describe '#enqueue' do | ||
specify do | ||
expect(queue).to receive(:send_message) do |hash| | ||
expect(hash[:message_deduplication_id]).to_not be | ||
end | ||
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) | ||
|
||
subject.enqueue(job) | ||
end | ||
|
||
context 'when fifo' do | ||
let(:fifo) { true } | ||
|
||
it 'does not include job_id in the deduplication_id' do | ||
expect(queue).to receive(:send_message) do |hash| | ||
message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) | ||
|
||
expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) | ||
end | ||
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) | ||
|
||
subject.enqueue(job) | ||
end | ||
end | ||
end | ||
|
||
describe '#enqueue_at' do | ||
specify do | ||
delay = 1 | ||
|
||
expect(queue).to receive(:send_message) do |hash| | ||
expect(hash[:message_deduplication_id]).to_not be | ||
expect(hash[:delay_seconds]).to eq(delay) | ||
end | ||
|
||
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) | ||
|
||
# need to figure out what to require Time.current and N.minutes to remove the stub | ||
allow(subject).to receive(:calculate_delay).and_return(delay) | ||
|
||
subject.enqueue_at(job, nil) | ||
end | ||
end | ||
|
||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,64 +1,7 @@ | ||
require 'spec_helper' | ||
require 'active_job' | ||
require 'shoryuken/extensions/active_job_adapter' | ||
require 'shared_examples_for_active_job' | ||
|
||
RSpec.describe ActiveJob::QueueAdapters::ShoryukenAdapter do | ||
let(:job) { double 'Job', id: '123', queue_name: 'queue' } | ||
let(:fifo) { false } | ||
let(:queue) { double 'Queue', fifo?: fifo } | ||
|
||
before do | ||
allow(Shoryuken::Client).to receive(:queues).with(job.queue_name).and_return(queue) | ||
allow(job).to receive(:serialize).and_return( | ||
'job_class' => 'Worker', | ||
'job_id' => job.id, | ||
'queue_name' => job.queue_name, | ||
'arguments' => nil, | ||
'locale' => nil | ||
) | ||
end | ||
|
||
describe '#enqueue' do | ||
specify do | ||
expect(queue).to receive(:send_message) do |hash| | ||
expect(hash[:message_deduplication_id]).to_not be | ||
end | ||
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) | ||
|
||
subject.enqueue(job) | ||
end | ||
|
||
context 'when fifo' do | ||
let(:fifo) { true } | ||
|
||
it 'does not include job_id in the deduplication_id' do | ||
expect(queue).to receive(:send_message) do |hash| | ||
message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) | ||
|
||
expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) | ||
end | ||
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) | ||
|
||
subject.enqueue(job) | ||
end | ||
end | ||
end | ||
|
||
describe '#enqueue_at' do | ||
specify do | ||
delay = 1 | ||
|
||
expect(queue).to receive(:send_message) do |hash| | ||
expect(hash[:message_deduplication_id]).to_not be | ||
expect(hash[:delay_seconds]).to eq(delay) | ||
end | ||
|
||
expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) | ||
|
||
# need to figure out what to require Time.current and N.minutes to remove the stub | ||
allow(subject).to receive(:calculate_delay).and_return(delay) | ||
|
||
subject.enqueue_at(job, nil) | ||
end | ||
end | ||
include_examples "active_job_adapters" | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
require 'spec_helper' | ||
require 'shared_examples_for_active_job' | ||
require 'shoryuken/extensions/active_job_concurrent_send_adapter' | ||
|
||
RSpec.describe ActiveJob::QueueAdapters::ShoryukenConcurrentSendAdapter do | ||
include_examples "active_job_adapters" | ||
|
||
context "#success_hander" do | ||
it "is called when a job succeeds" do | ||
options = {} | ||
response = true | ||
allow(queue).to receive(:send_message).and_return(response) | ||
expect(subject.success_handler).to receive(:call).with(response, job, options) | ||
|
||
subject.enqueue(job, options) | ||
end | ||
end | ||
|
||
context "#error_handler" do | ||
it "is called when sending a job fails" do | ||
options = {} | ||
response = Aws::SQS::Errors::InternalError.new("error", "error") | ||
allow(queue).to receive(:send_message).and_raise(response) | ||
expect(subject.error_handler).to receive(:call).with(response, job, options).and_call_original | ||
|
||
subject.enqueue(job, options) | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed this because I'm worried https://github.com/allcentury/shoryuken/blob/712b35c4dd253ef275d7756760b604cd184d45bd/lib/shoryuken/extensions/active_job_adapter.rb#L34 is not thread safe.