diff --git a/src/event_pusher/mod_event_pusher_push.erl b/src/event_pusher/mod_event_pusher_push.erl index b119c4ac675..63687489354 100644 --- a/src/event_pusher/mod_event_pusher_push.erl +++ b/src/event_pusher/mod_event_pusher_push.erl @@ -74,26 +74,31 @@ %%-------------------------------------------------------------------- %% gen_mod callbacks %%-------------------------------------------------------------------- --spec start(Host :: jid:server(), Opts :: list()) -> any(). +-spec start(HostType :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> any(). start(Host, Opts) -> ?LOG_INFO(#{what => event_pusher_starting, server => Host}), - - WpoolOpts = [{strategy, available_worker} | gen_mod:get_opt(wpool, Opts, [])], - {ok, _} = mongoose_wpool:start(generic, Host, pusher_push, WpoolOpts), - + start_pool(Host, Opts), gen_mod:start_backend_module(?MODULE, Opts, [enable, disable, get_publish_services]), mod_event_pusher_push_backend:init(Host, Opts), - mod_event_pusher_push_plugin:init(Host), + init_iq_handlers(Host, Opts), + ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 90), + ok. + +start_pool(Host, Opts) -> + WpoolOpts = wpool_opts(Opts), + {ok, _} = mongoose_wpool:start(generic, Host, pusher_push, WpoolOpts). +-spec wpool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts(). +wpool_opts(Opts) -> + [{strategy, available_worker} | gen_mod:get_opt(wpool, Opts, [])]. + +init_iq_handlers(Host, Opts) -> IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_PUSH, ?MODULE, iq_handler, IQDisc), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUSH, ?MODULE, - iq_handler, IQDisc), - - ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 90), - ok. + iq_handler, IQDisc). -spec stop(Host :: jid:server()) -> ok. stop(Host) -> diff --git a/src/event_pusher/mod_event_pusher_sns.erl b/src/event_pusher/mod_event_pusher_sns.erl index b4cda33b953..dac4d10649a 100644 --- a/src/event_pusher/mod_event_pusher_sns.erl +++ b/src/event_pusher/mod_event_pusher_sns.erl @@ -41,16 +41,26 @@ %% Types -export_type([user_guid/0, topic_arn/0, topic/0, attributes/0]). --spec start(Host :: jid:server(), Opts :: proplists:proplist()) -> ok. +-spec start(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> ok. start(Host, Opts) -> application:ensure_all_started(erlcloud), application:ensure_all_started(worker_pool), + start_pool(Host, Opts), + ok. - WorkerNum = gen_mod:get_opt(pool_size, Opts, 100), - {ok, _} = mongoose_wpool:start(generic, Host, pusher_sns, - [{workers, WorkerNum}, {strategy, available_worker}]), +-spec start_pool(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> + term(). +start_pool(Host, Opts) -> + {ok, _} = mongoose_wpool:start(generic, Host, pusher_sns, pool_opts(Opts)). - ok. +-spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts(). +pool_opts(Opts) -> + WorkerNum = get_worker_num(Opts), + [{workers, WorkerNum}, {strategy, available_worker}]. + +-spec get_worker_num(gen_mod:module_opts()) -> pos_integer(). +get_worker_num(Opts) -> + gen_mod:get_opt(pool_size, Opts, 100). -spec stop(Host :: jid:server()) -> ok. stop(Host) -> diff --git a/src/mod_push_service_mongoosepush.erl b/src/mod_push_service_mongoosepush.erl index 8faad3b0b9a..5e53cce7ec8 100644 --- a/src/mod_push_service_mongoosepush.erl +++ b/src/mod_push_service_mongoosepush.erl @@ -49,20 +49,24 @@ %% Module callbacks %%-------------------------------------------------------------------- --spec start(Host :: jid:server(), Opts :: list()) -> any(). +-spec start(Host :: mongooseim:host_type(), Opts :: gen_mod:module_opts()) -> any(). start(Host, Opts) -> ?LOG_INFO(#{what => push_service_starting, server => Host}), - - MaxHTTPConnections = gen_mod:get_opt(max_http_connections, Opts, 100), - {ok, _} = mongoose_wpool:start(generic, Host, mongoosepush_service, - [{strategy, available_worker}, - {workers, MaxHTTPConnections}]), - + start_pool(Host, Opts), %% Hooks ejabberd_hooks:add(push_notifications, Host, ?MODULE, push_notifications, 10), - ok. +-spec start_pool(mongooseim:host_type(), gen_mod:module_opts()) -> term(). +start_pool(Host, Opts) -> + {ok, _} = mongoose_wpool:start(generic, Host, mongoosepush_service, pool_opts(Opts)). + +-spec pool_opts(gen_mod:module_opts()) -> mongoose_wpool:pool_opts(). +pool_opts(Opts) -> + MaxHTTPConnections = gen_mod:get_opt(max_http_connections, Opts, 100), + [{strategy, available_worker}, + {workers, MaxHTTPConnections}]. + -spec stop(Host :: jid:server()) -> ok. stop(Host) -> ejabberd_hooks:delete(push_notifications, Host, ?MODULE, push_notifications, 10), diff --git a/src/wpool/mongoose_wpool.erl b/src/wpool/mongoose_wpool.erl index 76213d09db3..ffb2c03df94 100644 --- a/src/wpool/mongoose_wpool.erl +++ b/src/wpool/mongoose_wpool.erl @@ -10,10 +10,11 @@ -include("mongoose.hrl"). -include("mongoose_wpool.hrl"). +-type call_timeout() :: pos_integer() | undefined. -record(mongoose_wpool, { - name :: term(), - strategy :: atom(), - call_timeout :: pos_integer() | undefined + name :: pool_name(), + strategy :: wpool:strategy() | undefined, + call_timeout :: call_timeout() }). -dialyzer({no_match, start/4}). @@ -37,24 +38,59 @@ %% Mostly for tests -export([expand_pools/2]). --type type() :: redis | riak | http | rdbms | cassandra | elastic | generic - | rabbit | ldap. --type host() :: global | host | jid:lserver(). --type tag() :: atom(). --type name() :: atom(). +-type pool_type() :: redis | riak | http | rdbms | cassandra | elastic | generic + | rabbit | ldap. + +%% Config scope +-type scope() :: global | host | mongooseim:host_type(). +-type host_type_or_global() :: mongooseim:host_type() | global. --export_type([type/0]). +-type tag() :: atom(). +%% Name of a process +-type proc_name() :: atom(). + +%% ID of a pool. Used as a key for an ETS table +-type pool_name() :: {PoolType :: pool_type(), + HostType :: host_type_or_global(), + Tag :: tag()}. + +-type pool_opts() :: [wpool:option()]. +-type conn_opts() :: [{atom(), any()}]. + +-type pool_tuple_in() :: {PoolType :: pool_type(), + HostType :: scope(), + Tag :: tag(), + WpoolOpts :: pool_opts(), + ConnOpts :: conn_opts()}. +%% Pool tuple with expanded HostType argument +-type pool_tuple() :: {PoolType :: pool_type(), + %% does not contain `host' atom + HostType :: host_type_or_global(), + Tag :: tag(), + WpoolOpts :: pool_opts(), + ConnOpts :: conn_opts()}. +-type worker_result() :: {ok, pid()} | {error, pool_not_started}. +-type pool_record_result() :: {ok, #mongoose_wpool{}} | {error, pool_not_started}. +-type start_result() :: {ok, pid()} | {error, term()}. +-type stop_result() :: ok | term(). + +-export_type([pool_type/0]). -export_type([tag/0]). --export_type([host/0]). --export_type([name/0]). +-export_type([scope/0]). +-export_type([proc_name/0]). +-export_type([pool_opts/0]). +-export_type([conn_opts/0]). +-export_type([host_type_or_global/0]). + +-type callback_fun() :: init | start | default_opts | is_supported_strategy | stop. -callback init() -> ok | {error, term()}. --callback start(host(), tag(), WPoolOpts :: [wpool:option()], ConnOpts :: [{atom(), any()}]) -> +-callback start(scope(), tag(), WPoolOpts :: pool_opts(), ConnOpts :: conn_opts()) -> {ok, {pid(), proplists:proplist()}} | {ok, pid()} | {external, pid()} | {error, Reason :: term()}. --callback default_opts() -> proplist:proplists(). +-callback default_opts() -> conn_opts(). -callback is_supported_strategy(Strategy :: wpool:strategy()) -> boolean(). --callback stop(host(), tag()) -> ok. +-callback stop(scope(), tag()) -> ok. -optional_callbacks([default_opts/0, is_supported_strategy/1]). @@ -83,50 +119,58 @@ start_configured_pools() -> start_configured_pools(Pools). start_configured_pools(PoolsIn) -> - start_configured_pools(PoolsIn, ?MYHOSTS). + start_configured_pools(PoolsIn, ?ALL_HOST_TYPES). -start_configured_pools(PoolsIn, Hosts) -> - [call_callback(init, Type, []) || Type <- get_unique_types(PoolsIn)], - Pools = expand_pools(PoolsIn, Hosts), +start_configured_pools(PoolsIn, HostTypes) -> + [call_callback(init, PoolType, []) || PoolType <- get_unique_types(PoolsIn)], + Pools = expand_pools(PoolsIn, HostTypes), [start(Pool) || Pool <- Pools]. -start({Type, Host, Tag, PoolOpts, ConnOpts}) -> - start(Type, Host, Tag, PoolOpts, ConnOpts). +-spec start(pool_tuple()) -> start_result(). +start({PoolType, HostType, Tag, PoolOpts, ConnOpts}) -> + start(PoolType, HostType, Tag, PoolOpts, ConnOpts). +-spec start(pool_type(), pool_opts()) -> start_result(). +start(PoolType, PoolOpts) -> + start(PoolType, global, PoolOpts). -start(Type, PoolOpts) -> - start(Type, global, PoolOpts). +-spec start(pool_type(), host_type_or_global(), pool_opts()) -> start_result(). +start(PoolType, HostType, PoolOpts) -> + start(PoolType, HostType, default, PoolOpts). -start(Type, Host, PoolOpts) -> - start(Type, Host, default, PoolOpts). +-spec start(pool_type(), host_type_or_global(), tag(), + pool_opts()) -> start_result(). +start(PoolType, HostType, Tag, PoolOpts) -> + start(PoolType, HostType, Tag, PoolOpts, []). -start(Type, Host, Tag, PoolOpts) -> - start(Type, Host, Tag, PoolOpts, []). - -start(Type, Host, Tag, PoolOpts, ConnOpts) -> +-spec start(pool_type(), host_type_or_global(), tag(), + pool_opts(), conn_opts()) -> start_result(). +start(PoolType, HostType, Tag, PoolOpts, ConnOpts) -> {Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]), - Opts = lists:append(Opts0) ++ default_opts(Type), + Opts = lists:append(Opts0) ++ default_opts(PoolType), Strategy = proplists:get_value(strategy, Opts, best_worker), CallTimeout = proplists:get_value(call_timeout, Opts, 5000), - %% If a callback doesn't explicitly blacklist a strategy, let's proceed. - CallbackModule = make_callback_module_name(Type), + CallbackModule = make_callback_module_name(PoolType), case catch CallbackModule:is_supported_strategy(Strategy) of false -> - error({strategy_not_supported, Type, Host, Tag, Strategy}); + error({strategy_not_supported, PoolType, HostType, Tag, Strategy}); _ -> - start(Type, Host, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) + start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) end. -start(Type, Host, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) -> - case mongoose_wpool_mgr:start(Type, Host, Tag, WpoolOptsIn, ConnOpts) of +-spec start(pool_type(), host_type_or_global(), tag(), + pool_opts(), conn_opts(), wpool:strategy(), call_timeout()) -> + start_result(). +start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) -> + case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of {ok, Pid} -> - ets:insert(?MODULE, #mongoose_wpool{name = {Type, Host, Tag}, + ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag}, strategy = Strategy, call_timeout = CallTimeout}), {ok, Pid}; {external, Pid} -> - ets:insert(?MODULE, #mongoose_wpool{name = {Type, Host, Tag}}), + ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag}}), {ok, Pid}; Error -> Error @@ -138,52 +182,63 @@ start(Type, Host, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) -> %% 1. We want to have a full control of all the pools and its restarts %% 2. When a pool is started via wpool:start_pool it's supposed be called by a supervisor, %% if not, there is no way to stop the pool. --spec start_sup_pool(type(), name(), [wpool:option()]) -> +-spec start_sup_pool(pool_type(), proc_name(), [wpool:option()]) -> {ok, pid()} | {error, term()}. -start_sup_pool(Type, Name, WpoolOpts) -> - SupName = mongoose_wpool_type_sup:name(Type), - ChildSpec = #{id => Name, - start => {wpool, start_pool, [Name, WpoolOpts]}, +start_sup_pool(PoolType, ProcName, WpoolOpts) -> + SupName = mongoose_wpool_type_sup:name(PoolType), + ChildSpec = #{id => ProcName, + start => {wpool, start_pool, [ProcName, WpoolOpts]}, restart => temporary, type => supervisor, modules => [wpool]}, supervisor:start_child(SupName, ChildSpec). +-spec stop() -> term(). stop() -> - [ stop(Type, Host, Tag) || {Type, Host, Tag} <- get_pools() ]. + [stop_pool(PoolName) || PoolName <- get_pools()]. + +-spec stop_pool(pool_name()) -> stop_result(). +stop_pool({PoolType, HostType, Tag}) -> + stop(PoolType, HostType, Tag). -stop(Type) -> - stop(Type, global). +-spec stop(pool_type()) -> stop_result(). +stop(PoolType) -> + stop(PoolType, global). -stop(Type, Host) -> - stop(Type, Host, default). +-spec stop(pool_type(), host_type_or_global()) -> stop_result(). +stop(PoolType, HostType) -> + stop(PoolType, HostType, default). -stop(Type, Host, Tag) -> +-spec stop(pool_type(), host_type_or_global(), tag()) -> stop_result(). +stop(PoolType, HostType, Tag) -> try - ets:delete(?MODULE, {Type, Host, Tag}), - call_callback(stop, Type, [Host, Tag]), - mongoose_wpool_mgr:stop(Type, Host, Tag) + ets:delete(?MODULE, {PoolType, HostType, Tag}), + call_callback(stop, PoolType, [HostType, Tag]), + mongoose_wpool_mgr:stop(PoolType, HostType, Tag) catch C:R:S -> ?LOG_ERROR(#{what => pool_stop_failed, - pool_type => Type, server => Host, pool_tag => Tag, - pool_key => {Type, Host, Tag}, + pool_type => PoolType, server => HostType, pool_tag => Tag, + pool_key => {PoolType, HostType, Tag}, class => C, reason => R, stacktrace => S}) end. --spec is_configured(type()) -> boolean(). -is_configured(Type) -> +-spec is_configured(pool_type()) -> boolean(). +is_configured(PoolType) -> Pools = ejabberd_config:get_local_option_or_default(outgoing_pools, []), - lists:keymember(Type, 1, Pools). + lists:keymember(PoolType, 1, Pools). -get_worker(Type) -> - get_worker(Type, global). +-spec get_worker(pool_type()) -> worker_result(). +get_worker(PoolType) -> + get_worker(PoolType, global). -get_worker(Type, Host) -> - get_worker(Type, Host, default). +-spec get_worker(pool_type(), host_type_or_global()) -> worker_result(). +get_worker(PoolType, HostType) -> + get_worker(PoolType, HostType, default). -get_worker(Type, Host, Tag) -> - case get_pool(Type, Host, Tag) of +-spec get_worker(pool_type(), host_type_or_global(), tag()) -> worker_result(). +get_worker(PoolType, HostType, Tag) -> + case get_pool(PoolType, HostType, Tag) of {ok, #mongoose_wpool{strategy = Strategy} = Pool} -> Worker = wpool_pool:Strategy(make_pool_name(Pool)), {ok, whereis(Worker)}; @@ -191,117 +246,128 @@ get_worker(Type, Host, Tag) -> Err end. -call(Type, Request) -> - call(Type, global, Request). +call(PoolType, Request) -> + call(PoolType, global, Request). -call(Type, Host, Request) -> - call(Type, Host, default, Request). +call(PoolType, HostType, Request) -> + call(PoolType, HostType, default, Request). -call(Type, Host, Tag, Request) -> - case get_pool(Type, Host, Tag) of +call(PoolType, HostType, Tag, Request) -> + case get_pool(PoolType, HostType, Tag) of {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} -> wpool:call(make_pool_name(Pool), Request, Strategy, CallTimeout); Err -> Err end. -call(Type, Host, Tag, HashKey, Request) -> - case get_pool(Type, Host, Tag) of +call(PoolType, HostType, Tag, HashKey, Request) -> + case get_pool(PoolType, HostType, Tag) of {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} -> wpool:call(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout); Err -> Err end. -cast(Type, Request) -> - cast(Type, global, Request). +cast(PoolType, Request) -> + cast(PoolType, global, Request). -cast(Type, Host, Request) -> - cast(Type, Host, default, Request). +cast(PoolType, HostType, Request) -> + cast(PoolType, HostType, default, Request). -cast(Type, Host, Tag, Request) -> - case get_pool(Type, Host, Tag) of +cast(PoolType, HostType, Tag, Request) -> + case get_pool(PoolType, HostType, Tag) of {ok, #mongoose_wpool{strategy = Strategy} = Pool} -> wpool:cast(make_pool_name(Pool), Request, Strategy); Err -> Err end. -cast(Type, Host, Tag, HashKey, Request) -> - case get_pool(Type, Host, Tag) of +cast(PoolType, HostType, Tag, HashKey, Request) -> + case get_pool(PoolType, HostType, Tag) of {ok, #mongoose_wpool{} = Pool} -> wpool:cast(make_pool_name(Pool), Request, {hash_worker, HashKey}); Err -> Err end. -get_pool_settings(Type, Host, Tag) -> - case get_pool(Type, Host, Tag) of - {ok, PoolOpts} -> PoolOpts; +-spec get_pool_settings(pool_type(), host_type_or_global(), tag()) -> + #mongoose_wpool{} | undefined. +get_pool_settings(PoolType, HostType, Tag) -> + case get_pool(PoolType, HostType, Tag) of + {ok, PoolRec} -> PoolRec; {error, pool_not_started} -> undefined end. +-spec get_pools() -> [pool_name()]. get_pools() -> lists:map(fun(#mongoose_wpool{name = Name}) -> Name end, ets:tab2list(?MODULE)). -stats(Type, Host, Tag) -> - wpool:stats(make_pool_name(Type, Host, Tag)). +stats(PoolType, HostType, Tag) -> + wpool:stats(make_pool_name(PoolType, HostType, Tag)). --spec make_pool_name(type(), host(), tag()) -> atom(). -make_pool_name(Type, Host, Tag) when is_atom(Host) -> - make_pool_name(Type, atom_to_binary(Host, utf8), Tag); -make_pool_name(Type, Host, Tag) when is_binary(Host) -> - binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(Type, utf8))/binary, $$, - Host/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8). +-spec make_pool_name(pool_type(), scope(), tag()) -> proc_name(). +make_pool_name(PoolType, HostType, Tag) when is_atom(HostType) -> + make_pool_name(PoolType, atom_to_binary(HostType, utf8), Tag); +make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) -> + binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(PoolType, utf8))/binary, $$, + HostType/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8). -make_pool_name(#mongoose_wpool{name = {Type, Host, Tag}}) -> - make_pool_name(Type, Host, Tag). +make_pool_name(#mongoose_wpool{name = {PoolType, HostType, Tag}}) -> + make_pool_name(PoolType, HostType, Tag). -call_start_callback(Type, Args) -> - call_callback(start, Type, Args). +-spec call_start_callback(pool_type(), list()) -> term(). +call_start_callback(PoolType, Args) -> + call_callback(start, PoolType, Args). -call_callback(Name, Type, Args) -> +-spec call_callback(callback_fun(), pool_type(), list()) -> term(). +call_callback(CallbackFun, PoolType, Args) -> try - CallbackModule = make_callback_module_name(Type), - erlang:apply(CallbackModule, Name, Args) + CallbackModule = make_callback_module_name(PoolType), + erlang:apply(CallbackModule, CallbackFun, Args) catch E:R:ST -> ?LOG_ERROR(#{what => pool_callback_failed, - pool_type => Type, callback_function => Name, + pool_type => PoolType, callback_function => CallbackFun, error => E, reason => R, stacktrace => ST}), - {error, {callback_crashed, Name, E, R, ST}} + {error, {callback_crashed, CallbackFun, E, R, ST}} end. --spec make_callback_module_name(type()) -> module(). -make_callback_module_name(Type) -> - Name = "mongoose_wpool_" ++ atom_to_list(Type), +-spec make_callback_module_name(pool_type()) -> module(). +make_callback_module_name(PoolType) -> + Name = "mongoose_wpool_" ++ atom_to_list(PoolType), list_to_atom(Name). -default_opts(Type) -> - Mod = make_callback_module_name(Type), +-spec default_opts(pool_type()) -> conn_opts(). +default_opts(PoolType) -> + Mod = make_callback_module_name(PoolType), case erlang:function_exported(Mod, default_opts, 0) of true -> Mod:default_opts(); false -> [] end. -expand_pools(Pools, AllHosts) -> +-spec expand_pools([pool_tuple_in()], [mongooseim:host_type()]) -> [pool_tuple()]. +expand_pools(Pools, HostTypes) -> %% First we select only pools for a specific vhost - HostSpecific = [{Type, Host, Tag} || - {Type, Host, Tag, _, _} <- Pools, is_binary(Host)], - %% Then we expand all pools with `host` as Host parameter but using host specific configs + HostSpecific = [{PoolType, HostType, Tag} || + {PoolType, HostType, Tag, _, _} <- Pools, + is_binary(HostType)], + %% Then we expand all pools with `host` as HostType parameter but using host specific configs %% if they were provided - F = fun({Type, host, Tag, WpoolOpts, ConnOpts}) -> - [{Type, Host, Tag, WpoolOpts, ConnOpts} || Host <- AllHosts, - not lists:member({Type, Host, Tag}, HostSpecific)]; + F = fun({PoolType, host, Tag, WpoolOpts, ConnOpts}) -> + [{PoolType, HostType, Tag, WpoolOpts, ConnOpts} || + HostType <- HostTypes, + not lists:member({PoolType, HostType, Tag}, HostSpecific)]; (Other) -> [Other] end, lists:flatmap(F, Pools). +-spec get_unique_types([pool_tuple_in()]) -> [pool_type()]. get_unique_types(Pools) -> - ordsets:to_list(ordsets:from_list([Type || {Type, _, _, _, _} <- Pools])). + lists:usort([PoolType || {PoolType, _, _, _, _} <- Pools]). -get_pool(Type, Host, Tag) -> - case ets:lookup(?MODULE, {Type, Host, Tag}) of - [] when is_binary(Host) -> get_pool(Type, global, Tag); +-spec get_pool(pool_type(), host_type_or_global(), tag()) -> pool_record_result(). +get_pool(PoolType, HostType, Tag) -> + case ets:lookup(?MODULE, {PoolType, HostType, Tag}) of + [] when is_binary(HostType) -> get_pool(PoolType, global, Tag); [] -> {error, pool_not_started}; [Pool] -> {ok, Pool} end. diff --git a/src/wpool/mongoose_wpool_cassandra.erl b/src/wpool/mongoose_wpool_cassandra.erl index ba30afd797b..46d1d88c770 100644 --- a/src/wpool/mongoose_wpool_cassandra.erl +++ b/src/wpool/mongoose_wpool_cassandra.erl @@ -5,11 +5,13 @@ -export([start/4]). -export([stop/2]). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> {ok, []} = application:ensure_all_started(cqerl), application:set_env(cqerl, maps, true). -start(Host, Tag, WpoolOptsIn, CqerlOpts) -> +start(HostType, Tag, WpoolOptsIn, CqerlOpts) -> PoolSize = proplists:get_value(workers, WpoolOptsIn, 20), application:set_env(cqerl, num_clients, PoolSize), ExtConfig = extend_config(CqerlOpts), @@ -22,7 +24,7 @@ start(Host, Tag, WpoolOptsIn, CqerlOpts) -> _ -> erlang:throw({not_all_nodes_added, Res}) end, - Name = mongoose_wpool:make_pool_name(cassandra, Host, Tag), + Name = mongoose_wpool:make_pool_name(cassandra, HostType, Tag), Worker = {mongoose_cassandra_worker, [Tag]}, WpoolOpts = [{worker, Worker} | WpoolOptsIn], mongoose_wpool:start_sup_pool(cassandra, Name, WpoolOpts). @@ -30,13 +32,14 @@ start(Host, Tag, WpoolOptsIn, CqerlOpts) -> stop(_, _) -> ok. +%% -------------------------------------------------------------- +%% Internal functions extend_config(PoolConfig) -> Defaults = #{ servers => [{"localhost", 9042}], tcp_opts => [{keepalive, true}], keyspace => mongooseim }, - ConfigMap = maps:merge(Defaults, maps:from_list(PoolConfig)), maps:to_list(ConfigMap). diff --git a/src/wpool/mongoose_wpool_elastic.erl b/src/wpool/mongoose_wpool_elastic.erl index 927a0c9191f..6b42c40715f 100644 --- a/src/wpool/mongoose_wpool_elastic.erl +++ b/src/wpool/mongoose_wpool_elastic.erl @@ -5,20 +5,22 @@ -export([start/4]). -export([stop/2]). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> tirerl:start(), ok. -start(Host, Tag, WpoolOptsIn, ConnOpts) -> +start(HostType, Tag, WpoolOptsIn, ConnOpts) -> ElasticHost = proplists:get_value(host, ConnOpts, "localhost"), Port = proplists:get_value(port, ConnOpts, 9200), - PoolName = mongoose_wpool:make_pool_name(elastic, Host, Tag), + ProcName = mongoose_wpool:make_pool_name(elastic, HostType, Tag), Opts = [{host, list_to_binary(ElasticHost)}, {port, Port}], WPoolOptions = [{overrun_warning, infinity}, {overrun_handler, {error_logger, warning_report}}, {worker, {tirerl_worker, Opts}} | WpoolOptsIn], - case mongoose_wpool:start_sup_pool(elastic, PoolName, WPoolOptions) of + case mongoose_wpool:start_sup_pool(elastic, ProcName, WPoolOptions) of {ok, Pid} -> {external, Pid}; Other -> diff --git a/src/wpool/mongoose_wpool_generic.erl b/src/wpool/mongoose_wpool_generic.erl index 6030e0baec4..396abe5efd3 100644 --- a/src/wpool/mongoose_wpool_generic.erl +++ b/src/wpool/mongoose_wpool_generic.erl @@ -5,12 +5,14 @@ -export([start/4]). -export([stop/2]). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> ok. -start(Host, Tag, WpoolOptsIn, _ConnOpts) -> - Name = mongoose_wpool:make_pool_name(generic, Host, Tag), - mongoose_wpool:start_sup_pool(generic, Name, WpoolOptsIn). +start(HostType, Tag, WpoolOptsIn, _ConnOpts) -> + ProcName = mongoose_wpool:make_pool_name(generic, HostType, Tag), + mongoose_wpool:start_sup_pool(generic, ProcName, WpoolOptsIn). stop(_, _) -> ok. diff --git a/src/wpool/mongoose_wpool_http.erl b/src/wpool/mongoose_wpool_http.erl index fc00d9b5a43..94bca9dc69f 100644 --- a/src/wpool/mongoose_wpool_http.erl +++ b/src/wpool/mongoose_wpool_http.erl @@ -13,10 +13,11 @@ -export([start/4, stop/2]). -export([get_params/2]). -%% -------------------------------------------------------------- -%% API -%% -------------------------------------------------------------- +-type path_prefix() :: binary(). +-type request_timeout() :: non_neg_integer(). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> case ets:info(?MODULE) of undefined -> @@ -31,28 +32,31 @@ init() -> ok end. -start(Host, Tag, WpoolOptsIn, ConnOpts) -> - Name = mongoose_wpool:make_pool_name(http, Host, Tag), - WpoolOpts = wpool_spec(Host, WpoolOptsIn, ConnOpts), +start(HostType, Tag, WpoolOptsIn, ConnOpts) -> + Name = mongoose_wpool:make_pool_name(http, HostType, Tag), + WpoolOpts = wpool_spec(WpoolOptsIn, ConnOpts), PathPrefix = list_to_binary(gen_mod:get_opt(path_prefix, ConnOpts, "/")), RequestTimeout = gen_mod:get_opt(request_timeout, ConnOpts, 2000), case mongoose_wpool:start_sup_pool(http, Name, WpoolOpts) of {ok, Pid} -> - ets:insert(?MODULE, {{Host, Tag}, PathPrefix, RequestTimeout}), + ets:insert(?MODULE, {{HostType, Tag}, PathPrefix, RequestTimeout}), {ok, Pid}; Other -> Other end. -stop(Host, Tag) -> - true = ets:delete(?MODULE, {Host, Tag}), +stop(HostType, Tag) -> + true = ets:delete(?MODULE, {HostType, Tag}), ok. --spec get_params(Host :: jid:lserver() | global, Tag :: atom()) -> - {ok, PathPrefix :: binary(), RequestTimeout :: non_neg_integer()} +%% -------------------------------------------------------------- +%% Other API functions +-spec get_params(HostType :: mongoose_wpool:host_type_or_global(), + Tag :: mongoose_wpool:tag()) -> + {ok, PathPrefix :: path_prefix(), RequestTimeout :: request_timeout()} | {error, pool_not_started}. -get_params(Host, Tag) -> - case {ets:lookup(?MODULE, {Host, Tag}), Host} of +get_params(HostType, Tag) -> + case {ets:lookup(?MODULE, {HostType, Tag}), HostType} of {[], global} -> {error, pool_not_started}; {[], _} -> get_params(global, Tag); {[{_, PathPrefix, RequestTimeout}], _} -> {ok, PathPrefix, RequestTimeout} @@ -60,9 +64,8 @@ get_params(Host, Tag) -> %% -------------------------------------------------------------- %% Internal functions -%% -------------------------------------------------------------- -wpool_spec(Host, WpoolOptsIn, ConnOpts) -> +wpool_spec(WpoolOptsIn, ConnOpts) -> TargetServer = gen_mod:get_opt(server, ConnOpts), HttpOpts = gen_mod:get_opt(http_opts, ConnOpts, []), Worker = {fusco, {TargetServer, HttpOpts}}, diff --git a/src/wpool/mongoose_wpool_ldap.erl b/src/wpool/mongoose_wpool_ldap.erl index 0b6a73c40f7..675d37b1f07 100644 --- a/src/wpool/mongoose_wpool_ldap.erl +++ b/src/wpool/mongoose_wpool_ldap.erl @@ -4,13 +4,15 @@ -export([init/0]). -export([start/4, stop/2]). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> ok. -start(Host, Tag, WpoolOpts, ConnOpts) -> +start(HostType, Tag, WpoolOpts, ConnOpts) -> WorkerSpec = {mongoose_ldap_worker, ConnOpts}, - Name = mongoose_wpool:make_pool_name(ldap, Host, Tag), - mongoose_wpool:start_sup_pool(ldap, Name, [{worker, WorkerSpec} | WpoolOpts]). + ProcName = mongoose_wpool:make_pool_name(ldap, HostType, Tag), + mongoose_wpool:start_sup_pool(ldap, ProcName, [{worker, WorkerSpec} | WpoolOpts]). -stop(_Host, _Tag) -> +stop(_HostType, _Tag) -> ok. diff --git a/src/wpool/mongoose_wpool_mgr.erl b/src/wpool/mongoose_wpool_mgr.erl index a6a6919f560..cdb6ee62af8 100644 --- a/src/wpool/mongoose_wpool_mgr.erl +++ b/src/wpool/mongoose_wpool_mgr.erl @@ -42,15 +42,15 @@ -record(state, {type, pools, monitors}). -type start_request() :: {start_pool, - mongoose_wpool:host(), mongoose_wpool:tag(), + mongoose_wpool:scope(), mongoose_wpool:tag(), [any()], [any()]}. -type stop_request() :: {stop_pool, - mongoose_wpool:host(), mongoose_wpool:tag()}. + mongoose_wpool:scope(), mongoose_wpool:tag()}. -type request() :: start_request() | stop_request(). --type monitored_pool() :: {mongoose_wpool:type(), mongoose_wpool:host(), mongoose_wpool:tag()}. +-type monitored_pool() :: {mongoose_wpool:pool_type(), mongoose_wpool:scope(), mongoose_wpool:tag()}. -type known_pools() :: #{monitored_pool() := #{monitor := undefined | reference(), wpool_opts := [wpool:option()], @@ -75,7 +75,7 @@ start(Type, Host, Tag, PoolOpts, ConnOpts) -> stop(Type, Host, Tag) -> gen_server:call(name(Type), {stop_pool, Host, Tag}). --spec name(mongoose_wpool:type()) -> mongoose_wpool:name(). +-spec name(mongoose_wpool:pool_type()) -> mongoose_wpool:proc_name(). name(Type) -> list_to_atom("mongoose_wpool_" ++ atom_to_list(Type) ++ "_mgr"). %%%=================================================================== diff --git a/src/wpool/mongoose_wpool_rdbms.erl b/src/wpool/mongoose_wpool_rdbms.erl index 7585135c462..66ae6d13bf8 100644 --- a/src/wpool/mongoose_wpool_rdbms.erl +++ b/src/wpool/mongoose_wpool_rdbms.erl @@ -6,6 +6,8 @@ -export([default_opts/0]). -export([stop/2]). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> case ets:info(prepared_statements) of undefined -> @@ -20,8 +22,8 @@ init() -> ok end. -start(Host, Tag, WpoolOpts, RdbmsOpts) -> - try do_start(Host, Tag, WpoolOpts, RdbmsOpts) +start(HostType, Tag, WpoolOpts, RdbmsOpts) -> + try do_start(HostType, Tag, WpoolOpts, RdbmsOpts) catch Err -> {error, Err} end. @@ -32,7 +34,9 @@ default_opts() -> stop(_, _) -> ok. -do_start(Host, Tag, WpoolOpts0, RdbmsOpts) when is_list(WpoolOpts0) and is_list(RdbmsOpts) -> +%% -------------------------------------------------------------- +%% Helper functions +do_start(HostType, Tag, WpoolOpts0, RdbmsOpts) when is_list(WpoolOpts0) and is_list(RdbmsOpts) -> BackendName = backend_name(RdbmsOpts), try mongoose_rdbms_backend:backend_name() of BackendName -> ok; @@ -43,15 +47,14 @@ do_start(Host, Tag, WpoolOpts0, RdbmsOpts) when is_list(WpoolOpts0) and is_list( error:undef -> backend_module:create(mongoose_rdbms, BackendName, [query, execute]) end, + mongoose_metrics:ensure_db_pool_metric({rdbms, HostType, Tag}), + WpoolOpts = make_wpool_opts(WpoolOpts0, RdbmsOpts), + ProcName = mongoose_wpool:make_pool_name(rdbms, HostType, Tag), + mongoose_wpool:start_sup_pool(rdbms, ProcName, WpoolOpts). - mongoose_metrics:ensure_db_pool_metric({rdbms, Host, Tag}), - +make_wpool_opts(WpoolOpts0, RdbmsOpts) -> Worker = {mongoose_rdbms, RdbmsOpts}, - %% Without lists:map dialyzer doesn't understand that WpoolOpts is a list (?) and the - %% do_start function has no return. - WpoolOpts = lists:map(fun(X) -> X end, [{worker, Worker}, {pool_sup_shutdown, infinity} | WpoolOpts0]), - Name = mongoose_wpool:make_pool_name(rdbms, Host, Tag), - mongoose_wpool:start_sup_pool(rdbms, Name, WpoolOpts). + [{worker, Worker}, {pool_sup_shutdown, infinity} | WpoolOpts0]. -spec backend_name(proplist:proplists()) -> odbc | pgsql | mysql. backend_name(RdbmsOpts) -> diff --git a/src/wpool/mongoose_wpool_redis.erl b/src/wpool/mongoose_wpool_redis.erl index 84adc0e5eb6..ad88249821c 100644 --- a/src/wpool/mongoose_wpool_redis.erl +++ b/src/wpool/mongoose_wpool_redis.erl @@ -6,13 +6,15 @@ -export([stop/2]). -export([is_supported_strategy/1]). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> ok. -start(Host, Tag, WpoolOptsIn, ConnOpts) -> - Name = mongoose_wpool:make_pool_name(redis, Host, Tag), +start(HostType, Tag, WpoolOptsIn, ConnOpts) -> + ProcName = mongoose_wpool:make_pool_name(redis, HostType, Tag), WpoolOpts = wpool_spec(WpoolOptsIn, ConnOpts), - mongoose_wpool:start_sup_pool(redis, Name, WpoolOpts). + mongoose_wpool:start_sup_pool(redis, ProcName, WpoolOpts). stop(_, _) -> ok. @@ -20,15 +22,12 @@ stop(_, _) -> is_supported_strategy(available_worker) -> false; is_supported_strategy(_) -> true. -%%%=================================================================== +%% -------------------------------------------------------------- %%% Internal functions -%%%=================================================================== - wpool_spec(WpoolOptsIn, ConnOpts) -> Worker = {eredis_client, makeargs(ConnOpts)}, [{worker, Worker} | WpoolOptsIn]. - makeargs(RedisOpts) -> Host = proplists:get_value(host, RedisOpts, "127.0.0.1"), Port = proplists:get_value(port, RedisOpts, 6379), diff --git a/src/wpool/mongoose_wpool_riak.erl b/src/wpool/mongoose_wpool_riak.erl index c3970b266cb..b55f5428482 100644 --- a/src/wpool/mongoose_wpool_riak.erl +++ b/src/wpool/mongoose_wpool_riak.erl @@ -8,13 +8,15 @@ -export([get_riak_opt/2]). -export([get_riak_opt/3]). +%% -------------------------------------------------------------- +%% mongoose_wpool callbacks init() -> ok. -start(Host, Tag, WpoolOptsIn, ConnOpts) -> - Name = mongoose_wpool:make_pool_name(riak, Host, Tag), +start(HostType, Tag, WpoolOptsIn, ConnOpts) -> + ProcName = mongoose_wpool:make_pool_name(riak, HostType, Tag), WpoolOpts = wpool_spec(WpoolOptsIn, ConnOpts), - mongoose_wpool:start_sup_pool(riak, Name, WpoolOpts). + mongoose_wpool:start_sup_pool(riak, ProcName, WpoolOpts). stop(_, _) -> ok. @@ -22,6 +24,9 @@ stop(_, _) -> is_supported_strategy(available_worker) -> false; is_supported_strategy(_) -> true. +%% -------------------------------------------------------------- +%% Other functions + wpool_spec(WpoolOptsIn, ConnOpts) -> {_, RiakAddr} = mongoose_wpool_riak:get_riak_opt(address, ConnOpts), {_, RiakPort} = mongoose_wpool_riak:get_riak_opt(port, ConnOpts), diff --git a/src/wpool/mongoose_wpool_sup.erl b/src/wpool/mongoose_wpool_sup.erl index 1dd505d753f..d87eec4b30d 100644 --- a/src/wpool/mongoose_wpool_sup.erl +++ b/src/wpool/mongoose_wpool_sup.erl @@ -68,9 +68,9 @@ init([]) -> %%%=================================================================== %%% Internal functions %%%=================================================================== --spec child_spec(mongoose_wpool:type()) -> - #{id := mongoose_wpool:name(), - start := {mongoose_wpool_type_sup, start_link, [mongoose_wpool:type()]}, +-spec child_spec(mongoose_wpool:pool_type()) -> + #{id := mongoose_wpool:proc_name(), + start := {mongoose_wpool_type_sup, start_link, [mongoose_wpool:pool_type()]}, restart => transient, shutdown => brutal_kill, type => supervisor, diff --git a/src/wpool/mongoose_wpool_type_sup.erl b/src/wpool/mongoose_wpool_type_sup.erl index f555c738d52..d782898c4d5 100644 --- a/src/wpool/mongoose_wpool_type_sup.erl +++ b/src/wpool/mongoose_wpool_type_sup.erl @@ -36,14 +36,14 @@ %% %% @end %%-------------------------------------------------------------------- --spec start_link(mongoose_wpool:type()) -> +-spec start_link(mongoose_wpool:pool_type()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}. -start_link(Type) -> - supervisor:start_link({local, name(Type)}, ?MODULE, [Type]). +start_link(PoolType) -> + supervisor:start_link({local, name(PoolType)}, ?MODULE, [PoolType]). --spec name(mongoose_wpool:type()) -> mongoose_wpool:name(). -name(Type) -> - list_to_atom("mongoose_wpool_" ++ atom_to_list(Type) ++ "_sup"). +-spec name(mongoose_wpool:pool_type()) -> mongoose_wpool:proc_name(). +name(PoolType) -> + list_to_atom("mongoose_wpool_" ++ atom_to_list(PoolType) ++ "_sup"). %%%=================================================================== %%% Supervisor callbacks @@ -60,19 +60,19 @@ name(Type) -> %% @end %%-------------------------------------------------------------------- -spec init(Args :: term()) -> {ok, {#{strategy => one_for_one, intensity => 100, period => 5}, - [#{id := mongoose_wpool:name(), - start := {mongoose_wpool_mgr, start_link, [mongoose_wpool:type()]}, + [#{id := mongoose_wpool:proc_name(), + start := {mongoose_wpool_mgr, start_link, [mongoose_wpool:pool_type()]}, restart => transient, shutdown => brutal_kill, type => worker, modules => [module()]}]}}. -init([Type]) -> +init([PoolType]) -> SupFlags = #{strategy => one_for_one, intensity => 100, period => 5}, - ChildSpec = #{id => mongoose_wpool_mgr:name(Type), - start => {mongoose_wpool_mgr, start_link, [Type]}, + ChildSpec = #{id => mongoose_wpool_mgr:name(PoolType), + start => {mongoose_wpool_mgr, start_link, [PoolType]}, restart => transient, shutdown => brutal_kill, type => worker, diff --git a/test/mongoose_wpool_SUITE.erl b/test/mongoose_wpool_SUITE.erl index a9596ebab45..42e8d98425e 100644 --- a/test/mongoose_wpool_SUITE.erl +++ b/test/mongoose_wpool_SUITE.erl @@ -50,7 +50,8 @@ init_per_suite(Config) -> ok = meck:new(mongoose_wpool, [no_link, passthrough]), ok = meck:new(ejabberd_config, [no_link]), meck:expect(ejabberd_config, get_global_option_or_default, - fun(hosts, _) -> [<<"a.com">>, <<"b.com">>, <<"c.eu">>] end), + fun(hosts, _) -> [<<"a.com">>, <<"b.com">>, <<"c.eu">>]; + (host_types, _) -> [] end), Self = self(), spawn(fun() -> register(test_helper, self()),