Skip to content

Commit

Permalink
Merge commit from ba35668
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Dec 3, 2016
2 parents 0470269 + ba35668 commit 0713207
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 101 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ gemspec

group :test do
gem 'codeclimate-test-reporter', require: nil
gem "simplecov"
gem 'simplecov'
gem 'multi_xml'
end
70 changes: 35 additions & 35 deletions lib/shoryuken/queue.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
module Shoryuken
class Queue
FIFO_ATTR = 'FifoQueue'
MESSAGE_GROUP_ID = 'ShoryukenMessage'
VISIBILITY_TIMEOUT_ATTR = 'VisibilityTimeout'

attr_accessor :name, :client, :url

def initialize(client, name)
self.name = name
self.client = client
begin
self.url = client.get_queue_url(queue_name: name).queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise e, "The specified queue '#{name}' does not exist"
end
self.url = client.get_queue_url(queue_name: name).queue_url
rescue Aws::SQS::Errors::NonExistentQueue => e
raise e, "The specified queue '#{name}' does not exist."
end

def visibility_timeout
client.get_queue_attributes(
queue_url: url,
attribute_names: ['VisibilityTimeout']
).attributes['VisibilityTimeout'].to_i
queue_attributes.attributes[VISIBILITY_TIMEOUT_ATTR].to_i
end

def delete_messages(options)
Expand All @@ -41,49 +40,50 @@ def receive_messages(options)
map { |m| Message.new(client, self, m) }
end

def fifo?
@_fifo ||= queue_attributes.attributes[FIFO_ATTR] == 'true'
end

private

def queue_attributes
# Note: Retrieving all queue attributes as requesting `FifoQueue` on non-FIFO queue raises error.
# See issue: https://github.com/aws/aws-sdk-ruby/issues/1350
client.get_queue_attributes(queue_url: url, attribute_names: ['All'])
end

def sanitize_messages!(options)
options = case
when options.is_a?(Array)
{ entries: options.map.with_index do |m, index|
{ id: index.to_s }.merge(m.is_a?(Hash) ? m : { message_body: m })
end }
when options.is_a?(Hash)
options
end
if options.is_a?(Array)
entries = options.map.with_index do |m, index|
{ id: index.to_s }.merge(m.is_a?(Hash) ? m : { message_body: m })
end

validate_messages!(options)
options = { entries: entries }
end

options[:entries].each(&method(:sanitize_message!))

options
end

def sanitize_message!(options)
options = case
when options.is_a?(String)
# send_message('message')
{ message_body: options }
when options.is_a?(Hash)
options
end
def add_fifo_attributes!(options)
return unless fifo?

validate_message!(options)
options[:message_group_id] ||= MESSAGE_GROUP_ID
options[:message_deduplication_id] ||= Digest::SHA256.hexdigest(options[:message_body].to_s)

options
end

def validate_messages!(options)
options[:entries].map { |m| validate_message!(m) }
end
def sanitize_message!(options)
options = { message_body: options } if options.is_a?(String)

def validate_message!(options)
body = options[:message_body]
if body.is_a?(Hash)
if (body = options[:message_body]).is_a?(Hash)
options[:message_body] = JSON.dump(body)
elsif !body.is_a?(String)
fail ArgumentError, "The message body must be a String and you passed a #{body.class}"
end

add_fifo_attributes!(options)

options
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/shoryuken/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Shoryuken
VERSION = '2.0.11'
VERSION = '2.1.0'
end
174 changes: 110 additions & 64 deletions spec/shoryuken/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@

describe Shoryuken::Queue do
let(:credentials) { Aws::Credentials.new('access_key_id', 'secret_access_key') }
let(:sqs) { Aws::SQS::Client.new(stub_responses: true, credentials: credentials) }
let(:queue_name) { 'shoryuken' }
let(:queue_url) { 'https://eu-west-1.amazonaws.com:6059/123456789012/shoryuken' }
let(:sqs) { Aws::SQS::Client.new(stub_responses: true, credentials: credentials) }
let(:queue_name) { 'shoryuken' }
let(:queue_url) { 'https://eu-west-1.amazonaws.com:6059/123456789012/shoryuken' }

subject { described_class.new(sqs, queue_name) }
before {
# Required as Aws::SQS::Client.get_queue_url returns 'String' when responses are stubbed,
# which is not accepted by Aws::SQS::Client.get_queue_attributes for :queue_name parameter.
allow(subject).to receive(:url).and_return(queue_url)
}

describe '#send_message' do
before {
allow(subject).to receive(:fifo?).and_return(false)
}
it 'accepts SQS request parameters' do
# https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message-instance_method
expect(sqs).to receive(:send_message).with(hash_including(message_body: 'msg1'))
Expand All @@ -22,96 +30,134 @@
subject.send_message('msg1')
end

