Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat[MQB]: Enhance queue consumption monitor alarm log with additional details #420

Merged
merged 46 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
f02419e
Fix macro redefinition warnings, add bmqstoragetool.td target
alexander-e1off Mar 28, 2024
122344f
Merge branch 'bloomberg:main' into main
alexander-e1off May 31, 2024
9e23230
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 6, 2024
9117ac7
Fix order of objects creation in unit test
alexander-e1off Jun 6, 2024
87cdf0b
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 7, 2024
8e88dc2
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 12, 2024
6dfd1fa
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 14, 2024
4ba1056
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 18, 2024
dcae5bf
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 27, 2024
7482e16
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 28, 2024
81a0854
Merge branch 'bloomberg:main' into main
alexander-e1off Jun 28, 2024
2e29ae8
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 1, 2024
924c207
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 2, 2024
f279efd
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 15, 2024
8142686
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 23, 2024
2b5eaad
Merge branch 'bloomberg:main' into main
alexander-e1off Jul 30, 2024
917e8a1
Merge branch 'bloomberg:main' into main
alexander-e1off Aug 7, 2024
b1f9170
Merge branch 'bloomberg:main' into main
alexander-e1off Aug 20, 2024
9bf33e6
Merge branch 'bloomberg:main' into main
alexander-e1off Aug 29, 2024
66e3001
Merge branch 'bloomberg:main' into main
alexander-e1off Sep 6, 2024
3c6651f
Enhance queue alarm log with subscriptions info
alexander-e1off Sep 11, 2024
69da6c1
Debug message properties
alexander-e1off Sep 12, 2024
493b459
Add logging of oldest message in put aside queue and its properties
alexander-e1off Sep 12, 2024
b972874
Fix mqbblp_queueconsumptionmonitor.t
alexander-e1off Sep 13, 2024
763563f
Rename alarm label
alexander-e1off Sep 13, 2024
d092e76
Remove CI debug
alexander-e1off Sep 13, 2024
d2a5164
Cleanup
alexander-e1off Sep 13, 2024
e54b1b8
Fix subscription expressions printing
alexander-e1off Sep 17, 2024
070baec
Add printing of numMessages/numBytes per appId
alexander-e1off Sep 18, 2024
d4cf81d
Get rid of headCb
alexander-e1off Sep 23, 2024
1982cc2
Fix UT
alexander-e1off Sep 23, 2024
76744df
Cleanup
alexander-e1off Sep 24, 2024
a95ed92
Fix Solaris build
alexander-e1off Sep 24, 2024
265b77c
Merge branch 'bloomberg:main' into enhance-alarm-log
alexander-e1off Sep 24, 2024
b29a25c
Add test_alarms_subscription_mismatch
alexander-e1off Sep 24, 2024
4e11d2d
Cleanup
alexander-e1off Sep 24, 2024
6f7238f
Merge branch 'bloomberg:main' into enhance-alarm-log
alexander-e1off Sep 27, 2024
1616eb0
Fix review comments
alexander-e1off Sep 30, 2024
85a3eef
Refactor mqbblp_queueconsumptionmonitor.t
alexander-e1off Oct 11, 2024
88cd3df
Print appId in queue uri
alexander-e1off Oct 11, 2024
c6c332e
Merge branch 'bloomberg:main' into main
alexander-e1off Oct 11, 2024
f853ebf
Merge from main, fix merge conflicts
alexander-e1off Oct 14, 2024
24d2027
Fix code formatting
alexander-e1off Oct 14, 2024
e1b55b8
Fix review comments
alexander-e1off Oct 15, 2024
047955c
Merge branch 'main' into enhance-alarm-log
alexander-e1off Oct 15, 2024
b4e100f
Fix failed IT
alexander-e1off Oct 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 35 additions & 176 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,16 @@
#include <mqbscm_version.h>
// MQB
#include <mqbblp_queuehandlecatalog.h>
#include <mqbcmd_humanprinter.h>
#include <mqbcmd_messages.h>
#include <mqbi_queueengine.h>
#include <mqbi_storage.h>
#include <mqbs_storageprintutil.h>
#include <mqbu_capacitymeter.h>

