From 6c43ce482961e5c308c66dcb3604d4b7fe15ba0a Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Fri, 30 Aug 2024 17:43:43 -0400 Subject: [PATCH 1/5] mqbc::StorageMgr: Ban 'processPrimaryStatusAdvisory' in non-FSM mode Signed-off-by: Yuan Jing Vincent Yan --- .../mqb/mqbblp/mqbblp_clusterorchestrator.cpp | 9 ++++- src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 34 +++++-------------- src/groups/mqb/mqbc/mqbc_storageutil.cpp | 5 --- 3 files changed, 17 insertions(+), 31 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index e624c17e7..205382343 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -1759,11 +1759,18 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory( // TBD: may need to review the order of invoking these routines. + BALL_LOG_INFO << d_clusterData_p->identity().description() + << " PartitionId [" << primaryAdv.partitionId() + << "]: received primary status advisory: " << primaryAdv + << ", from: " << source->nodeDescription(); + BSLS_ASSERT_SAFE(ns->isPrimaryForPartition(primaryAdv.partitionId())); d_stateManager_mp->setPrimaryStatus(primaryAdv.partitionId(), primaryAdv.status()); - d_storageManager_p->processPrimaryStatusAdvisory(primaryAdv, source); + if (!d_clusterConfig.clusterAttributes().isFSMWorkflow()) { + d_storageManager_p->processPrimaryStatusAdvisory(primaryAdv, source); + } } void ClusterOrchestrator::processStateNotification( diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index d1c0289e6..3a86cce4a 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -358,6 +358,10 @@ void StorageManager::setPrimaryStatusForPartitionDispatched( << "."; pinfo.setPrimaryStatus(value); + if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == value) { + 
d_fileStores[partitionId]->setPrimary(pinfo.primary(), + pinfo.primaryLeaseId()); + } } void StorageManager::processPrimaryDetect(int partitionId, @@ -4202,36 +4206,16 @@ void StorageManager::processReceiptEvent(const bmqp::Event& event, } void StorageManager::processPrimaryStatusAdvisory( - const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, - mqbnet::ClusterNode* source) + BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source) { // executed by *CLUSTER DISPATCHER* thread - // PRECONDITIONS + // PRECONDITION BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(source); - BSLS_ASSERT_SAFE(d_fileStores.size() > - static_cast(advisory.partitionId())); - - if (d_cluster_p->isStopping()) { - BALL_LOG_WARN << d_clusterData_p->identity().description() - << " Partition [" << advisory.partitionId() << "]: " - << "Cluster is stopping; skipping processing of " - << "PrimaryStatusAdvisory."; - return; // RETURN - } - - mqbs::FileStore* fs = d_fileStores[advisory.partitionId()].get(); - BSLS_ASSERT_SAFE(fs); - fs->execute(bdlf::BindUtil::bind( - &StorageUtil::processPrimaryStatusAdvisoryDispatched, - fs, - &d_partitionInfoVec[advisory.partitionId()], - advisory, - d_clusterData_p->identity().description(), - source, - true)); // isFSMWorkflow + BSLS_ASSERT_OPT(false && + "This method should only be invoked in non-FSM mode"); } void StorageManager::processReplicaStatusAdvisory( diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 60eace567..eb0424d93 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -3408,11 +3408,6 @@ void StorageUtil::processPrimaryStatusAdvisoryDispatched( << "node as primary."; } - BALL_LOG_INFO << clusterDescription << " PartitionId [" - << advisory.partitionId() - << "]: received primary status advisory: " << advisory - << ", from: " << 
source->nodeDescription(); - pinfo->setPrimary(source); pinfo->setPrimaryLeaseId(advisory.primaryLeaseId()); pinfo->setPrimaryStatus(advisory.status()); From fe1f576e20569615668871a65fdc08ee876ebaa1 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Wed, 4 Sep 2024 16:17:52 -0400 Subject: [PATCH 2/5] mqbc::StorageMgr: Transition to available only when all primary active Signed-off-by: Yuan Jing Vincent Yan --- src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 41 ++++++++++++++++++--- src/groups/mqb/mqbc/mqbc_storagemanager.h | 8 ++++ src/groups/mqb/mqbi/mqbi_storagemanager.h | 1 + 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 3a86cce4a..d63693823 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -51,6 +51,12 @@ namespace mqbc { namespace { const int k_GC_MESSAGES_INTERVAL_SECONDS = 30; + +bool isPrimaryActive(const mqbi::StorageManager_PartitionInfo pinfo) +{ + return pinfo.primaryStatus() == bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE; +} + } // close unnamed namespace // ---------------------------- @@ -248,9 +254,9 @@ void StorageManager::onPartitionRecovery(int partitionId) // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' // PRECONDITIONS - BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); mwcu::MemOutStream out; mqbs::StoragePrintUtil::printRecoveredStorages( @@ -286,7 +292,12 @@ void StorageManager::onPartitionRecovery(int partitionId) bdlf::BindUtil::bind(&StorageManager::forceFlushFileStores, this)); - d_recoveryStatusCb(0); + // Even though Cluster FSM and all Partition FSMs are now healed, + // we must check that all partitions have an active primary before + // transitioning ourself to 
E_AVAILABLE. + if (allParitionsAvailable()) { + d_recoveryStatusCb(0); + } } else { d_recoveryStatusCb(-1); @@ -334,9 +345,9 @@ void StorageManager::setPrimaryStatusForPartitionDispatched( // executed by *QUEUE_DISPATCHER* thread associated with 'partitionId' // PRECONDITIONS - BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); BSLS_ASSERT_SAFE(0 <= partitionId && partitionId < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(d_fileStores[partitionId]->inDispatcherThread()); PartitionInfo& pinfo = d_partitionInfoVec[partitionId]; if (!pinfo.primary()) { @@ -349,18 +360,23 @@ void StorageManager::setPrimaryStatusForPartitionDispatched( } BSLS_ASSERT_SAFE(pinfo.primaryLeaseId() > 0); + const bmqp_ctrlmsg::PrimaryStatus::Value oldValue = pinfo.primaryStatus(); BALL_LOG_INFO << d_clusterData_p->identity().description() << " Partition [" << partitionId << "]: " << "Setting the status of primary: " << pinfo.primary()->nodeDescription() << ", primaryLeaseId: " << pinfo.primaryLeaseId() - << ", from " << pinfo.primaryStatus() << " to " << value - << "."; + << ", from " << oldValue << " to " << value << "."; pinfo.setPrimaryStatus(value); if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == value) { d_fileStores[partitionId]->setPrimary(pinfo.primary(), pinfo.primaryLeaseId()); + + if (oldValue != bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE && + allParitionsAvailable()) { + d_recoveryStatusCb(0); + } } } @@ -3208,6 +3224,21 @@ void StorageManager::do_reapplyDetectSelfReplica( eventDataVecOut); } +// PRIVATE ACCESSORS +bool StorageManager::allParitionsAvailable() const +{ + // executed by *QUEUE_DISPATCHER* thread associated with *ANY* partition + + if (d_numPartitionsRecoveredFully != + static_cast(d_fileStores.size())) { + return false; // RETURN + } + + return bsl::all_of(d_partitionInfoVec.cbegin(), + d_partitionInfoVec.cend(), + &isPrimaryActive); +} + // CREATORS StorageManager::StorageManager( const mqbcfg::ClusterDefinition& clusterConfig, diff --git 
a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index 3b1bbd597..3cc4fb8b8 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -749,6 +749,14 @@ class StorageManager virtual void do_reapplyDetectSelfReplica(const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE; + // PRIVATE ACCESSORS + + /// Return true if all partitions are fully healed and have an + /// active primary, false otherwise. + /// + /// THREAD: Executed by the Queue's dispatcher thread. + bool allParitionsAvailable() const; + public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(StorageManager, bslma::UsesBslmaAllocator) diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index a89683511..92315e5fd 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -43,6 +43,7 @@ // BDE #include +#include #include #include #include From 9bd46b963f57f4b20355b728007e744b39db34e5 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Wed, 4 Sep 2024 16:39:21 -0400 Subject: [PATCH 3/5] mqbc::StorageMgr: clang-format Signed-off-by: Yuan Jing Vincent Yan --- src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index d63693823..d110c244e 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -362,8 +362,8 @@ void StorageManager::setPrimaryStatusForPartitionDispatched( const bmqp_ctrlmsg::PrimaryStatus::Value oldValue = pinfo.primaryStatus(); BALL_LOG_INFO << d_clusterData_p->identity().description() - << " Partition [" << partitionId << "]: " - << "Setting the status of primary: " + << " Partition [" << partitionId + << "]: " << "Setting the status of primary: " << pinfo.primary()->nodeDescription() << ", primaryLeaseId: " << 
pinfo.primaryLeaseId() << ", from " << oldValue << " to " << value << "."; From 9394a3724fa32b24ee8487df52caac797e25dd42 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Fri, 6 Sep 2024 17:22:33 -0400 Subject: [PATCH 4/5] mqbc::StorageMgr: Healing replica buffers primary status advisories Signed-off-by: Yuan Jing Vincent Yan --- .../mqb/mqbblp/mqbblp_clusterorchestrator.cpp | 37 ++++-- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 9 ++ src/groups/mqb/mqbblp/mqbblp_storagemanager.h | 5 + .../mqb/mqbc/mqbc_partitionstatetable.h | 25 ++-- src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 121 ++++++++++++++++++ src/groups/mqb/mqbc/mqbc_storagemanager.h | 32 +++++ src/groups/mqb/mqbi/mqbi_storagemanager.h | 5 + .../mqb/mqbmock/mqbmock_storagemanager.cpp | 7 + .../mqb/mqbmock/mqbmock_storagemanager.h | 5 + 9 files changed, 226 insertions(+), 20 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index 205382343..1dbc09321 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -1655,17 +1655,32 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory( if (d_clusterConfig.clusterAttributes().isFSMWorkflow()) { if (pinfo.primaryNode() != source || pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) { - BALL_LOG_WARN << d_clusterData_p->identity().description() - << ": Partition [" << primaryAdv.partitionId() - << "]: received primary status advisory: " - << primaryAdv - << " from: " << source->nodeDescription() - << ", but self perceived primary and its leaseId are" - << ": [" - << (pinfo.primaryNode() - ? 
pinfo.primaryNode()->nodeDescription() - : "** null **") - << ", " << pinfo.primaryLeaseId() << "]."; + BALL_LOG_WARN_BLOCK + { + BALL_LOG_OUTPUT_STREAM + << d_clusterData_p->identity().description() + << ": Partition [" << primaryAdv.partitionId() + << "]: received primary status advisory: " << primaryAdv + << " from: " << source->nodeDescription() + << ", but self perceived primary and its leaseId are" + << ": [" + << (pinfo.primaryNode() + ? pinfo.primaryNode()->nodeDescription() + : "** null **") + << ", " << pinfo.primaryLeaseId() << "]."; + if (pinfo.primaryNode()) { + BALL_LOG_OUTPUT_STREAM << " Ignoring advisory."; + } + else { + BALL_LOG_OUTPUT_STREAM << " Since we have not received any" + << " information regarding the true" + << " primary, this advisory could " + << "be from the true one. Will" + << " buffer the advisory for now."; + d_storageManager_p->bufferPrimaryStatusAdvisory(primaryAdv, + source); + } + } return; // RETURN } } diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 8562aa7dc..d9f73b38c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -2025,6 +2025,15 @@ void StorageManager::processReceiptEvent(const bmqp::Event& event, source)); } +void StorageManager::bufferPrimaryStatusAdvisory( + BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source) +{ + // executed by *ANY* thread + + BSLS_ASSERT_OPT(false && "This method should only be invoked in FSM mode"); +} + void StorageManager::processPrimaryStatusAdvisory( const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, mqbnet::ClusterNode* source) diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index 2d0144fe5..cc3d7173f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ 
-676,6 +676,11 @@ class StorageManager : public mqbi::StorageManager { processReceiptEvent(const bmqp::Event& event, mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; + /// Executed by any thread. + virtual void bufferPrimaryStatusAdvisory( + const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; + /// Executed by any thread. virtual void processPrimaryStatusAdvisory( const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, diff --git a/src/groups/mqb/mqbc/mqbc_partitionstatetable.h b/src/groups/mqb/mqbc/mqbc_partitionstatetable.h index 037c89b17..3c3520801 100644 --- a/src/groups/mqb/mqbc/mqbc_partitionstatetable.h +++ b/src/groups/mqb/mqbc/mqbc_partitionstatetable.h @@ -257,6 +257,9 @@ class PartitionStateTableActions { virtual void do_processBufferedLiveData(const ARGS& args) = 0; + virtual void + do_processBufferedPrimaryStatusAdvisories(const ARGS& args) = 0; + virtual void do_processLiveData(const ARGS& args) = 0; virtual void do_processPut(const ARGS& args) = 0; @@ -367,7 +370,8 @@ class PartitionStateTableActions { void do_cleanupSeqnums_resetReceiveDataCtx_reapplyDetectSelfReplica( const ARGS& args); - void do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog( + void + do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog( const ARGS& args); void @@ -382,7 +386,7 @@ class PartitionStateTableActions { const ARGS& args); void - do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog( + do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog( const ARGS& args); void @@ -579,10 +583,11 @@ class PartitionStateTable REPLICA_DATA_RQST_PULL, closeRecoveryFileSet_openStorage_startSendDataChunks, REPLICA_HEALING); - PST_CFG(REPLICA_HEALING, - DONE_SENDING_DATA_CHUNKS, - 
replicaDataResponsePull_processBufferedLiveData_stopWatchDog, - REPLICA_HEALED); + PST_CFG( + REPLICA_HEALING, + DONE_SENDING_DATA_CHUNKS, + replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog, + REPLICA_HEALED); PST_CFG( REPLICA_HEALING, ERROR_SENDING_DATA_CHUNKS, @@ -603,7 +608,7 @@ class PartitionStateTable PST_CFG( REPLICA_HEALING, DONE_RECEIVING_DATA_CHUNKS, - replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog, + replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog, REPLICA_HEALED); PST_CFG( REPLICA_HEALING, @@ -940,11 +945,12 @@ void PartitionStateTableActions:: template void PartitionStateTableActions:: - do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog( + do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog( const ARGS& args) { do_replicaDataResponsePull(args); do_processBufferedLiveData(args); + do_processBufferedPrimaryStatusAdvisories(args); do_stopWatchDog(args); } @@ -981,7 +987,7 @@ void PartitionStateTableActions:: template void PartitionStateTableActions:: - do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog( + do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog( const ARGS& args) { do_replicaDataResponsePush(args); @@ -989,6 +995,7 @@ void PartitionStateTableActions:: do_closeRecoveryFileSet(args); do_openStorage(args); do_processBufferedLiveData(args); + do_processBufferedPrimaryStatusAdvisories(args); do_stopWatchDog(args); } diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index d110c244e..5552a13a4 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp 
+++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -1085,6 +1085,25 @@ void StorageManager::processReplicaDataResponse( d_cluster_p); } +void StorageManager::bufferPrimaryStatusAdvisoryDispatched( + const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + mqbnet::ClusterNode* source) +{ + // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' + + // PRECONDITIONS + const int pid = advisory.partitionId(); + BSLS_ASSERT_SAFE(0 <= pid && pid < static_cast(d_fileStores.size())); + BSLS_ASSERT_SAFE(d_fileStores[pid]->inDispatcherThread()); + + BALL_LOG_INFO << d_clusterData_p->identity().description() + << " Partition [" << pid + << "]: Buffering primary status advisory: " << advisory; + + d_bufferedPrimaryStatusAdvisoryInfosVec.at(pid).push_back( + bsl::make_pair(advisory, source)); +} + void StorageManager::processShutdownEventDispatched(int partitionId) { // executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId' @@ -2399,6 +2418,67 @@ void StorageManager::do_processBufferedLiveData(const PartitionFSMArgsSp& args) } } +void StorageManager::do_processBufferedPrimaryStatusAdvisories( + const PartitionFSMArgsSp& args) +{ + // executed by the *QUEUE DISPATCHER* thread associated with the paritionId + // contained in 'args' + + // PRECONDITIONS + BSLS_ASSERT_SAFE(!args->eventsQueue()->empty()); + + const PartitionFSM::EventWithData& eventWithData = + args->eventsQueue()->front(); + const EventData& eventDataVec = eventWithData.second; + + BSLS_ASSERT_SAFE(eventDataVec.size() == 1); + + const PartitionFSMEventData& eventData = eventDataVec[0]; + const int partitionId = eventData.partitionId(); + + if (d_cluster_p->isStopping()) { + BALL_LOG_WARN << d_clusterData_p->identity().description() + << " Partition [" << partitionId << "]: " + << "Cluster is stopping; skipping processing of " + << "primary status advisory."; + return; // RETURN + } + + BALL_LOG_INFO + << d_clusterData_p->identity().description() << " Partition [" + << partitionId 
<< "]: " << "Processing " + << d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].size() + << " buffered primary status advisory."; + + for (PrimaryStatusAdvisoryInfosCIter cit = + d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].cbegin(); + cit != d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].cend(); + ++cit) { + BSLS_ASSERT_SAFE(cit->first.partitionId() == partitionId); + + PartitionInfo& pinfo = d_partitionInfoVec[partitionId]; + if (cit->second->nodeId() != pinfo.primary()->nodeId() || + cit->first.primaryLeaseId() != pinfo.primaryLeaseId()) { + BALL_LOG_INFO << d_clusterData_p->identity().description() + << " Partition [" << partitionId + << "]: " << "Ignoring primary status advisory " + << cit->first + << " because primary node or leaseId is invalid. " + << "Self-perceived [prmary, leaseId] is: [" + << pinfo.primary()->nodeDescription() << "," + << pinfo.primaryLeaseId() << "]"; + continue; // CONTINUE + } + pinfo.setPrimaryStatus(cit->first.status()); + if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == cit->first.status()) { + d_fileStores[partitionId]->setPrimary(pinfo.primary(), + pinfo.primaryLeaseId()); + } + } + + d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].clear(); +} + void StorageManager::do_processLiveData(const PartitionFSMArgsSp& args) { // executed by the *QUEUE DISPATCHER* thread associated with the paritionId @@ -3276,6 +3356,7 @@ StorageManager::StorageManager( , d_appKeysVec(allocator) , d_partitionInfoVec(allocator) , d_partitionFSMVec(allocator) +, d_bufferedPrimaryStatusAdvisoryInfosVec(allocator) , d_numPartitionsRecoveredFully(0) , d_numPartitionsRecoveredQueues(0) , d_recoveryStartTimes(allocator) @@ -3307,6 +3388,9 @@ StorageManager::StorageManager( d_storages.resize(partitionCfg.numPartitions()); d_appKeysVec.resize(partitionCfg.numPartitions()); d_partitionInfoVec.resize(partitionCfg.numPartitions()); + d_bufferedPrimaryStatusAdvisoryInfosVec.resize( + partitionCfg.numPartitions(), + 
PrimaryStatusAdvisoryInfos(allocator)); d_recoveryStartTimes.resize(partitionCfg.numPartitions()); d_nodeToSeqNumCtxMapVec.resize(partitionCfg.numPartitions()); d_numReplicaDataResponsesReceivedVec.resize(partitionCfg.numPartitions()); @@ -4236,6 +4320,43 @@ void StorageManager::processReceiptEvent(const bmqp::Event& event, source)); } +void StorageManager::bufferPrimaryStatusAdvisory( + const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + mqbnet::ClusterNode* source) +{ + // executed by *ANY* thread + + // PRECONDITION + BSLS_ASSERT_SAFE(d_fileStores.size() > + static_cast(advisory.partitionId())); + + if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(!d_isStarted)) { + BSLS_PERFORMANCEHINT_UNLIKELY_HINT; + BALL_LOG_WARN << d_clusterData_p->identity().description() + << " Partition [" << advisory.partitionId() << "]: " + << " Not buffering primary status advisory as StorageMgr" + << " is not started."; + return; // RETURN + } + + if (d_cluster_p->isStopping()) { + BALL_LOG_WARN << d_clusterData_p->identity().description() + << " Partition [" << advisory.partitionId() << "]: " + << " Not buffering primary status advisory as cluster" + << " is stopping."; + return; // RETURN + } + + mqbs::FileStore* fs = d_fileStores[advisory.partitionId()].get(); + BSLS_ASSERT_SAFE(fs); + + fs->execute(bdlf::BindUtil::bind( + &StorageManager::bufferPrimaryStatusAdvisoryDispatched, + this, + advisory, + source)); +} + void StorageManager::processPrimaryStatusAdvisory( BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source) diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index 3cc4fb8b8..904f25919 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -157,6 +157,16 @@ class StorageManager typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter; + /// Vector of pairs of buffered primary status advisories and 
their source + typedef bsl::vector< + bsl::pair > + PrimaryStatusAdvisoryInfos; + typedef PrimaryStatusAdvisoryInfos::const_iterator + PrimaryStatusAdvisoryInfosCIter; + + typedef bsl::vector + PrimaryStatusAdvisoryInfosVec; + public: // TYPES typedef PartitionFSM::PartitionFSMArgsSp PartitionFSMArgsSp; @@ -344,6 +354,14 @@ class StorageManager // **must** be accessed in the associated Queue dispatcher thread // for the i-th partitionId. + /// Vector, indexed by partitionId, of vectors of pairs of buffered primary + /// status advisories and their source. + /// + // THREAD: Except during the ctor, the i-th index of this data member + // **must** be accessed in the associated Queue dispatcher thread + // for the i-th partitionId. + PrimaryStatusAdvisoryInfosVec d_bufferedPrimaryStatusAdvisoryInfosVec; + bsls::AtomicInt d_numPartitionsRecoveredFully; // Number of partitions whose recovery // has been fully completed. This @@ -603,6 +621,12 @@ class StorageManager processReplicaDataResponse(const RequestManagerType::RequestSp& context, mqbnet::ClusterNode* responder); + /// THREAD: Executed by the dispatcher thread for the specified + /// `partitionId`. + void bufferPrimaryStatusAdvisoryDispatched( + const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + mqbnet::ClusterNode* source); + /// THREAD: Executed by the dispatcher thread for the specified /// `partitionId`. void processShutdownEventDispatched(int partitionId); @@ -689,6 +713,9 @@ class StorageManager virtual void do_processBufferedLiveData(const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE; + virtual void do_processBufferedPrimaryStatusAdvisories( + const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE; + virtual void do_processLiveData(const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE; @@ -986,6 +1013,11 @@ class StorageManager processReceiptEvent(const bmqp::Event& event, mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; + /// Executed by any thread. 
+ virtual void bufferPrimaryStatusAdvisory( + const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; + /// Executed in cluster dispatcher thread. virtual void processPrimaryStatusAdvisory( const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index 92315e5fd..f34106c6f 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -383,6 +383,11 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual void processReceiptEvent(const bmqp::Event& event, mqbnet::ClusterNode* source) = 0; + /// Executed by any thread. + virtual void bufferPrimaryStatusAdvisory( + const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + mqbnet::ClusterNode* source) = 0; + /// Executed by any thread. virtual void processPrimaryStatusAdvisory( const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp index f18b842a1..ed556c355 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.cpp @@ -237,6 +237,13 @@ void StorageManager::processReceiptEvent( // NOTHING } +void StorageManager::bufferPrimaryStatusAdvisory( + BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source) +{ + // NOTHING +} + void StorageManager::processPrimaryStatusAdvisory( BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source) diff --git a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h index 96d248ea3..efe1e6132 100644 --- a/src/groups/mqb/mqbmock/mqbmock_storagemanager.h +++ b/src/groups/mqb/mqbmock/mqbmock_storagemanager.h @@ -237,6 +237,11 @@ class StorageManager : 
public mqbi::StorageManager { processReceiptEvent(const bmqp::Event& event, mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; + /// Executed by any thread. + virtual void bufferPrimaryStatusAdvisory( + const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, + mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; + /// Executed by any thread. virtual void processPrimaryStatusAdvisory( const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory, From 8059ba6d758f62ba91f4067b057c7c830337cf32 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Wed, 11 Sep 2024 18:17:08 -0400 Subject: [PATCH 5/5] mqbs::FileStore: Rename setPrimary -> setActivePrimary Signed-off-by: Yuan Jing Vincent Yan --- src/groups/mqb/mqbc/mqbc_storagemanager.cpp | 9 +++++---- src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp | 8 ++++---- src/groups/mqb/mqbc/mqbc_storageutil.cpp | 6 +++--- src/groups/mqb/mqbs/mqbs_datastore.h | 6 +++--- src/groups/mqb/mqbs/mqbs_filestore.cpp | 12 ++++++------ src/groups/mqb/mqbs/mqbs_filestore.h | 6 +++--- src/groups/mqb/mqbs/mqbs_filestore.t.cpp | 4 ++-- 7 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 5552a13a4..623bea1ce 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -370,8 +370,8 @@ void StorageManager::setPrimaryStatusForPartitionDispatched( pinfo.setPrimaryStatus(value); if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == value) { - d_fileStores[partitionId]->setPrimary(pinfo.primary(), - pinfo.primaryLeaseId()); + d_fileStores[partitionId]->setActivePrimary(pinfo.primary(), + pinfo.primaryLeaseId()); if (oldValue != bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE && allParitionsAvailable()) { @@ -2471,8 +2471,9 @@ void StorageManager::do_processBufferedPrimaryStatusAdvisories( } pinfo.setPrimaryStatus(cit->first.status()); if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == cit->first.status()) { - 
d_fileStores[partitionId]->setPrimary(pinfo.primary(), - pinfo.primaryLeaseId()); + d_fileStores[partitionId]->setActivePrimary( + pinfo.primary(), + pinfo.primaryLeaseId()); } } diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp index 940d932a8..16722d928 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.t.cpp @@ -886,12 +886,12 @@ struct TestHelper { ._setDisableBroadcast(true); fs.open(); // TODO: clean this up since its a hack to set replica node as primary! - // had to explicitly setPrimary for fileStore because of the assert in - // writeRecords which allows writes only if current node is primary for - // the fileStore. + // had to explicitly setActivePrimary for fileStore because of the + // assert in writeRecords which allows writes only if current node is + // primary for the fileStore. // TODO: set primary to self but also correct it to the right node // after writing 'numRecords' records. - fs.setPrimary(selfNode, 1U); + fs.setActivePrimary(selfNode, 1U); const mqbu::StorageKey& queueKey = writeQueueCreationRecord(handle, &fs, k_PARTITION_ID); diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index eb0424d93..5afdcae84 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -1443,7 +1443,7 @@ void StorageUtil::onPartitionPrimarySync( } // Broadcast self as active primary of this partition. This must be done - // before invoking 'FileStore::setPrimary'. + // before invoking 'FileStore::setActivePrimary'. transitionToActivePrimary(pinfo, clusterData, partitionId); partitionPrimaryStatusCb(partitionId, status, pinfo->primaryLeaseId()); @@ -1451,7 +1451,7 @@ void StorageUtil::onPartitionPrimarySync( // Safe to inform partition now. Note that partition will issue a sync // point with old leaseId (if applicable) and another with new leaseId // immediately. 
- fs->setPrimary(pinfo->primary(), pinfo->primaryLeaseId()); + fs->setActivePrimary(pinfo->primary(), pinfo->primaryLeaseId()); } void StorageUtil::recoveredQueuesCb( @@ -3413,7 +3413,7 @@ void StorageUtil::processPrimaryStatusAdvisoryDispatched( pinfo->setPrimaryStatus(advisory.status()); if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == advisory.status()) { - fs->setPrimary(source, advisory.primaryLeaseId()); + fs->setActivePrimary(source, advisory.primaryLeaseId()); } } diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 70ea91568..6e680641e 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -675,10 +675,10 @@ class DataStore : public mqbi::DispatcherClient { virtual int issueSyncPoint() = 0; /// Set the specified `primaryNode` with the specified `primaryLeaseId` - /// as the primary for this data store partition. Note that + /// as the active primary for this data store partition. Note that /// `primaryNode` could refer to the node which owns this data store. - virtual void setPrimary(mqbnet::ClusterNode* primaryNode, - unsigned int primaryLeaseId) = 0; + virtual void setActivePrimary(mqbnet::ClusterNode* primaryNode, + unsigned int primaryLeaseId) = 0; /// Clear the current primary associated with this partition. 
virtual void clearPrimary() = 0; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index de99589ab..9b7eb2cf5 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -111,7 +111,7 @@ const unsigned int k_REQUESTED_JOURNAL_SPACE = // Above, 3 == 1 journal record being written + // 1 journal sync point if rolling over + // 1 journal sync point if self needs to issue another sync point -// in 'setPrimary' with old values +// in 'setActivePrimary' with old values /// Return a rounded (down) percentage value (range [0-100]) representing /// the space in use on a file with the specified `capacity`, currently @@ -3758,8 +3758,8 @@ int FileStore::issueSyncPointInternal(SyncPointType::Enum type, // New primary and no force issue requested. Currently, // this check is redundant because we always force issue a // sync point when a primary is chosen (see - // 'setPrimary()'), which always bumps up 'd_sequenceNum' - // to 1. + // 'setActivePrimary()'), which always bumps up + // 'd_sequenceNum' to 1. return rc_SUCCESS; // RETURN } @@ -6261,7 +6261,7 @@ void FileStore::processStorageEvent(const bsl::shared_ptr& blob, // If we are processing a partition-sync event, we have to // bump up the leaseId to that of the message, because we don't // get a separate notification about leaseId (unlike in steady - // state when StorageMgr invokes fs.setPrimary()). + // state when StorageMgr invokes fs.setActivePrimary()). 
d_primaryLeaseId = recHeader->primaryLeaseId(); @@ -6605,8 +6605,8 @@ int FileStore::issueSyncPoint() return rc_SUCCESS; } -void FileStore::setPrimary(mqbnet::ClusterNode* primaryNode, - unsigned int primaryLeaseId) +void FileStore::setActivePrimary(mqbnet::ClusterNode* primaryNode, + unsigned int primaryLeaseId) { // executed by the *DISPATCHER* thread diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index 3aeeec1bf..84ec766b4 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -859,10 +859,10 @@ class FileStore : public DataStore { int issueSyncPoint() BSLS_KEYWORD_OVERRIDE; /// Set the specified `primaryNode` with the specified `primaryLeaseId` - /// as the primary for this data store partition. Note that + /// as the active primary for this data store partition. Note that /// `primaryNode` could refer to the node which owns this data store. - void setPrimary(mqbnet::ClusterNode* primaryNode, - unsigned int primaryLeaseId) BSLS_KEYWORD_OVERRIDE; + void setActivePrimary(mqbnet::ClusterNode* primaryNode, + unsigned int primaryLeaseId) BSLS_KEYWORD_OVERRIDE; /// Clear the current primary associated with this partition. void clearPrimary() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp index 6e4ca278e..c235787ab 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.t.cpp @@ -799,7 +799,7 @@ static void test1_breathingTest() unsigned int primaryLeaseId = 1; bsls::Types::Uint64 seqNum = 1; - fs.setPrimary(tester.node(), primaryLeaseId); + fs.setActivePrimary(tester.node(), primaryLeaseId); ASSERT_EQ(primaryLeaseId, fs.primaryLeaseId()); ASSERT_EQ(seqNum, fs.sequenceNumber()); @@ -902,7 +902,7 @@ static void test2_printTest() // Set primary. 
unsigned int primaryLeaseId = 1; bsls::Types::Uint64 seqNum = 1; - fs.setPrimary(tester.node(), primaryLeaseId); + fs.setActivePrimary(tester.node(), primaryLeaseId); // Write various records to the partition. SyncPointOffsetPairs spOffsetPairs(s_allocator_p);