Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance tune-up #181

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 46 additions & 75 deletions src/groups/bmq/bmqp/bmqp_eventutil.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ generateSubQueueInfos(bmqp::Protocol::SubQueueInfosArray* subQueueInfos,

subQueueInfos->clear();

static unsigned int nextSubId = 1;
for (int i = 0; i < numSubQueueInfos; ++i) {
subQueueInfos->push_back(bmqp::SubQueueInfo(nextSubId++));
subQueueInfos->push_back(bmqp::SubQueueInfo(i + 1));
}
}

Expand Down Expand Up @@ -159,7 +158,7 @@ static void appendDatum(bsl::vector<Data>* data,
BSLS_ASSERT_OPT(payloadLength >= 0);

Data datum(bufferFactory, allocator);
datum.d_qid = generateRandomInteger(1, 200);
datum.d_qid = data->size();
datum.d_flags = 0;
// Use the new SubQueueInfo option
datum.d_isSubQueueInfo = true;
Expand Down Expand Up @@ -420,10 +419,11 @@ static void test2_flattenExplodesEvent()
//
// Plan:
// 1) Create an event composed of one message having a payload of size
// one third the maximum enforced size and four SubQueueIds.
// a little over the quarter of the maximum enforced size and four
// SubQueueIds.
// 2) Flatten the event.
// 3) Verify that the flattening results in two event blobs, each having
// two messages with one SubQueueId each.
// 3) Verify that the flattening results in two event blobs, first having
// 15 messages and the second 1 with one SubQueueId each.
//
// Testing:
// Flattening an event having a message with more than one SubQueueId
Expand All @@ -444,13 +444,21 @@ static void test2_flattenExplodesEvent()
// 1) Event composed of one message having a payload of size one third the
// maximum enforced size and four SubQueueIds.
// Msg1
payloadLength = bmqp::EventHeader::k_MAX_SIZE_SOFT / 3;
payloadLength = bmqp::PushHeader::k_MAX_PAYLOAD_SIZE_SOFT / 2;
numSubQueueIds = 4;
appendDatum(&data,
numSubQueueIds,
payloadLength,
&bufferFactory,
s_allocator_p);

int count = 0;
int total = 0;

while (total < bmqp::EventHeader::k_MAX_SIZE_SOFT / 4) {
appendDatum(&data,
numSubQueueIds,
payloadLength,
&bufferFactory,
s_allocator_p);
total += data[count].d_payload.length();
++count;
}

// Create event
appendMessages(&pushEventBuilder, data);
Expand All @@ -468,12 +476,11 @@ static void test2_flattenExplodesEvent()
// 3) Verify that the flattening results in two event blobs, each having
// two messages with one SubQueueId each.
bmqp::PushMessageIterator msgIterator(&bufferFactory, s_allocator_p);
const Data& D = data[0];
int idx = 0;

// 1st flattened event
bmqp::Event flattenedEvent1(&(eventInfos[0].d_blob), s_allocator_p);
ASSERT_EQ(eventInfos[0].d_ids.size(), 2u);
ASSERT_EQ(eventInfos[0].d_ids.size(), size_t(count * numSubQueueIds - 1));

flattenedEvent1.loadPushMessageIterator(&msgIterator, true);
BSLS_ASSERT_OPT(msgIterator.isValid());
Expand All @@ -489,7 +496,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -506,14 +513,16 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);
ASSERT_EQ(data[0].d_subQueueInfos[0], subQueueInfos[0]);
}

// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message
const int id = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message

ASSERT(find(eventInfos[0], id, subcriptionId));
for (size_t i = 0; i < eventInfos[0].d_ids.size(); ++i) {
ASSERT_EQ(eventInfos[0].d_ids[i].d_subscriptionId, i % count + 1);
ASSERT_EQ(size_t(eventInfos[0].d_ids[i].d_header.queueId()),
i / count);
}

++idx;
Expand All @@ -529,7 +538,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -546,25 +555,19 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);

// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
ASSERT(find(eventInfos[0], qId, subcriptionId));
ASSERT_EQ(data[0].d_subQueueInfos[idx], subQueueInfos[0]);
}

++idx;
idx = count - 1; // the last one did not fit the first event

// 2nd flattened event
bmqp::Event flattenedEvent2(&(eventInfos[1].d_blob), s_allocator_p);
ASSERT_EQ(eventInfos[1].d_ids.size(), 2u);
ASSERT_EQ(eventInfos[1].d_ids.size(), 1u);

flattenedEvent2.loadPushMessageIterator(&msgIterator, true);
BSLS_ASSERT_OPT(msgIterator.isValid());

// Third message
// 1st message in tne second event
rc = msgIterator.next();
BSLS_ASSERT_OPT(rc == 1);
BSLS_ASSERT_OPT(msgIterator.hasOptions());
Expand All @@ -575,7 +578,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[idx].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -592,54 +595,22 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx].id(), subQueueInfos[0].id());
ASSERT_EQ(data[idx].d_subQueueInfos[idx].id(), subQueueInfos[0].id());

// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
const int qId = data[idx].d_qid;
const unsigned int subcriptionId = data[idx].d_subQueueInfos[idx].id();
ASSERT(find(eventInfos[1], qId, subcriptionId));
}

++idx;

// Fourth message
rc = msgIterator.next();
BSLS_ASSERT_OPT(rc == 1);
BSLS_ASSERT_OPT(msgIterator.hasOptions());

// Verify payload
{
bdlbb::Blob payload(&bufferFactory, s_allocator_p);
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
}

