Skip to content

Commit

Permalink
Start sending appropriate error message to the clients, if the broker…
Browse files Browse the repository at this point in the history
… mapping is not configured for particular vhost
  • Loading branch information
Chinmay1412 committed May 23, 2022
1 parent c557d12 commit 5412802
Show file tree
Hide file tree
Showing 16 changed files with 482 additions and 358 deletions.
35 changes: 18 additions & 17 deletions amqpprox/amqpprox.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <amqpprox_backendselectorstore.h>
#include <amqpprox_backendstore.h>
#include <amqpprox_bufferpool.h>
#include <amqpprox_connectionselector.h>
#include <amqpprox_control.h>
#include <amqpprox_cpumonitor.h>
#include <amqpprox_datacenter.h>
Expand All @@ -24,7 +25,6 @@
#include <amqpprox_frame.h>
#include <amqpprox_logging.h>
#include <amqpprox_loggingcontrolcommand.h>
#include <amqpprox_mappingconnectionselector.h>
#include <amqpprox_partitionpolicy.h>
#include <amqpprox_partitionpolicystore.h>
#include <amqpprox_resourcemapper.h>
Expand Down Expand Up @@ -168,16 +168,16 @@ int main(int argc, char *argv[])
// Buffer sizes in range, skipping some of the larger powers of 2 once we
// get around page sizes.
BufferPool bufferPool({32,
64,
128,
256,
512,
1024,
4096,
16384,
32768,
65536,
Frame::getMaxFrameSize()});
64,
128,
256,
512,
1024,
4096,
16384,
32768,
65536,
Frame::getMaxFrameSize()});
CpuMonitor monitor;
Datacenter datacenter;
EventSource eventSource;
Expand All @@ -187,14 +187,14 @@ int main(int argc, char *argv[])
StatCollector statCollector;
statCollector.setCpuMonitor(&monitor);
statCollector.setBufferPool(&bufferPool);
VhostState vhostState;
PartitionPolicyStore partitionPolicyStore;
BackendSelectorStore backendSelectorStore;
MappingConnectionSelector mappingSelector(
VhostState vhostState;
PartitionPolicyStore partitionPolicyStore;
BackendSelectorStore backendSelectorStore;
ConnectionSelector connectionSelector(
&farmStore, &backendStore, &resourceMapper);
SessionCleanup cleaner(&statCollector, &eventSource);

Server server(&mappingSelector, &eventSource, &bufferPool);
Server server(&connectionSelector, &eventSource, &bufferPool);
Control control(&server, &eventSource, controlSocket);

// Set up the backend selector store
Expand Down Expand Up @@ -228,7 +228,8 @@ int main(int argc, char *argv[])
&backendSelectorStore,
&partitionPolicyStore)),
CommandPtr(new BackendControlCommand(&backendStore)),
CommandPtr(new MapControlCommand(&resourceMapper, &mappingSelector)),
CommandPtr(
new MapControlCommand(&resourceMapper, &connectionSelector)),
CommandPtr(new VhostControlCommand(&vhostState)),
CommandPtr(new ListenControlCommand),
CommandPtr(new LoggingControlCommand),
Expand Down
4 changes: 2 additions & 2 deletions libamqpprox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ add_library(libamqpprox STATIC
amqpprox_bufferpool.cpp
amqpprox_buffersource.cpp
amqpprox_connectionmanager.cpp
amqpprox_connectionselector.cpp
amqpprox_connectionselectorinterface.cpp
amqpprox_connectionscontrolcommand.cpp
amqpprox_connectionstats.cpp
amqpprox_connector.cpp
Expand Down Expand Up @@ -44,7 +44,7 @@ add_library(libamqpprox STATIC
amqpprox_loggingcontrolcommand.cpp
amqpprox_mapcontrolcommand.cpp
amqpprox_maphostnamecontrolcommand.cpp
amqpprox_mappingconnectionselector.cpp
amqpprox_connectionselector.cpp
amqpprox_maybesecuresocketadaptor.cpp
amqpprox_method.cpp
amqpprox_packetprocessor.cpp
Expand Down
106 changes: 105 additions & 1 deletion libamqpprox/amqpprox_connectionselector.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
** Copyright 2020 Bloomberg Finance L.P.
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
Expand All @@ -14,3 +14,107 @@
** limitations under the License.
*/
#include <amqpprox_connectionselector.h>

#include <amqpprox_backend.h>
#include <amqpprox_backendset.h>
#include <amqpprox_backendstore.h>
#include <amqpprox_connectionmanager.h>
#include <amqpprox_farmstore.h>
#include <amqpprox_logging.h>
#include <amqpprox_resourcemapper.h>
#include <amqpprox_sessionstate.h>

#include <iostream>

namespace Bloomberg {
namespace amqpprox {

ConnectionSelector::ConnectionSelector(FarmStore *farmStore,
BackendStore *backendStore,
ResourceMapper *resourceMapper)
: d_farmStore_p(farmStore)
, d_backendStore_p(backendStore)
, d_resourceMapper_p(resourceMapper)
, d_defaultFarmName("")
, d_mutex()
{
}

ConnectionSelector::~ConnectionSelector()
{
}

SessionState::ConnectionStatus ConnectionSelector::acquireConnection(
std::shared_ptr<ConnectionManager> *connectionOut,
const SessionState &sessionState)
{
std::shared_ptr<ConnectionManager> connectionManager;

bool isFarm = false;
std::string resourceName;

if (!d_resourceMapper_p->getResourceMap(
&isFarm, &resourceName, sessionState)) {
std::lock_guard<std::mutex> lg(d_mutex);
if (d_defaultFarmName.empty()) {
LOG_INFO << "No farm available for: " << sessionState;
return SessionState::ConnectionStatus::NO_FARM;
}
else {
isFarm = true;
resourceName = d_defaultFarmName;
}
}

if (isFarm) {
// Return the BackendSet and BackendSelector generated by the Farm
try {
const auto &farm = d_farmStore_p->getFarmByName(resourceName);

connectionManager.reset(new ConnectionManager(
farm.backendSet(), farm.backendSelector()));
}
catch (std::runtime_error &e) {
LOG_WARN << "Unable to acquire backend from Farm: " << resourceName
<< " for: " << sessionState;
return SessionState::ConnectionStatus::ERROR_FARM;
}

LOG_INFO << "Selected farm: " << resourceName << " For "
<< sessionState;
}
else {
// Construct a BackendSet directly and pass a nullptr BackendSelector
auto backend = d_backendStore_p->lookup(resourceName);
if (!backend) {
return SessionState::ConnectionStatus::NO_BACKEND;
}

std::vector<BackendSet::Partition> partitions = {{backend}};

connectionManager.reset(new ConnectionManager(
std::make_shared<BackendSet>(std::move(partitions)), nullptr));

LOG_INFO << "Selected directly: " << *backend << " For "
<< sessionState;
}

connectionOut->swap(connectionManager);

return SessionState::ConnectionStatus::SUCCESS;
}

void ConnectionSelector::setDefaultFarm(const std::string &farmName)
{
std::lock_guard<std::mutex> lg(d_mutex);
d_defaultFarmName = farmName;
}

void ConnectionSelector::unsetDefaultFarm()
{
std::lock_guard<std::mutex> lg(d_mutex);
d_defaultFarmName = "";
}

}
}
57 changes: 46 additions & 11 deletions libamqpprox/amqpprox_connectionselector.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
** Copyright 2020 Bloomberg Finance L.P.
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
Expand All @@ -16,30 +16,65 @@
#ifndef BLOOMBERG_AMQPPROX_CONNECTIONSELECTOR
#define BLOOMBERG_AMQPPROX_CONNECTIONSELECTOR

#include <amqpprox_connectionselectorinterface.h>

#include <memory>
#include <mutex>
#include <string>

namespace Bloomberg {
namespace amqpprox {

class ConnectionManager;
class SessionState;
class FarmStore;
class BackendStore;
class ResourceMapper;

/**
* \brief Represents a connection selector
* \brief Determines where to make the egress connection(proxy to broker),
* implements the ConnectionSelectorInterface
*/
class ConnectionSelector {
class ConnectionSelector : public ConnectionSelectorInterface {
FarmStore *d_farmStore_p;
BackendStore *d_backendStore_p;
ResourceMapper *d_resourceMapper_p;
std::string d_defaultFarmName;
mutable std::mutex d_mutex;

public:
virtual ~ConnectionSelector() = default;
// CREATORS
/**
* \brief Construct a ConnectionSelector
* \param farmStore
* \param backendStore
* \param resourceMapper
*/
ConnectionSelector(FarmStore *farmStore,
BackendStore *backendStore,
ResourceMapper *resourceMapper);

virtual ~ConnectionSelector();

// MANIPULATORS
/**
* \brief Acquire a connection from the specified session `state` and set
* `connectionOut` to be a `ConnectionManager` instance tracking the
* connection attempt
* \return Zero on success, or a non-zero value otherwise
* `connectionOut` to be a `ConnectionManager` instance tracking connection
* attempt.
* \return connection status to represent whether the connection should go
* through
*/
virtual int
virtual SessionState::ConnectionStatus
acquireConnection(std::shared_ptr<ConnectionManager> *connectionOut,
const SessionState &state) = 0;
const SessionState &state) override;

/**
* \brief Set the default farm if a mapping is not found
*/
void setDefaultFarm(const std::string &farmName);

/**
* \brief Unset any default farm if a mapping is not found
*/
void unsetDefaultFarm();
};

}
Expand Down
16 changes: 16 additions & 0 deletions libamqpprox/amqpprox_connectionselectorinterface.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/
#include <amqpprox_connectionselectorinterface.h>
51 changes: 51 additions & 0 deletions libamqpprox/amqpprox_connectionselectorinterface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/
#ifndef BLOOMBERG_AMQPPROX_CONNECTIONSELECTORINTERFACE
#define BLOOMBERG_AMQPPROX_CONNECTIONSELECTORINTERFACE

#include <amqpprox_sessionstate.h>

#include <memory>

namespace Bloomberg {
namespace amqpprox {

class ConnectionManager;
class SessionState;

/**
* \brief Represents a connection selector interface
*/
class ConnectionSelectorInterface {
public:
virtual ~ConnectionSelectorInterface() = default;

/**
* \brief Acquire a connection from the specified session `state` and set
* `connectionOut` to be a `ConnectionManager` instance tracking the
* connection attempt
* \return connection status to represent whether the connection should go
* through
*/
virtual SessionState::ConnectionStatus
acquireConnection(std::shared_ptr<ConnectionManager> *connectionOut,
const SessionState &state) = 0;
};

}
}

#endif
6 changes: 3 additions & 3 deletions libamqpprox/amqpprox_mapcontrolcommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
#include <amqpprox_mapcontrolcommand.h>

#include <amqpprox_mappingconnectionselector.h>
#include <amqpprox_connectionselector.h>
#include <amqpprox_resourcemapper.h>

#include <boost/algorithm/string.hpp>
Expand All @@ -26,8 +26,8 @@
namespace Bloomberg {
namespace amqpprox {

MapControlCommand::MapControlCommand(ResourceMapper *mapper,
MappingConnectionSelector *selector)
MapControlCommand::MapControlCommand(ResourceMapper *mapper,
ConnectionSelector *selector)
: d_mapper_p(mapper)
, d_selector_p(selector)
{
Expand Down
Loading

0 comments on commit 5412802

Please sign in to comment.