Skip to content

Commit

Permalink
feat: Add generic SCRAM configuration for the autorates
Browse files Browse the repository at this point in the history
  • Loading branch information
ieQu1 committed Dec 7, 2023
1 parent 8c9c920 commit cb15c6d
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dialyzer: $(REBAR)

.PHONY: test
test: $(REBAR)
$(REBAR) do eunit, ct
$(REBAR) do compile, eunit, ct

.PHONY: release
release: compile docs
Expand Down
46 changes: 32 additions & 14 deletions doc/src/schema.adoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
:!sectids:
:stem:
= Documentation

[id=cluster.node_name]
Expand All @@ -20,24 +21,54 @@ This can be very expensive in man-hours and computing resources.
In order to prevent that, emqttb can tune some parameters (such as message publishing interval)
automatically using https://controlguru.com/integral-reset-windup-jacketing-logic-and-the-velocity-pi-form/[PI controller].

The following formula is used for the error function:

stem:[e=(a_{SP} - a_{PV}) a_{coeff}]

=== Autoscale

A special autorate controlling the rate of spawning new clients is implicitly created for each client group.
Its name usually follows the pattern `%scenario%/conninterval`.


By default, the number of pending (unacked) connections is used as the process variable.
Number of pending connections is a metric that responds very fast to target overload, so it makes a reasonable default.

For example, the following command can automatically adjust the rate of connections:

[code,bash]
----
./emqttb --pushgw @conn -I 10ms -N 5_000 \
@a -a conn/conninterval -V 1000 --Ti 0.01 --setpoint 100
@a -a conn/conninterval -V 1000 --setpoint 10
----


[id=autorate._.id]
== ID of the autorate configuration

An autorate configuration can be referred to by its ID.
This value must be equal to one of the elements returned by the `emqttb @ls autorate` command.


[id=autorate._.speed]
== Maximum rate of change of the controlled parameter

Note: by default this parameter is set to 0 for each autorate, effectively locking the control parameter in place.


[id=autorate._.process_variable]
== Process variable

This parameter specifies the ID of the metric that senses pressure on the SUT and serves as the process variable (PV).
Its value must be equal to one of the metric IDs returned by the `emqttb @ls metric` command.

[id=autorate._.setpoint]
== Setpoint

The desired value of the process variable (PV) is called the setpoint.
Autorate adjusts the value of the control variable (CV) to bring the PV close to the setpoint.


[id=interval]
== Default interval between events

Expand All @@ -51,11 +82,6 @@ Supported units:

If unit is not specified then `ms` is assumed.

[id=autorate._.id]
== ID of the autorate configuration

Autorate configuration can be referred by id.

[id=scenarios.sub]
== Run scenario sub

Expand Down Expand Up @@ -278,14 +304,6 @@ The following substitutions are supported:

How often the clients will send `PING` MQTT message to the broker on idle connections.

[id=groups._.target_conn_pending]
== Target number of unacked connections

In order to optimize the connection rate autoscale relies on the number of unacked (pending) connections.
This parameter configures the value that emqttb autoscale will try to approach.

Number of pending connections is a metric that responds very fast to target overload, so we use it.


[id=groups._.scram.threshold]
== Maximum unacked CONNECT packets
Expand Down
77 changes: 65 additions & 12 deletions src/framework/emqttb_autorate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,33 @@ model() ->
, cli_operand => "update-interval"
, cli_short => $u
}}
, scram =>
#{ enabled =>
{[value, cli_param],
#{ type => boolean()
, default => false
, cli_operand => "olp"
}}
, threshold =>
{[value, cli_param],
#{ type => non_neg_integer()
, default => 1000
, cli_operand => "olp-threshold"
}}
, hysteresis =>
{[value, cli_param],
#{ oneliner => "Hysteresis (%) of overload detection"
, type => typerefl:range(1, 100)
, default => 50
, cli_operand => "olp-hysteresis"
}}
, override =>
{[value, cli_param],
#{ type => emqttb:duration_us()
, default_str => "10s"
, cli_operand => "olp-override"
}}
}
}.

