Skip to content

Commit

Permalink
Performance[mqbi::DispatcherEvent]: replace multiple inheritance with…
Browse files Browse the repository at this point in the history
… variant

Signed-off-by: Evgeny Malygin <emalygin@bloomberg.net>
  • Loading branch information
678098 committed Aug 9, 2024
1 parent 7d4b3d7 commit e16b562
Show file tree
Hide file tree
Showing 17 changed files with 1,414 additions and 831 deletions.
20 changes: 9 additions & 11 deletions src/groups/mqb/mqba/mqba_clientsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2749,30 +2749,28 @@ void ClientSession::processEvent(
return; // RETURN
}

// Not a control or leader message, it's either a put or a confirm ..
mqbi::DispatcherEventType::Enum eventType;
// TODO optimize if error else is triggered
// Dispatch the event
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);
bsl::shared_ptr<bdlbb::Blob> blobSp =
d_state.d_blobSpPool_p->getObject();
*blobSp = *(event.blob());

if (event.isPutEvent()) {
eventType = mqbi::DispatcherEventType::e_PUT;
(*dispEvent).setSource(this).makePutEvent().setBlob(blobSp);
}
else if (event.isConfirmEvent()) {
eventType = mqbi::DispatcherEventType::e_CONFIRM;
(*dispEvent).setSource(this).makeConfirmEvent().setBlob(blobSp);
}
else if (event.isRejectEvent()) {
eventType = mqbi::DispatcherEventType::e_REJECT;
(*dispEvent).setSource(this).makeRejectEvent().setBlob(blobSp);
}
else {
BALL_LOG_ERROR << "#CLIENT_UNEXPECTED_EVENT " << description()
<< ": Unexpected event type: " << event;
return; // RETURN
}

// Dispatch the event
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);
bsl::shared_ptr<bdlbb::Blob> blobSp =
d_state.d_blobSpPool_p->getObject();
*blobSp = *(event.blob());
(*dispEvent).setType(eventType).setSource(this).setBlob(blobSp);
dispatcher()->dispatchEvent(dispEvent, this);
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/groups/mqb/mqba/mqba_clientsession.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,13 +864,13 @@ class TestBench {
cat);

mqbi::DispatcherEvent event(d_allocator_p);
event.setType(mqbi::DispatcherEventType::e_PUT)
event.setSource(&d_cs) // DispatcherClient *value
.setCompressionAlgorithmType(cat)
.makePutEvent()
.setIsRelay(true) // Relay message
.setSource(&d_cs) // DispatcherClient *value
.setPutHeader(putHeader)
.setPartitionId(1) // d_state_p->partitionId()) // int value
.setBlob(eventBlob) // const bsl::shared_ptr<bdlbb::Blob>& value
.setCompressionAlgorithmType(cat);
.setBlob(eventBlob); // const bsl::shared_ptr<bdlbb::Blob>& value

// Internal-ticket D167598037.
// Verify that PutMessageIterator does not change the input.
Expand Down Expand Up @@ -1918,9 +1918,9 @@ static void test9_newStylePush()
blobSp.createInplace(s_allocator_p, &tb.d_bufferFactory, s_allocator_p);
*blobSp = peb.blob();

putEvent.setType(mqbi::DispatcherEventType::e_PUT)
putEvent.setSource(&tb.d_cs) // DispatcherClient *value
.makePutEvent()
.setIsRelay(true) // Relay message
.setSource(&tb.d_cs) // DispatcherClient *value
.setPutHeader(putIt.header())
.setBlob(blobSp); // const bsl::shared_ptr<bdlbb::Blob>& value

Expand Down Expand Up @@ -2026,12 +2026,12 @@ static void test10_newStyleCompressedPush()
blobSp.createInplace(s_allocator_p, &tb.d_bufferFactory, s_allocator_p);
*blobSp = peb.blob();