// BMQ
#include <bmqp_ctrlmsg_messages.h>
#include <bmqt_queueflags.h>
#include <bmqt_uri.h>

// MWC
#include <mwctsk_alarmlog.h>
#include <mwcu_memoutstream.h>
#include <mwcu_printutil.h>

Expand Down Expand Up @@ -125,23 +121,12 @@ const char* QueueConsumptionMonitor::Transition::toAscii(
// struct QueueConsumptionMonitor::SubStreamInfo
// ---------------------------------------------

QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(const HeadCb& headCb)
QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo()
: d_lastKnownGoodTimer(0)
, d_messageSent(true)
, d_state(State::e_ALIVE)
, d_headCb(headCb)
{
BSLS_ASSERT_SAFE(d_headCb);
}

QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(
const SubStreamInfo& other)
: d_lastKnownGoodTimer(other.d_lastKnownGoodTimer)
, d_messageSent(other.d_messageSent)
, d_state(other.d_state)
, d_headCb(other.d_headCb)
{
BSLS_ASSERT_SAFE(d_headCb);
// NOTHING
}

// -----------------------------
Expand All @@ -150,14 +135,17 @@ QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(

// CREATORS
// Create a monitor bound to the specified 'queueState'.  The specified
// 'loggingCb' is copied into 'd_loggingCb'; 'onTimer' invokes it with an app
// key and a boolean flag and interprets the returned value as "there are
// undelivered messages".  The specified 'allocator' is handed to the
// 'd_subStreamInfos' container.  The max idle time and the current timer both
// start at 0.
QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState,
const LoggingCb& loggingCb,
bslma::Allocator* allocator)
: d_queueState_p(queueState)
, d_maxIdleTime(0)
, d_currentTimer(0)
, d_subStreamInfos(allocator)
, d_loggingCb(loggingCb)
{
// PRECONDITIONS
// Both the queue state pointer and the logging callback are mandatory; the
// checks run on the members, i.e. after the initializer list has copied the
// arguments.
BSLS_ASSERT_SAFE(d_queueState_p);
BSLS_ASSERT_SAFE(d_loggingCb);
}

// MANIPULATORS
Expand All @@ -176,27 +164,25 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value)
last = d_subStreamInfos.end();
iter != last;
++iter) {
iter->second = SubStreamInfo(iter->second.d_headCb);
iter->second = SubStreamInfo();
}

return *this;
}

void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key,
const HeadCb& headCb)
void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key)
{
// Should always be called from the queue thread, but will be invoked from
// the cluster thread once upon queue creation.

// PRECONDITIONS
BSLS_ASSERT_SAFE(key != mqbu::StorageKey::k_NULL_KEY ||
d_subStreamInfos.empty());
BSLS_ASSERT_SAFE(headCb);
BSLS_ASSERT_SAFE(d_subStreamInfos.find(mqbu::StorageKey::k_NULL_KEY) ==
d_subStreamInfos.end());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end());

d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo(headCb)));
d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo()));
}

void QueueConsumptionMonitor::unregisterSubStream(const mqbu::StorageKey& key)
Expand Down Expand Up @@ -240,55 +226,52 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)

d_currentTimer = currentTimer;

// TBD: 'queue empty' is not the best condition to test. The queue may
// contain messages that have been sent but not yet confirmed. A better
// test would be to check whether the message iterator in the engine points
// to the end of storage, but we don't have access to these. A solution
// would be to have QueueEngine::beforeMessageRemoved notify this monitor,
// via a new method on this component. Not implemented yet because Engines
// are about to undergo overhaul.

