diff --git a/libamqpprox/amqpprox_closeerror.h b/libamqpprox/amqpprox_closeerror.h index 35c3a27..feb9e1c 100644 --- a/libamqpprox/amqpprox_closeerror.h +++ b/libamqpprox/amqpprox_closeerror.h @@ -23,6 +23,14 @@ namespace Bloomberg { namespace amqpprox { +/** + * \brief Custom exception class + * + * This class will be used to throw custom exception, whenever we get the + * connection Close method from server. Using this exception, we are going to + * pass connection Close method to the exception handler. The exception handler + * eventually can send the Close method to clients for better error handling. + */ class CloseError : public std::runtime_error { methods::Close d_closeMethod; diff --git a/tests/amqpprox_session.t.cpp b/tests/amqpprox_session.t.cpp index 9514d7c..a9cdacf 100644 --- a/tests/amqpprox_session.t.cpp +++ b/tests/amqpprox_session.t.cpp @@ -195,6 +195,7 @@ class SessionTest : public ::testing::Test { const methods::StartOk &overriddenStartOk = methods::StartOk()); void testSetupProxyOpen(int idx); void testSetupProxyOutOfOrderOpen(int idx); + void testSetupProxyReceiveClose(int idx); void testSetupProxyPassOpenOkThrough(int idx); void testSetupBrokerSendsHeartbeat(int idx); void testSetupClientSendsHeartbeat(int idx); @@ -365,6 +366,24 @@ void SessionTest::testSetupProxyOutOfOrderOpen(int idx) }); } +void SessionTest::testSetupProxyReceiveClose(int idx) +{ + // Client Proxy <--------Close------ Broker + d_clientState.pushItem(idx, Data(encode(close()))); + + d_serverState.expect(idx, [this](const auto &items) { + auto data = filterVariant(items); + ASSERT_EQ(data.size(), 1); + EXPECT_EQ(data[0], Data(encode(close()))); + EXPECT_THAT(items, Contains(VariantWith(Call("shutdown")))); + EXPECT_THAT(items, Contains(VariantWith(Call("close")))); + }); + d_clientState.expect(idx, [this](const auto &items) { + EXPECT_THAT(items, Contains(VariantWith(Call("shutdown")))); + EXPECT_THAT(items, Contains(VariantWith(Call("close")))); + }); +} + void SessionTest::testSetupProxyOpen(int idx) { // Client Proxy <-------Tune-------- Broker @@ -1502,6 +1521,85 @@ TEST_F(SessionTest, EXPECT_TRUE(session->finished()); } +TEST_F(SessionTest, Forward_Received_Close_Method_To_Client_During_Handshake) +{ + EXPECT_CALL(d_selector, acquireConnection(_, _)) + .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + + EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) + .WillRepeatedly(Return(std::string("host1"))); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) + .WillRepeatedly(Return(std::string("host0"))); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) + .WillRepeatedly(Return(std::string("host0"))); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) + .WillRepeatedly(Return(std::string("host2"))); + + TestSocketState::State base; + base.d_local = makeEndpoint("1.2.3.4", 1234); + base.d_remote = makeEndpoint("2.3.4.5", 2345); + base.d_secure = false; + + TestSocketState::State clientBase; + clientBase.d_local = makeEndpoint("1.2.3.4", 32000); + clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); + clientBase.d_secure = false; + + // Initialise the state + d_serverState.pushItem(0, base); + driveTo(0); + + testSetupServerHandshake(1); + + // Read a protocol header from the client and reply with Start method + // Client ----AMQP Header---> Proxy Broker + // Client <-----Start-------- Proxy Broker + testSetupClientSendsProtocolHeader(2); + + // Client ------StartOk-----> Proxy Broker + // Client <-----Tune--------- Proxy Broker + testSetupClientStartOk(3); + + // Client ------TuneOk------> Proxy Broker + // Client ------Open--------> Proxy Broker + testSetupClientOpen(4); + + // Client Proxy <----TCP CONNECT----> Broker + testSetupProxyConnect(5, &clientBase); + + // Client Proxy <-----HANDSHAKE-----> Broker + // Client Proxy -----AMQP Header----> Broker + testSetupProxySendsProtocolHeader(6); + + // Client Proxy <-------Start-------- Broker + // Client Proxy --------StartOk-----> Broker + testSetupProxySendsStartOk(7, "host1", 2345, LOCAL_HOSTNAME, 1234, 32000); + + // Client <-------Close------- Proxy <-------Close-------- Broker + testSetupProxyReceiveClose(8); + + MaybeSecureSocketAdaptor clientSocket(d_ioService, d_client, false); + MaybeSecureSocketAdaptor serverSocket(d_ioService, d_server, false); + auto session = std::make_shared(d_ioService, + std::move(serverSocket), + std::move(clientSocket), + &d_selector, + &d_eventSource, + &d_pool, + &d_dnsResolver, + d_mapper, + LOCAL_HOSTNAME, + d_authIntercept); + + session->start(); + + // Run the tests through to completion + driveTo(9); + + EXPECT_TRUE(session->finished()); +} + TEST_F(SessionTest, Printing_Breathing_Test) { EXPECT_CALL(d_selector, acquireConnection(_, _))