Skip to content

Commit 1a1aaaf

Browse files
Merge pull request #2693 from rabbitmq/mqtt-machine-opt
MQTT machine versions backward compatibility (cherry picked from commit 1e66a74)
1 parent 9653d50 commit 1a1aaaf

File tree

5 files changed

+206
-13
lines changed

5 files changed

+206
-13
lines changed

deps/rabbitmq_mqtt/include/mqtt_machine.hrl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,8 @@
66
%%
77

88
-record(machine_state, {client_ids = #{},
9-
pids = #{}}).
9+
pids = #{},
10+
%% add acouple of fields for future extensibility
11+
reserved_1,
12+
reserved_2}).
13+
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
8+
-record(machine_state, {client_ids = #{}}).

deps/rabbitmq_mqtt/src/mqtt_machine.erl

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99

1010
-include("mqtt_machine.hrl").
1111

12-
-export([init/1,
12+
-export([version/0,
13+
which_module/1,
14+
init/1,
1315
apply/3,
1416
state_enter/2,
1517
notify_connection/2]).
@@ -24,6 +26,10 @@
2426
-type command() :: {register, client_id(), pid()} |
2527
{unregister, client_id(), pid()} |
2628
list.
29+
version() -> 1.
30+
31+
which_module(1) -> ?MODULE;
32+
which_module(0) -> mqtt_machine_v0.
2733

2834
-spec init(config()) -> state().
2935
init(_Conf) ->
@@ -41,12 +47,25 @@ apply(_Meta, {register, ClientId, Pid},
4147
{monitor, process, Pid},
4248
{mod_call, ?MODULE, notify_connection,
4349
[OldPid, duplicate_id]}],
44-
{Effects0, maps:remove(ClientId, Ids), Pids0};
45-
_ ->
46-
Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
50+
Pids2 = case maps:take(OldPid, Pids0) of
51+
error ->
52+
Pids0;
53+
{[ClientId], Pids1} ->
54+
Pids1;
55+
{ClientIds, Pids1} ->
56+
Pids1#{ClientId => lists:delete(ClientId, ClientIds)}
57+
end,
58+
Pids3 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
59+
[ClientId], Pids2),
60+
{Effects0, maps:remove(ClientId, Ids), Pids3};
61+
62+
{ok, Pid} ->
63+
{[], Ids, Pids0};
64+
error ->
65+
Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
4766
[ClientId], Pids0),
48-
Effects0 = [{monitor, process, Pid}],
49-
{Effects0, Ids, Pids1}
67+
Effects0 = [{monitor, process, Pid}],
68+
{Effects0, Ids, Pids1}
5069
end,
5170
State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1),
5271
pids = Pids},
@@ -139,9 +158,17 @@ apply(Meta, {leave, Node}, #machine_state{client_ids = Ids,
139158
State = State0#machine_state{client_ids = Keep,
140159
pids = maps:without(maps:keys(Remove), Pids0)},
141160
{State, ok, Effects ++ snapshot_effects(Meta, State)};
142-
161+
apply(_Meta, {machine_version, 0, 1}, {machine_state, Ids}) ->
162+
Pids = maps:fold(
163+
fun(Id, Pid, Acc) ->
164+
maps:update_with(Pid,
165+
fun(CIds) -> [Id | CIds] end,
166+
[Id], Acc)
167+
end, #{}, Ids),
168+
{#machine_state{client_ids = Ids,
169+
pids = Pids}, ok, []};
143170
apply(_Meta, Unknown, State) ->
144-
error_logger:error_msg("MQTT Raft state machine received unknown command ~p~n", [Unknown]),
171+
error_logger:error_msg("MQTT Raft state machine v1 received unknown command ~p~n", [Unknown]),
145172
{State, {error, {unknown_command, Unknown}}, []}.
146173

