Skip to content

Commit

Permalink
Expose received broker connection close method, in case of any error,…
Browse files Browse the repository at this point in the history
… to client during handshake
  • Loading branch information
Chinmay1412 committed Apr 8, 2022
1 parent 373ac06 commit e4a5cf6
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 15 deletions.
3 changes: 2 additions & 1 deletion libamqpprox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ add_library(libamqpprox STATIC
amqpprox_authinterceptinterface.cpp
amqpprox_defaultauthintercept.cpp
amqpprox_httpauthintercept.cpp
amqpprox_authcontrolcommand.cpp)
amqpprox_authcontrolcommand.cpp
amqpprox_closeerror.cpp)

target_include_directories(libamqpprox PRIVATE ${PROTO_HDR_PATH})
target_link_libraries(libamqpprox LINK_PUBLIC authproto ${LIBAMQPPROX_LIBS})
Expand Down
36 changes: 36 additions & 0 deletions libamqpprox/amqpprox_closeerror.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/

#include <amqpprox_closeerror.h>
#include <amqpprox_methods_close.h>
#include <stdexcept>

namespace Bloomberg {
namespace amqpprox {

CloseError::CloseError(const std::string &msg, methods::Close &closeMethod)
: std::runtime_error(msg)
, d_closeMethod(closeMethod)
{
}

methods::Close CloseError::closeMethod() const
{
return d_closeMethod;
}

}
}
52 changes: 52 additions & 0 deletions libamqpprox/amqpprox_closeerror.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/

#ifndef BLOOMBERG_AMQPPROX_CLOSEERROR
#define BLOOMBERG_AMQPPROX_CLOSEERROR

#include <amqpprox_methods_close.h>
#include <stdexcept>

namespace Bloomberg {
namespace amqpprox {

/**
* \brief Custom exception class
*
* This class will be used to throw custom exception, whenever we get the
* unexpected connection Close method from server during handshake. Using this
* exception, we are going to pass connection Close method to the exception
* handler. The exception handler eventually can send the Close method to
* clients for better error handling.
*/
class CloseError : public std::runtime_error {
methods::Close d_closeMethod;

public:
// CREATORS
CloseError(const std::string &msg, methods::Close &closeMethod);

// ACCESSORS
/**
* \return connection Close method
*/
methods::Close closeMethod() const;
};

}
}

#endif
32 changes: 25 additions & 7 deletions libamqpprox/amqpprox_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <amqpprox_buffer.h>
#include <amqpprox_bufferpool.h>
#include <amqpprox_closeerror.h>
#include <amqpprox_connectorutil.h>
#include <amqpprox_constants.h>
#include <amqpprox_eventsource.h>
Expand Down Expand Up @@ -60,13 +61,30 @@ std::ostream &streamOutMethod(std::ostream &os, const Method &method)
}

