From 612afff7cc6ac40e46f5f73c508f93a5be7790d3 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 19 Jan 2024 00:06:51 +0100 Subject: [PATCH] feat(pub): Add expiry interval for publishers --- doc/src/schema.adoc | 9 +++++++++ src/behaviors/emqttb_behavior_pub.erl | 19 +++++++++++++------ src/scenarios/emqttb_scenario_pub.erl | 16 +++++++++++++++- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/doc/src/schema.adoc b/doc/src/schema.adoc index b71f8b9..4da98f9 100644 --- a/doc/src/schema.adoc +++ b/doc/src/schema.adoc @@ -199,6 +199,15 @@ Topic is a mandatory parameter. It supports the following substitutions: * `%h` is replaced with the hostname +[id=scenarios.pub._.clean_start] +== Clean start +Set https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901039[clean start] flag in the CONNECT packet. + +[id=scenarios.pub._.expiry] +== Session Expiry Interval +Add https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048[Session Expiry Interval] property to the CONNECT packet. + + [id=scenarios.pubsub_forward] == run scenario pubsub_forward diff --git a/src/behaviors/emqttb_behavior_pub.erl b/src/behaviors/emqttb_behavior_pub.erl index 1856641..15f3260 100644 --- a/src/behaviors/emqttb_behavior_pub.erl +++ b/src/behaviors/emqttb_behavior_pub.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -43,6 +43,8 @@ , metadata => boolean() , host_shift => integer() , host_selection => random | round_robin + , clean_start => boolean() + , expiry => non_neg_integer() | undefined }. -type prototype() :: {?MODULE, config()}. @@ -102,21 +104,26 @@ init_per_group(Group, , metadata => AddMetadata , host_shift => HostShift , host_selection => HostSelection + , expiry => maps:get(expiry, Conf, undefined) + , clean_start => maps:get(clean_start, Conf, true) , pub_opstat => emqttb_metrics:opstat_from_model(MetricsKey ++ [pub_latency]) , conn_opstat => emqttb_metrics:opstat_from_model(MetricsKey ++ [conn_latency]) , pub_counter => emqttb_metrics:from_model(MetricsKey ++ [n_published]) }. -init(PubOpts = #{pubinterval := I, conn_opstat := ConnOpstat}) -> +init(PubOpts = #{pubinterval := I, conn_opstat := ConnOpstat, + expiry := Expiry, clean_start := CleanStart}) -> rand:seed(default), {SleepTime, N} = emqttb:get_duration_and_repeats(I), send_after_rand(SleepTime, {publish, N}), HostShift = maps:get(host_shift, PubOpts, 0), HostSelection = maps:get(host_selection, PubOpts, random), - {ok, Conn} = emqttb_worker:connect(ConnOpstat, - #{ host_shift => HostShift - , host_selection => HostSelection - }), + Props = case Expiry of + undefined -> #{}; + _ -> #{'Session-Expiry-Interval' => Expiry} + end, + {ok, Conn} = emqttb_worker:connect(ConnOpstat, Props#{host_shift => HostShift, host_selection => HostSelection}, + [{clean_start, CleanStart}], [], []), Conn. handle_message(Shared, Conn, {publish, N1}) -> diff --git a/src/scenarios/emqttb_scenario_pub.erl b/src/scenarios/emqttb_scenario_pub.erl index bae8094..6a6a83a 100644 --- a/src/scenarios/emqttb_scenario_pub.erl +++ b/src/scenarios/emqttb_scenario_pub.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%%Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -123,6 +123,20 @@ model() -> , default => 0 , cli_operand => "start-n" }} + , expiry => + {[value, cli_param], + #{ type => union(non_neg_integer(), undefined) + , default => undefined + , cli_operand => "expiry" + , cli_short => $x + }} + , clean_start => + {[value, cli_param], + #{ type => boolean() + , default => true + , cli_operand => "clean-start" + , cli_short => $c + }} , metrics => emqttb_behavior_pub:model('pub/pub') }.