Skip to content

Commit

Permalink
Bugfix headers exchange wont match tables (#575)
Browse files Browse the repository at this point in the history
Header exchange couldn't match on tables because a mix of AMQ::Protocol::Table and Hash(String, AMQ::Protocol::Field). This will change to use AMQ::Protocol::Table everywhere.
  • Loading branch information
spuun authored Oct 17, 2023
1 parent 4450ccb commit d9faf4e
Show file tree
Hide file tree
Showing 13 changed files with 90 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- A bug in delay exchanges caused messages to be routed to x-dead-letter-exchange instead of bound queues. It also ruined any dead lettering headers.
- A bug that prevented headers exchange to match on table in message headers.

## [1.2.4] - 2023-09-26

Expand Down
2 changes: 1 addition & 1 deletion shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ shards:

amq-protocol:
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.10
version: 1.1.11

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
Expand Down
47 changes: 30 additions & 17 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ describe LavinMQ::DirectExchange do
vhost = Server.vhosts.create("x")
q1 = LavinMQ::Queue.new(vhost, "q1")
x = LavinMQ::DirectExchange.new(vhost, "")
x.bind(q1, "q1", Hash(String, LavinMQ::AMQP::Field).new)
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
x.matches("q1").should eq(Set{q1})
end

Expand Down Expand Up @@ -174,16 +174,16 @@ describe LavinMQ::HeadersExchange do
x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true)
end

hdrs_all = {
hdrs_all = LavinMQ::AMQP::Table.new({
"x-match" => "all",
"org" => "84codes",
"user" => "test",
} of String => LavinMQ::AMQP::Field
hdrs_any = {
})
hdrs_any = LavinMQ::AMQP::Table.new({
"x-match" => "any",
"org" => "84codes",
"user" => "test",
} of String => LavinMQ::AMQP::Field
})

describe "match all" do
it "should match if same args" do
Expand Down Expand Up @@ -225,15 +225,26 @@ describe LavinMQ::HeadersExchange do
msg_hdrs["user"] = "hest"
x.matches("", msg_hdrs).size.should eq 0
end

it "should match nestled amq-protocol tables" do
x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true)
q10 = LavinMQ::Queue.new(vhost, "q10")
bind_hdrs = LavinMQ::AMQP::Table.new({
"x-match" => "any",
"tbl" => LavinMQ::AMQP::Table.new({"foo": "bar"}),
})
x.bind(q10, "", bind_hdrs) # to_h because that's what's done in VHost
msg_hdrs = bind_hdrs.clone
msg_hdrs.delete("x-match")
x.matches("", msg_hdrs).size.should eq 1
end
end

it "should handle multiple bindings" do
q10 = LavinMQ::Queue.new(vhost, "q10")
x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true)
hdrs1 = {"x-match" => "any", "org" => "84codes",
"user" => "test"} of String => LavinMQ::AMQP::Field
hdrs2 = {"x-match" => "all", "org" => "google",
"user" => "test"} of String => LavinMQ::AMQP::Field
hdrs1 = LavinMQ::AMQP::Table.new({"x-match" => "any", "org" => "84codes", "user" => "test"})
hdrs2 = LavinMQ::AMQP::Table.new({"x-match" => "all", "org" => "google", "user" => "test"})

x.bind(q10, "", hdrs1)
x.bind(q10, "", hdrs2)
Expand All @@ -249,21 +260,23 @@ describe LavinMQ::HeadersExchange do
hsh = {"k" => "v"} of String => LavinMQ::AMQP::Field
arrf = [1] of LavinMQ::AMQP::Field
arru = [1_u8] of LavinMQ::AMQP::Field
hdrs = {"Nil" => nil, "Bool" => true, "UInt8" => 1_u8, "UInt16" => 1_u16, "UInt32" => 1_u32,
"Int16" => 1_u16, "Int32" => 1_i32, "Int64" => 1_i64, "Float32" => 1_f32,
"Float64" => 1_f64, "String" => "String", "Array(Field)" => arrf,
"Array(UInt8)" => arru, "Time" => Time.utc, "Hash(String, Field)" => hsh,
"x-match" => "all",
} of String => LavinMQ::AMQP::Field
hdrs = LavinMQ::AMQP::Table.new({
"Nil" => nil, "Bool" => true, "UInt8" => 1_u8, "UInt16" => 1_u16, "UInt32" => 1_u32,
"Int16" => 1_u16, "Int32" => 1_i32, "Int64" => 1_i64, "Float32" => 1_f32,
"Float64" => 1_f64, "String" => "String", "Array(Field)" => arrf,
"Array(UInt8)" => arru, "Time" => Time.utc, "Hash(String, Field)" => hsh,
"x-match" => "all",
})
x.bind(q11, "", hdrs)
x.matches("", hdrs).should eq Set{q11}
end