template <typename T>
void decodeMethod(T *t, const Method &method, Buffer &buffer)
void decodeMethod(T * t,
const Method &method,
Buffer & buffer,
FlowType direction)
{
if (method.methodType != T::methodType()) {
std::ostringstream oss;
oss << "Expected " << typeid(T).name() << ", got: ";
streamOutMethod(oss, method);
throw std::runtime_error(oss.str());
if (method.methodType == methods::Close::methodType() &&
direction == FlowType::EGRESS) {
methods::Close closeMethod;
if (!methods::Close::decode(&closeMethod, buffer)) {
oss << "And Failed to decode received close method from "
"server";
throw std::runtime_error(oss.str());
}
else {
throw CloseError(oss.str(), closeMethod);
}
}
else {
throw std::runtime_error(oss.str());
}
}

if (!T::decode(t, buffer)) {
Expand Down Expand Up @@ -146,23 +164,23 @@ void Connector::receive(const Method &method, FlowType direction)
} break;
// Acting as a server
case State::START_SENT: {
decodeMethod(&d_startOk, method, methodPayload);
decodeMethod(&d_startOk, method, methodPayload, direction);

LOG_TRACE << "StartOk: " << d_startOk;

sendResponse(d_synthesizedTune, true);
d_state = State::TUNE_SENT;
} break;
case State::TUNE_SENT: {
decodeMethod(&d_tuneOk, method, methodPayload);
decodeMethod(&d_tuneOk, method, methodPayload, direction);

LOG_TRACE << "TuneOk: " << d_tuneOk;

d_state = State::AWAITING_OPEN;
} break;

case State::AWAITING_OPEN: {
decodeMethod(&d_open, method, methodPayload);
decodeMethod(&d_open, method, methodPayload, direction);

d_sessionState_p->setVirtualHost(d_open.virtualHost());
d_eventSource_p->connectionVhostEstablished().emit(
Expand All @@ -175,7 +193,7 @@ void Connector::receive(const Method &method, FlowType direction)
} break;
// Acting as a client
case State::AWAITING_CONNECTION: {
decodeMethod(&d_receivedStart, method, methodPayload);
decodeMethod(&d_receivedStart, method, methodPayload, direction);

LOG_TRACE << "Server Start: " << d_receivedStart;

Expand All @@ -197,7 +215,7 @@ void Connector::receive(const Method &method, FlowType direction)
} break;
// Acting as a client
case State::STARTOK_SENT: {
decodeMethod(&d_receivedTune, method, methodPayload);
decodeMethod(&d_receivedTune, method, methodPayload, direction);
LOG_TRACE << "Server Tune: " << d_receivedTune;

sendResponse(d_tuneOk, false);
Expand Down
38 changes: 31 additions & 7 deletions libamqpprox/amqpprox_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <amqpprox_backend.h>
#include <amqpprox_bufferhandle.h>
#include <amqpprox_bufferpool.h>
#include <amqpprox_closeerror.h>
#include <amqpprox_connectionmanager.h>
#include <amqpprox_connectionselector.h>
#include <amqpprox_constants.h>
Expand Down Expand Up @@ -70,6 +71,20 @@ namespace amqpprox {
using namespace boost::asio::ip;
using namespace boost::system;

namespace {
void logException(const std::string_view &error,
const SessionState & sessionState,
FlowType direction)
{
LOG_ERROR << "Received exception: " << error << " conn="
<< sessionState.hostname(sessionState.getIngress().second) << ":"
<< sessionState.getIngress().second.port() << "->"
<< sessionState.hostname(sessionState.getEgress().second) << ":"
<< sessionState.getEgress().second.port()
<< " direction=" << direction;
}
}

Session::Session(boost::asio::io_service & ioservice,
MaybeSecureSocketAdaptor && serverSocket,
MaybeSecureSocketAdaptor && clientSocket,
Expand Down Expand Up @@ -741,14 +756,23 @@ void Session::handleData(FlowType direction)
disconnect(true);
}
}
catch (CloseError &error) {
methods::Close receivedClose = error.closeMethod();
// Send received Close method from server to client for better error
// handling
d_connector.synthesizeCustomCloseError(
true, receivedClose.replyCode(), receivedClose.replyString());
sendSyntheticData();

std::ostringstream oss;
oss << error.what()
<< ", Received method from server: " << receivedClose;
logException(oss.str(), d_sessionState, direction);

disconnect(true);
}
catch (std::runtime_error &error) {
LOG_ERROR << "Received exception: " << error.what() << " conn="
<< d_sessionState.hostname(
d_sessionState.getIngress().second)
<< ":" << d_sessionState.getIngress().second.port() << "->"
<< d_sessionState.hostname(d_sessionState.getEgress().second)
<< ":" << d_sessionState.getEgress().second.port()
<< " direction=" << direction;
logException(error.what(), d_sessionState, direction);

disconnect(true);
}
Expand Down
100 changes: 100 additions & 0 deletions tests/amqpprox_session.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class SessionTest : public ::testing::Test {
const methods::StartOk &overriddenStartOk = methods::StartOk());
void testSetupProxyOpen(int idx);
void testSetupProxyOutOfOrderOpen(int idx);
void testSetupProxyReceiveClose(int idx);
void testSetupProxyPassOpenOkThrough(int idx);
void testSetupBrokerSendsHeartbeat(int idx);
void testSetupClientSendsHeartbeat(int idx);
Expand Down Expand Up @@ -365,6 +366,26 @@ void SessionTest::testSetupProxyOutOfOrderOpen(int idx)
});
}

