Skip to content

Commit

Permalink
Merge pull request #12699 from rabbitmq/mergify/bp/v4.0.x/pr-12659
Browse files Browse the repository at this point in the history
Make it possible to set some cluster metadata besides the name using tags (backport #12659)
  • Loading branch information
michaelklishin authored Nov 11, 2024
2 parents 3e07594 + 81c897b commit d1a5dc2
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 6 deletions.
14 changes: 14 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2688,6 +2688,20 @@ fun(Conf) ->
end
end}.

{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
{datatype, [binary]}
]}.

{translation, "rabbit.cluster_tags",
fun(Conf) ->
case cuttlefish:conf_get("cluster_tags", Conf, undefined) of
none -> [];
_ ->
Settings = cuttlefish_variable:filter_by_prefix("cluster_tags", Conf),
[ {list_to_binary(K), V} || {[_, K], V} <- Settings]
end
end}.

% ===============================
% Validators
% ===============================
Expand Down
17 changes: 16 additions & 1 deletion deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

%%---------------------------------------------------------------------------
%% Boot steps.
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0,
-export([update_cluster_tags/0, maybe_insert_default_data/0, boot_delegate/0, recover/0,
pg_local_amqp_session/0,
pg_local_amqp_connection/0]).

Expand Down Expand Up @@ -208,6 +208,12 @@
{requires, recovery},
{enables, routing_ready}]}).


-rabbit_boot_step({cluster_tags,
[{description, "Set cluster tags"},
{mfa, {?MODULE, update_cluster_tags, []}},
{requires, core_initialized}]}).

-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
{requires, [core_initialized, recovery]}]}).
Expand Down Expand Up @@ -1138,6 +1144,15 @@ pg_local_amqp_connection() ->
pg_local_scope(Prefix) ->
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).


-spec update_cluster_tags() -> 'ok'.

