Skip to content

Commit 305af33

Browse files
committed
Add test case to avoid broken RpcClient after broken restart
[#159461281] References #383 Fixes #382
1 parent 16fb625 commit 305af33

File tree

1 file changed

+108
-9
lines changed

1 file changed

+108
-9
lines changed

src/test/java/com/rabbitmq/client/test/RpcTest.java

Lines changed: 108 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,25 @@
1313
// If you have any questions regarding licensing, please contact us at
1414
// info@rabbitmq.com.
1515

16-
1716
package com.rabbitmq.client.test;
1817

19-
import com.rabbitmq.client.*;
18+
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.Channel;
20+
import com.rabbitmq.client.Connection;
21+
import com.rabbitmq.client.ConnectionFactory;
22+
import com.rabbitmq.client.QueueingConsumer;
23+
import com.rabbitmq.client.Recoverable;
24+
import com.rabbitmq.client.RecoveryListener;
25+
import com.rabbitmq.client.RpcClient;
26+
import com.rabbitmq.client.RpcServer;
27+
import com.rabbitmq.client.ShutdownSignalException;
2028
import com.rabbitmq.client.impl.NetworkConnection;
2129
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
30+
import com.rabbitmq.client.impl.recovery.RecordedBinding;
31+
import com.rabbitmq.client.impl.recovery.RecordedConsumer;
32+
import com.rabbitmq.client.impl.recovery.RecordedExchange;
33+
import com.rabbitmq.client.impl.recovery.RecordedQueue;
34+
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2235
import com.rabbitmq.tools.Host;
2336
import org.junit.After;
2437
import org.junit.Before;
@@ -32,6 +45,7 @@
3245

3346
import static org.junit.Assert.assertEquals;
3447
import static org.junit.Assert.assertTrue;
48+
import static org.junit.Assert.fail;
3549

3650
public class RpcTest {
3751

@@ -40,19 +54,21 @@ public class RpcTest {
4054
String queue = "rpc.queue";
4155
RpcServer rpcServer;
4256

43-
@Before public void init() throws Exception {
57+
@Before
58+
public void init() throws Exception {
4459
clientConnection = TestUtils.connectionFactory().newConnection();
4560
clientChannel = clientConnection.createChannel();
4661
serverConnection = TestUtils.connectionFactory().newConnection();
4762
serverChannel = serverConnection.createChannel();
4863
serverChannel.queueDeclare(queue, false, false, false, null);
4964
}
5065

51-
@After public void tearDown() throws Exception {
52-
if(rpcServer != null) {
66+
@After
67+
public void tearDown() throws Exception {
68+
if (rpcServer != null) {
5369
rpcServer.terminateMainloop();
5470
}
55-
if(serverChannel != null) {
71+
if (serverChannel != null) {
5672
serverChannel.queueDelete(queue);
5773
}
5874
clientConnection.close();
@@ -63,6 +79,7 @@ public class RpcTest {
6379
public void rpc() throws Exception {
6480
rpcServer = new TestRpcServer(serverChannel, queue);
6581
new Thread(new Runnable() {
82+
6683
@Override
6784
public void run() {
6885
try {
@@ -81,10 +98,12 @@ public void run() {
8198
client.close();
8299
}
83100

84-
@Test public void brokenAfterBrokerRestart() throws Exception {
101+
@Test
102+
public void givenConsumerNotRecoveredCanCreateNewClientOnSameChannelAfterConnectionFailure() throws Exception {
85103
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/382
86104
rpcServer = new TestRpcServer(serverChannel, queue);
87105
new Thread(new Runnable() {
106+
88107
@Override
89108
public void run() {
90109
try {
@@ -96,8 +115,8 @@ public void run() {
96115
}).start();
97116

98117
ConnectionFactory cf = TestUtils.connectionFactory();
99-
cf.setTopologyRecoveryEnabled(false);
100-
cf.setNetworkRecoveryInterval(2000);
118+
cf.setTopologyRecoveryFilter(new NoDirectReplyToConsumerTopologyRecoveryFilter());
119+
cf.setNetworkRecoveryInterval(1000);
101120
Connection connection = null;
102121
try {
103122
connection = cf.newConnection();
@@ -107,10 +126,12 @@ public void run() {
107126
assertEquals("*** hello ***", new String(response.getBody()));
108127
final CountDownLatch recoveryLatch = new CountDownLatch(1);
109128
((AutorecoveringConnection) connection).addRecoveryListener(new RecoveryListener() {
129+
110130
@Override
111131
public void handleRecovery(Recoverable recoverable) {
112132
recoveryLatch.countDown();
113133
}
134+
114135
@Override
115136
public void handleRecoveryStarted(Recoverable recoverable) {
116137

@@ -126,7 +147,62 @@ public void handleRecoveryStarted(Recoverable recoverable) {
126147
connection.close();
127148
}
128149
}
150+
}
151+
152+
@Test
153+
public void givenConsumerIsRecoveredCanNotCreateNewClientOnSameChannelAfterConnectionFailure() throws Exception {
154+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/382
155+
rpcServer = new TestRpcServer(serverChannel, queue);
156+
new Thread(new Runnable() {
157+
158+
@Override
159+
public void run() {
160+
try {
161+
rpcServer.mainloop();
162+
} catch (Exception e) {
163+
// safe to ignore when loops ends/server is canceled
164+
}
165+
}
166+
}).start();
167+
168+
ConnectionFactory cf = TestUtils.connectionFactory();
169+
cf.setNetworkRecoveryInterval(1000);
170+
Connection connection = null;
171+
try {
172+
connection = cf.newConnection();
173+
Channel channel = connection.createChannel();
174+
RpcClient client = new RpcClient(channel, "", queue, 1000);
175+
RpcClient.Response response = client.doCall(null, "hello".getBytes());
176+
assertEquals("*** hello ***", new String(response.getBody()));
177+
final CountDownLatch recoveryLatch = new CountDownLatch(1);
178+
((AutorecoveringConnection) connection).addRecoveryListener(new RecoveryListener() {
129179

180+
@Override
181+
public void handleRecovery(Recoverable recoverable) {
182+
recoveryLatch.countDown();
183+
}
184+
185+
@Override
186+
public void handleRecoveryStarted(Recoverable recoverable) {
187+
188+
}
189+
});
190+
Host.closeConnection((NetworkConnection) connection);
191+
assertTrue("Connection should have recovered by now", recoveryLatch.await(10, TimeUnit.SECONDS));
192+
try {
193+
new RpcClient(channel, "", queue, 1000);
194+
fail("Cannot create RPC client on same channel, an exception should have been thrown");
195+
} catch (IOException e) {
196+
assertTrue(e.getCause() instanceof ShutdownSignalException);
197+
ShutdownSignalException cause = (ShutdownSignalException) e.getCause();
198+
assertTrue(cause.getReason() instanceof AMQP.Channel.Close);
199+
assertEquals(406, ((AMQP.Channel.Close) cause.getReason()).getReplyCode());
200+
}
201+
} finally {
202+
if (connection != null) {
203+
connection.close();
204+
}
205+
}
130206
}
131207

132208
private static class TestRpcServer extends RpcServer {
@@ -157,4 +233,27 @@ protected AMQP.BasicProperties postprocessReplyProperties(QueueingConsumer.Deliv
157233
return builder.build();
158234
}
159235
}
236+
237+
private static class NoDirectReplyToConsumerTopologyRecoveryFilter implements TopologyRecoveryFilter {
238+
239+
@Override
240+
public boolean filterExchange(RecordedExchange recordedExchange) {
241+
return true;
242+
}
243+
244+
@Override
245+
public boolean filterQueue(RecordedQueue recordedQueue) {
246+
return true;
247+
}
248+
249+
@Override
250+
public boolean filterBinding(RecordedBinding recordedBinding) {
251+
return true;
252+
}
253+
254+
@Override
255+
public boolean filterConsumer(RecordedConsumer recordedConsumer) {
256+
return !"amq.rabbitmq.reply-to".equals(recordedConsumer.getQueue());
257+
}
258+
}
160259
}

0 commit comments

Comments
 (0)