Skip to content

Commit

Permalink
feat: implement scale down
Browse files Browse the repository at this point in the history
Without this, scenarios like `sub_flapping` will just hang if they try to scale down.
  • Loading branch information
thalesmg committed Aug 15, 2024
1 parent 788a82c commit 0047fb9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
35 changes: 31 additions & 4 deletions src/framework/emqttb_group.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
-behavior(gen_server).

%% API:
-export([ensure/1, stop/1, set_target/2, set_target/3, set_target_async/3, broadcast/2, report_dead_id/2, info/0]).
-export([ensure/1, stop/1, set_target/2, set_target/3, set_target_async/3, broadcast/2, report_dead_id/2, report_live_id/2, info/0]).

%% gen_server callbacks:
-export([init/1, handle_call/3, handle_cast/2, terminate/2, handle_info/2]).
Expand Down Expand Up @@ -106,8 +106,13 @@ broadcast(Group, Message) ->
%% @doc Mark worker `Id' of `Group' as expired: the ID is dropped from
%% the group's table of live workers and recorded in its table of dead
%% (expired) IDs.
-spec report_dead_id(emqttb:group(), integer()) -> true.
report_dead_id(Group, Id) ->
  LivePool = live_id_pool(Group),
  DeadPool = dead_id_pool(Group),
  %% Order matters only for observers: unregister from the live pool
  %% first, then publish the ID as dead.
  true = ets:delete(LivePool, Id),
  ets:insert(DeadPool, {Id, []}).

%% @doc Register the calling process as the live worker that owns `Id'
%% in `Group'. The entry maps the ID to the caller's pid.
-spec report_live_id(emqttb:group(), integer()) -> true.
report_live_id(Group, Id) ->
  Owner = self(),
  LivePool = live_id_pool(Group),
  ets:insert(LivePool, {Id, Owner}).

info() ->
[#{ '$id' => Id
, conf_root => persistent_term:get(?GROUP_CONF_ID(Pid))
Expand Down Expand Up @@ -159,6 +164,7 @@ init([Conf]) ->
, {labels, [group]}
]),
ets:new(dead_id_pool(ID), [ordered_set, public, named_table]),
ets:new(live_id_pool(ID), [ordered_set, public, named_table]),
StartN = maps:get(start_n, Conf, 0),
persistent_term:put(?GROUP_LEADER_TO_GROUP_ID(self()), ID),
persistent_term:put(?GROUP_BEHAVIOR(self()), Behavior),
Expand Down Expand Up @@ -245,7 +251,6 @@ wait_group_stop([MRef|Rest]) ->
end.

do_set_target(Target, InitInterval, OnComplete, S = #s{ scaling = Scaling
, id = ID
, autorate = Autorate
}) ->
N = n_clients(S),
Expand Down Expand Up @@ -321,8 +326,27 @@ scale_up(NRepeats, S = #s{behavior = Behavior, id = Group}) ->
?tp(start_worker, #{group => Group, pid => _Pid}),
scale_up(NRepeats - 1, S#s{next_id = NextId}).

scale_down(_N, S0) ->
S0.
%% Stop up to `NRepeats' live workers of the group, one at a time,
%% waiting for each of them to terminate before moving on.
%%
%% Always returns the group state, including when the pool of live
%% workers is exhausted before `NRepeats' reaches zero.
scale_down(0, S) ->
  S;
scale_down(NRepeats, S = #s{id = Group}) ->
  LivePool = live_id_pool(Group),
  case ets:first(LivePool) of
    '$end_of_table' ->
      %% No registered live workers left: nothing more to stop. The
      %% state must still be returned, not a bare `ok'.
      S;
    Id ->
      case ets:lookup(LivePool, Id) of
        [] ->
          %% Race: the worker unregistered itself between
          %% `ets:first/1' and `ets:lookup/2'. Try the next one.
          scale_down(NRepeats, S);
        [{Id, Pid}] ->
          MRef = monitor(process, Pid),
          exit(Pid, shutdown),
          %% NOTE(review): this blocks the caller until the worker is
          %% down; a worker that traps exits and ignores `shutdown'
          %% would stall here. Confirm workers always honor it.
          receive
            {'DOWN', MRef, process, Pid, _Reason} ->
              ok
          end,
          %% The worker may have died without unregistering itself.
          %% Purge this exact entry so a stale pid is never picked
          %% again. This is a no-op if the worker already removed it.
          true = ets:delete_object(LivePool, {Id, Pid}),
          scale_down(NRepeats - 1, S)
      end
  end.

%% Arm a one-shot timer that delivers the message `tick' to the calling
%% process after `?TICK_TIME' milliseconds. Returns the timer reference.
set_tick_timer() ->
  Self = self(),
  erlang:send_after(?TICK_TIME, Self, tick).
Expand Down Expand Up @@ -381,5 +405,8 @@ maybe_monitor_parent(#{parent := Pid}) ->
maybe_monitor_parent(_) ->
undefined.

%% Name of the ETS table holding the live worker IDs of `Group':
%% the group name with a `_live' suffix.
%%
%% NOTE(review): this creates a new atom per distinct group name;
%% presumably group names come from a bounded set -- confirm.
live_id_pool(Group) ->
  GroupName = atom_to_list(Group),
  list_to_atom(GroupName ++ "_live").

%% Name of the ETS table holding the expired (dead) worker IDs of a
%% group. This table is simply named after the group itself.
dead_id_pool(GroupId) ->
  GroupId.
4 changes: 3 additions & 1 deletion src/framework/emqttb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ entrypoint(Behavior, Group, Number) ->
emqttb_metrics:counter_inc(?GROUP_N_WORKERS(my_group()), 1),
logger:set_process_metadata(#{domain => [group, Group]}),
try apply(Behavior, init, [my_settings()]) of
State -> loop(State)
State ->
emqttb_group:report_live_id(my_group(), my_id()),
loop(State)
catch
EC:Err:Stack ->
?tp(error, emqttb_worker_crash,
Expand Down

0 comments on commit 0047fb9

Please sign in to comment.