Feat[MQB, MWC]: report the number of tcp connections #384

Merged
merged 12 commits into from
Sep 19, 2024
48 changes: 29 additions & 19 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp
@@ -59,6 +59,7 @@
#include <bdlf_bind.h>
#include <bdlf_placeholder.h>
#include <bdlma_localsequentialallocator.h>
#include <bdlpcre_regex.h>
#include <bdlt_timeunitratio.h>
#include <bsl_algorithm.h>
#include <bsl_cstdlib.h>
@@ -100,6 +101,21 @@ const int k_SESSION_DESTROY_WAIT = 20;
const int k_CLIENT_CLOSE_WAIT = 20;
// Time to wait incrementally (in seconds) for all clients and
// proxies to be destroyed during stop sequence.
const char k_PORT_PATTERN[] = ":(\\d{1,5})";

bsl::string portFromUri(const bsl::string& endpoint)
Collaborator

Suggested change
bsl::string portFromUri(const bsl::string& endpoint)
inline bsl::string portFromUri(const bsl::string& endpoint)

{
bdlpcre::RegEx regEx;
bsl::string errorMessage;
size_t errorOffset;
std::vector<bsl::string_view> matchVector;

BSLS_ASSERT_SAFE(
0 == regEx.prepare(&errorMessage, &errorOffset, k_PORT_PATTERN));
Collaborator

There is no need to prepare the same regEx for every opened channel.
We can initialize it once and reuse it from different threads, since it is const thread-safe:
https://bloomberg.github.io/bde-resources/doxygen/bde_api_prod/group__bdlpcre__regex.html#3.5
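
The "prepare once, reuse everywhere" idea can be sketched in standard C++. This is only an illustration under stated assumptions: `std::regex` stands in for `bdlpcre::RegEx`, and `portRegex()` is a hypothetical helper that does not exist in the PR.

```cpp
#include <cassert>
#include <regex>
#include <string>

// Same pattern as the PR's k_PORT_PATTERN.
const char k_PORT_PATTERN[] = ":(\\d{1,5})";

// Hypothetical helper (not part of the PR): returns the compiled pattern.
// A function-local static is initialized exactly once, and that
// initialization is thread-safe since C++11.
const std::regex& portRegex()
{
    static const std::regex s_regex(k_PORT_PATTERN);
    return s_regex;
}

// Extract the port from an endpoint such as "tcp://localhost:30114".
// Returns an empty string when no port is found.
std::string portFromUri(const std::string& endpoint)
{
    std::smatch match;
    if (!std::regex_search(endpoint, match, portRegex())) {
        return std::string();
    }
    // One full match plus one capture group.
    assert(match.size() == 2);
    return match[1].str();
}
```

The pattern is compiled on the first call only; later calls, from any thread, just read the shared const object. The compile step also sits outside any assertion macro here, which seems the safer shape if assertions may be compiled out in some build modes.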

BSLS_ASSERT_SAFE(0 == regEx.match(&matchVector, endpoint));
Collaborator

I think it's better to use the match overload that takes a single string_view* instead; there is no need to construct an entire vector for this:
https://bloomberg.github.io/bde-resources/doxygen/bde_api_prod/classbdlpcre_1_1RegEx.html#a6c7d41da3bddf9caa0aba7fd2677a68b


return bsl::string(matchVector[1]);
Collaborator

We can return a string_view as well, since it refers to the string passed as the argument, so we don't construct a string with the default allocator
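
A rough sketch of the view-returning variant, again with `std::regex` and `std::string_view` (C++17) standing in for the BDE types. The name `portViewFromUri` is hypothetical. The returned view points into the caller's buffer, so it is valid only while that buffer is alive.

```cpp
#include <cassert>
#include <cstddef>
#include <regex>
#include <string>
#include <string_view>

// Hypothetical variant: returns a view into 'endpoint' instead of a copy.
// No string is allocated for the result.
std::string_view portViewFromUri(std::string_view endpoint)
{
    static const std::regex s_regex(":(\\d{1,5})");

    std::cmatch       match;
    const char* const begin = endpoint.data();
    const char* const end   = begin + endpoint.size();
    if (!std::regex_search(begin, end, match, s_regex)) {
        return std::string_view();
    }
    // One full match plus one capture group.
    assert(match.size() == 2);
    return std::string_view(match[1].first,
                            static_cast<std::size_t>(match[1].length()));
}
```

The trade-off is lifetime: a caller that needs to keep the port beyond the lifetime of the endpoint string still has to copy it, preferably with an explicit allocator.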

}

char calculateInitialMissedHbCounter(const mqbcfg::TcpInterfaceConfig& config)
{
@@ -280,36 +296,30 @@ TCPSessionFactory::channelStatContextCreator(
const bsl::shared_ptr<mwcio::Channel>& channel,
const bsl::shared_ptr<mwcio::StatChannelFactoryHandle>& handle)
{
mwcst::StatContext* parent = 0;

int peerAddress;
channel->properties().load(&peerAddress, k_CHANNEL_PROPERTY_PEER_IP);

ntsa::Ipv4Address ipv4Address(static_cast<bsl::uint32_t>(peerAddress));
ntsa::IpAddress ipAddress(ipv4Address);
if (!mwcio::ChannelUtil::isLocalHost(ipAddress)) {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_LOCAL);
}
else {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_REMOTE);
}
mqbstat::StatController::ChannelSelector::Enum selector =
mwcio::ChannelUtil::isLocalHost(ipAddress)
? mqbstat::StatController::ChannelSelector::e_REMOTE
: mqbstat::StatController::ChannelSelector::e_LOCAL;

BSLS_ASSERT_SAFE(parent);

bsl::string name;
bsl::string name, localPort;
Collaborator

I would rather use the provided allocator. That way, any small allocations within this component show up in the stats table, and we can debug them if needed

Suggested change
bsl::string name, localPort;
bsl::string name(d_allocator_p), localPort(d_allocator_p);
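
To illustrate why passing the allocator matters, here is a sketch using standard `std::pmr` as a stand-in for BDE's `bslma` allocators (an assumption made for portability; the PR itself uses `d_allocator_p`). `CountingResource` and `allocationsForName` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <memory_resource>
#include <string>

// Hypothetical memory resource that records every allocation made
// through it, loosely analogous to a per-component allocator whose
// usage appears in a stats table.
class CountingResource : public std::pmr::memory_resource {
  public:
    std::size_t numAllocations() const { return d_numAllocations; }
    std::size_t numBytes() const { return d_numBytes; }

  private:
    void* do_allocate(std::size_t bytes, std::size_t alignment) override
    {
        ++d_numAllocations;
        d_numBytes += bytes;
        return std::pmr::new_delete_resource()->allocate(bytes, alignment);
    }

    void do_deallocate(void* p, std::size_t bytes, std::size_t alignment) override
    {
        std::pmr::new_delete_resource()->deallocate(p, bytes, alignment);
    }

    bool do_is_equal(const std::pmr::memory_resource& other) const noexcept override
    {
        return this == &other;
    }

    std::size_t d_numAllocations = 0;
    std::size_t d_numBytes       = 0;
};

// Build a string through 'resource' and report how many allocations
// the resource has seen so far.
std::size_t allocationsForName(const std::string& endpoint,
                               CountingResource*  resource)
{
    assert(resource);
    std::pmr::polymorphic_allocator<char> alloc(resource);
    std::pmr::string                      name(endpoint.c_str(), alloc);
    return resource->numAllocations();
}
```

A string built without the explicit allocator would bypass the counting resource entirely, which is the visibility problem the comment describes.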

if (handle->options().is<mwcio::ConnectOptions>()) {
name = handle->options().the<mwcio::ConnectOptions>().endpoint();
name = handle->options().the<mwcio::ConnectOptions>().endpoint();
localPort = portFromUri(channel->peerUri());
}
else {
name = channel->peerUri();
name = channel->peerUri();
localPort = portFromUri(
handle->options().the<mwcio::ListenOptions>().endpoint());
}

bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(name, &localAllocator);

return parent->addSubcontext(statConfig);
return d_statController_p->addChannelStatContext(selector,
localPort,
name);
}

void TCPSessionFactory::negotiate(
53 changes: 53 additions & 0 deletions src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp
@@ -53,6 +53,7 @@
#include <bdlbb_blob.h>
#include <bdlf_bind.h>
#include <bdlf_placeholder.h>
#include <bdlma_localsequentialallocator.h>
#include <bdlmt_eventscheduler.h>
#include <bdlt_timeunitratio.h>
#include <bsl_algorithm.h>
@@ -81,6 +82,20 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE = 15 *

const char k_PUBLISHINTERVAL_SUFFIX[] = ".PUBLISHINTERVAL";

void portsDeleter(
bsl::unordered_map<bsl::string, bslma::ManagedPtr<mwcst::StatContext> >*
map,
bslmt::Mutex* mutex,
const mwcst::StatContext& context)
{
// Lookup the port's StatContext and remove it from the 'map'
bslmt::LockGuard<bslmt::Mutex> guard(mutex); // LOCK
Collaborator

With this change, you introduce synchronization between different threads. We might assume it's okay, but if it's not, it will be difficult to diagnose.

I think it's possible to avoid this by storing the ManagedPtr<StatContext> within the corresponding channel object. Then, when we close this channel and call stop or the destructor (not sure which one), we free this ManagedPtr. The StatContext class already has a mechanism to keep track of deleted subcontexts. So, if the ManagedPtr to a subcontext is destroyed, the parent context will know about it and remove the raw pointer it stores.

Collaborator Author

@waldgange Aug 9, 2024

Unfortunately, the thread safety of the StatContext class is only partial.

// In simpler terms, only 'addSubContext', 'addSubTable', 'adjustValue',
// 'reportValue', and 'setValue' are thread-safe. All other functions should
// be considered not thread safe.

For example, I can't iterate over subcontexts that can be added/deleted without additional synchronisation anywhere except the snapshot() method or its synchronous pre- or post- steps, as handling the deleted subcontexts happens inside this method. Please find below some arguments in favor of the current implementation:
Each channel stores its own StatContext, and this approach hasn't changed. What I have added is another 'layer' of StatContexts between the channel contexts and their parent. This layer breaks down the channel stats by port. I really need exactly one StatContext per port, as I report aggregated stats. Inter-thread synchronisation is required here because:

  1. portsDeleter() is always called from StatController's thread, but TCPSessionFactory::channelStatContextCreator and, hence, constructors and destructors of channel-related StatContexts are called from TCPSessionFactory's threads.
  2. Although StatContext::addSubcontext() is a thread-safe method, it will create a new StatContext instance even if there are sub-contexts with the same name.

So, in order to add a channel StatContext, I need to check whether there is a port StatContext for that port and use it; otherwise I have to create a new port StatContext. After that I add a channel subcontext to it. So I must hold ManagedPtrs to the ports' StatContexts and lock the container (currently the unordered_map) where they are stored. Otherwise race conditions are possible: either two insertions of port StatContexts with the same name, or deletion of the parent while a new channel subcontext is being added.
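
The find-or-create step described above can be sketched as follows. This is a simplified model, not the PR's code: `PortStats` and `PortRegistry` are hypothetical, and `std::mutex` and `std::shared_ptr` stand in for `bslmt::Mutex` and `bslma::ManagedPtr`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical per-port statistics node.
struct PortStats {
    std::string d_port;
    explicit PortStats(const std::string& port) : d_port(port) {}
};

// Hypothetical registry holding exactly one PortStats per port.
class PortRegistry {
  public:
    // Return the node for 'port', creating it if it does not exist.
    // Lookup and insertion happen under one lock, so two threads asking
    // for the same port cannot both insert.
    std::shared_ptr<PortStats> getOrCreate(const std::string& port)
    {
        assert(!port.empty());
        std::lock_guard<std::mutex> guard(d_mutex);  // LOCK
        auto it = d_ports.find(port);
        if (it == d_ports.end()) {
            it = d_ports.emplace(port, std::make_shared<PortStats>(port)).first;
        }
        return it->second;
    }

    std::size_t size() const
    {
        std::lock_guard<std::mutex> guard(d_mutex);  // LOCK
        return d_ports.size();
    }

  private:
    mutable std::mutex                                          d_mutex;
    std::unordered_map<std::string, std::shared_ptr<PortStats>> d_ports;
};
```

Without the lock, the check and the insertion are two separate steps, which is exactly where the duplicate-insertion race mentioned above would occur.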

If you have an idea how to reach it without the synchronization, please let me know

Collaborator

@678098 Aug 9, 2024

"If you have an idea how to reach it without the synchronization, please let me know"

@waldgange let me think about this for some time

Collaborator

Hi @waldgange
I think the correct way to do it is to pass ManagedPtr to the StatChannel here, instead of shared_ptr:

bsl::shared_ptr<mwcst::StatContext> statContext(
d_config.d_statContextCreator(channel, handleSp));
// Create the channel and notify user
bsl::shared_ptr<StatChannel> newChannel;
newChannel.createInplace(
handleSp->d_allocator_p,
StatChannelConfig(channel, statContext, handleSp->d_allocator_p),
handleSp->d_allocator_p);

So, StatChannel keeps ownership of this StatContext. When a StatChannel goes down, the ManagedPtr to its StatContext goes down too.

Note that this requires changing MWC interfaces. However, StatChannel and StatChannelFactory are used only within BlazingMQ, so I don't see any harm in this. Also, since we plan to own the ManagedPtr in StatChannel, the StatChannel class is no longer copy constructible, so we might explicitly delete the copy constructor and copy assignment operator
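
The ownership model proposed here might look roughly like the sketch below. It assumes `std::unique_ptr` as a stand-in for `bslma::ManagedPtr`; `ParentContext`, `SubContext`, and `Channel` are hypothetical simplifications of the real StatContext and StatChannel classes.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical parent that tracks how many subcontexts are alive.
struct ParentContext {
    int d_liveSubcontexts = 0;
};

// Hypothetical subcontext that registers with its parent on creation
// and deregisters on destruction.
class SubContext {
  public:
    explicit SubContext(ParentContext* parent) : d_parent_p(parent)
    {
        assert(parent);
        ++d_parent_p->d_liveSubcontexts;
    }
    ~SubContext() { --d_parent_p->d_liveSubcontexts; }

    SubContext(const SubContext&)            = delete;
    SubContext& operator=(const SubContext&) = delete;

  private:
    ParentContext* d_parent_p;
};

// Hypothetical channel that exclusively owns its stat context.
// Copying is deleted because the ownership cannot be duplicated.
class Channel {
  public:
    explicit Channel(std::unique_ptr<SubContext> statContext)
    : d_statContext(std::move(statContext))
    {
    }

    Channel(const Channel&)            = delete;
    Channel& operator=(const Channel&) = delete;

  private:
    std::unique_ptr<SubContext> d_statContext;
};

static_assert(!std::is_copy_constructible<Channel>::value,
              "Channel must not be copyable");
```

When the channel is destroyed, its context is destroyed with it, and the parent learns about that without any external cleanup pass.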

if (context.numSnapshots() != 0 && !context.isDeleted() &&
context.numSubcontexts() == 0) {
map->erase(context.name());
}
};

typedef bsl::unordered_set<mqbplug::PluginFactory*> PluginFactories;

/// Post on the optionally specified `semaphore`.
@@ -983,5 +998,43 @@ int StatController::processCommand(
return -1;
}

StatController::StatContextMp
StatController::addChannelStatContext(ChannelSelector::Enum selector,
const bsl::string& port,
const bsl::string& endpoint)
{
mwcst::StatContext* parent = channelsStatContext(selector);
BSLS_ASSERT_SAFE(parent);

bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration portConfig(port, &localAllocator);
mwcst::StatContextConfiguration statConfig(endpoint, &localAllocator);

bslma::ManagedPtr<mwcst::StatContext> channelStatContext;

{
bslmt::LockGuard<bslmt::Mutex> guard(&d_portsMutex); // LOCK
StatContextMap::iterator portIt = d_portsMap.find(port);

if (portIt == d_portsMap.end()) {
bslma::ManagedPtr<mwcst::StatContext> portStatContext =
parent->addSubcontext(
portConfig.storeExpiredSubcontextValues(true)
.preSnapshotCallback(
bdlf::BindUtil::bind(portsDeleter,
&d_portsMap,
&d_portsMutex,
bdlf::PlaceHolders::_1)));
channelStatContext = portStatContext->addSubcontext(statConfig);
d_portsMap.emplace(portStatContext->name(), portStatContext);
Collaborator

Unfortunately, it doesn't compile on Solaris

[2024-08-07T12:57:33.062Z] "/SunOS/opt/bb/include/bslstl_pair.h", line 2528: Error: Could not find a match for BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>::ManagedPtr<MANAGED_TYPE>(const BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>) needed in void bsl::Pair_Second<BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>::Pair_Second<BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>(const BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>&, BloombergLP::bslma::Allocator*, bsl::integral_constant<int, 0>).

...

[2024-08-07T12:57:33.062Z] "/opt/bb/include/bslstl_unorderedmap_cpp03.h", line 3617:     Where: Instantiated from bsl::pair<BloombergLP::bslstl::HashTableIterator<bsl::pair<const bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>, long>, bool> bsl::unordered_map<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>, bsl::hash<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>>, bsl::equal_to<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>>, bsl::allocator<bsl::pair<const bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>>>::emplace<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>(const bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>&, const BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>&).

You might introduce parallel structures, bsl::list<ManagedPtr<...> > and bsl::unordered_map<bsl::string, StatContext*>, as is done in this PR:
#389

Or you might find another solution
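
The parallel-structures idea can be sketched as an owning list plus a non-owning index. This is an illustration only: it uses C++11 (`std::unique_ptr`, lambdas) for brevity, so it shows the shape of the data structure rather than code that would build on the C++03 toolchain from the error log. `PortContext` and `PortContexts` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for a port-level StatContext.
struct PortContext {
    std::string d_name;
    explicit PortContext(const std::string& name) : d_name(name) {}
};

// Owning list plus non-owning lookup map. The map never stores the
// move-only owner, so its value type stays trivially copyable.
class PortContexts {
  public:
    PortContext* add(const std::string& port)
    {
        assert(d_index.find(port) == d_index.end());
        d_owners.push_back(std::unique_ptr<PortContext>(new PortContext(port)));
        PortContext* raw = d_owners.back().get();
        d_index[port]    = raw;
        return raw;
    }

    PortContext* find(const std::string& port) const
    {
        auto it = d_index.find(port);
        return it == d_index.end() ? nullptr : it->second;
    }

    void remove(const std::string& port)
    {
        auto it = d_index.find(port);
        if (it == d_index.end()) {
            return;
        }
        PortContext* raw = it->second;
        // Drop the index entry first, then destroy the owner.
        d_index.erase(it);
        d_owners.remove_if([raw](const std::unique_ptr<PortContext>& owner) {
            return owner.get() == raw;
        });
    }

    std::size_t size() const { return d_owners.size(); }

  private:
    std::list<std::unique_ptr<PortContext>>       d_owners;
    std::unordered_map<std::string, PortContext*> d_index;
};
```

The cost is that the two containers must be kept in sync by hand, and removal is linear in the number of ports, which is presumably acceptable for a small set of listening ports.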

}
else {
channelStatContext = portIt->second->addSubcontext(statConfig);
}
}

return channelStatContext;
}

} // close package namespace
} // close enterprise namespace
30 changes: 22 additions & 8 deletions src/groups/mqb/mqbstat/mqbstat_statcontroller.h
@@ -125,14 +125,15 @@ class StatController {

private:
// PRIVATE TYPES
typedef bslma::ManagedPtr<bdlmt::TimerEventScheduler> SchedulerMp;
typedef bslma::ManagedPtr<mwcst::StatContext> StatContextMp;
typedef bsl::shared_ptr<mwcst::StatContext> StatContextSp;
typedef bslma::ManagedPtr<mwcsys::StatMonitor> SystemStatMonitorMp;
typedef bslma::ManagedPtr<Printer> PrinterMp;
typedef bslma::ManagedPtr<JsonPrinter> JsonPrinterMp;
typedef bslma::ManagedPtr<mqbplug::StatPublisher> StatPublisherMp;
typedef bslma::ManagedPtr<mqbplug::StatConsumer> StatConsumerMp;
typedef bslma::ManagedPtr<bdlmt::TimerEventScheduler> SchedulerMp;
typedef bslma::ManagedPtr<mwcst::StatContext> StatContextMp;
typedef bsl::shared_ptr<mwcst::StatContext> StatContextSp;
typedef bslma::ManagedPtr<mwcsys::StatMonitor> SystemStatMonitorMp;
typedef bslma::ManagedPtr<Printer> PrinterMp;
typedef bslma::ManagedPtr<JsonPrinter> JsonPrinterMp;
typedef bslma::ManagedPtr<mqbplug::StatPublisher> StatPublisherMp;
typedef bslma::ManagedPtr<mqbplug::StatConsumer> StatConsumerMp;
typedef bsl::unordered_map<bsl::string, StatContextMp> StatContextMap;

/// Struct containing a statcontext and bool specifying if the
/// statcontext is managed.
@@ -189,6 +190,12 @@ class StatController {
/// 'channels' stat context
StatContextMp d_statContextChannelsRemote_mp;

/// Mutex for thread safety of the 'd_portsMap'
bslmt::Mutex d_portsMutex;

/// Map of all open ports to their StatContext's
StatContextMap d_portsMap;

/// System stat monitor (for cpu and
/// memory).
SystemStatMonitorMp d_systemStatMonitor_mp;
@@ -339,6 +346,13 @@ class StatController {
/// Retrieve the channels stat context corresponding to the specified
/// `selector`.
mwcst::StatContext* channelsStatContext(ChannelSelector::Enum selector);

/// Add a StatContext for the specified 'port' as a subcontext to the root
/// StatContext of local or remote channels, corresponding to the specified
/// 'selector'.
StatContextMp addChannelStatContext(ChannelSelector::Enum selector,
const bsl::string& port,
const bsl::string& endpoint);
};

// ============================================================================
3 changes: 2 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.cpp
@@ -81,11 +81,12 @@ StatChannel::StatChannel(const StatChannelConfig& config,
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(config.d_statContext_sp);
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, 1);
}

StatChannel::~StatChannel()
{
// NOTHING
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, -1);
Collaborator

If we do per-channel stat contexts, then the number of connections will always be 0 or 1, right?

Collaborator Author

Correct, I utilize the aggregation functionality of StatContextTable here in order to deal only with the channel's StatContext. Otherwise I would have to deal with its parent (the port's StatContext) and, hence, store its shared_ptr here in the StatChannel class
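
As a rough model of that aggregation (not the mwcst implementation): each channel only ever adjusts its own counter between 0 and 1, and the port-level figure is the sum over its children. `ChannelStat`, `ChannelGuard`, and `totalConnections` are hypothetical names.

```cpp
#include <cassert>
#include <vector>

// Hypothetical per-channel stat: either 0 or 1 connection.
struct ChannelStat {
    long d_connections = 0;
};

// Hypothetical RAII guard mirroring the PR's constructor/destructor pair:
// +1 when the channel is created, -1 when it is destroyed.
class ChannelGuard {
  public:
    explicit ChannelGuard(ChannelStat* stat) : d_stat_p(stat)
    {
        assert(stat);
        ++d_stat_p->d_connections;
    }
    ~ChannelGuard() { --d_stat_p->d_connections; }

    ChannelGuard(const ChannelGuard&)            = delete;
    ChannelGuard& operator=(const ChannelGuard&) = delete;

  private:
    ChannelStat* d_stat_p;
};

// The port-level value is an aggregate of the children; no channel needs
// a pointer to its parent to make this work.
long totalConnections(const std::vector<ChannelStat>& channels)
{
    long total = 0;
    for (const ChannelStat& channel : channels) {
        total += channel.d_connections;
    }
    return total;
}
```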

}

// MANIPULATORS
2 changes: 1 addition & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.h
@@ -98,7 +98,7 @@ class StatChannel : public DecoratingChannelPartialImp {
/// `mwcio::StatChannelFactory`).
struct Stat {
// TYPES
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1 };
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1, e_CONNECTIONS = 2 };
};

private:
24 changes: 24 additions & 0 deletions src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp
@@ -220,6 +220,7 @@ StatChannelFactoryUtil::statContextConfiguration(const bsl::string& name,
config.isTable(true);
config.value("in_bytes")
.value("out_bytes")
.value("connections")
.storeExpiredSubcontextValues(true);

if (historySize != -1) {
@@ -268,6 +269,10 @@ void StatChannelFactoryUtil::initializeStatsTable(
StatChannel::Stat::e_BYTES_OUT,
mwcst::StatUtil::value,
start);
schema.addColumn("connections",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::value,
start);

if (!(end == mwcst::StatValue::SnapshotLocation())) {
schema.addColumn("in_bytes_delta",
@@ -280,6 +285,11 @@ void StatChannelFactoryUtil::initializeStatsTable(
mwcst::StatUtil::valueDifference,
start,
end);
schema.addColumn("connections_delta",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::valueDifference,
start,
end);
}

// Configure records
@@ -316,6 +326,14 @@ void StatChannelFactoryUtil::initializeStatsTable(
.printAsMemory();
}
tip->addColumn("out_bytes", "total").zeroString("").printAsMemory();

tip->setColumnGroup("Connections");
if (!(end == mwcst::StatValue::SnapshotLocation())) {
tip->addColumn("connections_delta", "delta")
.zeroString("")
.setPrecision(0);
}
tip->addColumn("connections", "total").setPrecision(0);
}

bsls::Types::Int64
@@ -352,6 +370,12 @@ StatChannelFactoryUtil::getValue(const mwcst::StatContext& context,
case Stat::e_BYTES_OUT_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_BYTES_OUT);
}
case Stat::e_CONNECTIONS_DELTA: {
return STAT_RANGE(valueDifference, StatChannel::Stat::e_CONNECTIONS);
}
case Stat::e_CONNECTIONS_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_CONNECTIONS);
}
default: {
BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat");
}
4 changes: 3 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannelfactory.h
@@ -247,7 +247,9 @@ struct StatChannelFactoryUtil {
e_BYTES_IN_DELTA,
e_BYTES_IN_ABS,
e_BYTES_OUT_DELTA,
e_BYTES_OUT_ABS
e_BYTES_OUT_ABS,
e_CONNECTIONS_DELTA,
e_CONNECTIONS_ABS
};
};

9 changes: 9 additions & 0 deletions src/groups/mwc/mwcst/mwcst_statcontext.h
@@ -799,6 +799,10 @@ class StatContext {
/// all sibling subcontexts.
int uniqueId() const;

/// Return te number of times this 'StatContext' had 'snapshot' called on
Collaborator

Suggested change
/// Return te number of times this 'StatContext' had 'snapshot' called on
/// Return the number of times this 'StatContext' had 'snapshot' called on

/// it.
bsls::Types::Int64 numSnapshots() const;
Collaborator

This is useful, but also it's an interface change in MWC. Do we want to keep it?

Collaborator Author

I need to check it in portsDeleter() because, before the first snapshot, numSubcontexts() returns zero, and I don't want to delete new StatContexts in that case. This happens because StatContext::addSubcontext() puts newly created contexts into the d_newSubcontexts vector.

d_newSubcontexts.push_back(newContext);

Then it moves them to d_statContext vector on next StatContext::snapshot()

However, StatContext::numSubcontexts() doesn't take d_newSubcontexts into account and will return 0 if there haven't been any snapshots yet.
inline int StatContext::numSubcontexts() const
{
return static_cast<int>(d_subcontexts.size() +
d_deletedSubcontexts.size());
}

So if we don't check the number of snapshots in portsDeleter() we can unintentionally delete the StatContext that has just been added.
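
The staging behaviour described above can be modelled with a small sketch. `MiniContext` and `shouldErase` are hypothetical; the model deliberately omits deleted subcontexts and the `isDeleted()` check, and only shows why the `numSnapshots()` guard is needed.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, heavily simplified model of the staging behaviour:
// new subcontexts are parked in a separate vector until the next snapshot.
class MiniContext {
  public:
    void addSubcontext(const std::string& name)
    {
        d_newSubcontexts.push_back(name);
    }

    // Move staged subcontexts into the visible set and count the snapshot.
    void snapshot()
    {
        d_subcontexts.insert(d_subcontexts.end(),
                             d_newSubcontexts.begin(),
                             d_newSubcontexts.end());
        d_newSubcontexts.clear();
        ++d_numSnapshots;
    }

    // Like the real accessor, this ignores staged subcontexts.
    int numSubcontexts() const
    {
        return static_cast<int>(d_subcontexts.size());
    }

    long long numSnapshots() const { return d_numSnapshots; }

  private:
    std::vector<std::string> d_subcontexts;
    std::vector<std::string> d_newSubcontexts;
    long long                d_numSnapshots = 0;
};

// Erase only contexts that have been snapshotted at least once and
// still have no subcontexts.
bool shouldErase(const MiniContext& context)
{
    assert(context.numSnapshots() >= 0);
    return context.numSnapshots() != 0 && context.numSubcontexts() == 0;
}
```

A freshly created port context with one staged channel reports zero subcontexts, and only the snapshot count distinguishes it from a port whose channels are all gone.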


/// Return the number of subcontexts held by this `StatContext`
int numSubcontexts() const;

@@ -1133,6 +1137,11 @@ inline int StatContext::uniqueId() const
return d_uniqueId;
}

inline bsls::Types::Int64 StatContext::numSnapshots() const
{
return d_numSnapshots;
}

inline int StatContext::numSubcontexts() const
{
return static_cast<int>(d_subcontexts.size() +