Skip to content

Commit

Permalink
Expose received broker connection close method, in case of any error,…
Browse files Browse the repository at this point in the history
… to client during handshake
  • Loading branch information
Chinmay1412 committed Apr 12, 2022
1 parent 373ac06 commit 5679a65
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 147 deletions.
3 changes: 2 additions & 1 deletion libamqpprox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ add_library(libamqpprox STATIC
amqpprox_authinterceptinterface.cpp
amqpprox_defaultauthintercept.cpp
amqpprox_httpauthintercept.cpp
amqpprox_authcontrolcommand.cpp)
amqpprox_authcontrolcommand.cpp
amqpprox_closeerror.cpp)

target_include_directories(libamqpprox PRIVATE ${PROTO_HDR_PATH})
target_link_libraries(libamqpprox LINK_PUBLIC authproto ${LIBAMQPPROX_LIBS})
Expand Down
36 changes: 36 additions & 0 deletions libamqpprox/amqpprox_closeerror.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/

#include <amqpprox_closeerror.h>
#include <amqpprox_methods_close.h>
#include <stdexcept>

namespace Bloomberg {
namespace amqpprox {

CloseError::CloseError(const std::string &msg, methods::Close &closeMethod)
: std::runtime_error(msg)
, d_closeMethod(closeMethod)
{
}

methods::Close CloseError::closeMethod() const
{
return d_closeMethod;
}

}
}
52 changes: 52 additions & 0 deletions libamqpprox/amqpprox_closeerror.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/

#ifndef BLOOMBERG_AMQPPROX_CLOSEERROR
#define BLOOMBERG_AMQPPROX_CLOSEERROR

#include <amqpprox_methods_close.h>
#include <stdexcept>

namespace Bloomberg {
namespace amqpprox {

/**
* \brief Custom exception class
*
* This class will be used to throw custom exception, whenever we get the
* unexpected connection Close method from server during handshake. Using this
* exception, we are going to pass connection Close method to the exception
* handler. The exception handler eventually can send the Close method to
* clients for better error handling.
*/
class CloseError : public std::runtime_error {
methods::Close d_closeMethod;

public:
// CREATORS
CloseError(const std::string &msg, methods::Close &closeMethod);

// ACCESSORS
/**
* \return connection Close method
*/
methods::Close closeMethod() const;
};

}
}

#endif
33 changes: 26 additions & 7 deletions libamqpprox/amqpprox_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <amqpprox_buffer.h>
#include <amqpprox_bufferpool.h>
#include <amqpprox_closeerror.h>
#include <amqpprox_connectorutil.h>
#include <amqpprox_constants.h>
#include <amqpprox_eventsource.h>
Expand Down Expand Up @@ -60,13 +61,31 @@ std::ostream &streamOutMethod(std::ostream &os, const Method &method)
}

