Skip to content

Commit

Permalink
Optimize more
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <emalygin@bloomberg.net>
  • Loading branch information
678098 committed Aug 10, 2024
1 parent ef96296 commit 5870886
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ struct Protocol {

/// A constant used to declare the length of static part of the array of
/// subQueueIds (or AppKeys).
static const size_t k_SUBID_ARRAY_STATIC_LEN = 16;
static const size_t k_SUBID_ARRAY_STATIC_LEN = 4;

/// An array of subQueueInfos with statically reserved space for a
/// number of subQueueInfos (as indicated by the second template
Expand Down
8 changes: 3 additions & 5 deletions src/groups/mqb/mqba/mqba_clientsession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ class TestBench {
queueId);

mqbi::DispatcherEvent event(d_allocator_p);
event.setType(mqbi::DispatcherEventType::e_ACK)
event.makeAckEvent()
.setAckMessage(ackMessage);

dispatch(event);
Expand Down Expand Up @@ -866,7 +866,6 @@ class TestBench {
mqbi::DispatcherEvent event(d_allocator_p);
event
.setSource(&d_cs) // DispatcherClient *value
.setCompressionAlgorithmType(cat)
.makePutEvent()
.setIsRelay(true) // Relay message
.setPutHeader(putHeader)
Expand Down Expand Up @@ -906,8 +905,8 @@ class TestBench {

mqbi::DispatcherEvent event(d_allocator_p);

event.setType(mqbi::DispatcherEventType::e_PUSH)
.setSource(&d_cs) // DispatcherClient *value
event.setSource(&d_cs) // DispatcherClient *value
.makePushEvent()
.setQueueId(queueId)
.setBlob(blob)
.setGuid(msgGUID)
Expand Down Expand Up @@ -2030,7 +2029,6 @@ static void test10_newStyleCompressedPush()

putEvent
.setSource(&tb.d_cs) // DispatcherClient *value
.setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::e_ZLIB)
.makePutEvent()
.setIsRelay(true) // Relay message
.setPutHeader(putIt.header())
Expand Down
34 changes: 18 additions & 16 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,24 +503,26 @@ void QueueHandle::deliverMessageImpl(
// Create an event to dispatch delivery of the message to the client
mqbi::DispatcherClient* client = d_clientContext_sp->client();
mqbi::DispatcherEvent* event = client->dispatcher()->getEvent(client);
(*event)
mqbi::DispatcherPushEvent &pushEvent = (*event)
.setSource(d_queue_sp.get())
.makePushEvent()
.setGuid(msgGUID)
.setQueueId(id())
.setMessagePropertiesInfo(d_queue_sp->schemaLearner().demultiplex(
.makePushEvent(msgGUID, msgGroupId, attributes.compressionAlgorithmType(), isOutOfOrder, id(), subQueueInfos, d_queue_sp->schemaLearner().demultiplex(
d_schemaLearnerPushContext,
attributes.messagePropertiesInfo()))
.setSubQueueInfos(subQueueInfos)
.setMsgGroupId(msgGroupId)
.setCompressionAlgorithmType(attributes.compressionAlgorithmType())
.setOutOfOrderPush(isOutOfOrder)
.setBlob(message ? message : bsl::shared_ptr<bdlbb::Blob>());

// TODO refactor
// if (message) {
// event->setBlob(message);
// }
attributes.messagePropertiesInfo()));

// .setGuid(msgGUID)
// .setQueueId(id())
// .setMessagePropertiesInfo(d_queue_sp->schemaLearner().demultiplex(
// d_schemaLearnerPushContext,
// attributes.messagePropertiesInfo()))
// .setSubQueueInfos(subQueueInfos)
// .setMsgGroupId(msgGroupId)
// .setCompressionAlgorithmType(attributes.compressionAlgorithmType())
// .setOutOfOrderPush(isOutOfOrder)
// .setBlob(message ? message : bsl::shared_ptr<bdlbb::Blob>());

if (message) {
pushEvent.setBlob(message);
}

client->dispatcher()->dispatchEvent(event, client);
}
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_remotequeue.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ void TestBench::ackPuts(bmqt::AckResult::Enum status)
bmqp::PutHeaderFlags::e_ACK_REQUESTED) ||
status == bmqt::AckResult::e_NOT_READY) {
mqbi::DispatcherEvent ackEvent(d_allocator_p);
ackEvent.setType(mqbi::DispatcherEventType::e_ACK)
ackEvent.makeAckEvent()
.setAckMessage(ackMessage)
.setBlob(d_puts.front().d_appData)
.setOptions(d_puts.front().d_options);
Expand Down
55 changes: 33 additions & 22 deletions src/groups/mqb/mqbi/mqbi_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ class DispatcherPushEvent {
public:
// CREATORS
/// CONSTRUCTOR
DispatcherPushEvent()
inline DispatcherPushEvent()
: d_guid()
, d_msgGroupId()
, d_compressionAlgorithmType(bmqt::CompressionAlgorithmType::e_NONE)
Expand All @@ -937,6 +937,24 @@ class DispatcherPushEvent {
{
}

inline explicit DispatcherPushEvent(const bmqt::MessageGUID& guid,
const bmqp::Protocol::MsgGroupId& msgGroupId,
bmqt::CompressionAlgorithmType::Enum compression,
bool outOfOrder,
int queueId,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bmqp::MessagePropertiesInfo& messageProperties)
: d_guid(guid)
, d_msgGroupId(msgGroupId)
, d_compressionAlgorithmType(compression)
, d_isOutOfOrder(outOfOrder)
, d_queueId(queueId)
, d_subQueueInfos(subQueueInfos)
, d_messagePropertiesInfo(messageProperties)
{

}

/// Destructor.
virtual ~DispatcherPushEvent();

Expand Down Expand Up @@ -1726,18 +1744,30 @@ class DispatcherEvent {

public:
// MANIPULATORS
DispatcherPutEvent& makePutEvent()
inline DispatcherPutEvent& makePutEvent()
{
d_type = DispatcherEventType::e_PUT;
return d_eventImpl.emplace<DispatcherPutEvent>();
}

DispatcherPushEvent& makePushEvent()
inline DispatcherPushEvent& makePushEvent()
{
d_type = DispatcherEventType::e_PUSH;
return d_eventImpl.emplace<DispatcherPushEvent>();
}

inline DispatcherPushEvent& makePushEvent(const bmqt::MessageGUID& guid,
const bmqp::Protocol::MsgGroupId& msgGroupId,
bmqt::CompressionAlgorithmType::Enum compression,
bool outOfOrder,
int queueId,
const bmqp::Protocol::SubQueueInfosArray& subQueueInfos,
const bmqp::MessagePropertiesInfo& messageProperties)
{
d_type = DispatcherEventType::e_PUSH;
return d_eventImpl.emplace<DispatcherPushEvent>(guid, msgGroupId, compression, outOfOrder, queueId, subQueueInfos, messageProperties);
}

DispatcherDispatcherEvent& makeDispatcherEvent()
{
d_type = DispatcherEventType::e_DISPATCHER;
Expand Down Expand Up @@ -1798,7 +1828,6 @@ class DispatcherEvent {
return d_eventImpl.emplace<DispatcherRejectEvent>();
}

DispatcherEvent& setType(DispatcherEventType::Enum value);
DispatcherEvent& setSource(DispatcherClient* value);
DispatcherEvent& setDestination(DispatcherClient* value);

Expand Down Expand Up @@ -1993,24 +2022,6 @@ inline DispatcherEvent::DispatcherEvent(bslma::Allocator* allocator)
// NOTHING
}

inline DispatcherEvent&
DispatcherEvent::setType(DispatcherEventType::Enum value)
{
d_type = value;
switch (d_type) {
case DispatcherEventType::Enum::e_PUT: {
d_eventImpl = DispatcherPutEvent();
} break;
case DispatcherEventType::Enum::e_PUSH: {
d_eventImpl = DispatcherPushEvent();
} break;
default: {
d_eventImpl = bsl::monostate();
} break;
}
return *this;
}

inline DispatcherEvent& DispatcherEvent::setSource(DispatcherClient* value)
{
d_source_p = value;
Expand Down
132 changes: 118 additions & 14 deletions src/groups/mqb/mqbi/mqbi_dispatcher.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,130 @@ static void testN1_dispatcherEventPeformance()
bsl::cout << "sizeof(mqbi::DispatcherAckEvent): "
<< sizeof(mqbi::DispatcherAckEvent) << bsl::endl;
bsl::cout << "sizeof(mqbi::DispatcherControlMessageEvent): "
<< sizeof(mqbi::DispatcherControlMessageEvent) << bsl::endl;
<< sizeof(mqbi::DispatcherControlMessageEvent) << bsl::endl << bsl::endl;

const size_t k_ITERS_NUM = 100000000;

const bmqp::PutHeader header;
const bsl::shared_ptr<bdlbb::Blob> blob;
const bsl::shared_ptr<BloombergLP::mwcu::AtomicState> state;
const bmqt::MessageGUID guid;
const bmqp::MessagePropertiesInfo info;
const bmqp::Protocol::SubQueueInfosArray subQueueInfos;
const bsl::string msgGroupId;
const bmqp::ConfirmMessage confirm;

mqbi::DispatcherEvent event(s_allocator_p);

const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer();
for (size_t i = 0; i < k_ITERS_NUM; i++) {
event.reset();
{
const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer();
for (size_t i = 0; i < k_ITERS_NUM; i++) {
event.reset();
}
const bsls::Types::Int64 end = bsls::TimeUtil::getTimer();

bsl::cout << "mqbi::DispatcherEvent::reset():" << bsl::endl;
bsl::cout << " total: "
<< mwcu::PrintUtil::prettyTimeInterval(end - begin) << " ("
<< k_ITERS_NUM << " iterations)" << bsl::endl;
bsl::cout << " per call: "
<< mwcu::PrintUtil::prettyTimeInterval((end - begin) /
k_ITERS_NUM)
<< bsl::endl << bsl::endl;
}

{
const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer();
for (size_t i = 0; i < k_ITERS_NUM; i++) {
event.setSource(NULL)
.makePutEvent()
.setIsRelay(true) // Relay message
.setPutHeader(header)
.setPartitionId(1) // Only replica uses
.setBlob(blob)
.setOptions(blob)
.setGenCount(10)
.setState(state);
}
const bsls::Types::Int64 end = bsls::TimeUtil::getTimer();

bsl::cout << "mqbi::DispatcherEvent::makePutEvent():" << bsl::endl;
bsl::cout << " total: "
<< mwcu::PrintUtil::prettyTimeInterval(end - begin) << " ("
<< k_ITERS_NUM << " iterations)" << bsl::endl;
bsl::cout << " per call: "
<< mwcu::PrintUtil::prettyTimeInterval((end - begin) /
k_ITERS_NUM)
<< bsl::endl << bsl::endl;
}


{
const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer();
for (size_t i = 0; i < k_ITERS_NUM; i++) {
event.setSource(NULL)
.makePushEvent()
.setGuid(guid)
.setQueueId(678)
.setMessagePropertiesInfo(info)
.setSubQueueInfos(subQueueInfos)
.setMsgGroupId(msgGroupId)
.setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::e_NONE)
.setOutOfOrderPush(false)
.setBlob(blob);
}
const bsls::Types::Int64 end = bsls::TimeUtil::getTimer();

bsl::cout << "mqbi::DispatcherEvent::makePushEvent():" << bsl::endl;
bsl::cout << " total: "
<< mwcu::PrintUtil::prettyTimeInterval(end - begin) << " ("
<< k_ITERS_NUM << " iterations)" << bsl::endl;
bsl::cout << " per call: "
<< mwcu::PrintUtil::prettyTimeInterval((end - begin) /
k_ITERS_NUM)
<< bsl::endl;
}


{
const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer();
for (size_t i = 0; i < k_ITERS_NUM; i++) {
event.setSource(NULL)
.makePushEvent(guid, msgGroupId, bmqt::CompressionAlgorithmType::e_NONE, false, 678, subQueueInfos, info);
}
const bsls::Types::Int64 end = bsls::TimeUtil::getTimer();

bsl::cout << "mqbi::DispatcherEvent::makePushEvent(..args..):" << bsl::endl;
bsl::cout << " total: "
<< mwcu::PrintUtil::prettyTimeInterval(end - begin) << " ("
<< k_ITERS_NUM << " iterations)" << bsl::endl;
bsl::cout << " per call: "
<< mwcu::PrintUtil::prettyTimeInterval((end - begin) /
k_ITERS_NUM)
<< bsl::endl << bsl::endl;
}


{
const bsls::Types::Int64 begin = bsls::TimeUtil::getTimer();
for (size_t i = 0; i < k_ITERS_NUM; i++) {
event.setSource(NULL)
.makeConfirmEvent()
.setConfirmMessage(confirm)
.setPartitionId(678)
.setIsRelay(true); // Relay message
}
const bsls::Types::Int64 end = bsls::TimeUtil::getTimer();

bsl::cout << "mqbi::DispatcherEvent::makeConfirmEvent():" << bsl::endl;
bsl::cout << " total: "
<< mwcu::PrintUtil::prettyTimeInterval(end - begin) << " ("
<< k_ITERS_NUM << " iterations)" << bsl::endl;
bsl::cout << " per call: "
<< mwcu::PrintUtil::prettyTimeInterval((end - begin) /
k_ITERS_NUM)
<< bsl::endl << bsl::endl;
}
const bsls::Types::Int64 end = bsls::TimeUtil::getTimer();

bsl::cout << "mqbi::DispatcherEvent::reset():" << bsl::endl;
bsl::cout << " total: "
<< mwcu::PrintUtil::prettyTimeInterval(end - begin) << " ("
<< k_ITERS_NUM << " iterations)" << bsl::endl;
bsl::cout << " per call: "
<< mwcu::PrintUtil::prettyTimeInterval((end - begin) /
k_ITERS_NUM)
<< bsl::endl;

bsl::cout << event.type() << bsl::endl;
}
Expand Down

0 comments on commit 5870886

Please sign in to comment.