Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: generate AppKey only once #469

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 21 additions & 28 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2209,27 +2209,16 @@ void Cluster::onRecoveryStatusDispatched(
const bmqt::Uri uri(itMp->uri().canonical());
BSLS_ASSERT_SAFE(itMp->storage()->partitionId() ==
static_cast<int>(pid));
if (isCSLModeEnabled()) {
AppIdKeyPairs appIdKeyPairs;
itMp->storage()->loadVirtualStorageDetails(&appIdKeyPairs);
AppIdInfos appIdInfos(appIdKeyPairs.cbegin(),
appIdKeyPairs.cend());

d_clusterOrchestrator.registerQueueInfo(
uri,
pid,
itMp->storage()->queueKey(),
appIdInfos,
false); // Force-update?
}
else {
d_clusterOrchestrator.registerQueueInfo(
uri,
pid,
itMp->storage()->queueKey(),
AppIdInfos(),
false); // Force-update?
}

AppInfos appIdInfos;
itMp->storage()->loadVirtualStorageDetails(&appIdInfos);

d_clusterOrchestrator.registerQueueInfo(
uri,
pid,
itMp->storage()->queueKey(),
appIdInfos,
false); // Force-update?

++(*itMp);
}
Expand Down Expand Up @@ -2844,18 +2833,22 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
}

// Compute list of added and removed App IDs.
bsl::vector<bsl::string> oldCfgAppIds(oldDefn.mode().fanout().appIDs(),
d_allocator_p);
bsl::vector<bsl::string> newCfgAppIds(newDefn.mode().fanout().appIDs(),
d_allocator_p);

bsl::vector<bsl::string> addedIds, removedIds;
bsl::unordered_set<bsl::string> oldCfgAppIds(
oldDefn.mode().fanout().appIDs().cbegin(),
oldDefn.mode().fanout().appIDs().cend(),
d_allocator_p);
bsl::unordered_set<bsl::string> newCfgAppIds(
newDefn.mode().fanout().appIDs().cbegin(),
newDefn.mode().fanout().appIDs().cend(),
d_allocator_p);

bsl::unordered_set<bsl::string> addedIds, removedIds;
mqbc::StorageUtil::loadAddedAndRemovedEntries(&addedIds,
&removedIds,
oldCfgAppIds,
newCfgAppIds);

bsl::vector<bsl::string>::const_iterator it = addedIds.begin();
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppId,
Expand Down
4 changes: 1 addition & 3 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,9 @@ class Cluster : public mqbi::Cluster,

private:
// PRIVATE TYPES
typedef mqbi::Storage::AppIdKeyPairs AppIdKeyPairs;

typedef mqbc::ClusterStatePartitionInfo ClusterStatePartitionInfo;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

typedef mqbc::ClusterMembership::ClusterNodeSessionSp ClusterNodeSessionSp;

Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ void ClusterOrchestrator::processBufferedQueueAdvisories()
void ClusterOrchestrator::registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate)
{
// executed by the *DISPATCHER* thread
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class ClusterOrchestrator {

typedef bdlmt::EventScheduler::RecurringEventHandle RecurringEventHandle;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

private:
// DATA
Expand Down Expand Up @@ -515,7 +515,7 @@ class ClusterOrchestrator {
void registerQueueInfo(const bmqt::Uri& uri,
int partitionId,
const mqbu::StorageKey& queueKey,
const AppIdInfos& appIdInfos,
const AppInfos& appIdInfos,
bool forceUpdate);

/// Executed by any thread.
Expand Down
96 changes: 34 additions & 62 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,29 @@ void createQueueUriKey(bmqt::Uri* out,
}

void afterAppIdRegisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdRegistered(
mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second));
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}

void afterAppIdUnregisteredDispatched(
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppIdInfo& appIdInfo)
mqbi::Queue* queue,
const mqbc::ClusterStateQueueInfo::AppInfo& appIdInfo)
{
// executed by the *QUEUE DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(queue->dispatcher()->inDispatcherThread(queue));

queue->queueEngine()->afterAppIdUnregistered(
mqbi::Storage::AppIdKeyPair(appIdInfo.first, appIdInfo.second));
mqbi::Storage::AppInfo(appIdInfo.first, appIdInfo.second));
}

void handleHolderDummy(const bsl::shared_ptr<mqbi::QueueHandle>& handle)
Expand Down Expand Up @@ -2152,27 +2152,15 @@ bsl::shared_ptr<mqbi::Queue> ClusterQueueHelper::createQueueFactory(
// queue but the queue is never opened, it will not be registered with
// the StorageMgr. This is ok.

if (d_cluster_p->isCSLModeEnabled()) {
const AppIdInfos& appIdInfos =
context.d_queueContext_p->d_stateQInfo_sp->appIdInfos();
const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
appIdInfos.cbegin(),
appIdInfos.cend());
d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
appIdKeyPairs,
context.d_domain_p);
}
else {
d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
mqbi::Storage::AppIdKeyPairs(),
context.d_domain_p);
}
// Use keys in the CSL instead of generating new ones to keep CSL and
// non-CSL consistent.

