Skip to content

Commit

Permalink
Merge pull request #3587 from esl/mod_global_distrib-map
Browse files Browse the repository at this point in the history
  • Loading branch information
gustawlippa authored Mar 18, 2022
2 parents 7a43c61 + 1b29057 commit aa85b21
Show file tree
Hide file tree
Showing 21 changed files with 600 additions and 696 deletions.
3 changes: 1 addition & 2 deletions big_tests/tests/mam_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@
-import(config_parser_helper, [config/2, mod_config/2]).

config_opts(ExtraOpts) ->
lists:foldl(fun(Step, OptsIn) -> set_opts(Step, OptsIn) end,
ExtraOpts, [defaults, backend, pm, muc, async_writer]).
lists:foldl(fun set_opts/2, ExtraOpts, [defaults, backend, pm, muc, async_writer]).

set_opts(defaults, Opts) ->
mod_config(mod_mam_meta, Opts);
Expand Down
216 changes: 112 additions & 104 deletions big_tests/tests/mod_global_distrib_SUITE.erl

Large diffs are not rendered by default.

17 changes: 3 additions & 14 deletions doc/modules/mod_global_distrib.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ The endpoint list will be shared with other datacenters via the replicated backe

#### `modules.mod_global_distrib.connections.advertised_endpoints`
* **Syntax:** Array of TOML tables with the following keys: `host` and `port`, and the following values: {host = `string`, port = `non_negative_integer`}
* **Default:** not set
* **Default:** not set, the value of `endpoints` is used (without resolution).
* **Example:** `advertised_endpoints = [{host = "172.16.0.2", port = 5555}]`

A list of endpoints which will be advertised in Redis and therefore used to establish connection with this node by other nodes. If not specified, `endpoints` value (after resolution) is considered `advertised_endpoints`. The host may be either IP or domain, just like in case of endpoints. The difference is, the domain name won't be resolved but inserted directly to the mappings backend instead.
A list of endpoints which will be advertised in Redis and therefore used to establish connection with this node by other nodes. The host may be either IP or domain, just like in case of endpoints. The difference is, the domain name won't be resolved but inserted directly to the mappings backend instead.

#### `modules.mod_global_distrib.connections.connections_per_endpoint`
* **Syntax:** non-negative integer
Expand Down Expand Up @@ -208,7 +208,7 @@ These options will be passed to the `fast_tls` driver.

#### `modules.mod_global_distrib.connections.tls.ciphers`
* **Syntax:** string
* **Default:** not set
* **Default:** `"TLSv1.2:TLSv1.3"`
* **Example:** `ciphers = "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384"`

