Fix [MQB]: mqbc::StorageMgr: Transition to available only when all primary active #416

Open
wants to merge 5 commits into
base: main
46 changes: 34 additions & 12 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
@@ -1655,17 +1655,32 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(
     if (d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
         if (pinfo.primaryNode() != source ||
             pinfo.primaryLeaseId() != primaryAdv.primaryLeaseId()) {
-            BALL_LOG_WARN << d_clusterData_p->identity().description()
-                          << ": Partition [" << primaryAdv.partitionId()
-                          << "]: received primary status advisory: "
-                          << primaryAdv
-                          << " from: " << source->nodeDescription()
-                          << ", but self perceived primary and its leaseId are"
-                          << ": ["
-                          << (pinfo.primaryNode()
-                                  ? pinfo.primaryNode()->nodeDescription()
-                                  : "** null **")
-                          << ", " << pinfo.primaryLeaseId() << "].";
+            BALL_LOG_WARN_BLOCK
+            {
+                BALL_LOG_OUTPUT_STREAM
+                    << d_clusterData_p->identity().description()
+                    << ": Partition [" << primaryAdv.partitionId()
+                    << "]: received primary status advisory: " << primaryAdv
+                    << " from: " << source->nodeDescription()
+                    << ", but self perceived primary and its leaseId are"
+                    << ": ["
+                    << (pinfo.primaryNode()
+                            ? pinfo.primaryNode()->nodeDescription()
+                            : "** null **")
+                    << ", " << pinfo.primaryLeaseId() << "].";
+                if (pinfo.primaryNode()) {
+                    BALL_LOG_OUTPUT_STREAM << " Ignoring advisory.";
+                }
+                else {
+                    BALL_LOG_OUTPUT_STREAM << " Since we have not received any"
+                                           << " information regarding the true"
+                                           << " primary, this advisory could "
+                                           << "be from the true one. Will"
+                                           << " buffer the advisory for now.";
+                    d_storageManager_p->bufferPrimaryStatusAdvisory(primaryAdv,
Collaborator

Question: does pinfo.primaryNode() get assigned upon a PrimaryStatusAdvisory, or is there another trigger in the FSM flow? If we receive a PrimaryStatusAdvisory and do not have a partition primary, why not assign the primary then?

Collaborator Author

@kaikulimu, Sep 11, 2024

In FSM mode, the source of truth for partition assignments is the cluster state snapshot in the CSL file. As part of healing, a new leader assigns partitions and then applies the assignments in its first CSL advisory. Primary status advisories can be stale; that's why we have a lot of checks in this function in the first place. My original idea was to simply ignore all primary status advisories and rely purely on the FSM for partition assignments. However, the FSM can heal a replica but neglect to set a primary as active. Thus, I came up with the idea of buffering primary status advisories. If a buffered advisory is not stale (i.e., its primary node and leaseId match ours), then we trust the availability advisory.
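
To make the buffering idea concrete, here is a minimal standalone sketch; it is not the code in this PR. `Advisory`, `PartitionInfo`, and `AdvisoryBuffer` are hypothetical stand-ins for the real mqb types, and node identity is reduced to a string. The sketch assumes that buffered advisories are replayed once the replica is healed, and that only advisories whose source and leaseId match the primary known at that point are applied.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// NOTE: every name below is hypothetical and exists only for illustration.
enum class PrimaryStatus { e_PASSIVE, e_ACTIVE };

struct Advisory {
    int           partitionId;
    std::uint32_t primaryLeaseId;
    PrimaryStatus status;
};

struct PartitionInfo {
    std::string   primaryNode;  // empty means "no primary known yet"
    std::uint32_t primaryLeaseId = 0;
    PrimaryStatus primaryStatus  = PrimaryStatus::e_PASSIVE;
};

class AdvisoryBuffer {
    // partitionId -> (advisory, source node) pairs, in arrival order
    std::map<int, std::vector<std::pair<Advisory, std::string> > > d_buffered;

  public:
    // Hold on to an advisory that cannot be validated yet.
    void buffer(const Advisory& advisory, const std::string& source)
    {
        assert(!source.empty());  // an advisory always comes from some node
        d_buffered[advisory.partitionId].emplace_back(advisory, source);
    }

    // Replay the advisories buffered for 'partitionId' against the now-known
    // primary in 'pinfo'.  Apply only those whose source and leaseId match;
    // drop the rest as stale.  Return the number of advisories applied.
    int processBuffered(int partitionId, PartitionInfo& pinfo)
    {
        auto it = d_buffered.find(partitionId);
        if (it == d_buffered.end()) {
            return 0;
        }

        int applied = 0;
        for (const auto& entry : it->second) {
            const Advisory&    adv    = entry.first;
            const std::string& source = entry.second;
            if (source != pinfo.primaryNode ||
                adv.primaryLeaseId != pinfo.primaryLeaseId) {
                continue;  // stale: wrong primary or wrong lease
            }
            pinfo.primaryStatus = adv.status;
            ++applied;
        }
        d_buffered.erase(it);  // the buffer is consumed either way
        return applied;
    }
};
```

In this sketch, buffering an advisory from "nodeB" with leaseId 4 and another from "nodeA" with leaseId 5, then calling `processBuffered` with a `PartitionInfo` whose primary is "nodeA" at leaseId 5, applies only the second advisory and discards the first as stale.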

+                                                                    source);
+                }
+            }
             return;  // RETURN
         }
     }
@@ -1759,11 +1774,18 @@ void ClusterOrchestrator::processPrimaryStatusAdvisory(

     // TBD: may need to review the order of invoking these routines.

+    BALL_LOG_INFO << d_clusterData_p->identity().description()
+                  << " PartitionId [" << primaryAdv.partitionId()
+                  << "]: received primary status advisory: " << primaryAdv
+                  << ", from: " << source->nodeDescription();
+
     BSLS_ASSERT_SAFE(ns->isPrimaryForPartition(primaryAdv.partitionId()));
     d_stateManager_mp->setPrimaryStatus(primaryAdv.partitionId(),
                                         primaryAdv.status());

-    d_storageManager_p->processPrimaryStatusAdvisory(primaryAdv, source);
+    if (!d_clusterConfig.clusterAttributes().isFSMWorkflow()) {
+        d_storageManager_p->processPrimaryStatusAdvisory(primaryAdv, source);
+    }
 }

 void ClusterOrchestrator::processStateNotification(
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
@@ -2025,6 +2025,15 @@ void StorageManager::processReceiptEvent(const bmqp::Event& event,
                                source));
 }

+void StorageManager::bufferPrimaryStatusAdvisory(
+    BSLS_ANNOTATION_UNUSED const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
+    BSLS_ANNOTATION_UNUSED mqbnet::ClusterNode* source)
+{
+    // executed by *ANY* thread
+
+    BSLS_ASSERT_OPT(false && "This method should only be invoked in FSM mode");
+}
+
 void StorageManager::processPrimaryStatusAdvisory(
     const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
     mqbnet::ClusterNode* source)
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.h
@@ -676,6 +676,11 @@ class StorageManager : public mqbi::StorageManager {
     processReceiptEvent(const bmqp::Event& event,
                         mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;

+    /// Executed by any thread.
+    virtual void bufferPrimaryStatusAdvisory(
+        const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
+        mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE;
+
     /// Executed by any thread.
     virtual void processPrimaryStatusAdvisory(
         const bmqp_ctrlmsg::PrimaryStatusAdvisory& advisory,
25 changes: 16 additions & 9 deletions src/groups/mqb/mqbc/mqbc_partitionstatetable.h
@@ -257,6 +257,9 @@ class PartitionStateTableActions {

     virtual void do_processBufferedLiveData(const ARGS& args) = 0;

+    virtual void
+    do_processBufferedPrimaryStatusAdvisories(const ARGS& args) = 0;
+
     virtual void do_processLiveData(const ARGS& args) = 0;

     virtual void do_processPut(const ARGS& args) = 0;
@@ -367,7 +370,8 @@ class PartitionStateTableActions {
     void do_cleanupSeqnums_resetReceiveDataCtx_reapplyDetectSelfReplica(
         const ARGS& args);

-    void do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog(
+    void
+    do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
         const ARGS& args);

     void
@@ -382,7 +386,7 @@ class PartitionStateTableActions {
         const ARGS& args);

     void
-    do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog(
+    do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
         const ARGS& args);

     void
@@ -579,10 +583,11 @@ class PartitionStateTable
                 REPLICA_DATA_RQST_PULL,
                 closeRecoveryFileSet_openStorage_startSendDataChunks,
                 REPLICA_HEALING);
-        PST_CFG(REPLICA_HEALING,
-                DONE_SENDING_DATA_CHUNKS,
-                replicaDataResponsePull_processBufferedLiveData_stopWatchDog,
-                REPLICA_HEALED);
+        PST_CFG(
+            REPLICA_HEALING,
+            DONE_SENDING_DATA_CHUNKS,
+            replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog,
+            REPLICA_HEALED);
         PST_CFG(
             REPLICA_HEALING,
             ERROR_SENDING_DATA_CHUNKS,
@@ -603,7 +608,7 @@ class PartitionStateTable
         PST_CFG(
             REPLICA_HEALING,
             DONE_RECEIVING_DATA_CHUNKS,
-            replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog,
+            replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog,
             REPLICA_HEALED);
         PST_CFG(
             REPLICA_HEALING,
@@ -940,11 +945,12 @@ void PartitionStateTableActions<ARGS>::

 template <typename ARGS>
 void PartitionStateTableActions<ARGS>::
-    do_replicaDataResponsePull_processBufferedLiveData_stopWatchDog(
+    do_replicaDataResponsePull_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
         const ARGS& args)
 {
     do_replicaDataResponsePull(args);
     do_processBufferedLiveData(args);
+    do_processBufferedPrimaryStatusAdvisories(args);
     do_stopWatchDog(args);
 }

@@ -981,14 +987,15 @@ void PartitionStateTableActions<ARGS>::

 template <typename ARGS>
 void PartitionStateTableActions<ARGS>::
-    do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_stopWatchDog(
+    do_replicaDataResponsePush_resetReceiveDataCtx_closeRecoveryFileSet_openStorage_processBufferedLiveData_processBufferedPrimaryStatusAdvisories_stopWatchDog(
         const ARGS& args)
 {
     do_replicaDataResponsePush(args);
     do_resetReceiveDataCtx(args);
     do_closeRecoveryFileSet(args);
     do_openStorage(args);
     do_processBufferedLiveData(args);
+    do_processBufferedPrimaryStatusAdvisories(args);
     do_stopWatchDog(args);
 }
