diff --git a/src/test/java/com/rabbitmq/client/test/RpcTest.java b/src/test/java/com/rabbitmq/client/test/RpcTest.java index 4c919d4ed9..b7a11ce9a5 100644 --- a/src/test/java/com/rabbitmq/client/test/RpcTest.java +++ b/src/test/java/com/rabbitmq/client/test/RpcTest.java @@ -13,12 +13,25 @@ // If you have any questions regarding licensing, please contact us at // info@rabbitmq.com. - package com.rabbitmq.client.test; -import com.rabbitmq.client.*; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoveryListener; +import com.rabbitmq.client.RpcClient; +import com.rabbitmq.client.RpcServer; +import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.impl.NetworkConnection; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; +import com.rabbitmq.client.impl.recovery.RecordedExchange; +import com.rabbitmq.client.impl.recovery.RecordedQueue; +import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import com.rabbitmq.tools.Host; import org.junit.After; import org.junit.Before; @@ -32,6 +45,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class RpcTest { @@ -40,7 +54,8 @@ public class RpcTest { String queue = "rpc.queue"; RpcServer rpcServer; - @Before public void init() throws Exception { + @Before + public void init() throws Exception { clientConnection = TestUtils.connectionFactory().newConnection(); clientChannel = clientConnection.createChannel(); serverConnection = TestUtils.connectionFactory().newConnection(); @@ -48,11 +63,12 @@ public class RpcTest { serverChannel.queueDeclare(queue, false, false, false, null); } - @After public void tearDown() throws Exception { - if(rpcServer != null) { + @After + public void tearDown() throws Exception { + if (rpcServer != null) { rpcServer.terminateMainloop(); } - if(serverChannel != null) { + if (serverChannel != null) { serverChannel.queueDelete(queue); } clientConnection.close(); @@ -63,6 +79,7 @@ public class RpcTest { public void rpc() throws Exception { rpcServer = new TestRpcServer(serverChannel, queue); new Thread(new Runnable() { + @Override public void run() { try { @@ -81,10 +98,12 @@ public void run() { client.close(); } - @Test public void brokenAfterBrokerRestart() throws Exception { + @Test + public void givenConsumerNotRecoveredCanCreateNewClientOnSameChannelAfterConnectionFailure() throws Exception { // see https://github.com/rabbitmq/rabbitmq-java-client/issues/382 rpcServer = new TestRpcServer(serverChannel, queue); new Thread(new Runnable() { + @Override public void run() { try { @@ -96,8 +115,8 @@ public void run() { }).start(); ConnectionFactory cf = TestUtils.connectionFactory(); - cf.setTopologyRecoveryEnabled(false); - cf.setNetworkRecoveryInterval(2000); + cf.setTopologyRecoveryFilter(new NoDirectReplyToConsumerTopologyRecoveryFilter()); + cf.setNetworkRecoveryInterval(1000); Connection connection = null; try { connection = cf.newConnection(); @@ -107,10 +126,12 @@ public void run() { assertEquals("*** hello ***", new String(response.getBody())); final CountDownLatch recoveryLatch = new CountDownLatch(1); ((AutorecoveringConnection) connection).addRecoveryListener(new RecoveryListener() { + @Override public void handleRecovery(Recoverable recoverable) { recoveryLatch.countDown(); } + @Override public void handleRecoveryStarted(Recoverable recoverable) { @@ -126,7 +147,62 @@ public void handleRecoveryStarted(Recoverable recoverable) { connection.close(); } } + } + + @Test + public void givenConsumerIsRecoveredCanNotCreateNewClientOnSameChannelAfterConnectionFailure() throws Exception { + // see https://github.com/rabbitmq/rabbitmq-java-client/issues/382 + rpcServer = new TestRpcServer(serverChannel, queue); + new Thread(new Runnable() { + + @Override + public void run() { + try { + rpcServer.mainloop(); + } catch (Exception e) { + // safe to ignore when loops ends/server is canceled + } + } + }).start(); + + ConnectionFactory cf = TestUtils.connectionFactory(); + cf.setNetworkRecoveryInterval(1000); + Connection connection = null; + try { + connection = cf.newConnection(); + Channel channel = connection.createChannel(); + RpcClient client = new RpcClient(channel, "", queue, 1000); + RpcClient.Response response = client.doCall(null, "hello".getBytes()); + assertEquals("*** hello ***", new String(response.getBody())); + final CountDownLatch recoveryLatch = new CountDownLatch(1); + ((AutorecoveringConnection) connection).addRecoveryListener(new RecoveryListener() { + @Override + public void handleRecovery(Recoverable recoverable) { + recoveryLatch.countDown(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + + } + }); + Host.closeConnection((NetworkConnection) connection); + assertTrue("Connection should have recovered by now", recoveryLatch.await(10, TimeUnit.SECONDS)); + try { + new RpcClient(channel, "", queue, 1000); + fail("Cannot create RPC client on same channel, an exception should have been thrown"); + } catch (IOException e) { + assertTrue(e.getCause() instanceof ShutdownSignalException); + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + assertTrue(cause.getReason() instanceof AMQP.Channel.Close); + assertEquals(406, ((AMQP.Channel.Close) cause.getReason()).getReplyCode()); + } + } finally { + if (connection != null) { + connection.close(); + } + } } private static class TestRpcServer extends RpcServer { @@ -157,4 +233,27 @@ protected AMQP.BasicProperties postprocessReplyProperties(QueueingConsumer.Deliv return builder.build(); } } + + private static class NoDirectReplyToConsumerTopologyRecoveryFilter implements TopologyRecoveryFilter { + + @Override + public boolean filterExchange(RecordedExchange recordedExchange) { + return true; + } + + @Override + public boolean filterQueue(RecordedQueue recordedQueue) { + return true; + } + + @Override + public boolean filterBinding(RecordedBinding recordedBinding) { + return true; + } + + @Override + public boolean filterConsumer(RecordedConsumer recordedConsumer) { + return !"amq.rabbitmq.reply-to".equals(recordedConsumer.getQueue()); + } + } }