Skip to content

Commit

Permalink
Add a new STAT ENABLE/DISABLE feature to skip collecting per-source data
Browse files Browse the repository at this point in the history
From looking at perf data, and also our internal timeseries usage, per-source metric data
is expensive to collect, store, and we rarely look at it.

Sometimes it might be useful to collect this, for example when there are few distinct sources,
but I suspect we'll disable these nearly all the time going forward.

This is slightly different from the existing filter feature because
enabling/disabling these applies to all stats output functions. The
filters also don't actually change what is internally collected - i.e.
the work for per-source metrics still shows up in perf with a filter
applied
  • Loading branch information
adamncasey committed Jul 25, 2022
1 parent 376179d commit 620af8b
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 28 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ MAP (BACKEND vhost backend | FARM vhost name | UNMAP vhost | DEFAULT farmName |
MAPHOSTNAME DNS - Set up mapping of IPs to hostnames
SESSION id# (PAUSE|DISCONNECT_GRACEFUL|FORCE_DISCONNECT) - Control a particular session
STAT (STOP SEND | SEND <host> <port> | (LISTEN (json|human) (overall|vhost=foo|backend=bar|source=baz|all|process|bufferpool))) - Output statistics
STAT (DISABLE|ENABLE) per-source - Enable/Disable internal collection of per-source statistics. Applies to all send/listeners
TLS (INGRESS | EGRESS) (KEY_FILE file | CERT_CHAIN_FILE file | RSA_KEY_FILE file | TMP_DH_FILE file | CA_CERT_FILE file | VERIFY_MODE mode* | CIPHERS (PRINT | SET ciphersuite(:ciphersuite)*))
VHOST PAUSE vhost | UNPAUSE vhost | PRINT | BACKEND_DISCONNECT vhost | FORCE_DISCONNECT vhost
```
Expand Down
2 changes: 1 addition & 1 deletion amqpprox/amqpprox.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ int main(int argc, char *argv[])
CommandPtr(new VhostControlCommand(&vhostState)),
CommandPtr(new ListenControlCommand),
CommandPtr(new LoggingControlCommand),
CommandPtr(new StatControlCommand(&eventSource)),
CommandPtr(new StatControlCommand(&eventSource, &statCollector)),
CommandPtr(new MapHostnameControlCommand()),
CommandPtr(new TlsControlCommand),
CommandPtr(new AuthControlCommand),
Expand Down
7 changes: 7 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,13 @@ Stops sending metrics to all configured endpoints.

Streams metrics to stdout. Pass `json` or `human` to specify output format. Metrics can be filtered by passing `overall|vhost=foo|backend=bar|source=baz|all|process|bufferpool`.

#### STAT ENABLE/DISABLE

Disable internal collection of certain types of metrics. This is different from the filtering available under `STAT LISTEN` because this completely skips collection
of these metrics. Where possible, use this instead of filters.

At the moment, only `STAT DISABLE per-source` (or enable equivalent) is available.

## TLS commands

`TLS` command is used to configure TLS-enabled connections for both ingress (client => proxy) and egress (proxy => broker). See [TLS usage documentation](https://github.com/bloomberg/amqpprox/blob/main/docs/tls.md).
Expand Down
44 changes: 26 additions & 18 deletions libamqpprox/amqpprox_statcollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ StatCollector::StatCollector()
, d_previous()
, d_cpuMonitor_p(nullptr)
, d_bufferPool_p(nullptr)
, d_collectPerSourceStats(true)
{
}

Expand All @@ -51,23 +52,6 @@ void StatCollector::setBufferPool(BufferPool *pool)

void StatCollector::collect(const SessionState &session)
{
auto vhost = session.getVirtualHost();

// For backends we care about the broker's port to disambiguate
// the different brokers running on a host. Note that we are
// using '_' as the separator, as ':' is not compatible with
// statsD.
auto backend =
session.hostname(session.getEgress().second) + "_" +
boost::lexical_cast<std::string>(session.getEgress().second.port());

// For sources we only look at the machine, not the ephemeral port
auto source = session.hostname(session.getIngress().second);

auto &vhostStats = d_current.vhosts()[vhost];
auto &backendStats = d_current.backends()[backend];
auto &sourceStats = d_current.sources()[source];

uint64_t ingressPackets, ingressFrames, ingressBytes, ingressLatencyCount,
ingressLatencyTotal, egressPackets, egressFrames, egressBytes,
egressLatencyCount, egressLatencyTotal;
Expand Down Expand Up @@ -139,9 +123,28 @@ void StatCollector::collect(const SessionState &session)
}
};

auto vhost = session.getVirtualHost();
auto &vhostStats = d_current.vhosts()[vhost];
addStats(vhostStats);

// For backends we care about the broker's port to disambiguate
// the different brokers running on a host. Note that we are
// using '_' as the separator, as ':' is not compatible with
// statsD.
auto backend =
session.hostname(session.getEgress().second) + "_" +
boost::lexical_cast<std::string>(session.getEgress().second.port());
auto &backendStats = d_current.backends()[backend];
addStats(backendStats);
addStats(sourceStats);

if (d_collectPerSourceStats) {
// For sources we only look at the machine, not the ephemeral port
auto source = session.hostname(session.getIngress().second);

auto &sourceStats = d_current.sources()[source];
addStats(sourceStats);
}

addStats(d_current.overall());
}

Expand Down Expand Up @@ -306,5 +309,10 @@ void StatCollector::populateMap(StatSnapshot::StatsMap *map,
map->swap(output);
}

void StatCollector::collectPerSourceStats(bool enabled)
{
d_collectPerSourceStats = enabled;
}

}
}
8 changes: 8 additions & 0 deletions libamqpprox/amqpprox_statcollector.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <amqpprox_connectionstats.h>
#include <amqpprox_statsnapshot.h>

#include <atomic>
#include <memory>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -53,6 +54,8 @@ class StatCollector {
CpuMonitor *d_cpuMonitor_p; // HELD NOT OWNED
BufferPool *d_bufferPool_p; // HELD NOT OWNED

std::atomic<bool> d_collectPerSourceStats;

public:
// CREATORS
StatCollector();
Expand Down Expand Up @@ -90,6 +93,11 @@ class StatCollector {
*/
void setBufferPool(BufferPool *pool);

/**
* \brief Enable/Disable per-source statistics
*/
void collectPerSourceStats(bool enabled);

// ACCESSORS
/**
* \brief Retrieve the statistics as a `snapshot` that have been
Expand Down
46 changes: 38 additions & 8 deletions libamqpprox/amqpprox_statcontrolcommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ void stopSendStats(std::list<std::pair<StatFunctor, bool>> *d_functors)

}

StatControlCommand::StatControlCommand(EventSource *eventSource)
StatControlCommand::StatControlCommand(EventSource *eventSource,
StatCollector *statCollector)
: d_functors()
, d_statisticsAvailableSignal()
, d_eventSource_p(eventSource)
, d_statCollector_p(statCollector)
{
d_statisticsAvailableSignal =
d_eventSource_p->statisticsAvailable().subscribe(std::bind(
Expand Down Expand Up @@ -173,9 +175,13 @@ std::string StatControlCommand::commandVerb() const
std::string StatControlCommand::helpText() const
{
return "(STOP SEND | SEND <host> <port> | (LISTEN (json|human) "
"(overall|vhost=foo|backend=bar|source=baz|all|process|bufferpool))"
"(overall|vhost=foo|backend=bar|source=baz|all|all-except-per-"
"source|process|bufferpool))"
" - "
"Output statistics";
"Output statistics\n"
"STAT (DISABLE|ENABLE) per-source - Enable/Disable internal "
"collection of per-source statistics. Applies to all "
"send/listeners";
}

void StatControlCommand::handleCommand(const std::string &command,
Expand Down Expand Up @@ -233,9 +239,34 @@ void StatControlCommand::handleCommand(const std::string &command,
}
return;
}
else if (subcommand == "DISABLE" || subcommand == "ENABLE") {
std::string type;
if (!(iss >> type)) {
outputFunctor("Missing enable/disable type.\n", true);
return;
}

boost::to_upper(type);

if (type == "PER-SOURCE") {
d_statCollector_p->collectPerSourceStats(subcommand == "ENABLE");
outputFunctor(subcommand + " per-source applied.\n", true);
return;
}
else {
outputFunctor(
"Only LISTEN, SEND, STOP, ENABLE, and DISABLE subcommands are "
"supported.\n",
true);
return;
}
return;
}
else {
outputFunctor(
"Only LISTEN, SEND and STOP subcommands are supported.\n", true);
"Only LISTEN, SEND, STOP, ENABLE, and DISABLE subcommands are "
"supported.\n",
true);
return;
}

Expand Down Expand Up @@ -287,11 +318,11 @@ void StatControlCommand::handleCommand(const std::string &command,
}
else {
if (filterType != "ALL") {
outputFunctor(
"Filters are currently not supported when sending metrics.\n",
true);
outputFunctor("Filters aren't supported when sending metrics.\n",
true);
return;
}

std::shared_ptr<StatsDPublisher> publisher =
std::make_shared<StatsDPublisher>(
&controlHandle->ioContext(), outputHost, outputPort);
Expand All @@ -306,6 +337,5 @@ void StatControlCommand::handleCommand(const std::string &command,
true);
}
}

}
}
4 changes: 3 additions & 1 deletion libamqpprox/amqpprox_statcontrolcommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ class StatControlCommand : public ControlCommand {
std::list<std::pair<StatFunctor, bool>> d_functors;
EventSubscriptionHandle d_statisticsAvailableSignal;
EventSource *d_eventSource_p; // HELD NOT OWNED
StatCollector *d_statCollector_p; // HELD NOT OWNED

public:
explicit StatControlCommand(EventSource *eventSource);
explicit StatControlCommand(EventSource *eventSource,
StatCollector *statCollector);

/**
* \return the command verb this handles
Expand Down

0 comments on commit 620af8b

Please sign in to comment.