Skip to content

Commit

Permalink
autorate: Add APIs to freeze and thaw the autorate
Browse files Browse the repository at this point in the history
  • Loading branch information
ieQu1 committed Dec 7, 2023
1 parent 5ac3cc7 commit 5a2338e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
46 changes: 31 additions & 15 deletions src/framework/emqttb_autorate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
%% https://controlguru.com/integral-reset-windup-jacketing-logic-and-the-velocity-pi-form/

%% API:
-export([ls/1, ensure/1, get_counter/1, reset/2, info/0, create_autorates/0]).
-export([ls/1, ensure/1, get_counter/1, reset/2, info/0, create_autorates/0, activate/1, deactivate/1]).

%% gen_server callbacks:
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
Expand Down Expand Up @@ -92,21 +92,21 @@ ensure(Conf) ->
{auto, get_counter(Pid)}.

-spec get_counter(lee:key() | emqttb:autorate() | pid()) -> counters:counters_ref().
get_counter(Pid) when is_pid(Pid) ->
gen_server:call(Pid, get_counter);
get_counter(Id) when is_atom(Id) ->
gen_server:call(?via(Id), get_counter);
get_counter(Key) ->
#mnode{metaparams = #{autorate_id := Id}} = lee_model:get(Key, ?MYMODEL),
gen_server:call(?via(Id), get_counter).
get_counter(Autorate) ->
gen_server:call(server(Autorate), get_counter).

%% Set the current value to the specified value
-spec reset(atom() | pid() | lee:ley(), integer()) -> ok.
reset(Key, Val) when is_list(Key) ->
#mnode{metaparams = #{autorate_id := Id}} = lee_model:get(Key, ?MYMODEL),
reset(Id, Val);
reset(Id, Val) ->
gen_server:call(?via(Id), {reset, Val}).
reset(Autorate, Val) ->
gen_server:call(server(Autorate), {reset, Val}).

%% Freeze the value of autorate
deactivate(Autorate) ->
gen_server:call(server(Autorate), deactivate).

%% Thaw the value of autorate
activate(Autorate) ->
gen_server:call(server(Autorate), activate).

-spec info() -> [info()].
info() ->
Expand Down Expand Up @@ -291,6 +291,7 @@ from_model(Key) ->
-record(s,
{ id :: atom()
, parent :: reference() | undefined
, active :: boolean()
, current :: float()
, conf_root :: atom()
, error :: fun(() -> number())
Expand Down Expand Up @@ -327,6 +328,7 @@ init(Config = #{id := Id, conf_root := ConfRoot}) ->
Err = ErrF(),
set_timer(ConfRoot),
{ok, update_rate(#s{ id = Id
, active = true
, parent = MRef
, current = Current
, conf_root = ConfRoot
Expand All @@ -337,6 +339,10 @@ init(Config = #{id := Id, conf_root := ConfRoot}) ->
, last_t = os:system_time(millisecond)
})}.

handle_call(activate, _From, S) ->
{reply, ok, S#s{active = true}};
handle_call(deactivate, _From, S) ->
{reply, ok, S#s{active = false}};
handle_call(get_counter, _From, S) ->
{reply, emqttb_metrics:gauge_ref(?AUTORATE_RATE(S#s.id)), S};
handle_call({reset, Val}, _From, S) ->
Expand All @@ -347,9 +353,11 @@ handle_call(_, _From, S) ->
handle_cast(_, S) ->
{noreply, S}.

handle_info(tick, S) ->
handle_info(tick, S = #s{active = Active}) ->
set_timer(S#s.conf_root),
{noreply, update_rate(S)};
if Active -> {noreply, update_rate(S)};
true -> {noreply, S}
end;
handle_info({'DOWN', MRef, _, _, _}, S = #s{parent = MRef}) ->
{stop, normal, S};
handle_info(_, S) ->
Expand Down Expand Up @@ -463,3 +471,11 @@ read_pvar(ConfRoot) ->
AvgWindow = 5000,
emqttb_metrics:get_rolling_average(MetricKey, AvgWindow)
end.

server(Pid) when is_pid(Pid) ->
Pid;
server(Id) when is_atom(Id) ->
?via(Id);
server(Key) when is_list(Key) ->
#mnode{metaparams = #{autorate_id := Id}} = lee_model:get(Key, ?MYMODEL),
?via(Id).
1 change: 1 addition & 0 deletions src/framework/emqttb_group.erl
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ do_set_target(Target, InitInterval, OnComplete, S = #s{ scaling = Scaling
true -> down
end,
maybe_cancel_previous(Scaling),
emqttb_autorate:activate(Autorate),
case Direction of
stay ->
OnComplete({ok, N}),
Expand Down

0 comments on commit 5a2338e

Please sign in to comment.