context 'when body is invalid' do
it 'raises ArgumentError for nil' do
expect {
subject.send_message(message_body: nil)
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass')
end

it 'raises ArgumentError for Fixnum' do
expect {
subject.send_message(message_body: 1)
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum')
end
context 'when a client middleware' do
class MyClientMiddleware
def call(options)
options[:message_body] = 'changed'

context 'when a client middleware' do
class MyClientMiddleware
def call(options)
options[:message_body] = 'changed'

yield
end
yield
end
end

before do
allow(Shoryuken).to receive(:server?).and_return(false)
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.add MyClientMiddleware
end
before do
allow(Shoryuken).to receive(:server?).and_return(false)
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.add MyClientMiddleware
end
end
end

after do
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.remove MyClientMiddleware
end
after do
Shoryuken.configure_client do |config|
config.client_middleware do |chain|
chain.remove MyClientMiddleware
end
end
end

it 'invokes MyClientMiddleware' do
expect(sqs).to receive(:send_message).with(hash_including(message_body: 'changed'))
it 'invokes MyClientMiddleware' do
expect(sqs).to receive(:send_message).with(hash_including(message_body: 'changed'))

subject.send_message(message_body: 'original')
end
subject.send_message(message_body: 'original')
end
end
end

describe '#send_messages' do
before {
allow(subject).to receive(:fifo?).and_return(false)
}
it 'accepts SQS request parameters' do
# https://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#send_message_batch-instance_method
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1'}, { id: '1', message_body: 'msg2' }]))
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{id: '0', message_body: 'msg1'}, {id: '1', message_body: 'msg2'}]))

subject.send_messages(entries: [{ id: '0', message_body: 'msg1'}, { id: '1', message_body: 'msg2' }])
subject.send_messages(entries: [{id: '0', message_body: 'msg1'}, {id: '1', message_body: 'msg2'}])
end

it 'accepts an array of messages' do
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1', delay_seconds: 1, message_attributes: { attr: 'attr1' } }, { id: '1', message_body: 'msg2', delay_seconds: 1, message_attributes: { attr: 'attr2' } }]))

subject.send_messages([
{
message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' }
}, {
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' }
}
])
options = { entries: [{ id: '0',
message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' } },
{ id: '1',
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' } }] }

expect(sqs).to receive(:send_message_batch).with(hash_including(options))

subject.send_messages([{ message_body: 'msg1',
delay_seconds: 1,
message_attributes: { attr: 'attr1' }
}, {
message_body: 'msg2',
delay_seconds: 1,
message_attributes: { attr: 'attr2' }
}])
end

context 'when FIFO' do
before do
allow(subject).to receive(:fifo?).and_return(true)
end

context 'and message_group_id and message_deduplication_id are absent' do
it 'sets default values' do
expect(sqs).to receive(:send_message_batch) do |arg|
first_entry = arg[:entries].first

expect(first_entry[:message_group_id]).to eq described_class::MESSAGE_GROUP_ID
expect(first_entry[:message_deduplication_id]).to be
end

subject.send_messages([{ message_body: 'msg1', message_attributes: { attr: 'attr1' } }])
end
end

context 'and message_group_id and message_deduplication_id are present' do
it 'preserves existing values' do
expect(sqs).to receive(:send_message_batch) do |arg|
first_entry = arg[:entries].first

expect(first_entry[:message_group_id]).to eq 'my group'
expect(first_entry[:message_deduplication_id]).to eq 'my id'
end

subject.send_messages([{ message_body: 'msg1',
message_attributes: { attr: 'attr1' },
message_group_id: 'my group',
message_deduplication_id: 'my id' }])
end
end
end

it 'accepts an array of string' do
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1'}, { id: '1', message_body: 'msg2' }]))
expect(sqs).to receive(:send_message_batch).with(hash_including(entries: [{ id: '0', message_body: 'msg1' }, { id: '1', message_body: 'msg2' }]))

subject.send_messages(%w(msg1 msg2))
end
end

describe '#fifo?' do
before do
attribute_response = double 'Aws::SQS::Types::GetQueueAttributesResponse'

allow(attribute_response).to receive(:attributes).and_return('FifoQueue' => fifo.to_s, 'ContentBasedDeduplication' => 'true')
allow(subject).to receive(:url).and_return(queue_url)
allow(sqs).to receive(:get_queue_attributes).with(queue_url: queue_url, attribute_names: ['All']).and_return(attribute_response)
end

context 'when body is invalid' do
it 'raises ArgumentError for nil' do
expect {
subject.send_messages(entries: [message_body: nil])
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a NilClass')
end
context 'when queue is FIFO' do
let(:fifo) { true }

it 'raises ArgumentError for Fixnum' do
expect {
subject.send_messages(entries: [message_body: 1])
}.to raise_error(ArgumentError, 'The message body must be a String and you passed a Fixnum')
end
it { expect(subject.fifo?).to be }
end

context 'when queue is not FIFO' do
let(:fifo) { false }

it { expect(subject.fifo?).to_not be }
end
end
end

0 comments on commit 0713207

Please sign in to comment.