Cipher suites to use with StartTLS or TLS. Please refer to the [OpenSSL documentation](https://www.openssl.org/docs/man1.0.2/man1/ciphers.html) for the cipher string format.
Expand Down Expand Up @@ -302,11 +302,6 @@ Number of times message delivery will be retried in case of errors.

`mod_global_distrib` extension relies on [`mod_disco`](mod_disco.md)'s option `users_can_see_hidden_services`, when provided. If it is not configured, the default value is `true`. `mod_disco` does not have to be enabled for `mod_global_distrib` to work, as this parameter is used only for processing Disco requests by Global Distribution.

## Overriding remote datacenter endpoints

There may be cases when the endpoint list given via **endpoints** option does not accurately specify endpoints on which the node may be reached from other datacenters; e.g. in case the node is behind NAT, or in testing environment.
The endpoints used for connection to a remote datacenter may be overridden by global option `{ {global_distrib_addr, Host}, [{IP, Port}] }`.

## Example configuration

### Configuring mod_global_distrib
Expand All @@ -326,12 +321,6 @@ The endpoints used for connection to a remote datacenter may be overridden by gl
redis.pool = "global_distrib"
```

### Overriding endpoints to a remote datacenter

```erlang
{ {global_distrib_addr, "datacenter2.example.com"}, [{"124.12.4.3", 5556}, {"182.172.23.55", 5555}] }.
```

### Configuring Dynomite

For more information about Dynomite configuration, consult [Dynomite wiki](https://github.com/Netflix/dynomite/wiki).
Expand Down
2 changes: 1 addition & 1 deletion src/gen_mod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
-module(gen_mod).
-author('alexey@process-one.net').

-export_type([opt_key/0, opt_value/0, module_opts/0]).
-export_type([key_path/0, opt_key/0, opt_value/0, module_opts/0]).

-export([
% Modules start & stop, do NOT use in the tests, use mongoose_modules API instead
Expand Down
157 changes: 88 additions & 69 deletions src/global_distrib/mod_global_distrib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,56 @@
-export([deps/2, start/2, stop/1, config_spec/0]).
-export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]).
-export([maybe_reroute/1]).
-export([process_endpoints/1, process_bounce/1]).
-export([process_opts/1, process_endpoint/1]).

-ignore_xref([maybe_reroute/1, remove_metadata/2]).

%%--------------------------------------------------------------------
%% gen_mod API
%% See "gen_mod logic" block below in this file
%%--------------------------------------------------------------------

-spec deps(Host :: jid:server(), Opts :: proplists:proplist()) -> gen_mod_deps:deps().
deps(Host, Opts) ->
mod_global_distrib_utils:deps(?MODULE, Host, Opts, fun deps/1).
%% Note: while this module should be enabled for all hosts,
%% it needs to be started only once - this is why deps/2 and start/2
%% do nothing for hosts other than global_host

-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
start(Host, Opts0) ->
Opts = [{message_ttl, 4} | Opts0],
mod_global_distrib_utils:start(?MODULE, Host, Opts, fun start/0).
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(HostType, Opts = #{global_host := HostType, bounce := BounceOpts}) ->
%% Start each required module with the same opts for simplicity
[{Mod, Opts, hard} || Mod <- dep_modules() ++ bounce_modules(BounceOpts)];
deps(_HostType, #{}) ->
[].

-spec stop(Host :: jid:lserver()) -> any().
stop(Host) ->
mod_global_distrib_utils:stop(?MODULE, Host, fun stop/0).
dep_modules() ->
[mod_global_distrib_utils, mod_global_distrib_mapping, mod_global_distrib_disco,
mod_global_distrib_receiver, mod_global_distrib_hosts_refresher].

bounce_modules(#{enabled := true}) -> [mod_global_distrib_bounce];
bounce_modules(#{enabled := false}) -> [].

-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, #{global_host := HostType}) ->
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram),
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral),
ejabberd_hooks:add(hooks());
start(_HostType, #{}) ->
ok.

-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
case gen_mod:get_module_opt(HostType, ?MODULE, global_host) of
HostType -> ejabberd_hooks:delete(hooks());
_ -> ok
end.

hooks() ->
[{filter_packet, global, ?MODULE, maybe_reroute, 99}].

-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
#section{
items = #{<<"global_host">> => #option{type = string,
items = #{<<"global_host">> => #option{type = binary,
validate = domain},
<<"local_host">> => #option{type = string,
<<"local_host">> => #option{type = binary,
validate = domain},
<<"message_ttl">> => #option{type = integer,
validate = non_negative},
Expand All @@ -67,13 +89,17 @@ config_spec() ->
<<"cache">> => cache_spec(),
<<"bounce">> => bounce_spec()
},
required = [<<"global_host">>, <<"local_host">>]
required = [<<"global_host">>, <<"local_host">>],
defaults = #{<<"message_ttl">> => 4,
<<"hosts_refresh_interval">> => 3000},
format_items = map,
process = fun ?MODULE:process_opts/1
}.

connections_spec() ->
#section{
items = #{<<"endpoints">> => #list{items = endpoints_spec()},
<<"advertised_endpoints">> => #list{items = endpoints_spec()},
items = #{<<"endpoints">> => #list{items = endpoint_spec()},
<<"advertised_endpoints">> => #list{items = endpoint_spec()},
<<"connections_per_endpoint">> => #option{type = integer,
validate = non_negative},
<<"endpoint_refresh_interval">> => #option{type = integer,
Expand All @@ -83,18 +109,25 @@ connections_spec() ->
<<"disabled_gc_interval">> => #option{type = integer,
validate = positive},
<<"tls">> => tls_spec()
}
},
defaults = #{<<"connections_per_endpoint">> => 1,
<<"endpoint_refresh_interval">> => 60,
<<"endpoint_refresh_interval_when_empty">> => 3,
<<"disabled_gc_interval">> => 60},
format_items = map,
include = always
}.

endpoints_spec() ->
endpoint_spec() ->
#section{
items = #{<<"host">> => #option{type = string,
validate = network_address},
<<"port">> => #option{type = integer,
validate = port}
},
required = all,
process = fun ?MODULE:process_endpoints/1
format_items = map,
process = fun ?MODULE:process_endpoint/1
}.

tls_spec() ->
Expand All @@ -109,7 +142,8 @@ tls_spec() ->
validate = filename}
},
required = [<<"certfile">>, <<"cacertfile">>],
wrap = {kv, tls_opts}
defaults = #{<<"ciphers">> => "TLSv1.2:TLSv1.3"},
format_items = map
}.

redis_spec() ->
Expand All @@ -120,7 +154,12 @@ redis_spec() ->
validate = positive},
<<"refresh_after">> => #option{type = integer,
validate = non_negative}
}
},
defaults = #{<<"pool">> => global_distrib,
<<"expire_after">> => 120,
<<"refresh_after">> => 60},
format_items = map,
include = always
}.

cache_spec() ->
Expand All @@ -132,7 +171,14 @@ cache_spec() ->
validate = non_negative},
<<"max_jids">> => #option{type = integer,
validate = non_negative}
}
},
defaults = #{<<"cache_missed">> => true,
<<"domain_lifetime_seconds">> => 600,
<<"jid_lifetime_seconds">> => 5,
<<"max_jids">> => 10000
},
format_items = map,
include = always
}.

bounce_spec() ->
Expand All @@ -142,20 +188,29 @@ bounce_spec() ->
validate = non_negative},
<<"max_retries">> => #option{type = integer,
validate = non_negative}
},
process = fun ?MODULE:process_bounce/1
},
defaults = #{<<"enabled">> => true,
<<"resend_after_ms">> => 200,
<<"max_retries">> => 4},
format_items = map,
include = always
}.

process_endpoints(KV) ->
{[[{host, Host}], [{port, Port}]], []} = proplists:split(KV, [host, port]),
{Host, Port}.
-spec process_opts(gen_mod:module_opts()) -> gen_mod:module_opts().
process_opts(Opts = #{local_host := LocalHost, connections := Connections}) ->
Opts#{connections := process_connections(LocalHost, Connections)}.

process_bounce(KVs) ->
{[EnabledOpts], Opts} = proplists:split(KVs, [enabled]),
bounce_value(EnabledOpts, Opts).
process_connections(_LocalHost, Opts = #{advertised_endpoints := _,
endpoints := Endpoints}) ->
Opts#{resolved_endpoints => mod_global_distrib_utils:resolve_endpoints(Endpoints)};
process_connections(LocalHost, Opts = #{endpoints := Endpoints}) ->
process_connections(LocalHost, Opts#{advertised_endpoints => Endpoints});
process_connections(LocalHost, Opts) ->
process_connections(LocalHost, Opts#{endpoints => [{binary_to_list(LocalHost), 5555}]}).

bounce_value([{enabled, false}], _) -> false;
bounce_value(_, Opts) -> Opts.
-spec process_endpoint(map()) -> mod_global_distrib_utils:endpoint().
process_endpoint(#{host := Host, port := Port}) ->
{Host, Port}.

%%--------------------------------------------------------------------
%% public functions
Expand Down Expand Up @@ -302,42 +357,6 @@ get_bound_connection(Server, GDID, Pid) when is_pid(Pid) ->
Pid
end.

%%--------------------------------------------------------------------
%% gen_mod logic
%%--------------------------------------------------------------------

-spec deps(Opts :: proplists:proplist()) -> gen_mod_deps:deps().
deps(Opts) ->
ConnectionsOpts =
lists:ukeysort(1, proplists:get_value(connections, Opts, []) ++ default_conn_opts()),
CacheOpts = proplists:get_value(cache, Opts, []),
BounceOpts = proplists:get_value(bounce, Opts, []),

Deps0 = [{mod_global_distrib_mapping, CacheOpts ++ Opts, hard},
{mod_global_distrib_disco, Opts, hard},
{mod_global_distrib_receiver, ConnectionsOpts ++ Opts, hard},
{mod_global_distrib_sender, ConnectionsOpts ++ Opts, hard},
{mod_global_distrib_hosts_refresher, Opts, hard}],
case BounceOpts of
false -> Deps0;
_ -> [{mod_global_distrib_bounce, BounceOpts ++ Opts, hard} | Deps0]
end.

default_conn_opts() ->
[{tls_opts, false}].

-spec start() -> any().
start() ->
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram),
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral),
ejabberd_hooks:add(filter_packet, global, ?MODULE, maybe_reroute, 99).

-spec stop() -> any().
stop() ->
ejabberd_hooks:delete(filter_packet, global, ?MODULE, maybe_reroute, 99).

%%--------------------------------------------------------------------

-spec lookup_recipients_host(TargetHost :: binary() | undefined,
To :: jid:jid(),
LocalHost :: binary(),
Expand Down
Loading

0 comments on commit aa85b21

Please sign in to comment.