From 541280264e55b91569bc288d088345c4bef6940b Mon Sep 17 00:00:00 2001 From: cpatel129 Date: Fri, 20 May 2022 11:49:48 +0100 Subject: [PATCH] Start sending appropriate error message to the clients, if the broker mapping is not configured for particular vhost --- amqpprox/amqpprox.m.cpp | 35 ++-- libamqpprox/CMakeLists.txt | 4 +- libamqpprox/amqpprox_connectionselector.cpp | 106 +++++++++++- libamqpprox/amqpprox_connectionselector.h | 57 +++++-- .../amqpprox_connectionselectorinterface.cpp | 16 ++ .../amqpprox_connectionselectorinterface.h | 51 ++++++ libamqpprox/amqpprox_mapcontrolcommand.cpp | 6 +- libamqpprox/amqpprox_mapcontrolcommand.h | 9 +- .../amqpprox_mappingconnectionselector.cpp | 118 ------------- .../amqpprox_mappingconnectionselector.h | 82 --------- libamqpprox/amqpprox_session.cpp | 32 +++- libamqpprox/amqpprox_session.h | 49 +++--- libamqpprox/amqpprox_sessionstate.h | 7 + tests/CMakeLists.txt | 2 +- ....cpp => amqpprox_connectionselector.t.cpp} | 111 +++++++------ tests/amqpprox_session.t.cpp | 155 +++++++++++++----- 16 files changed, 482 insertions(+), 358 deletions(-) create mode 100644 libamqpprox/amqpprox_connectionselectorinterface.cpp create mode 100644 libamqpprox/amqpprox_connectionselectorinterface.h delete mode 100644 libamqpprox/amqpprox_mappingconnectionselector.cpp delete mode 100644 libamqpprox/amqpprox_mappingconnectionselector.h rename tests/{amqpprox_mappingconnectionselector.t.cpp => amqpprox_connectionselector.t.cpp} (50%) diff --git a/amqpprox/amqpprox.m.cpp b/amqpprox/amqpprox.m.cpp index 48de1f2..45eec59 100644 --- a/amqpprox/amqpprox.m.cpp +++ b/amqpprox/amqpprox.m.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -24,7 +25,6 @@ #include #include #include -#include #include #include #include @@ -168,16 +168,16 @@ int main(int argc, char *argv[]) // Buffer sizes in range, skipping some of the larger powers of 2 once we // get around page sizes. BufferPool bufferPool({32, - 64, - 128, - 256, - 512, - 1024, - 4096, - 16384, - 32768, - 65536, - Frame::getMaxFrameSize()}); + 64, + 128, + 256, + 512, + 1024, + 4096, + 16384, + 32768, + 65536, + Frame::getMaxFrameSize()}); CpuMonitor monitor; Datacenter datacenter; EventSource eventSource; @@ -187,14 +187,14 @@ int main(int argc, char *argv[]) StatCollector statCollector; statCollector.setCpuMonitor(&monitor); statCollector.setBufferPool(&bufferPool); - VhostState vhostState; - PartitionPolicyStore partitionPolicyStore; - BackendSelectorStore backendSelectorStore; - MappingConnectionSelector mappingSelector( + VhostState vhostState; + PartitionPolicyStore partitionPolicyStore; + BackendSelectorStore backendSelectorStore; + ConnectionSelector connectionSelector( &farmStore, &backendStore, &resourceMapper); SessionCleanup cleaner(&statCollector, &eventSource); - Server server(&mappingSelector, &eventSource, &bufferPool); + Server server(&connectionSelector, &eventSource, &bufferPool); Control control(&server, &eventSource, controlSocket); // Set up the backend selector store @@ -228,7 +228,8 @@ int main(int argc, char *argv[]) &backendSelectorStore, &partitionPolicyStore)), CommandPtr(new BackendControlCommand(&backendStore)), - CommandPtr(new MapControlCommand(&resourceMapper, &mappingSelector)), + CommandPtr( + new MapControlCommand(&resourceMapper, &connectionSelector)), CommandPtr(new VhostControlCommand(&vhostState)), CommandPtr(new ListenControlCommand), CommandPtr(new LoggingControlCommand), diff --git a/libamqpprox/CMakeLists.txt b/libamqpprox/CMakeLists.txt index f2d2d88..db0992b 100644 --- a/libamqpprox/CMakeLists.txt +++ b/libamqpprox/CMakeLists.txt @@ -13,7 +13,7 @@ add_library(libamqpprox STATIC amqpprox_bufferpool.cpp amqpprox_buffersource.cpp amqpprox_connectionmanager.cpp - amqpprox_connectionselector.cpp + amqpprox_connectionselectorinterface.cpp amqpprox_connectionscontrolcommand.cpp amqpprox_connectionstats.cpp amqpprox_connector.cpp @@ -44,7 +44,7 @@ add_library(libamqpprox STATIC amqpprox_loggingcontrolcommand.cpp amqpprox_mapcontrolcommand.cpp amqpprox_maphostnamecontrolcommand.cpp - amqpprox_mappingconnectionselector.cpp + amqpprox_connectionselector.cpp amqpprox_maybesecuresocketadaptor.cpp amqpprox_method.cpp amqpprox_packetprocessor.cpp diff --git a/libamqpprox/amqpprox_connectionselector.cpp b/libamqpprox/amqpprox_connectionselector.cpp index 7968e1d..c333521 100644 --- a/libamqpprox/amqpprox_connectionselector.cpp +++ b/libamqpprox/amqpprox_connectionselector.cpp @@ -1,5 +1,5 @@ /* -** Copyright 2020 Bloomberg Finance L.P. +** Copyright 2022 Bloomberg Finance L.P. ** ** Licensed under the Apache License, Version 2.0 (the "License"); ** you may not use this file except in compliance with the License. @@ -14,3 +14,107 @@ ** limitations under the License. */ #include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace Bloomberg { +namespace amqpprox { + +ConnectionSelector::ConnectionSelector(FarmStore *farmStore, + BackendStore *backendStore, + ResourceMapper *resourceMapper) +: d_farmStore_p(farmStore) +, d_backendStore_p(backendStore) +, d_resourceMapper_p(resourceMapper) +, d_defaultFarmName("") +, d_mutex() +{ +} + +ConnectionSelector::~ConnectionSelector() +{ +} + +SessionState::ConnectionStatus ConnectionSelector::acquireConnection( + std::shared_ptr *connectionOut, + const SessionState &sessionState) +{ + std::shared_ptr connectionManager; + + bool isFarm = false; + std::string resourceName; + + if (!d_resourceMapper_p->getResourceMap( + &isFarm, &resourceName, sessionState)) { + std::lock_guard lg(d_mutex); + if (d_defaultFarmName.empty()) { + LOG_INFO << "No farm available for: " << sessionState; + return SessionState::ConnectionStatus::NO_FARM; + } + else { + isFarm = true; + resourceName = d_defaultFarmName; + } + } + + if (isFarm) { + // Return the BackendSet and BackendSelector generated by the Farm + try { + const auto &farm = d_farmStore_p->getFarmByName(resourceName); + + connectionManager.reset(new ConnectionManager( + farm.backendSet(), farm.backendSelector())); + } + catch (std::runtime_error &e) { + LOG_WARN << "Unable to acquire backend from Farm: " << resourceName + << " for: " << sessionState; + return SessionState::ConnectionStatus::ERROR_FARM; + } + + LOG_INFO << "Selected farm: " << resourceName << " For " + << sessionState; + } + else { + // Construct a BackendSet directly and pass a nullptr BackendSelector + auto backend = d_backendStore_p->lookup(resourceName); + if (!backend) { + return SessionState::ConnectionStatus::NO_BACKEND; + } + + std::vector partitions = {{backend}}; + + connectionManager.reset(new ConnectionManager( + std::make_shared(std::move(partitions)), nullptr)); + + LOG_INFO << "Selected directly: " << *backend << " For " + << sessionState; + } + + connectionOut->swap(connectionManager); + + return SessionState::ConnectionStatus::SUCCESS; +} + +void ConnectionSelector::setDefaultFarm(const std::string &farmName) +{ + std::lock_guard lg(d_mutex); + d_defaultFarmName = farmName; +} + +void ConnectionSelector::unsetDefaultFarm() +{ + std::lock_guard lg(d_mutex); + d_defaultFarmName = ""; +} + +} +} diff --git a/libamqpprox/amqpprox_connectionselector.h b/libamqpprox/amqpprox_connectionselector.h index f4d8658..92773e4 100644 --- a/libamqpprox/amqpprox_connectionselector.h +++ b/libamqpprox/amqpprox_connectionselector.h @@ -1,5 +1,5 @@ /* -** Copyright 2020 Bloomberg Finance L.P. +** Copyright 2022 Bloomberg Finance L.P. ** ** Licensed under the Apache License, Version 2.0 (the "License"); ** you may not use this file except in compliance with the License. @@ -16,30 +16,65 @@ #ifndef BLOOMBERG_AMQPPROX_CONNECTIONSELECTOR #define BLOOMBERG_AMQPPROX_CONNECTIONSELECTOR +#include + #include +#include +#include namespace Bloomberg { namespace amqpprox { -class ConnectionManager; -class SessionState; +class FarmStore; +class BackendStore; +class ResourceMapper; /** - * \brief Represents a connection selector + * \brief Determines where to make the egress connection(proxy to broker), + * implements the ConnectionSelectorInterface */ -class ConnectionSelector { +class ConnectionSelector : public ConnectionSelectorInterface { + FarmStore *d_farmStore_p; + BackendStore *d_backendStore_p; + ResourceMapper *d_resourceMapper_p; + std::string d_defaultFarmName; + mutable std::mutex d_mutex; + public: - virtual ~ConnectionSelector() = default; + // CREATORS + /** + * \brief Construct a ConnectionSelector + * \param farmStore + * \param backendStore + * \param resourceMapper + */ + ConnectionSelector(FarmStore *farmStore, + BackendStore *backendStore, + ResourceMapper *resourceMapper); + + virtual ~ConnectionSelector(); + // MANIPULATORS /** * \brief Acquire a connection from the specified session `state` and set - * `connectionOut` to be a `ConnectionManager` instance tracking the - * connection attempt - * \return Zero on success, or a non-zero value otherwise + * `connectionOut` to be a `ConnectionManager` instance tracking connection + * attempt. + * \return connection status to represent whether the connection should go + * through */ - virtual int + virtual SessionState::ConnectionStatus acquireConnection(std::shared_ptr *connectionOut, - const SessionState &state) = 0; + const SessionState &state) override; + + /** + * \brief Set the default farm if a mapping is not found + */ + void setDefaultFarm(const std::string &farmName); + + /** + * \brief Unset any default farm if a mapping is not found + */ + void unsetDefaultFarm(); }; } diff --git a/libamqpprox/amqpprox_connectionselectorinterface.cpp b/libamqpprox/amqpprox_connectionselectorinterface.cpp new file mode 100644 index 0000000..cd08bc7 --- /dev/null +++ b/libamqpprox/amqpprox_connectionselectorinterface.cpp @@ -0,0 +1,16 @@ +/* +** Copyright 2022 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#include diff --git a/libamqpprox/amqpprox_connectionselectorinterface.h b/libamqpprox/amqpprox_connectionselectorinterface.h new file mode 100644 index 0000000..ae4ccce --- /dev/null +++ b/libamqpprox/amqpprox_connectionselectorinterface.h @@ -0,0 +1,51 @@ +/* +** Copyright 2022 Bloomberg Finance L.P. +** +** Licensed under the Apache License, Version 2.0 (the "License"); +** you may not use this file except in compliance with the License. +** You may obtain a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, software +** distributed under the License is distributed on an "AS IS" BASIS, +** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +** See the License for the specific language governing permissions and +** limitations under the License. +*/ +#ifndef BLOOMBERG_AMQPPROX_CONNECTIONSELECTORINTERFACE +#define BLOOMBERG_AMQPPROX_CONNECTIONSELECTORINTERFACE + +#include + +#include + +namespace Bloomberg { +namespace amqpprox { + +class ConnectionManager; +class SessionState; + +/** + * \brief Represents a connection selector interface + */ +class ConnectionSelectorInterface { + public: + virtual ~ConnectionSelectorInterface() = default; + + /** + * \brief Acquire a connection from the specified session `state` and set + * `connectionOut` to be a `ConnectionManager` instance tracking the + * connection attempt + * \return connection status to represent whether the connection should go + * through + */ + virtual SessionState::ConnectionStatus + acquireConnection(std::shared_ptr *connectionOut, + const SessionState &state) = 0; +}; + +} +} + +#endif diff --git a/libamqpprox/amqpprox_mapcontrolcommand.cpp b/libamqpprox/amqpprox_mapcontrolcommand.cpp index 19f0968..feae07b 100644 --- a/libamqpprox/amqpprox_mapcontrolcommand.cpp +++ b/libamqpprox/amqpprox_mapcontrolcommand.cpp @@ -15,7 +15,7 @@ */ #include -#include +#include #include #include @@ -26,8 +26,8 @@ namespace Bloomberg { namespace amqpprox { -MapControlCommand::MapControlCommand(ResourceMapper *mapper, - MappingConnectionSelector *selector) +MapControlCommand::MapControlCommand(ResourceMapper *mapper, + ConnectionSelector *selector) : d_mapper_p(mapper) , d_selector_p(selector) { diff --git a/libamqpprox/amqpprox_mapcontrolcommand.h b/libamqpprox/amqpprox_mapcontrolcommand.h index 0701315..f2f774b 100644 --- a/libamqpprox/amqpprox_mapcontrolcommand.h +++ b/libamqpprox/amqpprox_mapcontrolcommand.h @@ -22,19 +22,18 @@ namespace Bloomberg { namespace amqpprox { class ResourceMapper; -class MappingConnectionSelector; +class ConnectionSelector; /** * \brief Control command to change mappings of resources to servers, * implements the ControlCommand interface */ class MapControlCommand : public ControlCommand { - ResourceMapper *d_mapper_p; // HELD NOT OWNED - MappingConnectionSelector *d_selector_p; // HELD NOT OWNED + ResourceMapper *d_mapper_p; // HELD NOT OWNED + ConnectionSelector *d_selector_p; // HELD NOT OWNED public: - MapControlCommand(ResourceMapper *mapper, - MappingConnectionSelector *selector); + MapControlCommand(ResourceMapper *mapper, ConnectionSelector *selector); /** * \return the command verb this handles diff --git a/libamqpprox/amqpprox_mappingconnectionselector.cpp b/libamqpprox/amqpprox_mappingconnectionselector.cpp deleted file mode 100644 index bf57bf4..0000000 --- a/libamqpprox/amqpprox_mappingconnectionselector.cpp +++ /dev/null @@ -1,118 +0,0 @@ -/* -** Copyright 2020 Bloomberg Finance L.P. -** -** Licensed under the Apache License, Version 2.0 (the "License"); -** you may not use this file except in compliance with the License. -** You may obtain a copy of the License at -** -** http://www.apache.org/licenses/LICENSE-2.0 -** -** Unless required by applicable law or agreed to in writing, software -** distributed under the License is distributed on an "AS IS" BASIS, -** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -** See the License for the specific language governing permissions and -** limitations under the License. -*/ -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace Bloomberg { -namespace amqpprox { - -MappingConnectionSelector::MappingConnectionSelector( - FarmStore *farmStore, - BackendStore *backendStore, - ResourceMapper *resourceMapper) -: d_farmStore_p(farmStore) -, d_backendStore_p(backendStore) -, d_resourceMapper_p(resourceMapper) -, d_defaultFarmName("") -, d_mutex() -{ -} - -MappingConnectionSelector::~MappingConnectionSelector() -{ -} - -int MappingConnectionSelector::acquireConnection( - std::shared_ptr *connectionOut, - const SessionState &state) -{ - std::shared_ptr connectionManager; - - bool isFarm = false; - std::string resourceName; - - if (!d_resourceMapper_p->getResourceMap(&isFarm, &resourceName, state)) { - std::lock_guard lg(d_mutex); - if (d_defaultFarmName.empty()) { - LOG_INFO << "No farm available for: " << state; - return 1; - } - else { - isFarm = true; - resourceName = d_defaultFarmName; - } - } - - if (isFarm) { - // Return the BackendSet and BackendSelector generated by the Farm - try { - const auto &farm = d_farmStore_p->getFarmByName(resourceName); - - connectionManager.reset(new ConnectionManager( - farm.backendSet(), farm.backendSelector())); - } - catch (std::runtime_error &e) { - LOG_WARN << "Unable to acquire backend from Farm: " << resourceName - << " for: " << state; - return 4; - } - - LOG_INFO << "Selected farm: " << resourceName << " For " << state; - } - else { - // Construct a BackendSet directly and pass a nullptr BackendSelector - auto backend = d_backendStore_p->lookup(resourceName); - if (!backend) { - return 3; - } - - std::vector partitions = {{backend}}; - - connectionManager.reset(new ConnectionManager( - std::make_shared(std::move(partitions)), nullptr)); - - LOG_INFO << "Selected directly: " << *backend << " For " << state; - } - - connectionOut->swap(connectionManager); - - return 0; -} - -void MappingConnectionSelector::setDefaultFarm(const std::string &farmName) -{ - std::lock_guard lg(d_mutex); - d_defaultFarmName = farmName; -} - -void MappingConnectionSelector::unsetDefaultFarm() -{ - std::lock_guard lg(d_mutex); - d_defaultFarmName = ""; -} - -} -} diff --git a/libamqpprox/amqpprox_mappingconnectionselector.h b/libamqpprox/amqpprox_mappingconnectionselector.h deleted file mode 100644 index 34e9076..0000000 --- a/libamqpprox/amqpprox_mappingconnectionselector.h +++ /dev/null @@ -1,82 +0,0 @@ -/* -** Copyright 2020 Bloomberg Finance L.P. -** -** Licensed under the Apache License, Version 2.0 (the "License"); -** you may not use this file except in compliance with the License. -** You may obtain a copy of the License at -** -** http://www.apache.org/licenses/LICENSE-2.0 -** -** Unless required by applicable law or agreed to in writing, software -** distributed under the License is distributed on an "AS IS" BASIS, -** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -** See the License for the specific language governing permissions and -** limitations under the License. -*/ -#ifndef BLOOMBERG_AMQPPROX_MAPPINGCONNECTIONSELECTOR -#define BLOOMBERG_AMQPPROX_MAPPINGCONNECTIONSELECTOR - -#include - -#include -#include -#include - -namespace Bloomberg { -namespace amqpprox { - -class FarmStore; -class BackendStore; -class ResourceMapper; - -/** - * \brief Determines where to make the egress connection(proxy to broker), - * implements the ConnectionSelector interface - */ -class MappingConnectionSelector : public ConnectionSelector { - FarmStore *d_farmStore_p; - BackendStore *d_backendStore_p; - ResourceMapper *d_resourceMapper_p; - std::string d_defaultFarmName; - mutable std::mutex d_mutex; - - public: - // CREATORS - /** - * \brief Construct a MappingConnectionSelector - * \param farmStore - * \param backendStore - * \param resourceMapper - */ - MappingConnectionSelector(FarmStore *farmStore, - BackendStore *backendStore, - ResourceMapper *resourceMapper); - - virtual ~MappingConnectionSelector(); - - // MANIPULATORS - /** - * \brief Acquire a connection from the specified session `state` and set - * `connectionOut` to be a `ConnectionManager` instance tracking connection - * attempt. - * \return zero on success, or a non-zero value otherwise - */ - virtual int - acquireConnection(std::shared_ptr *connectionOut, - const SessionState &state) override; - - /** - * \brief Set the default farm if a mapping is not found - */ - void setDefaultFarm(const std::string &farmName); - - /** - * \brief Unset any default farm if a mapping is not found - */ - void unsetDefaultFarm(); -}; - -} -} - -#endif diff --git a/libamqpprox/amqpprox_session.cpp b/libamqpprox/amqpprox_session.cpp index aadff2f..debda8b 100644 --- a/libamqpprox/amqpprox_session.cpp +++ b/libamqpprox/amqpprox_session.cpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include @@ -113,7 +113,7 @@ void logException(const std::string_view error, Session::Session(boost::asio::io_service &ioservice, MaybeSecureSocketAdaptor &&serverSocket, MaybeSecureSocketAdaptor &&clientSocket, - ConnectionSelector *connectionSelector, + ConnectionSelectorInterface *connectionSelector, EventSource *eventSource, BufferPool *bufferPool, DNSResolver *dnsResolver, @@ -446,11 +446,31 @@ void Session::establishConnection() } std::shared_ptr connectionManager; - int rc = d_connectionSelector_p->acquireConnection(&connectionManager, - d_sessionState); - if (0 != rc) { + SessionState::ConnectionStatus rc = + d_connectionSelector_p->acquireConnection(&connectionManager, + d_sessionState); + if (rc != SessionState::ConnectionStatus::SUCCESS) { // Failure reason logged within acquireConnection - disconnect(true); + switch (rc) { + case SessionState::ConnectionStatus::NO_FARM: + case SessionState::ConnectionStatus::ERROR_FARM: + case SessionState::ConnectionStatus::NO_BACKEND: + d_connector.synthesizeCustomCloseError( + true, + Reply::Codes::resource_error, + "No known broker mapping for vhost " + + d_sessionState.getVirtualHost()); + sendSyntheticData(); + disconnect(true); + break; + + default: + LOG_INFO << "Failed to acquire connection for vhost " + << d_sessionState.getVirtualHost() + << ", rc: " << static_cast(rc); + disconnect(true); + } + return; } diff --git a/libamqpprox/amqpprox_session.h b/libamqpprox/amqpprox_session.h index 8dbf0eb..cc9a366 100644 --- a/libamqpprox/amqpprox_session.h +++ b/libamqpprox/amqpprox_session.h @@ -32,6 +32,7 @@ #include #include +#include #include #include @@ -39,7 +40,7 @@ namespace Bloomberg { namespace amqpprox { class ConnectionManager; -class ConnectionSelector; +class ConnectionSelectorInterface; class EventSource; class DNSResolver; @@ -56,28 +57,28 @@ class Session : public std::enable_shared_from_this { using TimePoint = std::chrono::time_point; - boost::asio::io_service &d_ioService; - MaybeSecureSocketAdaptor d_serverSocket; - MaybeSecureSocketAdaptor d_clientSocket; - BufferHandle d_serverDataHandle; - BufferHandle d_serverWriteDataHandle; - BufferHandle d_clientDataHandle; - BufferHandle d_clientWriteDataHandle; - std::size_t d_serverWaterMark; - std::size_t d_clientWaterMark; - SessionState d_sessionState; - Connector d_connector; - ConnectionSelector *d_connectionSelector_p; // HELD NOT OWNED - EventSource *d_eventSource_p; // HELD NOT OWNED - BufferPool *d_bufferPool_p; // HELD NOT OWNED - DNSResolver *d_dnsResolver_p; - TimePoint d_ingressWaitingSince; - TimePoint d_egressWaitingSince; - uint32_t d_egressRetryCounter; - bool d_ingressCurrentlyReading; - TimePoint d_ingressStartedAt; - bool d_egressCurrentlyReading; - TimePoint d_egressStartedAt; + boost::asio::io_service &d_ioService; + MaybeSecureSocketAdaptor d_serverSocket; + MaybeSecureSocketAdaptor d_clientSocket; + BufferHandle d_serverDataHandle; + BufferHandle d_serverWriteDataHandle; + BufferHandle d_clientDataHandle; + BufferHandle d_clientWriteDataHandle; + std::size_t d_serverWaterMark; + std::size_t d_clientWaterMark; + SessionState d_sessionState; + Connector d_connector; + ConnectionSelectorInterface *d_connectionSelector_p; // HELD NOT OWNED + EventSource *d_eventSource_p; // HELD NOT OWNED + BufferPool *d_bufferPool_p; // HELD NOT OWNED + DNSResolver *d_dnsResolver_p; + TimePoint d_ingressWaitingSince; + TimePoint d_egressWaitingSince; + uint32_t d_egressRetryCounter; + bool d_ingressCurrentlyReading; + TimePoint d_ingressStartedAt; + bool d_egressCurrentlyReading; + TimePoint d_egressStartedAt; std::vector d_resolvedEndpoints; uint32_t d_resolvedEndpointsIndex; std::shared_ptr d_authIntercept; @@ -87,7 +88,7 @@ class Session : public std::enable_shared_from_this { Session(boost::asio::io_service &ioservice, MaybeSecureSocketAdaptor &&serverSocket, MaybeSecureSocketAdaptor &&clientSocket, - ConnectionSelector *connectionSelector, + ConnectionSelectorInterface *connectionSelector, EventSource *eventSource, BufferPool *bufferPool, DNSResolver *dnsResolver, diff --git a/libamqpprox/amqpprox_sessionstate.h b/libamqpprox/amqpprox_sessionstate.h index d25a847..83b8f75 100644 --- a/libamqpprox/amqpprox_sessionstate.h +++ b/libamqpprox/amqpprox_sessionstate.h @@ -42,6 +42,13 @@ class SessionState { DISCONNECTED_PROXY }; + enum class ConnectionStatus { + SUCCESS = 0, + NO_FARM, + ERROR_FARM, + NO_BACKEND + }; + private: static uint64_t s_nextId; boost::asio::ip::tcp::endpoint d_ingressLocalEndpoint; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6bf4f9c..09482a1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -24,7 +24,7 @@ add_executable(amqpprox_tests amqpprox_farmstore.t.cpp amqpprox_frame.t.cpp amqpprox_flowtype.t.cpp - amqpprox_mappingconnectionselector.t.cpp + amqpprox_connectionselector.t.cpp amqpprox_methods_start.t.cpp amqpprox_packetprocessor.t.cpp amqpprox_partitionpolicystore.t.cpp diff --git a/tests/amqpprox_mappingconnectionselector.t.cpp b/tests/amqpprox_connectionselector.t.cpp similarity index 50% rename from tests/amqpprox_mappingconnectionselector.t.cpp rename to tests/amqpprox_connectionselector.t.cpp index cab931b..939abae 100644 --- a/tests/amqpprox_mappingconnectionselector.t.cpp +++ b/tests/amqpprox_connectionselector.t.cpp @@ -1,5 +1,5 @@ /* -** Copyright 2021 Bloomberg Finance L.P. +** Copyright 2022 Bloomberg Finance L.P. ** ** Licensed under the Apache License, Version 2.0 (the "License"); ** you may not use this file except in compliance with the License. @@ -14,13 +14,13 @@ ** limitations under the License. */ -#include +#include #include -#include -#include +#include #include -#include #include +#include +#include #include #include @@ -29,83 +29,102 @@ using namespace Bloomberg; using namespace amqpprox; using namespace testing; -TEST(MappingConnectionSelector, Breathing) { - FarmStore farmStore; - BackendStore backendStore; - ResourceMapper resourceMapper; - MappingConnectionSelector connectionSelector(&farmStore, &backendStore, &resourceMapper); +TEST(ConnectionSelector, Breathing) +{ + FarmStore farmStore; + BackendStore backendStore; + ResourceMapper resourceMapper; + ConnectionSelector connectionSelector( + &farmStore, &backendStore, &resourceMapper); } -TEST(MappingConnectionSelector, Find_Nothing) { - FarmStore farmStore; - BackendStore backendStore; - ResourceMapper resourceMapper; - MappingConnectionSelector connectionSelector(&farmStore, &backendStore, &resourceMapper); - SessionState state; +TEST(ConnectionSelector, Find_Nothing) +{ + FarmStore farmStore; + BackendStore backendStore; + ResourceMapper resourceMapper; + ConnectionSelector connectionSelector( + &farmStore, &backendStore, &resourceMapper); + SessionState state; std::shared_ptr out; - EXPECT_EQ(connectionSelector.acquireConnection(&out, state), 1); + EXPECT_EQ(connectionSelector.acquireConnection(&out, state), + SessionState::ConnectionStatus::NO_FARM); } -TEST(MappingConnectionSelector, Find_Nothing_Defaulted) { - FarmStore farmStore; - BackendStore backendStore; - ResourceMapper resourceMapper; - MappingConnectionSelector connectionSelector(&farmStore, &backendStore, &resourceMapper); +TEST(ConnectionSelector, Find_Nothing_Defaulted) +{ + FarmStore farmStore; + BackendStore backendStore; + ResourceMapper resourceMapper; + ConnectionSelector connectionSelector( + &farmStore, &backendStore, &resourceMapper); connectionSelector.setDefaultFarm("DEFAULT"); - SessionState state; + SessionState state; std::shared_ptr out; - EXPECT_EQ(connectionSelector.acquireConnection(&out, state), 4); + EXPECT_EQ(connectionSelector.acquireConnection(&out, state), + SessionState::ConnectionStatus::ERROR_FARM); // Check after unsetting the default we go back to the previous rc connectionSelector.unsetDefaultFarm(); - EXPECT_EQ(connectionSelector.acquireConnection(&out, state), 1); + EXPECT_EQ(connectionSelector.acquireConnection(&out, state), + SessionState::ConnectionStatus::NO_FARM); } -TEST(MappingConnectionSelector, Successful_Farm_Find) { - FarmStore farmStore; - BackendStore backendStore; - ResourceMapper resourceMapper; - RobinBackendSelector backendSelector; +TEST(ConnectionSelector, Successful_Farm_Find) +{ + FarmStore farmStore; + BackendStore backendStore; + ResourceMapper resourceMapper; + RobinBackendSelector backendSelector; std::vector members; - farmStore.addFarm(std::make_unique("DEFAULT", members, &backendStore, &backendSelector)); - MappingConnectionSelector connectionSelector(&farmStore, &backendStore, &resourceMapper); + farmStore.addFarm(std::make_unique( + "DEFAULT", members, &backendStore, &backendSelector)); + ConnectionSelector connectionSelector( + &farmStore, &backendStore, &resourceMapper); connectionSelector.setDefaultFarm("DEFAULT"); - SessionState state; + SessionState state; std::shared_ptr out; - EXPECT_EQ(connectionSelector.acquireConnection(&out, state), 0); + EXPECT_EQ(connectionSelector.acquireConnection(&out, state), + SessionState::ConnectionStatus::SUCCESS); } -TEST(MappingConnectionSelector, Find_Nothing_Backend) { - FarmStore farmStore; - BackendStore backendStore; +TEST(ConnectionSelector, Find_Nothing_Backend) +{ + FarmStore farmStore; + BackendStore backendStore; ResourceMapper resourceMapper; - MappingConnectionSelector connectionSelector(&farmStore, &backendStore, &resourceMapper); + ConnectionSelector connectionSelector( + &farmStore, &backendStore, &resourceMapper); SessionState state; state.setVirtualHost("/"); resourceMapper.mapVhostToBackend("/", "non-existing"); std::shared_ptr out; - EXPECT_EQ(connectionSelector.acquireConnection(&out, state), 3); + EXPECT_EQ(connectionSelector.acquireConnection(&out, state), + SessionState::ConnectionStatus::NO_BACKEND); } -TEST(MappingConnectionSelector, Successful_Backend_Find) { - FarmStore farmStore; - BackendStore backendStore; - ResourceMapper resourceMapper; +TEST(ConnectionSelector, Successful_Backend_Find) +{ + FarmStore farmStore; + BackendStore backendStore; + ResourceMapper resourceMapper; RobinBackendSelector backendSelector; Backend backend1( "backend1", "dc1", "backend1.bloomberg.com", "127.0.0.1", 5672, true); backendStore.insert(backend1); - MappingConnectionSelector connectionSelector(&farmStore, &backendStore, &resourceMapper); + ConnectionSelector connectionSelector( + &farmStore, &backendStore, &resourceMapper); SessionState state; state.setVirtualHost("/"); resourceMapper.mapVhostToBackend("/", "backend1"); std::shared_ptr out; - EXPECT_EQ(connectionSelector.acquireConnection(&out, state), 0); -} + EXPECT_EQ(connectionSelector.acquireConnection(&out, state), + SessionState::ConnectionStatus::SUCCESS); +} \ No newline at end of file diff --git a/tests/amqpprox_session.t.cpp b/tests/amqpprox_session.t.cpp index 9789bd4..fe33422 100644 --- a/tests/amqpprox_session.t.cpp +++ b/tests/amqpprox_session.t.cpp @@ -42,7 +42,7 @@ #include #include #include -#include +#include #include #include #include @@ -72,6 +72,7 @@ #include #include +#include #include using namespace Bloomberg; @@ -87,12 +88,13 @@ using ConnectComplete = TestSocketState::ConnectComplete; const char LOCAL_HOSTNAME[] = "amqpprox-host"; -struct SelectorMock : public ConnectionSelector { +struct SelectorMock : public ConnectionSelectorInterface { virtual ~SelectorMock() {} - MOCK_METHOD2(acquireConnection, - int(std::shared_ptr *, - const SessionState &)); + MOCK_METHOD2( + acquireConnection, + SessionState::ConnectionStatus(std::shared_ptr *, + const SessionState &)); }; struct HostnameMapperMock : public HostnameMapper { @@ -182,9 +184,9 @@ class SessionTest : public ::testing::Test { int idx, const methods::StartOk &overriddenStartOk = methods::StartOk()); void testSetupClientOpen(int idx); - void - testSetupUnauthClientOpenWithShutdown(int idx, - bool authenticationFailureClose); + void testSetupClientOpenWithProxyClose( + int idx, + const std::shared_ptr &closeMethodPtr = nullptr); void testSetupClientOpenWithoutTune(int idx); void testSetupProxyConnect(int idx, TestSocketState::State *clientBase); void testSetupProxySendsProtocolHeader(int idx); @@ -282,30 +284,26 @@ void SessionTest::testSetupClientOpen(int idx) d_serverState.pushItem(idx, Data(encode(clientOpen()))); } -void SessionTest::testSetupUnauthClientOpenWithShutdown( - int idx, - bool authenticationFailureClose) +void SessionTest::testSetupClientOpenWithProxyClose( + int idx, + const std::shared_ptr &closeMethodPtr) { // Client ------TuneOk------> Proxy Broker // Client ------Open--------> Proxy Broker // Client <-----Close-------- Proxy Broker d_serverState.pushItem(idx, Data(encode(clientTuneOk()))); d_serverState.pushItem(idx, Data(encode(clientOpen()))); - methods::Close closeMethod = methods::Close(); - closeMethod.setReply(Reply::Codes::access_refused, - "Unauthorized test client"); - d_serverState.expect( - idx, - [this, closeMethod, authenticationFailureClose](const auto &items) { - if (authenticationFailureClose) { - auto data = filterVariant(items); - ASSERT_EQ(data.size(), 1); - EXPECT_EQ(data[0], Data(encode(closeMethod))); - } - EXPECT_THAT(items, - Contains(VariantWith(Call("async_shutdown")))); - EXPECT_THAT(items, Contains(VariantWith(Call("close")))); - }); + + d_serverState.expect(idx, [this, closeMethodPtr](const auto &items) { + if (closeMethodPtr) { + auto data = filterVariant(items); + ASSERT_EQ(data.size(), 1); + EXPECT_EQ(data[0], Data(encode(*closeMethodPtr))); + } + EXPECT_THAT(items, + Contains(VariantWith(Call("async_shutdown")))); + EXPECT_THAT(items, Contains(VariantWith(Call("close")))); + }); d_clientState.expect(idx, [this](const auto &items) { EXPECT_THAT(items, Contains(VariantWith(Call("async_shutdown")))); @@ -613,7 +611,8 @@ void SessionTest::runStandardConnectWithDisconnect( TEST_F(SessionTest, Connection_Then_Ping_Then_Disconnect) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); TestSocketState::State base, clientBase; testSetupHostnameMapperForServerClientBase(base, clientBase); @@ -719,7 +718,8 @@ TEST_F(SessionTest, BadClientHandshake) TEST_F(SessionTest, BadServerHandshake) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); TestSocketState::State base, clientBase; testSetupHostnameMapperForServerClientBase(base, clientBase); @@ -836,7 +836,8 @@ TEST_F(SessionTest, Connection_To_Proxy_Protocol) auto cm = std::make_shared( std::make_shared(partitions), &d_robinSelector); EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(cm), + Return(SessionState::ConnectionStatus::SUCCESS))); auto protocolHeader = std::vector( Constants::protocolHeader(), @@ -937,7 +938,8 @@ TEST_F(SessionTest, Connect_Multiple_Dns) std::make_shared(partitions), &d_robinSelector); EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) @@ -1078,7 +1080,8 @@ TEST_F(SessionTest, Failover_Dns_Failure) std::make_shared(partitions), &d_robinSelector); EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); EXPECT_CALL(*d_mapper, prime(_, _)).Times(AtLeast(1)); EXPECT_CALL(*d_mapper, mapToHostname(makeEndpoint("2.3.4.5", 2345))) @@ -1173,7 +1176,8 @@ TEST_F(SessionTest, Failover_Dns_Failure) TEST_F(SessionTest, Connection_Then_Ping_Then_Force_Disconnect) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); TestSocketState::State base, clientBase; testSetupHostnameMapperForServerClientBase(base, clientBase); @@ -1228,7 +1232,8 @@ TEST_F(SessionTest, Connection_Then_Ping_Then_Force_Disconnect) TEST_F(SessionTest, Connection_Then_Ping_Then_Backend_Disconnect) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); TestSocketState::State base, clientBase; testSetupHostnameMapperForServerClientBase(base, clientBase); @@ -1290,7 +1295,8 @@ TEST_F(SessionTest, Connection_Then_Ping_Then_Backend_Disconnect) TEST_F(SessionTest, Authorized_Client_Test) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); std::string modifiedMechanism = "TEST_MECHANISM"; std::string modifiedCredentials = "credentials"; @@ -1369,7 +1375,8 @@ TEST_F( Unauthorized_Client_Test_Without_Authentication_Failure_Close_Capability) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); authproto::AuthResponse authResponseData; authResponseData.set_result(authproto::AuthResponse::DENY); @@ -1395,7 +1402,7 @@ TEST_F( testSetupServerHandshake(1); testSetupClientSendsProtocolHeader(2); testSetupClientStartOk(3); - testSetupUnauthClientOpenWithShutdown(4, false); + testSetupClientOpenWithProxyClose(4); MaybeSecureSocketAdaptor clientSocket(d_ioService, d_client, false); MaybeSecureSocketAdaptor serverSocket(d_ioService, d_server, false); @@ -1422,7 +1429,8 @@ TEST_F(SessionTest, Unauthorized_Client_Test_With_Authentication_Failure_Close_Capability) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); authproto::AuthResponse authResponseData; authResponseData.set_result(authproto::AuthResponse::DENY); @@ -1456,7 +1464,12 @@ TEST_F(SessionTest, FieldValue('F', capabilitiesTable)); overriddenStartOk.setClientProperties(clientProperties); testSetupClientStartOk(3, overriddenStartOk); - testSetupUnauthClientOpenWithShutdown(4, true); + + std::shared_ptr closeMethodPtr = + std::make_shared(); + closeMethodPtr->setReply(Reply::Codes::access_refused, + "Unauthorized test client"); + testSetupClientOpenWithProxyClose(4, closeMethodPtr); MaybeSecureSocketAdaptor clientSocket(d_ioService, d_client, false); MaybeSecureSocketAdaptor serverSocket(d_ioService, d_server, false); @@ -1482,7 +1495,8 @@ TEST_F(SessionTest, TEST_F(SessionTest, Forward_Received_Close_Method_To_Client_During_Handshake) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); TestSocketState::State base, clientBase; testSetupHostnameMapperForServerClientBase(base, clientBase); @@ -1543,10 +1557,65 @@ TEST_F(SessionTest, Forward_Received_Close_Method_To_Client_During_Handshake) EXPECT_TRUE(session->finished()); } +TEST_F(SessionTest, Close_Connection_No_Broker_Mapping) +{ + EXPECT_CALL(d_selector, acquireConnection(_, _)) + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::NO_BACKEND))); + + TestSocketState::State base, clientBase; + testSetupHostnameMapperForServerClientBase(base, clientBase); + + MaybeSecureSocketAdaptor clientSocket(d_ioService, d_client, false); + MaybeSecureSocketAdaptor serverSocket(d_ioService, d_server, false); + auto session = std::make_shared(d_ioService, + std::move(serverSocket), + std::move(clientSocket), + &d_selector, + &d_eventSource, + &d_pool, + &d_dnsResolver, + d_mapper, + LOCAL_HOSTNAME, + d_authIntercept); + + // Initialise the state + d_serverState.pushItem(0, base); + driveTo(0); + + testSetupServerHandshake(1); + + // Read a protocol header from the client and reply with Start method + // Client ----AMQP Header---> Proxy Broker + // Client <-----Start-------- Proxy Broker + testSetupClientSendsProtocolHeader(2); + + // Client ------StartOk-----> Proxy Broker + // Client <-----Tune--------- Proxy Broker + testSetupClientStartOk(3); + + // Client ------TuneOk------> Proxy Broker + // Client ------Open--------> Proxy Broker + // Client <-----Close-------- Proxy Broker + std::shared_ptr closeMethodPtr = + std::make_shared(); + closeMethodPtr->setReply(Reply::Codes::resource_error, + "No known broker mapping for vhost "); + testSetupClientOpenWithProxyClose(4, closeMethodPtr); + + session->start(); + + // Run the tests through to completion + driveTo(5); + + EXPECT_TRUE(session->finished()); +} + TEST_F(SessionTest, Printing_Breathing_Test) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); TestSocketState::State base, clientBase; testSetupHostnameMapperForServerClientBase(base, clientBase); @@ -1597,7 +1666,8 @@ TEST_F(SessionTest, Printing_Breathing_Test) TEST_F(SessionTest, Pause_Disconnects_Previously_Established_Connection) { EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); TestSocketState::State base, clientBase; testSetupHostnameMapperForServerClientBase(base, clientBase); @@ -1711,7 +1781,8 @@ TEST_F(SessionTest, })); EXPECT_CALL(d_selector, acquireConnection(_, _)) - .WillOnce(DoAll(SetArgPointee<0>(d_cm), Return(0))); + .WillOnce(DoAll(SetArgPointee<0>(d_cm), + Return(SessionState::ConnectionStatus::SUCCESS))); // Run the tests through to completion driveTo(16);