Skip to content

Commit

Permalink
Feat[MQB]: Enhance queue consumption monitor alarm log with additiona…
Browse files Browse the repository at this point in the history
…l details (bloomberg#420)
  • Loading branch information
alexander-e1off committed Oct 24, 2024
1 parent e7f197e commit 67709ff
Show file tree
Hide file tree
Showing 8 changed files with 413 additions and 648 deletions.
211 changes: 35 additions & 176 deletions src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,16 @@
#include <mqbscm_version.h>
// MBQ
#include <mqbblp_queuehandlecatalog.h>
#include <mqbcmd_humanprinter.h>
#include <mqbcmd_messages.h>
#include <mqbi_queueengine.h>
#include <mqbi_storage.h>
#include <mqbs_storageprintutil.h>
#include <mqbu_capacitymeter.h>

// BMQ
#include <bmqp_ctrlmsg_messages.h>
#include <bmqt_queueflags.h>
#include <bmqt_uri.h>

// MWC
#include <mwctsk_alarmlog.h>
#include <mwcu_memoutstream.h>
#include <mwcu_printutil.h>

Expand Down Expand Up @@ -125,23 +121,12 @@ const char* QueueConsumptionMonitor::Transition::toAscii(
// struct QueueConsumptionMonitor::SubStreamInfo
// ---------------------------------------------

QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(const HeadCb& headCb)
QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo()
: d_lastKnownGoodTimer(0)
, d_messageSent(true)
, d_state(State::e_ALIVE)
, d_headCb(headCb)
{
BSLS_ASSERT_SAFE(d_headCb);
}

QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(
const SubStreamInfo& other)
: d_lastKnownGoodTimer(other.d_lastKnownGoodTimer)
, d_messageSent(other.d_messageSent)
, d_state(other.d_state)
, d_headCb(other.d_headCb)
{
BSLS_ASSERT_SAFE(d_headCb);
// NOTHING
}

// -----------------------------
Expand All @@ -150,14 +135,17 @@ QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(

// CREATORS
QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState,
const LoggingCb& loggingCb,
bslma::Allocator* allocator)
: d_queueState_p(queueState)
, d_maxIdleTime(0)
, d_currentTimer(0)
, d_subStreamInfos(allocator)
, d_loggingCb(loggingCb)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p);
BSLS_ASSERT_SAFE(d_loggingCb);
}

// MANIPULATORS
Expand All @@ -176,27 +164,25 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value)
last = d_subStreamInfos.end();
iter != last;
++iter) {
iter->second = SubStreamInfo(iter->second.d_headCb);
iter->second = SubStreamInfo();
}

return *this;
}

void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key,
const HeadCb& headCb)
void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key)
{
// Should always be called from the queue thread, but will be invoked from
// the cluster thread once upon queue creation.

// PRECONDITIONS
BSLS_ASSERT_SAFE(key != mqbu::StorageKey::k_NULL_KEY ||
d_subStreamInfos.empty());
BSLS_ASSERT_SAFE(headCb);
BSLS_ASSERT_SAFE(d_subStreamInfos.find(mqbu::StorageKey::k_NULL_KEY) ==
d_subStreamInfos.end());
BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end());

d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo(headCb)));
d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo()));
}

void QueueConsumptionMonitor::unregisterSubStream(const mqbu::StorageKey& key)
Expand Down Expand Up @@ -240,55 +226,52 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer)

d_currentTimer = currentTimer;

// TBD: 'queue empty' is not the best condition to test. The queue may
// contain messages that have been sent but not yet confirmed. A better
// test would be to check whether the message iterator in the engine points
// to the end of storage, but we don't have access to these. A solution
// would be to have QueueEngine::beforeMessageRemoved notify this monitor,
// via a new method on this component. Not implemented yet because Engines
// are about to undergo overhaul.

for (SubStreamInfoMapIter iter = d_subStreamInfos.begin(),
last = d_subStreamInfos.end();
iter != last;
++iter) {
SubStreamInfo& info = iter->second;
BSLS_ASSERT_SAFE(info.d_headCb);
bslma::ManagedPtr<mqbi::StorageIterator> head = info.d_headCb();
if (head) {
if (head->atEnd()) {
head.reset();
}
}
if (info.d_messageSent || !head) {
// Queue is 'alive' either because at least one message was sent
// since the last 'timer', or the queue is at its head (no more
// messages to deliver to this substream).
SubStreamInfo& info = iter->second;
const mqbu::StorageKey& appKey = iter->first;
if (info.d_messageSent) {
// Queue is 'alive' because at least one message was sent
// since the last 'timer'.

info.d_messageSent = false;
info.d_lastKnownGoodTimer = d_currentTimer;

if (info.d_state == State::e_IDLE) {
// object was in idle state
onTransitionToAlive(&(iter->second), iter->first);
onTransitionToAlive(&info, appKey);
continue; // CONTINUE
}

BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE);
continue; // CONTINUE
}

if (info.d_state == State::e_IDLE) {
// state was already idle, nothing more to do
continue; // CONTINUE
}

BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE);

