Skip to content

Commit

Permalink
mqbc::StorageMgr: Healing replica buffers primary status advisories
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Jing Vincent Yan <yyan82@bloomberg.net>
  • Loading branch information
kaikulimu committed Sep 10, 2024
1 parent e8e2a65 commit e5d59b2
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 20 deletions.
37 changes: 26 additions & 11 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1655,17 +1655,32 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
if (d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
if (pinfo.primaryNode() != source ||
pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: "
<< primaryAdv
<< " from: " << source->nodeDescription()
<< ", but self perceived primary and its leaseId are"
<< ": ["
<< (pinfo.primaryNode()
? pinfo.primaryNode()->nodeDescription()
: "** null **")
<< ", " << pinfo.primaryLeaseId() << "].";
BALL_LOG_WARN_BLOCK
{
BALL_LOG_OUTPUT_STREAM
<< d_clusterData_p->identity().description()
<< ": Partition [" << primaryAdv.partitionId()
<< "]: received primary status advisory: " << primaryAdv
<< " from: " << source->nodeDescription()
<< ", but self perceived primary and its leaseId are"
<< ": ["
<< (pinfo.primaryNode()
? pinfo.primaryNode()->nodeDescription()
: "** null **")
<< ", " << pinfo.primaryLeaseId() << "].";
if (pinfo.primaryNode()) {
BALL_LOG_OUTPUT_STREAM << " Ignoring advisory.";
}
else {
BALL_LOG_OUTPUT_STREAM << " Since we have not received any"
<< " information regarding the true"
<< " primary, this advisory could "
<< "be from the true one. Will"
<< " buffer the advisory for now.";
d_storageManager_p->bufferPrimaryStatusAdvisory(primaryAdv,
source);
}
}
return; // RETURN
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,15 @@ void StorageManager::processReceiptEvent(const bmqp::Event& event,
source));
}

void StorageManager::bufferPrimaryStatusAdvisory(
BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source)
{
// executed by *ANY* thread

BSLS_ASSERT_OPT(false && "This method should only be invoked in FSM mode");
}

