Skip to content

Commit 9376b06

Browse files
Reduce priority_queue_SUITE to single node tests
Other tests (that produce flakes) arguably test classic mirrored queues, a deprecated feature reasonably well covered in other suites. Per discussion with @gerhard. (cherry picked from commit bed64f2)
1 parent b321476 commit 9376b06

File tree

1 file changed

+23
-273
lines changed

1 file changed

+23
-273
lines changed

deps/rabbit/test/priority_queue_SUITE.erl

Lines changed: 23 additions & 273 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,29 @@
1515

1616
all() ->
1717
[
18-
{group, cluster_size_2},
19-
{group, cluster_size_3}
18+
{group, single_node}
2019
].
2120

2221
groups() ->
2322
[
24-
{cluster_size_2, [], [
25-
ackfold,
26-
drop,
27-
{overflow_reject_publish, [], [reject]},
28-
{overflow_reject_publish_dlx, [], [reject]},
29-
dropwhile_fetchwhile,
30-
info_head_message_timestamp,
31-
matching,
32-
mirror_queue_sync,
33-
mirror_queue_sync_priority_above_max,
34-
mirror_queue_sync_priority_above_max_pending_ack,
35-
mirror_queue_sync_order,
36-
purge,
37-
requeue,
38-
resume,
39-
simple_order,
40-
straight_through,
41-
invoke,
42-
gen_server2_stats,
43-
negative_max_priorities,
44-
max_priorities_above_hard_limit
45-
]},
46-
{cluster_size_3, [], [
47-
mirror_queue_auto_ack,
48-
mirror_fast_reset_policy,
49-
mirror_reset_policy,
50-
mirror_stop_pending_followers
51-
]}
23+
{single_node, [], [
24+
ackfold,
25+
drop,
26+
{overflow_reject_publish, [], [reject]},
27+
{overflow_reject_publish_dlx, [], [reject]},
28+
dropwhile_fetchwhile,
29+
info_head_message_timestamp,
30+
matching,
31+
purge,
32+
requeue,
33+
resume,
34+
simple_order,
35+
straight_through,
36+
invoke,
37+
gen_server2_stats,
38+
negative_max_priorities,
39+
max_priorities_above_hard_limit
40+
]}
5241
].
5342

5443
%% -------------------------------------------------------------------
@@ -62,21 +51,12 @@ init_per_suite(Config) ->
6251
end_per_suite(Config) ->
6352
rabbit_ct_helpers:run_teardown_steps(Config).
6453

65-
init_per_group(cluster_size_2, Config) ->
54+
init_per_group(single_node, Config) ->
6655
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
6756
Config1 = rabbit_ct_helpers:set_config(Config, [
68-
{rmq_nodes_count, 2},
69-
{rmq_nodename_suffix, Suffix}
70-
]),
71-
rabbit_ct_helpers:run_steps(Config1,
72-
rabbit_ct_broker_helpers:setup_steps() ++
73-
rabbit_ct_client_helpers:setup_steps());
74-
init_per_group(cluster_size_3, Config) ->
75-
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
76-
Config1 = rabbit_ct_helpers:set_config(Config, [
77-
{rmq_nodes_count, 3},
78-
{rmq_nodename_suffix, Suffix}
79-
]),
57+
{rmq_nodes_count, 1},
58+
{rmq_nodename_suffix, Suffix}
59+
]),
8060
rabbit_ct_helpers:run_steps(Config1,
8161
rabbit_ct_broker_helpers:setup_steps() ++
8262
rabbit_ct_client_helpers:setup_steps());
@@ -430,199 +410,6 @@ ram_duration(_Config) ->
430410
PQ:delete_and_terminate(a_whim, BQS5),
431411
passed.
432412

