From c5b7c31977d81a4694d058a43af22272dfcdef6b Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Tue, 3 Sep 2024 13:27:40 -0400 Subject: [PATCH] Addressing review Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../bmq/bmqp/bmqp_schemaeventbuilder.cpp | 2 +- src/groups/mqb/mqba/mqba_adminsession.cpp | 4 +-- src/groups/mqb/mqba/mqba_adminsession.h | 5 +++- src/groups/mqb/mqba/mqba_application.cpp | 6 ++-- src/groups/mqb/mqba/mqba_application.h | 8 +++-- src/groups/mqb/mqba/mqba_clientsession.cpp | 8 ++--- src/groups/mqb/mqba/mqba_clientsession.h | 9 +++--- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 18 +++++------- src/groups/mqb/mqbblp/mqbblp_cluster.h | 12 ++++---- src/groups/mqb/mqbblp/mqbblp_clustercatalog.h | 2 +- .../mqb/mqbblp/mqbblp_clusterorchestrator.cpp | 2 +- src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp | 13 +++++---- src/groups/mqb/mqbblp/mqbblp_clusterproxy.h | 12 ++++---- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 29 +++++++++++-------- .../mqb/mqbblp/mqbblp_clusterqueuehelper.h | 11 ++++--- src/groups/mqb/mqbblp/mqbblp_queue.cpp | 2 +- src/groups/mqb/mqbblp/mqbblp_queue.h | 2 ++ .../mqb/mqbblp/mqbblp_queueengineutil.cpp | 4 +-- .../mqb/mqbblp/mqbblp_queueengineutil.h | 2 +- .../mqb/mqbblp/mqbblp_relayqueueengine.cpp | 6 ++-- src/groups/mqb/mqbblp/mqbblp_remotequeue.h | 2 ++ .../mqb/mqbblp/mqbblp_rootqueueengine.cpp | 4 +-- src/groups/mqb/mqbblp/mqbblp_routers.h | 3 -- src/groups/mqb/mqbi/mqbi_cluster.h | 8 ++--- src/groups/mqb/mqbi/mqbi_queue.h | 2 ++ src/groups/mqb/mqbmock/mqbmock_cluster.cpp | 2 +- src/groups/mqb/mqbmock/mqbmock_cluster.h | 8 ++--- src/groups/mqb/mqbmock/mqbmock_queue.h | 2 ++ src/groups/mqb/mqbnet/mqbnet_dummysession.cpp | 2 +- src/groups/mqb/mqbnet/mqbnet_dummysession.h | 4 +-- src/groups/mqb/mqbnet/mqbnet_session.h | 4 +-- src/groups/mqb/mqbnet/mqbnet_session.t.cpp | 7 ++--- 32 files changed, 112 insertions(+), 93 deletions(-) diff --git a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp index 03ef9b8bd0..42764f6594 100644 --- a/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp +++ b/src/groups/bmq/bmqp/bmqp_schemaeventbuilder.cpp @@ -47,7 +47,7 @@ EncodingType::Enum SchemaEventBuilderUtil::bestEncodingSupported( return EncodingType::e_BER; // RETURN } - // If remote suppports BER, return BER + // If remote supports BER, return BER if (bsl::find(encodingsSupported.cbegin(), encodingsSupported.cend(), bsl::string(EncodingFeature::k_ENCODING_BER)) != diff --git a/src/groups/mqb/mqba/mqba_adminsession.cpp b/src/groups/mqb/mqba/mqba_adminsession.cpp index 5dcf778dd4..7650721883 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.cpp +++ b/src/groups/mqb/mqba/mqba_adminsession.cpp @@ -420,11 +420,11 @@ void AdminSession::tearDown(const bsl::shared_ptr& session, void AdminSession::initiateShutdown(const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2) + bool supportShutdownV2) { // executed by the *ANY* thread (void)timeout; - (void)suppportShutdownV2; + (void)supportShutdownV2; dispatcher()->execute( bdlf::BindUtil::bind(&AdminSession::initiateShutdownDispatched, diff --git a/src/groups/mqb/mqba/mqba_adminsession.h b/src/groups/mqb/mqba/mqba_adminsession.h index 64b4331200..ca85b0b2be 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.h +++ b/src/groups/mqb/mqba/mqba_adminsession.h @@ -262,10 +262,13 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient { /// Initiate the shutdown of the session and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence or /// if the specified `timeout` is expired. + /// The optional (temporary) specified 'supportShutdownV2' indicates + /// shutdown V2 logic which is not applicable to `AdminSession` + /// implementation. void initiateShutdown(const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; + bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Make the session abandon any work it has. void invalidate() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index cdc35d9507..55ff3e51b6 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -463,9 +463,9 @@ void Application::stop() d_transportManager_mp->initiateShutdown(); BALL_LOG_INFO << "Stopped listening for new connections."; - bool suppportShutdownV2 = initiateShutdown(); + bool supportShutdownV2 = initiateShutdown(); - if (suppportShutdownV2) { + if (supportShutdownV2) { BALL_LOG_INFO << ": Executing GRACEFUL_SHUTDOWN_V2"; } else { @@ -484,7 +484,7 @@ void Application::stop() ++clusterIt, --count) { clusterIt.cluster()->initiateShutdown( bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch), - suppportShutdownV2); + supportShutdownV2); } latch.wait(); diff --git a/src/groups/mqb/mqba/mqba_application.h b/src/groups/mqb/mqba/mqba_application.h index 9a22b3e627..850af282b8 100644 --- a/src/groups/mqb/mqba/mqba_application.h +++ b/src/groups/mqb/mqba/mqba_application.h @@ -169,8 +169,12 @@ class Application { /// Pendant operation of the `oneTimeInit` one. void oneTimeShutdown(); - /// Attempt to execute shutdown logic v2. Return 'true' if all nodes and - /// proxies support it. + /// Attempt to execute graceful shutdown logic v2. + /// + /// If any node or proxy does not support the v2 graceful shutdown logic, + /// do not perform any shutdown actions and return `false`. Otherwise, + /// send v2 shutdown requests to all nodes, shutdown clients and proxies, + /// and return `true`. bool initiateShutdown(); private: diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index b0b76d9fa3..e630670d16 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -847,7 +847,7 @@ void ClientSession::onHandleConfiguredDispatched( void ClientSession::initiateShutdownDispatched( const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2) + bool supportShutdownV2) { // executed by the *CLIENT* dispatcher thread @@ -889,7 +889,7 @@ void ClientSession::initiateShutdownDispatched( return; // RETURN } - if (suppportShutdownV2) { + if (supportShutdownV2) { d_operationState = e_SHUTTING_DOWN_V2; d_queueSessionManager.shutDown(); @@ -2868,7 +2868,7 @@ void ClientSession::tearDown(const bsl::shared_ptr& session, void ClientSession::initiateShutdown(const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2) + bool supportShutdownV2) { // executed by the *ANY* thread @@ -2906,7 +2906,7 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback, d_self.acquire()), callback, timeout, - suppportShutdownV2), + supportShutdownV2), this, mqbi::DispatcherEventType::e_DISPATCHER); // Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling diff --git a/src/groups/mqb/mqba/mqba_clientsession.h b/src/groups/mqb/mqba/mqba_clientsession.h index 3cee9edeed..ecc803395b 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.h +++ b/src/groups/mqb/mqba/mqba_clientsession.h @@ -273,7 +273,8 @@ class ClientSession : public mqbnet::Session, enum OperationState { e_RUNNING // Running normally , - // TEMPORARY, remove 'after switching to StopRequest V2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest + // V2. e_SHUTTING_DOWN // Shutting down due to 'initiateShutdown' request , e_SHUTTING_DOWN_V2 // Shutting down due to 'initiateShutdown' request @@ -483,7 +484,7 @@ class ClientSession : public mqbnet::Session, /// if the specified `timeout` is expired. void initiateShutdownDispatched(const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2); + bool supportShutdownV2); void invalidateDispatched(); @@ -687,14 +688,14 @@ class ClientSession : public mqbnet::Session, /// Initiate the shutdown of the session and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence or /// if the specified `timeout` is expired. If the optional (temporary) - /// specified 'suppportShutdownV2' is 'true' execute shutdown logic V2 + /// specified 'supportShutdownV2' is 'true' execute shutdown logic V2 /// where upstream (not downstream) nodes deconfigure queues and the /// shutting down node (not downstream) waits for CONFIRMS. /// The shutdown is complete when 'tearDownAllQueuesDone'. void initiateShutdown(const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; + bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Make the session abandon any work it has. void invalidate() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index caf6c5fa8f..e7a143567f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -622,7 +622,7 @@ void Cluster::processCommandDispatched(mqbcmd::ClusterResult* result, } void Cluster::initiateShutdownDispatched(const VoidFunctor& callback, - bool suppportShutdownV2) + bool supportShutdownV2) { // executed by the *DISPATCHER* thread @@ -636,7 +636,7 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback, d_clusterData.membership().setSelfNodeStatus( bmqp_ctrlmsg::NodeStatus::E_STOPPING); - if (suppportShutdownV2) { + if (supportShutdownV2) { d_clusterOrchestrator.queueHelper().requestToStopPushing(); bsls::TimeInterval whenToStop( @@ -652,7 +652,8 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback, bdlf::PlaceHolders::_1)); // completionCb } else { - // Temporary, remove after switching all to version 2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest + // V2. // Send StopRequest to all nodes and proxies. The peers are expected // not to send any PUT msgs to this node after receiving StopRequest. // For each queue for which this node is the primary, peers (replicas @@ -671,7 +672,7 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback, SessionSpVec sessions; for (mqbnet::TransportManagerIterator sessIt( - &d_clusterData.transportManager()); + &d_clusterData.transportManager()); sessIt; ++sessIt) { bsl::shared_ptr sessionSp = @@ -694,16 +695,13 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback, } if (mqbnet::ClusterUtil::isClient(negoMsg)) { - // if (!d_suppportShutdownV2) { link.insert(bdlf::BindUtil::bind( &mqbnet::Session::initiateShutdown, sessionSp, bdlf::PlaceHolders::_1, // completion callback shutdownTimeout, false)); - // } - // else there is no need to de-confgiure queues and wait for - // unconfirmed since V2 upstreams do that on StopRequest V2 + continue; // CONTINUE } @@ -2696,7 +2694,7 @@ int Cluster::start(bsl::ostream& errorDescription) } void Cluster::initiateShutdown(const VoidFunctor& callback, - bool suppportShutdownV2) + bool supportShutdownV2) { // executed by *ANY* thread @@ -2709,7 +2707,7 @@ void Cluster::initiateShutdown(const VoidFunctor& callback, bdlf::BindUtil::bind(&Cluster::initiateShutdownDispatched, this, callback, - suppportShutdownV2), + supportShutdownV2), this); // Wait for above event to complete. This is needed because diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index cc0069ed07..5c51bffed8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -415,9 +415,11 @@ class Cluster : public mqbi::Cluster, /// Executed by dispatcher thread. void initiateShutdownDispatched(const VoidFunctor& callback, - bool suppportShutdownV2); + bool supportShutdownV2); + + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest + // V2. - // // Temporary, remove after switching all to version 2 /// Send stop request to proxies and nodes specified in `sessions` using /// the specified `stopCb` as a callback to be called once all the /// requests get responses. @@ -569,13 +571,13 @@ class Cluster : public mqbi::Cluster, /// Initiate the shutdown of the cluster. It is expected that `stop()` /// will be called soon after this routine is invoked. Invoke the /// specified `callback` upon completion of (asynchronous) shutdown - /// sequence. If the optional (temporary) specified 'suppportShutdownV2' + /// sequence. If the optional (temporary) specified 'supportShutdownV2' /// is 'true' execute shutdown logic V2 where upstream (not downstream) - /// nodes deconfigure queues and he shutting down node (not downstream) + /// nodes deconfigure queues and the shutting down node (not downstream) /// wait for CONFIRMS. void initiateShutdown(const VoidFunctor& callback, - bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; + bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Stop the `Cluster`. void stop() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h index dd0a3e8d5a..38d65e609d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h +++ b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h @@ -292,7 +292,7 @@ class ClusterCatalog { mqbnet::Session::AdminCommandEnqueueCb d_adminCb; // Callback function to enqueue admin commands - + RequestManagerType d_requestManager; // Request manager to use diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index ef9310b9b8..6ebb52e5ff 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -880,7 +880,7 @@ void ClusterOrchestrator::processStopRequest( << ", current status: " << ns->nodeStatus() << ", new status: " << bmqp_ctrlmsg::NodeStatus::E_STOPPING; - // Temporary, remove after switching all to version 2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2. if (stopRequest.version() == 1 && stopRequest.clusterName() != name) { BSLS_PERFORMANCEHINT_UNLIKELY_HINT; BALL_LOG_ERROR << d_clusterData_p->identity().description() diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 03d07d2394..bbe62bc748 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -158,7 +158,7 @@ void ClusterProxy::startDispatched() } void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback, - bool suppportShutdownV2) + bool supportShutdownV2) { // executed by the *DISPATCHER* thread @@ -171,7 +171,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback, // Mark self as stopping. d_isStopping = true; - if (suppportShutdownV2) { + if (supportShutdownV2) { d_queueHelper.requestToStopPushing(); bsls::TimeInterval whenToStop( @@ -187,7 +187,8 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback, bdlf::PlaceHolders::_1)); // completionCb } else { - // Temporary, remove after switching all to version 2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest + // V2. // Fill the first link with client session shutdown operations mwcu::OperationChainLink link(d_shutdownChain.allocator()); @@ -197,7 +198,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback, clusterProxyConfig()->queueOperations().shutdownTimeoutMs()); for (mqbnet::TransportManagerIterator sessIt( - &d_clusterData.transportManager()); + &d_clusterData.transportManager()); sessIt; ++sessIt) { bsl::shared_ptr sessionSp = @@ -1150,7 +1151,7 @@ int ClusterProxy::start(BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription) } void ClusterProxy::initiateShutdown(const VoidFunctor& callback, - bool suppportShutdownV2) + bool supportShutdownV2) { // executed by *ANY* thread @@ -1163,7 +1164,7 @@ void ClusterProxy::initiateShutdown(const VoidFunctor& callback, bdlf::BindUtil::bind(&ClusterProxy::initiateShutdownDispatched, this, callback, - suppportShutdownV2), + supportShutdownV2), this); dispatcher()->synchronize(this); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 8be070334a..2d81a926e7 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -266,7 +266,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// be called when the shutdown is completed. This routine is invoked /// in the cluster-dispatcher thread. void initiateShutdownDispatched(const VoidFunctor& callback, - bool suppportShutdownV2 = false); + bool supportShutdownV2 = false); /// Stop the `Cluster`. void stopDispatched(); @@ -401,7 +401,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver, void processResponseDispatched(const bmqp_ctrlmsg::ControlMessage& response); - // Temporary, remove after switching all to version 2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2. /// Send stop request to proxies specified in `sessions` using the /// specified `stopCb` as a callback to be called once all the requests /// get responses. @@ -459,13 +459,13 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Initiate the shutdown of the cluster and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence. It /// is expected that `stop()` will be called soon after this routine is - /// invoked. If the optional (temporary) specified 'suppportShutdownV2' is + /// invoked. If the optional (temporary) specified 'supportShutdownV2' is /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes - /// deconfigure queues and he shutting down node (not downstream) wait for - /// CONFIRMS. + /// deconfigure queues and the shutting down node (not downstream) wait + /// for CONFIRMS. void initiateShutdown(const VoidFunctor& callback, - bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; + bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Stop the `Cluster`. void stop() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 1ab13cafd5..d25d6ec680 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -2701,7 +2701,7 @@ void ClusterQueueHelper::configureQueueDispatched( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - if (d_suppportShutdownV2) { + if (d_supportShutdownV2) { BMQ_LOGTHROTTLE_INFO() << d_cluster_p->description() << ": Shutting down and skipping configure queue [: " << uri @@ -2865,7 +2865,7 @@ void ClusterQueueHelper::releaseQueueDispatched( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - if (d_suppportShutdownV2) { + if (d_supportShutdownV2) { BMQ_LOGTHROTTLE_INFO() << d_cluster_p->description() << ": Shutting down and skipping close queue [: " @@ -4532,7 +4532,7 @@ ClusterQueueHelper::ClusterQueueHelper( , d_numPendingReopenQueueRequests(0) , d_primaryNotLeaderAlarmRaised(false) , d_stopContexts(allocator) -, d_suppportShutdownV2(false) +, d_supportShutdownV2(false) { BSLS_ASSERT( d_clusterData_p->clusterConfig() @@ -5292,8 +5292,9 @@ void ClusterQueueHelper::requestToStopPushing() d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); // Assume Shutdown V2 - d_suppportShutdownV2 = true; + d_supportShutdownV2 = true; + // Prevent future queue operations from sending PUSHes. for (QueueContextMapIter it = d_queues.begin(); it != d_queues.end(); ++it) { QueueContextSp& queueContextSp = it->second; @@ -5337,7 +5338,9 @@ void ClusterQueueHelper::processNodeStoppingNotification( // operations. Once all functors complete, the 'finishStopSequence' // deleter sends back StopResponse. - // TEMPORARY, remove 'timeout; 'after switching to StopRequest V2 + // TODO(shutdown-v2): TEMPORARY, remove 'timeout' when all switch to + // StopRequest V2. + // No need to wait for CONFIRMs, the waiting is done by the shutting down // node. @@ -5378,15 +5381,16 @@ void ClusterQueueHelper::processNodeStoppingNotification( << clusterNode->nodeDescription() << " with timeout (ms) " << timeout; - // TEMPORARY, remove 'after switching to StopRequest V2 - bool suppportShutdownV2 = true; + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest + // V2. + bool supportShutdownV2 = true; if (request) { const bmqp_ctrlmsg::StopRequest& stopRequest = request->choice().clusterMessage().choice().stopRequest(); if (stopRequest.version() == 1) { - suppportShutdownV2 = false; + supportShutdownV2 = false; } else { BSLS_ASSERT_SAFE(stopRequest.version() == 2); @@ -5395,7 +5399,7 @@ void ClusterQueueHelper::processNodeStoppingNotification( // StopRequests have replaced E_STOPPING advisory. // In any case, do minimal (V2) work unless explicitly requested - if (suppportShutdownV2) { + if (supportShutdownV2) { if (ns) { // As an Upstream, deconfigure queues of the (shutting down) // ClusterNodeSession 'ns'. @@ -5502,7 +5506,8 @@ void ClusterQueueHelper::processNodeStoppingNotification( } } else { - // TEMPORARY, remove 'after switching to StopRequest V2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to + // StopRequest V2. // Downstreams do not need to deconfigure queues for which the // shutting down node is the upstream. The deconfiguring is done // by the upstream of the shutting down node instead. @@ -5601,7 +5606,7 @@ void ClusterQueueHelper::deconfigureQueues( const bsl::shared_ptr& contextSp, const bsl::vector* partitions) { - // TEMPORARY, remove after switching all to StopRequest V2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2. // executed by the cluster *DISPATCHER* thread @@ -5954,7 +5959,7 @@ void ClusterQueueHelper::checkUnconfirmedV2Dispatched( } bdlmt::EventScheduler::EventHandle eventHandle; // Never cancel the timer - d_clusterData_p->scheduler()->scheduleEvent( + d_clusterData_p->scheduler().scheduleEvent( &eventHandle, t, bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index af768bcf2d..bbc42d362a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -166,7 +166,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, // State of the upstream bdlmt::EventScheduler::EventHandle d_timer; - // TEMPORARY, remove 'after switching to StopRequest V2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest + // V2. // (timer handle 1s) when waiting for // unconfirmed. This is to cancel the timer in // the case when this broker stops while @@ -484,7 +485,9 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, StopContexts d_stopContexts; - bool d_suppportShutdownV2; + /// When `true`, all cluster nodes support StopRequest V2 and this node + /// executes shutdown V2 logic. + bool d_supportShutdownV2; private: // PRIVATE MANIPULATORS @@ -855,7 +858,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, void deconfigureQueue(const bsl::shared_ptr& contextSp, const QueueContextSp& queueContextSp); - // TEMPORARY, remove 'after switching to StopRequest V2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2. /// Second step of StopRequest / CLOSING node advisory processing /// (after de-configure response). Start timer to wait the configured /// `stopTimeoutMs` is there are any pending PUSH messages to collect @@ -873,7 +876,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, unsigned int subId, bsls::TimeInterval& t); - // TEMPORARY, remove 'after switching to StopRequest V2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2. void checkUnconfirmed(const bsl::shared_ptr& contextSp, const QueueContextSp& queueContextSp, unsigned int subId); diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.cpp b/src/groups/mqb/mqbblp/mqbblp_queue.cpp index 97a425b3cc..1834eeb352 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queue.cpp @@ -931,7 +931,7 @@ bsls::Types::Int64 Queue::countUnconfirmed(unsigned int subId) return d_state.handleCatalog().countUnconfirmed(); // RETURN } - // Temporary, remove after switching all to shutdown version 2 + // TODO(shutdown-v2): TEMPORARY, remove when all switch to StopRequest V2. struct local { static void sum(bsls::Types::Int64* sum, mqbi::QueueHandle* handle, diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h index 15785dc49e..20abac4520 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.h +++ b/src/groups/mqb/mqbblp/mqbblp_queue.h @@ -315,6 +315,8 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// and current upstream `genCount`, then the PUT message gets dropped /// to avoid out of order PUTs. If the `upstreamSubQueueId` is /// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen. + /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This + /// should be specified if the upstream is stopping. /// /// THREAD: This method is called from the Queue's dispatcher thread. void onOpenUpstream(bsls::Types::Uint64 genCount, diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index f22c3ceb2f..61617db157 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -1271,7 +1271,7 @@ void QueueEngineUtil_AppState::cancelThrottle() } } -void QueueEngineUtil_AppState::reset() +void QueueEngineUtil_AppState::undoRouting() { d_priorityCount = 0; cancelThrottle(); @@ -1288,7 +1288,7 @@ void QueueEngineUtil_AppState::rebuildConsumers( { // Rebuild ConsumersState for this app // Prepare the app for rebuilding consumers - reset(); + undoRouting(); bsl::shared_ptr previous = d_routing_sp; d_routing_sp = replacement; diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h index eb810de86a..2912b1494c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.h @@ -410,7 +410,7 @@ struct QueueEngineUtil_AppState { bool isExpired); /// Reset the internal state to have no consumers. - void reset(); + void undoRouting(); /// Deliver all messages in the storage to the consumer represented by /// this instance. Load the message delay into the specified `delay`. diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index a2fdd0b7a3..ff10791f18 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -810,7 +810,7 @@ void RelayQueueEngine::applyConfiguration(App_State& app, BSLS_ASSERT_SAFE(d_queueState_p->queue()->dispatcher()->inDispatcherThread( d_queueState_p->queue())); - app.reset(); + app.undoRouting(); app.d_routing_sp = context.d_routing_sp; @@ -911,10 +911,8 @@ int RelayQueueEngine::configure( void RelayQueueEngine::resetState(bool isShuttingDown) { - // d_self.reset(this); - for (AppsMap::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { - it->second->reset(); + it->second->undoRouting(); if (isShuttingDown) { it->second->d_routing_sp->reset(); } diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.h b/src/groups/mqb/mqbblp/mqbblp_remotequeue.h index ba6cb1cf47..7d0360c41b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.h +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.h @@ -458,6 +458,8 @@ class RemoteQueue { /// and current upstream `genCount`, then the PUT message gets dropped /// to avoid out of order PUTs. If the `upstreamSubQueueId` is /// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen. + /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This + /// should be specified if the upstream is stopping. /// /// THREAD: This method is called from the Queue's dispatcher thread. void onOpenUpstream(bsls::Types::Uint64 genCount, diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index a0877a6ba5..c7fc0146a8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -428,7 +428,7 @@ int RootQueueEngine::initializeAppId(const bsl::string& appId, void RootQueueEngine::resetState(bool isShuttingDown) { for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { - it->value()->reset(); + it->value()->undoRouting(); it->value()->d_routing_sp->reset(); } @@ -893,7 +893,7 @@ void RootQueueEngine::configureHandle( const AppStateSp& affectedApp = iter->value(); // prepare the App for rebuilding consumers - affectedApp->reset(); + affectedApp->undoRouting(); // Rebuild the highest priority state for all affected apps. diff --git a/src/groups/mqb/mqbblp/mqbblp_routers.h b/src/groups/mqb/mqbblp/mqbblp_routers.h index 67d3bc0897..056f361932 100644 --- a/src/groups/mqb/mqbblp/mqbblp_routers.h +++ b/src/groups/mqb/mqbblp/mqbblp_routers.h @@ -595,8 +595,6 @@ class Routers { bmqeval::EvaluationContext d_evaluationContext; - bool d_isShuttingDown; - bslma::Allocator* d_allocator_p; QueueRoutingContext(bmqp::SchemaLearner& schemaLearner, @@ -847,7 +845,6 @@ inline Routers::QueueRoutingContext::QueueRoutingContext( , d_preader(new(*allocator) MessagePropertiesReader(schemaLearner, allocator), allocator) , d_evaluationContext(0, allocator) -, d_isShuttingDown(false) , d_allocator_p(allocator) { d_evaluationContext.setPropertiesReader(d_preader.get()); diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 4659034532..0ab4d05c93 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -266,12 +266,12 @@ class Cluster : public DispatcherClient { /// Initiate the shutdown of the cluster and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence. It /// is expected that `stop()` will be called soon after this routine is - /// invoked. If the optional (temporary) specified 'suppportShutdownV2' is + /// invoked. If the optional (temporary) specified 'supportShutdownV2' is /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes - /// deconfigure queues and he shutting down node (not downstream) wait for - /// CONFIRMS. + /// deconfigure queues and the shutting down node (not downstream) wait + /// for CONFIRMS. virtual void initiateShutdown(const VoidFunctor& callback, - bool suppportShutdownV2 = false) = 0; + bool supportShutdownV2 = false) = 0; /// Stop the `Cluster`; this is the counterpart of the `start()` /// operation. diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h index 9476c2be7f..2ae5d34c44 100644 --- a/src/groups/mqb/mqbi/mqbi_queue.h +++ b/src/groups/mqb/mqbi/mqbi_queue.h @@ -856,6 +856,8 @@ class Queue : public DispatcherClient { /// and current upstream `genCount`, then the PUT message gets dropped /// to avoid out of order PUTs. If the `upstreamSubQueueId` is /// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen. + /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This + /// should be specified if the upstream is stopping. /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void onOpenUpstream(bsls::Types::Uint64 genCount, diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index aa1902813d..e537f459a2 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -323,7 +323,7 @@ int Cluster::start(BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription) void Cluster::initiateShutdown( BSLS_ANNOTATION_UNUSED const VoidFunctor& callback, - BSLS_ANNOTATION_UNUSED bool suppportShutdownV2) + BSLS_ANNOTATION_UNUSED bool supportShutdownV2) { // PRECONDITIONS BSLS_ASSERT_OPT(!d_isStarted && diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index 270c61eeae..bcdd63a345 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -302,13 +302,13 @@ class Cluster : public mqbi::Cluster { /// Initiate the shutdown of the cluster and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence. It /// is expected that `stop()` will be called soon after this routine is - /// invoked. If the optional (temporary) specified 'suppportShutdownV2' is + /// invoked. If the optional (temporary) specified 'supportShutdownV2' is /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes - /// deconfigure queues and he shutting down node (not downstream) wait for - /// CONFIRMS. + /// deconfigure queues and the shutting down node (not downstream) wait + /// for CONFIRMS. void initiateShutdown(const VoidFunctor& callback, - bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; + bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Stop the `Cluster`. void stop() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h index cc79eed542..d9b5a68695 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.h +++ b/src/groups/mqb/mqbmock/mqbmock_queue.h @@ -303,6 +303,8 @@ class Queue : public mqbi::Queue { /// and current upstream `genCount`, then the PUT message gets dropped /// to avoid out of order PUTs. If the `upstreamSubQueueId` is /// `k_ANY_SUBQUEUE_ID`, all SubQueues are reopen. + /// If the optionally specified isWriterOnly is true, ignore CONFIRMs. This + /// should be specified if the upstream is stopping. /// /// THREAD: This method is called from the Queue's dispatcher thread. void onOpenUpstream(bsls::Types::Uint64 genCount, diff --git a/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp b/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp index 4dcb2e3c82..340e64fc15 100644 --- a/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp @@ -59,7 +59,7 @@ void DummySession::tearDown( void DummySession::initiateShutdown( BSLS_ANNOTATION_UNUSED const ShutdownCb& callback, BSLS_ANNOTATION_UNUSED const bsls::TimeInterval& timeout, - BSLS_ANNOTATION_UNUSED bool suppportShutdownV2) + BSLS_ANNOTATION_UNUSED bool supportShutdownV2) { // NOTHING } diff --git a/src/groups/mqb/mqbnet/mqbnet_dummysession.h b/src/groups/mqb/mqbnet/mqbnet_dummysession.h index 3db79d8898..8c192c827a 100644 --- a/src/groups/mqb/mqbnet/mqbnet_dummysession.h +++ b/src/groups/mqb/mqbnet/mqbnet_dummysession.h @@ -136,13 +136,13 @@ class DummySession : public Session { /// Initiate the shutdown of the session and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence or /// if the specified `timeout` is expired. If the optional (temporary) - /// specified 'suppportShutdownV2' is 'true' execute shutdown logic V2 + /// specified 'supportShutdownV2' is 'true' execute shutdown logic V2 /// where upstream (not downstream) nodes deconfigure queues and the /// shutting down node (not downstream) waits for CONFIRMS. void initiateShutdown(const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; + bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Make the session abandon any work it has. void invalidate() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbnet/mqbnet_session.h b/src/groups/mqb/mqbnet/mqbnet_session.h index 04aad0d8df..a395bff47e 100644 --- a/src/groups/mqb/mqbnet/mqbnet_session.h +++ b/src/groups/mqb/mqbnet/mqbnet_session.h @@ -130,12 +130,12 @@ class Session : public SessionEventProcessor { /// Initiate the shutdown of the session and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence or /// if the specified `timeout` is expired. If the optional (temporary) - /// specified 'suppportShutdownV2' is 'true' execute shutdown logic V2 + /// specified 'supportShutdownV2' is 'true' execute shutdown logic V2 /// where upstream (not downstream) nodes deconfigure queues and the /// shutting down node (not downstream) waits for CONFIRMS. virtual void initiateShutdown(const ShutdownCb& callback, const bsls::TimeInterval& timeout, - bool suppportShutdownV2 = false) = 0; + bool supportShutdownV2 = false) = 0; /// Make the session abandon any work it has. virtual void invalidate() = 0; diff --git a/src/groups/mqb/mqbnet/mqbnet_session.t.cpp b/src/groups/mqb/mqbnet/mqbnet_session.t.cpp index 7775b21b99..bf19fe2a72 100644 --- a/src/groups/mqb/mqbnet/mqbnet_session.t.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_session.t.cpp @@ -69,10 +69,9 @@ struct SessionTestImp : bsls::ProtocolTestImp { markDone(); } - void - initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout, - bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE + void initiateShutdown(const ShutdownCb& callback, + const bsls::TimeInterval& timeout, + bool supportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE { markDone(); }