Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuation of #273 #276

Merged
merged 12 commits into from
Dec 3, 2016
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ gemspec

group :test do
gem 'codeclimate-test-reporter', require: nil
gem "simplecov"
gem 'simplecov'
gem 'multi_xml'
end
70 changes: 35 additions & 35 deletions lib/shoryuken/queue.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
module Shoryuken
class Queue
FIFO_ATTR = 'FifoQueue'
MESSAGE_GROUP_ID = 'ShoryukenMessage'
VISIBILITY_TIMEOUT_ATTR = 'VisibilityTimeout'

attr_accessor :name, :client, :url

def initialize(client, name)
self.name = name
self.client = client
begin
self.url = client.get_queue_url(queue_name: name).queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise e, "The specified queue '#{name}' does not exist"
end
self.url = client.get_queue_url(queue_name: name).queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise e, "The specified queue '#{name}' does not exist."
end

def visibility_timeout
client.get_queue_attributes(
queue_url: url,
attribute_names: ['VisibilityTimeout']
).attributes['VisibilityTimeout'].to_i
queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR].to_i
end

def delete_messages(options)
Expand All @@ -41,49 +40,50 @@ def receive_messages(options)
map { |m| Message.new(client, self, m) }
end

def fifo?
@_fifo ||= queue_attributes.attributes[FIFO_ATTR] == 'true'
end

private

def queue_attributes
# Note: Retrieving all queue attributes as requesting `FifoQueue` on non-FIFO queue raises error.
# See issue: https://github.com/aws/aws-sdk-ruby/issues/1350
client.get_queue_attributes(queue_url: url, attribute_names: ['All'])
end

def sanitize_messages!(options)
options = case
when options.is_a?(Array)
{ entries: options.map.with_index do |m, index|
{ id: index.to_s }.merge(m.is_a?(Hash) ? m : { message_body: m })
end }
when options.is_a?(Hash)
options
end
if options.is_a?(Array)
entries = options.map.with_index do |m, index|
{ id: index.to_s }.merge(m.is_a?(Hash) ? m : { message_body: m })
end

validate_messages!(options)
options = { entries: entries }
end

options[:entries].each(&method(:sanitize_message!))

options
end

def sanitize_message!(options)
options = case
when options.is_a?(String)
# send_message('message')
{ message_body: options }
when options.is_a?(Hash)
options
end
def add_fifo_attributes!(options)
return unless fifo?

validate_message!(options)
options[:message_group_id] ||= MESSAGE_GROUP_ID
options[:message_deduplication_id] ||= Digest::SHA256.hexdigest(options[:message_body].to_s)

options
end

def validate_messages!(options)
options[:entries].map { |m| validate_message!(m) }
end
def sanitize_message!(options)
options = { message_body: options } if options.is_a?(String)

def validate_message!(options)
body = options[:message_body]
if body.is_a?(Hash)
if (body = options[:message_body]).is_a?(Hash)
options[:message_body] = JSON.dump(body)
elsif !body.is_a?(String)
fail ArgumentError, "The message body must be a String and you passed a #{body.class}"
end

add_fifo_attributes!(options)

options
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/shoryuken/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Shoryuken
VERSION = '2.0.11'
VERSION = '2.1.0'
end
174 changes: 110 additions & 64 deletions spec/shoryuken/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@

describe Shoryuken::Queue do
let(:credentials) { Aws::Credentials.new('access_key_id', 'secret_access_key') }
let(:sqs) { Aws::SQS::Client.new(stub_responses: true, credentials: credentials) }
let(:queue_name) { 'shoryuken' }
let(:queue_url) { 'https://eu-west-1.amazonaws.com:6059/123456789012/shoryuken' }
let(:sqs) { Aws::SQS::Client.new(stub_responses: true, credentials: credentials) }
let(:queue_name) { 'shoryuken' }
let(:queue_url) { 'https://eu-west-1.amazonaws.com:6059/123456789012/shoryuken' }

subject { described_class.new(sqs, queue_name) }
before {
# Required as Aws::SQS::Client.get_queue_url returns 'String' when responses are stubbed,
# which is not accepted by Aws::SQS::Client.get_queue_attributes for :queue_name parameter.
allow(subject).to receive(:url).and_return(queue_url)
}

describe '#send_message' do
before {
allow(subject).to receive(:fifo?).and_return(false)
}
it 'accepts SQS request parameters' do
# https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message-instance_method
expect(sqs).to receive(:send_message).with(hash_including(message_body: 'msg1'))
Expand All @@ -22,96 +30,134 @@
subject.send_message('msg1')
end

