Skip to content

Commit bcac37d

Browse files
author
dcorbacho
committed
Disallow removal of the last stream member
1 parent a20cd9e commit bcac37d

File tree

2 files changed

+67
-19
lines changed

2 files changed

+67
-19
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -341,26 +341,31 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
341341
monitors = Monitors0} = State0) ->
342342
Stream0 = maps:get(StreamId, Streams0, undefined),
343343
Meta = maps:without([term, machine_version], Meta0),
344-
Stream1 = update_stream(Meta, Cmd, Stream0),
345-
Reply = case Stream1 of
346-
#stream{reply_to = undefined} ->
347-
ok;
344+
case filter_command(Meta, Cmd, Stream0) of
345+
ok ->
346+
Stream1 = update_stream(Meta, Cmd, Stream0),
347+
Reply = case Stream1 of
348+
#stream{reply_to = undefined} ->
349+
ok;
350+
_ ->
351+
%% reply_to is set so we'll reply later
352+
'$ra_no_reply'
353+
end,
354+
case Stream1 of
355+
undefined ->
356+
return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
357+
Reply, []);
348358
_ ->
349-
%% reply_to is set so we'll reply later
350-
'$ra_no_reply'
351-
end,
352-
case Stream1 of
353-
undefined ->
354-
return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
355-
Reply, []);
356-
_ ->
357-
{Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
358-
{Stream3, Effects1} = eval_listeners(Stream2, Effects0),
359-
{Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
360-
{Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
361-
return(Meta,
362-
State0#?MODULE{streams = Streams0#{StreamId => Stream},
363-
monitors = Monitors}, Reply, Effects)
359+
{Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
360+
{Stream3, Effects1} = eval_listeners(Stream2, Effects0),
361+
{Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
362+
{Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
363+
return(Meta,
364+
State0#?MODULE{streams = Streams0#{StreamId => Stream},
365+
monitors = Monitors}, Reply, Effects)
366+
end;
367+
Reply ->
368+
return(Meta, State0, Reply, [])
364369
end;
365370
apply(Meta, {down, Pid, Reason} = Cmd,
366371
#?MODULE{streams = Streams0,
@@ -874,6 +879,23 @@ make_ra_conf(Node, Nodes) ->
874879
machine => {module, ?MODULE, #{}},
875880
ra_event_formatter => Formatter}.
876881

882+
filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{members = Members0}) ->
883+
Members = maps:filter(fun(_, #member{target = S}) when S =/= deleted ->
884+
true;
885+
(_, _) ->
886+
false
887+
end, Members0),
888+
case maps:size(Members) =< 1 of
889+
true ->
890+
rabbit_log:warning(
891+
"~s failed to delete ~p replica, last cluster member",
892+
[?MODULE, Node]),
893+
{error, last_stream_member};
894+
false ->
895+
ok
896+
end;
897+
filter_command(_, _, _) ->
898+
ok.
877899

878900
update_stream(Meta, Cmd, Stream) ->
879901
try
@@ -1536,6 +1558,7 @@ set_running_to_stopped(Members) ->
15361558
(_, M) ->
15371559
M
15381560
end, Members).
1561+
15391562
-ifdef(TEST).
15401563
-include_lib("eunit/include/eunit.hrl").
15411564
-endif.

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ groups() ->
5353
leader_failover_dedupe,
5454
add_replicas]},
5555
{cluster_size_3_parallel, [parallel], [delete_replica,
56+
delete_last_replica,
5657
delete_classic_replica,
5758
delete_quorum_replica,
5859
consume_from_replica,
@@ -444,6 +445,30 @@ delete_replica(Config) ->
444445
check_leader_and_replicas(Config, [Server0]),
445446
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
446447

448+
delete_last_replica(Config) ->
449+
[Server0, Server1, Server2] =
450+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
451+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
452+
Q = ?config(queue_name, Config),
453+
?assertEqual({'queue.declare_ok', Q, 0, 0},
454+
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
455+
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
456+
?assertEqual(ok,
457+
rpc:call(Server0, rabbit_stream_queue, delete_replica,
458+
[<<"/">>, Q, Server1])),
459+
?assertEqual(ok,
460+
rpc:call(Server0, rabbit_stream_queue, delete_replica,
461+
[<<"/">>, Q, Server2])),
462+
%% check they're gone
463+
check_leader_and_replicas(Config, [Server0]),
464+
%% delete the last one
465+
?assertEqual({error, last_stream_member},
466+
rpc:call(Server0, rabbit_stream_queue, delete_replica,
467+
[<<"/">>, Q, Server0])),
468+
%% It's still here
469+
check_leader_and_replicas(Config, [Server0]),
470+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
471+
447472
grow_coordinator_cluster(Config) ->
448473
[Server0, Server1, _Server2] =
449474
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)