From b66dc4017e5761b8a4bb5d9abfe4b9f303482c1d Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 23 Sep 2025 11:05:02 +0100 Subject: [PATCH] QQ: fix resend issues after network partition. A queue client that send a message during a network partition that later caused a distribution disconnection would in some cases never resend the lost message, even if kept in the pending buffer. Subsequent sends would be accepted by the state machine but would never be enqueued as there would be a missing sequence. In the case of publishers that use pre-settled sends the pending messages would have also been incorrectly removed from the pending map. To fix we removed timer resend aapproach and instead have the leader send leader_change messages on node up to prompt any queue clients to resend their pending buffer. --- deps/rabbit/src/rabbit_fifo.erl | 10 ++++- deps/rabbit/src/rabbit_fifo_client.erl | 43 +++++++++++------- deps/rabbit/test/quorum_queue_SUITE.erl | 52 +++++++++++++++++++++- deps/rabbit/test/rabbit_fifo_int_SUITE.erl | 4 +- 4 files changed, 89 insertions(+), 20 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index fa9c0d99540a..0c195b04ab29 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -629,6 +629,14 @@ apply(#{machine_version := Vsn} = Meta, E#enqueuer{status = up}; (_, E) -> E end, Enqs0), + %% send leader change events to all disconnected enqueuers to prompt them + %% to resend any messages stuck during disconnection, + %% ofc it may not be a leader change per se + Effects0 = maps:fold(fun(P, _E, Acc) when node(P) =:= Node -> + [{send_msg, P, leader_change, ra_event} | Acc]; + (_, _E, Acc) -> Acc + end, Monitors, Enqs0), + ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), %% mark all consumers as up {State1, Effects1} = @@ -643,7 +651,7 @@ apply(#{machine_version := Vsn} = Meta, SAcc), EAcc1}; (_, _, Acc) -> Acc - end, {State0, Monitors}, Cons0, Vsn), + end, {State0, Effects0}, Cons0, Vsn), Waiting = update_waiting_consumer_status(Node, State1, up), State2 = State1#?STATE{enqueuers = Enqs1, waiting_consumers = Waiting}, diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index f2f50301da5e..462d80362e93 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -160,7 +160,7 @@ enqueue(_QName, _Correlation, _Msg, #state{queue_status = reject_publish, cfg = #cfg{}} = State) -> {reject_publish, State}; -enqueue(QName, Correlation, Msg, +enqueue(_QName, Correlation, Msg, #state{slow = WasSlow, pending = Pending, queue_status = go, @@ -176,8 +176,9 @@ enqueue(QName, Correlation, Msg, next_seq = Seq + 1, next_enqueue_seq = EnqueueSeq + 1, slow = IsSlow}, + if IsSlow andalso not WasSlow -> - {ok, set_timer(QName, State), [{block, cluster_name(State)}]}; + {ok, State, [{block, cluster_name(State)}]}; true -> {ok, State, []} end. @@ -632,10 +633,10 @@ handle_ra_event(QName, Leader, {applied, Seqs}, when ActualLeader =/= OldLeader -> %% there is a new leader ?LOG_DEBUG("~ts: Detected QQ leader change (applied) " - "from ~w to ~w, " - "resending ~b pending commands", - [?MODULE, OldLeader, ActualLeader, - maps:size(State1#state.pending)]), + "from ~w to ~w, " + "resending ~b pending commands", + [?MODULE, OldLeader, ActualLeader, + maps:size(State1#state.pending)]), resend_all_pending(State1#state{leader = ActualLeader}); _ -> State1 @@ -702,9 +703,9 @@ handle_ra_event(QName, Leader, {machine, leader_change}, %% we need to update leader %% and resend any pending commands ?LOG_DEBUG("~ts: ~s Detected QQ leader change from ~w to ~w, " - "resending ~b pending commands", - [rabbit_misc:rs(QName), ?MODULE, OldLeader, - Leader, maps:size(Pending)]), + "resending ~b pending commands", + [rabbit_misc:rs(QName), ?MODULE, OldLeader, + Leader, maps:size(Pending)]), State = resend_all_pending(State0#state{leader = Leader}), {ok, State, []}; handle_ra_event(_QName, _From, {rejected, {not_leader, Leader, _Seq}}, @@ -714,21 +715,27 @@ handle_ra_event(QName, _From, {rejected, {not_leader, Leader, _Seq}}, #state{leader = OldLeader, pending = Pending} = State0) -> ?LOG_DEBUG("~ts: ~s Detected QQ leader change (rejection) from ~w to ~w, " - "resending ~b pending commands", - [rabbit_misc:rs(QName), ?MODULE, OldLeader, - Leader, maps:size(Pending)]), + "resending ~b pending commands", + [rabbit_misc:rs(QName), ?MODULE, OldLeader, + Leader, maps:size(Pending)]), State = resend_all_pending(State0#state{leader = Leader}), {ok, cancel_timer(State), []}; handle_ra_event(_QName, _From, {rejected, {not_leader, _UndefinedMaybe, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {ok, State0, []}; -handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}} = State0) -> +handle_ra_event(QName, _, timeout, #state{cfg = #cfg{servers = Servers}, + leader = OldLeader, + pending = Pending} = State0) -> case find_leader(Servers) of undefined -> %% still no leader, set the timer again {ok, set_timer(QName, State0), []}; Leader -> + ?LOG_DEBUG("~ts: ~s Pending applied Timeout ~w to ~w, " + "resending ~b pending commands", + [rabbit_misc:rs(QName), ?MODULE, OldLeader, + Leader, maps:size(Pending)]), State = resend_all_pending(State0#state{leader = Leader}), {ok, State, []} end; @@ -743,7 +750,7 @@ handle_ra_event(QName, Leader, close_cached_segments, case now_ms() > Last + ?CACHE_SEG_TIMEOUT of true -> ?LOG_DEBUG("~ts: closing_cached_segments", - [rabbit_misc:rs(QName)]), + [rabbit_misc:rs(QName)]), %% its been long enough, evict all _ = ra_flru:evict_all(Cache), State#state{cached_segments = undefined}; @@ -804,12 +811,16 @@ seq_applied({Seq, Response}, {Corrs, Actions0, #state{} = State0}) -> %% sequences aren't guaranteed to be applied in order as enqueues are %% low priority commands and may be overtaken by others with a normal priority. + %% + %% if the response is 'not_enqueued' we need to still keep the pending + %% command for a later resend {Actions, State} = maybe_add_action(Response, Actions0, State0), case maps:take(Seq, State#state.pending) of - {{undefined, _}, Pending} -> + {{undefined, _}, Pending} + when Response =/= not_enqueued -> {Corrs, Actions, State#state{pending = Pending}}; {{Corr, _}, Pending} - when Response /= not_enqueued -> + when Response =/= not_enqueued -> {[Corr | Corrs], Actions, State#state{pending = Pending}}; _ -> {Corrs, Actions, State} diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index dbf6d8a821c6..1a3fed31227a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -121,6 +121,7 @@ groups() -> ]}, {clustered_with_partitions, [], [ + partitioned_publisher, reconnect_consumer_and_publish, reconnect_consumer_and_wait, reconnect_consumer_and_wait_channel_down @@ -285,7 +286,8 @@ end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; +init_per_testcase(Testcase, Config) when Testcase == partitioned_publisher; + Testcase == reconnect_consumer_and_publish; Testcase == reconnect_consumer_and_wait; Testcase == reconnect_consumer_and_wait_channel_down -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), @@ -383,7 +385,8 @@ merge_app_env(Config) -> {rabbit, [{core_metrics_gc_interval, 100}]}), {ra, [{min_wal_roll_over_interval, 30000}]}). -end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; +end_per_testcase(Testcase, Config) when Testcase == partitioned_publisher; + Testcase == reconnect_consumer_and_publish; Testcase == reconnect_consumer_and_wait; Testcase == reconnect_consumer_and_wait_channel_down -> Config1 = rabbit_ct_helpers:run_steps(Config, @@ -3026,6 +3029,51 @@ cleanup_data_dir(Config) -> ?awaitMatch(false, filelib:is_dir(DataDir2), 30000), ok. +partitioned_publisher(Config) -> + [Node0, Node1, Node2] = Nodes = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch0 = rabbit_ct_client_helpers:open_channel(Config, Node0), + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Node1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Node1}} = ra:members({RaName, Node1}), + + #'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}), + #'confirm.select_ok'{} = amqp_channel:call(Ch1, #'confirm.select'{}), + %% first publish with confirm + publish_confirm(Ch0, QQ), + + %% then partition + rabbit_ct_broker_helpers:block_traffic_between(Node0, Node1), + rabbit_ct_broker_helpers:block_traffic_between(Node0, Node2), + + %% check that we can still publish from another channel that is on the + %% majority side + publish_confirm(Ch1, QQ), + + %% publish one from partitioned node that will not go through + publish(Ch0, QQ), + + %% wait for disconnections + rabbit_ct_helpers:await_condition( + fun() -> + ConnectedNodes = erpc:call(Node0, erlang, nodes, []), + not lists:member(Node1, ConnectedNodes) + end, 30000), + + flush(10), + + %% then heal the partition + rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node1), + rabbit_ct_broker_helpers:allow_traffic_between(Node0, Node2), + + publish(Ch0, QQ), + wait_for_messages_ready(Nodes, RaName, 4), + ok. + reconnect_consumer_and_publish(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl index 68811230ec0c..c160bd473f5e 100644 --- a/deps/rabbit/test/rabbit_fifo_int_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_int_SUITE.erl @@ -219,7 +219,9 @@ lost_return_is_resent_on_applied_after_leader_change(Config) -> RaEvt, F5), %% this should resend the never applied enqueue {_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6), - ?assertEqual(0, rabbit_fifo_client:pending_size(F7)), + {_, _, F8} = process_ra_events(receive_ra_events(1, 0), ClusterName, F7), + + ?assertEqual(0, rabbit_fifo_client:pending_size(F8)), flush(), ok.