it "should handle unbind" do
q12 = LavinMQ::Queue.new(vhost, "q12")
x = LavinMQ::HeadersExchange.new(vhost, "h", false, false, true)
hdrs1 = {"x-match" => "any", "org" => "84codes",
"user" => "test"} of String => LavinMQ::AMQP::Field
hdrs1 = LavinMQ::AMQP::Table.new({
"x-match" => "any", "org" => "84codes", "user" => "test",
})
x.bind(q12, "", hdrs1)
x.unbind(q12, "", hdrs1)
x.matches("", hdrs1).size.should eq 0
Expand Down
6 changes: 3 additions & 3 deletions spec/policies_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ describe LavinMQ::VHost do
it "arguments should have priority for non numeric arguments" do
vhost.exchanges["no-ae"] = LavinMQ::DirectExchange.new(vhost, "no-ae")
vhost.exchanges["x-with-ae"] = LavinMQ::DirectExchange.new(vhost, "x-with-ae",
arguments: {"x-alternate-exchange" => "ae2".as(AMQ::Protocol::Field)})
arguments: AMQ::Protocol::Table.new({"x-alternate-exchange": "ae2"}))
vhost.add_policy("test", ".*", "all", definitions, 100_i8)
sleep 0.01
vhost.exchanges["no-ae"].@alternate_exchange.should eq "dead-letters"
Expand All @@ -230,8 +230,8 @@ describe LavinMQ::VHost do
end

it "should use the lowest value" do
vhost.queues["test1"] = LavinMQ::Queue.new(vhost, "test1", arguments: {"x-max-length" => 1_i64.as(AMQ::Protocol::Field)})
vhost.queues["test2"] = LavinMQ::Queue.new(vhost, "test2", arguments: {"x-max-length" => 11_i64.as(AMQ::Protocol::Field)})
vhost.queues["test1"] = LavinMQ::Queue.new(vhost, "test1", arguments: LavinMQ::AMQP::Table.new({"x-max-length" => 1_i64}))
vhost.queues["test2"] = LavinMQ::Queue.new(vhost, "test2", arguments: LavinMQ::AMQP::Table.new({"x-max-length" => 11_i64}))
vhost.add_policy("test", ".*", "all", definitions, 100_i8)
sleep 0.01
vhost.queues["test1"].@max_length.should eq 1
Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/exchange/consistent_hash.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module LavinMQ
end
end

def bind(destination : Destination, routing_key : String, headers : Hash(String, AMQP::Field)?)
def bind(destination : Destination, routing_key : String, headers : AMQP::Table?)
w = weight(routing_key)
@hasher.add(destination.name, w, destination)
ret = case destination
Expand All @@ -37,7 +37,7 @@ module LavinMQ
ret
end

def unbind(destination : Destination, routing_key : String, headers : Hash(String, AMQP::Field)?)
def unbind(destination : Destination, routing_key : String, headers : AMQP::Table?)
w = weight(routing_key)
ret = case destination
when Queue
Expand Down
26 changes: 15 additions & 11 deletions src/lavinmq/exchange/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require "../observable"
require "../queue"

module LavinMQ
alias BindingKey = Tuple(String, Hash(String, AMQP::Field)?)
alias BindingKey = Tuple(String, AMQP::Table?)
alias Destination = Queue | Exchange

abstract class Exchange
Expand All @@ -30,7 +30,7 @@ module LavinMQ

def initialize(@vhost : VHost, @name : String, @durable = false,
@auto_delete = false, @internal = false,
@arguments = Hash(String, AMQP::Field).new)
@arguments = AMQP::Table.new)
@queue_bindings = Hash(BindingKey, Set(Queue)).new do |h, k|
h[k] = Set(Queue).new
end
Expand Down Expand Up @@ -91,12 +91,16 @@ module LavinMQ

def match?(type, durable, auto_delete, internal, arguments)
delayed = type == "x-delayed-message"
frame_args = arguments.to_h.dup.reject("x-delayed-type").merge({"x-delayed-exchange" => true})
frame_args = arguments
if delayed
frame_args = frame_args.clone.merge!({"x-delayed-exchange": true})
frame_args.delete("x-delayed-type")
end
self.type == (delayed ? arguments["x-delayed-type"] : type) &&
@durable == durable &&
@auto_delete == auto_delete &&
@internal == internal &&
@arguments == (delayed ? frame_args : arguments.to_h)
@arguments == frame_args
end