if (d_currentTimer - info.d_lastKnownGoodTimer > d_maxIdleTime) {
// No delivered messages in the last 'maxIdleTime'.
onTransitionToIdle(&(iter->second), iter->first, head);
continue; // CONTINUE

// Call callback to log alarm if there are undelivered messages.
const bool haveUndelivered = d_loggingCb(appKey,
info.d_state ==
State::e_ALIVE);

if (haveUndelivered) {
// There are undelivered messages, transition to idle.
if (info.d_state == State::e_ALIVE) {
info.d_state = State::e_IDLE;
}
}
else {
// The queue is at its head (no more
// messages to deliver to this substream),
// so transition to alive.
if (info.d_state == State::e_IDLE) {
info.d_lastKnownGoodTimer = d_currentTimer;
onTransitionToAlive(&info, appKey);
}
}
}
}
}
Expand Down Expand Up @@ -321,129 +304,5 @@ void QueueConsumptionMonitor::onTransitionToAlive(
BALL_LOG_INFO << "Queue '" << uri << "' no longer appears to be stuck.";
}

void QueueConsumptionMonitor::onTransitionToIdle(
SubStreamInfo* subStreamInfo,
const mqbu::StorageKey& appKey,
const bslma::ManagedPtr<mqbi::StorageIterator>& head)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread(
d_queueState_p->queue()));

subStreamInfo->d_state = State::e_IDLE;

bdlma::LocalSequentialAllocator<2048> localAllocator(0);
bsl::vector<mqbi::QueueHandle*> handles(&localAllocator);
d_queueState_p->handleCatalog().loadHandles(&handles);

bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator);
bsl::string appId;

if (appKey.isNull()) {
appId = bmqp::ProtocolUtil::k_DEFAULT_APP_ID;
}
else if (d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) {
uriBuilder.setId(appId);
}

bmqt::Uri uri(&localAllocator);
uriBuilder.uri(&uri);

mwcu::MemOutStream ss(&localAllocator);

int idx = 1;
int numConsumers = 0;

const bool isFanoutValue =
d_queueState_p->queue()->hasMultipleSubStreams();

for (bsl::vector<mqbi::QueueHandle*>::const_iterator it = handles.begin(),
last = handles.end();
it != last;
++it) {
const mqbi::QueueHandle::SubStreams& subStreamInfos =
(*it)->subStreamInfos();

for (mqbi::QueueHandle::SubStreams::const_iterator infoCiter =
subStreamInfos.begin();
infoCiter != subStreamInfos.end();
++infoCiter) {
const bsl::string& itemAppId = infoCiter->first;

bool isReader = !isFanoutValue &&
bmqt::QueueFlagsUtil::isReader(
(*it)->handleParameters().flags());
// Non-fanout mode consumer in the default subStream ?
isReader |= isFanoutValue && !itemAppId.empty();

if (!isReader) {
continue; // CONTINUE
}

if (itemAppId != appId) {
continue; // CONTINUE
}

numConsumers += infoCiter->second.d_counts.d_readCount;

const int level = 2, spacesPerLevel = 2;

ss << "\n " << idx++ << ". " << (*it)->client()->description()
<< mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "Handle Parameters .....: " << (*it)->handleParameters()
<< mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "UnconfirmedMonitors ....:";

const bsl::vector<const mqbu::ResourceUsageMonitor*> monitors =
(*it)->unconfirmedMonitors(appId);
for (size_t i = 0; i < monitors.size(); ++i) {
ss << "\n " << monitors[i];
}
}
}

mwcu::MemOutStream out;
out << "Queue '" << uri << "' ";
d_queueState_p->storage()->capacityMeter()->printShortSummary(out);
out << ", max idle time "
<< mwcu::PrintUtil::prettyTimeInterval(d_maxIdleTime)
<< " appears to be stuck. It currently has " << numConsumers
<< " consumers." << ss.str() << "\n";

// Print the 10 oldest messages in the queue
static const int k_NUM_MSGS = 10;
const int level = 0, spacesPerLevel = 2;

out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< k_NUM_MSGS << " oldest messages in the queue:\n";

mqbcmd::Result result;
mqbs::StoragePrintUtil::listMessages(&result.makeQueueContents(),
appId,
0,
k_NUM_MSGS,
d_queueState_p->storage());
mqbcmd::HumanPrinter::print(out, result);

if (!head) {
return; // RETURN
}

// Print the current head of the queue
mqbi::Storage* const storage = d_queueState_p->storage();
out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel)
<< "Current head of the queue:\n";

mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), storage, *head);

mqbcmd::HumanPrinter::print(out, result);
out << "\n";

MWCTSK_ALARMLOG_ALARM("QUEUE_CONSUMER_MONITOR")
<< out.str() << MWCTSK_ALARMLOG_END;
}

} // close package namespace
} // close enterprise namespace
Loading

0 comments on commit 67709ff

Please sign in to comment.