%%================================================================================
Expand Down Expand Up @@ -298,14 +325,13 @@ init(Config = #{id := Id, conf_root := ConfRoot}) ->
Min = my_cfg(ConfRoot, [min]),
Current = maps:get(init_val, Config, Min),
Err = ErrF(),
ScramFun = maps:get(scram, Config, fun(_) -> false end),
set_timer(ConfRoot),
{ok, update_rate(#s{ id = Id
, parent = MRef
, current = Current
, conf_root = ConfRoot
, error = ErrF
, scram_fun = ScramFun
, scram_fun = make_scram_fun(Id, ConfRoot)
, meltdown = false
, last_err = Err
, last_t = os:system_time(millisecond)
Expand Down Expand Up @@ -395,18 +421,45 @@ my_cfg(ConfRoot, Key) ->
-spec make_error_fun(emqttb:autorate(), lee:key()) -> fun(() -> number()).
make_error_fun(Id, ConfRoot) ->
fun() ->
ProcessVarKey = my_cfg(ConfRoot, [process_variable]),
SetPoint = my_cfg(Id, [set_point]),
Coeff = my_cfg(ConfRoot, [error_coeff]),
MetricKey = emqttb_metrics:from_model(ProcessVarKey),
#mnode{metaparams = PVarMPs} = lee_model:get(ProcessVarKey, ?MYMODEL),
ProcessVar = case ?m_attr(metric, metric_type, PVarMPs) of
counter ->
emqttb_metrics:get_counter(MetricKey);
rolling_average ->
AvgWindow = 5000,
emqttb_metrics:get_rolling_average(MetricKey, AvgWindow)
end,
ProcessVar = read_pvar(ConfRoot),
%% logger:error(#{autorate => Id, pvar => ProcessVar, setpoint => SetPoint, pvar_key => ProcessVarKey}),
Coeff * (SetPoint - ProcessVar)
end.

%% @doc Build the SCRAM (overload protection) check for an autorate.
%%
%% `Id' is only used to label log messages; `ConfRoot' is the
%% configuration key under which the `scram' and `process_variable'
%% settings of this autorate are read.
%%
%% The returned fun takes the previous SCRAM state (`Meltdown') and
%% returns either `false' (no override) or `{true, Override}', where
%% `Override' is the configured `[scram, override]' value.
%% Configuration is re-read on every call, so changes to it take
%% effect on the next invocation.
-spec make_scram_fun(emqttb:autorate(), lee:key()) ->
        fun((boolean()) -> false | {true, emqttb:duration_us()}).
make_scram_fun(Id, ConfRoot) ->
  fun(Meltdown) ->
      case my_cfg(ConfRoot, [scram, enabled]) of
        false ->
          %% SCRAM is switched off: skip the remaining configuration
          %% lookups and the metric read, none of which can affect
          %% the result.
          false;
        true ->
          Threshold = my_cfg(ConfRoot, [scram, threshold]),
          Hysteresis = my_cfg(ConfRoot, [scram, hysteresis]),
          Override = my_cfg(ConfRoot, [scram, override]),
          PVar = read_pvar(ConfRoot),
          if Meltdown andalso PVar >= (Threshold * Hysteresis / 100) ->
              %% Already tripped: stay tripped until the process
              %% variable falls below Hysteresis percent of the
              %% threshold.
              {true, Override};
             PVar >= Threshold ->
              %% Hysteresis is a percentage (1..100), so this branch
              %% is reached only when Meltdown was false, i.e. on the
              %% transition into SCRAM.
              logger:warning("SCRAM is activated for autorate ~p. PV=~p. CV=~p.",
                             [Id, PVar, Override]),
              {true, Override};
             Meltdown ->
              %% Was tripped, and the process variable is now below
              %% the hysteresis level: transition out of SCRAM.
              logger:warning("SCRAM is deactivated for autorate ~p. PV=~p.",
                             [Id, PVar]),
              false;
             true ->
              false
          end
      end
  end.

%% Read the current value of the process variable configured for the
%% autorate rooted at `ConfRoot'.
%%
%% The `process_variable' setting holds a model key; the metric type
%% recorded in that model node's metaparameters selects how the value
%% is fetched. Any other metric type is not handled and fails with
%% `case_clause'.
read_pvar(ConfRoot) ->
  PVKey = my_cfg(ConfRoot, [process_variable]),
  #mnode{metaparams = MetaParams} = lee_model:get(PVKey, ?MYMODEL),
  Metric = emqttb_metrics:from_model(PVKey),
  MetricType = ?m_attr(metric, metric_type, MetaParams),
  case MetricType of
    counter ->
      emqttb_metrics:get_counter(Metric);
    rolling_average ->
      %% Averaging window passed to get_rolling_average/2.
      emqttb_metrics:get_rolling_average(Metric, 5000)
  end.
27 changes: 0 additions & 27 deletions src/framework/emqttb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -368,33 +368,6 @@ model() ->
, default => verify_none
}}
}
, scram =>
#{ threshold =>
{[value, cli_param],
#{ type => non_neg_integer()
, default => 100
, cli_operand => "olp-threshold"
}}
, hysteresis =>
{[value, cli_param],
#{ oneliner => "Hysteresis (%) of overload detection"
, type => typerefl:range(1, 100)
, default => 50
, cli_operand => "olp-hysteresis"
}}
, override =>
{[value, cli_param],
#{ type => emqttb:duration_us()
, default_str => "10s"
, cli_operand => "olp-override"
}}
}
, target_conn_pending =>
{[value, cli_param],
#{ type => non_neg_integer()
, default => 10
, cli_operand => "target-conn-pending"
}}
}.

%%================================================================================
Expand Down
2 changes: 1 addition & 1 deletion src/scenarios/emqttb_scenario_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ model() ->
}.

initial_config() ->
emqttb_conf:string2patch("@a -a conn/conninterval --pvar '[scenarios,conn,{},metrics,conn_latency,pending]'").
emqttb_conf:string2patch("@a -a conn/conninterval --pvar '[scenarios,conn,{},metrics,conn_latency,pending]' --olp").

run() ->
GroupId = ?GROUP,
Expand Down
2 changes: 1 addition & 1 deletion src/scenarios/emqttb_scenario_persistent_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ model() ->

initial_config() ->
emqttb_conf:string2patch("@a -a persistent_session/pubinterval --pvar '[scenarios,persistent_session,{},pub,metrics,pub_latency,pending]'") ++
emqttb_conf:string2patch("@a -a persistent_session/conninterval --pvar '[scenarios,persistent_session,{},pub,metrics,conn_latency,pending]'").
emqttb_conf:string2patch("@a -a persistent_session/conninterval --pvar '[scenarios,persistent_session,{},pub,metrics,conn_latency,pending]' --olp").

run() ->
prometheus_summary:declare([ {name, ?PUB_THROUGHPUT}
Expand Down
2 changes: 1 addition & 1 deletion src/scenarios/emqttb_scenario_pub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ model() ->

initial_config() ->
emqttb_conf:string2patch("@a -a pub/pubinterval --pvar '[scenarios,pub,{},metrics,pub_latency,pending]'") ++
emqttb_conf:string2patch("@a -a pub/conninterval --pvar '[scenarios,pub,{},metrics,conn_latency,pending]'").
emqttb_conf:string2patch("@a -a pub/conninterval --pvar '[scenarios,pub,{},metrics,conn_latency,pending]' --olp").

run() ->
PubOpts = #{ topic => my_conf([topic])
Expand Down
2 changes: 1 addition & 1 deletion src/scenarios/emqttb_scenario_pubsub_fwd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ model() ->

initial_config() ->
emqttb_conf:string2patch("@a -a pubsub_fwd/pubinterval --pvar '[scenarios,pubsub_fwd,{},pub,metrics,pub_latency,pending]'") ++
emqttb_conf:string2patch("@a -a pubsub_fwd/conninterval --pvar '[scenarios,pubsub_fwd,{},pub,metrics,conn_latency,pending]'").
emqttb_conf:string2patch("@a -a pubsub_fwd/conninterval --pvar '[scenarios,pubsub_fwd,{},pub,metrics,conn_latency,pending]' --olp").

run() ->
set_stage(subscribe),
Expand Down
2 changes: 1 addition & 1 deletion src/scenarios/emqttb_scenario_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ model() ->
}.

initial_config() ->
emqttb_conf:string2patch("@a -a sub/conninterval --pvar '[scenarios,sub,{},metrics,conn_latency,pending]'").
emqttb_conf:string2patch("@a -a sub/conninterval --pvar '[scenarios,sub,{},metrics,conn_latency,pending]' --olp").

run() ->
SubOpts = #{ topic => my_conf([topic])
Expand Down
2 changes: 1 addition & 1 deletion src/scenarios/emqttb_scenario_sub_flapping.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ model() ->
}.

initial_config() ->
emqttb_conf:string2patch("@a -a sub_flapping/conninterval --pvar '[scenarios,sub_flapping,{},metrics,conn_latency,pending]'").
emqttb_conf:string2patch("@a -a sub_flapping/conninterval --pvar '[scenarios,sub_flapping,{},metrics,conn_latency,pending]' --olp").

run() ->
SubOpts = #{ topic => my_conf([topic])
Expand Down

0 comments on commit cb15c6d

Please sign in to comment.