def in_use?
Expand All @@ -123,10 +127,10 @@ module LavinMQ
return unless @delayed
q_name = "amq.delayed.#{@name}"
raise "Exchange name too long" if q_name.bytesize > MAX_NAME_LENGTH
arguments = Hash(String, AMQP::Field){
arguments = AMQP::Table.new({
"x-dead-letter-exchange" => @name,
"auto-delete" => @auto_delete,
}
})
@delayed_queue = if durable?
DurableDelayedExchangeQueue.new(@vhost, q_name, false, false, arguments)
else
Expand All @@ -137,7 +141,7 @@ module LavinMQ

REPUBLISH_HEADERS = {"x-head", "x-tail", "x-from"}

protected def after_bind(destination : Destination, routing_key : String, headers : Hash(String, AMQP::Field)?)
protected def after_bind(destination : Destination, routing_key : String, headers : AMQP::Table?)
notify_observers(:bind, binding_details({routing_key, headers}, destination))
true
end
Expand All @@ -162,10 +166,10 @@ module LavinMQ
end

abstract def type : String
abstract def bind(destination : Queue, routing_key : String, headers : Hash(String, AMQP::Field)?)
abstract def unbind(destination : Queue, routing_key : String, headers : Hash(String, AMQP::Field)?)
abstract def bind(destination : Exchange, routing_key : String, headers : Hash(String, AMQP::Field)?)
abstract def unbind(destination : Exchange, routing_key : String, headers : Hash(String, AMQP::Field)?)
abstract def bind(destination : Queue, routing_key : String, headers : AMQP::Table?)
abstract def unbind(destination : Queue, routing_key : String, headers : AMQP::Table?)
abstract def bind(destination : Exchange, routing_key : String, headers : AMQP::Table?)
abstract def unbind(destination : Exchange, routing_key : String, headers : AMQP::Table?)
abstract def do_queue_matches(routing_key : String, headers : AMQP::Table?, & : Queue -> _)
abstract def do_exchange_matches(routing_key : String, headers : AMQP::Table?, & : Exchange -> _)

Expand Down
10 changes: 5 additions & 5 deletions src/lavinmq/exchange/headers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,36 @@ module LavinMQ

def initialize(@vhost : VHost, @name : String, @durable = false,
@auto_delete = false, @internal = false,
@arguments = Hash(String, AMQP::Field).new)
@arguments = AMQP::Table.new)
validate!(@arguments)
super
end

def bind(destination : Queue, routing_key, headers)
validate!(headers)
args = headers ? @arguments.merge(headers) : @arguments
args = headers ? @arguments.clone.merge!(headers) : @arguments
ret = @queue_bindings[{routing_key, args}].add? destination
after_bind(destination, routing_key, headers)
ret
end

def bind(destination : Exchange, routing_key, headers)
validate!(headers)
args = headers ? @arguments.merge(headers) : @arguments
args = headers ? @arguments.clone.merge!(headers) : @arguments
ret = @exchange_bindings[{routing_key, args}].add? destination
after_bind(destination, routing_key, headers)
ret
end

def unbind(destination : Queue, routing_key, headers)
args = headers ? @arguments.merge(headers) : @arguments
args = headers ? @arguments.clone.merge!(headers) : @arguments
ret = @queue_bindings[{routing_key, args}].delete destination
after_unbind(destination, routing_key, headers)
ret
end

def unbind(destination : Exchange, routing_key, headers)
args = headers ? @arguments.merge(headers) : @arguments
args = headers ? @arguments.clone.merge!(headers) : @arguments
ret = @exchange_bindings[{routing_key, args}].delete destination
after_unbind(destination, routing_key, headers)
ret
Expand Down
13 changes: 6 additions & 7 deletions src/lavinmq/federation/link.cr
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ module LavinMQ
when :bind
with_consumer_q do |q|
b = data_as_binding_details(data)
args = ::AMQP::Client::Arguments.new(b.arguments)
args = b.arguments || ::AMQP::Client::Arguments.new
q.bind(@upstream_exchange, b.routing_key, args: args)
end
when :unbind
with_consumer_q do |q|
b = data_as_binding_details(data)
args = ::AMQP::Client::Arguments.new(b.arguments)
args = b.arguments || ::AMQP::Client::Arguments.new
q.unbind(@upstream_exchange, b.routing_key, args: args)
end
else raise "Unexpected event '#{event}'"
Expand Down Expand Up @@ -317,10 +317,9 @@ module LavinMQ
end

