Skip to content

Commit

Permalink
Monolithic Virtual Storage
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
  • Loading branch information
dorjesinpo committed Jun 26, 2024
1 parent 1387dfa commit db1187f
Show file tree
Hide file tree
Showing 57 changed files with 4,032 additions and 4,240 deletions.
14 changes: 11 additions & 3 deletions src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <mqba_domainmanager.h>
#include <mqba_sessionnegotiator.h>
#include <mqbblp_clustercatalog.h>
#include <mqbblp_relayqueueengine.h>
#include <mqbcfg_brokerconfig.h>
#include <mqbcfg_messages.h>
#include <mqbcmd_commandlist.h>
Expand Down Expand Up @@ -149,6 +150,8 @@ Application::Application(bdlmt::EventScheduler* scheduler,
bdlf::PlaceHolders::_2), // allocator
k_BLOB_POOL_GROWTH_STRATEGY,
d_allocators.get("BlobSpPool"))
, d_pushElementsPool(sizeof(mqbblp::PushStream::Element),
d_allocators.get("PushElementsPool"))
, d_allocatorsStatContext_p(allocatorsStatContext)
, d_pluginManager_mp()
, d_statController_mp()
Expand Down Expand Up @@ -259,6 +262,13 @@ int Application::start(bsl::ostream& errorDescription)
}
}

mqbi::ClusterResources resources;

resources.d_blobSpPool_p = &d_blobSpPool;
resources.d_bufferFactory_p = &d_bufferFactory;
resources.d_scheduler_p = d_scheduler_p;
resources.d_pushElementsPool_p = &d_pushElementsPool;

// Start the StatController
d_statController_mp.load(
new (*d_allocator_p) mqbstat::StatController(
Expand Down Expand Up @@ -343,12 +353,10 @@ int Application::start(bsl::ostream& errorDescription)

// Start the ClusterCatalog
d_clusterCatalog_mp.load(new (*d_allocator_p) mqbblp::ClusterCatalog(
d_scheduler_p,
d_dispatcher_mp.get(),
d_transportManager_mp.get(),
statContextsMap,
&d_bufferFactory,
&d_blobSpPool,
resources,
d_allocators.get("ClusterCatalog")),
d_allocator_p);

Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqba/mqba_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class Application {

BlobSpPool d_blobSpPool;

bdlma::ConcurrentPool d_pushElementsPool;

mwcst::StatContext* d_allocatorsStatContext_p;
// Stat context of the counting allocators,
// if used
Expand Down
11 changes: 4 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2491,20 +2491,16 @@ Cluster::Cluster(const bslstl::StringRef& name,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
mqbi::DomainFactory* domainFactory,
bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_allocators(d_allocator_p)
, d_isStarted(false)
, d_isStopping(false)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
resources,
clusterConfig,
mqbcfg::ClusterProxyDefinition(allocator),
netCluster,
Expand Down Expand Up @@ -2539,7 +2535,8 @@ Cluster::Cluster(const bslstl::StringRef& name,
BSLS_ASSERT(d_allocator_p);
mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster();
BSLS_ASSERT(netCluster_p && "NetCluster not set !");
BSLS_ASSERT(scheduler->clockType() == bsls::SystemClockType::e_MONOTONIC);
BSLS_ASSERT(resources.d_scheduler_p->clockType() ==
bsls::SystemClockType::e_MONOTONIC);
BSLS_ASSERT_SAFE(d_clusterData.membership().selfNode() &&
"SelfNode not found in cluster!");

Expand Down
4 changes: 1 addition & 3 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,9 @@ class Cluster : public mqbi::Cluster,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
mqbi::DomainFactory* domainFactory,
bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
BlobSpPool* blobSpPool,
bdlbb::BlobBufferFactory* bufferFactory,
mqbnet::TransportManager* transportManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator);

/// Destructor
Expand Down
28 changes: 10 additions & 18 deletions src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,9 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
netCluster,
d_statContexts,
d_domainFactory_p,
d_scheduler_p,
d_dispatcher_p,
d_blobSpPool_p,
d_bufferFactory_p,
d_transportManager_p,
d_resources,
clusterAllocator);

info.d_cluster_sp.reset(cluster, clusterAllocator);
Expand Down Expand Up @@ -221,11 +219,9 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription,
clusterProxyDefinition,
netCluster,
d_statContexts,
d_scheduler_p,
d_bufferFactory_p,
d_blobSpPool_p,
d_dispatcher_p,
d_transportManager_p,
d_resources,
clusterAllocator);

info.d_cluster_sp.reset(cluster, clusterAllocator);
Expand Down Expand Up @@ -356,20 +352,15 @@ int ClusterCatalog::initiateReversedClusterConnectionsImp(
return rc;
}

ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
bslma::Allocator* allocator)
ClusterCatalog::ClusterCatalog(mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_allocators(d_allocator_p)
, d_isStarted(false)
, d_scheduler_p(scheduler)
, d_dispatcher_p(dispatcher)
, d_bufferFactory_p(bufferFactory)
, d_blobSpPool_p(blobSpPool)
, d_transportManager_p(transportManager)
, d_domainFactory_p(0)
, d_clustersDefinition(d_allocator_p)
Expand All @@ -379,9 +370,10 @@ ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler,
, d_reversedClusterConnections(d_allocator_p)
, d_clusters(d_allocator_p)
, d_statContexts(statContexts)
, d_resources(resources)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(scheduler->clockType() ==
BSLS_ASSERT_SAFE(d_resources.d_scheduler_p->clockType() ==
bsls::SystemClockType::e_MONOTONIC);
}

Expand All @@ -392,7 +384,7 @@ ClusterCatalog::~ClusterCatalog()
"stop() must be called before destroying this object");
}

