Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf[MQB]: make independent item pools for channels #479

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbmock/mqbmock_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ void Cluster::_initializeNetcluster()
d_netCluster_mp.load(new (*d_allocator_p)
mqbnet::MockCluster(d_clusterDefinition,
d_bufferFactory_p,
&d_itemPool,
d_allocator_p),
d_allocator_p);

Expand Down Expand Up @@ -221,7 +220,6 @@ Cluster::Cluster(bdlbb::BlobBufferFactory* bufferFactory,
, d_timeSource(&d_scheduler)
, d_isStarted(false)
, d_clusterDefinition(allocator)
, d_itemPool(mqbnet::Channel::k_ITEM_SIZE, allocator)
, d_channels(allocator)
, d_negotiator_mp()
, d_transportManager(&d_scheduler,
Expand Down
11 changes: 0 additions & 11 deletions src/groups/mqb/mqbmock/mqbmock_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ class Cluster : public mqbi::Cluster {
mqbcfg::ClusterDefinition d_clusterDefinition;
// Cluster definition

mqbnet::Channel::ItemPool d_itemPool;
// Item pool

TestChannelMap d_channels;
// Test channels

Expand Down Expand Up @@ -429,9 +426,6 @@ class Cluster : public mqbi::Cluster {
/// Get a modifiable reference to this object's time source.
bdlmt::EventSchedulerTestTimeSource& _timeSource();

/// Get a modifiable reference to this object's item pool.
mqbnet::Channel::ItemPool& _itemPool();

/// Get a modifiable reference to this object's cluster data.
mqbc::ClusterData* _clusterData();

Expand Down Expand Up @@ -585,11 +579,6 @@ inline bdlmt::EventSchedulerTestTimeSource& Cluster::_timeSource()
return d_timeSource;
}

inline mqbnet::Channel::ItemPool& Cluster::_itemPool()
{
return d_itemPool;
}

inline mqbc::ClusterData* Cluster::_clusterData()
{
return d_clusterData_mp.get();
Expand Down
27 changes: 14 additions & 13 deletions src/groups/mqb/mqbnet/mqbnet_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,18 @@ void Channel::Stats::reset()
// -------------

Channel::Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
ItemPool* itemPool,
const bsl::string& name,
bslma::Allocator* allocator)
: d_allocators(allocator)
, d_allocator_p(d_allocators.get(bsl::string("Channel-") + name))
, d_allocator_p(d_allocators.get("Channel"))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dorjesinpo, actually, there are 2 allocators used (Channel and ItemPool), and I will add more in the next PR.

, d_putBuilder(blobBufferFactory, d_allocator_p)
, d_pushBuilder(blobBufferFactory, d_allocator_p)
, d_ackBuilder(blobBufferFactory, d_allocator_p)
, d_confirmBuilder(blobBufferFactory, d_allocator_p)
, d_rejectBuilder(blobBufferFactory, d_allocator_p)
, d_itemPool_p(itemPool)
, d_itemPool(sizeof(Item),
bsls::BlockGrowth::BSLS_CONSTANT,
d_allocators.get(bsl::string("ItemPool")))
, d_buffer(1024, allocator)
, d_secondaryBuffer(1024, allocator)
, d_doStop(false)
Expand Down Expand Up @@ -108,7 +109,7 @@ Channel::~Channel()

void Channel::deleteItem(void* item, void* cookie)
{
static_cast<Channel*>(cookie)->d_itemPool_p->deleteObject(
static_cast<Channel*>(cookie)->d_itemPool.deleteObject(
static_cast<Item*>(item));
}

Expand All @@ -119,7 +120,7 @@ Channel::writePut(const bmqp::PutHeader& ph,
bool keepWeakPtr)
{
bslma::ManagedPtr<Item> item(
new (d_itemPool_p->allocate())
new (d_itemPool.allocate())
Item(ph, data, keepWeakPtr, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -136,7 +137,7 @@ Channel::writePush(const bsl::shared_ptr<bdlbb::Blob>& payload,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
msgId,
flags,
Expand All @@ -160,7 +161,7 @@ Channel::writePush(int queueId,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
msgId,
flags,
Expand All @@ -182,7 +183,7 @@ Channel::writeAck(int status,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(
new (d_itemPool_p->allocate())
new (d_itemPool.allocate())
Item(status, correlationId, guid, queueId, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -195,7 +196,7 @@ Channel::writeConfirm(int queueId,
const bmqt::MessageGUID& guid,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
subQueueId,
guid,
Expand All @@ -213,7 +214,7 @@ Channel::writeReject(int queueId,
const bmqt::MessageGUID& guid,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
subQueueId,
guid,
Expand All @@ -230,7 +231,7 @@ Channel::writeBlob(const bdlbb::Blob& data,
bmqp::EventType::Enum type,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(data, type, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -253,7 +254,7 @@ void Channel::resetChannel()
d_stateCondition.signal();
}
// Wake up the writing thread in case it is blocked by 'popFront'
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(d_allocator_p),
this,
deleteItem);
Expand Down Expand Up @@ -336,7 +337,7 @@ void Channel::flush()
return; // RETURN
}

bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(d_allocator_p),
this,
deleteItem);
Expand Down
12 changes: 3 additions & 9 deletions src/groups/mqb/mqbnet/mqbnet_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,6 @@ class Channel {

public:
// PUBLIC TYPES

typedef bdlma::ConcurrentPool ItemPool;
typedef bmqc::MonitoredQueue<
bdlcc::SingleConsumerQueue<bslma::ManagedPtr<Item> > >
ItemQueue;
Expand All @@ -366,18 +364,15 @@ class Channel {
e_HWM = 5 // HWM
};

public:
// CONSTANTS
static const int k_ITEM_SIZE = sizeof(Item); // for ItemPool

private:
// CONSTANTS
static const int k_NAGLE_PACKET_SIZE = 1024 * 1024; // 1MB;

// DATA
/// Allocator store to spawn new allocators for sub-components
bmqma::CountingAllocatorStore d_allocators;
// Counting allocator

/// Counting allocator
bslma::Allocator* d_allocator_p;

bmqp::PutEventBuilder d_putBuilder;
Expand All @@ -390,7 +385,7 @@ class Channel {

bmqp::RejectEventBuilder d_rejectBuilder;

ItemPool* d_itemPool_p;
bdlma::ConcurrentPool d_itemPool;
// Pool of 'Item' objects.

ItemQueue d_buffer;
Expand Down Expand Up @@ -513,7 +508,6 @@ class Channel {

/// Create a new object using the specified `allocator`.
Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
ItemPool* itemPool,
const bsl::string& name,
bslma::Allocator* allocator);

Expand Down
24 changes: 6 additions & 18 deletions src/groups/mqb/mqbnet/mqbnet_channel.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,7 @@ static void test1_write()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -793,9 +791,7 @@ static void test2_highWatermark()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -885,9 +881,7 @@ static void test3_highWatermarkInWriteCb()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -977,9 +971,7 @@ static void test4_controlBlob()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -1048,9 +1040,7 @@ static void test5_reconnect()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -1129,9 +1119,7 @@ static void test6_weakData()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down
12 changes: 5 additions & 7 deletions src/groups/mqb/mqbnet/mqbnet_clusterimp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ namespace mqbnet {
ClusterNodeImp::ClusterNodeImp(ClusterImp* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_cluster_p(cluster)
: d_allocators(allocator)
, d_cluster_p(cluster)
, d_config(config, allocator)
, d_description(allocator)
, d_channel(blobBufferFactory, itemPool, config.name(), allocator)
, d_channel(blobBufferFactory, config.name(), d_allocators.get(config.name()))
, d_identity(allocator)
, d_isReading(false)
{
Expand Down Expand Up @@ -181,10 +181,8 @@ ClusterImp::ClusterImp(const bsl::string& name,
const bsl::vector<mqbcfg::ClusterNode>& nodesConfig,
int selfNodeId,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_name(name, allocator)
: d_name(name, allocator)
, d_nodesConfig(nodesConfig, allocator)
, d_selfNodeId(selfNodeId)
, d_selfNode(0) // set below
Expand All @@ -199,7 +197,7 @@ ClusterImp::ClusterImp(const bsl::string& name,
bsl::vector<mqbcfg::ClusterNode>::const_iterator nodeIt;
for (nodeIt = d_nodesConfig.begin(); nodeIt != d_nodesConfig.end();
++nodeIt) {
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory, itemPool);
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory);
d_nodesList.emplace_back(&d_nodes.back());
if (nodeIt->id() == selfNodeId) {
d_selfNode = d_nodesList.back();
Expand Down
8 changes: 3 additions & 5 deletions src/groups/mqb/mqbnet/mqbnet_clusterimp.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class ClusterNodeImp : public ClusterNode {

private:
// DATA
/// Allocator store to spawn new allocators for sub-components
bmqma::CountingAllocatorStore d_allocators;

ClusterImp* d_cluster_p;
// Cluster this node belongs to

Expand Down Expand Up @@ -113,7 +116,6 @@ class ClusterNodeImp : public ClusterNode {
ClusterNodeImp(ClusterImp* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor.
Expand Down Expand Up @@ -196,9 +198,6 @@ class ClusterImp : public Cluster {

private:
// DATA
bslma::Allocator* d_allocator_p;
// Allocator to use

bsl::string d_name;
// Name of this Cluster

Expand Down Expand Up @@ -277,7 +276,6 @@ class ClusterImp : public Cluster {
const bsl::vector<mqbcfg::ClusterNode>& nodesConfig,
int selfNodeId,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor
Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbnet/mqbnet_dummysession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,14 @@ static void test1_BreathingTest()

mqbcfg::ClusterDefinition clusterConfig(s_allocator_p);
bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::MockCluster mockCluster(clusterConfig,
&bufferFactory,
&itemPool,
s_allocator_p);

mqbcfg::ClusterNode clusterNodeConfig(s_allocator_p);
mqbnet::MockClusterNode mockClusterNode(&mockCluster,
clusterNodeConfig,
&bufferFactory,
&itemPool,
s_allocator_p);

bsl::shared_ptr<bmqio::TestChannel> testChannel;
Expand Down
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbnet/mqbnet_mockcluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ namespace mqbnet {
MockClusterNode::MockClusterNode(MockCluster* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_cluster_p(cluster)
, d_config(config, allocator)
, d_description(allocator)
, d_channel(blobBufferFactory, itemPool, config.name(), allocator)
, d_channel(blobBufferFactory, config.name(), allocator)
, d_identity(allocator)
, d_isReading(false)
{
Expand Down Expand Up @@ -156,7 +155,6 @@ void MockCluster::notifyObserversOfNodeStateChange(ClusterNode* node,

MockCluster::MockCluster(const mqbcfg::ClusterDefinition& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_config(config, allocator)
Expand All @@ -171,7 +169,7 @@ MockCluster::MockCluster(const mqbcfg::ClusterDefinition& config,
bsl::vector<mqbcfg::ClusterNode>::const_iterator nodeIt;
for (nodeIt = d_config.nodes().begin(); nodeIt != d_config.nodes().end();
++nodeIt) {
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory, itemPool);
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory);
d_nodesList.emplace_back(&d_nodes.back());
}
}
Expand Down
Loading
Loading