context 'when body is invalid' do
it 'raises ArgumentError for nil' do
expect {
subject.send_message(message_body: nil)
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass')
end

it 'raises ArgumentError for Fixnum' do
expect {
subject.send_message(message_body: 1)
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum')
end
context 'when a client middleware' do
class MyClientMiddleware
def call(options)
options[:message_body] = 'changed'

context 'when a client middleware' do
class MyClientMiddleware
def call(options)
options[:message_body] = 'changed'

yield
end
yield
end
end

before do
allow(Shoryuken).to receive(:server?).and_return(false)
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.add MyClientMiddleware
end
before do
allow(Shoryuken).to receive(:server?).and_return(false)
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.add MyClientMiddleware
end
end
end

after do
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.remove MyClientMiddleware
end
after do
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.remove MyClientMiddleware
end
end
end

it 'invokes MyClientMiddleware' do
expect(sqs).to receive(:send_message).with(hash_including(message_body: 'changed'))
it 'invokes MyClientMiddleware' do
expect(sqs).to receive(:send_message).with(hash_including(message_body: 'changed'))

subject.send_message(message_body: 'original')
end
subject.send_message(message_body: 'original')
end
end
end

describe '#send_messages' do
before {
allow(subject).to receive(:fifo?).and_return(false)
}
it 'accepts SQS request parameters' do
# https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message_batch-instance_method
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1'}, { id: '1', message_body: 'msg2' }]))
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{id: '0', message_body: 'msg1'}, {id: '1', message_body: 'msg2'}]))

subject.send_messages(entries: [{ id: '0', message_body: 'msg1'}, { id: '1', message_body: 'msg2' }])
subject.send_messages(entries: [{id: '0', message_body: 'msg1'}, {id: '1', message_body: 'msg2'}])
end

it 'accepts an array of messages' do
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1', delay_seconds: 1, message_attributes: { attr: 'attr1' } }, { id: '1', message_body: 'msg2', delay_seconds: 1, message_attributes: { attr: 'attr2' } }]))

subject.send_messages([
{
message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' }
}, {
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' }
}
])
options = { entries: [{ id: '0',
message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' } },
{ id: '1',
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' } }] }

expect(sqs).to receive(:send_message_batch).with(hash_including(options))

subject.send_messages([{ message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' }
}, {
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' }
}])
end

context 'when FIFO' do
before do
allow(subject).to receive(:fifo?).and_return(true)
end

context 'and message_group_id and message_deduplication_id are absent' do
it 'sets default values' do
expect(sqs).to receive(:send_message_batch) do |arg|
first_entry = arg[:entries].first

expect(first_entry[:message_group_id]).to eq described_class::MESSAGE_GROUP_ID
expect(first_entry[:message_deduplication_id]).to be
end

subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }])
end
end

context 'and message_group_id and message_deduplication_id are present' do
it 'preserves existing values' do
expect(sqs).to receive(:send_message_batch) do |arg|
first_entry = arg[:entries].first

expect(first_entry[:message_group_id]).to eq 'my group'
expect(first_entry[:message_deduplication_id]).to eq 'my id'
end

subject.send_messages([{ message_body: 'msg1',
message_attributes: { attr: 'attr1' },
message_group_id: 'my group',
message_deduplication_id: 'my id' }])
end
end
end

it 'accepts an array of string' do
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1'}, { id: '1', message_body: 'msg2' }]))
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1' }, { id: '1', message_body: 'msg2' }]))

subject.send_messages(%w(msg1 msg2))
end
end

describe '#fifo?' do
before do
attribute_response = double 'Aws::SQS::Types::GetQueueAttributesResponse'

allow(attribute_response).to receive(:attributes).and_return('FifoQueue' => fifo.to_s, 'ContentBasedDeduplication' => 'true')
allow(subject).to receive(:url).and_return(queue_url)
allow(sqs).to receive(:get_queue_attributes).with(queue_url: queue_url, attribute_names: ['All']).and_return(attribute_response)
end

context 'when body is invalid' do
it 'raises ArgumentError for nil' do
expect {
subject.send_messages(entries: [message_body: nil])
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass')
end
context 'when queue is FIFO' do
let(:fifo) { true }

it 'raises ArgumentError for Fixnum' do
expect {
subject.send_messages(entries: [message_body: 1])
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum')
end
it { expect(subject.fifo?).to be }
end

context 'when queue is not FIFO' do
let(:fifo) { false }

it { expect(subject.fifo?).to_not be }
end
end
end