From 08f99b1b17be423436a3c2851942fd61925a68bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 13 Dec 2024 15:09:51 +0100 Subject: [PATCH] Fix flaky upstream spec (#878) --- spec/upstream_spec.cr | 48 +++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index 358e990d09..35a1b50cb2 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -352,44 +352,52 @@ describe LavinMQ::Federation::Upstream do s.users.add_permission("guest", ds_vhost.name, /.*/, /.*/, /.*/) message_count = 2 + ds_vhost.declare_queue(ds_queue_name, true, false) + ds_queue = ds_vhost.queues[ds_queue_name].as(LavinMQ::AMQP::Queue) + + wait_for { ds_queue.policy.try(&.name) == "FE" } + wait_for { upstream.links.first?.try &.state.running? } + + us_queue = us_vhost.queues[us_queue_name].as(LavinMQ::AMQP::Queue) + # publish 2 messages to upstream queue with_channel(s, vhost: us_vhost.name) do |upstream_ch| upstream_q = upstream_ch.queue(us_queue_name) message_count.times do |i| - upstream_q.publish "msg#{i}" + upstream_q.publish_confirm "msg#{i}" end end + us_queue.message_count.should eq message_count + # consume 1 message from downstream queue with_channel(s, vhost: ds_vhost.name) do |downstream_ch| downstream_q = downstream_ch.queue(ds_queue_name) - # make sure FE policy is set and link is running - wait_for { ds_vhost.queues[ds_queue_name].policy.try(&.name) == "FE" } - wait_for { upstream.links.first?.try &.state.running? } + downstream_q.subscribe(tag: "c") { downstream_q.unsubscribe("c") } - messages_consumed = 0 - downstream_q.subscribe(tag: "c") do |_msg| - messages_consumed += 1 - downstream_q.unsubscribe("c") + # Wait until consumer has been removed before we continue + until ds_queue.consumers.empty? + ds_queue.@consumers_empty_change.receive? end - wait_for { messages_consumed == 1 } - wait_for { s.vhosts[ds_vhost.name].queues[ds_queue_name].message_count == 0 } - wait_for { s.vhosts[us_vhost.name].queues[us_queue_name].message_count == 1 } end - # make sure consumer is disconnected - wait_for { s.vhosts[ds_vhost.name].queues[ds_queue_name].consumers.empty? } + # One message hsa been transferred? + us_queue.message_count.should eq 1 - # resume consuming on downstream, should get 1 message + # resume consuming on downstream, federation should start again with_channel(s, vhost: ds_vhost.name) do |downstream_ch| - messages_consumed = 0 - downstream_ch.queue(ds_queue_name).subscribe(tag: "c2") do |_msg| - messages_consumed += 1 + ch = Channel(Nil).new + downstream_ch.queue(ds_queue_name).subscribe(tag: "c2") { ch.close } + + select + when ch.receive? + when timeout 100.milliseconds + fail "federation didn't resume? timeout waiting for message on downstream queue" end - wait_for { s.vhosts[us_vhost.name].queues[us_queue_name].message_count == 0 } - wait_for { s.vhosts[ds_vhost.name].queues[ds_queue_name].message_count == 0 } - wait_for { messages_consumed == 1 } + + us_queue.message_count.should eq 0 + ds_queue.message_count.should eq 0 end end end