Skip to content

Commit

Permalink
Merge pull request #64 from emqx/dev/grafana-spans
Browse files Browse the repository at this point in the history
Various features
  • Loading branch information
ieQu1 authored Jan 29, 2024
2 parents 985e2b3 + eb25f61 commit f3ca5a9
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 23 deletions.
6 changes: 6 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,9 @@
{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
deprecated_function_calls,
deprecated_functions]}.

{dialyzer, [
{plt_apps, all_apps},
{extra_plt_apps, [lee, typerefl, ssl, stdlib]},
{statistics, true}
]}.
15 changes: 11 additions & 4 deletions src/behaviors/emqttb_behavior_pub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
, retain => boolean()
, metadata => boolean()
, host_shift => integer()
, random => boolean()
, host_selection => random | round_robin
, clean_start => boolean()
, expiry => non_neg_integer() | undefined
Expand Down Expand Up @@ -97,13 +98,16 @@ init_per_group(Group,
HostShift = maps:get(host_shift, Conf, 0),
HostSelection = maps:get(host_selection, Conf, random),
Retain = maps:get(retain, Conf, false),
Size = max(0, MsgSize - MetadataSize),
#{ topic => Topic
, message => message(max(0, MsgSize - MetadataSize))
, message => message(Size)
, size => Size
, pub_opts => [{qos, QoS}, {retain, Retain}]
, pubinterval => PubRate
, metadata => AddMetadata
, host_shift => HostShift
, host_selection => HostSelection
, random => maps:get(random, Conf, false)
, expiry => maps:get(expiry, Conf, undefined)
, clean_start => maps:get(clean_start, Conf, true)
, pub_opstat => emqttb_metrics:opstat_from_model(MetricsKey ++ [pub_latency])
Expand Down Expand Up @@ -131,12 +135,15 @@ handle_message(Shared, Conn, {publish, N1}) ->
, pub_counter := PubCounter
, pub_opstat := PubOpstat
, metadata := AddMetadata
, random := Random
, size := Size
} = Shared,
{SleepTime, N2} = emqttb:get_duration_and_repeats(I),
send_after(SleepTime, {publish, N2}),
Msg = case AddMetadata of
true -> [message_metadata(), Msg0];
false -> Msg0
Msg = if AddMetadata andalso Random -> [message_metadata(), rand:bytes(Size)];
Random -> rand:bytes(Size);
AddMetadata -> [message_metadata(), Msg0];
true -> Msg0
end,
T = emqttb_worker:format_topic(TP),
repeat(N1, fun() ->
Expand Down
14 changes: 14 additions & 0 deletions src/conf/emqttb_conf_model.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,20 @@ model() ->
, type => union(false, string())
, default => false
}}
, motto =>
{[value, os_env, cli_param],
#{ oneliner => "Add Grafana annotation at the end of the run"
, type => string()
, default => ""
, cli_operand => "motto"
}}
, motto_tags =>
{[value, os_env, cli_param],
#{ oneliner => "Custom tags added to the Grafana annotation span"
, type => emqttb_grafana:tags()
, default => []
, cli_operand => "motto-tags"
}}
}
}
, convenience =>
Expand Down
11 changes: 10 additions & 1 deletion src/emqttb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@
-type ifaddr_list() :: list(typerefl:ip_address()).
-typerefl_from_string({ifaddr_list/0, ?MODULE, parse_addresses}).

-type n_cycles() :: non_neg_integer() | undefined.

-reflect_type([scenario/0, stage/0, group/0, transport/0, proto_ver/0, qos/0,
net_port/0, hosts/0, ifaddr_list/0, ssl_verify/0, host_selection/0,
duration_ms/0, duration_us/0, duration_s/0, byte_size/0]).
duration_ms/0, duration_us/0, duration_s/0, byte_size/0, n_cycles/0]).

