Skip to content

Commit 1c6e452

Browse files
committed
QQ: set better timeouts for commands
Refactor how the single active consumer check is performed when consuming. Improve timeouts in rabbit_fifo_client.
1 parent 2fb1291 commit 1c6e452

File tree

4 files changed

+73
-42
lines changed

4 files changed

+73
-42
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
14751475
rabbit_misc:protocol_error(
14761476
not_implemented, "~s does not support global qos",
14771477
[rabbit_misc:rs(QueueName)]);
1478+
{error, timeout} ->
1479+
rabbit_misc:protocol_error(
1480+
internal_error, "~s timeout occurred during consume operation",
1481+
[rabbit_misc:rs(QueueName)]);
14781482
{error, no_local_stream_replica_available} ->
14791483
rabbit_misc:protocol_error(
14801484
resource_error, "~s does not not have a running local replica",
@@ -1803,6 +1807,8 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
18031807
E;
18041808
{{error, no_local_stream_replica_available} = E, _Q} ->
18051809
E;
1810+
{{error, timeout} = E, _Q} ->
1811+
E;
18061812
{{protocol_error, Type, Reason, ReasonArgs}, _Q} ->
18071813
rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
18081814
end.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@
3131
update_machine_state/2,
3232
pending_size/1,
3333
stat/1,
34-
stat/2
34+
stat/2,
35+
query_single_active_consumer/1
3536
]).
3637

3738
-include_lib("rabbit_common/include/rabbit.hrl").
3839

3940
-define(SOFT_LIMIT, 32).
4041
-define(TIMER_TIME, 10000).
42+
-define(COMMAND_TIMEOUT, 30000).
4143