void StorageManager::processPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source)
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,11 @@ class StorageManager : public mqbi::StorageManager {
processReceiptEvent(const bmqp::Event& event,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Executed by any thread.
virtual void bufferPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Executed by any thread.
virtual void processPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
Expand Down
25 changes: 16 additions & 9 deletions src/groups/mqb/mqbc/mqbc_partitionstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ class PartitionStateTableActions {

virtual void do_processBufferedLiveData(const ARGS& args) = 0;

virtual void
do_processBufferedPrimaryStatusAdvisories(const ARGS& args) = 0;

virtual void do_processLiveData(const ARGS& args) = 0;

virtual void do_processPut(const ARGS& args) = 0;
Expand Down Expand Up @@ -367,7 +370,8 @@ class PartitionStateTableActions {
void do_cleanupSeqnums_resetReceiveDataCtx_reapplyDetectSelfReplica(
const ARGS& args);

void do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog(
void
do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args);

void
Expand All @@ -382,7 +386,7 @@ class PartitionStateTableActions {
const ARGS& args);

void
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog(
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args);

void
Expand Down Expand Up @@ -579,10 +583,11 @@ class PartitionStateTable
REPLICA_DATA_RQST_PULL,
closeRecoveryFileSet_openStorage_startSendDataChunks,
REPLICA_HEALING);
PST_CFG(REPLICA_HEALING,
DONE_SENDING_DATA_CHUNKS,
replicaDataResponsePull_processBufferedLiveData_stopWatchDog,
REPLICA_HEALED);
PST_CFG(
REPLICA_HEALING,
DONE_SENDING_DATA_CHUNKS,
replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog,
REPLICA_HEALED);
PST_CFG(
REPLICA_HEALING,
ERROR_SENDING_DATA_CHUNKS,
Expand All @@ -603,7 +608,7 @@ class PartitionStateTable
PST_CFG(
REPLICA_HEALING,
DONE_RECEIVING_DATA_CHUNKS,
replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog,
replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog,
REPLICA_HEALED);
PST_CFG(
REPLICA_HEALING,
Expand Down Expand Up @@ -940,11 +945,12 @@ void PartitionStateTableActions<ARGS>::

template <typename ARGS>
void PartitionStateTableActions<ARGS>::
do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog(
do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args)
{
do_replicaDataResponsePull(args);
do_processBufferedLiveData(args);
do_processBufferedPrimaryStatusAdvisories(args);
do_stopWatchDog(args);
}

Expand Down Expand Up @@ -981,14 +987,15 @@ void PartitionStateTableActions<ARGS>::

template <typename ARGS>
void PartitionStateTableActions<ARGS>::
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog(
do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
const ARGS& args)
{
do_replicaDataResponsePush(args);
do_resetReceiveDataCtx(args);
do_closeRecoveryFileSet(args);
do_openStorage(args);
do_processBufferedLiveData(args);
do_processBufferedPrimaryStatusAdvisories(args);
do_stopWatchDog(args);
}

Expand Down
121 changes: 121 additions & 0 deletions src/groups/mqb/mqbc/mqbc_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,25 @@ void StorageManager::processReplicaDataResponse(
d_cluster_p);
}

void StorageManager::bufferPrimaryStatusAdvisoryDispatched(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'

// PRECONDITIONS
const int pid = advisory.partitionId();
BSLS_ASSERT_SAFE(0 <= pid && pid < static_cast<int>(d_fileStores.size()));
BSLS_ASSERT_SAFE(d_fileStores[pid]->inDispatcherThread());

BALL_LOG_INFO << d_clusterData_p->identity().description()
<< " Partition [" << pid
<< "]: Buffering primary status advisory: " << advisory;

d_bufferedPrimaryStatusAdvisoryInfosVec.at(pid).push_back(
bsl::make_pair(advisory, source));
}

void StorageManager::processShutdownEventDispatched(int partitionId)
{
// executed by *QUEUE_DISPATCHER* thread with the specified 'partitionId'
Expand Down Expand Up @@ -2399,6 +2418,67 @@ void StorageManager::do_processBufferedLiveData(const PartitionFSMArgsSp& args)
}
}

void StorageManager::do_processBufferedPrimaryStatusAdvisories(
const PartitionFSMArgsSp& args)
{
// executed by the *QUEUE DISPATCHER* thread associated with the paritionId
// contained in 'args'

// PRECONDITIONS
BSLS_ASSERT_SAFE(!args->eventsQueue()->empty());

const PartitionFSM::EventWithData& eventWithData =
args->eventsQueue()->front();
const EventData& eventDataVec = eventWithData.second;

BSLS_ASSERT_SAFE(eventDataVec.size() == 1);

const PartitionFSMEventData& eventData = eventDataVec[0];
const int partitionId = eventData.partitionId();

if (d_cluster_p->isStopping()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " Partition [" << partitionId << "]: "
<< "Cluster is stopping; skipping processing of "
<< "primary status advisory.";
return; // RETURN
}

BALL_LOG_INFO
<< d_clusterData_p->identity().description() << " Partition ["
<< partitionId << "]: " << "Processing "
<< d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].size()
<< " buffered primary status advisory.";

for (PrimaryStatusAdvisoryInfosCIter cit =
d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].cbegin();
cit != d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].cend();
++cit) {
BSLS_ASSERT_SAFE(cit->first.partitionId() == partitionId);

PartitionInfo& pinfo = d_partitionInfoVec[partitionId];
if (cit->second->nodeId() != pinfo.primary()->nodeId() ||
cit->first.primaryLeaseId() != pinfo.primaryLeaseId()) {
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< " Partition [" << partitionId
<< "]: " << "Ignoring primary status advisory "
<< cit->first
<< " because primary node or leaseId is invalid. "
<< "Self-perceived [prmary, leaseId] is: ["
<< pinfo.primary()->nodeDescription() << ","
<< pinfo.primaryLeaseId() << "]";
continue; // CONTINUE
}
pinfo.setPrimaryStatus(cit->first.status());
if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE == cit->first.status()) {
d_fileStores[partitionId]->setPrimary(pinfo.primary(),
pinfo.primaryLeaseId());
}
}

d_bufferedPrimaryStatusAdvisoryInfosVec[partitionId].clear();
}

void StorageManager::do_processLiveData(const PartitionFSMArgsSp& args)
{
// executed by the *QUEUE DISPATCHER* thread associated with the paritionId
Expand Down Expand Up @@ -3276,6 +3356,7 @@ StorageManager::StorageManager(
, d_appKeysVec(allocator)
, d_partitionInfoVec(allocator)
, d_partitionFSMVec(allocator)
, d_bufferedPrimaryStatusAdvisoryInfosVec(allocator)
, d_numPartitionsRecoveredFully(0)
, d_numPartitionsRecoveredQueues(0)
, d_recoveryStartTimes(allocator)
Expand Down Expand Up @@ -3307,6 +3388,9 @@ StorageManager::StorageManager(
d_storages.resize(partitionCfg.numPartitions());
d_appKeysVec.resize(partitionCfg.numPartitions());
d_partitionInfoVec.resize(partitionCfg.numPartitions());
d_bufferedPrimaryStatusAdvisoryInfosVec.resize(
partitionCfg.numPartitions(),
PrimaryStatusAdvisoryInfos(allocator));
d_recoveryStartTimes.resize(partitionCfg.numPartitions());
d_nodeToSeqNumCtxMapVec.resize(partitionCfg.numPartitions());
d_numReplicaDataResponsesReceivedVec.resize(partitionCfg.numPartitions());
Expand Down Expand Up @@ -4236,6 +4320,43 @@ void StorageManager::processReceiptEvent(const bmqp::Event& event,
source));
}

void StorageManager::bufferPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source)
{
// executed by *ANY* thread

// PRECONDITION
BSLS_ASSERT_SAFE(d_fileStores.size() >
static_cast<size_t>(advisory.partitionId()));

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(!d_isStarted)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " Partition [" << advisory.partitionId() << "]: "
<< " Not buffering primary status advisory as StorageMgr"
<< " is not started.";
return; // RETURN
}

