From d6836a2f149f6cf253cbfb26de3870ff22653491 Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Tue, 8 Oct 2024 22:18:29 +0100 Subject: [PATCH] Refactor[mqbblp::Cluster]: use concrete event types Signed-off-by: Evgeny Malygin --- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 249 +++++++++++------------ src/groups/mqb/mqbblp/mqbblp_cluster.h | 18 +- 2 files changed, 128 insertions(+), 139 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 079db67c4..a004855d0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -870,7 +870,7 @@ void Cluster::onRelayPutEvent(const mqbi::DispatcherEvent& event) // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_PUT == event.type()); - BSLS_ASSERT_SAFE(event.asPutEvent()->isRelay() == true); + BSLS_ASSERT_SAFE(event.asPutEvent()->isRelay()); const mqbi::DispatcherPutEvent* realEvent = event.asPutEvent(); @@ -1034,28 +1034,25 @@ void Cluster::onRelayPutEvent(const mqbi::DispatcherEvent& event) } } -void Cluster::onPutEvent(const mqbi::DispatcherEvent& event) +void Cluster::onPutEvent(const mqbi::DispatcherPutEvent& event) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_PUT == event.type()); - BSLS_ASSERT_SAFE(event.asPutEvent()->isRelay() == false); + BSLS_ASSERT_SAFE(!event.isRelay()); // This PUT event arrives from a replica node to this (primary) node, and // it needs to be forwarded to the queue after appropriate checks. The // replica source node is event.clusterNode(). This routine is similar to // that of ClientSession. - const mqbi::DispatcherPutEvent* realEvent = event.asPutEvent(); - - mqbnet::ClusterNode* source = realEvent->clusterNode(); + mqbnet::ClusterNode* source = event.clusterNode(); mqbc::ClusterNodeSession* ns = d_clusterData.membership().getClusterNodeSession(source); BSLS_ASSERT_SAFE(ns); - bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p); + bmqp::Event rawEvent(event.blob().get(), d_allocator_p); bmqp::PutMessageIterator putIt(&d_clusterData.bufferFactory(), d_allocator_p); @@ -1274,23 +1271,20 @@ void Cluster::onPutEvent(const mqbi::DispatcherEvent& event) } } -void Cluster::onAckEvent(const mqbi::DispatcherEvent& event) +void Cluster::onAckEvent(const mqbi::DispatcherAckEvent& event) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_ACK == event.type()); - BSLS_ASSERT_SAFE(event.asAckEvent()->isRelay() == false); - - const mqbi::DispatcherAckEvent* realEvent = event.asAckEvent(); + BSLS_ASSERT_SAFE(!event.isRelay()); // This ACK message is enqueued by mqbblp::Queue on this node, and needs to // be forwarded to 'event.clusterNode()' (the replica node). // NOTE: we do not log anything here, all logging is done in 'sendAck'. - const bmqp::AckMessage& ackMessage = realEvent->ackMessage(); + const bmqp::AckMessage& ackMessage = event.ackMessage(); bmqp_ctrlmsg::NodeStatus::Value selfStatus = d_clusterData.membership().selfNodeStatus(); if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( @@ -1305,15 +1299,14 @@ void Cluster::onAckEvent(const mqbi::DispatcherEvent& event) << ", guid: " << ackMessage.messageGUID() << ", status: " << ackMessage.status() << "] for node " - << realEvent->clusterNode()->nodeDescription() + << event.clusterNode()->nodeDescription() << ". Reason: self (primary node) not available. " << "Node status: " << selfStatus;); return; // RETURN } mqbc::ClusterNodeSession* ns = - d_clusterData.membership().getClusterNodeSession( - realEvent->clusterNode()); + d_clusterData.membership().getClusterNodeSession(event.clusterNode()); BSLS_ASSERT_SAFE(ns); bmqp_ctrlmsg::NodeStatus::Value downstreamStatus = ns->nodeStatus(); @@ -1331,7 +1324,7 @@ void Cluster::onAckEvent(const mqbi::DispatcherEvent& event) << ", guid: " << ackMessage.messageGUID() << ", status: " << ackMessage.status() << "] for node " - << realEvent->clusterNode()->nodeDescription() + << event.clusterNode()->nodeDescription() << ". Reason: target node not available. " << "Node status: " << downstreamStatus;); return; // RETURN @@ -1342,25 +1335,22 @@ void Cluster::onAckEvent(const mqbi::DispatcherEvent& event) ackMessage.messageGUID(), ackMessage.queueId(), "onAckEvent", - realEvent->clusterNode(), + event.clusterNode(), false); // isSelfGenerated } -void Cluster::onRelayAckEvent(const mqbi::DispatcherEvent& event) +void Cluster::onRelayAckEvent(const mqbi::DispatcherAckEvent& event) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_ACK == event.type()); - BSLS_ASSERT_SAFE(event.asAckEvent()->isRelay() == true); + BSLS_ASSERT_SAFE(event.isRelay()); // This relay-ACK event is sent by primary (event.clusterNode()) to replica // (this) node. Iterate over each message in the event and forward it to // appropriate remote queue. - const mqbi::DispatcherAckEvent* realEvent = event.asAckEvent(); - bmqp_ctrlmsg::NodeStatus::Value selfStatus = d_clusterData.membership().selfNodeStatus(); if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( @@ -1372,13 +1362,13 @@ void Cluster::onRelayAckEvent(const mqbi::DispatcherEvent& event) BMQU_THROTTLEDACTION_THROTTLE( d_throttledFailedAckMessages, BALL_LOG_WARN << "Dropping relay ACK messages from node " - << realEvent->clusterNode()->nodeDescription() + << event.clusterNode()->nodeDescription() << ". Reason: self (replica node) not available." << " Self node status: " << selfStatus;); return; // RETURN } - bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p); + bmqp::Event rawEvent(event.blob().get(), d_allocator_p); BSLS_ASSERT_SAFE(rawEvent.isAckEvent()); bmqp::AckMessageIterator ackIt; @@ -1403,7 +1393,7 @@ void Cluster::onRelayAckEvent(const mqbi::DispatcherEvent& event) << ", guid: " << ackMessage.messageGUID() << ", status: " << ackMessage.status() << "] from node " - << realEvent->clusterNode()->nodeDescription();); + << event.clusterNode()->nodeDescription();); continue; // CONTINUE } @@ -1412,23 +1402,25 @@ void Cluster::onRelayAckEvent(const mqbi::DispatcherEvent& event) } } -void Cluster::onConfirmEvent(const mqbi::DispatcherEvent& event) +void Cluster::onConfirmEvent(const mqbi::DispatcherConfirmEvent& event) { - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_CONFIRM == event.type()); - BSLS_ASSERT_SAFE(false == event.asConfirmEvent()->isRelay()); + // executed by the *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + BSLS_ASSERT_SAFE(!event.isRelay()); // This CONFIRM event arrives from a replica node (event.clusterNode()) to // this (primary) node, and it needs to be forwarded to the queue after // appropriate checks. Iterate over each CONFIRM message in the event and // forward it to the queue handle. - const mqbi::DispatcherConfirmEvent* realEvent = event.asConfirmEvent(); - mqbnet::ClusterNode* source = realEvent->clusterNode(); + mqbnet::ClusterNode* source = event.clusterNode(); mqbc::ClusterNodeSession* ns = d_clusterData.membership().getClusterNodeSession(source); BSLS_ASSERT_SAFE(ns); - bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p); + bmqp::Event rawEvent(event.blob().get(), d_allocator_p); bmqp::ConfirmMessageIterator confIt; BSLS_ASSERT_SAFE(rawEvent.isConfirmEvent()); @@ -1509,24 +1501,25 @@ void Cluster::onConfirmEvent(const mqbi::DispatcherEvent& event) } } -void Cluster::onRejectEvent(const mqbi::DispatcherEvent& event) +void Cluster::onRejectEvent(const mqbi::DispatcherRejectEvent& event) { + // executed by the *DISPATCHER* thread + // PRECONDITIONS - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_REJECT == event.type()); - BSLS_ASSERT_SAFE(false == event.asRejectEvent()->isRelay()); + BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + BSLS_ASSERT_SAFE(!event.isRelay()); // This REJECT event arrives from a replica node (event.clusterNode()) to // this (primary) node, and it needs to be forwarded to the queue after // appropriate checks. Iterate over each REJECT message in the event and // forward it to the queue handle. - const mqbi::DispatcherRejectEvent* realEvent = event.asRejectEvent(); - mqbnet::ClusterNode* source = realEvent->clusterNode(); + mqbnet::ClusterNode* source = event.clusterNode(); mqbc::ClusterNodeSession* ns = d_clusterData.membership().getClusterNodeSession(source); BSLS_ASSERT_SAFE(ns); - bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p); + bmqp::Event rawEvent(event.blob().get(), d_allocator_p); bmqp::RejectMessageIterator rejectIt; BSLS_ASSERT_SAFE(rawEvent.isRejectEvent()); rawEvent.loadRejectMessageIterator(&rejectIt); @@ -1659,21 +1652,21 @@ Cluster::validateMessage(mqbi::QueueHandle** queueHandle, return ValidationResult::k_SUCCESS; } -void Cluster::onRelayRejectEvent(const mqbi::DispatcherEvent& event) +void Cluster::onRelayRejectEvent(const mqbi::DispatcherRejectEvent& event) { - // PRECONDITIONS - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_REJECT == event.type()); - BSLS_ASSERT_SAFE(true == event.asRejectEvent()->isRelay()); + // executed by the *DISPATCHER* thread - const mqbi::DispatcherRejectEvent* realEvent = event.asRejectEvent(); + // PRECONDITIONS + BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + BSLS_ASSERT_SAFE(event.isRelay()); // This relay-REJECT message is enqueued by the RemoteQueue on either // cluster (in case of replica) or clusterProxy (in case of proxy). This // is a replica so this node just needs to forward the message to queue's // partition's primary node (after appropriate checks). - const bmqp::RejectMessage& rejectMessage = realEvent->rejectMessage(); - const int pid = realEvent->partitionId(); + const bmqp::RejectMessage& rejectMessage = event.rejectMessage(); + const int pid = event.partitionId(); const int id = rejectMessage.queueId(); const unsigned int subId = static_cast( @@ -1718,21 +1711,21 @@ void Cluster::onRelayRejectEvent(const mqbi::DispatcherEvent& event) } } -void Cluster::onRelayConfirmEvent(const mqbi::DispatcherEvent& event) +void Cluster::onRelayConfirmEvent(const mqbi::DispatcherConfirmEvent& event) { - // PRECONDITIONS - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_CONFIRM == event.type()); - BSLS_ASSERT_SAFE(true == event.asConfirmEvent()->isRelay()); + // executed by the *DISPATCHER* thread - const mqbi::DispatcherConfirmEvent* realEvent = event.asConfirmEvent(); + // PRECONDITIONS + BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + BSLS_ASSERT_SAFE(event.isRelay()); // This relay-CONFIRM message is enqueued by the RemoteQueue on either // cluster (in case of replica) or clusterProxy (in case of proxy). This // is a replica so this node just needs to forward the message to queue's // partition's primary node (after appropriate checks). - const bmqp::ConfirmMessage& confirmMsg = realEvent->confirmMessage(); - const int pid = realEvent->partitionId(); + const bmqp::ConfirmMessage& confirmMsg = event.confirmMessage(); + const int pid = event.partitionId(); const int id = confirmMsg.queueId(); const unsigned int subId = static_cast( @@ -1838,16 +1831,13 @@ bool Cluster::validateRelayMessage(mqbc::ClusterNodeSession** ns, return true; } -void Cluster::onPushEvent(const mqbi::DispatcherEvent& event) +void Cluster::onPushEvent(const mqbi::DispatcherPushEvent& event) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_PUSH == event.type()); - BSLS_ASSERT_SAFE(event.asPushEvent()->isRelay() == false); - - const mqbi::DispatcherPushEvent* realEvent = event.asPushEvent(); + BSLS_ASSERT_SAFE(!event.isRelay()); // This PUSH message is enqueued by mqbblp::Queue/QueueHandle on this node, // and needs to be forwarded to 'event.clusterNode()' (the replica node, @@ -1865,17 +1855,16 @@ void Cluster::onPushEvent(const mqbi::DispatcherEvent& event) BMQU_THROTTLEDACTION_THROTTLE( d_throttledFailedPushMessages, BALL_LOG_WARN << "Dropping a PUSH for queue [queueId: " - << realEvent->queueId() - << ", guid: " << realEvent->guid() << "] for node " - << realEvent->clusterNode()->nodeDescription() + << event.queueId() << ", guid: " << event.guid() + << "] for node " + << event.clusterNode()->nodeDescription() << ". Reason: self (primary node) not available." << " Node status: " << selfStatus;); return; // RETURN } mqbc::ClusterNodeSession* ns = - d_clusterData.membership().getClusterNodeSession( - realEvent->clusterNode()); + d_clusterData.membership().getClusterNodeSession(event.clusterNode()); BSLS_ASSERT_SAFE(ns); if (bmqp_ctrlmsg::NodeStatus::E_AVAILABLE != ns->nodeStatus()) { @@ -1887,9 +1876,9 @@ void Cluster::onPushEvent(const mqbi::DispatcherEvent& event) d_throttledFailedPushMessages, BALL_LOG_WARN << description() << ": Failed to send PUSH message [queueId: " - << realEvent->queueId() << ", GUID: " - << realEvent->guid() << "] to target node: " - << realEvent->clusterNode()->nodeDescription() + << event.queueId() << ", GUID: " << event.guid() + << "] to target node: " + << event.clusterNode()->nodeDescription() << ". Reason: node not available. " << "Target node status: " << ns->nodeStatus();); @@ -1897,15 +1886,15 @@ void Cluster::onPushEvent(const mqbi::DispatcherEvent& event) } QueueHandleMap& queueHandles = ns->queueHandles(); - QueueHandleMapIter queueIt = queueHandles.find(realEvent->queueId()); + QueueHandleMapIter queueIt = queueHandles.find(event.queueId()); if (queueIt == queueHandles.end()) { BMQU_THROTTLEDACTION_THROTTLE( d_throttledFailedPushMessages, BALL_LOG_WARN << description() << ": PUSH message for queue with unknown queueId [" - << realEvent->queueId() << ", guid: " - << realEvent->guid() << "] to target node: " - << realEvent->clusterNode()->nodeDescription();); + << event.queueId() << ", guid: " << event.guid() + << "] to target node: " + << event.clusterNode()->nodeDescription();); return; // RETURN } @@ -1917,15 +1906,15 @@ void Cluster::onPushEvent(const mqbi::DispatcherEvent& event) // TODO: Extract this and the version from 'mqba::ClientSession' to a // function for (bmqp::Protocol::SubQueueInfosArray::size_type i = 0; - i < realEvent->subQueueInfos().size(); + i < event.subQueueInfos().size(); ++i) { StreamsMap::const_iterator subQueueCiter = queueState.d_subQueueInfosMap.findBySubscriptionId( - realEvent->subQueueInfos()[i].id()); + event.subQueueInfos()[i].id()); subQueueCiter->value().d_clientStats->onEvent( mqbstat::ClusterNodeStats::EventType::e_PUSH, - realEvent->blob() ? realEvent->blob()->length() : 0); + event.blob() ? event.blob()->length() : 0); } bmqt::GenericResult::Enum rc = bmqt::GenericResult::e_SUCCESS; @@ -1936,32 +1925,32 @@ void Cluster::onPushEvent(const mqbi::DispatcherEvent& event) // If it's at most once, then we explicitly send the payload since it's // in-mem mode and there's been no replication (i.e. no preceding // STORAGE message). - BSLS_ASSERT_SAFE(realEvent->blob()); + BSLS_ASSERT_SAFE(event.blob()); rc = ns->clusterNode()->channel().writePush( - realEvent->blob(), - realEvent->queueId(), - realEvent->guid(), + event.blob(), + event.queueId(), + event.guid(), 0, - realEvent->compressionAlgorithmType(), - realEvent->messagePropertiesInfo(), - realEvent->subQueueInfos()); + event.compressionAlgorithmType(), + event.messagePropertiesInfo(), + event.subQueueInfos()); } else { int flags = 0; - if (realEvent->isOutOfOrderPush()) { + if (event.isOutOfOrderPush()) { bmqp::PushHeaderFlagUtil::setFlag( &flags, bmqp::PushHeaderFlags::e_OUT_OF_ORDER); } rc = ns->clusterNode()->channel().writePush( - realEvent->queueId(), - realEvent->guid(), + event.queueId(), + event.guid(), flags, - realEvent->compressionAlgorithmType(), - realEvent->messagePropertiesInfo(), - realEvent->subQueueInfos()); + event.compressionAlgorithmType(), + event.messagePropertiesInfo(), + event.subQueueInfos()); } if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( @@ -1973,28 +1962,25 @@ void Cluster::onPushEvent(const mqbi::DispatcherEvent& event) BMQU_THROTTLEDACTION_THROTTLE( d_throttledDroppedPushMessages, BALL_LOG_ERROR << description() << ": dropping PUSH message " - << "[queueId: " << realEvent->queueId() - << ", guid: " << realEvent->guid() - << "] to target node: " - << realEvent->clusterNode()->nodeDescription() + << "[queueId: " << event.queueId() << ", guid: " + << event.guid() << "] to target node: " + << event.clusterNode()->nodeDescription() << ", PushBuilder rc: " << rc << ".";); } } -void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event) +void Cluster::onRelayPushEvent(const mqbi::DispatcherPushEvent& event) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - BSLS_ASSERT_SAFE(mqbi::DispatcherEventType::e_PUSH == event.type()); - BSLS_ASSERT_SAFE(event.asPushEvent()->isRelay() == true); + BSLS_ASSERT_SAFE(event.isRelay()); // This relay-PUSH event is sent by primary (event.clusterNode()) to // replica (this) node. Iterate over each message in the event and forward // it to appropriate remote queue. Note that these PUSH msgs won't have // payloads. - const mqbi::DispatcherPushEvent* realEvent = event.asPushEvent(); bmqp_ctrlmsg::NodeStatus::Value selfStatus = d_clusterData.membership().selfNodeStatus(); @@ -2006,13 +1992,13 @@ void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event) BMQU_THROTTLEDACTION_THROTTLE( d_throttledFailedPushMessages, BALL_LOG_WARN << "Dropping relay PUSH messages from node " - << realEvent->clusterNode()->nodeDescription() + << event.clusterNode()->nodeDescription() << ". Reason: self (replica node) not available." << " Self node status: " << selfStatus;); return; // RETURN } - bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p); + bmqp::Event rawEvent(event.blob().get(), d_allocator_p); BSLS_ASSERT_SAFE(rawEvent.isPushEvent()); bdlma::LocalSequentialAllocator<1024> lsa(d_allocator_p); bmqp::PushMessageIterator pushIt(&d_clusterData.bufferFactory(), &lsa); @@ -2035,7 +2021,7 @@ void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event) << "queue [queueId: " << pushHeader.queueId() << ", guid: " << pushHeader.messageGUID() << ", flags: " << pushHeader.flags() << "] from node " - << realEvent->clusterNode()->nodeDescription() + << event.clusterNode()->nodeDescription() << BMQTSK_ALARMLOG_END;); continue; // CONTINUE @@ -2062,7 +2048,7 @@ void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event) << " for [queueId: " << pushHeader.queueId() << ", guid: " << pushHeader.messageGUID() << ", flags: " << pushHeader.flags() << "] from node " - << realEvent->clusterNode()->nodeDescription() + << event.clusterNode()->nodeDescription() << BMQTSK_ALARMLOG_END;); continue; // CONTINUE @@ -3415,69 +3401,72 @@ void Cluster::onDispatcherEvent(const mqbi::DispatcherEvent& event) switch (event.type()) { case mqbi::DispatcherEventType::e_CALLBACK: { - const mqbi::DispatcherCallbackEvent* realEvent = - event.asCallbackEvent(); - BSLS_ASSERT_SAFE(realEvent->callback()); - realEvent->callback()(dispatcherClientData().processorHandle()); + const mqbi::DispatcherCallbackEvent& realEvent = + *event.asCallbackEvent(); + BSLS_ASSERT_SAFE(realEvent.callback()); + realEvent.callback()(dispatcherClientData().processorHandle()); } break; // BREAK case mqbi::DispatcherEventType::e_PUT: { - const mqbi::DispatcherPutEvent* realEvent = event.asPutEvent(); - if (realEvent->isRelay()) { + const mqbi::DispatcherPutEvent& realEvent = *event.asPutEvent(); + if (realEvent.isRelay()) { + // We pass a parent object event here because the implementation + // uses `source()` field from this parent object onRelayPutEvent(event); } else { - onPutEvent(event); + onPutEvent(realEvent); } } break; // BREAK case mqbi::DispatcherEventType::e_ACK: { - const mqbi::DispatcherAckEvent* realEvent = event.asAckEvent(); - if (realEvent->isRelay()) { - onRelayAckEvent(event); + const mqbi::DispatcherAckEvent& realEvent = *event.asAckEvent(); + if (realEvent.isRelay()) { + onRelayAckEvent(realEvent); } else { - onAckEvent(event); + onAckEvent(realEvent); } } break; // BREAK case mqbi::DispatcherEventType::e_CONFIRM: { - const mqbi::DispatcherConfirmEvent* realEvent = event.asConfirmEvent(); - if (realEvent->isRelay()) { - onRelayConfirmEvent(event); + const mqbi::DispatcherConfirmEvent& realEvent = + *event.asConfirmEvent(); + if (realEvent.isRelay()) { + onRelayConfirmEvent(realEvent); } else { - onConfirmEvent(event); + onConfirmEvent(realEvent); } } break; case mqbi::DispatcherEventType::e_REJECT: { - const mqbi::DispatcherRejectEvent* realEvent = event.asRejectEvent(); - if (realEvent->isRelay()) { - onRelayRejectEvent(event); + const mqbi::DispatcherRejectEvent& realEvent = *event.asRejectEvent(); + if (realEvent.isRelay()) { + onRelayRejectEvent(realEvent); } else { - onRejectEvent(event); + onRejectEvent(realEvent); } } break; case mqbi::DispatcherEventType::e_CLUSTER_STATE: { - const mqbi::DispatcherClusterStateEvent* clusterStateEvt = - event.asClusterStateEvent(); - d_clusterOrchestrator.processClusterStateEvent(*clusterStateEvt); + const mqbi::DispatcherClusterStateEvent& clusterStateEvt = + *event.asClusterStateEvent(); + d_clusterOrchestrator.processClusterStateEvent(clusterStateEvt); } break; // BREAK case mqbi::DispatcherEventType::e_STORAGE: { - const mqbi::DispatcherStorageEvent* storageEvt = - event.asStorageEvent(); - d_storageManager_mp->processStorageEvent(*storageEvt); + const mqbi::DispatcherStorageEvent& storageEvt = + *event.asStorageEvent(); + d_storageManager_mp->processStorageEvent(storageEvt); } break; // BREAK case mqbi::DispatcherEventType::e_RECOVERY: { - const mqbi::DispatcherRecoveryEvent* recoveryEvt = - event.asRecoveryEvent(); - d_storageManager_mp->processRecoveryEvent(*recoveryEvt); + const mqbi::DispatcherRecoveryEvent& recoveryEvt = + *event.asRecoveryEvent(); + d_storageManager_mp->processRecoveryEvent(recoveryEvt); } break; // BREAK case mqbi::DispatcherEventType::e_PUSH: { - const mqbi::DispatcherPushEvent* realEvent = event.asPushEvent(); - if (realEvent->isRelay()) { - onRelayPushEvent(event); + const mqbi::DispatcherPushEvent& realEvent = *event.asPushEvent(); + if (realEvent.isRelay()) { + onRelayPushEvent(realEvent); } else { - onPushEvent(event); + onPushEvent(realEvent); } } break; // BREAK case mqbi::DispatcherEventType::e_REPLICATION_RECEIPT: diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 5c24e5130..de9c8f815 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -442,25 +442,25 @@ class Cluster : public mqbi::Cluster, void processClusterSyncRequest(const bmqp_ctrlmsg::ControlMessage& request, mqbnet::ClusterNode* requester); - void onPutEvent(const mqbi::DispatcherEvent& event); + void onPutEvent(const mqbi::DispatcherPutEvent& event); void onRelayPutEvent(const mqbi::DispatcherEvent& event); - void onAckEvent(const mqbi::DispatcherEvent& event); + void onAckEvent(const mqbi::DispatcherAckEvent& event); - void onRelayAckEvent(const mqbi::DispatcherEvent& event); + void onRelayAckEvent(const mqbi::DispatcherAckEvent& event); - void onConfirmEvent(const mqbi::DispatcherEvent& event); + void onConfirmEvent(const mqbi::DispatcherConfirmEvent& event); - void onRelayConfirmEvent(const mqbi::DispatcherEvent& event); + void onRelayConfirmEvent(const mqbi::DispatcherConfirmEvent& event); - void onRejectEvent(const mqbi::DispatcherEvent& event); + void onRejectEvent(const mqbi::DispatcherRejectEvent& event); - void onRelayRejectEvent(const mqbi::DispatcherEvent& event); + void onRelayRejectEvent(const mqbi::DispatcherRejectEvent& event); - void onPushEvent(const mqbi::DispatcherEvent& event); + void onPushEvent(const mqbi::DispatcherPushEvent& event); - void onRelayPushEvent(const mqbi::DispatcherEvent& event); + void onRelayPushEvent(const mqbi::DispatcherPushEvent& event); ValidationResult::Enum validateMessage(mqbi::QueueHandle** queueHandle, const bmqp::QueueId& queueId,