Skip to content

Commit

Permalink
Merge pull request #3582 from esl/mod_global_distrib_mapping-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
gustawlippa authored Mar 10, 2022
2 parents 6e3fd8a + b00733a commit ef8b27c
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 80 deletions.
15 changes: 7 additions & 8 deletions big_tests/tests/mod_global_distrib_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,11 @@ unpause_refresher(NodeName, _) ->
%% Reads Redis to confirm that endpoints (in Redis) are overwritten
%% with `advertised_endpoints` option value
test_advertised_endpoints_override_endpoints(_Config) ->
Endps = execute_on_each_node(mod_global_distrib_mapping_redis,
get_endpoints,
[<<"reg1">>]),
true = lists:all(fun({ok, E}) ->
lists:sort(iptuples_to_string(E)) =:=
lists:sort(advertised_endpoints()) end, Endps).
Endps = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints, [<<"reg1">>]),
true = lists:all(
fun(E) ->
lists:sort(iptuples_to_string(E)) =:= lists:sort(advertised_endpoints())
end, Endps).

%% @doc Verifies that hosts refresher will restart the outgoing connection pool if
%% it goes down for some reason (crash or domain unavailability).
Expand Down Expand Up @@ -1295,8 +1294,8 @@ service_port() ->

wait_for_domain(Node, Domain) ->
F = fun() ->
{ok, Domains} = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
lists:member(Domain, Domains)
Domains = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
lists:member(Domain, Domains)
end,
mongoose_helper:wait_until(F, true, #{name => {wait_for_domain, Node, Domain}}).

Expand Down
6 changes: 1 addition & 5 deletions src/gen_mod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
-export([
% Modules start & stop, do NOT use in the tests, use mongoose_modules API instead
start_module/3,
start_backend_module/2,
start_backend_module/3,
stop_module/2,
does_module_support/2,
Expand Down Expand Up @@ -175,10 +174,7 @@ is_common_test_running() ->
false
end.

-spec start_backend_module(module(), list()) -> any().
start_backend_module(Module, Opts) ->
start_backend_module(Module, Opts, []).

%% @deprecated To be removed when mod_pubsub does not use it anymore
start_backend_module(Module, Opts, TrackedFuncs) ->
Backend = gen_mod:get_opt(backend, Opts, mnesia),
backend_module:create(Module, Backend, TrackedFuncs).
Expand Down
9 changes: 3 additions & 6 deletions src/global_distrib/mod_global_distrib_disco.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,12 @@ opt(Key) ->
-spec domains_for_disco(Host :: jid:lserver(), From :: jid:jid()) -> Domains :: [binary()].
domains_for_disco(_Host, #jid{ luser = <<>> } = _From) ->
%% Currently all non-user entities may discover all services
{ok, Domains} = mod_global_distrib_mapping:all_domains(),
Domains;
mod_global_distrib_mapping:all_domains();
domains_for_disco(Host, _From) ->
case gen_mod:get_module_opt(Host, mod_disco, users_can_see_hidden_services, true) of
true ->
{ok, Domains} = mod_global_distrib_mapping:all_domains(),
Domains;
mod_global_distrib_mapping:all_domains();
false ->
{ok, Domains} = mod_global_distrib_mapping:public_domains(),
Domains
mod_global_distrib_mapping:public_domains()
end.

41 changes: 5 additions & 36 deletions src/global_distrib/mod_global_distrib_mapping.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,44 +35,14 @@
session_opened/5, session_closed/5]).
-export([endpoints/1, hosts/0]).

-define(MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, mod_global_distrib_mapping_backend).
-ignore_xref([
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, get_domains, 0},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, delete_domain, 1},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, delete_session, 1},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, get_endpoints, 1},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, get_domain, 1},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, get_session, 1},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, get_hosts, 0},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, get_public_domains, 0},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, put_domain, 2},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, put_session, 1},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, start, 1},
{?MOD_GLOBAL_DISTRIB_MAPPING_BACKEND, stop, 0},
behaviour_info/1, delete_for_domain/1, delete_for_jid/1, insert_for_domain/1,
delete_for_domain/1, delete_for_jid/1, insert_for_domain/1,
insert_for_domain/2, insert_for_domain/3, insert_for_jid/1, packet_to_component/3,
register_subhost/3, session_closed/5, session_opened/5, unregister_subhost/2
]).

