Skip to content

Commit 1f8f248

Browse files
Merge pull request #3670 from rabbitmq/mergify/bp/v3.9.x/pr-3628
QQ: stability and channel side improvements (backport #3628)
2 parents ca46d45 + 6549636 commit 1f8f248

File tree

6 files changed

+102
-55
lines changed

6 files changed

+102
-55
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ priv_absent(QueueName, QPid, true, nodedown) ->
677677
rabbit_misc:protocol_error(
678678
not_found,
679679
"home node '~s' of durable ~s is down or inaccessible",
680-
[node(QPid), rabbit_misc:rs(QueueName)]);
680+
[amqqueue:qnode(QPid), rabbit_misc:rs(QueueName)]);
681681

682682
priv_absent(QueueName, _QPid, _IsDurable, stopped) ->
683683
rabbit_misc:protocol_error(

deps/rabbit/src/rabbit_channel.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
14751475
rabbit_misc:protocol_error(
14761476
not_implemented, "~s does not support global qos",
14771477
[rabbit_misc:rs(QueueName)]);
1478+
{error, timeout} ->
1479+
rabbit_misc:protocol_error(
1480+
internal_error, "~s timeout occurred during consume operation",
1481+
[rabbit_misc:rs(QueueName)]);
14781482
{error, no_local_stream_replica_available} ->
14791483
rabbit_misc:protocol_error(
14801484
resource_error, "~s does not not have a running local replica",
@@ -1803,6 +1807,8 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
18031807
E;
18041808
{{error, no_local_stream_replica_available} = E, _Q} ->
18051809
E;
1810+
{{error, timeout} = E, _Q} ->
1811+
E;
18061812
{{protocol_error, Type, Reason, ReasonArgs}, _Q} ->
18071813
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
18081814
end.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
update_machine_state/2,
3232
pending_size/1,
3333
stat/1,
34-
stat/2
34+
stat/2,
35+
query_single_active_consumer/1
3536
]).
3637

3738
-include_lib("rabbit_common/include/rabbit.hrl").
3839

3940
-define(SOFT_LIMIT, 32).
4041
-define(TIMER_TIME, 10000).
42+
-define(COMMAND_TIMEOUT, 30000).
4143

4244
-type seq() :: non_neg_integer().
4345
%% last_applied is initialised to -1
@@ -142,27 +144,33 @@ init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
142144
enqueue(Correlation, Msg,
143145
#state{queue_status = undefined,
144146
next_enqueue_seq = 1,
145-
cfg = #cfg{timeout = Timeout}} = State0) ->
147+
cfg = #cfg{servers = Servers,
148+
timeout = Timeout}} = State0) ->
146149
%% it is the first enqueue, check the version
147-
{_, Node} = Server = pick_server(State0),
150+
{_, Node} = pick_server(State0),
148151
case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
149152
0 ->
150153
%% the leader is running the old version
151-
%% so we can't initialize the enqueuer session safely
152-
%% fall back on old behavour
153154
enqueue(Correlation, Msg, State0#state{queue_status = go});
154155
1 ->
155156
%% were running the new version on the leader do sync initialisation
156157
%% of enqueuer session
157158
Reg = rabbit_fifo:make_register_enqueuer(self()),
158-
case ra:process_command(Server, Reg, Timeout) of
159-
{ok, reject_publish, _} ->
160-
{reject_publish, State0#state{queue_status = reject_publish}};
161-
{ok, ok, _} ->
162-
enqueue(Correlation, Msg, State0#state{queue_status = go});
159+
case ra:process_command(Servers, Reg, Timeout) of
160+
{ok, reject_publish, Leader} ->
161+
{reject_publish, State0#state{leader = Leader,
162+
queue_status = reject_publish}};
163+
{ok, ok, Leader} ->
164+
enqueue(Correlation, Msg, State0#state{leader = Leader,
165+
queue_status = go});
166+
{error, {no_more_servers_to_try, _Errs}} ->
167+
%% if we are not able to process the register command
168+
%% it is safe to reject the message as we never attempted
169+
%% to send it
170+
{reject_publish, State0};
171+
%% TODO: not convinced this can ever happen when using
172+
%% a list of servers
163173
{timeout, _} ->
164-
%% if we timeout it is probably better to reject
165-
%% the message than being uncertain
166174
{reject_publish, State0};
167175
Err ->
168176
exit(Err)
@@ -387,6 +395,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
387395
ack = Ack}, CDels0),
388396
try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}).
389397

