diff --git a/src/framework/emqttb_group.erl b/src/framework/emqttb_group.erl index 366496b..bcb21b2 100644 --- a/src/framework/emqttb_group.erl +++ b/src/framework/emqttb_group.erl @@ -18,7 +18,7 @@ -behavior(gen_server). %% API: --export([ensure/1, stop/1, set_target/2, set_target/3, set_target_async/3, broadcast/2, report_dead_id/2, info/0]). +-export([ensure/1, stop/1, set_target/2, set_target/3, set_target_async/3, broadcast/2, report_dead_id/2, report_live_id/2, info/0]). %% gen_server callbacks: -export([init/1, handle_call/3, handle_cast/2, terminate/2, handle_info/2]). @@ -106,8 +106,13 @@ broadcast(Group, Message) -> %% @doc Add an expired ID to the pool -spec report_dead_id(emqttb:group(), integer()) -> true. report_dead_id(Group, Id) -> + ets:delete(live_id_pool(Group), Id), ets:insert(dead_id_pool(Group), {Id, []}). +-spec report_live_id(emqttb:group(), integer()) -> true. +report_live_id(Group, Id) -> + ets:insert(live_id_pool(Group), {Id, self()}). + info() -> [#{ '$id' => Id , conf_root => persistent_term:get(?GROUP_CONF_ID(Pid)) @@ -159,6 +164,7 @@ init([Conf]) -> , {labels, [group]} ]), ets:new(dead_id_pool(ID), [ordered_set, public, named_table]), + ets:new(live_id_pool(ID), [ordered_set, public, named_table]), StartN = maps:get(start_n, Conf, 0), persistent_term:put(?GROUP_LEADER_TO_GROUP_ID(self()), ID), persistent_term:put(?GROUP_BEHAVIOR(self()), Behavior), @@ -245,7 +251,6 @@ wait_group_stop([MRef|Rest]) -> end. do_set_target(Target, InitInterval, OnComplete, S = #s{ scaling = Scaling - , id = ID , autorate = Autorate }) -> N = n_clients(S), @@ -321,8 +326,27 @@ scale_up(NRepeats, S = #s{behavior = Behavior, id = Group}) -> ?tp(start_worker, #{group => Group, pid => _Pid}), scale_up(NRepeats - 1, S#s{next_id = NextId}). -scale_down(_N, S0) -> - S0. 
+scale_down(0, S) -> + S; +scale_down(NRepeats, S = #s{id = Group}) -> + case ets:first(live_id_pool(Group)) of + '$end_of_table' -> + S; + Id -> + case ets:lookup(live_id_pool(Group), Id) of + [] -> + %% Race: the entry vanished between ets:first/1 and ets:lookup/2 (the worker presumably died and was removed via report_dead_id/2); retry with the next id + scale_down(NRepeats, S); + [{Id, Pid}] -> + MRef = monitor(process, Pid), + exit(Pid, shutdown), + receive + {'DOWN', MRef, process, Pid, _R} -> + ok + end, + scale_down(NRepeats - 1, S) + end + end. set_tick_timer() -> erlang:send_after(?TICK_TIME, self(), tick). @@ -381,5 +405,8 @@ maybe_monitor_parent(#{parent := Pid}) -> maybe_monitor_parent(_) -> undefined. +live_id_pool(Group) -> + list_to_atom(atom_to_list(Group) ++ "_live"). + dead_id_pool(Group) -> Group. diff --git a/src/framework/emqttb_worker.erl b/src/framework/emqttb_worker.erl index b62e14f..3f0f830 100644 --- a/src/framework/emqttb_worker.erl +++ b/src/framework/emqttb_worker.erl @@ -190,7 +190,9 @@ entrypoint(Behavior, Group, Number) -> emqttb_metrics:counter_inc(?GROUP_N_WORKERS(my_group()), 1), logger:set_process_metadata(#{domain => [group, Group]}), try apply(Behavior, init, [my_settings()]) of - State -> loop(State) + State -> + emqttb_group:report_live_id(my_group(), my_id()), + loop(State) catch EC:Err:Stack -> ?tp(error, emqttb_worker_crash,