Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global distribution opts in a map #3587

Merged
merged 19 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions big_tests/tests/mam_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@
-import(config_parser_helper, [config/2, mod_config/2]).

config_opts(ExtraOpts) ->
lists:foldl(fun(Step, OptsIn) -> set_opts(Step, OptsIn) end,
ExtraOpts, [defaults, backend, pm, muc, async_writer]).
lists:foldl(fun set_opts/2, ExtraOpts, [defaults, backend, pm, muc, async_writer]).

set_opts(defaults, Opts) ->
mod_config(mod_mam_meta, Opts);
Expand Down
216 changes: 112 additions & 104 deletions big_tests/tests/mod_global_distrib_SUITE.erl

Large diffs are not rendered by default.

17 changes: 3 additions & 14 deletions doc/modules/mod_global_distrib.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ The endpoint list will be shared with other datacenters via the replicated backe

#### `modules.mod_global_distrib.connections.advertised_endpoints`
* **Syntax:** Array of TOML tables with the following keys: `host` and `port`, and the following values: {host = `string`, port = `non_negative_integer`}
* **Default:** not set
* **Default:** not set, the value of `endpoints` is used (without resolution).
* **Example:** `advertised_endpoints = [{host = "172.16.0.2", port = 5555}]`

A list of endpoints which will be advertised in Redis and therefore used to establish connection with this node by other nodes. If not specified, `endpoints` value (after resolution) is considered `advertised_endpoints`. The host may be either IP or domain, just like in case of endpoints. The difference is, the domain name won't be resolved but inserted directly to the mappings backend instead.
A list of endpoints which will be advertised in Redis and therefore used to establish connection with this node by other nodes. The host may be either IP or domain, just like in case of endpoints. The difference is, the domain name won't be resolved but inserted directly to the mappings backend instead.

#### `modules.mod_global_distrib.connections.connections_per_endpoint`
* **Syntax:** non-negative integer
Expand Down Expand Up @@ -208,7 +208,7 @@ These options will be passed to the `fast_tls` driver.

#### `modules.mod_global_distrib.connections.tls.ciphers`
* **Syntax:** string
* **Default:** not set
* **Default:** `"TLSv1.2:TLSv1.3"`
* **Example:** `ciphers = "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384"`