%%================================================================================
%% API funcions
Expand All @@ -87,6 +89,7 @@
-spec main([string()]) -> no_return().
main(Args) ->
application:set_env(emqttb, cli_args, Args),
application:set_env(emqttb, start_time, os:system_time(millisecond)),
{ok, _} = application:ensure_all_started(?APP, permanent),
%% Wait for completion of the scenarios:
MRef = monitor(process, whereis(emqttb_scenarios_sup)),
Expand Down Expand Up @@ -133,6 +136,7 @@ get_duration_and_repeats(CRef) ->

-spec terminate() -> no_return().
terminate() ->
annotate_run(),
case application:get_env(emqttb, is_fail, false) of
false ->
timer:sleep(100), %% Ugly: give logger time to flush events...
Expand All @@ -144,6 +148,11 @@ terminate() ->
halt(1)
end.

annotate_run() ->
Motto = ?CFG([metrics, grafana, motto]),
Tags = ?CFG([metrics, grafana, motto_tags]),
emqttb_grafana:annotate_range(Motto, Tags, application:get_env(emqttb, start_time, 0), os:system_time(millisecond)).

parse_addresses(Str) ->
L = [inet:parse_address(I) || I <- string:tokens(Str, ", ")],
case lists:keyfind(error, 1, L) of
Expand Down
6 changes: 3 additions & 3 deletions src/framework/emqttb_group.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ stop(ID) ->
logger:info("Stopping group ~p", [ID]),
emqttb_group_sup:stop(ID).

-spec set_target(emqttb:group(), NClients) -> NClients
when NClients :: emqttb:n_cycles().
-spec set_target(emqttb:group(), NClients) -> {ok, NClients} | {error, new_target}
when NClients :: emqttb:n_clients().
set_target(Group, NClients) ->
set_target(Group, NClients, undefined).

Expand All @@ -81,7 +81,7 @@ set_target(Group, NClients) ->
%%
%% Order of workers' removal during ramping down is not specified.
-spec set_target(emqttb:group(), NClients, emqttb:interval() | undefined) ->
{ok, NClients} | {error, new_target | {ratelimited, atom(), NClients}}
{ok, NClients} | {error, new_target}
when NClients :: emqttb:n_clients().
set_target(Id, Target, Interval) ->
gen_server:call(?via(Id), {set_target, Target, Interval}, infinity).
Expand Down
4 changes: 2 additions & 2 deletions src/framework/emqttb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ my_hostname() ->
connect(ConnOpstat, Properties) ->
connect(ConnOpstat, Properties, [], [], []).