398+
399+
-spec query_single_active_consumer(state()) ->
400+
{ok, term()} | {error, term()} | {timeout, term()}.
401+
query_single_active_consumer(#state{leader = undefined}) ->
402+
{error, leader_not_known};
403+
query_single_active_consumer(#state{leader = Leader}) ->
404+
case ra:local_query(Leader, fun rabbit_fifo:query_single_active_consumer/1,
405+
?COMMAND_TIMEOUT) of
406+
{ok, {_, Reply}, _} ->
407+
{ok, Reply};
408+
Err ->
409+
Err
410+
end.
411+
390412
%% @doc Provide credit to the queue
391413
%%
392414
%% This only has an effect if the consumer uses credit mode: credited
@@ -436,8 +458,8 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
436458
%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
437459
%% of messages purged.
438460
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
439-
purge(Node) ->
440-
case ra:process_command(Node, rabbit_fifo:make_purge()) of
461+
purge(Server) ->
462+
case ra:process_command(Server, rabbit_fifo:make_purge(), ?COMMAND_TIMEOUT) of
441463
{ok, {purge, Reply}, _} ->
442464
{ok, Reply};
443465
Err ->
@@ -474,7 +496,7 @@ cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
474496
ClusterName.
475497

476498
update_machine_state(Server, Conf) ->
477-
case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
499+
case ra:process_command(Server, rabbit_fifo:make_update_config(Conf), ?COMMAND_TIMEOUT) of
478500
{ok, ok, _} ->
479501
ok;
480502
Err ->
@@ -632,8 +654,9 @@ untracked_enqueue([Node | _], Msg) ->
632654

633655
%% Internal
634656

635-
try_process_command([Server | Rem], Cmd, State) ->
636-
case ra:process_command(Server, Cmd, 30000) of
657+
try_process_command([Server | Rem], Cmd,
658+
#state{cfg = #cfg{timeout = Timeout}} = State) ->
659+
case ra:process_command(Server, Cmd, Timeout) of
637660
{ok, _, Leader} ->
638661
{ok, State#state{leader = Leader}};
639662
Err when length(Rem) =:= 0 ->
@@ -793,7 +816,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
793816
Query = fun (State) ->
794817
rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
795818
end,
796-
case ra:local_query(Leader, Query) of
819+
case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of
797820
{ok, {_, Missing}, _} ->
798821
Missing;
799822
{error, Error} ->

deps/rabbit/src/rabbit_queue_type.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,9 @@ recover(VHost, Qs) ->
384384
rabbit_quorum_queue => [],
385385
rabbit_stream_queue => []}, Qs),
386386
maps:fold(fun (Mod, Queues, {R0, F0}) ->
387-
{R, F} = Mod:recover(VHost, Queues),
387+
{Taken, {R, F}} = timer:tc(Mod, recover, [VHost, Queues]),
388+
rabbit_log:info("Recovering ~b queues of type ~s took ~bms",
389+
[length(Queues), Mod, Taken div 1000]),
388390
{R0 ++ R, F0 ++ F}
389391
end, {[], []}, ByType).
390392

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,9 @@ handle_tick(QName,
415415
%% this makes calls to remote processes so cannot be run inside the
416416
%% ra server
417417
Self = self(),
418-
_ = spawn(fun() ->
418+
_ = spawn(
419+
fun() ->
420+
try
419421
R = reductions(Name),
420422
rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
421423
Util = case C of
@@ -454,7 +456,11 @@ handle_tick(QName,
454456

455457
ok
456458
end
457-
end),
459+
catch
460+
_:_ ->
461+
ok
462+
end
463+
end),
458464
ok.
459465

460466
repair_leader_record(QName, Self) ->
@@ -704,7 +710,7 @@ dequeue(NoAck, _LimiterPid, CTag0, QState0) ->
704710
rabbit_queue_type:consume_spec(),
705711
rabbit_fifo_client:state()) ->
706712
{ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
707-
{error, global_qos_not_supported_for_queue_type}.
713+
{error, global_qos_not_supported_for_queue_type | timeout}.
708714
consume(Q, #{limiter_active := true}, _State)
709715
when ?amqqueue_is_quorum(Q) ->
710716
{error, global_qos_not_supported_for_queue_type};
@@ -720,7 +726,6 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
720726
%% TODO: validate consumer arguments
721727
%% currently quorum queues do not support any arguments
722728
QName = amqqueue:get_name(Q),
723-
QPid = amqqueue:get_pid(Q),
724729
maybe_send_reply(ChPid, OkMsg),
725730
ConsumerTag = quorum_ctag(ConsumerTag0),
726731
%% A prefetch count of 0 means no limitation,
@@ -752,36 +757,43 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
752757
QState1);
753758
_ -> QState1
754759
end,
755-
case ra:local_query(QPid,
756-
fun rabbit_fifo:query_single_active_consumer/1) of
757-
{ok, {_, SacResult}, _} ->
758-
SingleActiveConsumerOn = single_active_consumer_on(Q),
759-
{IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
760-
{false, _} ->
761-
{true, up};
762-
{true, {value, {ConsumerTag, ChPid}}} ->
763-
{true, single_active};
764-
_ ->
765-
{false, waiting}
766-
end,
760+
case single_active_consumer_on(Q) of
761+
true ->
762+
%% get the leader from state
763+
case rabbit_fifo_client:query_single_active_consumer(QState) of
764+
{ok, SacResult} ->
765+
ActivityStatus = case SacResult of
766+
{value, {ConsumerTag, ChPid}} ->
767+
single_active;
768+
_ ->
769+
waiting
770+
end,
771+
rabbit_core_metrics:consumer_created(
772+
ChPid, ConsumerTag, ExclusiveConsume,
773+
AckRequired, QName,
774+
ConsumerPrefetchCount, true, %% IsSingleSctiveconsumer
775+
ActivityStatus, Args),
776+
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
777+
AckRequired, QName, Prefetch,
778+
Args, none, ActingUser),
779+
{ok, QState, []};
780+
{error, Error} ->
781+
Error;
782+
{timeout, _} ->
783+
{error, timeout}
784+
end;
785+
false ->
767786
rabbit_core_metrics:consumer_created(
768-
ChPid, ConsumerTag, ExclusiveConsume,
769-
AckRequired, QName,
770-
ConsumerPrefetchCount, IsSingleActiveConsumer,
771-
ActivityStatus, Args),
787+
ChPid, ConsumerTag, ExclusiveConsume,
788+
AckRequired, QName,
789+
ConsumerPrefetchCount, false, %% issingleactiveconsumer
790+
up, Args),
772791
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
773-
AckRequired, QName, Prefetch,
774-
Args, none, ActingUser),
775-
{ok, QState, []};
776-
{error, Error} ->
777-
Error;
778-
{timeout, _} ->
779-
{error, timeout}
792+
AckRequired, QName, Prefetch,
793+
Args, none, ActingUser),
794+
{ok, QState, []}
780795
end.
781796

782-
% -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
783-
% {'ok', rabbit_fifo_client:state()}.
784-
785797
cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) ->
786798
maybe_send_reply(self(), OkMsg),
787799
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State).
@@ -898,8 +910,8 @@ stat(Q, Timeout) when ?is_amqqueue(Q) ->
898910
-spec purge(amqqueue:amqqueue()) ->
899911
{ok, non_neg_integer()}.
900912
purge(Q) when ?is_amqqueue(Q) ->
901-
Node = amqqueue:get_pid(Q),
902-
rabbit_fifo_client:purge(Node).
913+
Server = amqqueue:get_pid(Q),
914+
rabbit_fifo_client:purge(Server).
903915

904916
requeue(ConsumerTag, MsgIds, QState) ->
905917
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,7 +1441,7 @@ confirm_availability_on_leader_change(Config) ->
14411441
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
14421442
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
14431443
ConfirmLoop = fun Loop() ->
1444-
ok = publish_confirm(Ch, QQ, 5000),
1444+
ok = publish_confirm(Ch, QQ, 15000),
14451445
receive
14461446
{done, P} ->
14471447
P ! publisher_done,
@@ -1461,9 +1461,13 @@ confirm_availability_on_leader_change(Config) ->
14611461
timer:sleep(500),
14621462
Publisher ! {done, self()},
14631463
receive
1464-
publisher_done -> ok;
1465-
{'EXIT', Publisher, Err} -> exit(Err)
1464+
publisher_done ->
1465+
ok;
1466+
{'EXIT', Publisher, Err} ->
1467+
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
1468+
exit(Err)
14661469
after 30000 ->
1470+
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
14671471
flush(100),
14681472
exit(nothing_received_from_publisher_process)
14691473
end,

0 commit comments

Comments
 (0)