int ClusterCatalog::loadBrokerClusterConfig(bsl::ostream& errorDescription)
int ClusterCatalog::loadBrokerClusterConfig(bsl::ostream&)
{
// executed by the *MAIN* thread

Expand Down
34 changes: 7 additions & 27 deletions src/groups/mqb/mqbblp/mqbblp_clustercatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,6 @@ class ClusterCatalog {
// FRIENDS
friend class ClusterCatalogIterator;

public:
// TYPES

/// Pool of shared pointers to Blobs
typedef bdlcc::SharedObjectPool<
bdlbb::Blob,
bdlcc::ObjectPoolFunctors::DefaultCreator,
bdlcc::ObjectPoolFunctors::RemoveAll<bdlbb::Blob> >
BlobSpPool;

public:
// TYPES

Expand Down Expand Up @@ -215,19 +205,9 @@ class ClusterCatalog {
bool d_isStarted;
// True if this component is started

bdlmt::EventScheduler* d_scheduler_p;
// EventScheduler to use

mqbi::Dispatcher* d_dispatcher_p;
// Dispatcher to use

bdlbb::BlobBufferFactory* d_bufferFactory_p;
// Blob buffer factory to use

BlobSpPool* d_blobSpPool_p;
// Pool of shared pointers to blob to
// use.

mqbnet::TransportManager* d_transportManager_p;
// TransportManager for creating
// mqbnet::Cluster
Expand Down Expand Up @@ -280,6 +260,8 @@ class ClusterCatalog {
StatContextsMap d_statContexts;
// Map of stat contexts

const mqbi::ClusterResources d_resources;

private:
// NOT IMPLEMENTED
ClusterCatalog(const ClusterCatalog&) BSLS_CPP11_DELETED;
Expand Down Expand Up @@ -334,13 +316,11 @@ class ClusterCatalog {
/// Create a new object using the specified `scheduler`, `dispatcher`,
/// `transportManager`, `bufferFactory`, `blobSpPool`, and the specified
/// `allocator`.
ClusterCatalog(bdlmt::EventScheduler* scheduler,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
bslma::Allocator* allocator);
ClusterCatalog(mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const StatContextsMap& statContexts,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator);

/// Destructor.
~ClusterCatalog();
Expand Down
10 changes: 3 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1007,19 +1007,15 @@ ClusterProxy::ClusterProxy(
const mqbcfg::ClusterProxyDefinition& clusterProxyConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
bdlmt::EventScheduler* scheduler,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_isStarted(false)
, d_isStopping(false)
, d_clusterData(name,
scheduler,
bufferFactory,
blobSpPool,
resources,
mqbcfg::ClusterDefinition(allocator),
clusterProxyConfig,
netCluster,
Expand Down Expand Up @@ -1047,7 +1043,7 @@ ClusterProxy::ClusterProxy(
// PRECONDITIONS
mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster();
BSLS_ASSERT_SAFE(netCluster_p && "NetCluster not set !");
BSLS_ASSERT_SAFE(scheduler->clockType() ==
BSLS_ASSERT_SAFE(resources.d_scheduler_p->clockType() ==
bsls::SystemClockType::e_MONOTONIC);

d_clusterData.clusterConfig().queueOperations() =
Expand Down
4 changes: 1 addition & 3 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,11 +429,9 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
const mqbcfg::ClusterProxyDefinition& clusterProxyConfig,
bslma::ManagedPtr<mqbnet::Cluster> netCluster,
const StatContextsMap& statContexts,
bdlmt::EventScheduler* scheduler,
bdlbb::BlobBufferFactory* bufferFactory,
BlobSpPool* blobSpPool,
mqbi::Dispatcher* dispatcher,
mqbnet::TransportManager* transportManager,
const mqbi::ClusterResources& resources,
bslma::Allocator* allocator);

/// Destructor
Expand Down
39 changes: 19 additions & 20 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2102,8 +2102,7 @@ bsl::shared_ptr<mqbi::Queue> ClusterQueueHelper::createQueueFactory(
context.d_queueContext_p->partitionId(),
context.d_domain_p,
d_storageManager_p,
d_clusterData_p->bufferFactory(),
d_clusterData_p->scheduler(),
d_clusterData_p->resources(),
d_clusterData_p->miscWorkThreadPool(),
openQueueResponse.routingConfiguration(),
d_allocator_p),
Expand Down Expand Up @@ -4374,15 +4373,8 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,

for (AppIdInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend();
++cit) {
if (d_clusterState_p->isSelfPrimary(partitionId) && queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
queue,
*cit),
queue);
}
else {
// Note: In non-CSL mode, the queue creation callback is instead
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue creation callback is
// invoked at replica nodes when they receive a queue creation
// record from the primary in the partition stream.
mqbi::Storage::AppIdKeyPair appIdKeyPair(cit->first, cit->second);
Expand All @@ -4396,27 +4388,34 @@ void ClusterQueueHelper::onQueueUpdated(const bmqt::Uri& uri,
.at(uri.qualifiedDomain())
->domain());
}
}

for (AppIdInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (d_clusterState_p->isSelfPrimary(partitionId) && queue) {
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
bdlf::BindUtil::bind(afterAppIdRegisteredDispatched,
queue,
*cit),
queue);
}
else {
// Note: In non-CSL mode, the queue deletion callback is instead
}

for (AppIdInfosCIter cit = removedAppIds.cbegin();
cit != removedAppIds.cend();
++cit) {
if (!d_clusterState_p->isSelfPrimary(partitionId) || queue == 0) {
// Note: In non-CSL mode, the queue deletion callback is
// invoked at replica nodes when they receive a queue deletion
// record from the primary in the partition stream.
d_storageManager_p->unregisterQueueReplica(partitionId,
uri,
qiter->second->key(),
cit->second);
}
if (queue) {
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(afterAppIdUnregisteredDispatched,
queue,
*cit),
queue);
}
}

mwcu::Printer<AppIdInfos> printer1(&addedAppIds);
Expand Down
7 changes: 2 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,13 @@ Queue::Queue(const bmqt::Uri& uri,
int partitionId,
mqbi::Domain* domain,
mqbi::StorageManager* storageManager,
bdlbb::BlobBufferFactory* blobBufferFactory,
bdlmt::EventScheduler* scheduler,
const mqbi::ClusterResources& resources,
bdlmt::FixedThreadPool* threadPool,
const bmqp_ctrlmsg::RoutingConfiguration& routingCfg,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_schemaLearner(allocator)
, d_state(this, uri, id, key, partitionId, domain, allocator)
, d_state(this, uri, id, key, partitionId, domain, resources, allocator)
, d_localQueue_mp(0)
, d_remoteQueue_mp(0)
{
Expand All @@ -488,8 +487,6 @@ Queue::Queue(const bmqt::Uri& uri,

d_state.setStorageManager(storageManager)
.setAppKeyGenerator(storageManager)
.setBlobBufferFactory(blobBufferFactory)
.setEventScheduler(scheduler)
.setMiscWorkThreadPool(threadPool)
.setRoutingConfig(routingCfg)
.setMessageThrottleConfig(messageThrottleConfig);
Expand Down
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbblp/mqbblp_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue {
int partitionId,
mqbi::Domain* domain,
mqbi::StorageManager* storageManager,
bdlbb::BlobBufferFactory* blobBufferFactory,
bdlmt::EventScheduler* scheduler,
const mqbi::ClusterResources& resources,
bdlmt::FixedThreadPool* threadPool,
const bmqp_ctrlmsg::RoutingConfiguration& routingCfg,
bslma::Allocator* allocator);
Expand Down
Loading

0 comments on commit db1187f

Please sign in to comment.