d_storageManager_p->registerQueue(
context.d_queueContext_p->uri(),
context.d_queueContext_p->key(),
context.d_queueContext_p->partitionId(),
context.d_queueContext_p->d_stateQInfo_sp->appInfos(),
context.d_domain_p);

// Queue must have been registered with storage manager before
// registering it with the domain, otherwise Queue.configure() will
Expand Down Expand Up @@ -3698,28 +3686,12 @@ void ClusterQueueHelper::restoreStateCluster(int partitionId)
// node creates a local queue instance (see
// 'createQueueFactory').

if (d_cluster_p->isCSLModeEnabled()) {
const AppIdInfos& appIdInfos =
queueContext->d_stateQInfo_sp->appIdInfos();
const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
appIdInfos.cbegin(),
appIdInfos.cend());

d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
appIdKeyPairs,
qinfo.d_queue_sp->domain());
}
else {
d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
mqbi::Storage::AppIdKeyPairs(),
qinfo.d_queue_sp->domain());
}
d_storageManager_p->registerQueue(
queueContext->uri(),
queueContext->key(),
queueContext->partitionId(),
queueContext->d_stateQInfo_sp->appInfos(),
qinfo.d_queue_sp->domain());

// Convert the queue from remote to local instance.
queueContext->d_liveQInfo.d_queue_sp->convertToLocal();
Expand Down Expand Up @@ -4109,6 +4081,7 @@ void ClusterQueueHelper::onQueueAssigned(
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());

if (!d_cluster_p->isCSLModeEnabled()) {
// REVISIT
return; // RETURN
}

Expand Down Expand Up @@ -4229,14 +4202,11 @@ void ClusterQueueHelper::onQueueAssigned(
->domain(),
true); // allowDuplicate

const mqbi::Storage::AppIdKeyPairs appIdKeyPairs(
info.appIdInfos().cbegin(),
info.appIdInfos().cend());
d_storageManager_p->updateQueueReplica(
info.partitionId(),
info.uri(),
info.key(),
appIdKeyPairs,
info.appInfos(),
d_clusterState_p->domainStates()
.at(info.uri().qualifiedDomain())
->domain(),
Expand Down Expand Up @@ -4395,8 +4365,8 @@ void ClusterQueueHelper::onQueueUnassigned(

void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const bsl::string& domain,
const AppIdInfos& addedAppIds,
const AppIdInfos& removedAppIds)
const AppInfos& addedAppIds,
const AppInfos& removedAppIds)
{
// executed by the cluster *DISPATCHER* thread

Expand Down Expand Up @@ -4424,19 +4394,21 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
const int partitionId = qiter->second->partitionId();
BSLS_ASSERT_SAFE(partitionId != mqbs::DataStore::k_INVALID_PARTITION_ID);

for (AppIdInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.
mqbi::Storage::AppIdKeyPair appIdKeyPair(cit->first, cit->second);
mqbi::Storage::AppIdKeyPairs appIdKeyPairs(1, appIdKeyPair);

mqbi::Storage::AppInfos one(1, d_allocator_p);
one.emplace(*cit);

d_storageManager_p->updateQueueReplica(
partitionId,
uri,
qiter->second->key(),
appIdKeyPairs,
one,
d_clusterState_p->domainStates()
.at(uri.qualifiedDomain())
->domain());
Expand All @@ -4450,7 +4422,7 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
}
}

for (AppIdInfosCIter cit = removedAppIds.cbegin();
for (AppInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
Expand All @@ -4471,8 +4443,8 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
}
}

bmqu::Printer<AppIdInfos> printer1(&addedAppIds);
bmqu::Printer<AppIdInfos> printer2(&removedAppIds);
bmqu::Printer<AppInfos> printer1(&addedAppIds);
bmqu::Printer<AppInfos> printer2(&removedAppIds);
BALL_LOG_INFO << d_cluster_p->description() << ": Updated queue: " << uri
<< ", addedAppIds: " << printer1
<< ", removedAppIds: " << printer2;
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,9 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// queue which have a proper valid unique queueId.
typedef bsl::unordered_map<int, QueueContext*> QueueContextByIdMap;

typedef AppIdInfos::const_iterator AppIdInfosCIter;
typedef AppInfos::const_iterator AppInfosCIter;

typedef mqbc::ClusterStateQueueInfo::AppIdInfos AppIdInfos;
typedef mqbc::ClusterStateQueueInfo::AppInfos AppInfos;

private:
// DATA
Expand Down Expand Up @@ -997,8 +997,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// dispatcher thread.
virtual void onQueueUpdated(const bmqt::Uri& uri,
const bsl::string& domain,
const AppIdInfos& addedAppIds,
const AppIdInfos& removedAppIds = AppIdInfos())
const AppInfos& addedAppIds,
const AppInfos& removedAppIds = AppInfos())
BSLS_KEYWORD_OVERRIDE;

private:
Expand Down
Loading
Loading