private def setup(upstream_client)
args = ::AMQP::Client::Arguments.new(@federated_ex.arguments)
ch, _ = try_passive(upstream_client) do |uch, passive|
uch.exchange(@upstream_exchange, type: @federated_ex.type,
args: args, passive: passive)
args: @federated_ex.arguments, passive: passive)
end
args2 = ::AMQP::Client::Arguments.new({
"x-downstream-name" => System.hostname,
Expand All @@ -331,19 +330,19 @@ module LavinMQ
uch.exchange(@upstream_q, type: "x-federation-upstream",
args: args2, passive: passive)
end
q_args = Hash(String, AMQP::Field){"x-internal-purpose" => "federation"}
q_args = ::AMQP::Client::Arguments.new({"x-internal-purpose" => "federation"})
if expires = @upstream.expires
q_args["x-expires"] = expires
end
if msg_ttl = @upstream.msg_ttl
q_args["x-message-ttl"] = msg_ttl
end
ch, q = try_passive(upstream_client, ch) do |uch, passive|
uch.queue(@upstream_q, args: ::AMQP::Client::Arguments.new(q_args), passive: passive)
uch.queue(@upstream_q, args: q_args, passive: passive)
end
@federated_ex.register_observer(self)
@federated_ex.bindings_details.each do |binding|
args = ::AMQP::Client::Arguments.new(binding.arguments)
args = binding.arguments || ::AMQP::Client::Arguments.new
q.bind(@upstream_exchange, binding.routing_key, args: args)
end
{ch, q}
Expand Down
10 changes: 5 additions & 5 deletions src/lavinmq/http/controller/bindings.cr
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ module LavinMQ
bad_request(context, "Field 'routing_key' is required")
end
ok = e.vhost.bind_queue(q.name, e.name, routing_key, arguments)
props = BindingDetails.hash_key({routing_key, arguments.to_h})
props = BindingDetails.hash_key({routing_key, arguments})
context.response.headers["Location"] = q.name + "/" + props
context.response.status_code = 201
Log.debug do
Expand Down Expand Up @@ -101,8 +101,8 @@ module LavinMQ
found = false
e.queue_bindings.each do |k, destinations|
next unless destinations.includes?(q) && BindingDetails.hash_key(k) == props
arguments = k[1] || Hash(String, AMQP::Field).new
@amqp_server.vhosts[vhost].unbind_queue(q.name, e.name, k[0], AMQP::Table.new arguments)
arguments = k[1] || AMQP::Table.new
@amqp_server.vhosts[vhost].unbind_queue(q.name, e.name, k[0], arguments)
found = true
Log.debug { "exchange '#{e.name}' unbound from queue '#{q.name}' with key '#{k}'" }
break
Expand Down Expand Up @@ -144,7 +144,7 @@ module LavinMQ
bad_request(context, "Field 'routing_key' is required")
end
source.vhost.bind_exchange(destination.name, source.name, routing_key, arguments)
props = BindingDetails.hash_key({routing_key, arguments.to_h})
props = BindingDetails.hash_key({routing_key, arguments})
context.response.headers["Location"] = context.request.path + "/" + props
context.response.status_code = 201
end
Expand Down Expand Up @@ -180,7 +180,7 @@ module LavinMQ
found = false
source.exchange_bindings.each do |k, destinations|
next unless destinations.includes?(destination) && BindingDetails.hash_key(k) == props
arguments = AMQP::Table.new(k[1] || Hash(String, AMQP::Field).new)
arguments = k[1] || AMQP::Table.new
@amqp_server.vhosts[vhost].unbind_exchange(destination.name, source.name, k[0], arguments)
found = true
break
Expand Down
6 changes: 3 additions & 3 deletions src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ module LavinMQ

def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
@arguments = Hash(String, AMQP::Field).new)
@arguments = AMQP::Table.new)
@last_get_time = RoughTime.monotonic
@log = Log.for "queue[vhost=#{@vhost.name} name=#{@name}]"
@data_dir = make_data_dir
Expand Down Expand Up @@ -871,14 +871,14 @@ module LavinMQ
durable? == frame.durable &&
@exclusive == frame.exclusive &&
@auto_delete == frame.auto_delete &&
@arguments == frame.arguments.to_h
@arguments == frame.arguments
end

def match?(durable, exclusive, auto_delete, arguments)
durable? == durable &&
@exclusive == exclusive &&
@auto_delete == auto_delete &&
@arguments == arguments.to_h
@arguments == arguments
end

def in_use?
Expand Down
Loading

0 comments on commit d9faf4e

Please sign in to comment.