diff --git a/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp b/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp index eb8b1e2d1..fa1978774 100644 --- a/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_eventutil.t.cpp @@ -118,9 +118,8 @@ generateSubQueueInfos(bmqp::Protocol::SubQueueInfosArray* subQueueInfos, subQueueInfos->clear(); - static unsigned int nextSubId = 1; for (int i = 0; i < numSubQueueInfos; ++i) { - subQueueInfos->push_back(bmqp::SubQueueInfo(nextSubId++)); + subQueueInfos->push_back(bmqp::SubQueueInfo(i + 1)); } } @@ -159,7 +158,7 @@ static void appendDatum(bsl::vector* data, BSLS_ASSERT_OPT(payloadLength >= 0); Data datum(bufferFactory, allocator); - datum.d_qid = generateRandomInteger(1, 200); + datum.d_qid = data->size(); datum.d_flags = 0; // Use the new SubQueueInfo option datum.d_isSubQueueInfo = true; @@ -420,10 +419,11 @@ static void test2_flattenExplodesEvent() // // Plan: // 1) Create an event composed of one message having a payload of size -// one third the maximum enforced size and four SubQueueIds. +// a little over the quarter of the maximum enforced size and four +// SubQueueIds. // 2) Flatten the event. -// 3) Verify that the flattening results in two event blobs, each having -// two messages with one SubQueueId each. +// 3) Verify that the flattening results in two event blobs, first having +// 15 messages and the second 1 with one SubQueueId each. // // Testing: // Flattening an event having a message with more than one SubQueueId @@ -444,13 +444,21 @@ static void test2_flattenExplodesEvent() // 1) Event composed of one message having a payload of size one third the // maximum enforced size and four SubQueueIds. // Msg1 - payloadLength = bmqp::EventHeader::k_MAX_SIZE_SOFT / 3; + payloadLength = bmqp::PushHeader::k_MAX_PAYLOAD_SIZE_SOFT / 2; numSubQueueIds = 4; - appendDatum(&data, - numSubQueueIds, - payloadLength, - &bufferFactory, - s_allocator_p); + + int count = 0; + int total = 0; + + while (total < bmqp::EventHeader::k_MAX_SIZE_SOFT / 4) { + appendDatum(&data, + numSubQueueIds, + payloadLength, + &bufferFactory, + s_allocator_p); + total += data[count].d_payload.length(); + ++count; + } // Create event appendMessages(&pushEventBuilder, data); @@ -468,12 +476,11 @@ static void test2_flattenExplodesEvent() // 3) Verify that the flattening results in two event blobs, each having // two messages with one SubQueueId each. bmqp::PushMessageIterator msgIterator(&bufferFactory, s_allocator_p); - const Data& D = data[0]; int idx = 0; // 1st flattened event bmqp::Event flattenedEvent1(&(eventInfos[0].d_blob), s_allocator_p); - ASSERT_EQ(eventInfos[0].d_ids.size(), 2u); + ASSERT_EQ(eventInfos[0].d_ids.size(), size_t(count * numSubQueueIds - 1)); flattenedEvent1.loadPushMessageIterator(&msgIterator, true); BSLS_ASSERT_OPT(msgIterator.isValid()); @@ -489,7 +496,7 @@ static void test2_flattenExplodesEvent() rc = msgIterator.loadMessagePayload(&payload); BSLS_ASSERT_OPT(rc == 0); - ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0); } // Verify SubQueueInfos @@ -506,14 +513,16 @@ static void test2_flattenExplodesEvent() BSLS_ASSERT_OPT(rc == 0); BSLS_ASSERT_OPT(subQueueInfos.size() == 1); - ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]); + ASSERT_EQ(data[0].d_subQueueInfos[0], subQueueInfos[0]); + } - // Verify that 'eventInfo' contains the queueId pair (id, subId) - // corresponding to this message - const int id = D.d_qid; - const unsigned int subcriptionId = D.d_subQueueInfos[idx].id(); + // Verify that 'eventInfo' contains the queueId pair (id, subId) + // corresponding to this message - ASSERT(find(eventInfos[0], id, subcriptionId)); + for (size_t i = 0; i < eventInfos[0].d_ids.size(); ++i) { + ASSERT_EQ(eventInfos[0].d_ids[i].d_subscriptionId, i % count + 1); + ASSERT_EQ(size_t(eventInfos[0].d_ids[i].d_header.queueId()), + i / count); } ++idx; @@ -529,7 +538,7 @@ static void test2_flattenExplodesEvent() rc = msgIterator.loadMessagePayload(&payload); BSLS_ASSERT_OPT(rc == 0); - ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0); } // Verify SubQueueInfos @@ -546,25 +555,19 @@ static void test2_flattenExplodesEvent() BSLS_ASSERT_OPT(rc == 0); BSLS_ASSERT_OPT(subQueueInfos.size() == 1); - ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]); - - // Verify that 'eventInfo' contains the queueId pair (id, subId) - // corresponding to this message - const int qId = D.d_qid; - const unsigned int subcriptionId = D.d_subQueueInfos[idx].id(); - ASSERT(find(eventInfos[0], qId, subcriptionId)); + ASSERT_EQ(data[0].d_subQueueInfos[idx], subQueueInfos[0]); } - ++idx; + idx = count - 1; // the last one did not fit the first event // 2nd flattened event bmqp::Event flattenedEvent2(&(eventInfos[1].d_blob), s_allocator_p); - ASSERT_EQ(eventInfos[1].d_ids.size(), 2u); + ASSERT_EQ(eventInfos[1].d_ids.size(), 1u); flattenedEvent2.loadPushMessageIterator(&msgIterator, true); BSLS_ASSERT_OPT(msgIterator.isValid()); - // Third message + // 1st message in tne second event rc = msgIterator.next(); BSLS_ASSERT_OPT(rc == 1); BSLS_ASSERT_OPT(msgIterator.hasOptions()); @@ -575,7 +578,7 @@ static void test2_flattenExplodesEvent() rc = msgIterator.loadMessagePayload(&payload); BSLS_ASSERT_OPT(rc == 0); - ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0); + ASSERT_EQ(bdlbb::BlobUtil::compare(data[idx].d_payload, payload), 0); } // Verify SubQueueInfos @@ -592,54 +595,22 @@ static void test2_flattenExplodesEvent() BSLS_ASSERT_OPT(rc == 0); BSLS_ASSERT_OPT(subQueueInfos.size() == 1); - ASSERT_EQ(D.d_subQueueInfos[idx].id(), subQueueInfos[0].id()); + ASSERT_EQ(data[idx].d_subQueueInfos[idx].id(), subQueueInfos[0].id()); - // Verify that 'eventInfo' contains the queueId (queueId, subQueueId) - // pair corresponding to this message - const int qId = D.d_qid; - const unsigned int subcriptionId = D.d_subQueueInfos[idx].id(); + const int qId = data[idx].d_qid; + const unsigned int subcriptionId = data[idx].d_subQueueInfos[idx].id(); ASSERT(find(eventInfos[1], qId, subcriptionId)); } - ++idx; - - // Fourth message - rc = msgIterator.next(); - BSLS_ASSERT_OPT(rc == 1); - BSLS_ASSERT_OPT(msgIterator.hasOptions()); - - // Verify payload - { - bdlbb::Blob payload(&bufferFactory, s_allocator_p); - rc = msgIterator.loadMessagePayload(&payload); - BSLS_ASSERT_OPT(rc == 0); - - ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0); - } - - // Verify SubQueueInfos - { - bmqp::OptionsView optionsView(s_allocator_p); - rc = msgIterator.loadOptionsView(&optionsView); - BSLS_ASSERT_OPT(rc == 0); - BSLS_ASSERT_OPT(optionsView.isValid()); - BSLS_ASSERT_OPT( - optionsView.find(bmqp::OptionType::e_SUB_QUEUE_INFOS) != - optionsView.end()); - bmqp::Protocol::SubQueueInfosArray subQueueInfos(s_allocator_p); - rc = optionsView.loadSubQueueInfosOption(&subQueueInfos); - BSLS_ASSERT_OPT(rc == 0); - BSLS_ASSERT_OPT(subQueueInfos.size() == 1); - - ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]); + // Verify that 'eventInfo' contains the queueId (queueId, subQueueId) + // pair corresponding to this message - // Verify that 'eventInfo' contains the queueId (queueId, subQueueId) - // pair corresponding to this message - const int qId = D.d_qid; - const unsigned int subcriptionId = D.d_subQueueInfos[idx].id(); + ASSERT_EQ(eventInfos[1].d_ids[0].d_subscriptionId, + data[count - 1].d_subQueueInfos.back().id()); + ASSERT_EQ(eventInfos[1].d_ids[0].d_header.queueId(), count - 1); - ASSERT(find(eventInfos[1], qId, subcriptionId)); - } + // No more messages + rc = msgIterator.next(); } static void test3_flattenWithMessageProperties() diff --git a/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp b/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp index cbc818c82..d59da1c23 100644 --- a/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_optionutil.t.cpp @@ -306,9 +306,6 @@ static void test2_basicOptionsBoxCanAdd() const int maxPayload = k_MAX_SIZE - headerSize; const LimitT limit = maxCanBeAdded(contentSize, maxPayload); ASSERT_EQ(Result::e_OPTION_TOO_BIG, limit.second); - const int sizeLeft = k_MAX_SIZE_SOFT - contentSize; - const int expected = sizeLeft / k_MAX_SIZE; - ASSERT_EQ(expected, limit.first); } } } diff --git a/src/groups/bmq/bmqp/bmqp_protocol.cpp b/src/groups/bmq/bmqp/bmqp_protocol.cpp index d6459bd89..50c6cd7ca 100644 --- a/src/groups/bmq/bmqp/bmqp_protocol.cpp +++ b/src/groups/bmq/bmqp/bmqp_protocol.cpp @@ -391,6 +391,7 @@ const int MessagePropertyHeader::k_PROP_NAME_LEN_MASK = bdlb::BitMaskUtil::one( const int PutHeader::k_MAX_OPTIONS_SIZE; const int PutHeader::k_MAX_PAYLOAD_SIZE_SOFT; +const int PutHeader::k_MAX_SIZE_SOFT; // Force variable/symbol definition so that it can be used in other files const int PutHeader::k_FLAGS_MASK = bdlb::BitMaskUtil::one( diff --git a/src/groups/bmq/bmqp/bmqp_protocol.h b/src/groups/bmq/bmqp/bmqp_protocol.h index ed7d6b76b..154f021f4 100644 --- a/src/groups/bmq/bmqp/bmqp_protocol.h +++ b/src/groups/bmq/bmqp/bmqp_protocol.h @@ -818,7 +818,7 @@ struct EventHeader { /// having the maximum allowable value is processed through the entire /// BlazingMQ pipeline w/o any issues. Also see notes in /// `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT` constant. - static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024; + static const int k_MAX_SIZE_SOFT = 512 * 1024 * 1024; /// Highest possible value for the type of an event. static const int k_MAX_TYPE = (1 << k_TYPE_NUM_BITS) - 1; @@ -1483,6 +1483,7 @@ struct PutHeader { /// be increased but not up to `k_MAX_SIZE`. static const int k_MAX_PAYLOAD_SIZE_SOFT = 64 * 1024 * 1024; + static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024; /// Maximum size (bytes) of the options area. static const int k_MAX_OPTIONS_SIZE = ((1 << k_OPTIONS_WORDS_NUM_BITS) - 1) * diff --git a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp index cbf7cbc81..3d7e9f456 100644 --- a/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp +++ b/src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp @@ -1077,7 +1077,20 @@ static void test8_buildEventTooBig() bdlbb::Blob validPayload2(&bufferFactory, s_allocator_p); bdlbb::BlobUtil::append(&validPayload2, s.c_str(), validLen); - rc = peb.packMessage(validPayload2, + int count = 1; + while ((evtSize + sizeof(bmqp::PushHeader) + validLen) < + bmqp::EventHeader::k_MAX_SIZE_SOFT) { + rc = peb.packMessage(validPayload2, + queueId, + guid, + flags, + bmqt::CompressionAlgorithmType::e_NONE); + ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS); + evtSize += sizeof(bmqp::PushHeader) + validLen; + ++count; + } + evtSize = peb.eventSize(); // not calculating padding + rc = peb.packMessage(validPayload2, queueId, guid, flags, @@ -1085,7 +1098,7 @@ static void test8_buildEventTooBig() ASSERT_EQ(rc, bmqt::EventBuilderResult::e_EVENT_TOO_BIG); ASSERT_EQ(evtSize, peb.eventSize()); - ASSERT_EQ(1, peb.messageCount()); + ASSERT_EQ(count, peb.messageCount()); } static void testN1_decodeFromFile() diff --git a/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp index 097fab953..3d6da5b4c 100644 --- a/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp @@ -84,7 +84,7 @@ PutEventBuilder::packMessageInternal(const bdlbb::Blob& appData, int queueId) numPaddingBytes; if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(sizeNoOptions > - EventHeader::k_MAX_SIZE_SOFT)) { + PutHeader::k_MAX_SIZE_SOFT)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; return Result::e_EVENT_TOO_BIG; // RETURN } diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index e4137642c..c8849afe3 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -141,7 +141,10 @@ Application::Application(bdlmt::EventScheduler* scheduler, 1, bsls::TimeInterval(120).totalMilliseconds(), allocator) -, d_bufferFactory(k_BLOBBUFFER_SIZE, d_allocators.get("BufferFactory")) +, d_bufferFactory(k_BLOBBUFFER_SIZE, + bsls::BlockGrowth::BSLS_CONSTANT, + d_allocators.get("BufferFactory")) + , d_blobSpPool(bdlf::BindUtil::bind(&createBlob, &d_bufferFactory, bdlf::PlaceHolders::_1, // arena diff --git a/src/groups/mqb/mqba/mqba_dispatcher.cpp b/src/groups/mqb/mqba/mqba_dispatcher.cpp index fffa8faab..0826c5446 100644 --- a/src/groups/mqb/mqba/mqba_dispatcher.cpp +++ b/src/groups/mqb/mqba/mqba_dispatcher.cpp @@ -332,7 +332,7 @@ Dispatcher::ProcessorPool::Queue* Dispatcher::queueCreator( bsl::string queueName(os.str().data(), os.str().length()); ProcessorPool::Queue* queue = new (*allocator) - ProcessorPool::Queue(config.queueSizeLowWatermark(), allocator); + ProcessorPool::Queue(config.queueSize(), allocator); queue->setWatermarks(config.queueSizeLowWatermark(), config.queueSizeHighWatermark()); diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 31ea838f6..5e9968869 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -410,6 +410,16 @@ void Cluster::sendAck(bmqt::AckResult::Enum status, 1); } } + else if (!isSelfGenerated) { + MWCU_THROTTLEDACTION_THROTTLE( + d_throttledFailedAckMessages, + BALL_LOG_WARN + << description() + << ": ACK message for queue with unknown queueId [" + << queueId << ", guid: " << messageGUID << ", for node: " + << nodeSession->clusterNode()->nodeDescription();); + return; // RETURN + } // Throttle error log if this is a 'failed Ack': note that we log at // INFO level in order not to overwhelm the dashboard, if a queue is @@ -1267,19 +1277,6 @@ void Cluster::onAckEvent(const mqbi::DispatcherEvent& event) return; // RETURN } - QueueHandleMap& queueHandles = ns->queueHandles(); - QueueHandleMapIter queueIt = queueHandles.find(ackMessage.queueId()); - if (queueIt == queueHandles.end()) { - MWCU_THROTTLEDACTION_THROTTLE( - d_throttledFailedAckMessages, - BALL_LOG_WARN << description() - << ": ACK message for queue with unknown queueId [" - << ackMessage.queueId() << ", guid: " - << ackMessage.messageGUID() << ", for node: " - << realEvent->clusterNode()->nodeDescription();); - return; // RETURN - } - sendAck(bmqp::ProtocolUtil::ackResultFromCode(ackMessage.status()), ackMessage.correlationId(), ackMessage.messageGUID(), @@ -1402,9 +1399,6 @@ void Cluster::onConfirmEvent(const mqbi::DispatcherEvent& event) int msgNum = 0; int rc = 0; - bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p); - mwcu::MemOutStream errorStream(&localAllocator); - while ((rc = confIt.next() == 1)) { const int id = confIt.message().queueId(); const unsigned int subId = static_cast( @@ -1412,12 +1406,11 @@ void Cluster::onConfirmEvent(const mqbi::DispatcherEvent& event) const bmqp::QueueId queueId(id, subId); mqbi::QueueHandle* queueHandle = 0; - bool isValid = validateMessage(&queueHandle, - &errorStream, - queueId, - ns, - bmqp::EventType::e_CONFIRM); - if (isValid) { + ValidationResult result = validateMessage(&queueHandle, + queueId, + ns, + bmqp::EventType::e_CONFIRM); + if (result == k_SUCCESS) { BSLS_ASSERT_SAFE(queueHandle); BALL_LOG_TRACE << description() << ": CONFIRM " @@ -1433,14 +1426,12 @@ void Cluster::onConfirmEvent(const mqbi::DispatcherEvent& event) MWCU_THROTTLEDACTION_THROTTLE( d_throttledFailedRejectMessages, MWCTSK_ALARMLOG_ALARM("CLUSTER") - << description() << ": CONFIRM " << errorStream.str() - << " [queue: '" + << ": CONFIRM " << validationResult(result) << " [queue: '" << (queueHandle ? queueHandle->queue()->uri() : "") << "', queueId: " << queueId << ", GUID: " << confIt.message().messageGUID() << "] from " << source->nodeDescription() << MWCTSK_ALARMLOG_END;); - errorStream.reset(); } } @@ -1504,9 +1495,6 @@ void Cluster::onRejectEvent(const mqbi::DispatcherEvent& event) int msgNum = 0; int rc = 0; - bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p); - mwcu::MemOutStream errorStream(&localAllocator); - while ((rc = rejectIt.next() == 1)) { const int id = rejectIt.message().queueId(); const unsigned int subId = static_cast( @@ -1514,12 +1502,11 @@ void Cluster::onRejectEvent(const mqbi::DispatcherEvent& event) const bmqp::QueueId queueId(id, subId); mqbi::QueueHandle* queueHandle = 0; - bool isValid = validateMessage(&queueHandle, - &errorStream, - queueId, - ns, - bmqp::EventType::e_REJECT); - if (isValid) { + ValidationResult result = validateMessage(&queueHandle, + queueId, + ns, + bmqp::EventType::e_REJECT); + if (result == k_SUCCESS) { BSLS_ASSERT_SAFE(queueHandle); BALL_LOG_TRACE << description() << ": REJECT " @@ -1535,14 +1522,12 @@ void Cluster::onRejectEvent(const mqbi::DispatcherEvent& event) MWCU_THROTTLEDACTION_THROTTLE( d_throttledFailedRejectMessages, MWCTSK_ALARMLOG_ALARM("CLUSTER") - << description() << ": REJECT " << errorStream.str() - << " [queue: '" + << ": REJECT " << validationResult(result) << " [queue: '" << (queueHandle ? queueHandle->queue()->uri() : "") << "', queueId: " << queueId << ", GUID: " << rejectIt.message().messageGUID() << "] from " << source->nodeDescription() << MWCTSK_ALARMLOG_END;); - errorStream.reset(); } } @@ -1559,11 +1544,11 @@ void Cluster::onRejectEvent(const mqbi::DispatcherEvent& event) } } -bool Cluster::validateMessage(mqbi::QueueHandle** queueHandle, - bsl::ostream* errorStream, - const bmqp::QueueId& queueId, - mqbc::ClusterNodeSession* ns, - bmqp::EventType::Enum eventType) +Cluster::ValidationResult +Cluster::validateMessage(mqbi::QueueHandle** queueHandle, + const bmqp::QueueId& queueId, + mqbc::ClusterNodeSession* ns, + bmqp::EventType::Enum eventType) { // PRECONDITIONS BSLS_ASSERT_SAFE((eventType == bmqp::EventType::e_CONFIRM || @@ -1577,9 +1562,7 @@ bool Cluster::validateMessage(mqbi::QueueHandle** queueHandle, if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(queueIt == queueHandles.end())) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - *errorStream << "message for unknown queue"; - - return false; // RETURN + return k_UNKNOWN_QUEUE; // RETURN } const QueueState& queueState = queueIt->second; @@ -1592,19 +1575,14 @@ bool Cluster::validateMessage(mqbi::QueueHandle** queueHandle, subQueueIt == queueState.d_subQueueInfosMap.end())) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - *errorStream << "message for unknown queue"; - - return false; // RETURN + return k_UNKNOWN_QUEUE; // RETURN } if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( queueState.d_isFinalCloseQueueReceived)) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - *errorStream << "message for which final closeQueue was already " - "received"; - - return false; // RETURN + return k_FINAL; // RETURN } if (eventType == bmqp::EventType::e_CONFIRM) { @@ -1614,7 +1592,7 @@ bool Cluster::validateMessage(mqbi::QueueHandle** queueHandle, 1); } - return true; + return k_SUCCESS; } void Cluster::onRelayRejectEvent(const mqbi::DispatcherEvent& event) @@ -3288,8 +3266,7 @@ void Cluster::processEvent(const bmqp::Event& event, } break; case bmqp::EventType::e_REPLICATION_RECEIPT: { // Receipt event arrives from replication nodes to primary. - DISPATCH_EVENT(mqbi::DispatcherEventType::e_REPLICATION_RECEIPT, - false); + d_storageManager_mp->processReceiptEvent(event, source); } break; // BREAK case bmqp::EventType::e_UNDEFINED: default: { @@ -3383,10 +3360,7 @@ void Cluster::onDispatcherEvent(const mqbi::DispatcherEvent& event) onPushEvent(event); } } break; // BREAK - case mqbi::DispatcherEventType::e_REPLICATION_RECEIPT: { - const mqbi::DispatcherReceiptEvent* realEvent = event.asReceiptEvent(); - d_storageManager_mp->processReceiptEvent(*realEvent); - } break; + case mqbi::DispatcherEventType::e_REPLICATION_RECEIPT: case mqbi::DispatcherEventType::e_CONTROL_MSG: case mqbi::DispatcherEventType::e_DISPATCHER: case mqbi::DispatcherEventType::e_UNDEFINED: diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index afa024dc3..83d3a85bd 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -217,6 +217,8 @@ class Cluster : public mqbi::Cluster, const StopRequestManagerType::RequestContextSp& contextSp)> StopRequestCompletionCallback; + enum ValidationResult { k_SUCCESS = 0, k_UNKNOWN_QUEUE, k_FINAL }; + private: // DATA bslma::Allocator* d_allocator_p; @@ -419,11 +421,10 @@ class Cluster : public mqbi::Cluster, void onRelayPushEvent(const mqbi::DispatcherEvent& event); - bool validateMessage(mqbi::QueueHandle** queueHandle, - bsl::ostream* errorStream, - const bmqp::QueueId& queueId, - mqbc::ClusterNodeSession* ns, - bmqp::EventType::Enum eventType); + ValidationResult validateMessage(mqbi::QueueHandle** queueHandle, + const bmqp::QueueId& queueId, + mqbc::ClusterNodeSession* ns, + bmqp::EventType::Enum eventType); // Validate a message of the specified 'eventType' using the specified // 'queueId' and 'ns'. Return true if the message is valid and false // otherwise. Populate the specified 'queueHandle' if the queue is found @@ -491,6 +492,8 @@ class Cluster : public mqbi::Cluster, /// Execute `initiateShutdown` followed by `stop` and SIGINT void terminate(mqbu::ExitCode::Enum reason); + static const char* validationResult(const ValidationResult& result); + public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(Cluster, bslma::UsesBslmaAllocator) @@ -789,6 +792,17 @@ class Cluster : public mqbi::Cluster, // ------------- // class Cluster // ------------- +// static +inline const char* Cluster::validationResult(const ValidationResult& result) +{ + switch (result) { + case k_SUCCESS: return "SUCCESS"; + case k_UNKNOWN_QUEUE: return "message for unknown queue"; + case k_FINAL: + return "message for which final closeQueue was already received"; + default: return "UNKNOWN"; + } +} inline Cluster::RequestManagerType& Cluster::requestManager() { diff --git a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp index 9c7d21c3a..243eb26b1 100644 --- a/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_localqueue.cpp @@ -278,7 +278,7 @@ void LocalQueue::configureHandle( // to it. We need to make sure that storage/replication is in sync, and // thus, we force-flush the file store. - d_state_p->storage()->dispatcherFlush(true, false); + d_state_p->storage()->dispatcherFlush(false); // Attempt to deliver all data in the storage. Otherwise, broadcast // can get dropped if the incoming configure request removes consumers. @@ -300,7 +300,7 @@ void LocalQueue::releaseHandle( BSLS_ASSERT_SAFE(d_state_p->queue()->dispatcher()->inDispatcherThread( d_state_p->queue())); - d_state_p->storage()->dispatcherFlush(true, false); + d_state_p->storage()->dispatcherFlush(false); d_queueEngine_mp->releaseHandle(handle, handleParameters, @@ -369,12 +369,17 @@ void LocalQueue::flush() // until it gets rolled back. If 'flush' gets called in between, the queue // may have no storage. if (d_state_p->storage()) { - d_state_p->storage()->dispatcherFlush(true, false); + d_state_p->storage()->dispatcherFlush(true); // See notes in 'FileStore::dispatcherFlush' for motivation behind // this flush. - } - deliverIfNeeded(); + deliverIfNeeded(); + // REVISIT: 'dispatcherFlush' calls 'onReplicatedBatch' which calls + // 'deliverIfNeeded' but only for file-based storage. Call it + // again to cover all types of storage. + // 'Storage::dispatcherFlush' / 'DataSore::dispatcherFlush' need + // refactoring. + } } void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, @@ -509,10 +514,6 @@ void LocalQueue::postMessage(const bmqp::PutHeader& putHeader, 1); } } - - // If 'FileStore::d_storageEventBuilder' is flushed, flush all relevant - // queues (call 'afterNewMessage' to deliver accumulated data) - d_state_p->storage()->dispatcherFlush(false, true); } void LocalQueue::onPushMessage( diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index 69102753d..2a40a1e1d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -1366,12 +1366,14 @@ void RemoteQueue::expirePendingMessagesDispatched() } if (numExpired) { - BALL_LOG_INFO << d_state_p->uri() << ": expired " - << mwcu::PrintUtil::prettyNumber(numExpired) - << " pending PUSH messages (" - << mwcu::PrintUtil::prettyNumber(numMessages - - numExpired) - << " remaining messages)."; + if (d_throttledFailedPutMessages.requestPermission()) { + BALL_LOG_INFO << d_state_p->uri() << ": expired " + << mwcu::PrintUtil::prettyNumber(numExpired) + << " pending PUT messages (" + << mwcu::PrintUtil::prettyNumber(numMessages - + numExpired) + << " remaining messages)."; + } } // reschedule diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index b1afee487..ed30e7b1b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -1126,7 +1126,7 @@ void RootQueueEngine::onHandleUsable(mqbi::QueueHandle* handle, } // Before attempting to deliver any messages, flush the storage. - d_queueState_p->queue()->storage()->dispatcherFlush(true, false); + d_queueState_p->queue()->storage()->dispatcherFlush(false); unsigned int upstreamSubQueueId = 0; if (d_queueState_p->routingContext().onUsable(&upstreamSubQueueId, diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 836bcabc2..de0c89a95 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -2732,13 +2732,10 @@ void StorageManager::processRecoveryEvent( source)); } -void StorageManager::processReceiptEvent( - const mqbi::DispatcherReceiptEvent& event) +void StorageManager::processReceiptEvent(const bmqp::Event& event, + mqbnet::ClusterNode* source) { - // executed by *CLUSTER DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p)); + // executed by *IO* thread mwcu::BlobPosition position; const int rc = mwcu::BlobUtil::findOffsetSafe(&position, @@ -2747,7 +2744,7 @@ void StorageManager::processReceiptEvent( BSLS_ASSERT_SAFE(rc == 0); mwcu::BlobObjectProxy receipt( - event.blob().get(), + event.blob(), position, true, // read mode false); // no write @@ -2766,7 +2763,7 @@ void StorageManager::processReceiptEvent( fs, receipt->primaryLeaseId(), receipt->sequenceNum(), - event.clusterNode())); + source)); } void StorageManager::processPrimaryStatusAdvisory( diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h index cdc2efb3f..a008f374a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.h +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.h @@ -719,9 +719,10 @@ class StorageManager : public mqbi::StorageManager { virtual void processRecoveryEvent( const mqbi::DispatcherRecoveryEvent& event) BSLS_KEYWORD_OVERRIDE; - /// Executed in cluster dispatcher thread. - virtual void processReceiptEvent(const mqbi::DispatcherReceiptEvent& event) - BSLS_KEYWORD_OVERRIDE; + /// Executed in IO thread. + virtual void + processReceiptEvent(const bmqp::Event& event, + mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; /// Executed by any thread. virtual void processPrimaryStatusAdvisory( diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp index 551d078f6..f63238789 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.cpp +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.cpp @@ -3474,13 +3474,10 @@ void StorageManager::processRecoveryEvent( "This method can only be invoked in non-CSL mode"); } -void StorageManager::processReceiptEvent( - const mqbi::DispatcherReceiptEvent& event) +void StorageManager::processReceiptEvent(const bmqp::Event& event, + mqbnet::ClusterNode* source) { - // executed by *CLUSTER DISPATCHER* thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE(d_dispatcher_p->inDispatcherThread(d_cluster_p)); + // executed by *IO* thread mwcu::BlobPosition position; const int rc = mwcu::BlobUtil::findOffsetSafe(&position, @@ -3489,7 +3486,7 @@ void StorageManager::processReceiptEvent( BSLS_ASSERT_SAFE(rc == 0); mwcu::BlobObjectProxy receipt( - event.blob().get(), + event.blob(), position, true, // read mode false); // no write @@ -3508,7 +3505,7 @@ void StorageManager::processReceiptEvent( fs, receipt->primaryLeaseId(), receipt->sequenceNum(), - event.clusterNode())); + source)); } void StorageManager::processPrimaryStatusAdvisory( diff --git a/src/groups/mqb/mqbc/mqbc_storagemanager.h b/src/groups/mqb/mqbc/mqbc_storagemanager.h index 6b69e625a..3a5b4ad69 100644 --- a/src/groups/mqb/mqbc/mqbc_storagemanager.h +++ b/src/groups/mqb/mqbc/mqbc_storagemanager.h @@ -853,9 +853,10 @@ class StorageManager virtual void processRecoveryEvent( const mqbi::DispatcherRecoveryEvent& event) BSLS_KEYWORD_OVERRIDE; - /// Executed in cluster dispatcher thread. - virtual void processReceiptEvent(const mqbi::DispatcherReceiptEvent& event) - BSLS_KEYWORD_OVERRIDE; + /// Executed in IO thread. + virtual void + processReceiptEvent(const bmqp::Event& event, + mqbnet::ClusterNode* source) BSLS_KEYWORD_OVERRIDE; /// Executed by any thread. virtual void processPrimaryStatusAdvisory( diff --git a/src/groups/mqb/mqbc/mqbc_storageutil.cpp b/src/groups/mqb/mqbc/mqbc_storageutil.cpp index 604784e72..136134aca 100644 --- a/src/groups/mqb/mqbc/mqbc_storageutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_storageutil.cpp @@ -233,7 +233,7 @@ void StorageUtil::registerQueueDispatched( // partition, we want to make sure that queue creation record written to // the partition above is sent to the replicas as soon as possible. - fs->dispatcherFlush(true, false); + fs->flushStorageBuilder(false); BALL_LOG_INFO << clusterDescription << ": PartitionId [" << partitionId << "] registered [" << storage->queueUri() << "], queueKey [" @@ -410,7 +410,7 @@ int StorageUtil::updateQueueRaw(mqbs::ReplicatedStorage* storage, // Flush the partition for records written above to reach replicas right // away. - fs->dispatcherFlush(true, false); + fs->flushStorageBuilder(false); mwcu::Printer printer1(&addedIdKeyPairs); mwcu::Printer printer2(&removedIdKeyPairs); @@ -1103,31 +1103,6 @@ bool StorageUtil::validateStorageEvent(const bmqp::Event& event, return false; // RETURN } - bmqp::StorageMessageIterator iter; - event.loadStorageMessageIterator(&iter); - BSLS_ASSERT_SAFE(iter.isValid()); - while (iter.next() == 1) { - const bmqp::StorageHeader& header = iter.header(); - if (static_cast(partitionId) != header.partitionId()) { - // A storage event is sent by 'source' cluster node. The node may - // be primary for one or more partitions, but as per the BMQ - // replication design, *all* messages in this event will belong to - // the *same* partition. Any exception to this is a bug in the - // implementation of replication, and thus, if it occurs, we reject - // the *entire* storage event. - - MWCTSK_ALARMLOG_ALARM("STORAGE") - << clusterData.identity().description() << ": Received storage" - << " event from node " << source->nodeDescription() << " with" - << " different PartitionId: [" << partitionId << "] vs [" - << header.partitionId() << "]" - << ". Ignoring entire storage event." << MWCTSK_ALARMLOG_END; - return false; // RETURN - } - - // NOTE: (leaseId, seqNum) will be checked later. - } - return true; } @@ -1897,7 +1872,7 @@ void StorageUtil::unregisterQueueDispatched( // that the partition is flushed and the QueueDeletion record reaches // replicas. - fs->dispatcherFlush(true, false); + fs->flushStorageBuilder(false); } int StorageUtil::updateQueue(StorageSpMap* storageMap, diff --git a/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp b/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp index 65297b68b..3a149d794 100644 --- a/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp +++ b/src/groups/mqb/mqbcmd/mqbcmd_humanprinter.cpp @@ -463,8 +463,7 @@ void printFileStoreSummary(bsl::ostream& os, << "Num unreceipted messages: " << prettyNumber(static_cast( summary.numUnreceiptedMessages())) - << newlineAndIndent(level + 1, spacesPerLevel) - << "Current Nagle count: " + << newlineAndIndent(level + 1, spacesPerLevel) << "Current Nagle size: " << prettyNumber( static_cast(summary.naglePacketCount())) << '\n' diff --git a/src/groups/mqb/mqbi/mqbi_storage.h b/src/groups/mqb/mqbi/mqbi_storage.h index c8b37de75..7296be39f 100644 --- a/src/groups/mqb/mqbi/mqbi_storage.h +++ b/src/groups/mqb/mqbi/mqbi_storage.h @@ -449,11 +449,11 @@ class Storage { /// * **e_APPKEY_NOT_FOUND** : Invalid appKey specified virtual StorageResult::Enum removeAll(const mqbu::StorageKey& appKey) = 0; - /// If the specified `storage` is `true`, flush any buffered replication - /// messages to the peers. If the specified `queues` is `true`, `flush` - /// all associated queues. Behavior is undefined unless this node is - /// the primary for this partition. - virtual void dispatcherFlush(bool storage, bool queues) = 0; + /// Flush any buffered replication messages to the peers using the + /// specified 'isQueueIdle' to adjust the size of storage builder batch. + /// Behavior is undefined unless this node is the primary for this + /// partition. + virtual void dispatcherFlush(bool isQueueIdle) = 0; /// Return the resource capacity meter associated to this storage. virtual mqbu::CapacityMeter* capacityMeter() = 0; diff --git a/src/groups/mqb/mqbi/mqbi_storagemanager.h b/src/groups/mqb/mqbi/mqbi_storagemanager.h index e9ef7b9ae..040b3f0ce 100644 --- a/src/groups/mqb/mqbi/mqbi_storagemanager.h +++ b/src/groups/mqb/mqbi/mqbi_storagemanager.h @@ -38,6 +38,7 @@ #include // BMQ +#include #include // BDE @@ -363,9 +364,9 @@ class StorageManager : public mqbi::AppKeyGenerator { virtual void processRecoveryEvent(const mqbi::DispatcherRecoveryEvent& event) = 0; - /// Executed in cluster dispatcher thread. - virtual void - processReceiptEvent(const mqbi::DispatcherReceiptEvent& event) = 0; + /// Executed in IO thread. + virtual void processReceiptEvent(const bmqp::Event& event, + mqbnet::ClusterNode* source) = 0; /// Executed by any thread. virtual void processPrimaryStatusAdvisory( diff --git a/src/groups/mqb/mqbnet/mqbnet_channel.cpp b/src/groups/mqb/mqbnet/mqbnet_channel.cpp index ec489f7c7..d2f45f511 100644 --- a/src/groups/mqb/mqbnet/mqbnet_channel.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_channel.cpp @@ -86,7 +86,7 @@ Channel::Channel(bdlbb::BlobBufferFactory* blobBufferFactory, bslmt::ThreadAttributes attr = mwcsys::ThreadUtil::defaultAttributes(); bsl::string threadName("bmqNet-"); attr.setThreadName(threadName + d_name); - d_buffer.setWatermarks(500, 1000, 5000); + d_buffer.setWatermarks(50000, 100000, 500000); d_buffer.setStateCallback( bdlf::MemFnUtil::memFn(&Channel::onBufferStateChange, this)); int rc = bslmt::ThreadUtil::createWithAllocator( diff --git a/src/groups/mqb/mqbnet/mqbnet_transportmanager.cpp b/src/groups/mqb/mqbnet/mqbnet_transportmanager.cpp index c7b53a5d8..760951b0a 100644 --- a/src/groups/mqb/mqbnet/mqbnet_transportmanager.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_transportmanager.cpp @@ -353,7 +353,7 @@ TransportManager::TransportManager(bdlmt::EventScheduler* scheduler, , d_state(e_STOPPED) , d_scheduler_p(scheduler) , d_blobBufferFactory_p(blobBufferFactory) -, d_itemPool(Channel::k_ITEM_SIZE, allocator) +, d_itemPool(Channel::k_ITEM_SIZE, bsls::BlockGrowth::BSLS_CONSTANT, allocator) , d_negotiator_mp(negotiator) , d_statController_p(statController) , d_tcpSessionFactory_mp(0) diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 013fbc9f4..beb1fc972 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -684,11 +684,11 @@ class DataStore : public mqbi::DispatcherClient { /// Clear the current primary associated with this partition. virtual void clearPrimary() = 0; - /// If the specified `storage` is `true`, flush any buffered replication - /// messages to the peers. If the specified `queues` is `true`, `flush` - /// all associated queues. Behavior is undefined unless this node is - /// the primary for this partition. - virtual void dispatcherFlush(bool storage, bool queues) = 0; + /// Flush any buffered replication messages to the peers using the + /// specified 'isQueueIdle' to adjust the size of storage builder batch. + /// Behavior is undefined unless this node is the primary for this + /// partition. + virtual void dispatcherFlush(bool isQueueIdle) = 0; // ACCESSORS diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp index d5d3bcd3a..bd8322429 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp @@ -538,7 +538,7 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) if (appKey.isNull()) { purgeCommon(appKey); // or 'mqbu::StorageKey::k_NULL_KEY' - dispatcherFlush(true, false); + dispatcherFlush(false); d_isEmpty.storeRelaxed(1); return mqbi::StorageResult::e_SUCCESS; // RETURN } @@ -634,7 +634,7 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) } purgeCommon(appKey); - dispatcherFlush(true, false); + dispatcherFlush(false); if (d_handles.empty()) { d_isEmpty.storeRelaxed(1); @@ -643,9 +643,9 @@ FileBackedStorage::removeAll(const mqbu::StorageKey& appKey) return mqbi::StorageResult::e_SUCCESS; } -void FileBackedStorage::dispatcherFlush(bool storage, bool queues) +void FileBackedStorage::dispatcherFlush(bool isQueueIdle) { - d_store_p->dispatcherFlush(storage, queues); + d_store_p->dispatcherFlush(isQueueIdle); } int FileBackedStorage::gcExpiredMessages( diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index b54382091..3e2ca95ca 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -434,12 +434,11 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { virtual mqbi::StorageResult::Enum removeAll(const mqbu::StorageKey& appKey) BSLS_KEYWORD_OVERRIDE; - /// If the specified `storage` is `true`, flush any buffered replication - /// messages to the peers. If the specified `queues` is `true`, `flush` - /// all associated queues. Behavior is undefined unless this node is - /// the primary for this partition. - virtual void dispatcherFlush(bool storage, - bool queues) BSLS_KEYWORD_OVERRIDE; + /// Flush any buffered replication messages to the peers using the + /// specified 'isQueueIdle' to adjust the size of storage builder batch. + /// Behavior is undefined unless this node is the primary for this + /// partition. + virtual void dispatcherFlush(bool isQueueIdle) BSLS_KEYWORD_OVERRIDE; /// Attempt to garbage-collect messages for which TTL has expired, and /// return the number of messages garbage-collected. Populate the diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index be553bbcd..29c1b1cd9 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -102,7 +102,26 @@ const bsls::Types::Uint64 k_SPACE_USED_PERCENT_SOFT = 40; /// partition. const double k_PARTITION_AVAILABLESPACE_SECS = 20; -const int k_NAGLE_PACKET_COUNT = 100; +const int k_NAGLE_PACKET_SIZE_MIN = 512 * 1024; +const int k_NAGLE_PACKET_SIZE_MAX = bmqp::EventHeader::k_MAX_SIZE_SOFT - 1024; +const int k_NAGLE_PACKET_SIZE_INIT = 1024 * 1024; +const int k_NAGLE_PACKET_SIZE_STEP = 1024; + +// Flushing 'd_storageEventBuilder' is triggered by: +// 1. Queue control events +// 2. Queue idle time +// 3. Builder reaching the limit +// +// The latter is not constant. Smaller value is better for lower rates but +// does not work for higher rates because flushing results in +// a. Flushing all previously accumulated PUSH data +// b. Making IO call +// c. incoming Receipts (Replicas generate one receipt per event) +// d. Generating and sending PUSH data! +// The way to handle high rate is to batch more (grow builder). +// For this reason, the limit varies between 'k_NAGLE_PACKET_SIZE_MIN' and +// 'k_NAGLE_PACKET_SIZE_MAX'. See 'FileStore::dispatcherFlush' for the +// explanation how the limit ('d_nagglePacketSize') gets adjusted. const int k_KEY_LEN = FileStoreProtocol::k_KEY_LENGTH; @@ -3916,7 +3935,7 @@ void FileStore::processReceiptEvent(unsigned int primaryLeaseId, affectedQueues.begin(); it != affectedQueues.end(); ++it) { - (*it)->queueEngine()->afterNewMessage(bmqt::MessageGUID(), 0); + (*it)->onReplicatedBatch(); } } @@ -4840,12 +4859,23 @@ void FileStore::replicateRecord(bmqp::StorageMessageType::Enum type, journalRecordBufferSp, FileStoreProtocol::k_JOURNAL_RECORD_SIZE); - bmqt::EventBuilderResult::Enum buildRc = d_storageEventBuilder.packMessage( - type, - d_config.partitionId(), - 0, // flags - journalOffsetWords, - journalRecordBlobBuffer); + bmqt::EventBuilderResult::Enum buildRc; + bool doRetry = false; + do { + buildRc = d_storageEventBuilder.packMessage(type, + d_config.partitionId(), + 0, // flags + journalOffsetWords, + journalRecordBlobBuffer); + if (buildRc == bmqt::EventBuilderResult::e_EVENT_TOO_BIG && !doRetry) { + flushIfNeeded(true); + + doRetry = true; + } + else { + doRetry = false; + } + } while (doRetry); if (bmqt::EventBuilderResult::e_SUCCESS != buildRc) { MWCTSK_ALARMLOG_ALARM("REPLICATION") @@ -4909,35 +4939,47 @@ void FileStore::replicateRecord(bmqp::StorageMessageType::Enum type, mfd.mapping() + dataOffset); dataBlobBuffer.reset(dataBufferSp, totalDataLen); } - - if (bmqp::StorageMessageType::e_DATA == type) { - buildRc = d_storageEventBuilder.packMessage( - bmqp::StorageMessageType::e_DATA, - d_config.partitionId(), - flags, - journalOffsetWords, - journalRecordBlobBuffer, - dataBlobBuffer); - } - else { - if (d_isFSMWorkflow) { + bool flushAndRetry = false; + do { + if (bmqp::StorageMessageType::e_DATA == type) { buildRc = d_storageEventBuilder.packMessage( - bmqp::StorageMessageType::e_QLIST, - d_config.partitionId(), - flags, - journalOffsetWords, - journalRecordBlobBuffer); - } - else { - buildRc = d_storageEventBuilder.packMessage( - bmqp::StorageMessageType::e_QLIST, + bmqp::StorageMessageType::e_DATA, d_config.partitionId(), flags, journalOffsetWords, journalRecordBlobBuffer, dataBlobBuffer); } - } + else { + if (d_isFSMWorkflow) { + buildRc = d_storageEventBuilder.packMessage( + bmqp::StorageMessageType::e_QLIST, + d_config.partitionId(), + flags, + journalOffsetWords, + journalRecordBlobBuffer); + } + else { + buildRc = d_storageEventBuilder.packMessage( + bmqp::StorageMessageType::e_QLIST, + d_config.partitionId(), + flags, + journalOffsetWords, + journalRecordBlobBuffer, + dataBlobBuffer); + } + } + if (buildRc == bmqt::EventBuilderResult::e_EVENT_TOO_BIG && + !flushAndRetry) { + flushIfNeeded(true); + + flushAndRetry = true; + } + else { + flushAndRetry = false; + } + + } while (flushAndRetry); if (bmqt::EventBuilderResult::e_SUCCESS != buildRc) { MWCTSK_ALARMLOG_ALARM("REPLICATION") @@ -5090,8 +5132,10 @@ void FileStore::aliasMessage(bsl::shared_ptr* appData, void FileStore::flushIfNeeded(bool immediateFlush) { if (immediateFlush || - d_storageEventBuilder.messageCount() >= d_nagglePacketCount) { - dispatcherFlush(true, false); + (d_storageEventBuilder.eventSize() >= d_naglePacketSize)) { + flushStorageBuilder(false); + + notifyQueuesOnReplicatedBatch(); } } @@ -5144,7 +5188,8 @@ FileStore::FileStore(const DataStoreConfig& config, , d_isCSLModeEnabled(isCSLModeEnabled) , d_isFSMWorkflow(isFSMWorkflow) , d_ignoreCrc32c(false) -, d_nagglePacketCount(k_NAGLE_PACKET_COUNT) +, d_naglePacketSize(k_NAGLE_PACKET_SIZE_INIT) +, d_step(k_NAGLE_PACKET_SIZE_STEP) { // PRECONDITIONS BSLS_ASSERT(allocator); @@ -6075,8 +6120,30 @@ void FileStore::processStorageEvent(const bsl::shared_ptr& blob, rawEvent.loadStorageMessageIterator(&iter); BSLS_ASSERT_SAFE(iter.isValid()); - while (1 == iter.next()) { - const bmqp::StorageHeader& header = iter.header(); + if (1 != iter.next()) { + return; // RETURN + } + const unsigned int pid = iter.header().partitionId(); + FileStore::NodeContext* nodeContext = 0; + + do { + const bmqp::StorageHeader& header = iter.header(); + if (pid != header.partitionId()) { + // A storage event is sent by 'source' cluster node. The node may + // be primary for one or more partitions, but as per the BMQ + // replication design, *all* messages in this event will belong to + // the *same* partition. Any exception to this is a bug in the + // implementation of replication. + + MWCTSK_ALARMLOG_ALARM("STORAGE") + << partitionDesc() << ": Received storage event from node " + << source->nodeDescription() << " with" + << " different PartitionId: [" << pid << "] vs [" + << header.partitionId() << "]" + << ". Ignoring storage event." << MWCTSK_ALARMLOG_END; + continue; // CONTINUE + } + mwcu::BlobPosition recordPosition; mwcu::BlobObjectProxy recHeader; @@ -6134,9 +6201,10 @@ void FileStore::processStorageEvent(const bsl::shared_ptr& blob, if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(0 == rc)) { if (header.flags() & bmqp::StorageHeaderFlags::e_RECEIPT_REQUESTED) { - issueReceipt(source, - recHeader->primaryLeaseId(), - recHeader->sequenceNumber()); + nodeContext = generateReceipt(nodeContext, + source, + recHeader->primaryLeaseId(), + recHeader->sequenceNumber()); } } } @@ -6187,7 +6255,9 @@ void FileStore::processStorageEvent(const bsl::shared_ptr& blob, << ", rc: " << rc << ". Ignoring this message." << MWCTSK_ALARMLOG_END; } - } // end: while loop + } while (1 == iter.next()); + + sendReceipt(source, nodeContext); } int FileStore::processRecoveryEvent(const bsl::shared_ptr& blob) @@ -6352,34 +6422,40 @@ int FileStore::processRecoveryEvent(const bsl::shared_ptr& blob) return rc_SUCCESS; } -void FileStore::issueReceipt(mqbnet::ClusterNode* node, - unsigned int primaryLeaseId, - bsls::Types::Uint64 sequenceNumber) +FileStore::NodeContext* +FileStore::generateReceipt(NodeContext* nodeContext, + mqbnet::ClusterNode* node, + unsigned int primaryLeaseId, + bsls::Types::Uint64 sequenceNumber) { - const int nodeId = node->nodeId(); - NodeReceiptContexts::iterator itNode = d_nodes.find(nodeId); - const DataStoreRecordKey key(sequenceNumber, primaryLeaseId); - - if (itNode == d_nodes.end()) { - // no prior history about this node - itNode = - d_nodes - .insert(bsl::make_pair( - nodeId, - NodeContext(d_config.bufferFactory(), key, d_allocator_p))) - .first; + const DataStoreRecordKey key(sequenceNumber, primaryLeaseId); + + if (nodeContext == 0) { + const int nodeId = node->nodeId(); + NodeReceiptContexts::iterator itNode = d_nodes.find(nodeId); + + if (itNode == d_nodes.end()) { + // no prior history about this node + itNode = d_nodes + .insert(bsl::make_pair( + nodeId, + NodeContext(d_config.bufferFactory(), + key, + d_allocator_p))) + .first; + } + nodeContext = &itNode->second; } - else if (itNode->second.d_key < key) { - itNode->second.d_key = key; + else if (nodeContext->d_key < key) { + nodeContext->d_key = key; } else { // outdated receipt - return; // RETURN + return nodeContext; // RETURN } - NodeContext& nodeContext = itNode->second; - if (nodeContext.d_state && nodeContext.d_state->tryLock()) { - char* buffer = nodeContext.d_blob.buffer(0).data(); + if (nodeContext->d_state && nodeContext->d_state->tryLock()) { + char* buffer = nodeContext->d_blob.buffer(0).data(); bmqp::ReplicationReceipt* receipt = reinterpret_cast( buffer + sizeof(bmqp::EventHeader)); @@ -6389,28 +6465,39 @@ void FileStore::issueReceipt(mqbnet::ClusterNode* node, .setPrimaryLeaseId(primaryLeaseId) .setSequenceNum(sequenceNumber); - nodeContext.d_state->unlock(); + nodeContext->d_state->unlock(); } else { - bmqp::ProtocolUtil::buildReceipt(&nodeContext.d_blob, + bmqp::ProtocolUtil::buildReceipt(&nodeContext->d_blob, d_config.partitionId(), primaryLeaseId, sequenceNumber); - nodeContext.d_state = d_statePool_p->getObject(); + nodeContext->d_state = d_statePool_p->getObject(); + } - int rc = node->channel().writeBlob( - nodeContext.d_blob, - bmqp::EventType::e_REPLICATION_RECEIPT, - nodeContext.d_state); + return nodeContext; +} - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( - rc != bmqt::GenericResult::e_SUCCESS)) { - BALL_LOG_INFO << partitionDesc() << "Failed to send Receipt for " - << "[ primaryLeaseId: " << primaryLeaseId - << ", sequence number: " << sequenceNumber << "."; +void FileStore::sendReceipt(mqbnet::ClusterNode* node, + NodeContext* nodeContext) +{ + if (nodeContext == 0) { + return; // RETURN + } - // Ignore the error and keep the blob - } + int rc = node->channel().writeBlob(nodeContext->d_blob, + bmqp::EventType::e_REPLICATION_RECEIPT, + nodeContext->d_state); + + if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( + rc != bmqt::GenericResult::e_SUCCESS)) { + BALL_LOG_INFO << partitionDesc() << "Failed to send Receipt for " + << "[ primaryLeaseId: " + << nodeContext->d_key.d_primaryLeaseId + << ", sequence number: " + << nodeContext->d_key.d_sequenceNum << "."; + + // Ignore the error and keep the blob } } @@ -6530,9 +6617,12 @@ void FileStore::setPrimary(mqbnet::ClusterNode* primaryNode, << "primaryLeaseId = " << d_primaryLeaseId << ", d_sequenceNum = " << d_sequenceNum << "]."; - issueReceipt(primaryNode, - d_lastRecoveredStrongConsistency.d_primaryLeaseId, - d_lastRecoveredStrongConsistency.d_sequenceNum); + FileStore::NodeContext* nodeContext = generateReceipt( + 0, + primaryNode, + d_lastRecoveredStrongConsistency.d_primaryLeaseId, + d_lastRecoveredStrongConsistency.d_sequenceNum); + sendReceipt(primaryNode, nodeContext); } return; // RETURN } @@ -6780,7 +6870,7 @@ void FileStore::clearPrimary() } } -void FileStore::dispatcherFlush(bool storage, bool queues) +void FileStore::flushStorageBuilder(bool isQueueIdle) { // 'LocalQueue::flush' invokes 'dispaterFlush'. // This means that 'dispaterFlush' will be executed more frequently on a @@ -6792,26 +6882,69 @@ void FileStore::dispatcherFlush(bool storage, bool queues) BSLS_ASSERT_SAFE(d_isPrimary); - if (storage) { - if (d_storageEventBuilder.messageCount() != 0) { - BALL_LOG_TRACE << partitionDesc() << "Flushing " - << d_storageEventBuilder.messageCount() - << " STORAGE messages."; - const int maxChannelPendingItems = d_cluster_p->broadcast( - d_storageEventBuilder.blob()); - if (maxChannelPendingItems > 0) { - if (d_nagglePacketCount < k_NAGLE_PACKET_COUNT) { - // back off - ++d_nagglePacketCount; + // Two factors affect d_nagglePacketSize: + // 1. Network condition. In case of congestion, the limit grows to + // minimize number of writes. The 'broadcast' return value + // reports max number of pending items among all cluster node + // channels. The value is greater than zero, if some channel is + // not able to keep up. + // 2. Dispatcher thread CPU condition. If there are too many pending + // events, the latency will grow even if the network condition is + // good. Bigger 'd_nagglePacketSize' can reduce CPU load. + // An (indirect) indicator of CPU load is the queue IDLE time + // which is the frequency of 'LocalQueue::flush' calls. The + // combination {storage=true, queues=true) indicates 'flush' call. + // Every time, this method is called not from 'LocalQueue::flush', + // increase the limit to reduce CPU load. + + if (d_storageEventBuilder.messageCount() != 0) { + BALL_LOG_TRACE << partitionDesc() << "Flushing " + << d_storageEventBuilder.messageCount() + << " STORAGE messages."; + + const int maxChannelPendingItems = d_cluster_p->broadcast( + d_storageEventBuilder.blob()); + + if (!isQueueIdle || maxChannelPendingItems > 0) { + if (d_naglePacketSize < k_NAGLE_PACKET_SIZE_MAX) { + // back off + if (d_step > 0) { + // Still moving forward. Make a bigger step + d_step *= 2; + } + else { + // Was moving backward. Make a small step + d_step = k_NAGLE_PACKET_SIZE_STEP; + } + + d_naglePacketSize += d_step; + if (d_naglePacketSize > k_NAGLE_PACKET_SIZE_MAX) { + d_naglePacketSize = k_NAGLE_PACKET_SIZE_MAX; } } - else if (d_nagglePacketCount) { - --d_nagglePacketCount; + } + else if (d_naglePacketSize > k_NAGLE_PACKET_SIZE_MIN) { + if (d_step < 0) { + // Still moving backward. Make a bigger step + d_step *= 2; + } + else { + // Was moving forward. Make a small step + d_step = -k_NAGLE_PACKET_SIZE_STEP; + } + + d_naglePacketSize += d_step; + if (d_naglePacketSize < k_NAGLE_PACKET_SIZE_MIN) { + d_naglePacketSize = k_NAGLE_PACKET_SIZE_MIN; } - d_storageEventBuilder.reset(); } + d_storageEventBuilder.reset(); } - if (queues && d_storageEventBuilder.messageCount() == 0) { +} + +void FileStore::notifyQueuesOnReplicatedBatch() +{ + if (d_storageEventBuilder.messageCount() == 0) { // Empty 'd_storageEventBuilder' means it has been flushed and it is a // good time to flush queues. for (StorageMapIter it = d_storages.begin(); it != d_storages.end(); @@ -6824,6 +6957,15 @@ void FileStore::dispatcherFlush(bool storage, bool queues) } } +void FileStore::dispatcherFlush(bool isQueueIdle) +{ + flushStorageBuilder(isQueueIdle); + + if (isQueueIdle) { + notifyQueuesOnReplicatedBatch(); + } +} + bool FileStore::gcExpiredMessages(const bdlt::Datetime& currentTimeUtc) { if (!d_isOpen) { @@ -6897,7 +7039,7 @@ bool FileStore::gcExpiredMessages(const bdlt::Datetime& currentTimeUtc) // If queues are idle, 'dispatcherFlush()' won't get called. // Internal-ticket D168465018. - dispatcherFlush(true, false); + flushStorageBuilder(false); } return haveMore; @@ -7193,7 +7335,7 @@ void FileStore::loadSummary(mqbcmd::FileStore* fileStore) const d_sequenceNum, d_records.size(), d_unreceipted.size(), - d_nagglePacketCount, + d_naglePacketSize, d_fileSets, d_storages); } diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index 4f126d8fd..d72377db4 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -389,13 +389,21 @@ class FileStore : public DataStore { // calculations. We should only set // this to true during testing. - int d_nagglePacketCount; - // Max number of messages in the + int d_naglePacketSize; + // Max size of messages in the // 'd_storageEventBuilder' before // flushing the builder. Depending // the cluster channels load, it can // grow or shrink. + int d_step; + // The value to increment/decrement + // 'd_naglePacketSize' when adapting + // to network and CPU conditions. + // The value is not a constant and it + // doubles when the conditions do not + // change. See 'dispatcherFlush'. + private: // NOT IMPLEMENTED FileStore(const FileStore&) BSLS_CPP11_DELETED; @@ -606,9 +614,11 @@ class FileStore : public DataStore { /// Send Replication Receipt to the specified `node` confirming the /// receipt of message with the specified `primaryLeaseId` and /// `sequenceNumber`. - void issueReceipt(mqbnet::ClusterNode* node, - unsigned int primaryLeaseId, - bsls::Types::Uint64 sequenceNumber); + NodeContext* generateReceipt(NodeContext* nodeContext, + mqbnet::ClusterNode* node, + unsigned int primaryLeaseId, + bsls::Types::Uint64 sequenceNumber); + void sendReceipt(mqbnet::ClusterNode* node, NodeContext* nodeContext); /// Insert the specified `record` value by the specified `key` into the /// list of outstanding records, and assign to the specified `handle` an @@ -855,11 +865,17 @@ class FileStore : public DataStore { /// Clear the current primary associated with this partition. void clearPrimary() BSLS_KEYWORD_OVERRIDE; - /// If the specified `storage` is `true`, flush any buffered replication - /// messages to the peers. If the specified `queues` is `true`, `flush` - /// all associated queues. Behavior is undefined unless this node is - /// the primary for this partition. - void dispatcherFlush(bool storage, bool queues) BSLS_KEYWORD_OVERRIDE; + /// Flush any buffered replication messages to the peers using the + /// specified 'isQueueIdle' to adjust the size of storage builder batch. + /// Behavior is undefined unless this node is the primary for this + /// partition. + void dispatcherFlush(bool isQueueIdle) BSLS_KEYWORD_OVERRIDE; + + void flushStorageBuilder(bool isQueueIdle); + + /// Call `onReplicatedBatch` on all associated queues if the storage + /// builder is empty (just flushed). + void notifyQueuesOnReplicatedBatch(); /// Invoke the specified `functor` with each queue associated to the /// partition represented by this FileStore if the partition was diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index ec69dfd78..e0bdbec6b 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -423,7 +423,7 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey) return mqbi::StorageResult::e_SUCCESS; } -void InMemoryStorage::dispatcherFlush(bool, bool) +void InMemoryStorage::dispatcherFlush(bool) { // NOTHING } diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index 60c9e3701..d5182dfc2 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -341,12 +341,8 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { virtual mqbi::StorageResult::Enum removeAll(const mqbu::StorageKey& appKey) BSLS_KEYWORD_OVERRIDE; - /// If the specified `storage` is `true`, flush any buffered replication - /// messages to the peers. If the specified `queues` is `true`, `flush` - /// all associated queues. Behavior is undefined unless this node is - /// the primary for this partition. - virtual void dispatcherFlush(bool storage, - bool queues) BSLS_KEYWORD_OVERRIDE; + /// No-op for InMemoryStorage. + virtual void dispatcherFlush(bool isQueueIdle) BSLS_KEYWORD_OVERRIDE; /// Return the resource capacity meter associated to this storage. virtual mqbu::CapacityMeter* capacityMeter() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.t.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.t.cpp index 8bc1b15f7..2786d0646 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.t.cpp @@ -480,7 +480,7 @@ TEST_F(BasicTest, breathingTest) ASSERT_NE(d_tester.storage().queue(), static_cast(0)); // Queue has been set via call to 'setQueue' - ASSERT_PASS(d_tester.storage().dispatcherFlush(true, false)); + ASSERT_PASS(d_tester.storage().dispatcherFlush(false)); // Does nothing, at the time of this writing ASSERT_EQ(d_tester.storage().queueOpRecordHandles().empty(), true); diff --git a/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp b/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp index edd418847..d8f06a83e 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_virtualstorage.cpp @@ -205,7 +205,7 @@ mqbi::StorageResult::Enum VirtualStorage::removeAll( return mqbi::StorageResult::e_SUCCESS; } -void VirtualStorage::dispatcherFlush(bool, bool) +void VirtualStorage::dispatcherFlush(bool) { BSLS_ASSERT_OPT(false && "Should not be invoked."); } @@ -399,7 +399,19 @@ const bsl::shared_ptr& VirtualStorageIterator::options() const const mqbi::StorageMessageAttributes& VirtualStorageIterator::attributes() const { - loadMessageAndAttributes(); + // Do not load memory-mapped file message (expensive). + + if (d_attributes.refCount() == 0) { + // No loaded Attributes for the current message yet. + + mqbi::StorageResult::Enum rc = d_virtualStorage_p->d_storage_p->get( + &d_attributes, + d_iterator->first); + BSLS_ASSERT_SAFE(mqbi::StorageResult::e_SUCCESS == rc); + (void)rc; + } + // else return reference to the previously loaded attributes. + return d_attributes; } diff --git a/src/groups/mqb/mqbs/mqbs_virtualstorage.h b/src/groups/mqb/mqbs/mqbs_virtualstorage.h index 25e26bad2..3b76db354 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstorage.h +++ b/src/groups/mqb/mqbs/mqbs_virtualstorage.h @@ -338,7 +338,7 @@ class VirtualStorage : public mqbi::Storage { /// Behavior is undefined if this method is ever invoked. This method /// needs to be implemented as its part of base protocol. - void dispatcherFlush(bool storage, bool queues) BSLS_KEYWORD_OVERRIDE; + void dispatcherFlush(bool isQueueIdle) BSLS_KEYWORD_OVERRIDE; /// Attempt to garbage-collect messages for which TTL has expired, and /// return the number of messages garbage-collected. Populate the diff --git a/src/groups/mqb/mqbs/mqbs_virtualstorage.t.cpp b/src/groups/mqb/mqbs/mqbs_virtualstorage.t.cpp index c65e440e1..a4f47f3a1 100644 --- a/src/groups/mqb/mqbs/mqbs_virtualstorage.t.cpp +++ b/src/groups/mqb/mqbs/mqbs_virtualstorage.t.cpp @@ -319,7 +319,7 @@ static void test2_unsupportedOperations() ASSERT_OPT_FAIL( tester.vStorage().put(&attributes, guid, appData, options)); ASSERT_OPT_FAIL(tester.vStorage().releaseRef(guid, k_APP_KEY, 0)); - ASSERT_OPT_FAIL(tester.vStorage().dispatcherFlush(true, false)); + ASSERT_OPT_FAIL(tester.vStorage().dispatcherFlush(false)); ASSERT_OPT_FAIL(tester.vStorage().numVirtualStorages()); ASSERT_OPT_FAIL(tester.vStorage().hasVirtualStorage(k_APP_KEY)); ASSERT_OPT_FAIL(tester.vStorage().hasVirtualStorage(k_APP_ID)); diff --git a/src/groups/mwc/mwcc/mwcc_multiqueuethreadpool.h b/src/groups/mwc/mwcc/mwcc_multiqueuethreadpool.h index 211765495..dd5582f23 100644 --- a/src/groups/mwc/mwcc/mwcc_multiqueuethreadpool.h +++ b/src/groups/mwc/mwcc/mwcc_multiqueuethreadpool.h @@ -1262,7 +1262,7 @@ inline MultiQueueThreadPool::MultiQueueThreadPool( bdlf::PlaceHolders::_1, // arena bdlf::PlaceHolders::_2), // allocator - -1, + 256 * 1024, basicAllocator) , d_queueEmptyEvent(config.d_objectCreatorFn, config.d_objectResetterFn,