Skip to content

Commit

Permalink
Fix flaky upstream spec (#878)
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun authored Dec 13, 2024
1 parent 5faa7f1 commit 4fe717c
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -352,44 +352,52 @@ describe LavinMQ::Federation::Upstream do
s.users.add_permission("guest", ds_vhost.name, /.*/, /.*/, /.*/)
message_count = 2

ds_vhost.declare_queue(ds_queue_name, true, false)
ds_queue = ds_vhost.queues[ds_queue_name].as(LavinMQ::AMQP::Queue)

wait_for { ds_queue.policy.try(&.name) == "FE" }
wait_for { upstream.links.first?.try &.state.running? }

us_queue = us_vhost.queues[us_queue_name].as(LavinMQ::AMQP::Queue)

# publish 2 messages to upstream queue
with_channel(s, vhost: us_vhost.name) do |upstream_ch|
upstream_q = upstream_ch.queue(us_queue_name)
message_count.times do |i|
upstream_q.publish "msg#{i}"
upstream_q.publish_confirm "msg#{i}"
end
end

us_queue.message_count.should eq message_count

# consume 1 message from downstream queue
with_channel(s, vhost: ds_vhost.name) do |downstream_ch|
downstream_q = downstream_ch.queue(ds_queue_name)

# make sure FE policy is set and link is running
wait_for { ds_vhost.queues[ds_queue_name].policy.try(&.name) == "FE" }
wait_for { upstream.links.first?.try &.state.running? }
downstream_q.subscribe(tag: "c") { downstream_q.unsubscribe("c") }

messages_consumed = 0
downstream_q.subscribe(tag: "c") do |_msg|
messages_consumed += 1
downstream_q.unsubscribe("c")
# Wait until consumer has been removed before we continue
until ds_queue.consumers.empty?
ds_queue.@consumers_empty_change.receive?
end
wait_for { messages_consumed == 1 }
wait_for { s.vhosts[ds_vhost.name].queues[ds_queue_name].message_count == 0 }
wait_for { s.vhosts[us_vhost.name].queues[us_queue_name].message_count == 1 }
end

# make sure consumer is disconnected
wait_for { s.vhosts[ds_vhost.name].queues[ds_queue_name].consumers.empty? }
# One message hsa been transferred?
us_queue.message_count.should eq 1

# resume consuming on downstream, should get 1 message
# resume consuming on downstream, federation should start again
with_channel(s, vhost: ds_vhost.name) do |downstream_ch|
messages_consumed = 0
downstream_ch.queue(ds_queue_name).subscribe(tag: "c2") do |_msg|
messages_consumed += 1
ch = Channel(Nil).new
downstream_ch.queue(ds_queue_name).subscribe(tag: "c2") { ch.close }

select
when ch.receive?
when timeout 100.milliseconds
fail "federation didn't resume? timeout waiting for message on downstream queue"
end
wait_for { s.vhosts[us_vhost.name].queues[us_queue_name].message_count == 0 }
wait_for { s.vhosts[ds_vhost.name].queues[ds_queue_name].message_count == 0 }
wait_for { messages_consumed == 1 }

us_queue.message_count.should eq 0
ds_queue.message_count.should eq 0
end
end
end
Expand Down

0 comments on commit 4fe717c

Please sign in to comment.