Skip to content

Commit

Permalink
Perf[MQB]: make independent item pools for channels
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <emalygin@bloomberg.net>
  • Loading branch information
678098 committed Oct 26, 2024
1 parent bb7d697 commit 5dbe10b
Show file tree
Hide file tree
Showing 13 changed files with 24 additions and 74 deletions.
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbmock/mqbmock_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ void Cluster::_initializeNetcluster()
d_netCluster_mp.load(new (*d_allocator_p)
mqbnet::MockCluster(d_clusterDefinition,
d_bufferFactory_p,
&d_itemPool,
d_allocator_p),
d_allocator_p);

Expand Down Expand Up @@ -221,7 +220,6 @@ Cluster::Cluster(bdlbb::BlobBufferFactory* bufferFactory,
, d_timeSource(&d_scheduler)
, d_isStarted(false)
, d_clusterDefinition(allocator)
, d_itemPool(mqbnet::Channel::k_ITEM_SIZE, allocator)
, d_channels(allocator)
, d_negotiator_mp()
, d_transportManager(&d_scheduler,
Expand Down
11 changes: 0 additions & 11 deletions src/groups/mqb/mqbmock/mqbmock_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,6 @@ class Cluster : public mqbi::Cluster {
mqbcfg::ClusterDefinition d_clusterDefinition;
// Cluster definition

mqbnet::Channel::ItemPool d_itemPool;
// Item pool

TestChannelMap d_channels;
// Test channels

Expand Down Expand Up @@ -429,9 +426,6 @@ class Cluster : public mqbi::Cluster {
/// Get a modifiable reference to this object's time source.
bdlmt::EventSchedulerTestTimeSource& _timeSource();

/// Get a modifiable reference to this object's item pool.
mqbnet::Channel::ItemPool& _itemPool();

/// Get a modifiable reference to this object's cluster data.
mqbc::ClusterData* _clusterData();

Expand Down Expand Up @@ -585,11 +579,6 @@ inline bdlmt::EventSchedulerTestTimeSource& Cluster::_timeSource()
return d_timeSource;
}

inline mqbnet::Channel::ItemPool& Cluster::_itemPool()
{
return d_itemPool;
}

inline mqbc::ClusterData* Cluster::_clusterData()
{
return d_clusterData_mp.get();
Expand Down
25 changes: 13 additions & 12 deletions src/groups/mqb/mqbnet/mqbnet_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ void Channel::Stats::reset()
// -------------

Channel::Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
ItemPool* itemPool,
const bsl::string& name,
bslma::Allocator* allocator)
: d_allocators(allocator)
Expand All @@ -73,7 +72,9 @@ Channel::Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
, d_ackBuilder(blobBufferFactory, d_allocator_p)
, d_confirmBuilder(blobBufferFactory, d_allocator_p)
, d_rejectBuilder(blobBufferFactory, d_allocator_p)
, d_itemPool_p(itemPool)
, d_itemPool(sizeof(Item),
bsls::BlockGrowth::BSLS_CONSTANT,
d_allocators.get(bsl::string("ChannelPool-") + name))
, d_buffer(1024, allocator)
, d_secondaryBuffer(1024, allocator)
, d_doStop(false)
Expand Down Expand Up @@ -108,7 +109,7 @@ Channel::~Channel()

void Channel::deleteItem(void* item, void* cookie)
{
static_cast<Channel*>(cookie)->d_itemPool_p->deleteObject(
static_cast<Channel*>(cookie)->d_itemPool.deleteObject(
static_cast<Item*>(item));
}

Expand All @@ -119,7 +120,7 @@ Channel::writePut(const bmqp::PutHeader& ph,
bool keepWeakPtr)
{
bslma::ManagedPtr<Item> item(
new (d_itemPool_p->allocate())
new (d_itemPool.allocate())
Item(ph, data, keepWeakPtr, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -136,7 +137,7 @@ Channel::writePush(const bsl::shared_ptr<bdlbb::Blob>& payload,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
msgId,
flags,
Expand All @@ -160,7 +161,7 @@ Channel::writePush(int queueId,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
msgId,
flags,
Expand All @@ -182,7 +183,7 @@ Channel::writeAck(int status,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(
new (d_itemPool_p->allocate())
new (d_itemPool.allocate())
Item(status, correlationId, guid, queueId, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -195,7 +196,7 @@ Channel::writeConfirm(int queueId,
const bmqt::MessageGUID& guid,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
subQueueId,
guid,
Expand All @@ -213,7 +214,7 @@ Channel::writeReject(int queueId,
const bmqt::MessageGUID& guid,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(queueId,
subQueueId,
guid,
Expand All @@ -230,7 +231,7 @@ Channel::writeBlob(const bdlbb::Blob& data,
bmqp::EventType::Enum type,
const bsl::shared_ptr<bmqu::AtomicState>& state)
{
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(data, type, state, d_allocator_p),
this,
deleteItem);
Expand All @@ -253,7 +254,7 @@ void Channel::resetChannel()
d_stateCondition.signal();
}
// Wake up the writing thread in case it is blocked by 'popFront'
bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(d_allocator_p),
this,
deleteItem);
Expand Down Expand Up @@ -336,7 +337,7 @@ void Channel::flush()
return; // RETURN
}

bslma::ManagedPtr<Item> item(new (d_itemPool_p->allocate())
bslma::ManagedPtr<Item> item(new (d_itemPool.allocate())
Item(d_allocator_p),
this,
deleteItem);
Expand Down
9 changes: 1 addition & 8 deletions src/groups/mqb/mqbnet/mqbnet_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,6 @@ class Channel {

public:
// PUBLIC TYPES

typedef bdlma::ConcurrentPool ItemPool;
typedef bmqc::MonitoredQueue<
bdlcc::SingleConsumerQueue<bslma::ManagedPtr<Item> > >
ItemQueue;
Expand All @@ -366,10 +364,6 @@ class Channel {
e_HWM = 5 // HWM
};

public:
// CONSTANTS
static const int k_ITEM_SIZE = sizeof(Item); // for ItemPool

private:
// CONSTANTS
static const int k_NAGLE_PACKET_SIZE = 1024 * 1024; // 1MB;
Expand All @@ -390,7 +384,7 @@ class Channel {

bmqp::RejectEventBuilder d_rejectBuilder;

ItemPool* d_itemPool_p;
bdlma::ConcurrentPool d_itemPool;
// Pool of 'Item' objects.

ItemQueue d_buffer;
Expand Down Expand Up @@ -513,7 +507,6 @@ class Channel {

/// Create a new object using the specified `allocator`.
Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
ItemPool* itemPool,
const bsl::string& name,
bslma::Allocator* allocator);

Expand Down
24 changes: 6 additions & 18 deletions src/groups/mqb/mqbnet/mqbnet_channel.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,7 @@ static void test1_write()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -793,9 +791,7 @@ static void test2_highWatermark()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -885,9 +881,7 @@ static void test3_highWatermarkInWriteCb()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -977,9 +971,7 @@ static void test4_controlBlob()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -1048,9 +1040,7 @@ static void test5_reconnect()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down Expand Up @@ -1129,9 +1119,7 @@ static void test6_weakData()
// ------------------------------------------------------------------------
{
bdlbb::PooledBlobBufferFactory bufferFactory(k_BUFFER_SIZE, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::Channel channel(&bufferFactory, &itemPool, "test", s_allocator_p);
mqbnet::Channel channel(&bufferFactory, "test", s_allocator_p);

bsl::shared_ptr<bmqio::TestChannelEx> testChannel(
new (*s_allocator_p)
Expand Down
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbnet/mqbnet_clusterimp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ namespace mqbnet {
ClusterNodeImp::ClusterNodeImp(ClusterImp* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_cluster_p(cluster)
, d_config(config, allocator)
, d_description(allocator)
, d_channel(blobBufferFactory, itemPool, config.name(), allocator)
, d_channel(blobBufferFactory, config.name(), allocator)
, d_identity(allocator)
, d_isReading(false)
{
Expand Down Expand Up @@ -181,7 +180,6 @@ ClusterImp::ClusterImp(const bsl::string& name,
const bsl::vector<mqbcfg::ClusterNode>& nodesConfig,
int selfNodeId,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_name(name, allocator)
Expand All @@ -199,7 +197,7 @@ ClusterImp::ClusterImp(const bsl::string& name,
bsl::vector<mqbcfg::ClusterNode>::const_iterator nodeIt;
for (nodeIt = d_nodesConfig.begin(); nodeIt != d_nodesConfig.end();
++nodeIt) {
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory, itemPool);
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory);
d_nodesList.emplace_back(&d_nodes.back());
if (nodeIt->id() == selfNodeId) {
d_selfNode = d_nodesList.back();
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbnet/mqbnet_clusterimp.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ class ClusterNodeImp : public ClusterNode {
ClusterNodeImp(ClusterImp* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor.
Expand Down Expand Up @@ -277,7 +276,6 @@ class ClusterImp : public Cluster {
const bsl::vector<mqbcfg::ClusterNode>& nodesConfig,
int selfNodeId,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor
Expand Down
4 changes: 0 additions & 4 deletions src/groups/mqb/mqbnet/mqbnet_dummysession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,14 @@ static void test1_BreathingTest()

mqbcfg::ClusterDefinition clusterConfig(s_allocator_p);
bdlbb::PooledBlobBufferFactory bufferFactory(1024, s_allocator_p);
mqbnet::Channel::ItemPool itemPool(mqbnet::Channel::k_ITEM_SIZE,
s_allocator_p);
mqbnet::MockCluster mockCluster(clusterConfig,
&bufferFactory,
&itemPool,
s_allocator_p);

mqbcfg::ClusterNode clusterNodeConfig(s_allocator_p);
mqbnet::MockClusterNode mockClusterNode(&mockCluster,
clusterNodeConfig,
&bufferFactory,
&itemPool,
s_allocator_p);

bsl::shared_ptr<bmqio::TestChannel> testChannel;
Expand Down
6 changes: 2 additions & 4 deletions src/groups/mqb/mqbnet/mqbnet_mockcluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ namespace mqbnet {
MockClusterNode::MockClusterNode(MockCluster* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_cluster_p(cluster)
, d_config(config, allocator)
, d_description(allocator)
, d_channel(blobBufferFactory, itemPool, config.name(), allocator)
, d_channel(blobBufferFactory, config.name(), allocator)
, d_identity(allocator)
, d_isReading(false)
{
Expand Down Expand Up @@ -156,7 +155,6 @@ void MockCluster::notifyObserversOfNodeStateChange(ClusterNode* node,

MockCluster::MockCluster(const mqbcfg::ClusterDefinition& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_config(config, allocator)
Expand All @@ -171,7 +169,7 @@ MockCluster::MockCluster(const mqbcfg::ClusterDefinition& config,
bsl::vector<mqbcfg::ClusterNode>::const_iterator nodeIt;
for (nodeIt = d_config.nodes().begin(); nodeIt != d_config.nodes().end();
++nodeIt) {
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory, itemPool);
d_nodes.emplace_back(this, *nodeIt, blobBufferFactory);
d_nodesList.emplace_back(&d_nodes.back());
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbnet/mqbnet_mockcluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ class MockClusterNode : public ClusterNode {
MockClusterNode(MockCluster* cluster,
const mqbcfg::ClusterNode& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor.
Expand Down Expand Up @@ -251,7 +250,6 @@ class MockCluster : public Cluster {
/// `allocator`.
MockCluster(const mqbcfg::ClusterDefinition& config,
bdlbb::BlobBufferFactory* blobBufferFactory,
Channel::ItemPool* itemPool,
bslma::Allocator* allocator);

/// Destructor
Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbnet/mqbnet_transportmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ TransportManager::TransportManager(bdlmt::EventScheduler* scheduler,
, d_state(e_STOPPED)
, d_scheduler_p(scheduler)
, d_blobBufferFactory_p(blobBufferFactory)
, d_itemPool(Channel::k_ITEM_SIZE, bsls::BlockGrowth::BSLS_CONSTANT, allocator)
, d_negotiator_mp(negotiator)
, d_statController_p(statController)
, d_tcpSessionFactory_mp(0)
Expand Down Expand Up @@ -525,7 +524,6 @@ int TransportManager::createCluster(
nodes,
myNodeId,
d_blobBufferFactory_p,
&d_itemPool,
d_allocator_p),
d_allocator_p);

Expand Down
2 changes: 0 additions & 2 deletions src/groups/mqb/mqbnet/mqbnet_transportmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ class TransportManager {
// BlobBufferFactory to use by the
// sessions

Channel::ItemPool d_itemPool;

bslma::ManagedPtr<Negotiator> d_negotiator_mp;
// Negotiation to use

Expand Down
Loading

0 comments on commit 5dbe10b

Please sign in to comment.