Skip to content

Commit 4c9f87d

Browse files
committed
Repro hacks
1 parent 1861a70 commit 4c9f87d

File tree

3 files changed

+84
-3
lines changed

3 files changed

+84
-3
lines changed

deps/rabbit/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ endef
129129
LOCAL_DEPS = sasl os_mon inets compiler public_key crypto ssl syntax_tools xmerl
130130

131131
BUILD_DEPS = rabbitmq_cli
132-
DEPS = ranch cowlib rabbit_common amqp10_common rabbitmq_prelaunch ra sysmon_handler stdout_formatter recon redbug observer_cli osiris syslog systemd seshat horus khepri khepri_mnesia_migration cuttlefish gen_batch_server
133-
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck proper amqp_client rabbitmq_amqp_client rabbitmq_amqp1_0
132+
DEPS = ranch cowlib rabbit_common amqp10_common rabbitmq_prelaunch ra sysmon_handler stdout_formatter recon redbug observer_cli osiris syslog systemd seshat horus khepri khepri_mnesia_migration cuttlefish gen_batch_server amqp_client
133+
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers meck proper rabbitmq_amqp_client rabbitmq_amqp1_0
134134

135135
# We pin a version of Horus even if we don't use it directly (it is a
136136
# dependency of Khepri). But currently, we can't update Khepri while still

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,9 +726,12 @@ handle_down(Meta, Pid, #?STATE{consumers = Cons0,
726726
DownConsumers = maps:keys(maps:filter(fun(_CKey, ?CONSUMER_PID(P)) ->
727727
P =:= Pid
728728
end, Cons0)),
729+
%% Shuffle the map keys to simulate the keys having different orderings
730+
%% among cluster members.
731+
DownConsumers1 = [X || {_, X} <- lists:sort([{rand:uniform(), N} || N <- DownConsumers])],
729732
lists:foldl(fun(ConsumerKey, {S, E}) ->
730733
cancel_consumer(Meta, ConsumerKey, S, E, down)
731-
end, {State2, Effects1}, DownConsumers).
734+
end, {State2, Effects1}, DownConsumers1).
732735

733736
consumer_active_flag_update_function(
734737
#?STATE{cfg = #cfg{consumer_strategy = competing}}) ->

deps/rabbit/src/rabbit_repro.erl

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
-module(rabbit_repro).
2+
3+
-export([run/0]).
4+
5+
-include_lib("amqp_client/include/amqp_client.hrl").
6+
7+
-define(Q, <<"qq">>).
8+
-define(N_MESSAGES, 10).
9+
10+
run() ->
11+
{_, BasicGetRunnerRef} = spawn_monitor(fun() ->
12+
{ok, Conn} = connection(),
13+
link(Conn),
14+
{ok, Ch} = amqp_connection:open_channel(Conn),
15+
link(Ch),
16+
17+
%% Get 10 messages and then exit without checking them out so that
18+
%% they are returned.
19+
basic_get(Ch, ?N_MESSAGES),
20+
ok
21+
end),
22+
23+
receive
24+
{'DOWN', BasicGetRunnerRef, process, _, normal} ->
25+
ok;
26+
{'DOWN', BasicGetRunnerRef, process, _, _} ->
27+
exit(basic_get_runner_fail)
28+
end,
29+
30+
%% Probably not necessary but let's let everything settle. Wait for 1/2
31+
%% second.
32+
timer:sleep(500),
33+
34+
{_, BasicRejectRunnerRef} = spawn_monitor(fun() ->
35+
{ok, Conn} = connection(),
36+
link(Conn),
37+
{ok, Ch} = amqp_connection:open_channel(Conn),
38+
link(Ch),
39+
40+
%% Get and ack 9 messages.
41+
{some, AckDeliveryTag} = basic_get(Ch, ?N_MESSAGES - 1),
42+
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = AckDeliveryTag,
43+
multiple = true}),
44+
45+
%% Then get and reject the last returned message.
46+
{some, RejectDeliveryTag} = basic_get(Ch, 1),
47+
amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = RejectDeliveryTag,
48+
requeue = true}),
49+
ok
50+
end),
51+
52+
receive
53+
{'DOWN', BasicRejectRunnerRef, process, _, normal} ->
54+
ok;
55+
{'DOWN', BasicRejectRunnerRef, process, _, _} ->
56+
exit(basic_reject_runner_fail)
57+
end,
58+
59+
ok.
60+
61+
connection() ->
62+
amqp_connection:start(#amqp_params_direct{virtual_host = <<"/">>,
63+
username = <<"guest">>,
64+
password = <<"guest">>}).
65+
66+
basic_get(Ch, N) ->
67+
basic_get(Ch, N, none).
68+
69+
basic_get(_Ch, 0, Tag) ->
70+
Tag;
71+
basic_get(Ch, N, Tag0) ->
72+
Tag = case amqp_channel:call(Ch, #'basic.get'{queue = ?Q}) of
73+
{#'basic.get_ok'{delivery_tag = DeliveryTag}, _} ->
74+
{some, DeliveryTag};
75+
#'basic.get_empty'{} ->
76+
Tag0
77+
end,
78+
basic_get(Ch, N - 1, Tag).

0 commit comments

Comments
 (0)