if (d_cluster_p->isStopping()) {
BALL_LOG_WARN << d_clusterData_p->identity().description()
<< " Partition [" << advisory.partitionId() << "]: "
<< " Not buffering primary status advisory as cluster"
<< " is stopping.";
return; // RETURN
}

mqbs::FileStore* fs = d_fileStores[advisory.partitionId()].get();
BSLS_ASSERT_SAFE(fs);

fs->execute(bdlf::BindUtil::bind(
&StorageManager::bufferPrimaryStatusAdvisoryDispatched,
this,
advisory,
source));
}

void StorageManager::processPrimaryStatusAdvisory(
BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source)
Expand Down
32 changes: 32 additions & 0 deletions src/groups/mqb/mqbc/mqbc_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ class StorageManager

typedef ClusterStateQueueInfo::AppIdInfosCIter AppIdInfosCIter;

/// Vector of pairs of buffered primary status advisories and their source
typedef bsl::vector<
bsl::pair<bmqp_ctrlmsg::PrimaryStatusAdvisory, mqbnet::ClusterNode*> >
PrimaryStatusAdvisoryInfos;
typedef PrimaryStatusAdvisoryInfos::const_iterator
PrimaryStatusAdvisoryInfosCIter;

typedef bsl::vector<PrimaryStatusAdvisoryInfos>
PrimaryStatusAdvisoryInfosVec;

public:
// TYPES
typedef PartitionFSM::PartitionFSMArgsSp PartitionFSMArgsSp;
Expand Down Expand Up @@ -344,6 +354,14 @@ class StorageManager
// **must** be accessed in the associated Queue dispatcher thread
// for the i-th partitionId.

/// Vector, indexed by partitionId, of vectors of pairs of buffered primary
/// status advisories and their source.
///
// THREAD: Except during the ctor, the i-th index of this data member
// **must** be accessed in the associated Queue dispatcher thread
// for the i-th partitionId.
PrimaryStatusAdvisoryInfosVec d_bufferedPrimaryStatusAdvisoryInfosVec;

bsls::AtomicInt d_numPartitionsRecoveredFully;
// Number of partitions whose recovery
// has been fully completed. This
Expand Down Expand Up @@ -603,6 +621,12 @@ class StorageManager
processReplicaDataResponse(const RequestManagerType::RequestSp& context,
mqbnet::ClusterNode* responder);

/// THREAD: Executed by the dispatcher thread for the specified
/// `partitionId`.
void bufferPrimaryStatusAdvisoryDispatched(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source);

/// THREAD: Executed by the dispatcher thread for the specified
/// `partitionId`.
void processShutdownEventDispatched(int partitionId);
Expand Down Expand Up @@ -689,6 +713,9 @@ class StorageManager
virtual void do_processBufferedLiveData(const PartitionFSMArgsSp& args)
BSLS_KEYWORD_OVERRIDE;

virtual void do_processBufferedPrimaryStatusAdvisories(
const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE;

virtual void
do_processLiveData(const PartitionFSMArgsSp& args) BSLS_KEYWORD_OVERRIDE;

Expand Down Expand Up @@ -986,6 +1013,11 @@ class StorageManager
processReceiptEvent(const bmqp::Event& event,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Executed by any thread.
virtual void bufferPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

/// Executed in cluster dispatcher thread.
virtual void processPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbi/mqbi_storagemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ class StorageManager : public mqbi::AppKeyGenerator {
virtual void processReceiptEvent(const bmqp::Event& event,
mqbnet::ClusterNode* source) = 0;

/// Executed by any thread.
virtual void bufferPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
mqbnet::ClusterNode* source) = 0;

/// Executed by any thread.
virtual void processPrimaryStatusAdvisory(
const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
Expand Down
Loading

0 comments on commit e5d59b2

Please sign in to comment.