Feat[MQB, MWC]: report the number of tcp connections #384
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
&d_portsMutex,
bdlf::PlaceHolders::_1)));
channelStatContext = portStatContext->addSubcontext(statConfig);
d_portsMap.emplace(portStatContext->name(), portStatContext);
Unfortunately, it doesn't compile on Solaris
[2024-08-07T12:57:33.062Z] "/SunOS/opt/bb/include/bslstl_pair.h", line 2528: Error: Could not find a match for BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>::ManagedPtr<MANAGED_TYPE>(const BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>) needed in void bsl::Pair_Second<BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>::Pair_Second<BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>(const BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>&, BloombergLP::bslma::Allocator*, bsl::integral_constant<int, 0>).
...
[2024-08-07T12:57:33.062Z] "/opt/bb/include/bslstl_unorderedmap_cpp03.h", line 3617: Where: Instantiated from bsl::pair<BloombergLP::bslstl::HashTableIterator<bsl::pair<const bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>, long>, bool> bsl::unordered_map<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>, bsl::hash<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>>, bsl::equal_to<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>>, bsl::allocator<bsl::pair<const bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>>>::emplace<bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>, BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>>(const bsl::basic_string<char, std::char_traits<char>, bsl::allocator<char>>&, const BloombergLP::bslma::ManagedPtr<BloombergLP::mwcst::StatContext>&).
You might introduce parallel structures, bsl::list<ManagedPtr<...> > and bsl::unordered_map<bsl::string, StatContext*>, as is done in PR #389.
Or you might find another solution.
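A minimal sketch of the "parallel structures" idea, using standard-library stand-ins (std::unique_ptr for bslma::ManagedPtr, std::list and std::unordered_map for their bsl counterparts). The StatContext struct and the PortRegistry class are hypothetical names for illustration only; they are not taken from this PR or from #389. The point is that the map only stores raw, copyable pointers, so no move-only value has to pass through the map's pair constructor.

```cpp
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for mwcst::StatContext.
struct StatContext {
    std::string name;
};

// Hypothetical registry: the list owns the contexts, the map indexes them by
// name through non-owning raw pointers.
class PortRegistry {
    std::list<std::unique_ptr<StatContext>>       d_owned;
    std::unordered_map<std::string, StatContext*> d_index;

  public:
    // Take ownership of 'context' and return a non-owning pointer to it.
    StatContext* add(std::unique_ptr<StatContext> context)
    {
        StatContext* raw = context.get();
        d_owned.push_back(std::move(context));
        d_index[raw->name] = raw;
        return raw;
    }

    // Return the context registered under 'name', or null if there is none.
    StatContext* find(const std::string& name) const
    {
        auto it = d_index.find(name);
        return it == d_index.end() ? nullptr : it->second;
    }

    // Return the number of owned contexts.
    std::size_t size() const { return d_owned.size(); }
};
```

The cost of this layout is that the two containers must be kept in sync by hand whenever a context is removed.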
@@ -100,6 +101,21 @@ const int k_SESSION_DESTROY_WAIT = 20;
const int k_CLIENT_CLOSE_WAIT = 20;
// Time to wait incrementally (in seconds) for all clients and
// proxies to be destroyed during stop sequence.
const char k_PORT_PATTERN[] = ":(\\d{1,5})";

bsl::string portFromUri(const bsl::string& endpoint)
Suggested change:
- bsl::string portFromUri(const bsl::string& endpoint)
+ inline bsl::string portFromUri(const bsl::string& endpoint)
@@ -799,6 +799,10 @@ class StatContext {
/// all sibling subcontexts.
int uniqueId() const;

/// Return te number of times this 'StatContext' had 'snapshot' called on
Suggested change:
- /// Return te number of times this 'StatContext' had 'snapshot' called on
+ /// Return the number of times this 'StatContext' had 'snapshot' called on
std::vector<bsl::string_view> matchVector;

BSLS_ASSERT_SAFE(
0 == regEx.prepare(&errorMessage, &errorOffset, k_PORT_PATTERN));
It's not necessary to prepare the same regEx on every opened channel. We can initialize it once and reuse it from different threads since it's const thread-safe:
https://bloomberg.github.io/bde-resources/doxygen/bde_api_prod/group__bdlpcre__regex.html#3.5
BSLS_ASSERT_SAFE(
0 == regEx.prepare(&errorMessage, &errorOffset, k_PORT_PATTERN));
BSLS_ASSERT_SAFE(0 == regEx.match(&matchVector, endpoint));
I think it's better to use the match overload that takes a single string_view* instead; there is no need to construct an entire vector for this:
https://bloomberg.github.io/bde-resources/doxygen/bde_api_prod/classbdlpcre_1_1RegEx.html#a6c7d41da3bddf9caa0aba7fd2677a68b
0 == regEx.prepare(&errorMessage, &errorOffset, k_PORT_PATTERN));
BSLS_ASSERT_SAFE(0 == regEx.match(&matchVector, endpoint));

return bsl::string(matchVector[1]);
We can return a string_view as well, since the result refers to the string passed in as the argument; that way we don't construct a string with the default allocator.
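The review points on this function (prepare the pattern once, avoid the vector of matches, return a view into the argument) can be sketched with the standard library. This uses std::regex as a stand-in for bdlpcre::RegEx, so the calls differ from the BDE API; the function name portFromEndpoint and the choice to return an empty view on a non-match are assumptions made for this sketch.

```cpp
#include <cstddef>
#include <regex>
#include <string_view>

// Return the port of a "host:port" endpoint as a view into 'endpoint', or an
// empty view if no port is found.  The returned view is valid only while the
// storage behind 'endpoint' is alive.
inline std::string_view portFromEndpoint(std::string_view endpoint)
{
    // Compiled once, on first use; initialization of a function-local static
    // is thread-safe since C++11, and the regex is only read afterwards.
    static const std::regex k_PORT_REGEX(":(\\d{1,5})");

    if (endpoint.empty()) {
        return std::string_view();
    }

    std::cmatch match;
    const char* begin = endpoint.data();
    const char* end   = begin + endpoint.size();
    if (!std::regex_search(begin, end, match, k_PORT_REGEX)) {
        return std::string_view();
    }

    // Sub-match 1 is the capture group, i.e. the digits without the ':'.
    return std::string_view(match[1].first,
                            static_cast<std::size_t>(match[1].length()));
}
```

Because the result is a view, no string is allocated on the call path; the caller decides whether and with which allocator to copy it.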
BSLS_ASSERT_SAFE(parent);

bsl::string name;
bsl::string name, localPort;
Would rather use the provided allocator. If we do so, we can see in the stats table any small allocations within this component, and debug it if needed
Suggested change:
- bsl::string name, localPort;
+ bsl::string name(d_allocator_p), localPort(d_allocator_p);
const mwcst::StatContext& context)
{
// Lookup the port's StatContext and remove it from the 'map'
bslmt::LockGuard<bslmt::Mutex> guard(mutex); // LOCK
With this change, you introduce synchronization between different threads. We might assume it's okay, but if it's not, it's difficult to diagnose.
I think it's possible to avoid this by storing a ManagedPtr<StatContext> within the corresponding channel object. So if we close this channel and call stop or the destructor (not sure which one), we free this ManagedPtr. The StatContext class already has a mechanism to keep track of deleted subcontexts. So, if we call the destructor of the ManagedPtr to a subcontext, the parent context will know about this and remove the raw pointer it stores.
Unfortunately, the thread safety of the StatContext class is only partial.
blazingmq/src/groups/mwc/mwcst/mwcst_statcontext.h
Lines 96 to 98 in 7d4b3d7
// In simpler terms, only 'addSubContext', 'addSubTable', 'adjustValue',
// 'reportValue', and 'setValue' are thread-safe. All other functions should
// be considered not thread safe.
For example, I can't iterate over subcontexts that can be added or deleted without additional synchronization anywhere except in the snapshot() method or its synchronous pre- or post-steps, because the deleted subcontexts are handled inside this method.
Please find below some arguments in favor of the current implementation:
Each channel stores its own StatContext, and this approach hasn't changed. What I have done is add another 'layer' of StatContexts between the channel contexts and their parent. This layer breaks down the channel stats by port. I really need to have only one StatContext for each port, as I report aggregated stats. The inter-thread synchronization is required here because:
- portsDeleter() is always called from StatController's thread, but TCPSessionFactory::channelStatContextCreator and, hence, the constructors and destructors of channel-related StatContexts are called from TCPSessionFactory's threads.
- Although StatContext::addSubcontext() is a thread-safe method, it will create a new StatContext instance even if there are subcontexts with the same name.
So, in order to add a channel StatContext, I need to check whether there is a port StatContext for that port and use it; otherwise I have to create a new port StatContext. After that I add a channel subcontext to it. So I must hold ManagedPtrs to the StatContexts of the ports and lock the container (currently, the unordered_map) where they are stored. Otherwise race conditions are possible: either two insertions of a port StatContext with the same name, or deletion of the parent while a new channel subcontext is being added.
If you have an idea how to achieve this without the synchronization, please let me know.
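The find-or-create step described above can be sketched as follows. This is a simplified model under stated assumptions, not the code in this PR: PortContext and PortTable are hypothetical names, std::mutex and std::shared_ptr stand in for bslmt::Mutex and the BDE smart pointers, and a plain channel counter replaces the StatContext bookkeeping. It shows why the lookup and the insertion have to happen under one lock: without it, two threads could both miss the lookup and create two contexts for the same port.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical per-port context; 'channels' counts the channels using it.
struct PortContext {
    std::string name;
    int         channels = 0;
};

// Hypothetical table of per-port contexts shared between threads.
class PortTable {
    std::mutex                                                    d_mutex;
    std::unordered_map<std::string, std::shared_ptr<PortContext>> d_ports;

  public:
    // Return the context for 'port', creating it if it does not exist yet.
    // Lookup and insertion are done under the same lock.
    std::shared_ptr<PortContext> acquire(const std::string& port)
    {
        std::lock_guard<std::mutex>   guard(d_mutex);  // LOCK
        std::shared_ptr<PortContext>& slot = d_ports[port];
        if (!slot) {
            slot       = std::make_shared<PortContext>();
            slot->name = port;
        }
        ++slot->channels;
        return slot;
    }

    // Drop one channel from 'port' and erase the port once it has none left.
    void release(const std::string& port)
    {
        std::lock_guard<std::mutex> guard(d_mutex);  // LOCK
        auto it = d_ports.find(port);
        if (it != d_ports.end() && --it->second->channels == 0) {
            d_ports.erase(it);
        }
    }

    // Return the number of ports currently tracked.
    std::size_t size()
    {
        std::lock_guard<std::mutex> guard(d_mutex);  // LOCK
        return d_ports.size();
    }
};
```

In this model acquire() would be called from the session threads and release() from whichever thread tears a channel down; both paths contend on the same mutex, which is the synchronization the review comment is concerned about.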
> If you have an idea how to achieve this without the synchronization, please let me know

@waldgange let me think about this for some time.
Hi @waldgange
I think the correct way to do it is to pass a ManagedPtr to the StatChannel here, instead of a shared_ptr:
blazingmq/src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp
Lines 126 to 133 in a6a6cef
bsl::shared_ptr<mwcst::StatContext> statContext(
d_config.d_statContextCreator(channel, handleSp));
// Create the channel and notify user
bsl::shared_ptr<StatChannel> newChannel;
newChannel.createInplace(
handleSp->d_allocator_p,
StatChannelConfig(channel, statContext, handleSp->d_allocator_p),
handleSp->d_allocator_p);
So, StatChannel keeps ownership of this StatContext. When a StatChannel goes down, the ManagedPtr to its StatContext goes down too.
Note that this requires changing MWC interfaces. However, StatChannel and StatChannelFactory are used only within BlazingMQ, so I don't see any harm in this. Also, since we plan to own a ManagedPtr in StatChannel, the StatChannel class is no longer copy constructible, so we might explicitly delete the copy constructor and assignment operator.
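A minimal sketch of this ownership model, assuming std::unique_ptr as a stand-in for bslma::ManagedPtr. The StatContext and StatChannel classes below are simplified, hypothetical versions written for this example; the live-object counter exists only to make the lifetime observable.

```cpp
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for mwcst::StatContext that tracks how many
// instances are alive through an external counter.
class StatContext {
    int* d_live_p;

  public:
    explicit StatContext(int* live) : d_live_p(live) { ++*d_live_p; }
    ~StatContext() { --*d_live_p; }

    StatContext(const StatContext&)            = delete;
    StatContext& operator=(const StatContext&) = delete;
};

// Hypothetical channel that uniquely owns its stat context.
class StatChannel {
    std::unique_ptr<StatContext> d_statContext;

  public:
    explicit StatChannel(std::unique_ptr<StatContext> statContext)
    : d_statContext(std::move(statContext))
    {
    }

    // Unique ownership makes copying meaningless, so forbid it explicitly.
    StatChannel(const StatChannel&)            = delete;
    StatChannel& operator=(const StatChannel&) = delete;
};

static_assert(!std::is_copy_constructible<StatChannel>::value,
              "StatChannel must not be copyable");
```

When a StatChannel is destroyed, its StatContext is destroyed with it, without any registry lookup or lock on the teardown path.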
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
{
bsl::string_view result;

BSLS_ASSERT_SAFE(0 == d_regex.match(&result, endpoint));
Is it possible to pass an incorrect endpoint here and cause the broker to crash on the assert? I am wondering whether we should crash here or handle the incorrect endpoint.
It never happens, as an endpoint is always a host:port string. However, I can remove this assert and just return an empty string when the endpoint doesn't match. Then the broker will keep working, and all channels with unknown ports will report their TCP connection count metric with an empty port tag.
BSLS_ASSERT_SAFE(0 == d_regex.match(&result, endpoint));

return result.substr(1);
Do we make substr(1) here because we capture the leading ':'?
Yes, definitely. If we used the match() overload that puts its results into a vector, we would just take element [1] of the vector, which contains the subexpression of the regex, while element [0] would contain the full match with the leading ':'. However, the current match() overload only returns the full matched expression.
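The difference between the full match and the capture group can be illustrated with std::regex, used here purely as a stand-in for bdlpcre::RegEx. The PortMatch struct and the matchPort function are hypothetical helpers for this illustration.

```cpp
#include <regex>
#include <string>

// Hypothetical result holder: the full match and the first capture group.
struct PortMatch {
    std::string full;   // e.g. ":30114"
    std::string group;  // e.g. "30114"
};

// Search 'endpoint' for the pattern used in this PR and return both the full
// match and the capture group; both are empty if nothing matches.
inline PortMatch matchPort(const std::string& endpoint)
{
    static const std::regex k_PORT_REGEX(":(\\d{1,5})");

    PortMatch   result;
    std::smatch match;
    if (std::regex_search(endpoint, match, k_PORT_REGEX)) {
        result.full  = match[0].str();  // whole pattern, including the ':'
        result.group = match[1].str();  // digits only
    }
    return result;
}
```

Dropping the first character of the full match gives the same text as the capture group, which is what the substr(1) call relies on.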
}

StatChannel::~StatChannel()
{
// NOTHING
d_config.d_statContext_sp->adjustValue(Stat::e_CONNECTIONS, -1);
If we do per-channel stat contexts, then the number of connections will always be 0 or 1, right?
Correct, I utilize the aggregation functionality of StatContextTable here in order to deal only with the channel's StatContext. Otherwise I would have to deal with its parent (the port's StatContext) and, hence, store its shared_ptr here in the StatChannel class.
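As a rough illustration of the idea (not of how the MWC stat classes actually aggregate values), each channel can hold a counter that is only ever 0 or 1, and the per-port figure is obtained by summing over the channels. ChannelStats and PortStats are hypothetical types for this sketch.

```cpp
#include <vector>

// Hypothetical per-channel stats: 'connections' is 1 while the channel is
// open and 0 after it has been closed.
struct ChannelStats {
    int connections = 0;
};

// Hypothetical per-port stats that aggregate over the port's channels.
struct PortStats {
    std::vector<ChannelStats> channels;

    // Return the sum of the per-channel connection counters.
    int totalConnections() const
    {
        int total = 0;
        for (const ChannelStats& channel : channels) {
            total += channel.connections;
        }
        return total;
    }
};
```

With this split, a channel only ever adjusts its own counter, and it needs no reference to its parent.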
@@ -799,6 +799,10 @@ class StatContext {
/// all sibling subcontexts.
int uniqueId() const;

/// Return the number of times this 'StatContext' had 'snapshot' called on
/// it.
bsls::Types::Int64 numSnapshots() const;
This is useful, but it's also an interface change in MWC. Do we want to keep it?
I need to check it in portsDeleter(), because before the first snapshot numSubcontexts() returns zero, and I don't want to delete new StatContexts in this case. This happens because StatContext::addSubcontext() puts the newly created contexts into the d_newSubcontexts vector.
d_newSubcontexts.push_back(newContext);
Then it moves them to the d_statContext vector on the next StatContext::snapshot()
moveNewSubcontexts();
However, StatContext::numSubcontexts() doesn't take d_newSubcontexts into account and will return 0 if there haven't been any snapshots yet.
blazingmq/src/groups/mwc/mwcst/mwcst_statcontext.h
Lines 1136 to 1140 in 7d4b3d7
inline int StatContext::numSubcontexts() const
{
return static_cast<int>(d_subcontexts.size() +
d_deletedSubcontexts.size());
}
So if we don't check the number of snapshots in portsDeleter(), we can unintentionally delete a StatContext that has just been added.
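The behaviour described above can be reproduced with a toy model. ToyStatContext is a hypothetical, heavily simplified class written for this sketch; it only mirrors the two-vector scheme quoted from mwcst_statcontext.h (deleted subcontexts are left out), and safeToDelete is an assumed shape for the check, not the actual body of portsDeleter().

```cpp
#include <vector>

// Toy model: new subcontexts are staged and only become visible to
// 'numSubcontexts' after the next 'snapshot'.
class ToyStatContext {
    std::vector<int> d_newSubcontexts;  // added since the last snapshot
    std::vector<int> d_subcontexts;     // visible subcontexts
    long long        d_numSnapshots = 0;

  public:
    void addSubcontext(int id) { d_newSubcontexts.push_back(id); }

    // Move the staged subcontexts into the visible set and count the call.
    void snapshot()
    {
        d_subcontexts.insert(d_subcontexts.end(),
                             d_newSubcontexts.begin(),
                             d_newSubcontexts.end());
        d_newSubcontexts.clear();
        ++d_numSnapshots;
    }

    // Staged subcontexts are deliberately not counted here.
    int numSubcontexts() const
    {
        return static_cast<int>(d_subcontexts.size());
    }

    long long numSnapshots() const { return d_numSnapshots; }
};

// A port context is only considered empty once it has been snapshotted at
// least once; before that, a zero count may just mean "not yet visible".
inline bool safeToDelete(const ToyStatContext& port)
{
    return port.numSnapshots() > 0 && port.numSubcontexts() == 0;
}
```

Without the numSnapshots() guard, a context that received its first channel a moment ago would look empty and be deleted.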
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
Build 167 of commit 3d29a39 has completed with FAILURE
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
Build 232 of commit 3555a56 has completed with FAILURE
Build 236 of commit 4cf0b83 has completed with FAILURE
Build 239 of commit 0565d51 has completed with FAILURE
Some thoughts
mwcst::StatContextConfiguration portConfig(port, &localAllocator);
bsl::shared_ptr<mwcst::StatContext> portStatContext =
parent->addSubcontext(
portConfig.storeExpiredSubcontextValues(true));
Here, we convert a bslma::ManagedPtr to a bsl::shared_ptr. How does the ownership transfer work in this situation? Is this operation safe? Are we sure that the underlying StatContext is not accidentally destroyed?
Seems like this is a common practice:
blazingmq/src/groups/mwc/mwcio/mwcio_statchannelfactory.cpp
Lines 126 to 127 in 5aaf596
bsl::shared_ptr<mwcst::StatContext> statContext(
d_config.d_statContextCreator(channel, handleSp));
blazingmq/src/groups/mqb/mqbstat/mqbstat_clusterstats.cpp
Lines 254 to 258 in 5aaf596
d_partitionsStatContexts.emplace_back(
bsl::shared_ptr<mwcst::StatContext>(
d_statContext_mp->addSubcontext(
mwcst::StatContextConfiguration(partitionName,
&localAllocator))));
blazingmq/src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Lines 2582 to 2584 in 5aaf596
StatContextMp statContextMp =
d_clusterData.clusterNodesStatContext()->addSubcontext(config);
StatContextSp statContextSp(statContextMp, d_allocator_p);
and so on
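The standard-library analogue of this conversion is constructing a std::shared_ptr from a std::unique_ptr, which transfers ownership and leaves the source empty. The sketch below shows only that analogue; it assumes, without demonstrating it, that constructing a bsl::shared_ptr from a bslma::ManagedPtr behaves the same way. The StatContext struct and the toShared helper are hypothetical.

```cpp
#include <memory>
#include <utility>

// Hypothetical stand-in for mwcst::StatContext.
struct StatContext {
    int value = 0;
};

// Transfer ownership of the object held by 'source' to a new shared pointer.
// After the call 'source' is empty and the object is kept alive only by the
// returned shared pointer and its copies.
inline std::shared_ptr<StatContext> toShared(
    std::unique_ptr<StatContext>& source)
{
    return std::shared_ptr<StatContext>(std::move(source));
}
```

Since exactly one owner exists at every point of the transfer, the object is neither destroyed early nor destroyed twice.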
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
Build 254 of commit 2e5e9e8 has completed with FAILURE
LGTM!
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.com>
}

void TCPSessionFactory::PortManager::onDeleteChannelContext(
const bsl::uint16_t port)
For basic numeric types we don't typically specify const in the argument list.
return d_streamSocket_sp->sourceEndpoint();
}
else {
return ntsa::Endpoint();
Might replace this with a ternary operator.
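A sketch of what the ternary form could look like, using hypothetical Endpoint and StreamSocket stand-ins rather than the ntsa types, and assuming the elided condition tests whether the socket pointer is set:

```cpp
#include <memory>
#include <string>

// Hypothetical stand-ins for ntsa::Endpoint and the stream socket.
struct Endpoint {
    std::string host;
    int         port = 0;
};

struct StreamSocket {
    Endpoint source;

    Endpoint sourceEndpoint() const { return source; }
};

// Return the socket's source endpoint, or a default-constructed endpoint if
// there is no socket.
inline Endpoint sourceEndpoint(const std::shared_ptr<StreamSocket>& socket)
{
    return socket ? socket->sourceEndpoint() : Endpoint();
}
```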
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
Build 259 of commit 7f29d3a has completed with FAILURE
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>
Signed-off-by: Anton Pryakhin <apryakhin1@bloomberg.net>