From fb6d005b50118e842558e4043a430ff95f67f34f Mon Sep 17 00:00:00 2001 From: cpatel129 Date: Tue, 12 Apr 2022 12:53:39 +0100 Subject: [PATCH] Expose received broker connection close method, in case of any error, to client during handshake --- libamqpprox/CMakeLists.txt | 3 +- libamqpprox/amqpprox_closeerror.cpp | 36 ++++ libamqpprox/amqpprox_closeerror.h | 52 ++++++ libamqpprox/amqpprox_connector.cpp | 33 +++- libamqpprox/amqpprox_session.cpp | 38 +++- tests/amqpprox_session.t.cpp | 257 ++++++++++++++-------------- 6 files changed, 272 insertions(+), 147 deletions(-) create mode 100644 libamqpprox/amqpprox_closeerror.cpp create mode 100644 libamqpprox/amqpprox_closeerror.h diff --git a/libamqpprox/CMakeLists.txt b/libamqpprox/CMakeLists.txt index da2f586..f2d2d88 100644 --- a/libamqpprox/CMakeLists.txt +++ b/libamqpprox/CMakeLists.txt @@ -84,7 +84,8 @@ add_library(libamqpprox STATIC amqpprox_authinterceptinterface.cpp amqpprox_defaultauthintercept.cpp amqpprox_httpauthintercept.cpp - amqpprox_authcontrolcommand.cpp) + amqpprox_authcontrolcommand.cpp + amqpprox_closeerror.cpp) target_include_directories(libamqpprox PRIVATE ${PROTO_HDR_PATH}) target_link_libraries(libamqpprox LINK_PUBLIC authproto ${LIBAMQPPROX_LIBS}) diff --git a/libamqpprox/amqpprox_closeerror.cpp b/libamqpprox/amqpprox_closeerror.cpp new file mode 100644 index 0000000..ab92c33 --- /dev/null +++ b/libamqpprox/amqpprox_closeerror.cpp @@ -0,0 +1,36 @@ +/* +** Copyright 2022 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ + +#include +#include +#include + +namespace Bloomberg { +namespace amqpprox { + +CloseError::CloseError(const std::string &msg, methods::Close &closeMethod) +: std::runtime_error(msg) +, d_closeMethod(closeMethod) +{ +} + +methods::Close CloseError::closeMethod() const +{ + return d_closeMethod; +} + +} +} diff --git a/libamqpprox/amqpprox_closeerror.h b/libamqpprox/amqpprox_closeerror.h new file mode 100644 index 0000000..e65b4df --- /dev/null +++ b/libamqpprox/amqpprox_closeerror.h @@ -0,0 +1,52 @@ +/* +** Copyright 2022 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ + +#ifndef BLOOMBERG_AMQPPROX_CLOSEERROR +#define BLOOMBERG_AMQPPROX_CLOSEERROR + +#include +#include + +namespace Bloomberg { +namespace amqpprox { + +/** + * \brief Custom exception class + * + * This class will be used to throw custom exception, whenever we get the + * unexpected connection Close method from server during handshake. Using this + * exception, we are going to pass connection Close method to the exception + * handler. The exception handler eventually can send the Close method to + * clients for better error handling. + */ +class CloseError : public std::runtime_error { + methods::Close d_closeMethod; + + public: + // CREATORS + CloseError(const std::string &msg, methods::Close &closeMethod); + + // ACCESSORS + /** + * \return connection Close method + */ + methods::Close closeMethod() const; +}; + +} +} + +#endif diff --git a/libamqpprox/amqpprox_connector.cpp b/libamqpprox/amqpprox_connector.cpp index 9271ff3..143a43a 100644 --- a/libamqpprox/amqpprox_connector.cpp +++ b/libamqpprox/amqpprox_connector.cpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -60,13 +61,31 @@ std::ostream &streamOutMethod(std::ostream &os, const Method &method) } template -void decodeMethod(T *t, const Method &method, Buffer &buffer) +void decodeMethod(T * t, + const Method &method, + Buffer & buffer, + FlowType direction) { if (method.methodType != T::methodType()) { std::ostringstream oss; oss << "Expected " << typeid(T).name() << ", got: "; streamOutMethod(oss, method); - throw std::runtime_error(oss.str()); + if (method.methodType == methods::Close::methodType() && + method.classType == methods::Close::classType() && + direction == FlowType::EGRESS) { + methods::Close closeMethod; + if (!methods::Close::decode(&closeMethod, buffer)) { + oss << ". And Failed to decode received close method from " + "server"; + throw std::runtime_error(oss.str()); + } + else { + throw CloseError(oss.str(), closeMethod); + } + } + else { + throw std::runtime_error(oss.str()); + } } if (!T::decode(t, buffer)) { @@ -146,7 +165,7 @@ void Connector::receive(const Method &method, FlowType direction) } break; // Acting as a server case State::START_SENT: { - decodeMethod(&d_startOk, method, methodPayload); + decodeMethod(&d_startOk, method, methodPayload, direction); LOG_TRACE << "StartOk: " << d_startOk; @@ -154,7 +173,7 @@ void Connector::receive(const Method &method, FlowType direction) d_state = State::TUNE_SENT; } break; case State::TUNE_SENT: { - decodeMethod(&d_tuneOk, method, methodPayload); + decodeMethod(&d_tuneOk, method, methodPayload, direction); LOG_TRACE << "TuneOk: " << d_tuneOk; @@ -162,7 +181,7 @@ void Connector::receive(const Method &method, FlowType direction) } break; case State::AWAITING_OPEN: { - decodeMethod(&d_open, method, methodPayload); + decodeMethod(&d_open, method, methodPayload, direction); d_sessionState_p->setVirtualHost(d_open.virtualHost()); d_eventSource_p->connectionVhostEstablished().emit( @@ -175,7 +194,7 @@ void Connector::receive(const Method &method, FlowType direction) } break; // Acting as a client case State::AWAITING_CONNECTION: { - decodeMethod(&d_receivedStart, method, methodPayload); + decodeMethod(&d_receivedStart, method, methodPayload, direction); LOG_TRACE << "Server Start: " << d_receivedStart; @@ -197,7 +216,7 @@ void Connector::receive(const Method &method, FlowType direction) } break; // Acting as a client case State::STARTOK_SENT: { - decodeMethod(&d_receivedTune, method, methodPayload); + decodeMethod(&d_receivedTune, method, methodPayload, direction); LOG_TRACE << "Server Tune: " << d_receivedTune; sendResponse(d_tuneOk, false); diff --git a/libamqpprox/amqpprox_session.cpp b/libamqpprox/amqpprox_session.cpp index e765ac4..248234b 100644 --- a/libamqpprox/amqpprox_session.cpp +++ b/libamqpprox/amqpprox_session.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -70,6 +71,20 @@ namespace amqpprox { using namespace boost::asio::ip; using namespace boost::system; +namespace { +void logException(const std::string_view error, + const SessionState &sessionState, + FlowType direction) +{ + LOG_ERROR << "Received exception: " << error << " conn=" + << sessionState.hostname(sessionState.getIngress().second) << ":" + << sessionState.getIngress().second.port() << "->" + << sessionState.hostname(sessionState.getEgress().second) << ":" + << sessionState.getEgress().second.port() + << " direction=" << direction; +} +} + Session::Session(boost::asio::io_service &ioservice, MaybeSecureSocketAdaptor &&serverSocket, MaybeSecureSocketAdaptor &&clientSocket, @@ -746,14 +761,23 @@ void Session::handleData(FlowType direction) disconnect(true); } } + catch (CloseError &error) { + methods::Close receivedClose = error.closeMethod(); + // Send received Close method from server to client for better error + // handling + d_connector.synthesizeCustomCloseError( + true, receivedClose.replyCode(), receivedClose.replyString()); + sendSyntheticData(); + + std::ostringstream oss; + oss << error.what() + << ", Received method from server: " << receivedClose; + logException(oss.str(), d_sessionState, direction); + + disconnect(true); + } catch (std::runtime_error &error) { - LOG_ERROR << "Received exception: " << error.what() << " conn=" - << d_sessionState.hostname( - d_sessionState.getIngress().second) - << ":" << d_sessionState.getIngress().second.port() << "->" - << d_sessionState.hostname(d_sessionState.getEgress().second) - << ":" << d_sessionState.getEgress().second.port() - << " direction=" << direction; + logException(error.what(), d_sessionState, direction); disconnect(true); } diff --git a/tests/amqpprox_session.t.cpp b/tests/amqpprox_session.t.cpp index 9514d7c..20629d6 100644 --- a/tests/amqpprox_session.t.cpp +++ b/tests/amqpprox_session.t.cpp @@ -195,6 +195,7 @@ class SessionTest : public ::testing::Test { const methods::StartOk &overriddenStartOk = methods::StartOk()); void testSetupProxyOpen(int idx); void testSetupProxyOutOfOrderOpen(int idx); + void testSetupProxyForwardsBrokerClose(int idx); void testSetupProxyPassOpenOkThrough(int idx); void testSetupBrokerSendsHeartbeat(int idx); void testSetupClientSendsHeartbeat(int idx); @@ -202,6 +203,10 @@ class SessionTest : public ::testing::Test { void testSetupClientSendsCloseOk(int idx); void testSetupBrokerRespondsCloseOk(int idx); void testSetupHandlersCleanedUp(int idx); + + void testSetupHostnameMapperForServerClientBase( + TestSocketState::State &serverState, + TestSocketState::State &clientState); }; template @@ -209,6 +214,29 @@ std::vector filterVariant(const std::vector &items); Data coalesce(std::initializer_list input); +void SessionTest::testSetupHostnameMapperForServerClientBase( + TestSocketState::State &base, + TestSocketState::State &clientBase) +{ + EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) + .WillRepeatedly(Return(std::string("host1"))); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) + .WillRepeatedly(Return(std::string("host0"))); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) + .WillRepeatedly(Return(std::string("host0"))); + EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) + .WillRepeatedly(Return(std::string("host2"))); + + base.d_local = makeEndpoint("1.2.3.4", 1234); + base.d_remote = makeEndpoint("2.3.4.5", 2345); + base.d_secure = false; + + clientBase.d_local = makeEndpoint("1.2.3.4", 32000); + clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); + clientBase.d_secure = false; +} + void SessionTest::testSetupServerHandshake(int idx) { // Perform the 'TLS' handshake (no-op for tests), will just invoke the @@ -365,6 +393,26 @@ void SessionTest::testSetupProxyOutOfOrderOpen(int idx) }); } +void SessionTest::testSetupProxyForwardsBrokerClose(int idx) +{ + // Client <--------Close----- Proxy <--------Close------ Broker + methods::Close receivedClose; + receivedClose.setReply(123, "Broker is closing the connection"); + d_clientState.pushItem(idx, Data(encode(receivedClose))); + + d_serverState.expect(idx, [this, receivedClose](const auto &items) { + auto data = filterVariant(items); + ASSERT_EQ(data.size(), 1); + EXPECT_EQ(data[0], Data(encode(receivedClose))); + EXPECT_THAT(items, Contains(VariantWith(Call("shutdown")))); + EXPECT_THAT(items, Contains(VariantWith(Call("close")))); + }); + d_clientState.expect(idx, [this](const auto &items) { + EXPECT_THAT(items, Contains(VariantWith(Call("shutdown")))); + EXPECT_THAT(items, Contains(VariantWith(Call("close")))); + }); +} + void SessionTest::testSetupProxyOpen(int idx) { // Client Proxy <-------Tune-------- Broker @@ -536,25 +584,8 @@ TEST_F(SessionTest, Connection_Then_Ping_Then_Disconnect) EXPECT_CALL(d_selector, acquireConnection(_, _)) .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); - EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) - .WillRepeatedly(Return(std::string("host1"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) - .WillRepeatedly(Return(std::string("host2"))); - - TestSocketState::State base; - base.d_local = makeEndpoint("1.2.3.4", 1234); - base.d_remote = makeEndpoint("2.3.4.5", 2345); - base.d_secure = false; - - TestSocketState::State clientBase; - clientBase.d_local = makeEndpoint("1.2.3.4", 32000); - clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); - clientBase.d_secure = false; + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); // Initialise the state d_serverState.pushItem(0, base); @@ -659,25 +690,8 @@ TEST_F(SessionTest, BadServerHandshake) EXPECT_CALL(d_selector, acquireConnection(_, _)) .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); - EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) - .WillRepeatedly(Return(std::string("host1"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) - .WillRepeatedly(Return(std::string("host2"))); - - TestSocketState::State base; - base.d_local = makeEndpoint("1.2.3.4", 1234); - base.d_remote = makeEndpoint("2.3.4.5", 2345); - base.d_secure = false; - - TestSocketState::State clientBase; - clientBase.d_local = makeEndpoint("1.2.3.4", 32000); - clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); - clientBase.d_secure = false; + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); // Initialise the state d_serverState.pushItem(0, base); @@ -793,29 +807,12 @@ TEST_F(SessionTest, Connection_To_Proxy_Protocol) EXPECT_CALL(d_selector, acquireConnection(_, _)) .WillOnce(DoAll(SetArgPointee<0>(cm), Return(0))); - EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) - .WillRepeatedly(Return(std::string("host1"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) - .WillRepeatedly(Return(std::string("host2"))); - auto protocolHeader = std::vector( Constants::protocolHeader(), Constants::protocolHeader() + Constants::protocolHeaderLength()); - TestSocketState::State base; - base.d_local = makeEndpoint("1.2.3.4", 1234); - base.d_remote = makeEndpoint("2.3.4.5", 2345); - base.d_secure = false; - - TestSocketState::State clientBase; - clientBase.d_local = makeEndpoint("1.2.3.4", 32000); - clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); - clientBase.d_secure = false; + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); // Initialise the state d_serverState.pushItem(0, base); @@ -1149,25 +1146,8 @@ TEST_F(SessionTest, Connection_Then_Ping_Then_Force_Disconnect) EXPECT_CALL(d_selector, acquireConnection(_, _)) .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); - EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) - .WillRepeatedly(Return(std::string("host1"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) - .WillRepeatedly(Return(std::string("host2"))); - - TestSocketState::State base; - base.d_local = makeEndpoint("1.2.3.4", 1234); - base.d_remote = makeEndpoint("2.3.4.5", 2345); - base.d_secure = false; - - TestSocketState::State clientBase; - clientBase.d_local = makeEndpoint("1.2.3.4", 32000); - clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); - clientBase.d_secure = false; + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); // Initialise the state d_serverState.pushItem(0, base); @@ -1221,25 +1201,8 @@ TEST_F(SessionTest, Connection_Then_Ping_Then_Backend_Disconnect) EXPECT_CALL(d_selector, acquireConnection(_, _)) .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); - EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) - .WillRepeatedly(Return(std::string("host1"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) - .WillRepeatedly(Return(std::string("host2"))); - - TestSocketState::State base; - base.d_local = makeEndpoint("1.2.3.4", 1234); - base.d_remote = makeEndpoint("2.3.4.5", 2345); - base.d_secure = false; - - TestSocketState::State clientBase; - clientBase.d_local = makeEndpoint("1.2.3.4", 32000); - clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); - clientBase.d_secure = false; + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); // Initialise the state d_serverState.pushItem(0, base); @@ -1310,25 +1273,8 @@ TEST_F(SessionTest, Authorized_Client_Test) EXPECT_CALL(*d_mockAuthIntercept, authenticate(_, _)) .WillOnce(InvokeArgument<1>(authResponseData)); - EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) - .WillRepeatedly(Return(std::string("host1"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) - .WillRepeatedly(Return(std::string("host2"))); - - TestSocketState::State base; - base.d_local = makeEndpoint("1.2.3.4", 1234); - base.d_remote = makeEndpoint("2.3.4.5", 2345); - base.d_secure = false; - - TestSocketState::State clientBase; - clientBase.d_local = makeEndpoint("1.2.3.4", 32000); - clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); - clientBase.d_secure = false; + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); // Initialise the state d_serverState.pushItem(0, base); @@ -1502,30 +1448,77 @@ TEST_F(SessionTest, EXPECT_TRUE(session->finished()); } -TEST_F(SessionTest, Printing_Breathing_Test) +TEST_F(SessionTest, Forward_Received_Close_Method_To_Client_During_Handshake) { EXPECT_CALL(d_selector, acquireConnection(_, _)) .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); - EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) - .WillRepeatedly(Return(std::string("host1"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000))) - .WillRepeatedly(Return(std::string("host0"))); - EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672))) - .WillRepeatedly(Return(std::string("host2"))); + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); - TestSocketState::State base; - base.d_local = makeEndpoint("1.2.3.4", 1234); - base.d_remote = makeEndpoint("2.3.4.5", 2345); - base.d_secure = false; + // Initialise the state + d_serverState.pushItem(0, base); + driveTo(0); - TestSocketState::State clientBase; - clientBase.d_local = makeEndpoint("1.2.3.4", 32000); - clientBase.d_remote = makeEndpoint("3.4.5.6", 5672); - clientBase.d_secure = false; + testSetupServerHandshake(1); + + // Read a protocol header from the client and reply with Start method + // Client ----AMQP Header---> Proxy Broker + // Client <-----Start-------- Proxy Broker + testSetupClientSendsProtocolHeader(2); + + // Client ------StartOk-----> Proxy Broker + // Client <-----Tune--------- Proxy Broker + testSetupClientStartOk(3); + + // Client ------TuneOk------> Proxy Broker + // Client ------Open--------> Proxy Broker + testSetupClientOpen(4); + + // Client Proxy <----TCP CONNECT----> Broker + testSetupProxyConnect(5, &clientBase); + + // Client Proxy <-----HANDSHAKE-----> Broker + // Client Proxy -----AMQP Header----> Broker + testSetupProxySendsProtocolHeader(6); + + // Client Proxy <-------Start-------- Broker + // Client Proxy --------StartOk-----> Broker + testSetupProxySendsStartOk(7, "host1", 2345, LOCAL_HOSTNAME, 1234, 32000); + + // This is the important part of the test - ensure that if the broker sends + // a Close during handshake, proxy pass this connection Close method to the + // client Client <-------Close------- Proxy <-------Close-------- Broker + testSetupProxyForwardsBrokerClose(8); + + MaybeSecureSocketAdaptor clientSocket(d_ioService, d_client, false); + MaybeSecureSocketAdaptor serverSocket(d_ioService, d_server, false); + auto session = std::make_shared(d_ioService, + std::move(serverSocket), + std::move(clientSocket), + &d_selector, + &d_eventSource, + &d_pool, + &d_dnsResolver, + d_mapper, + LOCAL_HOSTNAME, + d_authIntercept); + + session->start(); + + // Run the tests through to completion + driveTo(9); + + EXPECT_TRUE(session->finished()); +} + +TEST_F(SessionTest, Printing_Breathing_Test) +{ + EXPECT_CALL(d_selector, acquireConnection(_, _)) + .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); // Initialise the state d_serverState.pushItem(0, base);