Skip to content

Commit

Permalink
add specs
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Dec 18, 2024
1 parent 48f6275 commit 7969430
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,109 @@ describe LavinMQ::AMQP::StreamQueue do
end
end
end

describe "Filters" do
it "should only get message matching filter" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_1", args: stream_queue_args)
q.publish("msg without filter")
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter-value": "foo"})) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter"
end
end
end

it "should ignore messages with non-matching filters" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_2", args: stream_queue_args)
q.publish("msg without filter")
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter-value": "bar"})) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: bar"
end
end
end

it "should support multiple filters" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_3", args: stream_queue_args)
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg without filter")
q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "xyz"})
q.publish("msg with filter: xyz", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
filters = "foo,bar"
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new(
{"x-stream-filter-value": filters}
)) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: foo"
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: bar"
end
end
end

it "should get messages without filter when x-stream-match-unfiltered set" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_4", args: stream_queue_args)
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new(
{
"x-stream-filter-value": "foo",
"x-stream-match-unfiltered": true,
}
)) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: foo"
msg = msgs.receive
msg.body_io.to_s.should eq "msg without filter"
end
end
end
end
end

0 comments on commit 7969430

Please sign in to comment.