1212-include_lib (" amqp_client/include/amqp_client.hrl" ).
1313-compile ([nowarn_export_all , export_all ]).
1414
15+ -define (EXCHANGE_LIMIT , 10 ).
1516
1617all () ->
1718 [
@@ -22,7 +23,8 @@ groups() ->
2223 [
2324 {clustered , [],
2425 [
25- {size_2 , [], [queue_limit ]}
26+ {size_2 , [], [queue_limit ,
27+ exchange_limit ]}
2628 ]}
2729 ].
2830
@@ -34,7 +36,8 @@ init_per_suite(Config0) ->
3436 rabbit_ct_helpers :log_environment (),
3537 Config1 = rabbit_ct_helpers :merge_app_env (
3638 Config0 , {rabbit , [{quorum_tick_interval , 1000 },
37- {cluster_queue_limit , 3 }]}),
39+ {cluster_queue_limit , 3 },
40+ {exchange_max , ? EXCHANGE_LIMIT }]}),
3841 rabbit_ct_helpers :run_setup_steps (Config1 , []).
3942
4043end_per_suite (Config ) ->
@@ -101,48 +104,102 @@ queue_limit(Config) ->
101104 Ch2 = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
102105 Q1 = ? config (queue_name , Config ),
103106 ? assertEqual ({'queue.declare_ok' , Q1 , 0 , 0 },
104- declare (Ch , Q1 )),
107+ declare_queue (Ch , Q1 )),
105108
106109 Q2 = ? config (alt_queue_name , Config ),
107110 ? assertEqual ({'queue.declare_ok' , Q2 , 0 , 0 },
108- declare (Ch , Q2 )),
111+ declare_queue (Ch , Q2 )),
109112
110113 Q3 = ? config (alt_2_queue_name , Config ),
111114 ? assertEqual ({'queue.declare_ok' , Q3 , 0 , 0 },
112- declare (Ch , Q3 )),
115+ declare_queue (Ch , Q3 )),
113116 Q4 = ? config (over_limit_queue_name , Config ),
114117 ExpectedError = list_to_binary (io_lib :format (" PRECONDITION_FAILED - cannot declare queue '~s ': queue limit in cluster (3) is reached" , [Q4 ])),
115118 ? assertExit (
116119 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
117- declare (Ch , Q4 )),
120+ declare_queue (Ch , Q4 )),
118121
119122 % % Trying the second server, in the cluster, but no queues on it,
120123 % % but should still fail as the limit is cluster wide.
121124 ? assertExit (
122125 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
123- declare (Ch2 , Q4 )),
126+ declare_queue (Ch2 , Q4 )),
124127
125128 % Trying other types of queues
126129 ChQQ = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
127130 ChStream = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
128131 ? assertExit (
129132 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
130- declare (ChQQ , Q4 , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
133+ declare_queue (ChQQ , Q4 , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
131134 ? assertExit (
132135 {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
133- declare (ChStream , Q4 , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
136+ declare_queue (ChStream , Q4 , [{<<" x-queue-type" >>, longstr , <<" stream" >>}])),
134137 rabbit_ct_broker_helpers :rpc (Config , 0 , ? MODULE , delete_queues , []),
135138 ok .
136139
137- declare (Ch , Q ) ->
138- declare (Ch , Q , []).
140+ exchange_limit (Config ) ->
141+ DefaultXs = rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_exchange , count , []),
142+ ? assert (? EXCHANGE_LIMIT > DefaultXs ),
139143
140- declare (Ch , Q , Args ) ->
144+ [Server0 , Server1 ] =
145+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
146+ Ch1 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
147+ Ch2 = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
148+
149+ % % Reach the limit.
150+ [begin
151+ XName = list_to_binary (rabbit_misc :format (" x-~b " , [N ])),
152+ # 'exchange.declare_ok' {} = declare_exchange (Ch1 , XName , <<" fanout" >>)
153+ end || N <- lists :seq (DefaultXs , ? EXCHANGE_LIMIT - 1 )],
154+
155+ % % Trying to declare the next exchange fails.
156+ OverLimitXName = <<" over-limit-x" >>,
157+ ExpectedError = list_to_binary (rabbit_misc :format (
158+ " PRECONDITION_FAILED - cannot declare "
159+ " exchange '~s ' in vhost '/': "
160+ " exchange limit of ~b is reached" ,
161+ [OverLimitXName , ? EXCHANGE_LIMIT ])),
162+ ? assertExit (
163+ {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
164+ declare_exchange (Ch1 , OverLimitXName , <<" fanout" >>)),
165+
166+ % % Existing exchanges can be re-declared.
167+ ExistingX = list_to_binary (rabbit_misc :format (" x-~b " , [DefaultXs ])),
168+ # 'exchange.declare_ok' {} = declare_exchange (Ch2 , ExistingX , <<" fanout" >>),
169+
170+ % % The limit is cluster wide: the other node cannot declare the exchange
171+ % % either.
172+ ? assertExit (
173+ {{shutdown , {server_initiated_close , 406 , ExpectedError }}, _ },
174+ declare_exchange (Ch2 , OverLimitXName , <<" fanout" >>)),
175+
176+ % % Clean up extra exchanges
177+ Ch3 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
178+ [begin
179+ XName = list_to_binary (rabbit_misc :format (" x-~b " , [N ])),
180+ # 'exchange.delete_ok' {} = amqp_channel :call (
181+ Ch3 ,
182+ # 'exchange.delete' {exchange = XName })
183+ end || N <- lists :seq (DefaultXs , ? EXCHANGE_LIMIT - 1 )],
184+
185+ ok .
186+
187+ % % -------------------------------------------------------------------
188+
189+ declare_queue (Ch , Q ) ->
190+ declare_queue (Ch , Q , []).
191+
192+ declare_queue (Ch , Q , Args ) ->
141193 amqp_channel :call (Ch , # 'queue.declare' {queue = Q ,
142194 durable = true ,
143195 auto_delete = false ,
144196 arguments = Args }).
145197
198+ declare_exchange (Ch , Name , Type ) ->
199+ amqp_channel :call (Ch , # 'exchange.declare' {exchange = Name ,
200+ type = Type ,
201+ durable = true }).
202+
146203delete_queues () ->
147204 [rabbit_amqqueue :delete (Q , false , false , <<" dummy" >>)
148205 || Q <- rabbit_amqqueue :list ()].
0 commit comments