template <typename T>
void decodeMethod(T *t, const Method &method, Buffer &buffer)
void decodeMethod(T * t,
const Method &method,
Buffer & buffer,
FlowType direction)
{
if (method.methodType != T::methodType()) {
std::ostringstream oss;
oss << "Expected " << typeid(T).name() << ", got: ";
streamOutMethod(oss, method);
throw std::runtime_error(oss.str());
if (method.methodType == methods::Close::methodType() &&
method.classType == methods::Close::classType() &&
direction == FlowType::EGRESS) {
methods::Close closeMethod;
if (!methods::Close::decode(&closeMethod, buffer)) {
oss << ". And Failed to decode received close method from "
"server";
throw std::runtime_error(oss.str());
}
else {
throw CloseError(oss.str(), closeMethod);
}
}
else {
throw std::runtime_error(oss.str());
}
}

if (!T::decode(t, buffer)) {
Expand Down Expand Up @@ -146,23 +165,23 @@ void Connector::receive(const Method &method, FlowType direction)
} break;
// Acting as a server
case State::START_SENT: {
decodeMethod(&d_startOk, method, methodPayload);
decodeMethod(&d_startOk, method, methodPayload, direction);

LOG_TRACE << "StartOk: " << d_startOk;

sendResponse(d_synthesizedTune, true);
d_state = State::TUNE_SENT;
} break;
case State::TUNE_SENT: {
decodeMethod(&d_tuneOk, method, methodPayload);
decodeMethod(&d_tuneOk, method, methodPayload, direction);

LOG_TRACE << "TuneOk: " << d_tuneOk;

d_state = State::AWAITING_OPEN;
} break;

case State::AWAITING_OPEN: {
decodeMethod(&d_open, method, methodPayload);
decodeMethod(&d_open, method, methodPayload, direction);

d_sessionState_p->setVirtualHost(d_open.virtualHost());
d_eventSource_p->connectionVhostEstablished().emit(
Expand All @@ -175,7 +194,7 @@ void Connector::receive(const Method &method, FlowType direction)
} break;
// Acting as a client
case State::AWAITING_CONNECTION: {
decodeMethod(&d_receivedStart, method, methodPayload);
decodeMethod(&d_receivedStart, method, methodPayload, direction);

LOG_TRACE << "Server Start: " << d_receivedStart;

Expand All @@ -197,7 +216,7 @@ void Connector::receive(const Method &method, FlowType direction)
} break;
// Acting as a client
case State::STARTOK_SENT: {
decodeMethod(&d_receivedTune, method, methodPayload);
decodeMethod(&d_receivedTune, method, methodPayload, direction);
LOG_TRACE << "Server Tune: " << d_receivedTune;

sendResponse(d_tuneOk, false);
Expand Down
38 changes: 31 additions & 7 deletions libamqpprox/amqpprox_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <amqpprox_backend.h>
#include <amqpprox_bufferhandle.h>
#include <amqpprox_bufferpool.h>
#include <amqpprox_closeerror.h>
#include <amqpprox_connectionmanager.h>
#include <amqpprox_connectionselector.h>
#include <amqpprox_constants.h>
Expand Down Expand Up @@ -70,6 +71,20 @@ namespace amqpprox {
using namespace boost::asio::ip;
using namespace boost::system;

namespace {
void logException(const std::string_view error,
const SessionState & sessionState,
FlowType direction)
{
LOG_ERROR << "Received exception: " << error << " conn="
<< sessionState.hostname(sessionState.getIngress().second) << ":"
<< sessionState.getIngress().second.port() << "->"
<< sessionState.hostname(sessionState.getEgress().second) << ":"
<< sessionState.getEgress().second.port()
<< " direction=" << direction;
}
}

Session::Session(boost::asio::io_service & ioservice,
MaybeSecureSocketAdaptor && serverSocket,
MaybeSecureSocketAdaptor && clientSocket,
Expand Down Expand Up @@ -741,14 +756,23 @@ void Session::handleData(FlowType direction)
disconnect(true);
}
}
catch (CloseError &error) {
methods::Close receivedClose = error.closeMethod();
// Send received Close method from server to client for better error
// handling
d_connector.synthesizeCustomCloseError(
true, receivedClose.replyCode(), receivedClose.replyString());
sendSyntheticData();

std::ostringstream oss;
oss << error.what()
<< ", Received method from server: " << receivedClose;
logException(oss.str(), d_sessionState, direction);

disconnect(true);
}
catch (std::runtime_error &error) {
LOG_ERROR << "Received exception: " << error.what() << " conn="
<< d_sessionState.hostname(
d_sessionState.getIngress().second)
<< ":" << d_sessionState.getIngress().second.port() << "->"
<< d_sessionState.hostname(d_sessionState.getEgress().second)
<< ":" << d_sessionState.getEgress().second.port()
<< " direction=" << direction;
logException(error.what(), d_sessionState, direction);

disconnect(true);
}
Expand Down
Loading

0 comments on commit 5679a65

Please sign in to comment.