From c3b18d5edad67c251c271c6c587f3a8fa1c6a291 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Sat, 29 Jun 2024 15:48:48 +0200 Subject: [PATCH] Upgrade all throttle calls to use only explicit config maps --- src/throttle/amoc_throttle.erl | 74 +++++++----------- src/throttle/amoc_throttle_controller.erl | 39 ++++++---- src/throttle/amoc_throttle_process.erl | 54 +++++++------ test/throttle_SUITE.erl | 94 ++++++++++++++--------- 4 files changed, 141 insertions(+), 120 deletions(-) diff --git a/src/throttle/amoc_throttle.erl b/src/throttle/amoc_throttle.erl index ff754856..97b11c62 100644 --- a/src/throttle/amoc_throttle.erl +++ b/src/throttle/amoc_throttle.erl @@ -3,43 +3,44 @@ -module(amoc_throttle). %% API --export([start/2, start/3, start/4, stop/1, - send/2, send/3, send_and_wait/2, wait/1, +-export([start/2, stop/1, + send/2, send/3, wait/1, run/2, pause/1, resume/1, change_rate/2, change_rate/3, change_rate_gradually/2, change_rate_gradually/6]). --deprecated([ - {start, 3, "use start/2 with a config"}, - {start, 4, "use start/2 with a config"}, - {send_and_wait, 2, "use wait/1 instead"} - ]). - -type name() :: atom(). --type rate() :: pos_integer(). +%% Atom representing the name of the throttle. +-type rate() :: infinity | non_neg_integer(). +%% Number of events per given `t:interval/0', or infinity for effectively unlocking all throttling. +%% Note that a rate of zero means effectively pausing the throttle. +-type interarrival() :: infinity | non_neg_integer(). +%% Time in milliseconds between two events, or infinity for effectively pausing the throttle. Note +%% that an interarrival of zero means effectively unlocking all throttling. -type interval() :: non_neg_integer(). -%% In milliseconds, defaults to 60000 (one minute) when not given. -%% An interval of 0 means no delay at all, only the number of simultaneous executions will be -%% controlled, which corresponds to the number of processes started --type throttle() :: #{rate := rate(), - interval := interval()}. --type interarrival() :: #{interarrival := non_neg_integer()}. +%% In milliseconds, defaults to 60000 (one minute). +-type throttle() :: #{rate := rate(), interval := interval()} | + #{interarrival := interarrival()}. %% Throttle unit of measurement -type config() :: #{rate := rate(), interval => interval(), parallelism => non_neg_integer()} | #{interarrival := non_neg_integer(), parallelism => non_neg_integer()}. -%% Literal throttle configuration. It can state `interarrival', in milliseconds, -%% in which case the rate per interval is calculated to allow one event every given milliseconds, -%% or, literally give the rate per interval. +%% Literal throttle configuration. --type gradual_rate_config() :: #{from_rate := rate(), - to_rate := rate(), +-type gradual_rate_config() :: #{from_rate := non_neg_integer(), + to_rate := non_neg_integer(), interval => interval(), step_interval => pos_integer(), step_size => pos_integer(), step_count => pos_integer(), + duration => pos_integer()} | + #{from_interarrival := interarrival(), + to_interarrival := interarrival(), + step_interval => pos_integer(), + step_size => pos_integer(), + step_count => pos_integer(), duration => pos_integer()}. %% Configuration for a gradual throttle rate change %% @@ -50,31 +51,17 @@ -export_type([name/0, rate/0, interval/0, throttle/0, config/0, gradual_rate_config/0]). -%% @see start/4 +%% @doc Starts the throttle mechanism for a given `Name' with a given config. +%% +%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10). +%% `Name' is needed to identify the rate as a single test can have different rates for different tasks. +%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity. -spec start(name(), config() | rate()) -> {ok, started | already_started} | {error, any()}. start(Name, #{} = Config) -> amoc_throttle_controller:ensure_throttle_processes_started(Name, Config); start(Name, Rate) -> amoc_throttle_controller:ensure_throttle_processes_started(Name, #{rate => Rate}). -%% @see start/4 --spec start(name(), rate(), non_neg_integer()) -> {ok, started | already_started} | {error, any()}. -start(Name, Rate, Interval) -> - Config = #{rate => Rate, interval => Interval}, - amoc_throttle_controller:ensure_throttle_processes_started(Name, Config). - -%% @doc Starts the throttle mechanism for a given `Name' with a given `Rate' per `Interval'. -%% -%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10). -%% `Name' is needed to identify the rate as a single test can have different rates for different tasks. -%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity. -%% It also accepts a special value of `0' which limits the number of parallel executions associated with `Name' to `Rate'. --spec start(name(), rate(), interval(), pos_integer()) -> - {ok, started | already_started} | {error, any()}. -start(Name, Rate, Interval, NoOfProcesses) -> - Config = #{rate => Rate, interval => Interval, parallelism => NoOfProcesses}, - amoc_throttle_controller:ensure_throttle_processes_started(Name, Config). - %% @doc Pauses executions for the given `Name' as if `Rate' was set to `0'. %% %% Does not stop the scheduled rate changes. @@ -127,8 +114,6 @@ change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, StepCo %% %% `Fn' is executed in the context of a new process spawned on the same node on which %% the process executing `run/2' runs, so a call to `run/2' is non-blocking. -%% This function is used internally by both `send' and `send_and_wait/2' functions, -%% so all those actions will be limited to the same rate when called with the same `Name'. %% %% Diagram showing function execution flow in distributed environment, %% generated using https://sequencediagram.org/: @@ -178,13 +163,6 @@ send(Name, Msg) -> send(Name, Pid, Msg) -> amoc_throttle_runner:throttle(Name, {Pid, Msg}). -%% @doc Sends and receives the given message `Msg'. -%% -%% Deprecated in favour of `wait/1' --spec send_and_wait(name(), any()) -> ok | {error, any()}. -send_and_wait(Name, _) -> - amoc_throttle_runner:throttle(Name, wait). - %% @doc Blocks the caller until the throttle mechanism allows. -spec wait(name()) -> ok | {error, any()}. wait(Name) -> diff --git a/src/throttle/amoc_throttle_controller.erl b/src/throttle/amoc_throttle_controller.erl index f0425afe..58818846 100644 --- a/src/throttle/amoc_throttle_controller.erl +++ b/src/throttle/amoc_throttle_controller.erl @@ -25,6 +25,7 @@ -define(DEFAULT_STEP_SIZE, 1). -define(DEFAULT_INTERVAL, 60000). %% one minute -define(DEFAULT_NO_PROCESSES, 10). +-define(TIMEOUT(N), (infinity =:= N orelse is_integer(N) andalso N >= 0)). -define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)). -define(POS_INT(N), (is_integer(N) andalso N > 0)). @@ -51,8 +52,8 @@ -type config() :: #{rate := amoc_throttle:rate(), interval := amoc_throttle:interval(), parallelism := non_neg_integer()}. --type gradual_rate_change() :: #{from_rate := amoc_throttle:rate(), - to_rate := amoc_throttle:rate(), +-type gradual_rate_change() :: #{from_rate := non_neg_integer(), + to_rate := non_neg_integer(), interval := amoc_throttle:interval(), step_interval := pos_integer(), step_size := pos_integer(), @@ -71,26 +72,32 @@ start_link() -> {ok, started | already_started} | {error, invalid_throttle | wrong_reconfiguration | wrong_no_of_procs}. ensure_throttle_processes_started( - Name, #{interarrival := EveryMs} = Config) - when is_atom(Name), ?NONNEG_INT(EveryMs) -> + Name, #{interarrival := Interarrival} = Config) + when is_atom(Name), ?TIMEOUT(Interarrival) -> raise_event_on_slave_node(Name, init), - Config1 = #{rate => ?DEFAULT_INTERVAL div EveryMs, interval => ?DEFAULT_INTERVAL}, + Config1 = #{rate => ?DEFAULT_INTERVAL div Interarrival, interval => ?DEFAULT_INTERVAL}, Config2 = Config1#{parallelism => maps:get(parallelism, Config, ?DEFAULT_NO_PROCESSES)}, gen_server:call(?MASTER_SERVER, {start_processes, Name, Config2}); ensure_throttle_processes_started( Name, #{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config) - when is_atom(Name), ?POS_INT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) -> + when is_atom(Name), ?TIMEOUT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) -> raise_event_on_slave_node(Name, init), gen_server:call(?MASTER_SERVER, {start_processes, Name, Config}); ensure_throttle_processes_started( Name, #{rate := Rate, interval := Interval} = Config) - when is_atom(Name), ?POS_INT(Rate), ?NONNEG_INT(Interval) -> + when is_atom(Name), ?TIMEOUT(Rate), ?NONNEG_INT(Interval) -> raise_event_on_slave_node(Name, init), Config1 = Config#{parallelism => ?DEFAULT_NO_PROCESSES}, gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1}); +ensure_throttle_processes_started( + Name, #{rate := Rate, parallelism := NoOfProcesses} = Config) + when is_atom(Name), ?TIMEOUT(Rate), ?POS_INT(NoOfProcesses) -> + raise_event_on_slave_node(Name, init), + Config1 = Config#{interval => ?DEFAULT_INTERVAL}, + gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1}); ensure_throttle_processes_started( Name, #{rate := Rate} = Config) - when is_atom(Name), ?POS_INT(Rate) -> + when is_atom(Name), ?TIMEOUT(Rate) -> raise_event_on_slave_node(Name, init), Config1 = Config#{interval => ?DEFAULT_INTERVAL, parallelism => ?DEFAULT_NO_PROCESSES}, gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1}); @@ -147,7 +154,7 @@ init([]) -> From :: {pid(), Tag :: term()}, state()) -> {reply, {ok, started | already_started}, state()} | {reply, {error, wrong_reconfiguration | wrong_no_of_procs}, state()}; - ({pause | resume | stop}, From :: {pid(), Tag :: term()}, state()) -> + ({pause | resume | unlock | stop}, From :: {pid(), Tag :: term()}, state()) -> {reply, ok, state()} | {reply, Error :: any(), state()}; ({change_rate, name(), amoc_throttle:rate(), amoc_throttle:interval()}, @@ -175,8 +182,8 @@ handle_call({pause, Name}, _From, State) -> Error -> {reply, Error, State} end; -handle_call({resume, Name}, _From, State) -> - case run_in_all_processes(Name, resume) of +handle_call({Op, Name}, _From, State) when unlock =:= Op; resume =:= Op -> + case run_in_all_processes(Name, Op) of ok -> Info = maps:get(Name, State), {reply, ok, State#{Name => Info#throttle_info{active = true}}}; @@ -266,6 +273,7 @@ continue_plan(Name, State, Info, Plan) -> State#{Name => Info#throttle_info{rate = NewRate, change_plan = NewPlan}}. -spec rate_per_minute(amoc_throttle:rate(), amoc_throttle:interval()) -> amoc_throttle:rate(). +rate_per_minute(infinity, _) -> infinity; rate_per_minute(_, 0) -> 0; rate_per_minute(Rate, Interval) -> (Rate * 60000) div Interval. @@ -275,7 +283,7 @@ start_processes(Name, #{rate := Rate, interval := Interval, parallelism := NoOfP raise_event(Name, init), RatePerMinute = rate_per_minute(Rate, Interval), report_rate(Name, RatePerMinute), - RealNoOfProcs = min(Rate, NoOfProcesses), + RealNoOfProcs = expected_no_of_processes(Rate, NoOfProcesses), start_throttle_processes(Name, Interval, Rate, RealNoOfProcs), #throttle_info{rate = Rate, interval = Interval, active = true, no_of_procs = RealNoOfProcs}. @@ -334,7 +342,7 @@ run_in_all_processes(Name, Cmd) -> verify_new_start_matches_running(Name, Config, Group, State) -> #{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config, - ExpectedNoOfProcesses = min(Rate, NoOfProcesses), + ExpectedNoOfProcesses = expected_no_of_processes(Rate, NoOfProcesses), case {length(Group), State} of {ExpectedNoOfProcesses, #{Name := #throttle_info{rate = Rate, interval = Interval}}} -> {reply, {ok, already_started}, State}; @@ -344,6 +352,11 @@ verify_new_start_matches_running(Name, Config, Group, State) -> {reply, {error, wrong_no_of_procs}, State} end. +expected_no_of_processes(0, NoOfProcesses) -> + min(1, NoOfProcesses); +expected_no_of_processes(Rate, NoOfProcesses) -> + min(Rate, NoOfProcesses). + run_cmd(Pid, stop) -> amoc_throttle_process:stop(Pid); run_cmd(Pid, pause) -> diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index 4b67d69a..90ab1689 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -30,12 +30,12 @@ -define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute -record(state, {can_run_fn = true :: boolean(), - pause = false :: boolean(), - max_n :: non_neg_integer(), + status = running :: running | paused, + max_n :: infinity | non_neg_integer(), name :: atom(), - n :: integer(), + n :: infinity | non_neg_integer(), interval = 0 :: amoc_throttle:interval(), %%ms - delay_between_executions = 0 :: non_neg_integer(), %%ms + delay_between_executions = 0 :: timeout(), %%ms tref :: timer:tref() | undefined, schedule = [] :: [AmocThrottleRunnerProcess :: pid()], schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()]}). @@ -120,9 +120,9 @@ handle_info(timeout, State) -> handle_cast(stop_process, State) -> {stop, normal, State}; handle_cast(pause_process, State) -> - {noreply, State#state{pause = true}, {continue, maybe_run_fn}}; + {noreply, State#state{status = paused}, {continue, maybe_run_fn}}; handle_cast(resume_process, State) -> - {noreply, State#state{pause = false}, {continue, maybe_run_fn}}; + {noreply, State#state{status = running}, {continue, maybe_run_fn}}; handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) -> amoc_throttle_controller:telemetry_event(Name, request), {noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}}; @@ -155,25 +155,28 @@ format_status(#{state := #state{} = State} = FormatStatus) -> %% internal functions %%------------------------------------------------------------------------------ +initial_state(_Name, Interval, infinity) -> + #state{interval = Interval, n = infinity, max_n = infinity, delay_between_executions = 0}; +initial_state(_Name, Interval, 0) -> + #state{interval = Interval, n = 0, max_n = 0, delay_between_executions = infinity}; initial_state(Name, Interval, Rate) when Rate > 0 -> - NewRate = case Rate < 5 of - true -> - Msg = <<"too low rate, please reduce NoOfProcesses">>, - internal_error(Msg, Name, Rate, Interval), - Rate; - false -> - Rate - end, - Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of + case Rate < 5 of + true -> + Msg = <<"too low rate, please reduce NoOfProcesses">>, + internal_warning(Msg, Name, Rate, Interval); + false -> + ok + end, + Delay = case {Interval, Interval div Rate, Interval rem Rate} of {0, _, _} -> 0; %% limit only No of simultaneous executions {_, I, _} when I < 10 -> Message = <<"too high rate, please increase NoOfProcesses">>, - internal_error(Message, Name, Rate, Interval), + internal_warning(Message, Name, Rate, Interval), 10; {_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions; {_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1 end, - #state{interval = Interval, n = NewRate, max_n = NewRate, delay_between_executions = Delay}. + #state{interval = Interval, n = Rate, max_n = Rate, delay_between_executions = Delay}. merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = MaxN}, #state{n = OldN, max_n = OldMaxN} = OldState) -> @@ -183,6 +186,8 @@ merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = Ma max_n = MaxN, tref = undefined}, maybe_start_timer(NewState). +maybe_start_timer(#state{delay_between_executions = infinity, tref = undefined} = State) -> + State#state{can_run_fn = false}; maybe_start_timer(#state{delay_between_executions = 0, tref = undefined} = State) -> State#state{can_run_fn = true}; maybe_start_timer(#state{delay_between_executions = D, tref = undefined} = State) -> @@ -207,15 +212,20 @@ maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) -> NewSchedule = lists:reverse(SchRev), NewState = State#state{schedule = NewSchedule, schedule_reversed = []}, maybe_run_fn(NewState); -maybe_run_fn(#state{interval = 0, pause = false, n = N} = State) when N > 0 -> +maybe_run_fn(#state{interval = 0, status = running, n = N} = State) when N > 0 -> NewState = run_fn(State), maybe_run_fn(NewState); -maybe_run_fn(#state{can_run_fn = true, pause = false, n = N} = State) when N > 0 -> +maybe_run_fn(#state{can_run_fn = true, status = running, n = N} = State) when N > 0 -> NewState = run_fn(State), NewState#state{can_run_fn = false}; maybe_run_fn(State) -> State. +run_fn(#state{schedule = [RunnerPid | T], name = Name, n = infinity} = State) -> + erlang:monitor(process, RunnerPid), + amoc_throttle_runner:run(RunnerPid), + amoc_throttle_controller:telemetry_event(Name, execute), + State#state{schedule = T}; run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) -> erlang:monitor(process, RunnerPid), amoc_throttle_runner:run(RunnerPid), @@ -244,10 +254,10 @@ internal_event(Msg, #state{name = Name} = State) -> amoc_telemetry:execute_log( debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg). --spec internal_error(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any(). -internal_error(Msg, Name, Rate, Interval) -> +-spec internal_warning(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any(). +internal_warning(Msg, Name, Rate, Interval) -> amoc_telemetry:execute_log( - error, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg). + warning, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg). printable_state(#state{} = State) -> Fields = record_info(fields, state), diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl index b7499e60..2804a74a 100644 --- a/test/throttle_SUITE.erl +++ b/test/throttle_SUITE.erl @@ -20,7 +20,8 @@ groups() -> start, start_descriptive, start_interarrival, - rate_zero_is_not_accepted, + start_rate_zero, + start_rate_infinity, low_rate_gets_remapped, low_interval_get_remapped, start_and_stop, @@ -28,7 +29,6 @@ groups() -> change_rate_gradually, change_rate_gradually_descriptive, change_rate_gradually_verify_descriptions, - send_and_wait, just_wait, wait_for_process_to_die_sends_a_kill, async_runner_dies_while_waiting_raises_exit, @@ -68,10 +68,11 @@ start(_) -> ?assertMatch({ok, already_started}, amoc_throttle:start(?FUNCTION_NAME, 100)), ?assertMatch({error, wrong_reconfiguration}, - amoc_throttle:start(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL + 1)), + amoc_throttle:start(?FUNCTION_NAME, 101)), ?assertMatch({error, wrong_no_of_procs}, - amoc_throttle:start(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL, - ?DEFAULT_NO_PROCESSES + 1)). + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 100, + parallelism => ?DEFAULT_NO_PROCESSES + 1})). start_descriptive(_) -> %% Starts successfully @@ -89,26 +90,49 @@ start_interarrival(_) -> n := 1200}, State). -rate_zero_is_not_accepted(_) -> - ?assertMatch({error, invalid_throttle}, amoc_throttle:start(?FUNCTION_NAME, 0, 100, 1)). +start_rate_zero(_) -> + %% Starts successfully + Description = #{rate => 0, parallelism => 1}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_state_of_one_process(?FUNCTION_NAME), + ?assertMatch(#{name := ?FUNCTION_NAME, + interval := 60000, + delay_between_executions := infinity, + n := 0}, + State). + +start_rate_infinity(_) -> + %% Starts successfully + Description = #{rate => infinity, parallelism => 1}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_state_of_one_process(?FUNCTION_NAME), + ?assertMatch(#{name := ?FUNCTION_NAME, + interval := 60000, + delay_between_executions := 0, + n := infinity}, + State). low_rate_gets_remapped(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 2, 100, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 2, interval => 100, parallelism => 1})), State = get_state_of_one_process(?FUNCTION_NAME), ?assertMatch(#{name := ?FUNCTION_NAME, interval := 100, delay_between_executions := 50}, State), - assert_telemetry_event([amoc, throttle, process], error, ?FUNCTION_NAME, 2, 100). + assert_telemetry_event([amoc, throttle, process], warning, ?FUNCTION_NAME, 2, 100). low_interval_get_remapped(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 1, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 1, interval => 1, parallelism => 1})), State = get_state_of_one_process(?FUNCTION_NAME), ?assertMatch(#{name := ?FUNCTION_NAME, interval := 1, delay_between_executions := 10}, State), - assert_telemetry_event([amoc, throttle, process], error, ?FUNCTION_NAME, 1, 1). + assert_telemetry_event([amoc, throttle, process], warning, ?FUNCTION_NAME, 1, 1). start_and_stop(_) -> %% Starts successfully @@ -205,29 +229,14 @@ change_rate_gradually_verify_descriptions(_) -> {error, _}, amoc_throttle_controller:verify_config(E1)). -send_and_wait(_) -> - %% it failts if the throttle wasn't started yet - ?assertMatch({error, no_throttle_process_registered}, - amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)), - %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), - %% send_and_wait passes fine - ?assertMatch(ok, amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)), - %% One message is received sufficiently fast - amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch(ok, ?RECV(receive_this, 100)), - %% If someone else fills the throttle heavily, - %% it will take proportionally so long to execute for me - fill_throttle(?FUNCTION_NAME, 100 * 10), - amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)). - just_wait(_) -> %% it failts if the throttle wasn't started yet ?assertMatch({error, no_throttle_process_registered}, amoc_throttle:wait(?FUNCTION_NAME)), %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 100, interval => 10, parallelism => 1})), %% wait passes fine ?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)), %% One message is received sufficiently fast @@ -241,19 +250,25 @@ just_wait(_) -> wait_for_process_to_die_sends_a_kill(_) -> erlang:process_flag(trap_exit, true), - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 100, interval => 10, parallelism => 1})), amoc_throttle:run(?FUNCTION_NAME, fun() -> exit(?FUNCTION_NAME) end), ?assertMatch(ok, ?RECV({'EXIT', _, ?FUNCTION_NAME}, 100)). async_runner_dies_while_waiting_raises_exit(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 1, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 1, interval => 1, parallelism => 1})), find_new_link_and_kill_it(self()), ?assertExit({throttle_wait_died, _, killed}, amoc_throttle:wait(?FUNCTION_NAME)). async_runner_dies_when_throttler_dies(_) -> erlang:process_flag(trap_exit, true), {links, OriginalLinks} = erlang:process_info(self(), links), - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 60000, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 1, interval => 60000, parallelism => 1})), wait_until_one_throttle_worker(?FUNCTION_NAME), amoc_throttle:send(?FUNCTION_NAME, receive_this), wait_until_one_async_runner(self(), OriginalLinks), @@ -262,7 +277,9 @@ async_runner_dies_when_throttler_dies(_) -> run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> %% Start 10 actions at once in 10 processes - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 10, 0, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 10, interval => 0, parallelism => 1})), %% If someone else fills the throttle heavily, %% it will take proportionally so long to execute for me fill_throttle(?FUNCTION_NAME, 100), @@ -271,9 +288,10 @@ run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> pause_and_resume(_) -> %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), - %% send_and_wait passes fine - ?assertMatch(ok, amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 5000, parallelism => 1})), + ?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)), %% pauses runs correctly ?assertMatch(ok, amoc_throttle:pause(?FUNCTION_NAME)), %% It is paused, so messages aren't received @@ -284,7 +302,9 @@ pause_and_resume(_) -> ?assertMatch(ok, ?RECV(receive_this, 200)). get_state(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 60000, 1)), + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 100, interval => 60000, parallelism => 1})), State = get_state_of_one_process(?FUNCTION_NAME), ?assertMatch(#{name := ?FUNCTION_NAME, interval := 60000,