void SessionTest::testSetupProxyReceiveClose(int idx)
{
// Client <--------Close----- Proxy <--------Close------ Broker
methods::Close receivedClose;
receivedClose.setReply(123, "Broker is closing the connection");
d_clientState.pushItem(idx, Data(encode(receivedClose)));

d_serverState.expect(idx, [this, receivedClose](const auto &items) {
auto data = filterVariant<Data>(items);
ASSERT_EQ(data.size(), 1);
EXPECT_EQ(data[0], Data(encode(receivedClose)));
EXPECT_THAT(items, Contains(VariantWith<Call>(Call("shutdown"))));
EXPECT_THAT(items, Contains(VariantWith<Call>(Call("close"))));
});
d_clientState.expect(idx, [this](const auto &items) {
EXPECT_THAT(items, Contains(VariantWith<Call>(Call("shutdown"))));
EXPECT_THAT(items, Contains(VariantWith<Call>(Call("close"))));
});
}

void SessionTest::testSetupProxyOpen(int idx)
{
// Client Proxy <-------Tune-------- Broker
Expand Down Expand Up @@ -1502,6 +1523,85 @@ TEST_F(SessionTest,
EXPECT_TRUE(session->finished());
}

TEST_F(SessionTest, Forward_Received_Close_Method_To_Client_During_Handshake)
{
EXPECT_CALL(d_selector, acquireConnection(_, _))
.WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0)));

EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1));
EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345)))
.WillRepeatedly(Return(std::string("host1")));
EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 1234)))
.WillRepeatedly(Return(std::string("host0")));
EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("1.2.3.4", 32000)))
.WillRepeatedly(Return(std::string("host0")));
EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("3.4.5.6", 5672)))
.WillRepeatedly(Return(std::string("host2")));

TestSocketState::State base;
base.d_local = makeEndpoint("1.2.3.4", 1234);
base.d_remote = makeEndpoint("2.3.4.5", 2345);
base.d_secure = false;

TestSocketState::State clientBase;
clientBase.d_local = makeEndpoint("1.2.3.4", 32000);
clientBase.d_remote = makeEndpoint("3.4.5.6", 5672);
clientBase.d_secure = false;

// Initialise the state
d_serverState.pushItem(0, base);
driveTo(0);

testSetupServerHandshake(1);

// Read a protocol header from the client and reply with Start method
// Client ----AMQP Header---> Proxy Broker
// Client <-----Start-------- Proxy Broker
testSetupClientSendsProtocolHeader(2);

// Client ------StartOk-----> Proxy Broker
// Client <-----Tune--------- Proxy Broker
testSetupClientStartOk(3);

// Client ------TuneOk------> Proxy Broker
// Client ------Open--------> Proxy Broker
testSetupClientOpen(4);

// Client Proxy <----TCP CONNECT----> Broker
testSetupProxyConnect(5, &clientBase);

// Client Proxy <-----HANDSHAKE-----> Broker
// Client Proxy -----AMQP Header----> Broker
testSetupProxySendsProtocolHeader(6);

// Client Proxy <-------Start-------- Broker
// Client Proxy --------StartOk-----> Broker
testSetupProxySendsStartOk(7, "host1", 2345, LOCAL_HOSTNAME, 1234, 32000);

// Client <-------Close------- Proxy <-------Close-------- Broker
testSetupProxyReceiveClose(8);

MaybeSecureSocketAdaptor clientSocket(d_ioService, d_client, false);
MaybeSecureSocketAdaptor serverSocket(d_ioService, d_server, false);
auto session = std::make_shared<Session>(d_ioService,
std::move(serverSocket),
std::move(clientSocket),
&d_selector,
&d_eventSource,
&d_pool,
&d_dnsResolver,
d_mapper,
LOCAL_HOSTNAME,
d_authIntercept);

session->start();

// Run the tests through to completion
driveTo(9);

EXPECT_TRUE(session->finished());
}

TEST_F(SessionTest, Printing_Breathing_Test)
{
EXPECT_CALL(d_selector, acquireConnection(_, _))
Expand Down

0 comments on commit e4a5cf6

Please sign in to comment.