-type endpoint() :: mod_global_distrib_utils:endpoint().

%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------

-callback start(Opts :: proplists:proplist()) -> any().
-callback stop() -> any().
-callback put_session(JID :: binary()) -> ok | error.
-callback get_session(JID :: binary()) -> {ok, Host :: binary()} | error.
-callback delete_session(JID :: binary()) -> ok | error.
-callback put_domain(Domain :: binary(), IsHidden :: boolean()) -> ok | error.
-callback get_domain(Domain :: binary()) -> {ok, Host :: binary()} | error.
-callback delete_domain(Domain :: binary()) -> ok | error.
-callback get_domains() -> {ok, [Domain :: binary()]} | error.
-callback get_public_domains() -> {ok, [Domain :: binary()]} | error.
-callback get_endpoints(Host :: binary()) -> {ok, [endpoint()]}.
-callback get_hosts() -> [Host :: jid:lserver()].

%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -143,15 +113,15 @@ delete_for_jid({_, _, _} = Jid) ->
end,
normalize_jid(Jid)).

-spec all_domains() -> {ok, [jid:lserver()]}.
-spec all_domains() -> [jid:lserver()].
all_domains() ->
mod_global_distrib_mapping_backend:get_domains().

-spec public_domains() -> {ok, [jid:lserver()]}.
-spec public_domains() -> [jid:lserver()].
public_domains() ->
mod_global_distrib_mapping_backend:get_public_domains().

-spec endpoints(Host :: jid:lserver()) -> {ok, [endpoint()]}.
-spec endpoints(Host :: jid:lserver()) -> [endpoint()].
endpoints(Host) ->
mod_global_distrib_mapping_backend:get_endpoints(Host).

Expand Down Expand Up @@ -237,8 +207,7 @@ deps(_Opts) ->
start() ->
Host = opt(global_host),
Backend = opt(backend),
gen_mod:start_backend_module(?MODULE, [{backend, Backend}]),
mod_global_distrib_mapping_backend:start(opt(Backend)),
mod_global_distrib_mapping_backend:start([{backend, Backend} | opt(Backend)]),

mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_MAPPING_FETCH_TIME, histogram),
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_MAPPING_FETCHES, spiral),
Expand Down
92 changes: 92 additions & 0 deletions src/global_distrib/mod_global_distrib_mapping_backend.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
%%% @doc A proxy interface module between the main mod_global_distrib_mapping module
%%% and the backend modules.
%%% There is only one backend implementation (redis), so the backend module is global.
-module(mod_global_distrib_mapping_backend).

-export([start/1,
stop/0,
put_session/1,
get_session/1,
delete_session/1,
put_domain/2,
get_domain/1,
delete_domain/1,
get_domains/0,
get_public_domains/0,
get_endpoints/1,
get_hosts/0]).

-define(MAIN_MODULE, mod_global_distrib_mapping).

-type endpoint() :: mod_global_distrib_utils:endpoint().

%% Callbacks

-callback start(gen_mod:module_opts()) -> any().
-callback stop() -> any().

%% Mapping from a JID to the local host
-callback put_session(jid:literal_jid()) -> ok.
-callback get_session(jid:literal_jid()) -> {ok, jid:lserver()} | error.
-callback delete_session(jid:literal_jid()) -> ok.

%% Mapping from a domain to the local host
-callback put_domain(jid:lserver(), IsHidden :: boolean()) -> ok.
-callback get_domain(jid:lserver()) -> {ok, jid:lserver()} | error.
-callback delete_domain(jid:lserver()) -> ok.

-callback get_domains() -> [jid:lserver()].
-callback get_public_domains() -> [jid:lserver()].
-callback get_endpoints(jid:lserver()) -> [endpoint()].
-callback get_hosts() -> [jid:lserver()].

%% API Functions

