Skip to content

Commit

Permalink
Feat[BMQ, MQB, MWC]: performance tune-up [1] (#375)
Browse files Browse the repository at this point in the history
* performance tune-up #1

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>

* Reverting the change causing IT failure

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>

* Addressing review

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>

* cleaning

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>

---------

Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
  • Loading branch information
dorjesinpo authored Aug 8, 2024
1 parent fc69873 commit 7d4b3d7
Show file tree
Hide file tree
Showing 23 changed files with 424 additions and 292 deletions.
122 changes: 47 additions & 75 deletions src/groups/bmq/bmqp/bmqp_eventutil.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ generateSubQueueInfos(bmqp::Protocol::SubQueueInfosArray* subQueueInfos,

subQueueInfos->clear();

static unsigned int nextSubId = 1;
for (int i = 0; i < numSubQueueInfos; ++i) {
subQueueInfos->push_back(bmqp::SubQueueInfo(nextSubId++));
subQueueInfos->push_back(bmqp::SubQueueInfo(i + 1));
}
}

Expand Down Expand Up @@ -159,7 +158,7 @@ static void appendDatum(bsl::vector<Data>* data,
BSLS_ASSERT_OPT(payloadLength >= 0);

Data datum(bufferFactory, allocator);
datum.d_qid = generateRandomInteger(1, 200);
datum.d_qid = data->size();
datum.d_flags = 0;
// Use the new SubQueueInfo option
datum.d_isSubQueueInfo = true;
Expand Down Expand Up @@ -420,10 +419,11 @@ static void test2_flattenExplodesEvent()
//
// Plan:
// 1) Create an event composed of one message having a payload of size
// one third the maximum enforced size and four SubQueueIds.
// a little over the quarter of the maximum enforced size and four
// SubQueueIds.
// 2) Flatten the event.
// 3) Verify that the flattening results in two event blobs, each having
// two messages with one SubQueueId each.
// 3) Verify that the flattening results in two event blobs, first having
// 15 messages and the second 1 with one SubQueueId each.
//
// Testing:
// Flattening an event having a message with more than one SubQueueId
Expand All @@ -444,13 +444,21 @@ static void test2_flattenExplodesEvent()
// 1) Event composed of one message having a payload of size one third the
// maximum enforced size and four SubQueueIds.
// Msg1
payloadLength = bmqp::EventHeader::k_MAX_SIZE_SOFT / 3;
payloadLength = bmqp::PushHeader::k_MAX_PAYLOAD_SIZE_SOFT / 2;
numSubQueueIds = 4;
appendDatum(&data,
numSubQueueIds,
payloadLength,
&bufferFactory,
s_allocator_p);

int count = 0;
int total = 0;

while (total < bmqp::EventHeader::k_MAX_SIZE_SOFT / 4) {
appendDatum(&data,
numSubQueueIds,
payloadLength,
&bufferFactory,
s_allocator_p);
total += data[count].d_payload.length();
++count;
}

// Create event
appendMessages(&pushEventBuilder, data);
Expand All @@ -468,12 +476,12 @@ static void test2_flattenExplodesEvent()
// 3) Verify that the flattening results in two event blobs, each having
// two messages with one SubQueueId each.
bmqp::PushMessageIterator msgIterator(&bufferFactory, s_allocator_p);
const Data& D = data[0];
int idx = 0;

// 1st flattened event
bmqp::Event flattenedEvent1(&(eventInfos[0].d_blob), s_allocator_p);
ASSERT_EQ(eventInfos[0].d_ids.size(), 2u);
ASSERT_EQ(eventInfos[0].d_ids.size(),
static_cast<size_t>(count * numSubQueueIds - 1));

flattenedEvent1.loadPushMessageIterator(&msgIterator, true);
BSLS_ASSERT_OPT(msgIterator.isValid());
Expand All @@ -489,7 +497,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -506,14 +514,16 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);
ASSERT_EQ(data[0].d_subQueueInfos[0], subQueueInfos[0]);
}

// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message
const int id = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message