// Verify SubQueueInfos
{
bmqp::OptionsView optionsView(s_allocator_p);
rc = msgIterator.loadOptionsView(&optionsView);
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(optionsView.isValid());
BSLS_ASSERT_OPT(
optionsView.find(bmqp::OptionType::e_SUB_QUEUE_INFOS) !=
optionsView.end());
bmqp::Protocol::SubQueueInfosArray subQueueInfos(s_allocator_p);
rc = optionsView.loadSubQueueInfosOption(&subQueueInfos);
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);
// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message

// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
ASSERT_EQ(eventInfos[1].d_ids[0].d_subscriptionId,
data[count - 1].d_subQueueInfos.back().id());
ASSERT_EQ(eventInfos[1].d_ids[0].d_header.queueId(), count - 1);

ASSERT(find(eventInfos[1], qId, subcriptionId));
}
// No more messages
rc = msgIterator.next();
}

static void test3_flattenWithMessageProperties()
Expand Down
3 changes: 0 additions & 3 deletions src/groups/bmq/bmqp/bmqp_optionutil.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,6 @@ static void test2_basicOptionsBoxCanAdd()
const int maxPayload = k_MAX_SIZE - headerSize;
const LimitT limit = maxCanBeAdded(contentSize, maxPayload);
ASSERT_EQ(Result::e_OPTION_TOO_BIG, limit.second);
const int sizeLeft = k_MAX_SIZE_SOFT - contentSize;
const int expected = sizeLeft / k_MAX_SIZE;
ASSERT_EQ(expected, limit.first);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ const int MessagePropertyHeader::k_PROP_NAME_LEN_MASK = bdlb::BitMaskUtil::one(

const int PutHeader::k_MAX_OPTIONS_SIZE;
const int PutHeader::k_MAX_PAYLOAD_SIZE_SOFT;
const int PutHeader::k_MAX_SIZE_SOFT;
// Force variable/symbol definition so that it can be used in other files

const int PutHeader::k_FLAGS_MASK = bdlb::BitMaskUtil::one(
Expand Down
3 changes: 2 additions & 1 deletion src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ struct EventHeader {
/// having the maximum allowable value is processed through the entire
/// BlazingMQ pipeline w/o any issues. Also see notes in
/// `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT` constant.
static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024;
static const int k_MAX_SIZE_SOFT = 512 * 1024 * 1024;

/// Highest possible value for the type of an event.
static const int k_MAX_TYPE = (1 << k_TYPE_NUM_BITS) - 1;
Expand Down Expand Up @@ -1483,6 +1483,7 @@ struct PutHeader {
/// be increased but not up to `k_MAX_SIZE`.
static const int k_MAX_PAYLOAD_SIZE_SOFT = 64 * 1024 * 1024;

static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024;
/// Maximum size (bytes) of the options area.
static const int k_MAX_OPTIONS_SIZE = ((1 << k_OPTIONS_WORDS_NUM_BITS) -
1) *
Expand Down
17 changes: 15 additions & 2 deletions src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1077,15 +1077,28 @@ static void test8_buildEventTooBig()
bdlbb::Blob validPayload2(&bufferFactory, s_allocator_p);
bdlbb::BlobUtil::append(&validPayload2, s.c_str(), validLen);

rc = peb.packMessage(validPayload2,
int count = 1;
while ((evtSize + sizeof(bmqp::PushHeader) + validLen) <
bmqp::EventHeader::k_MAX_SIZE_SOFT) {
rc = peb.packMessage(validPayload2,
queueId,
guid,
flags,
bmqt::CompressionAlgorithmType::e_NONE);
ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS);
evtSize += sizeof(bmqp::PushHeader) + validLen;
++count;
}
evtSize = peb.eventSize(); // not calculating padding
rc = peb.packMessage(validPayload2,
queueId,
guid,
flags,
bmqt::CompressionAlgorithmType::e_NONE);

ASSERT_EQ(rc, bmqt::EventBuilderResult::e_EVENT_TOO_BIG);
ASSERT_EQ(evtSize, peb.eventSize());
ASSERT_EQ(1, peb.messageCount());
ASSERT_EQ(count, peb.messageCount());
}

static void testN1_decodeFromFile()
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ PutEventBuilder::packMessageInternal(const bdlbb::Blob& appData, int queueId)
numPaddingBytes;

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(sizeNoOptions >
EventHeader::k_MAX_SIZE_SOFT)) {
PutHeader::k_MAX_SIZE_SOFT)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
return Result::e_EVENT_TOO_BIG; // RETURN
}
Expand Down
5 changes: 4 additions & 1 deletion src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ Application::Application(bdlmt::EventScheduler* scheduler,
1,
bsls::TimeInterval(120).totalMilliseconds(),
allocator)
, d_bufferFactory(k_BLOBBUFFER_SIZE, d_allocators.get("BufferFactory"))
, d_bufferFactory(k_BLOBBUFFER_SIZE,
bsls::BlockGrowth::BSLS_CONSTANT,
d_allocators.get("BufferFactory"))

, d_blobSpPool(bdlf::BindUtil::bind(&createBlob,
&d_bufferFactory,
bdlf::PlaceHolders::_1, // arena
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqba/mqba_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ Dispatcher::ProcessorPool::Queue* Dispatcher::queueCreator(
bsl::string queueName(os.str().data(), os.str().length());

ProcessorPool::Queue* queue = new (*allocator)
ProcessorPool::Queue(config.queueSizeLowWatermark(), allocator);
ProcessorPool::Queue(config.queueSize(), allocator);

queue->setWatermarks(config.queueSizeLowWatermark(),
config.queueSizeHighWatermark());
Expand Down
Loading
Loading