-spec start(gen_mod:module_opts()) -> any().
start(Opts) ->
mongoose_backend:init(global, ?MAIN_MODULE, [], Opts),
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [Opts]).

-spec stop() -> any().
stop() ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, []).

-spec put_session(jid:literal_jid()) -> ok.
put_session(JID) ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [JID]).

-spec get_session(jid:literal_jid()) -> {ok, jid:lserver()} | error.
get_session(JID) ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [JID]).

-spec delete_session(jid:literal_jid()) -> ok.
delete_session(JID) ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [JID]).

-spec put_domain(jid:lserver(), boolean()) -> ok.
put_domain(Domain, IsHidden) ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [Domain, IsHidden]).

-spec get_domain(jid:lserver()) -> {ok, jid:lserver()} | error.
get_domain(Domain) ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [Domain]).

-spec delete_domain(jid:lserver()) -> ok.
delete_domain(Domain) ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [Domain]).

-spec get_domains() -> [jid:lserver()].
get_domains() ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, []).

-spec get_public_domains() -> [jid:lserver()].
get_public_domains() ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, []).

-spec get_endpoints(jid:lserver()) -> [endpoint()].
get_endpoints(Domain) ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, [Domain]).

-spec get_hosts() -> [jid:lserver()].
get_hosts() ->
mongoose_backend:call(global, ?MAIN_MODULE, ?FUNCTION_NAME, []).
33 changes: 17 additions & 16 deletions src/global_distrib/mod_global_distrib_mapping_redis.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
-module(mod_global_distrib_mapping_redis).
-author('konrad.zemek@erlang-solutions.com').

-behaviour(mod_global_distrib_mapping).
-behaviour(mod_global_distrib_mapping_backend).

-include("mongoose.hrl").

Expand All @@ -43,9 +43,9 @@
%% API
%%--------------------------------------------------------------------

-spec start(proplists:proplist()) -> any().
-spec start(gen_mod:module_opts()) -> any().
start(Opts) ->
RefreshAfter = proplists:get_value(refresh_after, Opts, 60),
RefreshAfter = gen_mod:get_opt(refresh_after, Opts, 60),

mod_global_distrib_utils:create_ets([?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]),
ExpireAfter = proplists:get_value(expire_after, Opts, 120),
Expand All @@ -68,21 +68,21 @@ stop() ->
[?MODULE, mod_global_distrib_redis_refresher]),
[ets:delete(Tab) || Tab <- [?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]].

-spec put_session(Jid :: binary()) -> ok.
-spec put_session(jid:literal_jid()) -> ok.
put_session(Jid) ->
ets:insert(?JIDS_ETS, {Jid}),
do_put(Jid, opt(local_host)).

-spec get_session(Jid :: binary()) -> {ok, Host :: binary()} | error.
-spec get_session(jid:literal_jid()) -> {ok, jid:lserver()} | error.
get_session(Jid) ->
do_get(Jid).

-spec delete_session(Jid :: binary()) -> ok.
-spec delete_session(jid:literal_jid()) -> ok.
delete_session(Jid) ->
ets:delete(?JIDS_ETS, Jid),
do_delete(Jid).

-spec put_domain(Domain :: binary(), IsHidden :: boolean()) -> ok.
-spec put_domain(jid:lserver(), IsHidden :: boolean()) -> ok.
put_domain(Domain, IsHidden) ->
ets:insert(?DOMAINS_ETS, {Domain}),
{ok, _} = q([<<"SADD">>, domains_key(), Domain]),
Expand All @@ -96,37 +96,37 @@ put_domain(Domain, IsHidden) ->
ok
end.

-spec get_domain(Domain :: binary()) -> {ok, Host :: binary()} | error.
-spec get_domain(jid:lserver()) -> {ok, jid:lserver()} | error.
get_domain(Domain) ->
do_get(Domain).

-spec delete_domain(Domain :: binary()) -> ok.
-spec delete_domain(jid:lserver()) -> ok.
delete_domain(Domain) ->
ets:delete(?DOMAINS_ETS, Domain),
ets:delete(?PUBLIC_DOMAINS_ETS, Domain),
do_delete(Domain),
{ok, _} = q([<<"SREM">>, domains_key(), Domain]),
ok.