147174
state_enter(leader, State) ->
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
6+
%%
7+
-module(mqtt_machine_v0).
8+
-behaviour(ra_machine).
9+
10+
-include("mqtt_machine_v0.hrl").
11+
12+
-export([init/1,
13+
apply/3,
14+
state_enter/2,
15+
notify_connection/2]).
16+
17+
-type state() :: #machine_state{}.
18+
19+
-type config() :: map().
20+
21+
-type reply() :: {ok, term()} | {error, term()}.
22+
-type client_id() :: term().
23+
24+
-type command() :: {register, client_id(), pid()} |
25+
{unregister, client_id(), pid()} |
26+
list.
27+
28+
-spec init(config()) -> state().
29+
init(_Conf) ->
30+
#machine_state{}.
31+
32+
-spec apply(map(), command(), state()) ->
33+
{state(), reply(), ra_machine:effects()}.
34+
apply(_Meta, {register, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) ->
35+
{Effects, Ids1} =
36+
case maps:find(ClientId, Ids) of
37+
{ok, OldPid} when Pid =/= OldPid ->
38+
Effects0 = [{demonitor, process, OldPid},
39+
{monitor, process, Pid},
40+
{mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}],
41+
{Effects0, maps:remove(ClientId, Ids)};
42+
_ ->
43+
Effects0 = [{monitor, process, Pid}],
44+
{Effects0, Ids}
45+
end,
46+
State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1)},
47+
{State, ok, Effects};
48+
49+
apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) ->
50+
State = case maps:find(ClientId, Ids) of
51+
{ok, Pid} -> State0#machine_state{client_ids = maps:remove(ClientId, Ids)};
52+
%% don't delete client id that might belong to a newer connection
53+
%% that kicked the one with Pid out
54+
{ok, _AnotherPid} -> State0;
55+
error -> State0
56+
end,
57+
Effects0 = [{demonitor, process, Pid}],
58+
%% snapshot only when the map has changed
59+
Effects = case State of
60+
State0 -> Effects0;
61+
_ -> Effects0 ++ snapshot_effects(Meta, State)
62+
end,
63+
{State, ok, Effects};
64+
65+
apply(_Meta, {down, DownPid, noconnection}, State) ->
66+
%% Monitor the node the pid is on (see {nodeup, Node} below)
67+
%% so that we can detect when the node is re-connected and discover the
68+
%% actual fate of the connection processes on it
69+
Effect = {monitor, node, node(DownPid)},
70+
{State, ok, Effect};
71+
72+
apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) ->
73+
Ids1 = maps:filter(fun (_ClientId, Pid) when Pid =:= DownPid ->
74+
false;
75+
(_, _) ->
76+
true
77+
end, Ids),
78+
State = State0#machine_state{client_ids = Ids1},
79+
Delta = maps:keys(Ids) -- maps:keys(Ids1),
80+
Effects = lists:map(fun(Id) ->
81+
[{mod_call, rabbit_log, debug,
82+
["MQTT connection with client id '~s' failed", [Id]]}] end, Delta),
83+
{State, ok, Effects ++ snapshot_effects(Meta, State)};
84+
85+
apply(_Meta, {nodeup, Node}, State) ->
86+
%% Work out if any pids that were disconnected are still
87+
%% alive.
88+
%% Re-request the monitor for the pids on the now-back node.
89+
Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node],
90+
{State, ok, Effects};
91+
apply(_Meta, {nodedown, _Node}, State) ->
92+
{State, ok};
93+
94+
apply(Meta, {leave, Node}, #machine_state{client_ids = Ids} = State0) ->
95+
Ids1 = maps:filter(fun (_ClientId, Pid) -> node(Pid) =/= Node end, Ids),
96+
Delta = maps:keys(Ids) -- maps:keys(Ids1),
97+
98+
Effects = lists:foldl(fun (ClientId, Acc) ->
99+
Pid = maps:get(ClientId, Ids),
100+
[
101+
{demonitor, process, Pid},
102+
{mod_call, ?MODULE, notify_connection, [Pid, decommission_node]},
103+
{mod_call, rabbit_log, debug,
104+
["MQTT will remove client ID '~s' from known "
105+
"as its node has been decommissioned", [ClientId]]}
106+
] ++ Acc
107+
end, [], Delta),
108+
109+
State = State0#machine_state{client_ids = Ids1},
110+
{State, ok, Effects ++ snapshot_effects(Meta, State)};
111+
112+
apply(_Meta, Unknown, State) ->
113+
error_logger:error_msg("MQTT Raft state machine received unknown command ~p~n", [Unknown]),
114+
{State, {error, {unknown_command, Unknown}}, []}.
115+
116+
state_enter(leader, State) ->
117+
%% re-request monitors for all known pids, this would clean up
118+
%% records for all connections are no longer around, e.g. right after node restart
119+
[{monitor, process, Pid} || Pid <- all_pids(State)];
120+
state_enter(_, _) ->
121+
[].
122+
123+
%% ==========================
124+
125+
%% Avoids blocking the Raft leader.
126+
notify_connection(Pid, Reason) ->
127+
spawn(fun() -> gen_server2:cast(Pid, Reason) end).
128+
129+
-spec snapshot_effects(map(), state()) -> ra_machine:effects().
130+
snapshot_effects(#{index := RaftIdx}, State) ->
131+
[{release_cursor, RaftIdx, State}].
132+
133+
all_pids(#machine_state{client_ids = Ids}) ->
134+
maps:values(Ids).

deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ all() ->
2222
all_tests() ->
2323
[
2424
basics,
25+
machine_upgrade,
2526
many_downs
2627
].
2728

@@ -55,18 +56,37 @@ end_per_testcase(_TestCase, _Config) ->
5556
basics(_Config) ->
5657
S0 = mqtt_machine:init(#{}),
5758
ClientId = <<"id1">>,
59+
OthPid = spawn(fun () -> ok end),
5860
{S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0),
5961
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1),
6062
?assertMatch(#machine_state{pids = Pids} when map_size(Pids) == 1, S1),
61-
{S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, self()}, S1),
62-
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S2),
63-
{S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2),
63+
{S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, OthPid}, S1),
64+
?assertMatch(#machine_state{client_ids = #{ClientId := OthPid} = Ids}
65+
when map_size(Ids) == 1, S2),
66+
{S3, ok, _} = mqtt_machine:apply(meta(3), {down, OthPid, noproc}, S2),
6467
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3),
65-
{S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, self()}, S2),
68+
{S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, OthPid}, S2),
6669
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4),
6770

6871
ok.
6972

73+
machine_upgrade(_Config) ->
74+
S0 = mqtt_machine_v0:init(#{}),
75+
ClientId = <<"id1">>,
76+
Self = self(),
77+
{S1, ok, _} = mqtt_machine_v0:apply(meta(1), {register, ClientId, self()}, S0),
78+
?assertMatch({machine_state, Ids} when map_size(Ids) == 1, S1),
79+
{S2, ok, _} = mqtt_machine:apply(meta(2), {machine_version, 0, 1}, S1),
80+
?assertMatch(#machine_state{client_ids = #{ClientId := Self},
81+
pids = #{Self := [ClientId]} = Pids}
82+
when map_size(Pids) == 1, S2),
83+
{S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2),
84+
?assertMatch(#machine_state{client_ids = Ids,
85+
pids = Pids}
86+
when map_size(Ids) == 0 andalso map_size(Pids) == 0, S3),
87+
88+
ok.
89+
7090
many_downs(_Config) ->
7191
S0 = mqtt_machine:init(#{}),
7292
Clients = [{list_to_binary(integer_to_list(I)), spawn(fun() -> ok end)}

0 commit comments

Comments
 (0)