putEvent.setType(mqbi::DispatcherEventType::e_PUT)
putEvent.setSource(&tb.d_cs) // DispatcherClient *value
.setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::e_ZLIB)
.makePutEvent()
.setIsRelay(true) // Relay message
.setSource(&tb.d_cs) // DispatcherClient *value
.setPutHeader(putIt.header())
.setBlob(blobSp) // const bsl::shared_ptr<bdlbb::Blob>& value
.setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::e_ZLIB);
.setBlob(blobSp); // const bsl::shared_ptr<bdlbb::Blob>& value

tb.dispatch(putEvent);

Expand Down
18 changes: 9 additions & 9 deletions src/groups/mqb/mqba/mqba_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void Dispatcher_Executor::post(const bsl::function<void()>& f) const
d_processorPool_p->getUnmanagedEvent();

event->object()
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.makeDispatcherEvent()
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f));

// submit the event
Expand Down Expand Up @@ -183,9 +183,9 @@ void Dispatcher_ClientExecutor::post(const bsl::function<void()>& f) const
processorPool()->getUnmanagedEvent();

event->object()
.setType(mqbi::DispatcherEventType::e_CALLBACK)
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f))
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p));
.setDestination(const_cast<mqbi::DispatcherClient*>(d_client_p))
.makeDispatcherEvent()
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(f));

