Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start sending appropriate error message to the clients, if the broker mapping is not configured for particular vhost #79

Merged
merged 1 commit into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions amqpprox/amqpprox.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <amqpprox_backendselectorstore.h>
#include <amqpprox_backendstore.h>
#include <amqpprox_bufferpool.h>
#include <amqpprox_connectionselector.h>
#include <amqpprox_control.h>
#include <amqpprox_cpumonitor.h>
#include <amqpprox_datacenter.h>
Expand All @@ -24,7 +25,6 @@
#include <amqpprox_frame.h>
#include <amqpprox_logging.h>
#include <amqpprox_loggingcontrolcommand.h>
#include <amqpprox_mappingconnectionselector.h>
#include <amqpprox_partitionpolicy.h>
#include <amqpprox_partitionpolicystore.h>
#include <amqpprox_resourcemapper.h>
Expand Down Expand Up @@ -168,16 +168,16 @@ int main(int argc, char *argv[])
// Buffer sizes in range, skipping some of the larger powers of 2 once we
// get around page sizes.
BufferPool bufferPool({32,
64,
128,
256,
512,
1024,
4096,
16384,
32768,
65536,
Frame::getMaxFrameSize()});
64,
128,
256,
512,
1024,
4096,
16384,
32768,
65536,
Frame::getMaxFrameSize()});
CpuMonitor monitor;
Datacenter datacenter;
EventSource eventSource;
Expand All @@ -187,14 +187,14 @@ int main(int argc, char *argv[])
StatCollector statCollector;
statCollector.setCpuMonitor(&monitor);
statCollector.setBufferPool(&bufferPool);
VhostState vhostState;
PartitionPolicyStore partitionPolicyStore;
BackendSelectorStore backendSelectorStore;
MappingConnectionSelector mappingSelector(
VhostState vhostState;
PartitionPolicyStore partitionPolicyStore;
BackendSelectorStore backendSelectorStore;
ConnectionSelector connectionSelector(
&farmStore, &backendStore, &resourceMapper);
SessionCleanup cleaner(&statCollector, &eventSource);

Server server(&mappingSelector, &eventSource, &bufferPool);
Server server(&connectionSelector, &eventSource, &bufferPool);
Control control(&server, &eventSource, controlSocket);

// Set up the backend selector store
Expand Down Expand Up @@ -228,7 +228,8 @@ int main(int argc, char *argv[])
&backendSelectorStore,
&partitionPolicyStore)),
CommandPtr(new BackendControlCommand(&backendStore)),
CommandPtr(new MapControlCommand(&resourceMapper, &mappingSelector)),
CommandPtr(
new MapControlCommand(&resourceMapper, &connectionSelector)),
CommandPtr(new VhostControlCommand(&vhostState)),
CommandPtr(new ListenControlCommand),
CommandPtr(new LoggingControlCommand),
Expand Down
4 changes: 2 additions & 2 deletions libamqpprox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ add_library(libamqpprox STATIC
amqpprox_bufferpool.cpp
amqpprox_buffersource.cpp
amqpprox_connectionmanager.cpp
amqpprox_connectionselector.cpp
amqpprox_connectionselectorinterface.cpp
amqpprox_connectionscontrolcommand.cpp
amqpprox_connectionstats.cpp
amqpprox_connector.cpp
Expand Down Expand Up @@ -44,7 +44,7 @@ add_library(libamqpprox STATIC
amqpprox_loggingcontrolcommand.cpp
amqpprox_mapcontrolcommand.cpp
amqpprox_maphostnamecontrolcommand.cpp
amqpprox_mappingconnectionselector.cpp
amqpprox_connectionselector.cpp
amqpprox_maybesecuresocketadaptor.cpp
amqpprox_method.cpp
amqpprox_packetprocessor.cpp
Expand Down
106 changes: 105 additions & 1 deletion libamqpprox/amqpprox_connectionselector.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
** Copyright 2020 Bloomberg Finance L.P.
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
Expand All @@ -14,3 +14,107 @@
** limitations under the License.
*/
#include <amqpprox_connectionselector.h>

#include <amqpprox_backend.h>
#include <amqpprox_backendset.h>
#include <amqpprox_backendstore.h>
#include <amqpprox_connectionmanager.h>
#include <amqpprox_farmstore.h>
#include <amqpprox_logging.h>
#include <amqpprox_resourcemapper.h>
#include <amqpprox_sessionstate.h>

#include <iostream>

namespace Bloomberg {
namespace amqpprox {

ConnectionSelector::ConnectionSelector(FarmStore *farmStore,
BackendStore *backendStore,
ResourceMapper *resourceMapper)
: d_farmStore_p(farmStore)
, d_backendStore_p(backendStore)
, d_resourceMapper_p(resourceMapper)
, d_defaultFarmName("")
, d_mutex()
{
}

ConnectionSelector::~ConnectionSelector()
{
}

SessionState::ConnectionStatus ConnectionSelector::acquireConnection(
std::shared_ptr<ConnectionManager> *connectionOut,
const SessionState &sessionState)
{
std::shared_ptr<ConnectionManager> connectionManager;

bool isFarm = false;
std::string resourceName;

if (!d_resourceMapper_p->getResourceMap(
&isFarm, &resourceName, sessionState)) {
std::lock_guard<std::mutex> lg(d_mutex);
if (d_defaultFarmName.empty()) {
LOG_INFO << "No farm available for: " << sessionState;
return SessionState::ConnectionStatus::NO_FARM;
}
else {
isFarm = true;
resourceName = d_defaultFarmName;
}
}

if (isFarm) {
// Return the BackendSet and BackendSelector generated by the Farm
try {
const auto &farm = d_farmStore_p->getFarmByName(resourceName);

connectionManager.reset(new ConnectionManager(
farm.backendSet(), farm.backendSelector()));
}
catch (std::runtime_error &e) {
LOG_WARN << "Unable to acquire backend from Farm: " << resourceName
<< " for: " << sessionState;
return SessionState::ConnectionStatus::ERROR_FARM;
}

LOG_INFO << "Selected farm: " << resourceName << " For "
<< sessionState;
}
else {
// Construct a BackendSet directly and pass a nullptr BackendSelector
auto backend = d_backendStore_p->lookup(resourceName);
if (!backend) {
return SessionState::ConnectionStatus::NO_BACKEND;
}

std::vector<BackendSet::Partition> partitions = {{backend}};

connectionManager.reset(new ConnectionManager(
std::make_shared<BackendSet>(std::move(partitions)), nullptr));

LOG_INFO << "Selected directly: " << *backend << " For "
<< sessionState;
}

connectionOut->swap(connectionManager);

return SessionState::ConnectionStatus::SUCCESS;
}

void ConnectionSelector::setDefaultFarm(const std::string &farmName)
{
std::lock_guard<std::mutex> lg(d_mutex);
d_defaultFarmName = farmName;
}

void ConnectionSelector::unsetDefaultFarm()
{
std::lock_guard<std::mutex> lg(d_mutex);
d_defaultFarmName = "";
}

}
}
57 changes: 46 additions & 11 deletions libamqpprox/amqpprox_connectionselector.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
** Copyright 2020 Bloomberg Finance L.P.
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
Expand All @@ -16,30 +16,65 @@
#ifndef BLOOMBERG_AMQPPROX_CONNECTIONSELECTOR
#define BLOOMBERG_AMQPPROX_CONNECTIONSELECTOR

#include <amqpprox_connectionselectorinterface.h>

#include <memory>
#include <mutex>
#include <string>

namespace Bloomberg {
namespace amqpprox {

class ConnectionManager;
class SessionState;
class FarmStore;
class BackendStore;
class ResourceMapper;

/**
* \brief Represents a connection selector
* \brief Determines where to make the egress connection(proxy to broker),
* implements the ConnectionSelectorInterface
*/
class ConnectionSelector {
class ConnectionSelector : public ConnectionSelectorInterface {
FarmStore *d_farmStore_p;
BackendStore *d_backendStore_p;
ResourceMapper *d_resourceMapper_p;
std::string d_defaultFarmName;
mutable std::mutex d_mutex;

public:
virtual ~ConnectionSelector() = default;
// CREATORS
/**
* \brief Construct a ConnectionSelector
* \param farmStore
* \param backendStore
* \param resourceMapper
*/
ConnectionSelector(FarmStore *farmStore,
BackendStore *backendStore,
ResourceMapper *resourceMapper);

virtual ~ConnectionSelector();

// MANIPULATORS
/**
* \brief Acquire a connection from the specified session `state` and set
* `connectionOut` to be a `ConnectionManager` instance tracking the
* connection attempt
* \return Zero on success, or a non-zero value otherwise
* `connectionOut` to be a `ConnectionManager` instance tracking connection
* attempt.
* \return connection status to represent whether the connection should go
* through
*/
virtual int
virtual SessionState::ConnectionStatus
acquireConnection(std::shared_ptr<ConnectionManager> *connectionOut,
const SessionState &state) = 0;
const SessionState &state) override;

/**
* \brief Set the default farm if a mapping is not found
*/
void setDefaultFarm(const std::string &farmName);

/**
* \brief Unset any default farm if a mapping is not found
*/
void unsetDefaultFarm();
};

}
Expand Down
16 changes: 16 additions & 0 deletions libamqpprox/amqpprox_connectionselectorinterface.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/
#include <amqpprox_connectionselectorinterface.h>
51 changes: 51 additions & 0 deletions libamqpprox/amqpprox_connectionselectorinterface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
** Copyright 2022 Bloomberg Finance L.P.
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
*/
#ifndef BLOOMBERG_AMQPPROX_CONNECTIONSELECTORINTERFACE
#define BLOOMBERG_AMQPPROX_CONNECTIONSELECTORINTERFACE

#include <amqpprox_sessionstate.h>

#include <memory>

namespace Bloomberg {
namespace amqpprox {

class ConnectionManager;
class SessionState;

/**
* \brief Represents a connection selector interface
*/
class ConnectionSelectorInterface {
public:
virtual ~ConnectionSelectorInterface() = default;

/**
* \brief Acquire a connection from the specified session `state` and set
* `connectionOut` to be a `ConnectionManager` instance tracking the
* connection attempt
* \return connection status to represent whether the connection should go
* through
*/
virtual SessionState::ConnectionStatus
acquireConnection(std::shared_ptr<ConnectionManager> *connectionOut,
const SessionState &state) = 0;
};

}
}

#endif
6 changes: 3 additions & 3 deletions libamqpprox/amqpprox_mapcontrolcommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
#include <amqpprox_mapcontrolcommand.h>

#include <amqpprox_mappingconnectionselector.h>
#include <amqpprox_connectionselector.h>
#include <amqpprox_resourcemapper.h>

#include <boost/algorithm/string.hpp>
Expand All @@ -26,8 +26,8 @@
namespace Bloomberg {
namespace amqpprox {

MapControlCommand::MapControlCommand(ResourceMapper *mapper,
MappingConnectionSelector *selector)
MapControlCommand::MapControlCommand(ResourceMapper *mapper,
ConnectionSelector *selector)
: d_mapper_p(mapper)
, d_selector_p(selector)
{
Expand Down
Loading