for (SubStreamInfoMapIter iter = d_subStreamInfos.begin(),
last = d_subStreamInfos.end();
iter != last;
++iter) {
SubStreamInfo& info = iter->second;
BSLS_ASSERT_SAFE(info.d_headCb);
bslma::ManagedPtr<mqbi::StorageIterator> head = info.d_headCb();
if (head) {
if (head->atEnd()) {
head.reset();
}
}
if (info.d_messageSent || !head) {
// Queue is 'alive' either because at least one message was sent
// since the last 'timer', or the queue is at its head (no more
// messages to deliver to this substream).
SubStreamInfo& info = iter->second;
const mqbu::StorageKey& appKey = iter->first;
if (info.d_messageSent) {
// Queue is 'alive' because at least one message was sent
// since the last 'timer'.

info.d_messageSent = false;
info.d_lastKnownGoodTimer = d_currentTimer;

if (info.d_state == State::e_IDLE) {
// object was in idle state
onTransitionToAlive(&(iter->second), iter->first);
onTransitionToAlive(&info, appKey);
continue; // CONTINUE
}

BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE);
continue; // CONTINUE
}

if (info.d_state == State::e_IDLE) {
// state was already idle, nothing more to do
continue; // CONTINUE
}

BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE);

if (d_currentTimer - info.d_lastKnownGoodTimer > d_maxIdleTime) {
// No delivered messages in the last 'maxIdleTime'.
onTransitionToIdle(&(iter->second), iter->first, head);
continue; // CONTINUE

// Call callback to log alarm if there are undelivered messages.
const bool haveUndelivered = d_loggingCb(appKey,
info.d_state ==
State::e_ALIVE);

if (haveUndelivered) {
// There are undelivered messages, transition to idle.
if (info.d_state == State::e_ALIVE) {
info.d_state = State::e_IDLE;
}
}
else {
// The queue is at its head (no more
// messages to deliver to this substream),
// so transition to alive.
if (info.d_state == State::e_IDLE) {
info.d_lastKnownGoodTimer = d_currentTimer;
onTransitionToAlive(&info, appKey);
}
}
}
}
}
Expand Down Expand Up @@ -321,129 +304,5 @@ void QueueConsumptionMonitor::onTransitionToAlive(
BALL_LOG_INFO << "Queue '" << uri << "' no longer appears to be stuck.";
}

// Mark the substream described by the specified 'subStreamInfo' as idle and
// build an alarm message for it: the queue URI (with the app id when one is
// resolved from the specified 'appKey'), a capacity summary, the configured
// max idle time, the matching consumers with their handle parameters and
// unconfirmed monitors, the oldest messages in the queue and, finally, the
// message the specified 'head' iterator points at.  The message is raised as a
// 'QUEUE_CONSUMER_MONITOR' alarm, but only when 'head' is non-null.
void QueueConsumptionMonitor::onTransitionToIdle(
SubStreamInfo* subStreamInfo,
const mqbu::StorageKey& appKey,
const bslma::ManagedPtr<mqbi::StorageIterator>& head)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

// The state change happens first and unconditionally; everything below only
// gathers text for the alarm.
subStreamInfo->d_state = State::e_IDLE;

// Local allocator backing the temporaries below (handles, URI, 'ss').
bdlma::LocalSequentialAllocator<2048> localAllocator(0);
bsl::vector<mqbi::QueueHandle*> handles(&localAllocator);
d_queueState_p->handleCatalog().loadHandles(&handles);

bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator);
bsl::string appId;

// Resolve the app id used to filter handles: the default app id for a null
// key, otherwise the id the storage reports for 'appKey' (which is also set
// on the URI).  If neither branch is taken 'appId' stays empty.
if (appKey.isNull()) {
appId = bmqp::ProtocolUtil::k_DEFAULT_APP_ID;
}
else if (d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) {
uriBuilder.setId(appId);
}

bmqt::Uri uri(&localAllocator);
uriBuilder.uri(&uri);

