Skip to content

Commit 9271b32

Browse files
Import runtime parameters after the topology
To reduce the likelihood of Shovel startup racing with queue startup. See #2799 for details. Closes #2799 for details. (cherry picked from commit 83e3f75)
1 parent af5a1de commit 9271b32

File tree

1 file changed

+19
-10
lines changed

1 file changed

+19
-10
lines changed

deps/rabbit/src/rabbit_definitions.erl

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -236,14 +236,17 @@ apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
236236
validate_limits(Map),
237237
concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
238238
concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
239-
sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
239+
240+
concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
241+
concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
242+
concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
243+
240244
sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
241245
%% importing policies concurrently can be unsafe as queues will be getting
242246
%% potentially out of order notifications of applicable policy changes
243247
sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
244-
concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
245-
concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
246-
concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
248+
sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
249+
247250
SuccessFun(),
248251
ok
249252
catch {error, E} -> {error, E};
@@ -260,13 +263,16 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
260263
[VHost, ActingUser]),
261264
try
262265
validate_limits(Map, VHost),
266+
267+
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
268+
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
269+
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
270+
263271
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
264272
%% importing policies concurrently can be unsafe as queues will be getting
265273
%% potentially out of order notifications of applicable policy changes
266274
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
267-
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
268-
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
269-
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
275+
270276
SuccessFun()
271277
catch {error, E} -> {error, format(E)};
272278
exit:E -> {error, format(E)}
@@ -283,13 +289,16 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
283289
[VHost, ActingUser]),
284290
try
285291
validate_limits(Map, VHost),
292+
293+
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
294+
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
295+
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
296+
286297
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
287298
%% importing policies concurrently can be unsafe as queues will be getting
288299
%% potentially out of order notifications of applicable policy changes
289300
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
290-
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
291-
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
292-
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
301+
293302
SuccessFun()
294303
catch {error, E} -> ErrorFun(format(E));
295304
exit:E -> ErrorFun(format(E))

0 commit comments

Comments
 (0)