Skip to content

Commit

Permalink
Create async pools abstraction on top of batch_workers
Browse files Browse the repository at this point in the history
This module implements a worker pool abstraction to handle batch
workers. It will start a transient supervisor under ejabberd_sup, that
will spawn a pool using the `worker_pool` library. This will then
supervise the three supervisors that `worker_pool` supervises
(time-manager, queue-manager, and worker-sup). One of these,
'worker-sup', will in turn spawn all the mongoose_batch_worker workers
with the correct configuration.
  • Loading branch information
NelsonVides committed Dec 6, 2021
1 parent e7c20aa commit 839a999
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions src/mongoose_async_pools.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
-module(mongoose_async_pools).

-include("mongoose_config_spec.hrl").
-include("mongoose_logger.hrl").

-behaviour(supervisor).
-export([start_link/3, init/1]).

% API
-export([start_pool/3, stop_pool/2, pool_name/2]).

-export([config_spec/0]).

%%%===================================================================
%%% API
%%%===================================================================
-type pool_id() :: atom().
-type pool_opts() :: any().

-spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) ->
supervisor:startchild_ret().
start_pool(HostType, PoolId, Opts) ->
?LOG_INFO(#{what => starting_async_pool, host_type => HostType, pool_id => PoolId}),
Supervisor = sup_name(HostType, PoolId),
ChildSpec = #{id => Supervisor,
start => {?MODULE, start_link, [HostType, PoolId, Opts]},
restart => transient,
type => supervisor},
ejabberd_sup:start_child(ChildSpec).

-spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
stop_pool(HostType, PoolId) ->
?LOG_INFO(#{what => stoping_async_pool, host_type => HostType, pool_id => PoolId}),
ejabberd_sup:stop_child(sup_name(HostType, PoolId)).

-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
#section{
items = #{<<"enabled">> => #option{type = boolean},
<<"flush_interval">> => #option{type = integer, validate = non_negative},
<<"batch_size">> => #option{type = integer, validate = non_negative},
<<"pool_size">> => #option{type = integer, validate = non_negative}}
}.

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
-spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) ->
{ok, pid()} | ignore | {error, term()}.
start_link(HostType, PoolId, Opts) ->
Supervisor = sup_name(HostType, PoolId),
supervisor:start_link({local, Supervisor}, ?MODULE, [HostType, PoolId, Opts]).

-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([HostType, PoolId, Opts]) ->
PoolName = pool_name(HostType, PoolId),
WPoolOpts = make_wpool_opts(HostType, PoolId, Opts),
WorkerSpec = #{id => PoolName,
start => {wpool, start_pool, [PoolName, WPoolOpts]},
restart => permanent,
type => supervisor},
SupFlags = #{strategy => one_for_one,
intensity => 1,
period => 5},
{ok, {SupFlags, [WorkerSpec]}}.

%%%===================================================================
%%% internal callbacks
%%%===================================================================
-spec sup_name(mongooseim:host_type(), pool_id()) -> atom().
sup_name(HostType, PoolId) ->
list_to_atom(
atom_to_list(PoolId) ++ "-sup-async-pool-" ++ binary_to_list(HostType)).

-spec pool_name(mongooseim:host_type(), pool_id()) -> atom().
pool_name(HostType, PoolId) ->
list_to_atom(
atom_to_list(PoolId) ++ "-async-pool-" ++ binary_to_list(HostType)).

%% @private
%% Defaults are:
%% - 1 second interval
%% - 100 tasks queues
%% - 2 times as many workers as there are online schedulers in the VM
%% - host_type is added to the flush extras
-spec make_wpool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> any().
make_wpool_opts(HostType, _PoolId, Opts) ->
Interval = gen_mod:get_opt(flush_interval, Opts, 1000),
MaxSize = gen_mod:get_opt(batch_size, Opts, 100),
NumWorkers = gen_mod:get_opt(pool_size, Opts, 2 * erlang:system_info(schedulers_online)),
FlushCallback = gen_mod:get_opt(flush_callback, Opts),
FlushExtra = gen_mod:get_opt(flush_extra, Opts,
fun(Val) -> Val#{host_type => HostType} end,
#{host_type => HostType}),
ProcessOpts = [{message_queue_data, off_heap}],
WorkerOpts = {HostType, Interval, MaxSize, FlushCallback, FlushExtra},
Worker = {mongoose_batch_worker, WorkerOpts},
[{worker, Worker},
{workers, NumWorkers},
{worker_opt, ProcessOpts},
{worker_shutdown, 10000}].

0 comments on commit 839a999

Please sign in to comment.