// submit the event
int rc = processorPool()->enqueueEvent(event, processorHandle());
Expand Down Expand Up @@ -595,13 +595,13 @@ Dispatcher::registerClient(mqbi::DispatcherClient* client,
mqbi::DispatcherEvent* event =
&context.d_processorPool_mp->getUnmanagedEvent()->object();
(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.setDestination(client) // not needed
.makeDispatcherEvent()
.setCallback(
bdlf::BindUtil::bind(&Dispatcher::onNewClient,
this,
type,
bdlf::PlaceHolders::_1)) // processor
.setDestination(client); // not needed
bdlf::PlaceHolders::_1)); // processor
context.d_processorPool_mp->enqueueEvent(event, processor);
return processor; // RETURN
} // break;
Expand Down Expand Up @@ -692,7 +692,7 @@ void Dispatcher::execute(const mqbi::Dispatcher::ProcessorFunctor& functor,
if (processorPool[i] != 0) {
mqbi::DispatcherEvent* qEvent =
&processorPool[i]->getUnmanagedEvent()->object();
qEvent->setType(mqbi::DispatcherEventType::e_DISPATCHER)
qEvent->makeDispatcherEvent()
.setCallback(functor)
.setFinalizeCallback(doneCallback);
processorPool[i]->enqueueEventOnAllQueues(qEvent);
Expand All @@ -719,7 +719,7 @@ void Dispatcher::synchronize(mqbi::DispatcherClientType::Enum type,
bslmt::Semaphore semaphore;
mqbi::DispatcherEvent* event = getEvent(type);
(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.makeDispatcherEvent()
.setCallback(
bdlf::BindUtil::bind(static_cast<PostFn>(&bslmt::Semaphore::post),
&semaphore));
Expand Down
12 changes: 8 additions & 4 deletions src/groups/mqb/mqba/mqba_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,13 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,
BSLS_ASSERT_SAFE(functor);

mqbi::DispatcherEvent* event = getEvent(client);

(*event).setType(type).setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
if (type == mqbi::DispatcherEventType::e_DISPATCHER) {
(*event).makeDispatcherEvent().setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
} else {
(*event).makeCallbackEvent().setCallback(
mqbi::Dispatcher::voidToProcessorFunctor(functor));
}

dispatchEvent(event, client);
}
Expand All @@ -585,7 +589,7 @@ inline void Dispatcher::execute(const mqbi::Dispatcher::VoidFunctor& functor,
mqbi::DispatcherEvent* event = getEvent(client.clientType());

(*event)
.setType(mqbi::DispatcherEventType::e_DISPATCHER)
.makeDispatcherEvent()
.setCallback(mqbi::Dispatcher::voidToProcessorFunctor(functor));

dispatchEvent(event, client.clientType(), client.processorHandle());
Expand Down
22 changes: 11 additions & 11 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3210,9 +3210,9 @@ void Cluster::processEvent(const bmqp::Event& event,
d_clusterData.blobSpPool()->getObject(); \
*_blobSp = *(event.blob()); \
(*_evt) \
.setType(T) \
.setIsRelay(R) \
.setSource(this) \
.make ## T ## Event() \
.setIsRelay(R) \
.setBlob(_blobSp) \
.setClusterNode(source); \
dispatcher()->dispatchEvent(_evt, this); \
Expand Down Expand Up @@ -3270,29 +3270,29 @@ void Cluster::processEvent(const bmqp::Event& event,
// This event arrives from a replica to this node, which should be the
// primary of the partition of the queue to which this PUT event
// belongs.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_PUT, false);
DISPATCH_EVENT(Put, false);
} break; // BREAK
case bmqp::EventType::e_CONFIRM: {
// This event arrives from a replica to this node, which should be the
// primary of the partition of the queue to which this CONFIRM event
// belongs.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_CONFIRM, false);
DISPATCH_EVENT(Confirm, false);
} break; // BREAK
case bmqp::EventType::e_REJECT: {
// This event arrives from a replica to this node, which should be the
// primary of the partition of the queue to which this REJECT event
// belongs.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_REJECT, false);
DISPATCH_EVENT(Reject, false);
} break; // BREAK
case bmqp::EventType::e_PUSH: {
// This event arrives from primary to replica, and hence is a relay
// event.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_PUSH, true);
DISPATCH_EVENT(Push, true);
} break; // BREAK
case bmqp::EventType::e_ACK: {
// This event arrives from primary to replica, and hence is a relay
// event.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_ACK, true);
DISPATCH_EVENT(Ack, true);
} break; // BREAK
case bmqp::EventType::e_CLUSTER_STATE: {
if (isLocal()) {
Expand All @@ -3306,23 +3306,23 @@ void Cluster::processEvent(const bmqp::Event& event,
return; // RETURN
}

DISPATCH_EVENT(mqbi::DispatcherEventType::e_CLUSTER_STATE, false);
DISPATCH_EVENT(ClusterState, false);
} break;
case bmqp::EventType::e_STORAGE: {
// Storage event arrives from primary to replica/replication nodes.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_STORAGE, false);
DISPATCH_EVENT(Storage, false);
} break; // BREAK
case bmqp::EventType::e_PARTITION_SYNC: {
// PartitionSync event may arrive from a passive primary to replicas,
// or from the chosen syncing peer to the passive primary. We
// currently don't have a dispatcher event type for
// EventType::e_PARTITION_SYNC. So we overload
// DispatcherEventType::e_STORAGE.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_STORAGE, false);
DISPATCH_EVENT(Storage, false);
} break; // BREAK
case bmqp::EventType::e_RECOVERY: {
// This event arrives from a peer cluster node.
DISPATCH_EVENT(mqbi::DispatcherEventType::e_RECOVERY, false);
DISPATCH_EVENT(Recovery, false);
} break; // BREAK
case bmqp::EventType::e_HEARTBEAT_REQ:
case bmqp::EventType::e_HEARTBEAT_RSP: {
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,8 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
d_clusterData.blobSpPool()->getObject();
*blobSp = *(event.blob());
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_PUSH)
.setSource(this)
.makePushEvent()
.setBlob(blobSp);
dispatcher()->dispatchEvent(dispEvent, this);
} break;
Expand All @@ -731,8 +731,8 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
d_clusterData.blobSpPool()->getObject();
*blobSp = *(event.blob());
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_ACK)
.setSource(this)
.makeAckEvent()
.setBlob(blobSp);
dispatcher()->dispatchEvent(dispEvent, this);
} break;
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,8 @@ void Queue::onPushMessage(
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);

(*dispEvent)
.setType(mqbi::DispatcherEventType::e_PUSH)
.setSource(this)
.makePushEvent()
.setBlob(appData)
.setOptions(options)
.setGuid(msgGUID)
Expand Down Expand Up @@ -840,7 +840,7 @@ void Queue::onAckMessage(const bmqp::AckMessage& ackMessage)
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);

(*dispEvent)
.setType(mqbi::DispatcherEventType::e_ACK)
.makeAckEvent()
.setAckMessage(ackMessage);

dispatcher()->dispatchEvent(dispEvent, this);
Expand Down
16 changes: 9 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,8 @@ void QueueHandle::deliverMessageImpl(
mqbi::DispatcherClient* client = d_clientContext_sp->client();
mqbi::DispatcherEvent* event = client->dispatcher()->getEvent(client);
(*event)
.setType(mqbi::DispatcherEventType::e_PUSH)
.setSource(d_queue_sp.get())
.makePushEvent()
.setGuid(msgGUID)
.setQueueId(id())
.setMessagePropertiesInfo(d_queue_sp->schemaLearner().demultiplex(
Expand All @@ -514,11 +514,13 @@ void QueueHandle::deliverMessageImpl(
.setSubQueueInfos(subQueueInfos)
.setMsgGroupId(msgGroupId)
.setCompressionAlgorithmType(attributes.compressionAlgorithmType())
.setOutOfOrderPush(isOutOfOrder);
.setOutOfOrderPush(isOutOfOrder)
.setBlob(message ? message : bsl::shared_ptr<bdlbb::Blob>());

if (message) {
event->setBlob(message);
}
// TODO refactor
// if (message) {
// event->setBlob(message);
// }

client->dispatcher()->dispatchEvent(event, client);
}
Expand Down Expand Up @@ -945,8 +947,8 @@ void QueueHandle::postMessage(const bmqp::PutHeader& putHeader,
d_queue_sp.get());

(*event)
.setType(mqbi::DispatcherEventType::e_PUT)
.setSource(d_clientContext_sp->client())
.makePutEvent()
.setBlob(appData)
.setOptions(options)
.setPutHeader(putHeader)
Expand Down Expand Up @@ -1191,8 +1193,8 @@ void QueueHandle::onAckMessage(const bmqp::AckMessage& ackMessage)
mqbi::DispatcherClient* client = d_clientContext_sp->client();
mqbi::DispatcherEvent* event = client->dispatcher()->getEvent(client);
(*event)
.setType(mqbi::DispatcherEventType::e_ACK)
.setSource(d_queue_sp.get())
.makeAckEvent()
.setAckMessage(ackMessage);

// Override with correct downstream queueId
Expand Down
8 changes: 4 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,8 @@ int RemoteQueue::rejectMessage(const bmqt::MessageGUID& msgGUID,
mqbi::Dispatcher* dispatcher = queue->dispatcher();
mqbi::DispatcherEvent* dispEvent = dispatcher->getEvent(cluster);
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_REJECT)
.setSource(queue)
.makeRejectEvent()
.setRejectMessage(rejectMessage)
.setPartitionId(d_state_p->partitionId())
.setIsRelay(true); // Relay message
Expand Down Expand Up @@ -1170,8 +1170,8 @@ void RemoteQueue::sendConfirmMessage(const bmqt::MessageGUID& msgGUID,
mqbi::Dispatcher* dispatcher = queue->dispatcher();
mqbi::DispatcherEvent* dispEvent = dispatcher->getEvent(cluster);
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_CONFIRM)
.setSource(queue)
.makeConfirmEvent()
.setConfirmMessage(confirmMessage)
.setPartitionId(d_state_p->partitionId())
.setIsRelay(true); // Relay message
Expand Down Expand Up @@ -1566,9 +1566,9 @@ void RemoteQueue::sendPutMessage(
mqbi::Dispatcher* dispatcher = d_state_p->queue()->dispatcher();
mqbi::DispatcherEvent* dispEvent = dispatcher->getEvent(cluster);
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_PUT)
.setIsRelay(true) // Relay message
.setSource(d_state_p->queue())
.makePutEvent()
.setIsRelay(true) // Relay message
.setPutHeader(ph)
.setPartitionId(d_state_p->partitionId()) // Only replica uses
.setBlob(appData)
Expand Down
Loading

0 comments on commit e16b562

Please sign in to comment.