ASSERT(find(eventInfos[0], id, subcriptionId));
for (size_t i = 0; i < eventInfos[0].d_ids.size(); ++i) {
ASSERT_EQ(eventInfos[0].d_ids[i].d_subscriptionId, i % count + 1);
ASSERT_EQ(size_t(eventInfos[0].d_ids[i].d_header.queueId()),
i / count);
}

++idx;
Expand All @@ -529,7 +539,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -546,25 +556,19 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);

// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
ASSERT(find(eventInfos[0], qId, subcriptionId));
ASSERT_EQ(data[0].d_subQueueInfos[idx], subQueueInfos[0]);
}

++idx;
idx = count - 1; // the last one did not fit the first event

// 2nd flattened event
bmqp::Event flattenedEvent2(&(eventInfos[1].d_blob), s_allocator_p);
ASSERT_EQ(eventInfos[1].d_ids.size(), 2u);
ASSERT_EQ(eventInfos[1].d_ids.size(), 1u);

flattenedEvent2.loadPushMessageIterator(&msgIterator, true);
BSLS_ASSERT_OPT(msgIterator.isValid());

// Third message
// 1st message in tne second event
rc = msgIterator.next();
BSLS_ASSERT_OPT(rc == 1);
BSLS_ASSERT_OPT(msgIterator.hasOptions());
Expand All @@ -575,7 +579,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[idx].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -592,54 +596,22 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx].id(), subQueueInfos[0].id());
ASSERT_EQ(data[idx].d_subQueueInfos[idx].id(), subQueueInfos[0].id());

// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
const int qId = data[idx].d_qid;
const unsigned int subcriptionId = data[idx].d_subQueueInfos[idx].id();
ASSERT(find(eventInfos[1], qId, subcriptionId));
}

++idx;

// Fourth message
rc = msgIterator.next();
BSLS_ASSERT_OPT(rc == 1);
BSLS_ASSERT_OPT(msgIterator.hasOptions());

// Verify payload
{
bdlbb::Blob payload(&bufferFactory, s_allocator_p);
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
}

// Verify SubQueueInfos
{
bmqp::OptionsView optionsView(s_allocator_p);
rc = msgIterator.loadOptionsView(&optionsView);
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(optionsView.isValid());
BSLS_ASSERT_OPT(
optionsView.find(bmqp::OptionType::e_SUB_QUEUE_INFOS) !=
optionsView.end());
bmqp::Protocol::SubQueueInfosArray subQueueInfos(s_allocator_p);
rc = optionsView.loadSubQueueInfosOption(&subQueueInfos);
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);
// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message

// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
ASSERT_EQ(eventInfos[1].d_ids[0].d_subscriptionId,
data[count - 1].d_subQueueInfos.back().id());
ASSERT_EQ(eventInfos[1].d_ids[0].d_header.queueId(), count - 1);

ASSERT(find(eventInfos[1], qId, subcriptionId));
}
// No more messages
rc = msgIterator.next();
}