// 'ss' accumulates the per-consumer section; it is appended to 'out' below.
mwcu::MemOutStream ss(&localAllocator);

// 'idx' numbers the printed consumer entries starting at 1; 'numConsumers'
// sums the read counts of the matching substreams.
int idx = 1;
int numConsumers = 0;

const bool isFanoutValue =
d_queueState_p->queue()->hasMultipleSubStreams();

for (bsl::vector<mqbi::QueueHandle*>::const_iterator it = handles.begin(),
last = handles.end();
it != last;
++it) {
const mqbi::QueueHandle::SubStreams& subStreamInfos =
(*it)->subStreamInfos();

for (mqbi::QueueHandle::SubStreams::const_iterator infoCiter =
subStreamInfos.begin();
infoCiter != subStreamInfos.end();
++infoCiter) {
const bsl::string& itemAppId = infoCiter->first;

// A substream counts as a reader when either the queue is not in
// fanout mode and the handle flags say "reader", or the queue is in
// fanout mode and the substream has a non-empty app id.
bool isReader = !isFanoutValue &&
bmqt::QueueFlagsUtil::isReader(
(*it)->handleParameters().flags());
// Non-fanout mode consumer in the default subStream ?
isReader |= isFanoutValue && !itemAppId.empty();

if (!isReader) {
continue; // CONTINUE
}

// Only report consumers of the app this alarm is about.
if (itemAppId != appId) {
continue; // CONTINUE
}

numConsumers += infoCiter->second.d_counts.d_readCount;

const int level = 2, spacesPerLevel = 2;

ss << "\n " << idx++ << ". " << (*it)->client()->description()
<< mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "Handle Parameters .....: " << (*it)->handleParameters()
<< mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "UnconfirmedMonitors ....:";

const bsl::vector<const mqbu::ResourceUsageMonitor*> monitors =
(*it)->unconfirmedMonitors(appId);
// NOTE(review): 'monitors[i]' is a pointer, so this streams the pointer
// itself rather than the monitor it refers to -- confirm whether
// dereferencing was intended.
for (size_t i = 0; i < monitors.size(); ++i) {
ss << "\n " << monitors[i];
}
}
}

// 'out' holds the complete alarm text: header line, consumer section from
// 'ss', then the message listings.
mwcu::MemOutStream out;
out << "Queue '" << uri << "' ";
d_queueState_p->storage()->capacityMeter()->printShortSummary(out);
out << ", max idle time "
<< mwcu::PrintUtil::prettyTimeInterval(d_maxIdleTime)
<< " appears to be stuck. It currently has " << numConsumers
<< " consumers." << ss.str() << "\n";

// Print the 10 oldest messages in the queue
static const int k_NUM_MSGS = 10;
const int level = 0, spacesPerLevel = 2;

out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< k_NUM_MSGS << " oldest messages in the queue:\n";

mqbcmd::Result result;
mqbs::StoragePrintUtil::listMessages(&result.makeQueueContents(),
appId,
0,
k_NUM_MSGS,
d_queueState_p->storage());
mqbcmd::HumanPrinter::print(out, result);

// NOTE(review): returning here discards the text built so far and raises no
// alarm, although the state was already set to idle above.  Presumably
// callers only pass a non-null 'head' -- confirm against the call site.
if (!head) {
return; // RETURN
}

// Print the current head of the queue
mqbi::Storage* const storage = d_queueState_p->storage();
out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "Current head of the queue:\n";

// 'result' is reused: it is switched from the queue-contents selection to
// the single-message selection before being printed again.
mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), storage, *head);

mqbcmd::HumanPrinter::print(out, result);
out << "\n";

// Raise the alarm with the accumulated text.
MWCTSK_ALARMLOG_ALARM("QUEUE_CONSUMER_MONITOR")
<< out.str() << MWCTSK_ALARMLOG_END;
}

} // close package namespace
} // close enterprise namespace
Loading
Loading