Cipher suites to use with StartTLS or TLS. Please refer to the [OpenSSL documentation](https://www.openssl.org/docs/man1.0.2/man1/ciphers.html) for the cipher string format.
Expand Down Expand Up @@ -302,11 +302,6 @@ Number of times message delivery will be retried in case of errors.

`mod_global_distrib` extension relies on [`mod_disco`](mod_disco.md)'s option `users_can_see_hidden_services`, when provided. If it is not configured, the default value is `true`. `mod_disco` does not have to be enabled for `mod_global_distrib` to work, as this parameter is used only for processing Disco requests by Global Distribution.

## Overriding remote datacenter endpoints

There may be cases when the endpoint list given via **endpoints** option does not accurately specify endpoints on which the node may be reached from other datacenters; e.g. in case the node is behind NAT, or in testing environment.
The endpoints used for connection to a remote datacenter may be overridden by global option `{ {global_distrib_addr, Host}, [{IP, Port}] }`.

## Example configuration

### Configuring mod_global_distrib
Expand All @@ -326,12 +321,6 @@ The endpoints used for connection to a remote datacenter may be overridden by gl
redis.pool = "global_distrib"
```

### Overriding endpoints to a remote datacenter

```erlang
{ {global_distrib_addr, "datacenter2.example.com"}, [{"124.12.4.3", 5556}, {"182.172.23.55", 5555}] }.
```

### Configuring Dynomite

For more information about Dynomite configuration, consult [Dynomite wiki](https://github.com/Netflix/dynomite/wiki).
Expand Down
2 changes: 1 addition & 1 deletion src/gen_mod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
-module(gen_mod).
-author('alexey@process-one.net').

-export_type([opt_key/0, opt_value/0, module_opts/0]).
-export_type([key_path/0, opt_key/0, opt_value/0, module_opts/0]).

-export([
% Modules start & stop, do NOT use in the tests, use mongoose_modules API instead
Expand Down
157 changes: 88 additions & 69 deletions src/global_distrib/mod_global_distrib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,56 @@
-export([deps/2, start/2, stop/1, config_spec/0]).
-export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]).
-export([maybe_reroute/1]).
-export([process_endpoints/1, process_bounce/1]).
-export([process_opts/1, process_endpoint/1]).

-ignore_xref([maybe_reroute/1, remove_metadata/2]).

%%--------------------------------------------------------------------
%% gen_mod API
%% See "gen_mod logic" block below in this file
%%--------------------------------------------------------------------

-spec deps(Host :: jid:server(), Opts :: proplists:proplist()) -> gen_mod_deps:deps().
deps(Host, Opts) ->
mod_global_distrib_utils:deps(?MODULE, Host, Opts, fun deps/1).
%% Note: while this module should be enabled for all hosts,
%% it needs to be started only once - this is why deps/2 and start/2
%% do nothing for hosts other than global_host

-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
start(Host, Opts0) ->
Opts = [{message_ttl, 4} | Opts0],
mod_global_distrib_utils:start(?MODULE, Host, Opts, fun start/0).
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(HostType, Opts = #{global_host := HostType, bounce := BounceOpts}) ->
%% Start each required module with the same opts for simplicity
[{Mod, Opts, hard} || Mod <- dep_modules() ++ bounce_modules(BounceOpts)];
deps(_HostType, #{}) ->
[].

-spec stop(Host :: jid:lserver()) -> any().
stop(Host) ->
mod_global_distrib_utils:stop(?MODULE, Host, fun stop/0).
dep_modules() ->
[mod_global_distrib_utils, mod_global_distrib_mapping, mod_global_distrib_disco,
mod_global_distrib_receiver, mod_global_distrib_hosts_refresher].

bounce_modules(#{enabled := true}) -> [mod_global_distrib_bounce];
bounce_modules(#{enabled := false}) -> [].

-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, #{global_host := HostType}) ->
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram),
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral),
ejabberd_hooks:add(hooks());
start(_HostType, #{}) ->
gustawlippa marked this conversation as resolved.
Show resolved Hide resolved
ok.

-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
case gen_mod:get_module_opt(HostType, ?MODULE, global_host) of
HostType -> ejabberd_hooks:delete(hooks());
_ -> ok
end.

hooks() ->
[{filter_packet, global, ?MODULE, maybe_reroute, 99}].

-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
#section{
items = #{<<"global_host">> => #option{type = string,
items = #{<<"global_host">> => #option{type = binary,
validate = domain},
<<"local_host">> => #option{type = string,
<<"local_host">> => #option{type = binary,
validate = domain},
<<"message_ttl">> => #option{type = integer,
validate = non_negative},
Expand All @@ -67,13 +89,17 @@ config_spec() ->
<<"cache">> => cache_spec(),
<<"bounce">> => bounce_spec()
},
required = [<<"global_host">>, <<"local_host">>]
required = [<<"global_host">>, <<"local_host">>],
defaults = #{<<"message_ttl">> => 4,
<<"hosts_refresh_interval">> => 3000},
format_items = map,
process = fun ?MODULE:process_opts/1
}.

connections_spec() ->
#section{
items = #{<<"endpoints">> => #list{items = endpoints_spec()},
<<"advertised_endpoints">> => #list{items = endpoints_spec()},
items = #{<<"endpoints">> => #list{items = endpoint_spec()},
<<"advertised_endpoints">> => #list{items = endpoint_spec()},
<<"connections_per_endpoint">> => #option{type = integer,
validate = non_negative},
<<"endpoint_refresh_interval">> => #option{type = integer,
Expand All @@ -83,18 +109,25 @@ connections_spec() ->
<<"disabled_gc_interval">> => #option{type = integer,
validate = positive},
<<"tls">> => tls_spec()
}
},
defaults = #{<<"connections_per_endpoint">> => 1,
<<"endpoint_refresh_interval">> => 60,
<<"endpoint_refresh_interval_when_empty">> => 3,
<<"disabled_gc_interval">> => 60},
format_items = map,
include = always
}.

endpoints_spec() ->
endpoint_spec() ->
#section{
items = #{<<"host">> => #option{type = string,
validate = network_address},
<<"port">> => #option{type = integer,
validate = port}
},
required = all,
process = fun ?MODULE:process_endpoints/1
format_items = map,
process = fun ?MODULE:process_endpoint/1
}.

tls_spec() ->
Expand All @@ -109,7 +142,8 @@ tls_spec() ->
validate = filename}
},
required = [<<"certfile">>, <<"cacertfile">>],
wrap = {kv, tls_opts}
defaults = #{<<"ciphers">> => "TLSv1.2:TLSv1.3"},
format_items = map
}.

redis_spec() ->
Expand All @@ -120,7 +154,12 @@ redis_spec() ->
validate = positive},
<<"refresh_after">> => #option{type = integer,
validate = non_negative}
}
},
defaults = #{<<"pool">> => global_distrib,
<<"expire_after">> => 120,
<<"refresh_after">> => 60},
format_items = map,
include = always
}.

cache_spec() ->
Expand All @@ -132,7 +171,14 @@ cache_spec() ->
validate = non_negative},
<<"max_jids">> => #option{type = integer,
validate = non_negative}
}
},
defaults = #{<<"cache_missed">> => true,
<<"domain_lifetime_seconds">> => 600,
<<"jid_lifetime_seconds">> => 5,
<<"max_jids">> => 10000
},
format_items = map,
include = always
}.

bounce_spec() ->
Expand All @@ -142,20 +188,29 @@ bounce_spec() ->
validate = non_negative},
<<"max_retries">> => #option{type = integer,
validate = non_negative}
},
process = fun ?MODULE:process_bounce/1
},
defaults = #{<<"enabled">> => true,
<<"resend_after_ms">> => 200,
<<"max_retries">> => 4},
format_items = map,
include = always
}.

process_endpoints(KV) ->
{[[{host, Host}], [{port, Port}]], []} = proplists:split(KV, [host, port]),
{Host, Port}.
-spec process_opts(gen_mod:module_opts()) -> gen_mod:module_opts().
process_opts(Opts = #{local_host := LocalHost, connections := Connections}) ->
Opts#{connections := process_connections(LocalHost, Connections)}.

process_bounce(KVs) ->
{[EnabledOpts], Opts} = proplists:split(KVs, [enabled]),
bounce_value(EnabledOpts, Opts).
process_connections(_LocalHost, Opts = #{advertised_endpoints := _,
endpoints := Endpoints}) ->
Opts#{resolved_endpoints => mod_global_distrib_utils:resolve_endpoints(Endpoints)};
process_connections(LocalHost, Opts = #{endpoints := Endpoints}) ->
process_connections(LocalHost, Opts#{advertised_endpoints => Endpoints});
process_connections(LocalHost, Opts) ->
process_connections(LocalHost, Opts#{endpoints => [{binary_to_list(LocalHost), 5555}]}).

bounce_value([{enabled, false}], _) -> false;
bounce_value(_, Opts) -> Opts.
-spec process_endpoint(map()) -> mod_global_distrib_utils:endpoint().
process_endpoint(#{host := Host, port := Port}) ->
{Host, Port}.

%%--------------------------------------------------------------------
%% public functions
Expand Down Expand Up @@ -302,42 +357,6 @@ get_bound_connection(Server, GDID, Pid) when is_pid(Pid) ->
Pid
end.

%%--------------------------------------------------------------------
%% gen_mod logic
%%--------------------------------------------------------------------

-spec deps(Opts :: proplists:proplist()) -> gen_mod_deps:deps().
deps(Opts) ->
ConnectionsOpts =
lists:ukeysort(1, proplists:get_value(connections, Opts, []) ++ default_conn_opts()),
CacheOpts = proplists:get_value(cache, Opts, []),
BounceOpts = proplists:get_value(bounce, Opts, []),

Deps0 = [{mod_global_distrib_mapping, CacheOpts ++ Opts, hard},
{mod_global_distrib_disco, Opts, hard},
{mod_global_distrib_receiver, ConnectionsOpts ++ Opts, hard},
{mod_global_distrib_sender, ConnectionsOpts ++ Opts, hard},
{mod_global_distrib_hosts_refresher, Opts, hard}],
case BounceOpts of
false -> Deps0;
_ -> [{mod_global_distrib_bounce, BounceOpts ++ Opts, hard} | Deps0]
end.

default_conn_opts() ->
[{tls_opts, false}].

-spec start() -> any().
start() ->
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram),
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral),
ejabberd_hooks:add(filter_packet, global, ?MODULE, maybe_reroute, 99).

-spec stop() -> any().
stop() ->
ejabberd_hooks:delete(filter_packet, global, ?MODULE, maybe_reroute, 99).

%%--------------------------------------------------------------------

-spec lookup_recipients_host(TargetHost :: binary() | undefined,
To :: jid:jid(),
LocalHost :: binary(),
Expand Down
Loading