update_cluster_tags() ->
Tags = application:get_env(rabbit, cluster_tags, []),
?LOG_DEBUG("Seeding cluster tags from application environment key...",
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
rabbit_runtime_parameters:set_global(cluster_tags, Tags, <<"internal_user">>).

-spec maybe_insert_default_data() -> 'ok'.

maybe_insert_default_data() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

def run([], %{node: node_name, timeout: timeout} = opts) do
status =
status0 =
case :rabbit_misc.rpc_call(node_name, :rabbit_db_cluster, :cli_cluster_status, []) do
{:badrpc, {:EXIT, {:undef, _}}} ->
:rabbit_misc.rpc_call(node_name, :rabbit_mnesia, :status, [])
Expand All @@ -45,11 +45,13 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
status
end

case status do
case status0 do
{:badrpc, _} = err ->
err

status ->
status0 ->
tags = cluster_tags(node_name, timeout)
status = status0 ++ [{:cluster_tags, tags}]
case :rabbit_misc.rpc_call(node_name, :rabbit_nodes, :list_running, []) do
{:badrpc, _} = err ->
err
Expand Down Expand Up @@ -122,7 +124,6 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do

def output(result, %{node: node_name}) when is_list(result) do
m = result_map(result)

total_cores = Enum.reduce(m[:cpu_cores], 0, fn {_, val}, acc -> acc + val end)

cluster_name_section = [
Expand All @@ -131,6 +132,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
"Total CPU cores available cluster-wide: #{total_cores}"
]

cluster_tag_section =
[
"\n#{bright("Cluster Tags")}\n"
] ++
case m[:cluster_tags] do
[] -> ["(none)"]
tags -> cluster_tag_lines(tags)
end

disk_nodes_section =
[
"\n#{bright("Disk Nodes")}\n"
Expand Down Expand Up @@ -210,6 +220,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do

lines =
cluster_name_section ++
cluster_tag_section ++
disk_nodes_section ++
ram_nodes_section ++
running_nodes_section ++
Expand Down Expand Up @@ -260,6 +271,7 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
# {rabbit@warp10,[{resource_limit,memory,rabbit@warp10}]}]}]
%{
cluster_name: Keyword.get(result, :cluster_name),
cluster_tags: result |> Keyword.get(:cluster_tags, []),
disk_nodes: result |> Keyword.get(:nodes, []) |> Keyword.get(:disc, []),
ram_nodes: result |> Keyword.get(:nodes, []) |> Keyword.get(:ram, []),
running_nodes: result |> Keyword.get(:running_nodes, []) |> Enum.map(&to_string/1),
Expand Down Expand Up @@ -383,6 +395,18 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
{node, result}
end

defp cluster_tags(node, timeout) do
case :rabbit_misc.rpc_call(
node,
:rabbit_runtime_parameters,
:value_global,
[:cluster_tags],
timeout) do
:not_found -> []
tags -> tags
end
end

defp node_lines(nodes) do
Enum.map(nodes, &to_string/1) |> Enum.sort()
end
Expand Down Expand Up @@ -413,4 +437,10 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ClusterStatusCommand do
defp maintenance_lines(mapping) do
Enum.map(mapping, fn {node, status} -> "Node: #{node}, status: #{status}" end)
end

defp cluster_tag_lines(mapping) do
Enum.map(mapping, fn {key, value} ->
"#{key}: #{value}"
end)
end
end
8 changes: 8 additions & 0 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_overview.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ to_json(ReqData, Context = #context{user = User = #user{tags = Tags}}) ->
{product_name, list_to_binary(rabbit:product_name())},
{rabbitmq_version, list_to_binary(rabbit:base_product_version())},
{cluster_name, rabbit_nodes:cluster_name()},
{cluster_tags, cluster_tags()},
{erlang_version, erlang_version()},
{erlang_full_version, erlang_full_version()},
{release_series_support_status, rabbit_release_series:readable_support_status()},
Expand Down Expand Up @@ -182,3 +183,10 @@ transform_retention_intervals([{MaxAgeInSeconds, _}|Rest], Acc) ->
0
end,
transform_retention_intervals(Rest, [AccVal|Acc]).

cluster_tags() ->
case rabbit_runtime_parameters:value_global(cluster_tags) of
not_found ->
[];
Tags -> Tags
end.
22 changes: 21 additions & 1 deletion deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ all_tests() -> [
disabled_qq_replica_opers_test,
qq_status_test,
list_deprecated_features_test,
list_used_deprecated_features_test
list_used_deprecated_features_test,
cluster_tags_test
].

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -282,6 +283,12 @@ init_per_testcase(Testcase = disabled_qq_replica_opers_test, Config) ->
rabbit_ct_broker_helpers:rpc_all(Config,
application, set_env, [rabbitmq_management, restrictions, Restrictions]),
rabbit_ct_helpers:testcase_started(Config, Testcase);
init_per_testcase(Testcase = cluster_tags_test, Config) ->
Tags = [{<<"az">>, <<"us-east-3">>}, {<<"region">>,<<"us-east">>}, {<<"environment">>,<<"production">>}],
rpc(
Config, rabbit_runtime_parameters, set_global,
[cluster_tags, Tags, none]),
rabbit_ct_helpers:testcase_started(Config, Testcase);
init_per_testcase(queues_detailed_test, Config) ->
IsEnabled = rabbit_ct_broker_helpers:is_feature_flag_enabled(
Config, detailed_queues_endpoint),
Expand Down Expand Up @@ -348,6 +355,11 @@ end_per_testcase0(disabled_operator_policy_test, Config) ->
end_per_testcase0(disabled_qq_replica_opers_test, Config) ->
rpc(Config, application, unset_env, [rabbitmq_management, restrictions]),
Config;
end_per_testcase0(cluster_tags_test, Config) ->
rpc(
Config, rabbit_runtime_parameters, clear_global,
[cluster_tags, none]),
Config;
end_per_testcase0(Testcase, Config)
when Testcase == list_deprecated_features_test;
Testcase == list_used_deprecated_features_test ->
Expand Down Expand Up @@ -3936,6 +3948,14 @@ list_used_deprecated_features_test(Config) ->
?assertEqual(list_to_binary(Desc), maps:get(desc, Feature)),
?assertEqual(list_to_binary(DocUrl), maps:get(doc_url, Feature)).

cluster_tags_test(Config) ->
Overview = http_get(Config, "/overview"),
Tags = maps:get(cluster_tags, Overview),
ExpectedTags = #{az => <<"us-east-3">>,environment => <<"production">>,
region => <<"us-east">>},
?assertEqual(ExpectedTags, Tags),
passed.

%% -------------------------------------------------------------------
%% Helpers.
%% -------------------------------------------------------------------
Expand Down

0 comments on commit d1a5dc2

Please sign in to comment.