4244
-type seq() :: non_neg_integer().
4345
%% last_applied is initialised to -1
@@ -149,8 +151,6 @@ enqueue(Correlation, Msg,
149151
case rpc:call(Node, ra_machine, version, [{machine, rabbit_fifo, #{}}]) of
150152
0 ->
151153
%% the leader is running the old version
152-
%% so we can't initialize the enqueuer session safely
153-
%% fall back on old behavour
154154
enqueue(Correlation, Msg, State0#state{queue_status = go});
155155
1 ->
156156
%% were running the new version on the leader do sync initialisation
@@ -395,6 +395,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
395395
ack = Ack}, CDels0),
396396
try_process_command(Servers, Cmd, State0#state{consumer_deliveries = SDels}).
397397

398+
399+
-spec query_single_active_consumer(state()) ->
400+
{ok, term()} | {error, term()} | {timeout, term()}.
401+
query_single_active_consumer(#state{leader = undefined}) ->
402+
{error, leader_not_known};
403+
query_single_active_consumer(#state{leader = Leader}) ->
404+
case ra:local_query(Leader, fun rabbit_fifo:query_single_active_consumer/1,
405+
?COMMAND_TIMEOUT) of
406+
{ok, {_, Reply}, _} ->
407+
{ok, Reply};
408+
Err ->
409+
Err
410+
end.
411+
398412
%% @doc Provide credit to the queue
399413
%%
400414
%% This only has an effect if the consumer uses credit mode: credited
@@ -444,8 +458,8 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
444458
%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
445459
%% of messages purged.
446460
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
447-
purge(Node) ->
448-
case ra:process_command(Node, rabbit_fifo:make_purge()) of
461+
purge(Server) ->
462+
case ra:process_command(Server, rabbit_fifo:make_purge(), ?COMMAND_TIMEOUT) of
449463
{ok, {purge, Reply}, _} ->
450464
{ok, Reply};
451465
Err ->
@@ -482,7 +496,7 @@ cluster_name(#state{cfg = #cfg{cluster_name = ClusterName}}) ->
482496
ClusterName.
483497

484498
update_machine_state(Server, Conf) ->
485-
case ra:process_command(Server, rabbit_fifo:make_update_config(Conf)) of
499+
case ra:process_command(Server, rabbit_fifo:make_update_config(Conf), ?COMMAND_TIMEOUT) of
486500
{ok, ok, _} ->
487501
ok;
488502
Err ->
@@ -640,8 +654,9 @@ untracked_enqueue([Node | _], Msg) ->
640654

641655
%% Internal
642656

643-
try_process_command([Server | Rem], Cmd, State) ->
644-
case ra:process_command(Server, Cmd, 30000) of
657+
try_process_command([Server | Rem], Cmd,
658+
#state{cfg = #cfg{timeout = Timeout}} = State) ->
659+
case ra:process_command(Server, Cmd, Timeout) of
645660
{ok, _, Leader} ->
646661
{ok, State#state{leader = Leader}};
647662
Err when length(Rem) =:= 0 ->
@@ -801,7 +816,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
801816
Query = fun (State) ->
802817
rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
803818
end,
804-
case ra:local_query(Leader, Query) of
819+
case ra:local_query(Leader, Query, ?COMMAND_TIMEOUT) of
805820
{ok, {_, Missing}, _} ->
806821
Missing;
807822
{error, Error} ->

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@ dequeue(NoAck, _LimiterPid, CTag0, QState0) ->
710710
rabbit_queue_type:consume_spec(),
711711
rabbit_fifo_client:state()) ->
712712
{ok, rabbit_fifo_client:state(), rabbit_queue_type:actions()} |
713-
{error, global_qos_not_supported_for_queue_type}.
713+
{error, global_qos_not_supported_for_queue_type | timeout}.
714714
consume(Q, #{limiter_active := true}, _State)
715715
when ?amqqueue_is_quorum(Q) ->
716716
{error, global_qos_not_supported_for_queue_type};
@@ -726,7 +726,6 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
726726
%% TODO: validate consumer arguments
727727
%% currently quorum queues do not support any arguments
728728
QName = amqqueue:get_name(Q),
729-
QPid = amqqueue:get_pid(Q),
730729
maybe_send_reply(ChPid, OkMsg),
731730
ConsumerTag = quorum_ctag(ConsumerTag0),
732731
%% A prefetch count of 0 means no limitation,
@@ -758,36 +757,43 @@ consume(Q, Spec, QState0) when ?amqqueue_is_quorum(Q) ->
758757
QState1);
759758
_ -> QState1
760759
end,
761-
case ra:local_query(QPid,
762-
fun rabbit_fifo:query_single_active_consumer/1) of
763-
{ok, {_, SacResult}, _} ->
764-
SingleActiveConsumerOn = single_active_consumer_on(Q),
765-
{IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
766-
{false, _} ->
767-
{true, up};
768-
{true, {value, {ConsumerTag, ChPid}}} ->
769-
{true, single_active};
770-
_ ->
771-
{false, waiting}
772-
end,
760+
case single_active_consumer_on(Q) of
761+
true ->
762+
%% get the leader from state
763+
case rabbit_fifo_client:query_single_active_consumer(QState) of
764+
{ok, SacResult} ->
765+
ActivityStatus = case SacResult of
766+
{value, {ConsumerTag, ChPid}} ->
767+
single_active;
768+
_ ->
769+
waiting
770+
end,
771+
rabbit_core_metrics:consumer_created(
772+
ChPid, ConsumerTag, ExclusiveConsume,
773+
AckRequired, QName,
774+
ConsumerPrefetchCount, true, %% IsSingleSctiveconsumer
775+
ActivityStatus, Args),
776+
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
777+
AckRequired, QName, Prefetch,
778+
Args, none, ActingUser),
779+
{ok, QState, []};
780+
{error, Error} ->
781+
Error;
782+
{timeout, _} ->
783+
{error, timeout}
784+
end;
785+
false ->
773786
rabbit_core_metrics:consumer_created(
774-
ChPid, ConsumerTag, ExclusiveConsume,
775-
AckRequired, QName,
776-
ConsumerPrefetchCount, IsSingleActiveConsumer,
777-
ActivityStatus, Args),
787+
ChPid, ConsumerTag, ExclusiveConsume,
788+
AckRequired, QName,
789+
ConsumerPrefetchCount, false, %% issingleactiveconsumer
790+
up, Args),
778791
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
779-
AckRequired, QName, Prefetch,
780-
Args, none, ActingUser),
781-
{ok, QState, []};
782-
{error, Error} ->
783-
Error;
784-
{timeout, _} ->
785-
{error, timeout}
792+
AckRequired, QName, Prefetch,
793+
Args, none, ActingUser),
794+
{ok, QState, []}
786795
end.
787796

788-
% -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
789-
% {'ok', rabbit_fifo_client:state()}.
790-
791797
cancel(_Q, ConsumerTag, OkMsg, _ActingUser, State) ->
792798
maybe_send_reply(self(), OkMsg),
793799
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), State).
@@ -904,8 +910,8 @@ stat(Q, Timeout) when ?is_amqqueue(Q) ->
904910
-spec purge(amqqueue:amqqueue()) ->
905911
{ok, non_neg_integer()}.
906912
purge(Q) when ?is_amqqueue(Q) ->
907-
Node = amqqueue:get_pid(Q),
908-
rabbit_fifo_client:purge(Node).
913+
Server = amqqueue:get_pid(Q),
914+
rabbit_fifo_client:purge(Server).
909915

910916
requeue(ConsumerTag, MsgIds, QState) ->
911917
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,7 +1450,7 @@ confirm_availability_on_leader_change(Config) ->
14501450
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
14511451
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
14521452
ConfirmLoop = fun Loop() ->
1453-
ok = publish_confirm(Ch, QQ, 5000),
1453+
ok = publish_confirm(Ch, QQ, 15000),
14541454
receive
14551455
{done, P} ->
14561456
P ! publisher_done,
@@ -1470,9 +1470,13 @@ confirm_availability_on_leader_change(Config) ->
14701470
timer:sleep(500),
14711471
Publisher ! {done, self()},
14721472
receive
1473-
publisher_done -> ok;
1474-
{'EXIT', Publisher, Err} -> exit(Err)
1473+
publisher_done ->
1474+
ok;
1475+
{'EXIT', Publisher, Err} ->
1476+
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
1477+
exit(Err)
14751478
after 30000 ->
1479+
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
14761480
flush(100),
14771481
exit(nothing_received_from_publisher_process)
14781482
end,

0 commit comments

Comments
 (0)