-spec connect(emqttb_metrics:metric_ref(), map(), [emqtt:option()], [gen_tcp:option()], [ssl:option()]) -> gen_statem:start_ret().
-spec connect(emqttb_metrics:metric_ref(), map(), [emqtt:option()], [gen_tcp:option()], [ssl:tls_option()]) -> gen_statem:start_ret().
connect(ConnOpstat, Properties0, CustomOptions, CustomTcpOptions, CustomSslOptions) ->
HostShift = maps:get(host_shift, Properties0, 0),
HostSelection = maps:get(host_selection, Properties0, random),
Expand Down Expand Up @@ -172,7 +172,7 @@ connect(ConnOpstat, Properties0, CustomOptions, CustomTcpOptions, CustomSslOptio
%% Internal exports
%%================================================================================

-spec entrypoint(module(), emqttb:group_id(), integer()) -> no_return().
-spec entrypoint(module(), emqttb:group(), integer()) -> no_return().
entrypoint(Behavior, Group, Number) ->
%% We need to trap exits to make sure the counter is decremented in
%% the end.
Expand Down
40 changes: 32 additions & 8 deletions src/metrics/emqttb_grafana.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,45 @@
-module(emqttb_grafana).

%% API:
-export([annotate/2, annotate/1]).
-export([annotate/2, annotate/1, annotate_range/4]).

-export([parse_comma_separated/1]).

-include("emqttb.hrl").
-include_lib("typerefl/include/types.hrl").

%%================================================================================
%% API funcions
%% Type declarations
%%================================================================================

-type tags() :: [string()].
-typerefl_from_string({tags/0, ?MODULE, parse_comma_separated}).

-reflect_type([tags/0]).

%%================================================================================
%% API functions
%%================================================================================

-spec annotate(iolist()) -> ok.
annotate(Text) ->
annotate(Text, []).

-spec annotate_range(iolist(), [string()], integer(), integer()) -> ok.
annotate_range(Text, Tags, Time, TimeEnd) ->
Range = #{time => Time, 'timeEnd' => TimeEnd},
annotate(Text, Tags, Range).

-spec annotate(iolist(), [atom() | string()]) -> ok.
annotate(Text, Tags0) ->
annotate(Text, Tags0, #{}).

-spec annotate(iolist(), [atom() | string()], map()) -> ok.
annotate(Text, Tags0, Data) ->
case ?CFG([metrics, grafana, enabled]) of
true ->
Tags = lists:map(fun ensure_string/1, Tags0),
spawn(fun() -> do_annotate(Text, Tags) end),
spawn(fun() -> do_annotate(Text, Tags, Data) end),
ok;
false ->
ok
Expand All @@ -43,7 +64,7 @@ annotate(Text, Tags0) ->
%% Internal functions
%%================================================================================

do_annotate(Text, Tags) ->
do_annotate(Text, Tags, Data0) ->
Url = ?CFG([metrics, grafana, url]) ++ "/api/annotations",
AuthToken = case ?CFG([metrics, grafana, api_key]) of
false -> [];
Expand All @@ -54,9 +75,9 @@ do_annotate(Text, Tags) ->
false -> [];
Login -> [{basic_auth, {Login, ?CFG([metrics, grafana, password])}}]
end,
Data = #{ text => iolist_to_binary(Text)
, tags => [<<"emqttb">>, <<"mqtt">> | Tags]
},
Data = Data0#{ text => iolist_to_binary(Text)
, tags => [<<"emqttb">>, <<"mqtt">> | Tags]
},
{ok, Code, _RespHeaders, ClientRef} = hackney:post(Url, Headers, jsone:encode(Data), Options),
case Code of
200 ->
Expand All @@ -69,4 +90,7 @@ do_annotate(Text, Tags) ->
ensure_string(Atom) when is_atom(Atom) ->
atom_to_binary(Atom);
ensure_string(Str) ->
Str.
list_to_binary(Str).

parse_comma_separated(Str) ->
{ok, string:tokens(Str, ",")}.
1 change: 1 addition & 0 deletions src/scenarios/emqttb_scenario_persistent_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ publish_stage(S = #s{produced = NPub0, pubinterval = PubInterval}) ->
, qos => my_conf([pub, qos])
, metrics => my_conf_key([pub, metrics])
, metadata => true
, random => true
},
emqttb_group:ensure(#{ id => ?PUB_GROUP
, client_config => my_conf([group])
Expand Down
7 changes: 7 additions & 0 deletions src/scenarios/emqttb_scenario_pub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ model() ->
, cli_short => $s
, default => 256
}}
, random =>
{[value, cli_param],
#{ oneliner => "Randomize message contents"
, type => boolean()
, cli_operand => "random"
, default => false
}}
, conninterval =>
{[value, cli_param, autorate],
#{ oneliner => "Client connection interval (microsecond)"
Expand Down
6 changes: 1 addition & 5 deletions src/scenarios/emqttb_scenario_sub_flapping.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
%% Type declarations
%%================================================================================

-type n_cycles() :: non_neg_integer() | undefined.

-reflect_type([n_cycles/0]).

-define(GROUP, sub).

%%================================================================================
Expand Down Expand Up @@ -105,7 +101,7 @@ model() ->
}}
, n_cycles =>
{[value, cli_param],
#{ type => n_cycles()
#{ type => emqttb:n_cycles()
, default => 10
, cli_operand => "cycles"
, cli_short => $C
Expand Down

0 comments on commit f3ca5a9

Please sign in to comment.