static void test3_flattenWithMessageProperties()
Expand Down
3 changes: 0 additions & 3 deletions src/groups/bmq/bmqp/bmqp_optionutil.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,6 @@ static void test2_basicOptionsBoxCanAdd()
const int maxPayload = k_MAX_SIZE - headerSize;
const LimitT limit = maxCanBeAdded(contentSize, maxPayload);
ASSERT_EQ(Result::e_OPTION_TOO_BIG, limit.second);
const int sizeLeft = k_MAX_SIZE_SOFT - contentSize;
const int expected = sizeLeft / k_MAX_SIZE;
ASSERT_EQ(expected, limit.first);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ const int MessagePropertyHeader::k_PROP_NAME_LEN_MASK = bdlb::BitMaskUtil::one(

const int PutHeader::k_MAX_OPTIONS_SIZE;
const int PutHeader::k_MAX_PAYLOAD_SIZE_SOFT;
const int PutHeader::k_MAX_SIZE_SOFT;
// Force variable/symbol definition so that it can be used in other files

const int PutHeader::k_FLAGS_MASK = bdlb::BitMaskUtil::one(
Expand Down
11 changes: 7 additions & 4 deletions src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -823,11 +823,13 @@ struct EventHeader {
/// an outgoing storage message can exceed
/// `PutHeader::k_MAX_PAYLOAD_SIZE_SOFT`. So, we assign a value of 65MB
/// to `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT`, and assign a value of
/// 66MB to `'EventHeader::k_MAX_SIZE_SOFT` such that a PUT message
/// having the maximum allowable value is processed through the entire
/// BlazingMQ pipeline w/o any issues. Also see notes in
/// at least 66MB to `EventHeader::k_MAX_SIZE_SOFT` such that a PUT
/// message having the maximum allowable value is processed through the
/// BlazingMQ pipeline w/o any issues. The value of
/// `EventHeader::k_MAX_SIZE_SOFT` is 512Mb to improve batching at high
/// posting rates. Also see notes for the
/// `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT` constant.
static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024;
static const int k_MAX_SIZE_SOFT = 512 * 1024 * 1024;

/// Highest possible value for the type of an event.
static const int k_MAX_TYPE = (1 << k_TYPE_NUM_BITS) - 1;
Expand Down Expand Up @@ -1492,6 +1494,7 @@ struct PutHeader {
/// be increased but not up to `k_MAX_SIZE`.
static const int k_MAX_PAYLOAD_SIZE_SOFT = 64 * 1024 * 1024;

static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024;
/// Maximum size (bytes) of the options area.
static const int k_MAX_OPTIONS_SIZE = ((1 << k_OPTIONS_WORDS_NUM_BITS) -
1) *
Expand Down
17 changes: 15 additions & 2 deletions src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1077,15 +1077,28 @@ static void test8_buildEventTooBig()
bdlbb::Blob validPayload2(&bufferFactory, s_allocator_p);
bdlbb::BlobUtil::append(&validPayload2, s.c_str(), validLen);

rc = peb.packMessage(validPayload2,
int count = 1;
while ((evtSize + sizeof(bmqp::PushHeader) + validLen) <
bmqp::EventHeader::k_MAX_SIZE_SOFT) {
rc = peb.packMessage(validPayload2,
queueId,
guid,
flags,
bmqt::CompressionAlgorithmType::e_NONE);
ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS);
evtSize += sizeof(bmqp::PushHeader) + validLen;
++count;
}
evtSize = peb.eventSize(); // not calculating padding
rc = peb.packMessage(validPayload2,
queueId,
guid,
flags,
bmqt::CompressionAlgorithmType::e_NONE);

ASSERT_EQ(rc, bmqt::EventBuilderResult::e_EVENT_TOO_BIG);
ASSERT_EQ(evtSize, peb.eventSize());
ASSERT_EQ(1, peb.messageCount());
ASSERT_EQ(count, peb.messageCount());
}

static void testN1_decodeFromFile()
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ PutEventBuilder::packMessageInternal(const bdlbb::Blob& appData, int queueId)
numPaddingBytes;

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(sizeNoOptions >
EventHeader::k_MAX_SIZE_SOFT)) {
PutHeader::k_MAX_SIZE_SOFT)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
return Result::e_EVENT_TOO_BIG; // RETURN
}
Expand Down
5 changes: 4 additions & 1 deletion src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ Application::Application(bdlmt::EventScheduler* scheduler,
1,
bsls::TimeInterval(120).totalMilliseconds(),
allocator)
, d_bufferFactory(k_BLOBBUFFER_SIZE, d_allocators.get("BufferFactory"))
, d_bufferFactory(k_BLOBBUFFER_SIZE,
bsls::BlockGrowth::BSLS_CONSTANT,
d_allocators.get("BufferFactory"))

, d_blobSpPool(bdlf::BindUtil::bind(&createBlob,
&d_bufferFactory,
bdlf::PlaceHolders::_1, // arena
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqba/mqba_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ int Dispatcher::startContext(bsl::ostream& errorDescription,
// We should have subcontext per each type of event (PUSH, PUT,
// CALLBACK, ACK, ...)

processorPoolConfig.setGrowBy(64 * 1024);

context->d_processorPool_mp.load(
new (*d_allocator_p) ProcessorPool(processorPoolConfig, d_allocator_p),
d_allocator_p);
Expand Down
Loading

0 comments on commit 7d4b3d7

Please sign in to comment.