Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat[MQB, MWC]: report the number of tcp connections #384

Merged
merged 12 commits into from
Sep 19, 2024
109 changes: 85 additions & 24 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ namespace BloombergLP {
namespace mqbnet {

const char* TCPSessionFactory::k_CHANNEL_PROPERTY_PEER_IP = "tcp.peer.ip";
const char* TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT =
"tcp.local.port";
const char* TCPSessionFactory::k_CHANNEL_PROPERTY_CHANNEL_ID =
"channelpool.channel.id";
const char* TCPSessionFactory::k_CHANNEL_STATUS_CLOSE_REASON =
Expand Down Expand Up @@ -155,14 +157,21 @@ void ntcChannelPreCreation(
BSLS_ANNOTATION_UNUSED const
bsl::shared_ptr<mwcio::ChannelFactory::OpHandle>& operationHandle)
{
ntsa::Endpoint peerEndpoint = channel->peerEndpoint();
ntsa::Endpoint peerEndpoint = channel->peerEndpoint();
ntsa::Endpoint sourceEndpoint = channel->sourceEndpoint();

if (peerEndpoint.isIp() && peerEndpoint.ip().host().isV4()) {
channel->properties().set(
TCPSessionFactory::k_CHANNEL_PROPERTY_PEER_IP,
static_cast<int>(peerEndpoint.ip().host().v4().value()));
}

if (sourceEndpoint.isIp()) {
channel->properties().set(
TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT,
static_cast<int>(sourceEndpoint.ip().port()));
}

channel->properties().set(TCPSessionFactory::k_CHANNEL_PROPERTY_CHANNEL_ID,
channel->channelId());
}
Expand Down Expand Up @@ -280,36 +289,31 @@ TCPSessionFactory::channelStatContextCreator(
const bsl::shared_ptr<mwcio::Channel>& channel,
const bsl::shared_ptr<mwcio::StatChannelFactoryHandle>& handle)
{
mwcst::StatContext* parent = 0;

int peerAddress;
channel->properties().load(&peerAddress, k_CHANNEL_PROPERTY_PEER_IP);

ntsa::Ipv4Address ipv4Address(static_cast<bsl::uint32_t>(peerAddress));
ntsa::IpAddress ipAddress(ipv4Address);
if (!mwcio::ChannelUtil::isLocalHost(ipAddress)) {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_LOCAL);
}
else {
parent = d_statController_p->channelsStatContext(
mqbstat::StatController::ChannelSelector::e_REMOTE);
}

ntsa::Ipv4Address ipv4Address(static_cast<bsl::uint32_t>(peerAddress));
ntsa::IpAddress ipAddress(ipv4Address);
mwcst::StatContext* parent = d_statController_p->channelsStatContext(
mwcio::ChannelUtil::isLocalHost(ipAddress)
? mqbstat::StatController::ChannelSelector::e_LOCAL
: mqbstat::StatController::ChannelSelector::e_REMOTE);
BSLS_ASSERT_SAFE(parent);

bsl::string name;
if (handle->options().is<mwcio::ConnectOptions>()) {
name = handle->options().the<mwcio::ConnectOptions>().endpoint();
}
else {
name = channel->peerUri();
}
bsl::string endpoint =
handle->options().is<mwcio::ConnectOptions>()
? handle->options().the<mwcio::ConnectOptions>().endpoint()
: channel->peerUri();

bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(name, &localAllocator);
int localPort;
channel->properties().load(
&localPort,
TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT);

return parent->addSubcontext(statConfig);
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK
return d_ports.addChannelContext(parent,
endpoint,
static_cast<bsl::uint16_t>(localPort));
}

void TCPSessionFactory::negotiate(
Expand Down Expand Up @@ -732,6 +736,11 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr<mwcio::Channel>& channel,
{
--d_nbActiveChannels;

int port;
channel->properties().load(
&port,
TCPSessionFactory::k_CHANNEL_PROPERTY_LOCAL_PORT);

ChannelInfoSp channelInfo;
{
// Lookup the session and remove it from internal map
Expand All @@ -742,6 +751,7 @@ void TCPSessionFactory::onClose(const bsl::shared_ptr<mwcio::Channel>& channel,
channelInfo = it->second;
d_channels.erase(it);
}
d_ports.onDeleteChannelContext(port);
} // close mutex lock guard // UNLOCK

if (!channelInfo) {
Expand Down Expand Up @@ -938,6 +948,7 @@ TCPSessionFactory::TCPSessionFactory(
, d_noSessionCondition(bsls::SystemClockType::e_MONOTONIC)
, d_noClientCondition(bsls::SystemClockType::e_MONOTONIC)
, d_channels(allocator)
, d_ports(allocator)
, d_heartbeatSchedulerActive(false)
, d_heartbeatChannels(allocator)
, d_initialMissedHeartbeatCounter(calculateInitialMissedHbCounter(config))
Expand Down Expand Up @@ -1462,5 +1473,55 @@ bool TCPSessionFactory::isEndpointLoopback(const bslstl::StringRef& uri) const
mwcio::ChannelUtil::isLocalHost(endpoint.host());
}

// ------------------------------------
// class TCPSessionFactory::PortManager
// ------------------------------------

TCPSessionFactory::PortManager::PortManager(bslma::Allocator* allocator)
: d_portMap(allocator)
, d_allocator_p(allocator)
{
}

bslma::ManagedPtr<mwcst::StatContext>
TCPSessionFactory::PortManager::addChannelContext(mwcst::StatContext* parent,
const bsl::string& endpoint,
bsl::uint16_t port)
{
bdlma::LocalSequentialAllocator<2048> localAllocator(d_allocator_p);
mwcst::StatContextConfiguration statConfig(endpoint, &localAllocator);
waldgange marked this conversation as resolved.
Show resolved Hide resolved

bslma::ManagedPtr<mwcst::StatContext> channelStatContext;

PortMap::iterator portIt = d_portMap.find(port);

if (portIt != d_portMap.end()) {
channelStatContext = portIt->second.d_portContext->addSubcontext(
statConfig);
++portIt->second.d_numChannels;
}
else {
mwcst::StatContextConfiguration portConfig(
static_cast<bsls::Types::Int64>(port),
&localAllocator);
bsl::shared_ptr<mwcst::StatContext> portStatContext =
parent->addSubcontext(
portConfig.storeExpiredSubcontextValues(true));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we convert bslma::ManagedPtr to bsl::shared_ptr. How does the ownership transfer works in this situiation? Is this operation safe? Are we sure that the underlying StatContext is not accidentally destroyed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is a common practice:

bsl::shared_ptr<mwcst::StatContext> statContext(
d_config.d_statContextCreator(channel, handleSp));

d_partitionsStatContexts.emplace_back(
bsl::shared_ptr<mwcst::StatContext>(
d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(partitionName,
&localAllocator))));

StatContextMp statContextMp =
d_clusterData.clusterNodesStatContext()->addSubcontext(config);
StatContextSp statContextSp(statContextMp, d_allocator_p);

and so on

channelStatContext = portStatContext->addSubcontext(statConfig);
d_portMap.emplace(port, PortContext({portStatContext, 1}));
}

return channelStatContext;
}

void TCPSessionFactory::PortManager::onDeleteChannelContext(bsl::uint16_t port)
{
// Lookup the port's StatContext and remove it from the internal containers
PortMap::iterator it = d_portMap.find(port);
if (it != d_portMap.end() && --it->second.d_numChannels == 0) {
d_portMap.erase(it);
}
}

} // close package namespace
} // close enterprise namespace
43 changes: 43 additions & 0 deletions src/groups/mqb/mqbnet/mqbnet_tcpsessionfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class TCPSessionFactory {
/// Name of a property set on the channel representing the peer's IP.
static const char* k_CHANNEL_PROPERTY_PEER_IP;

/// Name of a property set on the channel representing the local port.
static const char* k_CHANNEL_PROPERTY_LOCAL_PORT;

/// Name of a property set on the channel representing the BTE channel
/// id.
static const char* k_CHANNEL_PROPERTY_CHANNEL_ID;
Expand Down Expand Up @@ -214,6 +217,43 @@ class TCPSessionFactory {
// scheduler thread.
};

/// This class provides mechanism to store a map of port stat contexts.
class PortManager {
public:
// PUBLIC TYPES
struct PortContext {
bsl::shared_ptr<mwcst::StatContext> d_portContext;
bsl::size_t d_numChannels;
};
typedef bsl::unordered_map<bsl::uint16_t, PortContext> PortMap;

private:
// PRIVATE DATA

/// A map of all ports
PortMap d_portMap;

/// Allocator to use
bslma::Allocator* d_allocator_p;

public:
// CREATORS
explicit PortManager(bslma::Allocator* allocator = 0);

// PUBLIC METHODS
/// Create a sub context of the specified 'parent' with the specified
/// 'endpoint' as the StatContext's name. Increases the number of
/// channels on the specified 'port'.
bslma::ManagedPtr<mwcst::StatContext>
addChannelContext(mwcst::StatContext* parent,
const bsl::string& endpoint,
bsl::uint16_t port);

/// Handle the deletion of a StatContext associated with a channel
/// connected to the specified 'port'.
void onDeleteChannelContext(bsl::uint16_t port);
};

typedef bsl::shared_ptr<ChannelInfo> ChannelInfoSp;

/// Map associating a `Channel` to its corresponding `ChannelInfo` (as
Expand Down Expand Up @@ -322,6 +362,9 @@ class TCPSessionFactory {
ChannelMap d_channels;
// Map of all active channels

PortManager d_ports;
// Manager of all open ports

bool d_heartbeatSchedulerActive;
// True if the recurring
// heartbeat check event is
Expand Down
14 changes: 8 additions & 6 deletions src/groups/mwc/mwcio/mwcio_ntcchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1384,12 +1384,14 @@ int NtcChannel::channelId() const

ntsa::Endpoint NtcChannel::peerEndpoint() const
{
if (d_streamSocket_sp) {
return d_streamSocket_sp->remoteEndpoint();
}
else {
return ntsa::Endpoint();
}
return d_streamSocket_sp ? d_streamSocket_sp->remoteEndpoint()
: ntsa::Endpoint();
}

ntsa::Endpoint NtcChannel::sourceEndpoint() const
{
return d_streamSocket_sp ? d_streamSocket_sp->sourceEndpoint()
: ntsa::Endpoint();
}

const bsl::string& NtcChannel::peerUri() const
Expand Down
5 changes: 4 additions & 1 deletion src/groups/mwc/mwcio/mwcio_ntcchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,12 @@ class NtcChannel : public mwcio::Channel,
/// Return the channel ID.
int channelId() const;

/// Load into the specified `result` the endpoint of the peer.
/// Return the endpoint of the "remote" peer.
ntsa::Endpoint peerEndpoint() const;

/// Return the endpoint of the "source" peer.
ntsa::Endpoint sourceEndpoint() const;

/// Return the URI of the "remote" end of this channel. It is up to the
/// underlying implementation to define the format of the returned URI.
const bsl::string& peerUri() const BSLS_KEYWORD_OVERRIDE;
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ StatChannel::StatChannel(const StatChannelConfig& config,
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(config.d_statContext_sp);
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, 1);
}

StatChannel::~StatChannel()
{
// NOTHING
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, -1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do per-channel stat contexts, then the number of connections will always be 0 or 1, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I utilize the aggregation functionality Of StatContextTable here in order to deal only with the Channel's StatContext here. Otherways I would have to deal with its parent (port's StatContext), and, hence, store its shared_ptr here in StatChannel class

}

// MANIPULATORS
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mwc/mwcio/mwcio_statchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class StatChannel : public DecoratingChannelPartialImp {
/// `mwcio::StatChannelFactory`).
struct Stat {
// TYPES
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1 };
enum Enum { e_BYTES_IN = 0, e_BYTES_OUT = 1, e_CONNECTIONS = 2 };
};

private:
Expand Down
24 changes: 24 additions & 0 deletions src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ StatChannelFactoryUtil::statContextConfiguration(const bsl::string& name,
config.isTable(true);
config.value("in_bytes")
.value("out_bytes")
.value("connections")
.storeExpiredSubcontextValues(true);

if (historySize != -1) {
Expand Down Expand Up @@ -268,6 +269,10 @@ void StatChannelFactoryUtil::initializeStatsTable(
StatChannel::Stat::e_BYTES_OUT,
mwcst::StatUtil::value,
start);
schema.addColumn("connections",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::value,
start);

if (!(end == mwcst::StatValue::SnapshotLocation())) {
schema.addColumn("in_bytes_delta",
Expand All @@ -280,6 +285,11 @@ void StatChannelFactoryUtil::initializeStatsTable(
mwcst::StatUtil::valueDifference,
start,
end);
schema.addColumn("connections_delta",
StatChannel::Stat::e_CONNECTIONS,
mwcst::StatUtil::valueDifference,
start,
end);
}

// Configure records
Expand Down Expand Up @@ -316,6 +326,14 @@ void StatChannelFactoryUtil::initializeStatsTable(
.printAsMemory();
}
tip->addColumn("out_bytes", "total").zeroString("").printAsMemory();

tip->setColumnGroup("Connections");
if (!(end == mwcst::StatValue::SnapshotLocation())) {
tip->addColumn("connections_delta", "delta")
.zeroString("")
.setPrecision(0);
}
tip->addColumn("connections", "total").setPrecision(0);
}

bsls::Types::Int64
Expand Down Expand Up @@ -352,6 +370,12 @@ StatChannelFactoryUtil::getValue(const mwcst::StatContext& context,
case Stat::e_BYTES_OUT_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_BYTES_OUT);
}
case Stat::e_CONNECTIONS_DELTA: {
return STAT_RANGE(valueDifference, StatChannel::Stat::e_CONNECTIONS);
}
case Stat::e_CONNECTIONS_ABS: {
return STAT_SINGLE(value, StatChannel::Stat::e_CONNECTIONS);
}
default: {
BSLS_ASSERT_SAFE(false && "Attempting to access an unknown stat");
}
Expand Down
4 changes: 3 additions & 1 deletion src/groups/mwc/mwcio/mwcio_statchannelfactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ struct StatChannelFactoryUtil {
e_BYTES_IN_DELTA,
e_BYTES_IN_ABS,
e_BYTES_OUT_DELTA,
e_BYTES_OUT_ABS
e_BYTES_OUT_ABS,
e_CONNECTIONS_DELTA,
e_CONNECTIONS_ABS
};
};

Expand Down
Loading
Loading