433-
mirror_queue_sync(Config) ->
434-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
435-
Q = <<"mirror_queue_sync-queue">>,
436-
declare(Ch, Q, 3),
437-
publish(Ch, Q, [1, 2, 3]),
438-
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, 0,
439-
<<"^mirror_queue_sync-queue$">>, <<"all">>),
440-
publish(Ch, Q, [1, 2, 3, 1, 2, 3]),
441-
%% master now has 9, mirror 6.
442-
get_partial(Ch, Q, manual_ack, [3, 3, 3, 2, 2, 2]),
443-
%% So some but not all are unacked at the mirror
444-
Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
445-
rabbit_ct_broker_helpers:control_action(sync_queue, Nodename0,
446-
[binary_to_list(Q)], [{"-p", "/"}]),
447-
wait_for_sync(Config, Nodename0, rabbit_misc:r(<<"/">>, queue, Q)),
448-
rabbit_ct_client_helpers:close_connection(Conn),
449-
passed.
450-
451-
mirror_queue_sync_priority_above_max(Config) ->
452-
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
453-
%% Tests synchronisation of mirrors when priority is higher than max priority.
454-
%% This causes an infinity loop (and test timeout) before rabbitmq-server-795
455-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
456-
Q = <<"mirror_queue_sync_priority_above_max-queue">>,
457-
declare(Ch, Q, 3),
458-
publish(Ch, Q, [5, 5, 5]),
459-
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
460-
<<".*">>, <<"all">>),
461-
rabbit_ct_broker_helpers:control_action(sync_queue, A,
462-
[binary_to_list(Q)], [{"-p", "/"}]),
463-
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
464-
delete(Ch, Q),
465-
rabbit_ct_client_helpers:close_connection(Conn),
466-
passed.
467-
468-
mirror_queue_sync_priority_above_max_pending_ack(Config) ->
469-
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
470-
%% Tests synchronisation of mirrors when priority is higher than max priority
471-
%% and there are pending acks.
472-
%% This causes an infinity loop (and test timeout) before rabbitmq-server-795
473-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
474-
Q = <<"mirror_queue_sync_priority_above_max_pending_ack-queue">>,
475-
declare(Ch, Q, 3),
476-
publish(Ch, Q, [5, 5, 5]),
477-
%% Consume but 'forget' to acknowledge
478-
get_without_ack(Ch, Q),
479-
get_without_ack(Ch, Q),
480-
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
481-
<<".*">>, <<"all">>),
482-
rabbit_ct_broker_helpers:control_action(sync_queue, A,
483-
[binary_to_list(Q)], [{"-p", "/"}]),
484-
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
485-
synced_msgs(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 3),
486-
synced_msgs(Config, B, rabbit_misc:r(<<"/">>, queue, Q), 3),
487-
delete(Ch, Q),
488-
rabbit_ct_client_helpers:close_connection(Conn),
489-
passed.
490-
491-
mirror_queue_auto_ack(Config) ->
492-
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
493-
%% Check correct use of AckRequired in the notifications to the mirrors.
494-
%% If mirrors are notified with AckRequired == true when it is false,
495-
%% the mirrors will crash with the depth notification as they will not
496-
%% match the master delta.
497-
%% Bug rabbitmq-server 687
498-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
499-
Q = <<"mirror_queue_auto_ack-queue">>,
500-
declare(Ch, Q, 3),
501-
publish(Ch, Q, [1, 2, 3]),
502-
ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
503-
<<".*">>, <<"all">>),
504-
get_partial(Ch, Q, no_ack, [3, 2, 1]),
505-
506-
%% Retrieve mirrors
507-
SPids = slave_pids(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
508-
[{SNode1, _SPid1}, {SNode2, SPid2}] = nodes_and_pids(SPids),
509-
510-
%% Restart one of the mirrors so `request_depth` is triggered
511-
rabbit_ct_broker_helpers:restart_node(Config, SNode1),
512-
513-
%% The alive mirror must have the same pid after its neighbour is restarted
514-
timer:sleep(3000), %% ugly but we can't know when the `depth` instruction arrives
515-
Slaves = nodes_and_pids(slave_pids(Config, A, rabbit_misc:r(<<"/">>, queue, Q))),
516-
SPid2 = proplists:get_value(SNode2, Slaves),
517-
518-
delete(Ch, Q),
519-
rabbit_ct_client_helpers:close_channel(Ch),
520-
rabbit_ct_client_helpers:close_connection(Conn),
521-
passed.
522-
523-
mirror_queue_sync_order(Config) ->
524-
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
525-
B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
526-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
527-
{Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, B),
528-
Q = <<"mirror_queue_sync_order-queue">>,
529-
declare(Ch, Q, 3),
530-
publish_payload(Ch, Q, [{1, <<"msg1">>}, {2, <<"msg2">>},
531-
{2, <<"msg3">>}, {2, <<"msg4">>},
532-
{3, <<"msg5">>}]),
533-
rabbit_ct_client_helpers:close_channel(Ch),
534-
535-
%% Add and sync mirror
536-
ok = rabbit_ct_broker_helpers:set_ha_policy(
537-
Config, A, <<"^mirror_queue_sync_order-queue$">>, <<"all">>),
538-
rabbit_ct_broker_helpers:control_action(sync_queue, A,
539-
[binary_to_list(Q)], [{"-p", "/"}]),
540-
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
541-
542-
%% Stop the master
543-
rabbit_ct_broker_helpers:stop_node(Config, A),
544-
545-
get_payload(Ch2, Q, do_ack, [<<"msg5">>, <<"msg2">>, <<"msg3">>,
546-
<<"msg4">>, <<"msg1">>]),
547-
548-
delete(Ch2, Q),
549-
rabbit_ct_broker_helpers:start_node(Config, A),
550-
rabbit_ct_client_helpers:close_connection(Conn),
551-
rabbit_ct_client_helpers:close_connection(Conn2),
552-
passed.
553-
554-
mirror_reset_policy(Config) ->
555-
%% Gives time to the master to go through all stages.
556-
%% Might eventually trigger some race conditions from #802,
557-
%% although for that I would expect a longer run and higher
558-
%% number of messages in the system.
559-
mirror_reset_policy(Config, 5000).
560-
561-
mirror_fast_reset_policy(Config) ->
562-
%% This test seems to trigger the bug tested in invoke/1, but it
563-
%% cannot guarantee it will always happen. Thus, both tests
564-
%% should stay in the test suite.
565-
mirror_reset_policy(Config, 5).
566-
567-
568-
mirror_reset_policy(Config, Wait) ->
569-
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
570-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
571-
Q = <<"mirror_reset_policy-queue">>,
572-
declare(Ch, Q, 5),
573-
Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
574-
publish_many(Ch, Q, 20000),
575-
[begin
576-
rabbit_ct_broker_helpers:set_ha_policy(
577-
Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
578-
[{<<"ha-sync-mode">>, <<"automatic">>}]),
579-
timer:sleep(Wait),
580-
rabbit_ct_broker_helpers:clear_policy(
581-
Config, A, <<"^mirror_reset_policy-queue$">>),
582-
timer:sleep(Wait)
583-
end || _ <- lists:seq(1, 10)],
584-
timer:sleep(1000),
585-
ok = rabbit_ct_broker_helpers:set_ha_policy(
586-
Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
587-
[{<<"ha-sync-mode">>, <<"automatic">>}]),
588-
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
589-
%% Verify master has not crashed
590-
Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
591-
delete(Ch, Q),
592-
593-
rabbit_ct_client_helpers:close_connection(Conn),
594-
passed.
595-
596-
mirror_stop_pending_followers(Config) ->
597-
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
598-
B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
599-
C = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
600-
601-
[ok = rabbit_ct_broker_helpers:rpc(
602-
Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 0]) || Nodename <- [A, B, C]],
603-
604-
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
605-
Q = <<"mirror_stop_pending_followers-queue">>,
606-
declare(Ch, Q, 5),
607-
publish_many(Ch, Q, 20000),
608-
609-
[begin
610-
rabbit_ct_broker_helpers:set_ha_policy(
611-
Config, A, <<"^mirror_stop_pending_followers-queue$">>, <<"all">>,
612-
[{<<"ha-sync-mode">>, <<"automatic">>}]),
613-
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
614-
rabbit_ct_broker_helpers:clear_policy(
615-
Config, A, <<"^mirror_stop_pending_followers-queue$">>)
616-
end || _ <- lists:seq(1, 15)],
617-
618-
delete(Ch, Q),
619-
620-
[ok = rabbit_ct_broker_helpers:rpc(
621-
Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 15000]) || Nodename <- [A, B, C]],
622-
623-
rabbit_ct_client_helpers:close_connection(Conn),
624-
passed.
625-
626413
%%----------------------------------------------------------------------------
627414

