Skip to content

Commit

Permalink
Merge pull request #87 from bloomberg/0-rate
Browse files Browse the repository at this point in the history
Add support to provide zero connection rate limit
  • Loading branch information
Chinmay1412 authored Jun 20, 2022
2 parents 79858c7 + 565efb0 commit f40e458
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 57 deletions.
26 changes: 14 additions & 12 deletions libamqpprox/amqpprox_connectionlimitermanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <amqpprox_logging.h>

#include <memory>
#include <optional>
#include <string>

namespace Bloomberg {
Expand All @@ -29,15 +30,15 @@ namespace amqpprox {
namespace {
void maybePopulateDefaultLimiters(
const std::string &vhostName,
uint32_t defaultLimit,
std::optional<uint32_t> defaultLimit,
ConnectionLimiterManager::ConnectionLimiters &limitersPerVhost)
{
if (limitersPerVhost.find(vhostName) == limitersPerVhost.end()) {
if (defaultLimit) {
limitersPerVhost[vhostName] = {
false,
std::make_shared<FixedWindowConnectionRateLimiter>(
defaultLimit)};
*defaultLimit)};
}
}
}
Expand All @@ -46,8 +47,8 @@ void maybePopulateDefaultLimiters(
ConnectionLimiterManager::ConnectionLimiterManager()
: d_connectionRateLimitersPerVhost()
, d_alarmOnlyConnectionRateLimitersPerVhost()
, d_defaultConnectionRateLimit(0)
, d_defaultAlarmOnlyConnectionRateLimit(0)
, d_defaultConnectionRateLimit()
, d_defaultAlarmOnlyConnectionRateLimit()
, d_mutex()
{
}
Expand Down Expand Up @@ -95,7 +96,7 @@ void ConnectionLimiterManager::setDefaultConnectionRateLimit(
if (!limiter.second.first) {
limiter.second.second =
std::make_shared<FixedWindowConnectionRateLimiter>(
d_defaultConnectionRateLimit);
*d_defaultConnectionRateLimit);
}
}
}
Expand All @@ -113,7 +114,7 @@ void ConnectionLimiterManager::setAlarmOnlyDefaultConnectionRateLimit(
if (!limiter.second.first) {
limiter.second.second =
std::make_shared<FixedWindowConnectionRateLimiter>(
d_defaultAlarmOnlyConnectionRateLimit);
*d_defaultAlarmOnlyConnectionRateLimit);
}
}
}
Expand All @@ -127,7 +128,7 @@ void ConnectionLimiterManager::removeConnectionRateLimiter(
d_connectionRateLimitersPerVhost[vhostName] = {
false,
std::make_shared<FixedWindowConnectionRateLimiter>(
d_defaultConnectionRateLimit)};
*d_defaultConnectionRateLimit)};
}
else {
d_connectionRateLimitersPerVhost.erase(vhostName);
Expand All @@ -143,7 +144,7 @@ void ConnectionLimiterManager::removeAlarmOnlyConnectionRateLimiter(
d_alarmOnlyConnectionRateLimitersPerVhost[vhostName] = {
false,
std::make_shared<FixedWindowConnectionRateLimiter>(
d_defaultAlarmOnlyConnectionRateLimit)};
*d_defaultAlarmOnlyConnectionRateLimit)};
}
else {
d_alarmOnlyConnectionRateLimitersPerVhost.erase(vhostName);
Expand All @@ -154,7 +155,7 @@ void ConnectionLimiterManager::removeDefaultConnectionRateLimit()
{
std::lock_guard<std::mutex> lg(d_mutex);

d_defaultConnectionRateLimit = 0;
d_defaultConnectionRateLimit.reset();
for (auto it = d_connectionRateLimitersPerVhost.cbegin();
it != d_connectionRateLimitersPerVhost.cend();) {
if (!it->second.first) {
Expand All @@ -170,7 +171,7 @@ void ConnectionLimiterManager::removeAlarmOnlyDefaultConnectionRateLimit()
{
std::lock_guard<std::mutex> lg(d_mutex);

d_defaultAlarmOnlyConnectionRateLimit = 0;
d_defaultAlarmOnlyConnectionRateLimit.reset();
for (auto it = d_alarmOnlyConnectionRateLimitersPerVhost.cbegin();
it != d_alarmOnlyConnectionRateLimitersPerVhost.cend();) {
if (!it->second.first) {
Expand Down Expand Up @@ -263,12 +264,13 @@ ConnectionLimiterManager::getAlarmOnlyConnectionRateLimiter(
return nullptr;
}

uint32_t ConnectionLimiterManager::getDefaultConnectionRateLimit() const
std::optional<uint32_t>
ConnectionLimiterManager::getDefaultConnectionRateLimit() const
{
return d_defaultConnectionRateLimit;
}

uint32_t
std::optional<uint32_t>
ConnectionLimiterManager::getAlarmOnlyDefaultConnectionRateLimit() const
{
return d_defaultAlarmOnlyConnectionRateLimit;
Expand Down
19 changes: 12 additions & 7 deletions libamqpprox/amqpprox_connectionlimitermanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

#include <amqpprox_connectionlimiterinterface.h>

#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -58,9 +58,9 @@ class ConnectionLimiterManager {
ConnectionLimiters d_connectionRateLimitersPerVhost;
ConnectionLimiters d_alarmOnlyConnectionRateLimitersPerVhost;

uint32_t d_defaultConnectionRateLimit;
uint32_t d_defaultAlarmOnlyConnectionRateLimit;
mutable std::mutex d_mutex;
std::optional<uint32_t> d_defaultConnectionRateLimit;
std::optional<uint32_t> d_defaultAlarmOnlyConnectionRateLimit;
mutable std::mutex d_mutex;

public:
// CREATORS
Expand Down Expand Up @@ -142,6 +142,11 @@ class ConnectionLimiterManager {
*/
bool allowNewConnectionForVhost(const std::string &vhostName);

/**
* \brief Called when a session is marked as disconnected.
*/
void sessionClosedForVhost(const std::string &vhostName);

// ACCESSORS
/**
* \brief Get particular connection rate limiter based on specified vhost
Expand All @@ -162,13 +167,13 @@ class ConnectionLimiterManager {
* \brief Get default connection rate limit (allowed connections per
* second) for all the connecting vhosts
*/
uint32_t getDefaultConnectionRateLimit() const;
std::optional<uint32_t> getDefaultConnectionRateLimit() const;

/**
* \brief Get alarm onlt default connection rate limit (allowed connections
* \brief Get alarm only default connection rate limit (allowed connections
* per second) for all the connecting vhosts
*/
uint32_t getAlarmOnlyDefaultConnectionRateLimit() const;
std::optional<uint32_t> getAlarmOnlyDefaultConnectionRateLimit() const;
};

}
Expand Down
1 change: 0 additions & 1 deletion libamqpprox/amqpprox_fixedwindowconnectionratelimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include <chrono>
#include <memory>
#include <mutex>
#include <sstream>

namespace Bloomberg {
Expand Down
7 changes: 3 additions & 4 deletions libamqpprox/amqpprox_fixedwindowconnectionratelimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include <chrono>
#include <memory>
#include <mutex>
#include <string>

namespace Bloomberg {
Expand All @@ -47,8 +46,8 @@ struct LimiterClock {
* provided connection limit and time window. The connection rate limit will be
* connection limit/timeWindow (average allowed connections in the specified
* time window). allowNewConnection member function will return true or false
* based on the rate limit calculation. Implements the LimiterInterface
* interface
* based on the rate limit calculation. Implements the
* ConnectionLimiterInterface interface
*/
class FixedWindowConnectionRateLimiter : public ConnectionLimiterInterface {
protected:
Expand Down Expand Up @@ -97,7 +96,7 @@ class FixedWindowConnectionRateLimiter : public ConnectionLimiterInterface {

// ACCESSORS
/**
* \return Information about limiter as a string
* \return Information about connection limiter as a string
*/
virtual std::string toString() const override;

Expand Down
18 changes: 10 additions & 8 deletions libamqpprox/amqpprox_limitcontrolcommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void handleConnectionLimitAlarm(
output << "Default connection rate limit is set to "
<< connectionLimiterManager
->getAlarmOnlyDefaultConnectionRateLimit()
.value()
<< " connections per second in alarm only mode.\n";
output << "The limiter will only log at warning level with "
"AMQPPROX_CONNECTION_LIMIT as a substring and the "
Expand Down Expand Up @@ -122,6 +123,7 @@ void handleConnectionLimit(
numberOfConnections);
output << "Default connection rate limit is set to "
<< connectionLimiterManager->getDefaultConnectionRateLimit()
.value()
<< " connections per second.\n";
}
else {
Expand Down Expand Up @@ -155,19 +157,19 @@ void printVhostLimits(
}

if (!alarmLimiter && !limiter) {
uint32_t alarmOnlyConnectionRateLimit =
std::optional<uint32_t> alarmOnlyConnectionRateLimit =
connectionLimiterManager->getAlarmOnlyDefaultConnectionRateLimit();
uint32_t connectionRateLimit =
std::optional<uint32_t> connectionRateLimit =
connectionLimiterManager->getDefaultConnectionRateLimit();
if (alarmOnlyConnectionRateLimit || connectionRateLimit) {
if (alarmOnlyConnectionRateLimit) {
output << "Alarm only limit, for vhost " << vhostName
<< ", allow average " << alarmOnlyConnectionRateLimit
<< ", allow average " << *alarmOnlyConnectionRateLimit
<< " number of connections per second.\n";
}
if (connectionRateLimit) {
output << "For vhost " << vhostName << ", allow average "
<< connectionRateLimit
<< *connectionRateLimit
<< " number of connections per second.\n";
}
}
Expand All @@ -182,19 +184,19 @@ void printAllLimits(
ConnectionLimiterManager *connectionLimiterManager,
ControlCommandOutput<ControlCommand::OutputFunctor> &output)
{
uint32_t alarmOnlyConnectionRateLimit =
std::optional<uint32_t> alarmOnlyConnectionRateLimit =
connectionLimiterManager->getAlarmOnlyDefaultConnectionRateLimit();
uint32_t connectionRateLimit =
std::optional<uint32_t> connectionRateLimit =
connectionLimiterManager->getDefaultConnectionRateLimit();
if (alarmOnlyConnectionRateLimit || connectionRateLimit) {
if (alarmOnlyConnectionRateLimit) {
output << "Default limit for any vhost, allow average "
<< alarmOnlyConnectionRateLimit
<< *alarmOnlyConnectionRateLimit
<< " connections per second in alarm only mode.\n";
}
if (connectionRateLimit) {
output << "Default limit for any vhost, allow average "
<< connectionRateLimit << " connections per second.\n";
<< *connectionRateLimit << " connections per second.\n";
}
}
else {
Expand Down
Loading

0 comments on commit f40e458

Please sign in to comment.