-spec get_domains() -> {ok, [Domain :: binary()]}.
-spec get_domains() -> [jid:lserver()].
get_domains() ->
get_domains(fun domains_key/2).

-spec get_public_domains() -> {ok, [Domain :: binary()]}.
-spec get_public_domains() -> [jid:lserver()].
get_public_domains() ->
get_domains(fun public_domains_key/2).

-spec get_endpoints(Host :: jid:lserver()) -> {ok, [mod_global_distrib_utils:endpoint()]}.
-spec get_endpoints(Host :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()].
get_endpoints(Host) ->
Nodes = get_nodes(Host),
get_endpoints_for_nodes(Host, Nodes).

get_endpoints_for_nodes(_Host, []) ->
{ok, []};
[];
get_endpoints_for_nodes(Host, Nodes) ->
EndpointKeys = [endpoints_key(Host, Node) || Node <- Nodes],
{ok, BinEndpoints} = q([<<"SUNION">> | EndpointKeys]),
{ok, lists:map(fun binary_to_endpoint/1, BinEndpoints)}.
lists:map(fun binary_to_endpoint/1, BinEndpoints).

%%--------------------------------------------------------------------
%% gen_server API
Expand Down Expand Up @@ -258,12 +258,13 @@ do_delete(Key) ->
ok.

-spec get_domains(KeyFun :: fun((Host :: binary(), Node :: binary()) -> Key :: binary())) ->
{ok, Domains :: [binary()]}.
[jid:lserver()].
get_domains(KeyFun) ->
Hosts = get_hosts(),
Nodes = lists:flatmap(fun(Host) -> [{Host, Node} || Node <- get_nodes(Host)] end, Hosts),
Keys = [KeyFun(Host, Node) || {Host, Node} <- Nodes],
{ok, _Domains} = q([<<"SUNION">> | Keys]).
{ok, Domains} = q([<<"SUNION">> | Keys]),
Domains.

-spec refresh_hosts() -> any().
refresh_hosts() ->
Expand Down
17 changes: 8 additions & 9 deletions src/global_distrib/mod_global_distrib_server_mgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pick_connection_pool(Enabled) ->
refresh_connections(#state{ server = Server, pending_endpoints = PendingEndpoints,
last_endpoints = LastEndpoints } = State) ->
?LOG_DEBUG(ls(#{what => gd_refreshing_endpoints}, State)),
{ok, NewEndpoints} = get_endpoints(Server),
NewEndpoints = get_endpoints(Server),
case NewEndpoints of
LastEndpoints ->
nothing_new;
Expand Down Expand Up @@ -403,15 +403,14 @@ refresh_connections(#state{ server = Server, pending_endpoints = PendingEndpoint
end,
State#state{ pending_endpoints = FinalPendingEndpoints, last_endpoints = NewEndpoints }.

-spec get_endpoints(Server :: jid:lserver()) -> {ok, [mod_global_distrib_utils:endpoint()]}.
-spec get_endpoints(Server :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()].
get_endpoints(Server) ->
{ok, EndpointsToResolve} =
case mongoose_config:lookup_opt({global_distrib_addr, Server}) of
{error, not_found} -> mod_global_distrib_mapping:endpoints(Server);
{ok, Endpoints} -> {ok, Endpoints}
end,
Resolved = mod_global_distrib_utils:resolve_endpoints(EndpointsToResolve),
{ok, Resolved}.
EndpointsToResolve =
case mongoose_config:lookup_opt({global_distrib_addr, Server}) of
{error, not_found} -> mod_global_distrib_mapping:endpoints(Server);
{ok, Endpoints} -> Endpoints
end,
mod_global_distrib_utils:resolve_endpoints(EndpointsToResolve).

-spec resolve_pending(NewEndpointList :: [mod_global_distrib_utils:endpoint()],
OldEnabled :: [endpoint_pid_tuple()]) ->
Expand Down

0 comments on commit ef8b27c

Please sign in to comment.