628415
declare(Ch, Q, Args) when is_list(Args) ->
@@ -723,43 +510,6 @@ priority2bin(Int) -> list_to_binary(integer_to_list(Int)).
723510

724511
%%----------------------------------------------------------------------------
725512

726-
wait_for_sync(Config, Nodename, Q) ->
727-
wait_for_sync(Config, Nodename, Q, 1).
728-
729-
wait_for_sync(Config, Nodename, Q, Nodes) ->
730-
wait_for_sync(Config, Nodename, Q, Nodes, 600).
731-
732-
wait_for_sync(_, _, _, _, 0) ->
733-
throw(sync_timeout);
734-
wait_for_sync(Config, Nodename, Q, Nodes, N) ->
735-
case synced(Config, Nodename, Q, Nodes) of
736-
true -> ok;
737-
false -> timer:sleep(100),
738-
wait_for_sync(Config, Nodename, Q, Nodes, N-1)
739-
end.
740-
741-
synced(Config, Nodename, Q, Nodes) ->
742-
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
743-
rabbit_amqqueue, info_all, [<<"/">>, [name, synchronised_slave_pids]]),
744-
[SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info,
745-
Q =:= Q1],
746-
length(SSPids) =:= Nodes.
747-
748-
synced_msgs(Config, Nodename, Q, Expected) ->
749-
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
750-
rabbit_amqqueue, info_all, [<<"/">>, [name, messages]]),
751-
[M] = [M || [{name, Q1}, {messages, M}] <- Info, Q =:= Q1],
752-
M =:= Expected.
753-
754-
nodes_and_pids(SPids) ->
755-
lists:zip([node(S) || S <- SPids], SPids).
756-
757-
slave_pids(Config, Nodename, Q) ->
758-
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
759-
rabbit_amqqueue, info_all, [<<"/">>, [name, slave_pids]]),
760-
[SPids] = [SPids || [{name, Q1}, {slave_pids, SPids}] <- Info,
761-
Q =:= Q1],
762-
SPids.
763513

764514
queue_pid(Config, Nodename, Q) ->
765515
Info = rabbit_ct_broker_helpers:rpc(

0 commit comments

Comments
 (0)