From 07fa5d1bfe4e7ff3ce832778ed82d4372ebd8613 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Mon, 14 Oct 2024 19:24:02 +0300 Subject: [PATCH 1/3] Feat[plugins]: report queue depth per appId to prometheus (#446) Signed-off-by: Evgeny Malygin --- .../mqb/mqbstat/mqbstat_statcontroller.cpp | 7 ++- .../bmqprometheus_prometheusstatconsumer.cpp | 62 ++++++++++++------- ...qprometheus_prometheusstatconsumer_test.py | 8 ++- 3 files changed, 52 insertions(+), 25 deletions(-) diff --git a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp index aa499e8e9..dcd07cde9 100644 --- a/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp +++ b/src/groups/mqb/mqbstat/mqbstat_statcontroller.cpp @@ -576,7 +576,12 @@ void StatController::snapshotAndNotify() // StatConsumers will report all stats bsl::vector::iterator it = d_statConsumers.begin(); for (; it != d_statConsumers.end(); ++it) { - (*it)->onSnapshot(); + try { + (*it)->onSnapshot(); + } + catch (const bsl::exception& e) { + BALL_LOG_ERROR << "#PLUGIN_ERROR " << e.what(); + } } const bool willPrint = d_printer_mp->nextSnapshotWillPrint(); diff --git a/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp b/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp index b7fa44559..c037b54ee 100644 --- a/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp +++ b/src/plugins/bmqprometheus/bmqprometheus_prometheusstatconsumer.cpp @@ -359,8 +359,8 @@ void PrometheusStatConsumer::captureQueueStats() {"queue_gc_msgs", Stat::e_GC_MSGS_DELTA, true}, {"queue_cfg_msgs", Stat::e_CFG_MSGS, false}, {"queue_cfg_bytes", Stat::e_CFG_BYTES, false}, - {"queue_content_msgs", Stat::e_MESSAGES_MAX, false}, - {"queue_content_bytes", Stat::e_BYTES_MAX, false}, + {"queue_content_msgs_max", Stat::e_MESSAGES_MAX, false}, + {"queue_content_bytes_max", Stat::e_BYTES_MAX, false}, {"queue_queue_time_avg", Stat::e_QUEUE_TIME_AVG, false}, {"queue_queue_time_max", Stat::e_QUEUE_TIME_MAX, false}, {"queue_reject_msgs", Stat::e_REJECT_DELTA, true}, @@ -390,16 +390,19 @@ void PrometheusStatConsumer::captureQueueStats() } } - // Add `appId` tag to `queue_confirm_time_max` and - // `queue_queue_time_max` metrics. - static const DatapointDef confirmTimeDataPoint = { - "queue_confirm_time_max", - Stat::e_CONFIRM_TIME_MAX, - false}; - static const DatapointDef queueTimeDataPoint = { - "queue_queue_time_max", - Stat::e_QUEUE_TIME_MAX, - false}; + // Add `appId` tag to metrics. + + // These per-appId metrics exist for both primary and replica + static const DatapointDef defsCommon[] = { + {"queue_confirm_time_max", Stat::e_CONFIRM_TIME_MAX, false}, + }; + + // These per-appId metrics exist only for primary + static const DatapointDef defsPrimary[] = { + {"queue_queue_time_max", Stat::e_QUEUE_TIME_MAX, false}, + {"queue_content_msgs_max", Stat::e_MESSAGES_MAX, false}, + {"queue_content_bytes_max", Stat::e_BYTES_MAX, false}, + }; for (mwcst::StatContextIterator appIdIt = queueIt->subcontextIterator(); appIdIt; @@ -407,18 +410,33 @@ void PrometheusStatConsumer::captureQueueStats() tagger.setAppId(appIdIt->name()); const auto labels = tagger.getLabels(); - auto value = mqbstat::QueueStatsDomain::getValue( - *appIdIt, - d_snapshotId, - mqbstat::QueueStatsDomain::Stat::e_CONFIRM_TIME_MAX); - updateMetric(&confirmTimeDataPoint, labels, value); + for (DatapointDefCIter dpIt = + bdlb::ArrayUtil::begin(defsCommon); + dpIt != bdlb::ArrayUtil::end(defsCommon); + ++dpIt) { + const bsls::Types::Int64 value = + mqbstat::QueueStatsDomain::getValue( + *appIdIt, + d_snapshotId, + static_cast( + dpIt->d_stat)); + updateMetric(dpIt, labels, value); + } if (role == mqbstat::QueueStatsDomain::Role::e_PRIMARY) { - value = mqbstat::QueueStatsDomain::getValue( - *appIdIt, - d_snapshotId, - mqbstat::QueueStatsDomain::Stat::e_QUEUE_TIME_MAX); - updateMetric(&queueTimeDataPoint, labels, value); + for (DatapointDefCIter dpIt = + bdlb::ArrayUtil::begin(defsPrimary); + dpIt != bdlb::ArrayUtil::end(defsPrimary); + ++dpIt) { + const bsls::Types::Int64 value = + mqbstat::QueueStatsDomain::getValue( + *appIdIt, + d_snapshotId, + static_cast< + mqbstat::QueueStatsDomain::Stat::Enum>( + dpIt->d_stat)); + updateMetric(dpIt, labels, value); + } } } } diff --git a/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py b/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py index 2ec1c66ba..67a54e26b 100755 --- a/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py +++ b/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py @@ -66,7 +66,11 @@ "queue_push_bytes", "queue_ack_msgs", ] -QUEUE_PRIMARY_NODE_METRICS = ["queue_gc_msgs", "queue_cfg_msgs", "queue_content_msgs"] +QUEUE_PRIMARY_NODE_METRICS = [ + "queue_gc_msgs", + "queue_cfg_msgs", + "queue_content_msgs_max", +] CLUSTER_METRICS = ["cluster_healthiness"] BROKER_METRICS = ["brkr_summary_queues_count", "brkr_summary_clients_count"] @@ -330,7 +334,7 @@ def _check_statistic(prometheus_host): value = response["result"][1]["value"][-1] assert value == "1", _assert_message(metric, "1", value) # Queue primary node statistic - elif metric == "queue_content_msgs": + elif metric == "queue_content_msgs_max": # For first queue assert value == "2", _assert_message(metric, "2", value) # For second queue From 70bfe34c39f6ba0f56f2761a67312377c104c8c7 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Mon, 14 Oct 2024 14:19:12 -0400 Subject: [PATCH 2/3] [Fix] m_bmqstoragetool::FileManagerImpl: Asserts not have side effects (#461) Signed-off-by: Yuan Jing Vincent Yan --- .../bmqstoragetool/m_bmqstoragetool_filemanager.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/applications/bmqstoragetool/m_bmqstoragetool_filemanager.cpp b/src/applications/bmqstoragetool/m_bmqstoragetool_filemanager.cpp index 1e3040168..698ee84c1 100644 --- a/src/applications/bmqstoragetool/m_bmqstoragetool_filemanager.cpp +++ b/src/applications/bmqstoragetool/m_bmqstoragetool_filemanager.cpp @@ -129,8 +129,10 @@ QueueMap FileManagerImpl::buildQueueMap(const bsl::string& cslFile, cslFile.c_str()); bsl::string pattern(alloc); bsl::string location(alloc); - BSLS_ASSERT(bdls::PathUtil::getBasename(&pattern, cslFile) == 0); - BSLS_ASSERT(bdls::PathUtil::getDirname(&location, cslFile) == 0); + int rc = bdls::PathUtil::getBasename(&pattern, cslFile); + BSLS_ASSERT(rc == 0); + rc = bdls::PathUtil::getDirname(&location, cslFile); + BSLS_ASSERT(rc == 0); ledgerConfig.setLocation(location) .setPattern(pattern) .setMaxLogSize(fileSize) @@ -145,7 +147,10 @@ QueueMap FileManagerImpl::buildQueueMap(const bsl::string& cslFile, // Create and open the ledger mqbsl::Ledger ledger(ledgerConfig, alloc); - BSLS_ASSERT(ledger.open(mqbsi::Ledger::e_READ_ONLY) == 0); + rc = ledger.open(mqbsi::Ledger::e_READ_ONLY); + BSLS_ASSERT(rc == 0); + (void)rc; // Compiler happiness + // Set guard to close the ledger bdlb::ScopeExitAny guard(bdlf::BindUtil::bind(closeLedger, &ledger)); From 71b4dabcd4d85a51290ebe9280ce3b0b3a55e3b9 Mon Sep 17 00:00:00 2001 From: Alexander Ivanov Date: Tue, 15 Oct 2024 16:13:03 +0300 Subject: [PATCH 3/3] Feat[MQB]: Enhance queue consumption monitor alarm log with additional details (#420) --- .../mqbblp/mqbblp_queueconsumptionmonitor.cpp | 211 ++------ .../mqbblp/mqbblp_queueconsumptionmonitor.h | 51 +- .../mqbblp_queueconsumptionmonitor.t.cpp | 505 +++--------------- .../mqb/mqbblp/mqbblp_queueengineutil.h | 14 + .../mqb/mqbblp/mqbblp_rootqueueengine.cpp | 225 +++++++- .../mqb/mqbblp/mqbblp_rootqueueengine.h | 7 +- src/integration-tests/test_alarms.py | 44 +- .../test_reconfigure_domains.py | 4 +- 8 files changed, 413 insertions(+), 648 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp index 04567bd41..da75b1534 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.cpp @@ -19,12 +19,9 @@ #include // MBQ #include -#include #include #include #include -#include -#include // BMQ #include @@ -32,7 +29,6 @@ #include // MWC -#include #include #include @@ -125,23 +121,12 @@ const char* QueueConsumptionMonitor::Transition::toAscii( // struct QueueConsumptionMonitor::SubStreamInfo // --------------------------------------------- -QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo(const HeadCb& headCb) +QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo() : d_lastKnownGoodTimer(0) , d_messageSent(true) , d_state(State::e_ALIVE) -, d_headCb(headCb) { - BSLS_ASSERT_SAFE(d_headCb); -} - -QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo( - const SubStreamInfo& other) -: d_lastKnownGoodTimer(other.d_lastKnownGoodTimer) -, d_messageSent(other.d_messageSent) -, d_state(other.d_state) -, d_headCb(other.d_headCb) -{ - BSLS_ASSERT_SAFE(d_headCb); + // NOTHING } // ----------------------------- @@ -150,14 +135,17 @@ QueueConsumptionMonitor::SubStreamInfo::SubStreamInfo( // CREATORS QueueConsumptionMonitor::QueueConsumptionMonitor(QueueState* queueState, + const LoggingCb& loggingCb, bslma::Allocator* allocator) : d_queueState_p(queueState) , d_maxIdleTime(0) , d_currentTimer(0) , d_subStreamInfos(allocator) +, d_loggingCb(loggingCb) { // PRECONDITIONS BSLS_ASSERT_SAFE(d_queueState_p); + BSLS_ASSERT_SAFE(d_loggingCb); } // MANIPULATORS @@ -176,14 +164,13 @@ QueueConsumptionMonitor::setMaxIdleTime(bsls::Types::Int64 value) last = d_subStreamInfos.end(); iter != last; ++iter) { - iter->second = SubStreamInfo(iter->second.d_headCb); + iter->second = SubStreamInfo(); } return *this; } -void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key, - const HeadCb& headCb) +void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key) { // Should always be called from the queue thread, but will be invoked from // the cluster thread once upon queue creation. @@ -191,12 +178,11 @@ void QueueConsumptionMonitor::registerSubStream(const mqbu::StorageKey& key, // PRECONDITIONS BSLS_ASSERT_SAFE(key != mqbu::StorageKey::k_NULL_KEY || d_subStreamInfos.empty()); - BSLS_ASSERT_SAFE(headCb); BSLS_ASSERT_SAFE(d_subStreamInfos.find(mqbu::StorageKey::k_NULL_KEY) == d_subStreamInfos.end()); BSLS_ASSERT_SAFE(d_subStreamInfos.find(key) == d_subStreamInfos.end()); - d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo(headCb))); + d_subStreamInfos.insert(bsl::make_pair(key, SubStreamInfo())); } void QueueConsumptionMonitor::unregisterSubStream(const mqbu::StorageKey& key) @@ -240,37 +226,22 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) d_currentTimer = currentTimer; - // TBD: 'queue empty' is not the best condition to test. The queue may - // contain messages that have been sent but not yet confirmed. A better - // test would be to check whether the message iterator in the engine points - // to the end of storage, but we don't have access to these. A solution - // would be to have QueueEngine::beforeMessageRemoved notify this monitor, - // via a new method on this component. Not implemented yet because Engines - // are about to undergo overhaul. - for (SubStreamInfoMapIter iter = d_subStreamInfos.begin(), last = d_subStreamInfos.end(); iter != last; ++iter) { - SubStreamInfo& info = iter->second; - BSLS_ASSERT_SAFE(info.d_headCb); - bslma::ManagedPtr head = info.d_headCb(); - if (head) { - if (head->atEnd()) { - head.reset(); - } - } - if (info.d_messageSent || !head) { - // Queue is 'alive' either because at least one message was sent - // since the last 'timer', or the queue is at its head (no more - // messages to deliver to this substream). + SubStreamInfo& info = iter->second; + const mqbu::StorageKey& appKey = iter->first; + if (info.d_messageSent) { + // Queue is 'alive' because at least one message was sent + // since the last 'timer'. info.d_messageSent = false; info.d_lastKnownGoodTimer = d_currentTimer; if (info.d_state == State::e_IDLE) { // object was in idle state - onTransitionToAlive(&(iter->second), iter->first); + onTransitionToAlive(&info, appKey); continue; // CONTINUE } @@ -278,17 +249,29 @@ void QueueConsumptionMonitor::onTimer(bsls::Types::Int64 currentTimer) continue; // CONTINUE } - if (info.d_state == State::e_IDLE) { - // state was already idle, nothing more to do - continue; // CONTINUE - } - - BSLS_ASSERT_SAFE(info.d_state == State::e_ALIVE); - if (d_currentTimer - info.d_lastKnownGoodTimer > d_maxIdleTime) { // No delivered messages in the last 'maxIdleTime'. - onTransitionToIdle(&(iter->second), iter->first, head); - continue; // CONTINUE + + // Call callback to log alarm if there are undelivered messages. + const bool haveUndelivered = d_loggingCb(appKey, + info.d_state == + State::e_ALIVE); + + if (haveUndelivered) { + // There are undelivered messages, transition to idle. + if (info.d_state == State::e_ALIVE) { + info.d_state = State::e_IDLE; + } + } + else { + // The queue is at its head (no more + // messages to deliver to this substream), + // so transition to alive. + if (info.d_state == State::e_IDLE) { + info.d_lastKnownGoodTimer = d_currentTimer; + onTransitionToAlive(&info, appKey); + } + } } } } @@ -321,129 +304,5 @@ void QueueConsumptionMonitor::onTransitionToAlive( BALL_LOG_INFO << "Queue '" << uri << "' no longer appears to be stuck."; } -void QueueConsumptionMonitor::onTransitionToIdle( - SubStreamInfo* subStreamInfo, - const mqbu::StorageKey& appKey, - const bslma::ManagedPtr& head) -{ - // executed by the *QUEUE DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( - d_queueState_p->queue())); - - subStreamInfo->d_state = State::e_IDLE; - - bdlma::LocalSequentialAllocator<2048> localAllocator(0); - bsl::vector handles(&localAllocator); - d_queueState_p->handleCatalog().loadHandles(&handles); - - bmqt::UriBuilder uriBuilder(d_queueState_p->uri(), &localAllocator); - bsl::string appId; - - if (appKey.isNull()) { - appId = bmqp::ProtocolUtil::k_DEFAULT_APP_ID; - } - else if (d_queueState_p->storage()->hasVirtualStorage(appKey, &appId)) { - uriBuilder.setId(appId); - } - - bmqt::Uri uri(&localAllocator); - uriBuilder.uri(&uri); - - mwcu::MemOutStream ss(&localAllocator); - - int idx = 1; - int numConsumers = 0; - - const bool isFanoutValue = - d_queueState_p->queue()->hasMultipleSubStreams(); - - for (bsl::vector::const_iterator it = handles.begin(), - last = handles.end(); - it != last; - ++it) { - const mqbi::QueueHandle::SubStreams& subStreamInfos = - (*it)->subStreamInfos(); - - for (mqbi::QueueHandle::SubStreams::const_iterator infoCiter = - subStreamInfos.begin(); - infoCiter != subStreamInfos.end(); - ++infoCiter) { - const bsl::string& itemAppId = infoCiter->first; - - bool isReader = !isFanoutValue && - bmqt::QueueFlagsUtil::isReader( - (*it)->handleParameters().flags()); - // Non-fanout mode consumer in the default subStream ? - isReader |= isFanoutValue && !itemAppId.empty(); - - if (!isReader) { - continue; // CONTINUE - } - - if (itemAppId != appId) { - continue; // CONTINUE - } - - numConsumers += infoCiter->second.d_counts.d_readCount; - - const int level = 2, spacesPerLevel = 2; - - ss << "\n " << idx++ << ". " << (*it)->client()->description() - << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - << "Handle Parameters .....: " << (*it)->handleParameters() - << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - << "UnconfirmedMonitors ....:"; - - const bsl::vector monitors = - (*it)->unconfirmedMonitors(appId); - for (size_t i = 0; i < monitors.size(); ++i) { - ss << "\n " << monitors[i]; - } - } - } - - mwcu::MemOutStream out; - out << "Queue '" << uri << "' "; - d_queueState_p->storage()->capacityMeter()->printShortSummary(out); - out << ", max idle time " - << mwcu::PrintUtil::prettyTimeInterval(d_maxIdleTime) - << " appears to be stuck. It currently has " << numConsumers - << " consumers." << ss.str() << "\n"; - - // Print the 10 oldest messages in the queue - static const int k_NUM_MSGS = 10; - const int level = 0, spacesPerLevel = 2; - - out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - << k_NUM_MSGS << " oldest messages in the queue:\n"; - - mqbcmd::Result result; - mqbs::StoragePrintUtil::listMessages(&result.makeQueueContents(), - appId, - 0, - k_NUM_MSGS, - d_queueState_p->storage()); - mqbcmd::HumanPrinter::print(out, result); - - if (!head) { - return; // RETURN - } - - // Print the current head of the queue - mqbi::Storage* const storage = d_queueState_p->storage(); - out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) - << "Current head of the queue:\n"; - - mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), storage, *head); - - mqbcmd::HumanPrinter::print(out, result); - out << "\n"; - - MWCTSK_ALARMLOG_ALARM("QUEUE_CONSUMER_MONITOR") - << out.str() << MWCTSK_ALARMLOG_END; -} - } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h index 0779ea601..0d570e64d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.h @@ -210,8 +210,12 @@ class QueueConsumptionMonitor { static const char* toAscii(Transition::Enum value); }; - typedef bsl::function(void)> - HeadCb; + /// Callback function to log alarm info when queue state transitions to + /// idle. First argument is the app key, second argument is a boolean flag + /// to enable logging. If `enableLog` is `false`, logging is skipped. + /// Return `true` if there are un-delivered messages and `false` otherwise. + typedef bsl::function + LoggingCb; private: // PRIVATE TYPES @@ -220,8 +224,7 @@ class QueueConsumptionMonitor { struct SubStreamInfo { // CREATORS - SubStreamInfo(const HeadCb& headCb); - SubStreamInfo(const SubStreamInfo& other); + SubStreamInfo(); // PUBLIC DATA bsls::Types::Int64 d_lastKnownGoodTimer; @@ -234,12 +237,6 @@ class QueueConsumptionMonitor { // the last time slice State::Enum d_state; // The current state. - - HeadCb d_headCb; - // Returns storage iterator to the 1st - // un-delivered message including - // 'put-aside' messages (those without - // matching Subscriptions). }; typedef bsl::unordered_map& head); + /// `appKey`, and write log, upon transition to alive state. + void onTransitionToAlive(SubStreamInfo* subStreamInfo, + const mqbu::StorageKey& appKey); public: // TRAITS @@ -302,10 +296,12 @@ class QueueConsumptionMonitor { // CREATORS /// Create a `QueueConsumptionMonitor` object that monitors the queue - /// specified by `queueState`. Use the optionally specified - /// `basicAllocator` to supply memory. If `basicAllocator` is 0, the - /// currently installed default allocator is used. + /// specified by `queueState`. Use the specified `loggingCb` callback for + /// logging alarm data. Use the optionally specified `allocator` to supply + /// memory. If `allocator` is 0, the currently installed default allocator + /// is used. QueueConsumptionMonitor(QueueState* queueState, + const LoggingCb& loggingCb, bslma::Allocator* allocator); // MANIPULATORS @@ -321,12 +317,11 @@ class QueueConsumptionMonitor { /// this object. QueueConsumptionMonitor& setMaxIdleTime(bsls::Types::Int64 value); - /// Register the substream identified by the specified `key` and - /// consuming from the specified `storageIter` for monitoring. `key` - /// may be `StorageKey::k_NULL_KEY`, in which case no other key may be - /// registered via this function. It is illegal to register the same + /// Register the substream identified by the specified `key`. + /// `key` may be `StorageKey::k_NULL_KEY`, in which case no other key may + /// be registered via this function. It is illegal to register the same /// substream more than once. - void registerSubStream(const mqbu::StorageKey& key, const HeadCb& headCb); + void registerSubStream(const mqbu::StorageKey& key); /// Stop monitoring the substream identified by the specified `key`. /// `key` must have been previously registered via `registerSubStream`. diff --git a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp index 6ecaed15e..e08c85940 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp @@ -17,28 +17,19 @@ #include // MBQ -#include #include #include -#include -#include #include -#include #include #include #include #include #include #include -#include #include -// BMQ -#include -#include -#include - // MWC +#include #include #include #include @@ -51,6 +42,7 @@ #include #include #include +#include using namespace BloombergLP; using namespace bsl; @@ -68,104 +60,57 @@ static mqbconfm::Domain getDomainConfig() return domainCfg; } -struct ClientContext { - // PUBLIC DATA - mqbmock::DispatcherClient d_dispatcherClient; - const bsl::shared_ptr - d_requesterContext_sp; - - // CREATORS - ClientContext(); - virtual ~ClientContext(); -}; - -ClientContext::ClientContext() -: d_dispatcherClient(s_allocator_p) -, d_requesterContext_sp(new (*s_allocator_p) - mqbi::QueueHandleRequesterContext(s_allocator_p), - s_allocator_p) -{ - d_requesterContext_sp->setClient(&d_dispatcherClient); -} - -ClientContext::~ClientContext() -{ - // NOTHING -} - struct Test : mwctst::Test { - typedef bsl::vector< - bsl::pair > - TestQueueHandleSeq; - // PUBLIC DATA - bmqt::Uri d_uri; - unsigned int d_id; - mqbu::StorageKey d_storageKey; - int d_partitionId; - mqbmock::Dispatcher d_dispatcher; - bdlbb::PooledBlobBufferFactory d_bufferFactory; - mqbmock::Cluster d_cluster; - mqbmock::Domain d_domain; - mqbmock::Queue d_queue; - QueueState d_queueState; - QueueConsumptionMonitor d_monitor; - mqbs::InMemoryStorage d_storage; - bdlbb::Blob d_dataBlob, d_optionBlob; - bsl::unordered_map d_advance; - unsigned int d_clientId; - ClientContext d_consumer1; - ClientContext d_consumer2; - ClientContext d_producer; - TestQueueHandleSeq d_testQueueHandles; + mqbu::StorageKey d_storageKey; + mqbmock::Dispatcher d_dispatcher; + bdlbb::PooledBlobBufferFactory d_bufferFactory; + mqbmock::Cluster d_cluster; + mqbmock::Domain d_domain; + mqbmock::Queue d_queue; + QueueState d_queueState; + QueueConsumptionMonitor d_monitor; + mqbs::InMemoryStorage d_storage; + bsl::set d_haveUndelivered; // CREATORS Test(); ~Test() BSLS_KEYWORD_OVERRIDE; // MANIPULATORS - void putMessage(); - mqbi::QueueHandle* createClient( - const ClientContext& clientContext, - bmqt::QueueFlags::Enum role, - const bsl::string& appId = bmqp::ProtocolUtil::k_DEFAULT_APP_ID, - unsigned int subQueueId = bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID); - - bslma::ManagedPtr head(const mqbu::StorageKey& key); - void advance(const mqbu::StorageKey& key); + void putMessage(mqbu::StorageKey key = mqbu::StorageKey::k_NULL_KEY); + bool loggingCb(const mqbu::StorageKey& appKey, bool enableLog); }; Test::Test() -: d_uri("bmq://bmq.test.local/test_queue") -, d_id(802701) -, d_storageKey(mqbu::StorageKey::k_NULL_KEY) -, d_partitionId(1) +: d_storageKey(mqbu::StorageKey::k_NULL_KEY) , d_dispatcher(s_allocator_p) , d_bufferFactory(1024, s_allocator_p) , d_cluster(&d_bufferFactory, s_allocator_p) , d_domain(&d_cluster, s_allocator_p) , d_queue(&d_domain, s_allocator_p) , d_queueState(&d_queue, - d_uri, - d_id, + bmqt::Uri("bmq://bmq.test.local/test_queue"), + 802701, d_storageKey, - d_partitionId, + 1, &d_domain, d_cluster._resources(), s_allocator_p) -, d_monitor(&d_queueState, s_allocator_p) +, d_monitor(&d_queueState, + bdlf::BindUtil::bind(&Test::loggingCb, + this, + bdlf::PlaceHolders::_1, // appKey + bdlf::PlaceHolders::_2), // enableLog + + s_allocator_p) , d_storage(d_queue.uri(), - mqbu::StorageKey::k_NULL_KEY, + d_storageKey, mqbs::DataStore::k_INVALID_PARTITION_ID, getDomainConfig(), d_domain.capacityMeter(), s_allocator_p) -, d_advance(s_allocator_p) -, d_clientId(0) -, d_consumer1() -, d_consumer2() -, d_producer() -, d_testQueueHandles(s_allocator_p) +, d_haveUndelivered(s_allocator_p) { d_dispatcher._setInDispatcherThread(true); d_queue._setDispatcher(&d_dispatcher); @@ -198,95 +143,31 @@ Test::Test() d_queueState.setStorage(storageMp); d_domain.queueStatContext()->snapshot(); - - d_consumer1.d_dispatcherClient._setDescription("test consumer 1"); - d_consumer2.d_dispatcherClient._setDescription("test consumer 2"); - d_producer.d_dispatcherClient._setDescription("test producer"); } Test::~Test() { - for (TestQueueHandleSeq::reverse_iterator - iter = d_testQueueHandles.rbegin(), - last = d_testQueueHandles.rend(); - iter != last; - ++iter) { - bsl::shared_ptr handleSp; - bsls::Types::Uint64 lostFlags; - d_queueState.handleCatalog().releaseHandleHelper(&handleSp, - &lostFlags, - iter->first, - iter->second, - true); - } - d_domain.unregisterQueue(&d_queue); } -void Test::putMessage() +void Test::putMessage(mqbu::StorageKey key) { - bmqt::MessageGUID messageGUID; - mqbu::MessageGUIDUtil::generateGUID(&messageGUID); - - mqbi::StorageMessageAttributes messageAttributes; - bslma::ManagedPtr appData(&d_dataBlob, - 0, - bslma::ManagedPtrUtil::noOpDeleter); - bslma::ManagedPtr options(&d_dataBlob, - 0, - bslma::ManagedPtrUtil::noOpDeleter); - - ASSERT_EQ(d_storage.put(&messageAttributes, messageGUID, appData, options), - mqbi::StorageResult::e_SUCCESS); + d_monitor.onMessageSent(key); + d_haveUndelivered.insert(key); } -mqbi::QueueHandle* Test::createClient(const ClientContext& clientContext, - bmqt::QueueFlags::Enum role, - const bsl::string& appId, - unsigned int subQueueId) +bool Test::loggingCb(const mqbu::StorageKey& appKey, const bool enableLog) { - bmqp_ctrlmsg::QueueHandleParameters handleParams(s_allocator_p); - handleParams.uri() = d_uri.asString(); - handleParams.qId() = ++d_clientId; - handleParams.readCount() = role == bmqt::QueueFlags::e_READ ? 1 : 0; - handleParams.writeCount() = role == bmqt::QueueFlags::e_WRITE ? 1 : 0; - handleParams.adminCount() = 0; - handleParams.flags() = role; - - mqbi::QueueHandle* queueHandle = d_queueState.handleCatalog().createHandle( - clientContext.d_requesterContext_sp, - handleParams, - &d_queueState.stats()); - d_testQueueHandles.push_back(bsl::make_pair(queueHandle, handleParams)); - - // Update the current handle parameters. - d_queueState.add(handleParams); - - bmqp_ctrlmsg::SubQueueIdInfo subStreamInfo; - subStreamInfo.appId() = appId; - queueHandle->registerSubStream(subStreamInfo, - subQueueId, - mqbi::QueueCounts(1, 0)); - - return queueHandle; -} + BALL_LOG_SET_CATEGORY("MQBBLP.QUEUECONSUMPTIONMONITORTEST"); -bslma::ManagedPtr -Test::head(const mqbu::StorageKey& key) -{ - bslma::ManagedPtr out; - out = d_storage.getIterator(key); - for (int i = 0; i < d_advance[key]; ++i) { - if (!out->atEnd()) { - out->advance(); - } + bool haveUndelivered = d_haveUndelivered.contains(appKey); + + if (enableLog && haveUndelivered) { + MWCTSK_ALARMLOG_ALARM("QUEUE_STUCK") + << "Test Alarm" << MWCTSK_ALARMLOG_END; } - return out; -} -void Test::advance(const mqbu::StorageKey& key) -{ - ++d_advance[key]; + return haveUndelivered; } // ============================================================================ @@ -306,18 +187,16 @@ TEST_F(Test, doNotMonitor) mwctst::ScopedLogObserver observer(ball::Severity::INFO, s_allocator_p); - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); + d_monitor.registerSubStream(d_storageKey); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(0); d_monitor.onTimer(1000000); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(observer.records().size(), 0U); @@ -338,58 +217,19 @@ TEST_F(Test, emptyQueue) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); + d_monitor.registerSubStream(d_storageKey); d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); } -TEST_F(Test, logFormat) -// ------------------------------------------------------------------------ -// Concerns: State becomes IDLE after set period then returns to normal -// when message is processed - this is a typical, full scenario. -// -// Plan: Instantiate component, put message in queue, make time pass and -// check that state flips to IDLE according to specs, check logs, make more -// time pass and check that state remains 'idle', signal component that a -// message was consumed, check that state flips to 'alive', make more time -// pass, check that state remains 'alive'. -// ------------------------------------------------------------------------ -{ - mwctst::ScopedLogObserver logObserver(ball::Severity::INFO, s_allocator_p); - - const bsls::Types::Int64 k_MAX_IDLE_TIME = 10; - - d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); - - putMessage(); - - d_monitor.onTimer(k_MAX_IDLE_TIME); - d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(logObserver.records().size(), 1u); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "ALARM \\[QUEUE_CONSUMER_MONITOR\\]", - s_allocator_p)); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "Queue '.*'", - s_allocator_p)); -} - TEST_F(Test, putAliveIdleSendAlive) // ------------------------------------------------------------------------ // Concerns: State becomes IDLE after set period then returns to normal @@ -409,47 +249,40 @@ TEST_F(Test, putAliveIdleSendAlive) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); + d_monitor.registerSubStream(d_storageKey); putMessage(); d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME - 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(logObserver.records().size(), ++expectedLogRecords); ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( logObserver.records().back(), - "ALARM \\[QUEUE_CONSUMER_MONITOR\\]", - s_allocator_p)); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "0 consumers", + "ALARM \\[QUEUE_STUCK\\]", s_allocator_p)); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_IDLE); - d_monitor.onMessageSent(mqbu::StorageKey::k_NULL_KEY); - advance(mqbu::StorageKey::k_NULL_KEY); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + d_monitor.onMessageSent(d_storageKey); + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); @@ -459,60 +292,8 @@ TEST_F(Test, putAliveIdleSendAlive) logObserver.records().back(), "no longer appears to be stuck", s_allocator_p)); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), - QueueConsumptionMonitor::State::e_ALIVE); -} - -TEST_F(Test, putAliveIdleWithConsumer) -{ - // ------------------------------------------------------------------------ - // Concerns: Same as above, but with two read and one write clients. - // - // Plan: Start monitoring, create three clients (2 read and 1 write), put - // message in queue, create an 'idle' condition, check that the two read - // clients (but not the write client) are reported in the log. - // ------------------------------------------------------------------------ - mwctst::ScopedLogObserver logObserver(ball::Severity::INFO, s_allocator_p); - size_t expectedLogRecords = 3U; - - const bsls::Types::Int64 k_MAX_IDLE_TIME = 10; - d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); - - createClient(d_consumer1, bmqt::QueueFlags::e_READ); - createClient(d_consumer2, bmqt::QueueFlags::e_READ); - createClient(d_producer, bmqt::QueueFlags::e_WRITE); - - putMessage(); - - d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(logObserver.records().size(), expectedLogRecords); - - d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), - QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(logObserver.records().size(), ++expectedLogRecords); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "ALARM \\[QUEUE_CONSUMER_MONITOR\\].*It currently has 2 consumers", - s_allocator_p)); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "test consumer 1", - s_allocator_p)); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "test consumer 2", - s_allocator_p)); - ASSERT(!mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "test producer", - s_allocator_p)); } TEST_F(Test, putAliveIdleEmptyAlive) @@ -528,23 +309,22 @@ TEST_F(Test, putAliveIdleEmptyAlive) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); + d_monitor.registerSubStream(d_storageKey); + putMessage(); d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_IDLE); - d_storage.removeAll(d_storageKey); + d_haveUndelivered.erase(d_storageKey); d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); } @@ -562,37 +342,35 @@ TEST_F(Test, changeMaxIdleTime) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); + d_monitor.registerSubStream(d_storageKey); putMessage(); d_monitor.onTimer(0); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_IDLE); mwctst::ScopedLogObserver logObserver(ball::Severity::INFO, s_allocator_p); d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME * 2); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); ASSERT_EQ(logObserver.records().size(), 0u); d_monitor.onTimer(k_MAX_IDLE_TIME * 2); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(k_MAX_IDLE_TIME * 2 + k_MAX_IDLE_TIME * 2); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); d_monitor.onTimer(k_MAX_IDLE_TIME * 2 + k_MAX_IDLE_TIME * 2 + 1); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_IDLE); } @@ -609,14 +387,12 @@ TEST_F(Test, reset) d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); + d_monitor.registerSubStream(d_storageKey); putMessage(); d_monitor.onTimer(0); - ASSERT_EQ(d_monitor.state(mqbu::StorageKey::k_NULL_KEY), + ASSERT_EQ(d_monitor.state(d_storageKey), QueueConsumptionMonitor::State::e_ALIVE); mwctst::ScopedLogObserver logObserver(ball::Severity::INFO, s_allocator_p); @@ -653,13 +429,11 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) d_storage.addVirtualStorage(errorDescription, "app1", key1); d_storage.addVirtualStorage(errorDescription, "app2", key2); - d_monitor.registerSubStream(key1, - bdlf::BindUtil::bind(&Test::head, this, key1)); - - d_monitor.registerSubStream(key2, - bdlf::BindUtil::bind(&Test::head, this, key2)); + d_monitor.registerSubStream(key1); + d_monitor.registerSubStream(key2); - putMessage(); + putMessage(key1); + putMessage(key2); d_monitor.onTimer(k_MAX_IDLE_TIME); ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); @@ -685,11 +459,7 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) for (int i = 0; i < 2; ++i) { ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( logObserver.records().rbegin()[i], - "ALARM \\[QUEUE_CONSUMER_MONITOR\\]", - s_allocator_p)); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().rbegin()[i], - "0 consumers", + "ALARM \\[QUEUE_STUCK\\]", s_allocator_p)); } @@ -698,7 +468,6 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); d_monitor.onMessageSent(key1); - advance(key1); ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); ASSERT_EQ(logObserver.records().size(), expectedLogRecords); @@ -715,7 +484,6 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); d_monitor.onMessageSent(key2); - advance(key2); d_monitor.onTimer(3 * k_MAX_IDLE_TIME + 3); ASSERT_EQ(logObserver.records().size(), expectedLogRecords += 1); ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( @@ -727,122 +495,6 @@ TEST_F(Test, putAliveIdleSendAliveTwoSubstreams) ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); } -TEST_F(Test, putAliveIdleSendAliveTwoSubstreamsTwoConsumers) -// ------------------------------------------------------------------------ -// Concerns: State becomes IDLE after set period then returns to normal -// when message is processed - this is a typical, full scenario. -// -// Plan: Instantiate component, put message in queue, make time pass and -// check that state flips to IDLE according to specs, check logs, make more -// time pass and check that state remains 'idle', signal component that a -// message was consumed, check that state flips to 'alive', make more time -// pass, check that state remains 'alive'. -// ------------------------------------------------------------------------ -{ - mwctst::ScopedLogObserver logObserver(ball::Severity::INFO, s_allocator_p); - size_t expectedLogRecords = 3U; - - const bsls::Types::Int64 k_MAX_IDLE_TIME = 10; - - mqbu::StorageKey key1, key2; - key1.fromHex("ABCDEF1234"); - key2.fromHex("1234ABCDEF"); - - d_monitor.setMaxIdleTime(k_MAX_IDLE_TIME); - - mwcu::MemOutStream errorDescription(s_allocator_p); - d_storage.addVirtualStorage(errorDescription, "app1", key1); - d_storage.addVirtualStorage(errorDescription, "app2", key2); - - d_monitor.registerSubStream(key1, - bdlf::BindUtil::bind(&Test::head, this, key1)); - - d_monitor.registerSubStream(key2, - bdlf::BindUtil::bind(&Test::head, this, key2)); - - createClient(d_consumer1, bmqt::QueueFlags::e_READ, "app1"); - createClient(d_consumer2, bmqt::QueueFlags::e_READ, "app2"); - createClient(d_producer, bmqt::QueueFlags::e_WRITE); - - putMessage(); - - d_monitor.onTimer(k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(logObserver.records().size(), expectedLogRecords); - - d_monitor.onTimer(2 * k_MAX_IDLE_TIME - 1); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(logObserver.records().size(), expectedLogRecords); - - d_monitor.onTimer(2 * k_MAX_IDLE_TIME); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(logObserver.records().size(), expectedLogRecords); - - d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 1); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); - - ASSERT_EQ(logObserver.records().size(), expectedLogRecords += 2); - - for (bsl::vector::const_iterator iter = - logObserver.records().end() - 2; - iter != logObserver.records().end(); - ++iter) { - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - *iter, - "ALARM \\[QUEUE_CONSUMER_MONITOR\\] Queue " - "'bmq://bmq.test.local/test_queue\\?id=app\\d'", - s_allocator_p)); - ASSERT( - mwctst::ScopedLogObserverUtil::recordMessageMatch(*iter, - "1 consumers", - s_allocator_p)); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - *iter, - "test consumer \\d", - s_allocator_p)); - ASSERT( - !mwctst::ScopedLogObserverUtil::recordMessageMatch(*iter, - "test producer", - s_allocator_p)); - } - - d_monitor.onTimer(2 * k_MAX_IDLE_TIME + 2); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); - - d_monitor.onMessageSent(key1); - advance(key1); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); - ASSERT_EQ(logObserver.records().size(), expectedLogRecords); - - d_monitor.onTimer(3 * k_MAX_IDLE_TIME + 2); - ASSERT_EQ(logObserver.records().size(), expectedLogRecords += 1); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "Queue 'bmq://bmq.test.local/test_queue\\?id=app1' no longer appears " - "to be stuck", - s_allocator_p)); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_IDLE); - - d_monitor.onMessageSent(key2); - advance(key2); - d_monitor.onTimer(3 * k_MAX_IDLE_TIME + 3); - ASSERT_EQ(logObserver.records().size(), expectedLogRecords += 1); - ASSERT(mwctst::ScopedLogObserverUtil::recordMessageMatch( - logObserver.records().back(), - "Queue 'bmq://bmq.test.local/test_queue\\?id=app2' no longer appears " - "to be stuck", - s_allocator_p)); - ASSERT_EQ(d_monitor.state(key1), QueueConsumptionMonitor::State::e_ALIVE); - ASSERT_EQ(d_monitor.state(key2), QueueConsumptionMonitor::State::e_ALIVE); -} - TEST_F(Test, usage) // ------------------------------------------------------------------------- // Concerns: Make sure the usage example is correct. @@ -857,9 +509,7 @@ TEST_F(Test, usage) monitor.setMaxIdleTime(20); - d_monitor.registerSubStream( - mqbu::StorageKey::k_NULL_KEY, - bdlf::BindUtil::bind(&Test::head, this, mqbu::StorageKey::k_NULL_KEY)); + d_monitor.registerSubStream(d_storageKey); putMessage(); putMessage(); @@ -878,8 +528,7 @@ TEST_F(Test, usage) monitor.onTimer(T += 15); // nothing is logged ASSERT_EQ(logObserver.records().size(), 1u); // 15 seconds later - T + 60s - consume first message, inform monitor: - monitor.onMessageSent(mqbu::StorageKey::k_NULL_KEY); - advance(mqbu::StorageKey::k_NULL_KEY); + monitor.onMessageSent(d_storageKey); // 15 seconds later - T + 75s monitor.onTimer(T += 15); // log INFO: back to active @@ -891,7 +540,7 @@ TEST_F(Test, usage) monitor.onTimer(T += 15); // log ALARM ASSERT_EQ(logObserver.records().size(), 3u); // 15 seconds later - T + 120s - d_storage.removeAll(d_storageKey); + d_haveUndelivered.erase(d_storageKey); monitor.onTimer(T += 15); // log INFO: back to active ASSERT_EQ(logObserver.records().size(), 4u); diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h index 377688654..08e371716 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h @@ -550,6 +550,10 @@ struct QueueEngineUtil_AppState { /// the queue iterator: empty PutAside List and no resume point. bool isAtEndOfStorage() const; + size_t putAsideListSize() const; + + size_t redeliveryListSize() const; + Routers::Consumer* findQueueHandleContext(mqbi::QueueHandle* handle); unsigned int upstreamSubQueueId() const; @@ -784,6 +788,16 @@ QueueEngineUtil_AppState::putForRedelivery(const bmqt::MessageGUID& guid) d_redeliveryList.add(guid); } +inline size_t QueueEngineUtil_AppState::putAsideListSize() const +{ + return d_putAsideList.size(); +} + +inline size_t QueueEngineUtil_AppState::redeliveryListSize() const +{ + return d_redeliveryList.size(); +} + inline Routers::Consumer* QueueEngineUtil_AppState::findQueueHandleContext(mqbi::QueueHandle* handle) { diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 80cb2fce9..f2a362177 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -23,12 +23,15 @@ #include #include #include +#include #include #include #include #include #include #include +#include +#include // BMQ #include @@ -258,7 +261,13 @@ RootQueueEngine::RootQueueEngine(QueueState* queueState, const mqbconfm::Domain& domainConfig, bslma::Allocator* allocator) : d_queueState_p(queueState) -, d_consumptionMonitor(queueState, allocator) +, d_consumptionMonitor( + queueState, + bdlf::BindUtil::bind(&RootQueueEngine::logAlarmCb, + this, + bdlf::PlaceHolders::_1, // appKey + bdlf::PlaceHolders::_2), // enableLog + allocator) , d_apps(allocator) , d_nullKeyCount(0) , d_hasAutoSubscriptions(false) @@ -436,9 +445,7 @@ int RootQueueEngine::initializeAppId(const bsl::string& appId, iter->value()->authorize(appKey, ordinal); - d_consumptionMonitor.registerSubStream( - appKey, - bdlf::BindUtil::bind(&RootQueueEngine::head, this, iter->value())); + d_consumptionMonitor.registerSubStream(appKey); BALL_LOG_INFO << "Found virtual storage for appId [" << appId << "], queue [" << d_queueState_p->uri() << "], appKey [" @@ -801,10 +808,7 @@ mqbi::QueueHandle* RootQueueEngine::getHandle( if (!iter->value()->isAuthorized()) { if (iter->value()->authorize()) { d_consumptionMonitor.registerSubStream( - iter->value()->appKey(), - bdlf::BindUtil::bind(&RootQueueEngine::head, - this, - iter->value())); + iter->value()->appKey()); } } } @@ -1613,6 +1617,207 @@ void RootQueueEngine::onTimer(bsls::Types::Int64 currentTimer) d_consumptionMonitor.onTimer(currentTimer); } +bool RootQueueEngine::logAlarmCb(const mqbu::StorageKey& appKey, + bool enableLog) const +{ + // executed by the *QUEUE DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( + d_queueState_p->queue())); + + // Get AppState by appKey. + Apps::const_iterator cItApp = d_apps.findByKey2(AppKeyCount(appKey, 0)); + if (cItApp == d_apps.end()) { + BALL_LOG_WARN << "No app found for appKey: " << appKey; + return false; // RETURN + } + const AppStateSp& app = cItApp->value(); + + // Check if there are un-delivered messages + bslma::ManagedPtr headIt = head(app); + + if (!headIt) { + // No un-delivered messages, do nothing. + return false; // RETURN + } + if (!enableLog) { + // There are un-delivered messages, but log is disabled. + return true; // RETURN + } + + // Logging alarm info + bdlma::LocalSequentialAllocator<4096> localAllocator(d_allocator_p); + + mwcu::MemOutStream ss(&localAllocator); + + // Log app consumers queue handles info + int idx = 1; + int numConsumers = 0; + + const QueueEngineUtil_AppState::Consumers& consumers = app->consumers(); + for (QueueEngineUtil_AppState::Consumers::const_iterator citConsumer = + consumers.begin(); + citConsumer != consumers.end(); + ++citConsumer) { + mqbi::QueueHandle* const queueHandle_p = citConsumer->first; + + const mqbi::QueueHandle::SubStreams& subStreamInfos = + queueHandle_p->subStreamInfos(); + + for (mqbi::QueueHandle::SubStreams::const_iterator citSubStreams = + subStreamInfos.begin(); + citSubStreams != subStreamInfos.end(); + ++citSubStreams) { + numConsumers += citSubStreams->second.d_counts.d_readCount; + + const int level = 2, spacesPerLevel = 2; + + ss << "\n " << idx++ << ". " + << queueHandle_p->client()->description() + << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) + << "Handle Parameters .....: " + << queueHandle_p->handleParameters() + << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) + << "Number of unconfirmed messages .....: " + << queueHandle_p->countUnconfirmed() + << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) + << "UnconfirmedMonitors ....:"; + + const bsl::vector monitors = + queueHandle_p->unconfirmedMonitors(app->appId()); + for (size_t i = 0; i < monitors.size(); ++i) { + ss << "\n " << *monitors[i]; + } + } + } + + mwcu::MemOutStream out(&localAllocator); + mqbi::Storage* const storage = d_queueState_p->storage(); + + out << "Queue '" << d_queueState_p->uri(); + if (app->appId() != bmqp::ProtocolUtil::k_DEFAULT_APP_ID) { + out << "?id=" << app->appId(); + } + out << "' "; + storage->capacityMeter()->printShortSummary(out); + out << ", max idle time " + << mwcu::PrintUtil::prettyTimeInterval( + d_queueState_p->queue()->domain()->config().maxIdleTime() * + bdlt::TimeUnitRatio::k_NANOSECONDS_PER_SECOND) + << " appears to be stuck. It currently has " << numConsumers + << " consumers." << ss.str() << '\n'; + + // Log un-delivered messages info + out << "\nFor appId: " << app->appId() << '\n'; + out << "Put aside list size: " << app->putAsideListSize() << '\n'; + out << "Redelivery list size: " << app->redeliveryListSize() << '\n'; + out << "Number of messages: " << storage->numMessages(appKey) << '\n'; + out << "Number of bytes: " << storage->numBytes(appKey) << "\n\n"; + + // Log consumer subscriptions + mqbblp::Routers::QueueRoutingContext& routingContext = + app->routing()->d_queue; + mqbcmd::Routing routing; + routingContext.loadInternals(&routing); + const bsl::vector& subscrGroups = + routing.subscriptionGroups(); + + // Limit to log only k_EXPR_NUM_LIMIT expressions + static const size_t k_EXPR_NUM_LIMIT = 50; + ss.reset(); + size_t exprNum = 0; + for (bsl::vector::const_iterator cIt = + subscrGroups.begin(); + cIt != subscrGroups.end() && exprNum < k_EXPR_NUM_LIMIT; + ++cIt) { + if (!cIt->expression().empty()) { + ss << cIt->expression() << '\n'; + ++exprNum; + } + } + if (exprNum) { + if (exprNum == k_EXPR_NUM_LIMIT) { + out << "First " << k_EXPR_NUM_LIMIT + << " of consumer subscription expressions: "; + } + else { + out << "Consumer subscription expressions: "; + } + out << '\n' << ss.str() << '\n'; + } + + // Log the first (oldest) message in a put aside list and its properties + if (!app->putAsideList().empty()) { + bslma::ManagedPtr storageIt_mp; + mqbi::StorageResult::Enum rc = storage->getIterator( + &storageIt_mp, + appKey, + app->putAsideList().first()); + if (rc == mqbi::StorageResult::e_SUCCESS) { + // Log timestamp + out << "Oldest message in the 'Put aside' list:\n"; + mqbcmd::Result result; + mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), + storage, + *storageIt_mp); + mqbcmd::HumanPrinter::print(out, result); + out << '\n'; + // Log message properties + const bsl::shared_ptr& appData = + storageIt_mp->appData(); + const bmqp::MessagePropertiesInfo& logic = + storageIt_mp->attributes().messagePropertiesInfo(); + bmqp::MessageProperties properties; + int ret = properties.streamIn(*appData, logic.isExtended()); + if (!ret) { + out << "Message Properties: " << properties << '\n'; + } + else { + BALL_LOG_WARN << "Failed to streamIn MessageProperties, rc = " + << rc; + out << "Message Properties: Failed to acquire [rc: " << rc + << "]\n"; + } + } + else { + BALL_LOG_WARN << "Failed to get storage iterator for GUID: " + << app->putAsideList().first() << ", rc = " << rc; + out << "'Put aside' list: Failed to acquire [rc: " << rc << "]\n"; + } + } + + // Print the 10 oldest messages in the queue + static const int k_NUM_MSGS = 10; + const int level = 0, spacesPerLevel = 2; + + out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) + << k_NUM_MSGS << " oldest messages in the queue:\n"; + + mqbcmd::Result result; + mqbs::StoragePrintUtil::listMessages(&result.makeQueueContents(), + app->appId(), + 0, + k_NUM_MSGS, + storage); + mqbcmd::HumanPrinter::print(out, result); + + // Print the current head of the queue + out << mwcu::PrintUtil::newlineAndIndent(level, spacesPerLevel) + << "Current head of the queue:\n"; + + mqbs::StoragePrintUtil::listMessage(&result.makeMessage(), + storage, + *headIt); + + mqbcmd::HumanPrinter::print(out, result); + out << "\n"; + + MWCTSK_ALARMLOG_ALARM("QUEUE_STUCK") << out.str() << MWCTSK_ALARMLOG_END; + + return true; +} + void RootQueueEngine::afterAppIdRegistered( const mqbi::Storage::AppIdKeyPair& appIdKeyPair) { @@ -1796,9 +2001,7 @@ void RootQueueEngine::registerStorage(const bsl::string& appId, iter->value()->authorize(appKey, appOrdinal); - d_consumptionMonitor.registerSubStream( - appKey, - bdlf::BindUtil::bind(&RootQueueEngine::head, this, iter->value())); + d_consumptionMonitor.registerSubStream(appKey); } void RootQueueEngine::unregisterStorage(const bsl::string& appId, diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index d4c369110..8ca5012a4 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -97,7 +97,6 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { /// (appId, appKeyCount) -> AppStateSp typedef mwcc::TwoKeyHashMap Apps; - typedef bslma::ManagedPtr StorageIteratorMp; private: // DATA @@ -207,6 +206,12 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { const AppStateSp& subQueue(unsigned int upstreamSubQueueId) const; + /// Callback called by `d_consumptionMonitor` when alarm condition is met. + /// If there are un-delivered messages for the specified `appKey` and + /// `enableLog` is `true` it logs alarm data. Return `true` if there are + /// un-delivered messages and `false` otherwise. + bool logAlarmCb(const mqbu::StorageKey& appKey, bool enableLog) const; + public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(RootQueueEngine, bslma::UsesBslmaAllocator) diff --git a/src/integration-tests/test_alarms.py b/src/integration-tests/test_alarms.py index 7728f99a9..3d296aa21 100644 --- a/src/integration-tests/test_alarms.py +++ b/src/integration-tests/test_alarms.py @@ -56,7 +56,7 @@ def test_no_alarms_for_a_slow_queue(cluster: Cluster): time.sleep(4) # First, test the alarm - assert leader.alarms("QUEUE_CONSUMER_MONITOR", 1) + assert leader.alarms("QUEUE_STUCK", 1) leader.drain() # Then test no alarm while consumer1 slowly confirms @@ -74,4 +74,44 @@ def test_no_alarms_for_a_slow_queue(cluster: Cluster): ) time.sleep(1) - assert not leader.alarms("QUEUE_CONSUMER_MONITOR", 1) + assert not leader.alarms("QUEUE_STUCK", 1) + + +@tweak.cluster.queue_operations.consumption_monitor_period_ms(500) +@tweak.domain.max_idle_time(1) +def test_alarms_subscription_mismatch(cluster: Cluster): + """ + Test broker ALARM log content for producer/consumer subscription expression mismatch (put aside list is not empty). + """ + + leader = cluster.last_known_leader + proxy = next(cluster.proxy_cycle()) + + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write,ack"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open( + tc.URI_PRIORITY, + flags=["read"], + succeed=True, + subscriptions=[{"expression": "x == 1"}], + ) + + producer.post( + tc.URI_PRIORITY, + ["msg"], + succeed=True, + wait_ack=True, + messageProperties=[{"name": "x", "value": "0", "type": "E_INT"}], + ) + + time.sleep(2) + + assert leader.alarms("QUEUE_STUCK", 1) + assert leader.capture(r"Put aside list size: 1") + assert leader.capture(r"Redelivery list size: 0") + assert leader.capture(r"Consumer subscription expressions:") + assert leader.capture(r"x == 1") + assert leader.capture(r"Oldest message in the 'Put aside' list:") + assert leader.capture(r"Message Properties: \[ x \(INT32\) = 0 \]") diff --git a/src/integration-tests/test_reconfigure_domains.py b/src/integration-tests/test_reconfigure_domains.py index 89aca318a..3f325cba8 100644 --- a/src/integration-tests/test_reconfigure_domains.py +++ b/src/integration-tests/test_reconfigure_domains.py @@ -254,7 +254,7 @@ def test_reconfigure_max_idle_time(self, multi_node: Cluster): # Sleep for long enough to trigger an alarm. time.sleep(1.5) - assert leader.alarms("QUEUE_CONSUMER_MONITOR", 1) + assert leader.alarms("QUEUE_STUCK", 1) # Confirm all messages in the queue. self.reader.confirm(URI_PRIORITY_1, "+2", succeed=True) @@ -277,7 +277,7 @@ def test_reconfigure_max_idle_time(self, multi_node: Cluster): self.reader.confirm(URI_PRIORITY_1, "+2", succeed=True) # Ensure that no alarm was issued. - assert not leader.alarms("QUEUE_CONSUMER_MONITOR", 1) + assert not leader.alarms("QUEUE_STUCK", 1) @tweak.domain.message_ttl(1) def test_reconfigure_message_ttl(self, multi_node: Cluster):