From 018c8c489328f7adf17eae3235e39470e165b92e Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Thu, 15 Aug 2024 09:29:20 -0400 Subject: [PATCH] Shutdown V2 Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd | 1 + src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp | 23 +- src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h | 45 ++- src/groups/bmq/bmqp/bmqp_protocol.cpp | 2 + src/groups/bmq/bmqp/bmqp_protocol.h | 2 + src/groups/bmq/bmqp/bmqp_requestmanager.h | 4 + src/groups/mqb/mqba/mqba_adminsession.cpp | 4 +- src/groups/mqb/mqba/mqba_adminsession.h | 3 +- src/groups/mqb/mqba/mqba_application.cpp | 128 ++++++- src/groups/mqb/mqba/mqba_application.h | 4 + src/groups/mqb/mqba/mqba_clientsession.cpp | 300 +++++++++++----- src/groups/mqb/mqba/mqba_clientsession.h | 42 ++- .../mqb/mqba/mqba_sessionnegotiator.cpp | 4 +- src/groups/mqb/mqbblp/mqbblp_cluster.cpp | 173 +++++---- src/groups/mqb/mqbblp/mqbblp_cluster.h | 16 +- .../mqb/mqbblp/mqbblp_clustercatalog.cpp | 10 +- src/groups/mqb/mqbblp/mqbblp_clustercatalog.h | 33 ++ .../mqb/mqbblp/mqbblp_clusterorchestrator.cpp | 20 +- src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp | 134 ++++--- src/groups/mqb/mqbblp/mqbblp_clusterproxy.h | 24 +- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 331 +++++++++++++++++- .../mqb/mqbblp/mqbblp_clusterqueuehelper.h | 25 +- src/groups/mqb/mqbblp/mqbblp_queue.cpp | 17 +- src/groups/mqb/mqbblp/mqbblp_queue.h | 7 +- .../mqb/mqbblp/mqbblp_queueengineutil.cpp | 4 +- src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp | 2 +- .../mqb/mqbblp/mqbblp_queuehandlecatalog.cpp | 22 +- .../mqb/mqbblp/mqbblp_queuehandlecatalog.h | 7 +- src/groups/mqb/mqbblp/mqbblp_queuestate.h | 4 +- .../mqb/mqbblp/mqbblp_relayqueueengine.cpp | 10 +- .../mqb/mqbblp/mqbblp_relayqueueengine.h | 6 +- src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp | 24 +- src/groups/mqb/mqbblp/mqbblp_remotequeue.h | 7 +- .../mqb/mqbblp/mqbblp_rootqueueengine.cpp | 12 +- .../mqb/mqbblp/mqbblp_rootqueueengine.h | 6 +- src/groups/mqb/mqbblp/mqbblp_routers.h | 12 +- .../mqb/mqbc/mqbc_clusterstateledgerutil.cpp | 2 +- src/groups/mqb/mqbi/mqbi_cluster.h | 8 +- src/groups/mqb/mqbi/mqbi_queue.h | 7 +- src/groups/mqb/mqbi/mqbi_queueengine.h | 6 +- src/groups/mqb/mqbmock/mqbmock_cluster.cpp | 3 +- src/groups/mqb/mqbmock/mqbmock_cluster.h | 15 +- src/groups/mqb/mqbmock/mqbmock_queue.cpp | 8 +- src/groups/mqb/mqbmock/mqbmock_queue.h | 7 +- .../mqb/mqbmock/mqbmock_queueengine.cpp | 2 +- src/groups/mqb/mqbmock/mqbmock_queueengine.h | 6 +- src/groups/mqb/mqbnet/mqbnet_dummysession.cpp | 3 +- src/groups/mqb/mqbnet/mqbnet_dummysession.h | 8 +- .../mqb/mqbnet/mqbnet_multirequestmanager.h | 9 + src/groups/mqb/mqbnet/mqbnet_session.h | 8 +- src/groups/mqb/mqbnet/mqbnet_session.t.cpp | 3 +- .../test_graceful_shutdown.py | 88 +---- src/integration-tests/test_maxunconfirmed.py | 2 +- 53 files changed, 1264 insertions(+), 389 deletions(-) diff --git a/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd b/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd index c15c2ed120..7f4870891f 100644 --- a/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd +++ b/src/groups/bmq/bmqp/bmqp_ctrlmsg.xsd @@ -1404,6 +1404,7 @@ + diff --git a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp index c9d9a70c0b..fa09520746 100644 --- a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp +++ b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.cpp @@ -4017,19 +4017,26 @@ const char* StatusCategory::toString(StatusCategory::Value value) const char StopRequest::CLASS_NAME[] = "StopRequest"; +const int StopRequest::DEFAULT_INITIALIZER_VERSION = 1; + const bdlat_AttributeInfo StopRequest::ATTRIBUTE_INFO_ARRAY[] = { {ATTRIBUTE_ID_CLUSTER_NAME, "clusterName", sizeof("clusterName") - 1, "", - bdlat_FormattingMode::e_TEXT}}; + bdlat_FormattingMode::e_TEXT}, + {ATTRIBUTE_ID_VERSION, + "version", + sizeof("version") - 1, + "", + bdlat_FormattingMode::e_DEC}}; // CLASS METHODS const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(const char* name, int nameLength) { - for (int i = 0; i < 1; ++i) { + for (int i = 0; i < 2; ++i) { const bdlat_AttributeInfo& attributeInfo = StopRequest::ATTRIBUTE_INFO_ARRAY[i]; @@ -4047,6 +4054,8 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id) switch (id) { case ATTRIBUTE_ID_CLUSTER_NAME: return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]; + case ATTRIBUTE_ID_VERSION: + return &ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]; default: return 0; } } @@ -4055,25 +4064,29 @@ const bdlat_AttributeInfo* StopRequest::lookupAttributeInfo(int id) StopRequest::StopRequest(bslma::Allocator* basicAllocator) : d_clusterName(basicAllocator) +, d_version(DEFAULT_INITIALIZER_VERSION) { } StopRequest::StopRequest(const StopRequest& original, bslma::Allocator* basicAllocator) : d_clusterName(original.d_clusterName, basicAllocator) +, d_version(original.d_version) { } #if defined(BSLS_COMPILERFEATURES_SUPPORT_RVALUE_REFERENCES) && \ defined(BSLS_COMPILERFEATURES_SUPPORT_NOEXCEPT) StopRequest::StopRequest(StopRequest&& original) noexcept -: d_clusterName(bsl::move(original.d_clusterName)) +: d_clusterName(bsl::move(original.d_clusterName)), + d_version(bsl::move(original.d_version)) { } StopRequest::StopRequest(StopRequest&& original, bslma::Allocator* basicAllocator) : d_clusterName(bsl::move(original.d_clusterName), basicAllocator) +, d_version(bsl::move(original.d_version)) { } #endif @@ -4088,6 +4101,7 @@ StopRequest& StopRequest::operator=(const StopRequest& rhs) { if (this != &rhs) { d_clusterName = rhs.d_clusterName; + d_version = rhs.d_version; } return *this; @@ -4099,6 +4113,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs) { if (this != &rhs) { d_clusterName = bsl::move(rhs.d_clusterName); + d_version = bsl::move(rhs.d_version); } return *this; @@ -4108,6 +4123,7 @@ StopRequest& StopRequest::operator=(StopRequest&& rhs) void StopRequest::reset() { bdlat_ValueTypeFunctions::reset(&d_clusterName); + d_version = DEFAULT_INITIALIZER_VERSION; } // ACCESSORS @@ -4118,6 +4134,7 @@ StopRequest::print(bsl::ostream& stream, int level, int spacesPerLevel) const bslim::Printer printer(&stream, level, spacesPerLevel); printer.start(); printer.printAttribute("clusterName", this->clusterName()); + printer.printAttribute("version", this->version()); printer.end(); return stream; } diff --git a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h index 012e7b6231..e3e91e6f25 100644 --- a/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h +++ b/src/groups/bmq/bmqp/bmqp_ctrlmsg_messages.h @@ -7429,18 +7429,21 @@ namespace bmqp_ctrlmsg { class StopRequest { // INSTANCE DATA bsl::string d_clusterName; + int d_version; public: // TYPES - enum { ATTRIBUTE_ID_CLUSTER_NAME = 0 }; + enum { ATTRIBUTE_ID_CLUSTER_NAME = 0, ATTRIBUTE_ID_VERSION = 1 }; - enum { NUM_ATTRIBUTES = 1 }; + enum { NUM_ATTRIBUTES = 2 }; - enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0 }; + enum { ATTRIBUTE_INDEX_CLUSTER_NAME = 0, ATTRIBUTE_INDEX_VERSION = 1 }; // CONSTANTS static const char CLASS_NAME[]; + static const int DEFAULT_INITIALIZER_VERSION; + static const bdlat_AttributeInfo ATTRIBUTE_INFO_ARRAY[]; public: @@ -7540,6 +7543,10 @@ class StopRequest { /// object. bsl::string& clusterName(); + /// Return a reference to the non-modifiable "Version" attribute of this + /// object. + int& version(); + // ACCESSORS /// Format this object to the specified output `stream` at the @@ -7587,6 +7594,10 @@ class StopRequest { /// Return a reference to the non-modifiable "ClusterName" attribute of /// this object. const bsl::string& clusterName() const; + + /// Return a reference to the non-modifiable "Version" attribute of this + /// object. + int version() const; }; // FREE OPERATORS @@ -27150,6 +27161,12 @@ int StopRequest::manipulateAttributes(MANIPULATOR& manipulator) return ret; } + ret = manipulator(&d_version, + ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]); + if (ret) { + return ret; + } + return ret; } @@ -27163,6 +27180,10 @@ int StopRequest::manipulateAttribute(MANIPULATOR& manipulator, int id) return manipulator(&d_clusterName, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]); } + case ATTRIBUTE_ID_VERSION: { + return manipulator(&d_version, + ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]); + } default: return NOT_FOUND; } } @@ -27188,6 +27209,11 @@ inline bsl::string& StopRequest::clusterName() return d_clusterName; } +inline int& StopRequest::version() +{ + return d_version; +} + // ACCESSORS template int StopRequest::accessAttributes(ACCESSOR& accessor) const @@ -27200,6 +27226,10 @@ int StopRequest::accessAttributes(ACCESSOR& accessor) const return ret; } + ret = accessor(d_version, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]); + if (ret) { + return ret; + } return ret; } @@ -27213,6 +27243,10 @@ int StopRequest::accessAttribute(ACCESSOR& accessor, int id) const return accessor(d_clusterName, ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_CLUSTER_NAME]); } + case ATTRIBUTE_ID_VERSION: { + return accessor(d_version, + ATTRIBUTE_INFO_ARRAY[ATTRIBUTE_INDEX_VERSION]); + } default: return NOT_FOUND; } } @@ -27238,6 +27272,11 @@ inline const bsl::string& StopRequest::clusterName() const return d_clusterName; } +inline int StopRequest::version() const +{ + return d_version; +} + template void hashAppend(HASH_ALGORITHM& hashAlg, const bmqp_ctrlmsg::StopRequest& object) diff --git a/src/groups/bmq/bmqp/bmqp_protocol.cpp b/src/groups/bmq/bmqp/bmqp_protocol.cpp index 1aff4bf4c9..a1b521e514 100644 --- a/src/groups/bmq/bmqp/bmqp_protocol.cpp +++ b/src/groups/bmq/bmqp/bmqp_protocol.cpp @@ -264,6 +264,8 @@ const char HighAvailabilityFeatures::k_BROADCAST_TO_PROXIES[] = "BROADCAST_TO_PROXIES"; const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN[] = "GRACEFUL_SHUTDOWN"; +const char HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2[] = + "GRACEFUL_SHUTDOWN_V2"; // -------------------------------- // struct MessagePropertiesFeatures diff --git a/src/groups/bmq/bmqp/bmqp_protocol.h b/src/groups/bmq/bmqp/bmqp_protocol.h index 7fa6f8faf8..3424ddbc74 100644 --- a/src/groups/bmq/bmqp/bmqp_protocol.h +++ b/src/groups/bmq/bmqp/bmqp_protocol.h @@ -633,6 +633,8 @@ struct HighAvailabilityFeatures { static const char k_BROADCAST_TO_PROXIES[]; static const char k_GRACEFUL_SHUTDOWN[]; + + static const char k_GRACEFUL_SHUTDOWN_V2[]; }; /// This struct defines feature names related to MessageProperties diff --git a/src/groups/bmq/bmqp/bmqp_requestmanager.h b/src/groups/bmq/bmqp/bmqp_requestmanager.h index 38d89b5b10..14bd5ae2a6 100644 --- a/src/groups/bmq/bmqp/bmqp_requestmanager.h +++ b/src/groups/bmq/bmqp/bmqp_requestmanager.h @@ -1091,6 +1091,10 @@ void RequestManager::onRequestTimeout(int requestId) // Explicitly invalidate the timeout since we processed it request->d_timeoutSchedulerHandle.release(); + + if (!d_lateResponseMode) { + d_requests.erase(it); + } } // close guard scope BALL_LOG_ERROR << "Request with '" << request->nodeDescription() diff --git a/src/groups/mqb/mqba/mqba_adminsession.cpp b/src/groups/mqb/mqba/mqba_adminsession.cpp index 8d8e478a95..5092e2e1e8 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.cpp +++ b/src/groups/mqb/mqba/mqba_adminsession.cpp @@ -418,10 +418,12 @@ void AdminSession::tearDown(const bsl::shared_ptr& session, } void AdminSession::initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout) + const bsls::TimeInterval& timeout, + bool suppportShutdownV2) { // executed by the *ANY* thread (void)timeout; + (void)suppportShutdownV2; dispatcher()->execute( bdlf::BindUtil::bind(&AdminSession::initiateShutdownDispatched, diff --git a/src/groups/mqb/mqba/mqba_adminsession.h b/src/groups/mqb/mqba/mqba_adminsession.h index 681065bcd3..64b4331200 100644 --- a/src/groups/mqb/mqba/mqba_adminsession.h +++ b/src/groups/mqb/mqba/mqba_adminsession.h @@ -264,7 +264,8 @@ class AdminSession : public mqbnet::Session, public mqbi::DispatcherClient { /// if the specified `timeout` is expired. void initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE; + const bsls::TimeInterval& timeout, + bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Make the session abandon any work it has. void invalidate() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index d04670c51f..8faf0e8fcb 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -47,6 +47,7 @@ #include #include #include +#include // BDE #include @@ -76,6 +77,7 @@ namespace mqba { namespace { const int k_BLOBBUFFER_SIZE = 4 * 1024; const int k_BLOB_POOL_GROWTH_STRATEGY = 1024; +const bsls::Types::Int64 k_STOP_REQUEST_TIMEOUT_MS = 5000; /// Create a new blob at the specified `arena` address, using the specified /// `bufferFactory` and `allocator`. @@ -440,6 +442,16 @@ void Application::stop() d_transportManager_mp->initiateShutdown(); BALL_LOG_INFO << "Stopped listening for new connections."; + bool suppportShutdownV2 = initiateShutdown(); + + if (suppportShutdownV2) { + BALL_LOG_INFO << ": Executing GRACEFUL_SHUTDOWN_V2"; + } + else { + BALL_LOG_INFO << ": Peers do not support " + << "GRACEFUL_SHUTDOWN_V2. Retreat to V1"; + } + // For each cluster in cluster catalog, inform peers about this shutdown. int count = d_clusterCatalog_mp->count(); bslmt::Latch latch(count); @@ -450,7 +462,8 @@ void Application::stop() count > 0; ++clusterIt, --count) { clusterIt.cluster()->initiateShutdown( - bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch)); + bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch), + suppportShutdownV2); } latch.wait(); @@ -467,19 +480,15 @@ void Application::stop() // STOP everything. - // Note that we must do an out of order stop/destroy with respect to the - // 'DomainManager' and 'ClusterCatalog' because it appears that the domain - // manager contains objects that are held and owned by 'ClusterCatalog', so - // it is believed that the relationship is not properly done. - // // Note that clusterCatalog must be stopped before transport manager // because transportManager.stop() blocks until all sessions have been // destroyed, and above code proactively closes only the clientOrProxy // sessions; clusterNode ones are being destroyed by the clusterCatalog // calling stop on each cluster. - STOP_OBJ(d_domainManager_mp, "DomainManager"); + STOP_OBJ(d_clusterCatalog_mp, "ClusterCatalog"); STOP_OBJ(d_transportManager_mp, "TransportManager"); + STOP_OBJ(d_domainManager_mp, "DomainManager"); STOP_OBJ(d_dispatcher_mp, "Dispatcher"); STOP_OBJ(d_configProvider_mp, "ConfigProvider"); STOP_OBJ(d_statController_mp, "StatController"); @@ -500,6 +509,111 @@ void Application::stop() #undef STOP_OBJ } +bool Application::initiateShutdown() +{ + typedef bsl::vector > Sessions; + + // Send a StopRequest to all connected cluster nodes and brokers + Sessions brokers(d_allocator_p); + Sessions clients(d_allocator_p); + + for (mqbnet::TransportManagerIterator sessIt(d_transportManager_mp.get()); + sessIt; + ++sessIt) { + bsl::shared_ptr sessionSp = sessIt.session().lock(); + if (!sessionSp) { + continue; // CONTINUE + } + + const bmqp_ctrlmsg::NegotiationMessage& negoMsg = + sessionSp->negotiationMessage(); + + const bmqp_ctrlmsg::ClientIdentity& peerIdentity = + negoMsg.isClientIdentityValue() + ? negoMsg.clientIdentity() + : negoMsg.brokerResponse().brokerIdentity(); + + bool isBroker = false; + if (mqbnet::ClusterUtil::isClientOrProxy(negoMsg)) { + clients.push_back(sessionSp); + if (!negoMsg.clientIdentity().clusterName().empty()) { + isBroker = true; + } + } + else { + isBroker = true; + } + if (isBroker) { + // Node or Proxy + // Expect all proxies and nodes support this feature. + if (!bmqp::ProtocolUtil::hasFeature( + bmqp::HighAvailabilityFeatures::k_FIELD_NAME, + bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN, + peerIdentity.features())) { + BALL_LOG_ERROR << ": Peer doesn't support " + << "GRACEFUL_SHUTDOWN. Skip sending stopRequest" + << " to [" << peerIdentity << "]"; + continue; // CONTINUE + } + if (!bmqp::ProtocolUtil::hasFeature( + bmqp::HighAvailabilityFeatures::k_FIELD_NAME, + bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2, + peerIdentity.features())) { + // Abandon the attempt to shutdown V2 + return false; // RETURN + } + brokers.push_back(sessionSp); + } + } + + bslmt::Latch latch(clients.size() + 1); + // The 'StopRequestManagerType::sendRequest' always calls 'd_responseCb'. + + mqbblp::ClusterCatalog::StopRequestManagerType::RequestContextSp + contextSp = + d_clusterCatalog_mp->stopRequestManger().createRequestContext(); + + bmqp_ctrlmsg::StopRequest& request = contextSp->request() + .choice() + .makeClusterMessage() + .choice() + .makeStopRequest(); + + request.version() = 2; + + bsls::TimeInterval shutdownTimeout; + + shutdownTimeout.setTotalMilliseconds(k_STOP_REQUEST_TIMEOUT_MS); + + contextSp->setDestinationNodes(brokers); + + contextSp->setResponseCb( + bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch)); + + BALL_LOG_INFO << "Sending StopRequest V2 to " << brokers.size() + << " brokers; timeout is " << shutdownTimeout << " ms"; + + d_clusterCatalog_mp->stopRequestManger().sendRequest(contextSp, + shutdownTimeout); + + BALL_LOG_INFO << "Shutting down " << clients.size() + << " clients; timeout is " << shutdownTimeout << " ms"; + + for (Sessions::const_iterator cit = clients.begin(); cit != clients.end(); + ++cit) { + (*cit)->initiateShutdown(bdlf::BindUtil::bind(&bslmt::Latch::arrive, + &latch), + shutdownTimeout, + true); + } + + // Need to wait for peers to update this node status to guarantee no new + // clusters. + latch.wait(); + + return true; +} + int Application::processCommand(const bslstl::StringRef& source, const bsl::string& cmd, bsl::ostream& os) diff --git a/src/groups/mqb/mqba/mqba_application.h b/src/groups/mqb/mqba/mqba_application.h index f118ab29e8..03028cf2c2 100644 --- a/src/groups/mqb/mqba/mqba_application.h +++ b/src/groups/mqb/mqba/mqba_application.h @@ -158,6 +158,10 @@ class Application { /// Pendant operation of the `oneTimeInit` one. void oneTimeShutdown(); + /// Attempt to execute shutdown logic v2. Return 'true' if all nodes and + /// proxies support it. + bool initiateShutdown(); + private: // NOT IMPLEMENTED Application(const Application& other) BSLS_CPP11_DELETED; diff --git a/src/groups/mqb/mqba/mqba_clientsession.cpp b/src/groups/mqb/mqba/mqba_clientsession.cpp index 5455411af8..b65ad04036 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.cpp +++ b/src/groups/mqb/mqba/mqba_clientsession.cpp @@ -370,10 +370,21 @@ void ClientSession::sendErrorResponse( << " failure response: " << response; // Send the response - sendPacket(d_state.d_schemaEventBuilder.blob(), true); + sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true); } void ClientSession::sendPacket(const bdlbb::Blob& blob, bool flushBuilders) +{ + dispatcher()->execute( + bdlf::BindUtil::bind(&ClientSession::sendPacketDispatched, + this, + blob, + flushBuilders), + this); +} + +void ClientSession::sendPacketDispatched(const bdlbb::Blob& blob, + bool flushBuilders) { // executed by the *CLIENT* dispatcher thread @@ -620,12 +631,6 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore, return; // RETURN } - // Set up the 'd_operationState' to indicate that the channel is dying and - // we should not use it anymore trying to send any messages and should also - // stop enqueuing 'callbacks' to the client dispatcher thread ... - const bool doDeconfigure = d_operationState == e_RUNNING; - d_operationState = e_DEAD; - // If stop request handling is in progress cancel checking for the // unconfirmed messages. if (d_periodicUnconfirmedCheckHandler) { @@ -656,37 +661,32 @@ void ClientSession::tearDownImpl(bslmt::Semaphore* semaphore, // closeQueue request is in process in the queue thread, so the handle is // still active at the time tearDown is processed). - int numHandlesDropped = 0; - for (QueueStateMapIter it = d_queueSessionManager.queues().begin(); - it != d_queueSessionManager.queues().end(); - ++it) { - if (!it->second.d_hasReceivedFinalCloseQueue) { - BSLS_ASSERT_SAFE(it->second.d_handle_p->queue()); + const bool hasLostTheClient = (!isBrokerShutdown && !isProxy()); - if (isBrokerShutdown || - d_clientIdentity_p->clientType() == - bmqp_ctrlmsg::ClientType::E_TCPBROKER) { - it->second.d_handle_p->clearClient(false); - } - else { - it->second.d_handle_p->clearClient(true); - } + // if (d_operationState == e_SHUTTING_DOWN_V2) { + // // Leave over queues and handles + // } + // else { + // Set up the 'd_operationState' to indicate that the channel is dying and + // we should not use it anymore trying to send any messages and should also + // stop enqueuing 'callbacks' to the client dispatcher thread ... + const bool doDeconfigure = d_operationState == e_RUNNING; - it->second.d_handle_p->drop(doDeconfigure); - it->second.d_handle_p = 0; - it->second.d_hasReceivedFinalCloseQueue = true; - ++numHandlesDropped; - } - } + int numHandlesDropped = dropAllQueueHandles(doDeconfigure, + hasLostTheClient); + BALL_LOG_INFO << description() << ": Dropped " << numHandlesDropped + << " queue handles."; + // } + // Set up the 'd_operationState' to indicate that the channel is dying and + // we should not use it anymore trying to send any messages and should also + // stop enqueuing 'callbacks' to the client dispatcher thread ... + d_operationState = e_DEAD; // Invalidating 'd_queueSessionManager' _after_ calling 'clearClient', // otherwise handle can get destructed because of // 'QueueSessionManager::onHandleReleased' early exit. d_queueSessionManager.tearDown(); - BALL_LOG_INFO << description() << ": Dropped " << numHandlesDropped - << " queue handles."; - // Now that we enqueued a 'drop' for all applicable queue handles, we need // to synchronize on all queues, to make sure this drop event has been // processed. We do so by enqueuing an event to all queues dispatchers @@ -827,12 +827,13 @@ void ClientSession::onHandleConfiguredDispatched( << " for request: " << streamParamsCtrlMsg; // Send the response - sendPacket(d_state.d_schemaEventBuilder.blob(), true); + sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true); } void ClientSession::initiateShutdownDispatched( const ShutdownCb& callback, - const bsls::TimeInterval& timeout) + const bsls::TimeInterval& timeout, + bool suppportShutdownV2) { // executed by the *CLIENT* dispatcher thread @@ -848,15 +849,22 @@ void ClientSession::initiateShutdownDispatched( callback(); return; // RETURN } + if (d_operationState == e_SHUTTING_DOWN) { // More than one cluster calls 'initiateShutdown'? callback(); return; // RETURN } + if (d_operationState == e_SHUTTING_DOWN_V2) { + // More than one cluster calls 'initiateShutdown'? + callback(); + return; // RETURN + } + flush(); // Flush any pending messages - // Wait for tearDown. + // 'tearDown' should invoke the 'callback' d_shutdownCallback = callback; if (d_operationState == e_DISCONNECTING) { @@ -867,8 +875,36 @@ void ClientSession::initiateShutdownDispatched( return; // RETURN } - // Wait for unconfirmed messages. - // Wait for tearDown. + if (suppportShutdownV2) { + d_operationState = e_SHUTTING_DOWN_V2; + d_queueSessionManager.shutDown(); + + callback(); + } + else { + // After de-configuring (below), wait for unconfirmed messages. + // Once the wait for unconfirmed is over, close the channel + + ShutdownContextSp context; + context.createInplace( + d_state.d_allocator_p, + bdlf::BindUtil::bind(&ClientSession::closeChannel, this), + timeout); + + deconfigureAndWait(context); + } +} + +void ClientSession::deconfigureAndWait(ShutdownContextSp& context) +{ + // executed by the *CLIENT* dispatcher thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + BSLS_ASSERT_SAFE(context); + + // Use the same 'e_SHUTTING_DOWN' state for both shutting down and + // StopRequest processing. d_operationState = e_SHUTTING_DOWN; @@ -888,13 +924,6 @@ void ClientSession::initiateShutdownDispatched( // No-op if the link is empty d_shutdownChain.append(&link); - // Check unconfirmed messages once all the handles are deconfigured - ShutdownContextSp context; - context.createInplace(d_state.d_allocator_p, - bdlf::BindUtil::bind(&ClientSession::closeChannel, - this), - timeout); - d_shutdownChain.appendInplace( bdlf::BindUtil::bind(&ClientSession::checkUnconfirmed, this, @@ -1137,18 +1166,8 @@ void ClientSession::processDisconnectAllQueues( // us back to notify the handle can be deleted and we also don't need to // send a close queue response since this release is not initiated by the // client, but by the broker upon client's disconnect request. - int numHandlesDropped = 0; - for (QueueStateMapIter it = d_queueSessionManager.queues().begin(); - it != d_queueSessionManager.queues().end(); - ++it) { - if (!it->second.d_hasReceivedFinalCloseQueue) { - it->second.d_handle_p->clearClient(false); - it->second.d_handle_p->drop(doDeconfigure); - it->second.d_handle_p = 0; - it->second.d_hasReceivedFinalCloseQueue = true; - ++numHandlesDropped; - } - } + + int numHandlesDropped = dropAllQueueHandles(doDeconfigure, false); BALL_LOG_INFO << description() << ": processing disconnect, dropped " << numHandlesDropped << " queue handles."; @@ -1230,7 +1249,7 @@ void ClientSession::processDisconnect( << ": Sending disconnect response: " << response; // Send the response - sendPacket(d_state.d_schemaEventBuilder.blob(), true); + sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true); // Setting the 'd_operationState' to 'e_DISCONNECTED' implies that no // messages will be pushed to the client after this one: we set it now @@ -1317,7 +1336,7 @@ void ClientSession::openQueueCb( << " for request: " << handleParamsCtrlMsg; // Send the response - sendPacket(d_state.d_schemaEventBuilder.blob(), true); + sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true); } void ClientSession::processCloseQueue( @@ -1374,7 +1393,7 @@ void ClientSession::closeQueueCb( << ": Sending closeQueue response: " << response; // Send the response. - sendPacket(d_state.d_schemaEventBuilder.blob(), true); + sendPacketDispatched(d_state.d_schemaEventBuilder.blob(), true); // Release the handle's ptr in the queue's context to guarantee that the // handle will be destroyed after all ongoing queue events are handled. @@ -2689,34 +2708,11 @@ void ClientSession::processEvent( return; // RETURN } case MsgChoice::SELECTION_ID_CLUSTER_MESSAGE: { - if (d_clientIdentity_p->clientType() == - bmqp_ctrlmsg::ClientType::E_TCPBROKER && - choice.clusterMessage().choice().isStopResponseValue()) { - bsl::shared_ptr cluster; - - if (d_clusterCatalog_p->findCluster(&cluster, - choice.clusterMessage() - .choice() - .stopResponse() - .clusterName())) { - BSLS_ASSERT_SAFE(cluster); - cluster->processResponse(controlMessage); - return; // RETURN - } - else { - BALL_LOG_ERROR << "#CLIENT_IMPROPER_BEHAVIOR " - << description() - << ": unknown Cluster in ClusterMessage: " - << controlMessage; - } - } - else { - BALL_LOG_ERROR - << "#CLIENT_IMPROPER_BEHAVIOR " << description() - << ": unexpected ClusterMessage: " << controlMessage; - } - return; // RETURN - } + eventCallback = bdlf::BindUtil::bind( + &ClientSession::processClusterMessage, + this, + controlMessage); + } break; case MsgChoice::SELECTION_ID_UNDEFINED: case MsgChoice::SELECTION_ID_STATUS: default: { @@ -2811,7 +2807,8 @@ void ClientSession::tearDown(const bsl::shared_ptr& session, } void ClientSession::initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout) + const bsls::TimeInterval& timeout, + bool suppportShutdownV2) { // executed by the *ANY* thread @@ -2846,7 +2843,8 @@ void ClientSession::initiateShutdown(const ShutdownCb& callback, &ClientSession::initiateShutdownDispatched, d_self.acquire()), callback, - timeout), + timeout, + suppportShutdownV2), this, mqbi::DispatcherEventType::e_DISPATCHER); // Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling @@ -3012,7 +3010,7 @@ void ClientSession::flush() BALL_LOG_TRACE << description() << ": Flushing " << d_state.d_pushBuilder.messageCount() << " PUSH messages"; - sendPacket(d_state.d_pushBuilder.blob(), false); + sendPacketDispatched(d_state.d_pushBuilder.blob(), false); d_state.d_pushBuilder.reset(); } @@ -3021,10 +3019,134 @@ void ClientSession::flush() BALL_LOG_TRACE << description() << ": Flushing " << d_state.d_ackBuilder.messageCount() << " ACK messages"; - sendPacket(d_state.d_ackBuilder.blob(), false); + sendPacketDispatched(d_state.d_ackBuilder.blob(), false); d_state.d_ackBuilder.reset(); } } +void ClientSession::processClusterMessage( + const bmqp_ctrlmsg::ControlMessage& message) +{ + if (message.choice().clusterMessage().choice().isStopResponseValue()) { + BALL_LOG_INFO << description() << ": processStopResponse: " << message; + d_clusterCatalog_p->stopRequestManger().processResponse(message); + } + else if (message.choice().clusterMessage().choice().isStopRequestValue()) { + // This is StopRequest from Proxy + // Assume StopRequest V2 + + const bmqp_ctrlmsg::StopRequest& request = + message.choice().clusterMessage().choice().stopRequest(); + + BSLS_ASSERT_SAFE(request.version() == 2); + + // Deconfigure all queues. Do NOT wait for unconfirmed + + BALL_LOG_INFO << description() << ": processing StopRequest."; + + bmqp_ctrlmsg::ControlMessage response; + + response.choice().makeClusterMessage().choice().makeStopResponse(); + response.rId() = message.rId(); + + d_state.d_schemaEventBuilder.reset(); + int rc = d_state.d_schemaEventBuilder.setMessage( + response, + bmqp::EventType::e_CONTROL); + + if (rc != 0) { + BALL_LOG_ERROR + << "#CLIENT_SEND_FAILURE " << description() + << ": Encoding StopRequest response has failed, [rc: " << rc + << "]: " << response; + + return; // RETURN + } + + ShutdownContextSp context; + context.createInplace( + d_state.d_allocator_p, + bdlf::BindUtil::bind(&ClientSession::sendPacket, + this, + d_state.d_schemaEventBuilder.blob(), + false)); + + processStopRequest(context); + } + else { + BALL_LOG_ERROR << "#CLIENT_IMPROPER_BEHAVIOR " << description() + << ": unknown Cluster in StopResponse: " << message; + } +} + +void ClientSession::onDeconfiguredHandle(const ShutdownContextSp& contextSp) +{ + (void)contextSp; +} + +void ClientSession::processStopRequest(ShutdownContextSp& contextSp) +{ + // This StopRequest arrives from a downstream (otherwise, ClusterProxy + // would receive it). As an upstream, this node needs to deconfigure all + // queues and then respond with StopResponse. + + // Use the same logic as in the 'initiateShutdown' except that the final + // step is sending StopResponse instead of 'closeChannel' + // executed by the *CLIENT* dispatcher thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + + if (d_operationState == e_DEAD) { + // The client is disconnected. No-op + return; // RETURN + } + if (d_operationState == e_SHUTTING_DOWN) { + // The broker is already shutting down or processing a StopRequest + // The de-configuring is done. + // Even if the waiting is in progress, still reply with StopResponse + return; // RETURN + } + if (d_operationState == e_SHUTTING_DOWN_V2) { + // The broker is already shutting down or processing a StopRequest + // The de-configuring is done. + // Even if the waiting is in progress, still reply with StopResponse + return; // RETURN + } + + if (d_operationState == e_DISCONNECTING) { + // The client is disconnecting. No-op + return; // RETURN + } + for (QueueStateMapCIter cit = d_queueSessionManager.queues().begin(); + cit != d_queueSessionManager.queues().end(); + ++cit) { + if (!cit->second.d_hasReceivedFinalCloseQueue) { + cit->second.d_handle_p->deconfigureAll( + bdlf::BindUtil::bind(&ClientSession::onDeconfiguredHandle, + this, + contextSp)); + } + } +} + +int ClientSession::dropAllQueueHandles(bool doDeconfigure, bool hasLostClient) +{ + int numHandlesDropped = 0; + for (QueueStateMapIter it = d_queueSessionManager.queues().begin(); + it != d_queueSessionManager.queues().end(); + ++it) { + if (!it->second.d_hasReceivedFinalCloseQueue) { + it->second.d_handle_p->clearClient(hasLostClient); + it->second.d_handle_p->drop(doDeconfigure); + it->second.d_handle_p = 0; + it->second.d_hasReceivedFinalCloseQueue = true; + ++numHandlesDropped; + } + } + + return numHandlesDropped; +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqba/mqba_clientsession.h b/src/groups/mqb/mqba/mqba_clientsession.h index bf61b839cc..d5c6b488a4 100644 --- a/src/groups/mqb/mqba/mqba_clientsession.h +++ b/src/groups/mqb/mqba/mqba_clientsession.h @@ -273,8 +273,11 @@ class ClientSession : public mqbnet::Session, enum OperationState { e_RUNNING // Running normally , + // TEMPORARY, remove 'after switching to StopRequest V2 e_SHUTTING_DOWN // Shutting down due to 'initiateShutdown' request , + e_SHUTTING_DOWN_V2 // Shutting down due to 'initiateShutdown' request + , e_DISCONNECTING // Disconnecting due to the client disconnect request , e_DISCONNECTED // The session is disconnected and no more valid @@ -295,6 +298,8 @@ class ClientSession : public mqbnet::Session, ShutdownContext(const ShutdownCb& callback, const bsls::TimeInterval& timeout); + ShutdownContext(const ShutdownCb& callback); + ~ShutdownContext(); }; // Struct to be used as a context for shutdown operation. @@ -417,6 +422,7 @@ class ClientSession : public mqbnet::Session, /// guarantee strict serialization of events when sending a control /// message. void sendPacket(const bdlbb::Blob& blob, bool flushBuilders); + void sendPacketDispatched(const bdlbb::Blob& blob, bool flushBuilders); /// Flush as much as possible of the content of the internal /// `channelBufferQueue`. @@ -470,10 +476,13 @@ class ClientSession : public mqbnet::Session, /// `callback` upon completion of (asynchronous) shutdown sequence or /// if the specified `timeout` is expired. void initiateShutdownDispatched(const ShutdownCb& callback, - const bsls::TimeInterval& timeout); + const bsls::TimeInterval& timeout, + bool suppportShutdownV2); void invalidateDispatched(); + void deconfigureAndWait(ShutdownContextSp& context); + void checkUnconfirmed(const ShutdownContextSp& shutdownCtx, const VoidFunctor& completionCb); @@ -511,6 +520,12 @@ class ClientSession : public mqbnet::Session, const ShutdownContextSp& shutdownCtx, const VoidFunctor& completionCb); + void processClusterMessage(const bmqp_ctrlmsg::ControlMessage& message); + void processStopRequest(ShutdownContextSp& context); + void onDeconfiguredHandle(const ShutdownContextSp& contextSp); + + int dropAllQueueHandles(bool doDeconfigure, bool hasLostClient); + void processDisconnect(const bmqp_ctrlmsg::ControlMessage& controlMessage); void processDisconnectAllQueues( @@ -605,6 +620,8 @@ class ClientSession : public mqbnet::Session, /// Return true if the session is `e_DISCONNECTED` or worse (`e_DEAD`). bool isDisconnected() const; + bool isProxy() const; + public: // TRAITS BSLMF_NESTED_TRAIT_DECLARATION(ClientSession, bslma::UsesBslmaAllocator) @@ -658,11 +675,15 @@ class ClientSession : public mqbnet::Session, /// Initiate the shutdown of the session and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence or - /// if the specified `timeout` is expired. + /// if the specified `timeout` is expired. If the optional (temporary) + /// specified 'suppportShutdownV2' is 'true' execute shutdown logic V2 + /// where upstream (not downstream) nodes deconfigure queues and the + /// shutting down node (not downstream) waits for CONFIRMS. /// The shutdown is complete when 'tearDownAllQueuesDone'. void initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE; + const bsls::TimeInterval& timeout, + bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Make the session abandon any work it has. void invalidate() BSLS_KEYWORD_OVERRIDE; @@ -771,6 +792,15 @@ inline ClientSession::ShutdownContext::ShutdownContext( d_stopTime += timeout; } +inline ClientSession::ShutdownContext::ShutdownContext( + const ShutdownCb& callback) +: d_callback(callback) +, d_stopTime() // unused in V2 +, d_numUnconfirmedTotal(0) // unused in V2 +{ + BSLS_ASSERT_SAFE(d_callback); +} + inline ClientSession::ShutdownContext::~ShutdownContext() { // Assume 'd_callback' does not require specific thread @@ -789,6 +819,12 @@ inline bool ClientSession::isDisconnected() const return d_operationState == e_DISCONNECTED || d_operationState == e_DEAD; } +inline bool ClientSession::isProxy() const +{ + return d_clientIdentity_p->clientType() == + bmqp_ctrlmsg::ClientType::E_TCPBROKER; +} + inline bsl::shared_ptr ClientSession::channel() const { return d_channel_sp; diff --git a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp index 91d8313e99..0b85b4f950 100644 --- a/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp +++ b/src/groups/mqb/mqba/mqba_sessionnegotiator.cpp @@ -133,7 +133,9 @@ void loadBrokerIdentity(bmqp_ctrlmsg::ClientIdentity* identity, .append(";") .append(bmqp::HighAvailabilityFeatures::k_FIELD_NAME) .append(":") - .append(bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN); + .append(bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN) + .append(",") + .append(bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN_V2); if (shouldBroadcastToProxies) { features.append(",").append( diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index d177fbbada..921bf8f903 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -611,7 +611,8 @@ void Cluster::processCommandDispatched(mqbcmd::ClusterResult* result, result->makeError().message() = os.str(); } -void Cluster::initiateShutdownDispatched(const VoidFunctor& callback) +void Cluster::initiateShutdownDispatched(const VoidFunctor& callback, + bool suppportShutdownV2) { // executed by the *DISPATCHER* thread @@ -622,84 +623,107 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback) d_isStopping = true; - // Send StopRequest to all nodes and proxies. The peers are expected not - // to send any PUT msgs to this node after receiving StopRequest. For each - // queue for which this node is the primary, peers (replicas and proxies) - // will de-configure the queue, wait for configured timeout, close the - // queue, and respond with StopResponse. The peers are expected not to - // send any PUT/PUSH/ACK/CONFIRM msgs to this node after sending - // StopResponse. - // - // Call 'initiateShutdown' for all client sessions. - // - // Also update self's status. Note that this node does not explicitly - // issue a close-queue request for each of the queues. - d_clusterData.membership().setSelfNodeStatus( bmqp_ctrlmsg::NodeStatus::E_STOPPING); - mwcu::OperationChainLink link(d_shutdownChain.allocator()); - bsls::TimeInterval shutdownTimeout; - shutdownTimeout.addMilliseconds( - d_clusterData.clusterConfig().queueOperations().shutdownTimeoutMs()); + if (suppportShutdownV2) { + d_clusterOrchestrator.queueHelper().requestToStopPushing(); - SessionSpVec sessions; - for (mqbnet::TransportManagerIterator sessIt( - &d_clusterData.transportManager()); - sessIt; - ++sessIt) { - bsl::shared_ptr sessionSp = sessIt.session().lock(); - if (!sessionSp) { - continue; // CONTINUE - } + bsls::TimeInterval whenToStop( + bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC)); + whenToStop.addMilliseconds(d_clusterData.clusterConfig() + .queueOperations() + .shutdownTimeoutMs()); - const bmqp_ctrlmsg::NegotiationMessage& negoMsg = - sessionSp->negotiationMessage(); + d_shutdownChain.appendInplace( + bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2, + &d_clusterOrchestrator.queueHelper(), + whenToStop, + bdlf::PlaceHolders::_1)); // completionCb + } + else { + // Temporary, remove after switching all to version 2 + // Send StopRequest to all nodes and proxies. The peers are expected + // not to send any PUT msgs to this node after receiving StopRequest. + // For each queue for which this node is the primary, peers (replicas + // and proxies) will de-configure the queue, wait for configured + // timeout, close the queue, and respond with StopResponse. The peers + // are expected not to send any PUT/PUSH/ACK/CONFIRM msgs to this node + // after sending StopResponse. + // + // Call 'initiateShutdown' for all client sessions. + + mwcu::OperationChainLink link(d_shutdownChain.allocator()); + bsls::TimeInterval shutdownTimeout; + shutdownTimeout.addMilliseconds(d_clusterData.clusterConfig() + .queueOperations() + .shutdownTimeoutMs()); + + SessionSpVec sessions; + for (mqbnet::TransportManagerIterator sessIt( + &d_clusterData.transportManager()); + sessIt; + ++sessIt) { + bsl::shared_ptr sessionSp = + sessIt.session().lock(); + if (!sessionSp) { + continue; // CONTINUE + } - const bmqp_ctrlmsg::ClientIdentity& peerIdentity = - negoMsg.isClientIdentityValue() - ? negoMsg.clientIdentity() - : negoMsg.brokerResponse().brokerIdentity(); + const bmqp_ctrlmsg::NegotiationMessage& negoMsg = + sessionSp->negotiationMessage(); - if (peerIdentity.clusterNodeId() == - d_clusterData.membership().netCluster()->selfNodeId()) { - continue; // CONTINUE - } + const bmqp_ctrlmsg::ClientIdentity& peerIdentity = + negoMsg.isClientIdentityValue() + ? negoMsg.clientIdentity() + : negoMsg.brokerResponse().brokerIdentity(); - if (mqbnet::ClusterUtil::isClient(negoMsg)) { - link.insert(bdlf::BindUtil::bind( - &mqbnet::Session::initiateShutdown, - sessionSp, - bdlf::PlaceHolders::_1, // completion callback - shutdownTimeout)); - continue; // CONTINUE - } + if (peerIdentity.clusterNodeId() == + d_clusterData.membership().netCluster()->selfNodeId()) { + continue; // CONTINUE + } - if (peerIdentity.clusterName() == name()) { - // Expect all proxies and nodes support this feature. - if (!bmqp::ProtocolUtil::hasFeature( - bmqp::HighAvailabilityFeatures::k_FIELD_NAME, - bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN, - peerIdentity.features())) { - BALL_LOG_ERROR << description() << ": Peer doesn't support " - << "GRACEFUL_SHUTDOWN. Skip sending stopRequest" - << " to [" << peerIdentity << "]"; + if (mqbnet::ClusterUtil::isClient(negoMsg)) { + // if (!d_suppportShutdownV2) { + link.insert(bdlf::BindUtil::bind( + &mqbnet::Session::initiateShutdown, + sessionSp, + bdlf::PlaceHolders::_1, // completion callback + shutdownTimeout, + false)); + // } + // else there is no need to de-confgiure queues and wait for + // unconfirmed since V2 upstreams do that on StopRequest V2 continue; // CONTINUE } - sessions.push_back(sessionSp); + + if (peerIdentity.clusterName() == name()) { + // Expect all proxies and nodes support this feature. + if (!bmqp::ProtocolUtil::hasFeature( + bmqp::HighAvailabilityFeatures::k_FIELD_NAME, + bmqp::HighAvailabilityFeatures::k_GRACEFUL_SHUTDOWN, + peerIdentity.features())) { + BALL_LOG_ERROR + << description() << ": Peer doesn't support " + << "GRACEFUL_SHUTDOWN. Skip sending stopRequest" + << " to [" << peerIdentity << "]"; + continue; // CONTINUE + } + sessions.push_back(sessionSp); + } } - } - link.insert( - bdlf::BindUtil::bind(&Cluster::sendStopRequest, - this, - sessions, - bdlf::PlaceHolders::_1)); // completion callback + link.insert(bdlf::BindUtil::bind( + &Cluster::sendStopRequest, + this, + sessions, + bdlf::PlaceHolders::_1)); // completion callback - d_shutdownChain.append(&link); + d_shutdownChain.append(&link); + } - // Add callback to be invoked once all the client sessions are shut down - // and stop responses are received + // Also update self's status. Note that this node does not explicitly + // issue a close-queue request for each of the queues. d_shutdownChain.appendInplace( bdlf::BindUtil::bind(&Cluster::continueShutdown, @@ -717,7 +741,7 @@ void Cluster::sendStopRequest(const SessionSpVec& sessions, // Send a StopRequest to available cluster nodes and proxies connected to // the cluster StopRequestManagerType::RequestContextSp contextSp = - d_stopRequestsManager.createRequestContext(); + d_stopRequestsManager_p->createRequestContext(); bmqp_ctrlmsg::StopRequest& request = contextSp->request() .choice() .makeClusterMessage() @@ -736,7 +760,7 @@ void Cluster::sendStopRequest(const SessionSpVec& sessions, BALL_LOG_INFO << "Sending StopRequest to " << sessions.size() << " brokers; timeout is " << timeoutMs; - d_stopRequestsManager.sendRequest(contextSp, timeoutMs); + d_stopRequestsManager_p->sendRequest(contextSp, timeoutMs); // continue after receipt of all StopResponses or the timeout } @@ -2510,6 +2534,7 @@ Cluster::Cluster(const bslstl::StringRef& name, BlobSpPool* blobSpPool, bdlbb::BlobBufferFactory* bufferFactory, mqbnet::TransportManager* transportManager, + StopRequestManagerType* stopRequestsManager, bslma::Allocator* allocator) : d_allocator_p(allocator) , d_allocators(d_allocator_p) @@ -2546,7 +2571,7 @@ Cluster::Cluster(const bslstl::StringRef& name, , d_throttledDroppedPushMessages(5000, 5) // 5 logs per 5s interval , d_logSummarySchedulerHandle() , d_queueGcSchedulerHandle() -, d_stopRequestsManager(&d_clusterData.requestManager(), allocator) +, d_stopRequestsManager_p(stopRequestsManager) , d_shutdownChain(allocator) { // PRECONDITIONS @@ -2658,7 +2683,8 @@ int Cluster::start(bsl::ostream& errorDescription) return rc; } -void Cluster::initiateShutdown(const VoidFunctor& callback) +void Cluster::initiateShutdown(const VoidFunctor& callback, + bool suppportShutdownV2) { // executed by *ANY* thread @@ -2670,7 +2696,8 @@ void Cluster::initiateShutdown(const VoidFunctor& callback) dispatcher()->execute( bdlf::BindUtil::bind(&Cluster::initiateShutdownDispatched, this, - callback), + callback, + suppportShutdownV2), this); // Wait for above event to complete. This is needed because @@ -2999,8 +3026,7 @@ void Cluster::processClusterControlMessage( case MsgChoice::SELECTION_ID_STORAGE_SYNC_RESPONSE: case MsgChoice::SELECTION_ID_PARTITION_SYNC_STATE_QUERY_RESPONSE: case MsgChoice::SELECTION_ID_PARTITION_SYNC_DATA_QUERY_RESPONSE: - case MsgChoice::SELECTION_ID_CLUSTER_SYNC_RESPONSE: - case MsgChoice::SELECTION_ID_STOP_RESPONSE: { + case MsgChoice::SELECTION_ID_CLUSTER_SYNC_RESPONSE: { // NOTE: that we cant simply just check if the msg has an id, because // in cluster, it can receive requests which will have an id; so // only messages that are response type should be sent to the @@ -3016,6 +3042,11 @@ void Cluster::processClusterControlMessage( source), this); } break; // BREAK + + case MsgChoice::SELECTION_ID_STOP_RESPONSE: { + BALL_LOG_INFO << description() << ": processStopResponse: " << message; + d_stopRequestsManager_p->processResponse(message); + } break; case MsgChoice::SELECTION_ID_PARTITION_PRIMARY_ADVISORY: { dispatcher()->execute( bdlf::BindUtil::bind( diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index ee16b1d1b0..a7cdc97e32 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -334,7 +334,7 @@ class Cluster : public mqbi::Cluster, // Scheduler handle for the recurring // queue gc check. - StopRequestManagerType d_stopRequestsManager; + StopRequestManagerType* d_stopRequestsManager_p; mwcu::OperationChain d_shutdownChain; // Mechanism used for the Cluster @@ -410,8 +410,10 @@ class Cluster : public mqbi::Cluster, const mqbcmd::ClusterCommand& command); /// Executed by dispatcher thread. - void initiateShutdownDispatched(const VoidFunctor& callback); + void initiateShutdownDispatched(const VoidFunctor& callback, + bool suppportShutdownV2); + // // Temporary, remove after switching all to version 2 /// Send stop request to proxies and nodes specified in `sessions` using /// the specified `stopCb` as a callback to be called once all the /// requests get responses. @@ -545,6 +547,7 @@ class Cluster : public mqbi::Cluster, BlobSpPool* blobSpPool, bdlbb::BlobBufferFactory* bufferFactory, mqbnet::TransportManager* transportManager, + StopRequestManagerType* stopRequestsManager, bslma::Allocator* allocator); /// Destructor @@ -561,8 +564,13 @@ class Cluster : public mqbi::Cluster, /// Initiate the shutdown of the cluster. It is expected that `stop()` /// will be called soon after this routine is invoked. Invoke the /// specified `callback` upon completion of (asynchronous) shutdown - /// sequence. - void initiateShutdown(const VoidFunctor& callback) BSLS_KEYWORD_OVERRIDE; + /// sequence. If the optional (temporary) specified 'suppportShutdownV2' + /// is 'true' execute shutdown logic V2 where upstream (not downstream) + /// nodes deconfigure queues and he shutting down node (not downstream) + /// wait for CONFIRMS. + void + initiateShutdown(const VoidFunctor& callback, + bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Stop the `Cluster`. void stop() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp index 6c81d96c23..c9fcc61c49 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.cpp @@ -180,6 +180,7 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription, d_blobSpPool_p, d_bufferFactory_p, d_transportManager_p, + &d_stopRequestsManager, clusterAllocator); info.d_cluster_sp.reset(cluster, clusterAllocator); @@ -222,6 +223,7 @@ int ClusterCatalog::createCluster(bsl::ostream& errorDescription, d_blobSpPool_p, d_dispatcher_p, d_transportManager_p, + &d_stopRequestsManager, clusterAllocator); info.d_cluster_sp.reset(cluster, clusterAllocator); @@ -375,6 +377,12 @@ ClusterCatalog::ClusterCatalog(bdlmt::EventScheduler* scheduler, , d_reversedClusterConnections(d_allocator_p) , d_clusters(d_allocator_p) , d_statContexts(statContexts) +, d_requestManager(bmqp::EventType::e_CONTROL, + d_bufferFactory_p, + d_scheduler_p, + false, // lateResponseMode + d_allocator_p) +, d_stopRequestsManager(&d_requestManager, d_allocator_p) { // PRECONDITIONS BSLS_ASSERT_SAFE(scheduler->clockType() == @@ -388,7 +396,7 @@ ClusterCatalog::~ClusterCatalog() "stop() must be called before destroying this object"); } -int ClusterCatalog::loadBrokerClusterConfig(bsl::ostream& errorDescription) +int ClusterCatalog::loadBrokerClusterConfig(bsl::ostream&) { // executed by the *MAIN* thread diff --git a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h index 60e5587d65..12e44bb49e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h +++ b/src/groups/mqb/mqbblp/mqbblp_clustercatalog.h @@ -68,6 +68,7 @@ #include #include #include +#include #include // MWC @@ -178,6 +179,15 @@ class ClusterCatalog { // cluster. }; + typedef bmqp::RequestManager + RequestManagerType; + + typedef mqbnet::MultiRequestManager > + StopRequestManagerType; + private: // PRIVATE TYPES @@ -280,6 +290,14 @@ class ClusterCatalog { StatContextsMap d_statContexts; // Map of stat contexts + RequestManagerType d_requestManager; + // Request manager to use + + // Should be part of 'ClusterResources' + StopRequestManagerType d_stopRequestsManager; + // Request manager to send stop + // requests to connected brokers. + private: // NOT IMPLEMENTED ClusterCatalog(const ClusterCatalog&) BSLS_CPP11_DELETED; @@ -408,6 +426,9 @@ class ClusterCatalog { int processCommand(mqbcmd::ClustersResult* result, const mqbcmd::ClustersCommand& command); + StopRequestManagerType& stopRequestManger(); + void processStopResponse(const bmqp_ctrlmsg::ControlMessage& message); + // ACCESSORS /// Return the node Id of this host in the cluster identified by the @@ -504,6 +525,18 @@ inline bool ClusterCatalog::isMemberOf(const bsl::string& clusterName) const return (d_myClusters.find(clusterName) != d_myClusters.end()); } +inline ClusterCatalog::StopRequestManagerType& +ClusterCatalog::stopRequestManger() +{ + return d_stopRequestsManager; +} + +inline void ClusterCatalog::processStopResponse( + const bmqp_ctrlmsg::ControlMessage& message) +{ + d_requestManager.processResponse(message); +} + // ---------------------------- // class ClusterCatalogIterator // ---------------------------- diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp index e624c17e7d..ef9310b9b8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp @@ -849,15 +849,6 @@ void ClusterOrchestrator::processStopRequest( request.choice().clusterMessage().choice().stopRequest(); const bsl::string& name = d_cluster_p->name(); - if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(stopRequest.clusterName() != - name)) { - BSLS_PERFORMANCEHINT_UNLIKELY_HINT; - BALL_LOG_ERROR << d_clusterData_p->identity().description() - << ": invalid cluster name in the StopRequest from " - << source->nodeDescription() << ", " << request; - return; // RETURN - } - mqbc::ClusterNodeSession* ns = d_clusterData_p->membership().getClusterNodeSession(source); BSLS_ASSERT_SAFE(ns); @@ -889,6 +880,15 @@ void ClusterOrchestrator::processStopRequest( << ", current status: " << ns->nodeStatus() << ", new status: " << bmqp_ctrlmsg::NodeStatus::E_STOPPING; + // Temporary, remove after switching all to version 2 + if (stopRequest.version() == 1 && stopRequest.clusterName() != name) { + BSLS_PERFORMANCEHINT_UNLIKELY_HINT; + BALL_LOG_ERROR << d_clusterData_p->identity().description() + << ": invalid cluster name in the StopRequest from " + << source->nodeDescription() << ", " << request; + return; // RETURN + } + ns->setNodeStatus(bmqp_ctrlmsg::NodeStatus::E_STOPPING); processNodeStoppingNotification(ns, &request); @@ -1025,7 +1025,7 @@ void ClusterOrchestrator::processNodeStoppingNotification( d_queueHelper.processNodeStoppingNotification(ns->clusterNode(), request, - &ns->primaryPartitions()); + ns); // For each partition for which self is primary, notify the StorageMgr // about the status of a peer node. Self may end up issuing a diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index c44616d0ac..03d07d2394 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -74,7 +74,7 @@ typedef bsl::function CompletionCallback; /// Utility function used in `mwcu::OperationChain` as the operation /// callback which just calls the completion callback. -void allClientSessionsShutDown(const CompletionCallback& callback) +void completeShutDown(const CompletionCallback& callback) { callback(); } @@ -157,7 +157,8 @@ void ClusterProxy::startDispatched() this)); } -void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback) +void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback, + bool suppportShutdownV2) { // executed by the *DISPATCHER* thread @@ -170,51 +171,72 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback) // Mark self as stopping. d_isStopping = true; - // Fill the first link with client session shutdown operations - mwcu::OperationChainLink link(d_shutdownChain.allocator()); - SessionSpVec sessions; - bsls::TimeInterval shutdownTimeout; - shutdownTimeout.addMilliseconds( - clusterProxyConfig()->queueOperations().shutdownTimeoutMs()); + if (suppportShutdownV2) { + d_queueHelper.requestToStopPushing(); - for (mqbnet::TransportManagerIterator sessIt( - &d_clusterData.transportManager()); - sessIt; - ++sessIt) { - bsl::shared_ptr sessionSp = sessIt.session().lock(); - if (!sessionSp) { - continue; // CONTINUE - } + bsls::TimeInterval whenToStop( + bsls::SystemTime::now(bsls::SystemClockType::e_MONOTONIC)); + whenToStop.addMilliseconds(d_clusterData.clusterConfig() + .queueOperations() + .shutdownTimeoutMs()); + + d_shutdownChain.appendInplace( + bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2, + &d_queueHelper, + whenToStop, + bdlf::PlaceHolders::_1)); // completionCb + } + else { + // Temporary, remove after switching all to version 2 - const bmqp_ctrlmsg::NegotiationMessage& negoMsg = - sessionSp->negotiationMessage(); - if (mqbnet::ClusterUtil::isClientOrProxy(negoMsg)) { - if (mqbnet::ClusterUtil::isClient(negoMsg)) { - link.insert( - bdlf::BindUtil::bind(&mqbnet::Session::initiateShutdown, - sessionSp, - bdlf::PlaceHolders::_1, - shutdownTimeout)); + // Fill the first link with client session shutdown operations + mwcu::OperationChainLink link(d_shutdownChain.allocator()); + SessionSpVec sessions; + bsls::TimeInterval shutdownTimeout; + shutdownTimeout.addMilliseconds( + clusterProxyConfig()->queueOperations().shutdownTimeoutMs()); + + for (mqbnet::TransportManagerIterator sessIt( + &d_clusterData.transportManager()); + sessIt; + ++sessIt) { + bsl::shared_ptr sessionSp = + sessIt.session().lock(); + if (!sessionSp) { + continue; // CONTINUE } - else { - sessions.push_back(sessionSp); + + const bmqp_ctrlmsg::NegotiationMessage& negoMsg = + sessionSp->negotiationMessage(); + if (mqbnet::ClusterUtil::isClientOrProxy(negoMsg)) { + if (mqbnet::ClusterUtil::isClient(negoMsg)) { + link.insert(bdlf::BindUtil::bind( + &mqbnet::Session::initiateShutdown, + sessionSp, + bdlf::PlaceHolders::_1, + shutdownTimeout, + false)); + } + else { + sessions.push_back(sessionSp); + } } } - } - link.insert( - bdlf::BindUtil::bind(&ClusterProxy::sendStopRequest, - this, - sessions, - bdlf::PlaceHolders::_1)); // completion callback + link.insert(bdlf::BindUtil::bind( + &ClusterProxy::sendStopRequest, + this, + sessions, + bdlf::PlaceHolders::_1)); // completion callback - d_shutdownChain.append(&link); + d_shutdownChain.append(&link); + } - // Add callback to be invoked once all the client sessions are shut down - d_shutdownChain.appendInplace( - bdlf::BindUtil::bind(&allClientSessionsShutDown, - bdlf::PlaceHolders::_1), - callback); + // Add callback to be invoked once V1 shuts down all client sessions or + // V2 finishes waiting for unconfirmed + d_shutdownChain.appendInplace(bdlf::BindUtil::bind(&completeShutDown, + bdlf::PlaceHolders::_1), + callback); d_shutdownChain.start(); } @@ -681,6 +703,15 @@ void ClusterProxy::processEvent(const bmqp::Event& event, this); return; // RETURN } + if (clusterMessage.choice().isStopResponseValue()) { + dispatcher()->execute( + bdlf::BindUtil::bind( + &ClusterProxy::processPeerStopResponse, + this, + controlMessage), + this); + return; // RETURN + } if (clusterMessage.choice().isNodeStatusAdvisoryValue()) { dispatcher()->execute( bdlf::BindUtil::bind( @@ -832,6 +863,14 @@ void ClusterProxy::processResponse( this); } +void ClusterProxy::processPeerStopResponse( + const bmqp_ctrlmsg::ControlMessage& response) +{ + BALL_LOG_INFO << description() << ": processStopResponse: " << response; + + d_stopRequestsManager_p->processResponse(response); +} + void ClusterProxy::processPeerStopRequest( mqbnet::ClusterNode* clusterNode, const bmqp_ctrlmsg::ControlMessage& request) @@ -841,11 +880,10 @@ void ClusterProxy::processPeerStopRequest( // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - const bsl::vector* noPartitions = 0; d_queueHelper.processNodeStoppingNotification( clusterNode, &request, - noPartitions, + 0, bdlf::BindUtil::bind(&ClusterProxy::finishStopSequence, this, clusterNode)); @@ -859,6 +897,7 @@ void ClusterProxy::finishStopSequence( // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + // REVISIT // Internal-ticket D169562052 // TODO: handle/eliminate the possibility of Multiple StopRequests. // Currently, cannot switch the active node because another StopRequest can @@ -966,7 +1005,7 @@ void ClusterProxy::sendStopRequest(const SessionSpVec& sessions, { // Send a StopRequest to available proxies connected to the virtual cluster StopRequestManagerType::RequestContextSp contextSp = - d_stopRequestsManager.createRequestContext(); + d_stopRequestsManager_p->createRequestContext(); bmqp_ctrlmsg::StopRequest& request = contextSp->request() .choice() .makeClusterMessage() @@ -985,7 +1024,7 @@ void ClusterProxy::sendStopRequest(const SessionSpVec& sessions, BALL_LOG_INFO << "Sending StopRequest to " << sessions.size() << " proxies; timeout is " << timeoutMs; - d_stopRequestsManager.sendRequest(contextSp, timeoutMs); + d_stopRequestsManager_p->sendRequest(contextSp, timeoutMs); // continue after receipt of all StopResponses or the timeout } @@ -1012,6 +1051,7 @@ ClusterProxy::ClusterProxy( BlobSpPool* blobSpPool, mqbi::Dispatcher* dispatcher, mqbnet::TransportManager* transportManager, + StopRequestManagerType* stopRequestsManager, bslma::Allocator* allocator) : d_allocator_p(allocator) , d_isStarted(false) @@ -1042,7 +1082,7 @@ ClusterProxy::ClusterProxy( , d_clusterMonitor(&d_clusterData, &d_state, d_allocator_p) , d_activeNodeLookupEventHandle() , d_shutdownChain(d_allocator_p) -, d_stopRequestsManager(&d_clusterData.requestManager(), d_allocator_p) +, d_stopRequestsManager_p(stopRequestsManager) { // PRECONDITIONS mqbnet::Cluster* netCluster_p = d_clusterData.membership().netCluster(); @@ -1109,7 +1149,8 @@ int ClusterProxy::start(BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription) return 0; } -void ClusterProxy::initiateShutdown(const VoidFunctor& callback) +void ClusterProxy::initiateShutdown(const VoidFunctor& callback, + bool suppportShutdownV2) { // executed by *ANY* thread @@ -1121,7 +1162,8 @@ void ClusterProxy::initiateShutdown(const VoidFunctor& callback) dispatcher()->execute( bdlf::BindUtil::bind(&ClusterProxy::initiateShutdownDispatched, this, - callback), + callback, + suppportShutdownV2), this); dispatcher()->synchronize(this); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index b21e2af106..4bdcd8033d 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -251,7 +251,8 @@ class ClusterProxy : public mqbc::ClusterStateObserver, // execution of the shutdown callbacks // from the client sessions. - StopRequestManagerType d_stopRequestsManager; + // Should be part of 'ClusterResources' + StopRequestManagerType* d_stopRequestsManager_p; // Request manager to send stop // requests to connected proxies. @@ -264,7 +265,8 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Initiate the shutdown of the cluster. The specified `callback` will /// be called when the shutdown is completed. This routine is invoked /// in the cluster-dispatcher thread. - void initiateShutdownDispatched(const VoidFunctor& callback); + void initiateShutdownDispatched(const VoidFunctor& callback, + bool suppportShutdownV2 = false); /// Stop the `Cluster`. void stopDispatched(); @@ -373,6 +375,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// transmitted request. void processResponse(const bmqp_ctrlmsg::ControlMessage& response) BSLS_KEYWORD_OVERRIDE; + void processPeerStopResponse(const bmqp_ctrlmsg::ControlMessage& response); void processPeerStopRequest(mqbnet::ClusterNode* clusterNode, const bmqp_ctrlmsg::ControlMessage& request); @@ -394,6 +397,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver, void processResponseDispatched(const bmqp_ctrlmsg::ControlMessage& response); + // Temporary, remove after switching all to version 2 /// Send stop request to proxies specified in `sessions` using the /// specified `stopCb` as a callback to be called once all the requests /// get responses. @@ -434,6 +438,7 @@ class ClusterProxy : public mqbc::ClusterStateObserver, BlobSpPool* blobSpPool, mqbi::Dispatcher* dispatcher, mqbnet::TransportManager* transportManager, + StopRequestManagerType* stopRequestsManager, bslma::Allocator* allocator); /// Destructor @@ -447,11 +452,16 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// error. int start(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; - /// Initiate the shutdown of the cluster. It is expected that `stop()` - /// will be called soon after this routine is invoked. Invoke the - /// specified `callback` upon completion of (asynchronous) shutdown - /// sequence. - void initiateShutdown(const VoidFunctor& callback) BSLS_KEYWORD_OVERRIDE; + /// Initiate the shutdown of the cluster and invoke the specified + /// `callback` upon completion of (asynchronous) shutdown sequence. It + /// is expected that `stop()` will be called soon after this routine is + /// invoked. If the optional (temporary) specified 'suppportShutdownV2' is + /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes + /// deconfigure queues and he shutting down node (not downstream) wait for + /// CONFIRMS. + void + initiateShutdown(const VoidFunctor& callback, + bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Stop the `Cluster`. void stop() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 94870c7c47..0ce063878b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -173,6 +173,12 @@ void handleHolderDummy(const bsl::shared_ptr& handle) handle->queue()->dispatcher()->inDispatcherThread(handle->queue())); } +void countUnconfirmed(bsls::Types::Int64* result, mqbi::Queue* queue) +{ + *result += queue->countUnconfirmed( + bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID); +} + } // close unnamed namespace // ----------------------------------------- @@ -1438,6 +1444,9 @@ void ClusterQueueHelper::onReopenQueueResponse( --d_numPendingReopenQueueRequests; + // Process Close request instead of parking it + sqit->value().d_state = SubQueueContext::k_CLOSED; + return; // RETURN } @@ -1468,7 +1477,8 @@ void ClusterQueueHelper::onReopenQueueResponse( notifyQueue(queueContext.get(), upstreamSubQueueId, generationCount, - false); + false, + false); // isWriterOnly // No need to send a configure-queue request for this queue. // Decrement the num pending reopen queue request counter though, @@ -1746,7 +1756,8 @@ void ClusterQueueHelper::onConfigureQueueResponse( notifyQueue(queueContext.get(), itStream->subId(), generationCount, - true); + true, + false); // isWriterOnly } } @@ -2013,7 +2024,8 @@ bool ClusterQueueHelper::createQueue( notifyQueue(queueContext, bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID, genCount, - true); + true, // isOpen + true); // isWriterOnly } context.d_callback(status, @@ -2625,7 +2637,8 @@ void ClusterQueueHelper::onGetQueueHandleDispatched( void ClusterQueueHelper::notifyQueue(QueueContext* queueContext, unsigned int upstreamSubQueueId, bsls::Types::Uint64 generationCount, - bool isOpen) + bool isOpen, + bool isWriterOnly) { mqbi::Queue* queue = queueContext->d_liveQInfo.d_queue_sp.get(); if (queue == 0) { @@ -2644,7 +2657,8 @@ void ClusterQueueHelper::notifyQueue(QueueContext* queueContext, bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream, queue, generationCount, - upstreamSubQueueId), + upstreamSubQueueId, + isWriterOnly), queue); } } @@ -2687,6 +2701,22 @@ void ClusterQueueHelper::configureQueueDispatched( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); + if (d_suppportShutdownV2) { + BMQ_LOGTHROTTLE_INFO() + << d_cluster_p->description() + << ": Shutting down and skipping configure queue [: " << uri + << "], queueId: " << queueId + << ", stream parameters: " << streamParameters; + if (callback) { + bmqp_ctrlmsg::Status status; + status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS; + status.message() = "Shutting down."; + callback(status, streamParameters); + } + + return; // RETURN + } + QueueContextMapIter queueContextIt = d_queues.find(uri); if (queueContextIt == d_queues.end()) { @@ -2835,6 +2865,23 @@ void ClusterQueueHelper::releaseQueueDispatched( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); + if (d_suppportShutdownV2) { + BMQ_LOGTHROTTLE_INFO() + << d_cluster_p->description() + << ": Shutting down and skipping close queue [: " + << handleParameters.uri() + << "], queueId: " << handleParameters.qId() + << ", handle parameters: " << handleParameters; + if (callback) { + bmqp_ctrlmsg::Status status; + status.category() = bmqp_ctrlmsg::StatusCategory::E_SUCCESS; + status.message() = "Shutting down."; + callback(status); + } + + return; // RETURN + } + bmqt::Uri uri(handleParameters.uri()); QueueContextMapIter queueContextIt = d_queues.find(uri.canonical()); if (queueContextIt == d_queues.end()) { @@ -4485,6 +4532,7 @@ ClusterQueueHelper::ClusterQueueHelper( , d_numPendingReopenQueueRequests(0) , d_primaryNotLeaderAlarmRaised(false) , d_stopContexts(allocator) +, d_suppportShutdownV2(false) { BSLS_ASSERT( d_clusterData_p->clusterConfig() @@ -5227,17 +5275,53 @@ void ClusterQueueHelper::processShutdownEvent() << "], queueKey [" << queueContextSp->key() << "] which was assigned to PartitionId [" << queueContextSp->partitionId() - << "], because self is going down and this queue has no " - << "handles."; + << "], because self is going down."; deleteQueue(queueContextSp.get()); } } +/// Stop sending PUSHes but continue receiving CONFIRMs, receiving and +/// sending PUTs and ACKs. +void ClusterQueueHelper::requestToStopPushing() +{ + // executed by the cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); + + // Assume Shutdown V2 + d_suppportShutdownV2 = true; + + for (QueueContextMapIter it = d_queues.begin(); it != d_queues.end(); + ++it) { + QueueContextSp& queueContextSp = it->second; + QueueLiveState& qinfo = queueContextSp->d_liveQInfo; + mqbi::Queue* queue = qinfo.d_queue_sp.get(); + + if (!queue) { + continue; // CONTINUE + } + + queue->dispatcher()->execute( + bdlf::BindUtil::bind(&mqbi::Queue::stopPushing, queue), + queue); + } +} + +void ClusterQueueHelper::onDeconfiguredHandle( + const bsl::shared_ptr& contextSp) +{ + BALL_LOG_INFO << d_clusterData_p->identity().description() + << ": deconfiguring " << " " << contextSp.numReferences(); + (void)contextSp; +} + void ClusterQueueHelper::processNodeStoppingNotification( mqbnet::ClusterNode* clusterNode, const bmqp_ctrlmsg::ControlMessage* request, - const bsl::vector* partitions, + mqbc::ClusterNodeSession* ns, const VoidFunctor& callback) { // executed by the cluster *DISPATCHER* thread @@ -5250,6 +5334,11 @@ void ClusterQueueHelper::processNodeStoppingNotification( // The 'shared_ptr' serves as a reference count of all pending queue // operations. Once all functors complete, the 'finishStopSequence' // deleter sends back StopResponse. + + // TEMPORARY, remove 'timeout; 'after switching to StopRequest V2 + // No need to wait for CONFIRMs, the waiting is done by the shutting down + // node. + int timeout = d_clusterData_p->clusterConfig().queueOperations().stopTimeoutMs(); @@ -5287,12 +5376,137 @@ void ClusterQueueHelper::processNodeStoppingNotification( << clusterNode->nodeDescription() << " with timeout (ms) " << timeout; - // Self node needs to issue close-queue requests for all the queues for - // which specified 'source' node is the primary. + // TEMPORARY, remove 'after switching to StopRequest V2 + bool suppportShutdownV2 = true; - if (!d_cluster_p->isRemote() || - d_clusterData_p->electorInfo().leaderNode() == contextSp->d_peer) { - deconfigureQueues(contextSp, partitions); + if (request) { + const bmqp_ctrlmsg::StopRequest& stopRequest = + request->choice().clusterMessage().choice().stopRequest(); + + if (stopRequest.version() == 1) { + suppportShutdownV2 = false; + } + else { + BSLS_ASSERT_SAFE(stopRequest.version() == 2); + } + } + // StopRequests have replaced E_STOPPING advisory. + // In any case, do minimal (V2) work unless explicitly requested + + if (suppportShutdownV2) { + if (ns) { + // As an Upstream, deconfigure queues of the (shutting down) + // ClusterNodeSession 'ns'. + // Call 'mqbi::QueueHandle::deconfigureAll' for each handle + + const mqbc::ClusterNodeSession::QueueHandleMap& handles = + ns->queueHandles(); + + for (mqbc::ClusterNodeSession::QueueHandleMap::const_iterator + cit = handles.begin(); + cit != handles.end(); + ++cit) { + cit->second.d_handle_p->deconfigureAll( + bdlf::BindUtil::bind( + &ClusterQueueHelper::onDeconfiguredHandle, + this, + contextSp)); + } + BALL_LOG_INFO << d_clusterData_p->identity().description() + << ": deconfigured " << handles.size() + << " handles while processing StopRequest from " + << clusterNode->nodeDescription() << " " + << contextSp.numReferences(); + } + // else, this is a ClusterProxy (downstream) receiving request from + // an upstream Cluster Node (a request from a Proxy would arrive to + // ClientSession). + // Downstreams do not deconfigure queues in V2. + // See comment in 'ClusterProxy::processPeerStopRequest' + + // As a Downstream, notify relevant queues about their shutting + // down upstream + for (QueueContextMapConstIter cit = d_queues.begin(); + cit != d_queues.end(); + ++cit) { + const QueueContextSp& queueContextSp = cit->second; + const QueueLiveState& queueLiveState = + queueContextSp->d_liveQInfo; + mqbi::Queue* queue = queueLiveState.d_queue_sp.get(); + + if (0 == queue || bmqp::QueueId::k_UNASSIGNED_QUEUE_ID == + queueContextSp->d_liveQInfo.d_id) { + continue; // CONTINUE + } + + if (!d_cluster_p->isRemote()) { + const int pid = queueContextSp->partitionId(); + + BSLS_ASSERT_SAFE(ns); + + const bsl::vector& partitions = + ns->primaryPartitions(); + if (partitions.end() == + bsl::find(partitions.begin(), partitions.end(), pid)) { + continue; // CONTINUE + } + const ClusterStatePartitionInfo& pinfo = + d_clusterState_p->partition(pid); + + if (bmqp_ctrlmsg::PrimaryStatus::E_ACTIVE != + pinfo.primaryStatus()) { + // It's possible for a primary node to be non-active + // when it is shutting down -- if it was stopped before + // the node had a chance to transition to active + // primary for this partition. + + continue; // CONTINUE + } + BSLS_ASSERT(pinfo.primaryNode() == clusterNode); + } + else if (d_clusterData_p->electorInfo().leaderNode() != + clusterNode) { + continue; // CONTINUE + } + + if (queueLiveState.d_subQueueIds.findBySubIdSafe( + bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) == + queueLiveState.d_subQueueIds.end()) { + // Only buffering PUTs. Still sending CONFIRMs + continue; // CONTINUE + } + + queue->dispatcher()->execute( + bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream, + queue, + 0, + bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID, + true), // isWriterOnly + queue); + } + + // As a way to bind 'contextSp' to all 'onOpenUpstream' + d_cluster_p->dispatcher()->execute( + bdlf::BindUtil::bind(&ClusterQueueHelper::onDeconfiguredHandle, + this, + contextSp), + mqbi::DispatcherClientType::e_QUEUE); + } + else { + // TEMPORARY, remove 'after switching to StopRequest V2 + // Downstreams do not need to deconfigure queues for which the + // shutting down node is the upstream. The deconfiguring is done + // by the upstream of the shutting down node instead. + // Nor do they need to wait for CONFIRMs, the waiting is done by + // the shutting down node. + + if (ns) { + deconfigureQueues(contextSp, &ns->primaryPartitions()); + } + else if (d_clusterData_p->electorInfo().leaderNode() == + contextSp->d_peer) { + deconfigureQueues(contextSp, 0); + } } } else { @@ -5378,6 +5592,8 @@ void ClusterQueueHelper::deconfigureQueues( const bsl::shared_ptr& contextSp, const bsl::vector* partitions) { + // TEMPORARY, remove after switching all to StopRequest V2 + // executed by the cluster *DISPATCHER* thread // PRECONDITIONS @@ -5440,7 +5656,8 @@ void ClusterQueueHelper::deconfigureQueues( bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream, queue, 0, - bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID), + bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID, + true), // isWriterOnly queue); queue->dispatcher()->synchronize(queue); @@ -5659,6 +5876,92 @@ void ClusterQueueHelper::checkUnconfirmed( queueSp.get()); } +void ClusterQueueHelper::checkUnconfirmedV2( + const bsls::TimeInterval& whenToStop, + const bsl::function& completionCallback) +{ + d_cluster_p->dispatcher()->execute( + bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2Dispatched, + this, + whenToStop, + completionCallback), + d_cluster_p); +} + +void ClusterQueueHelper::checkUnconfirmedV2Dispatched( + const bsls::TimeInterval& whenToStop, + const bsl::function& completionCallback) +{ + // executed by the cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); + + bsls::Types::Int64 result = 0; + for (QueueContextMapIter it = d_queues.begin(); it != d_queues.end(); + ++it) { + QueueContextSp& queueContextSp = it->second; + QueueLiveState& qinfo = queueContextSp->d_liveQInfo; + mqbi::Queue* queue = qinfo.d_queue_sp.get(); + + if (!queue) { + continue; // CONTINUE + } + + queue->dispatcher()->execute( + bdlf::BindUtil::bind(&countUnconfirmed, &result, queue), + queue); + queue->dispatcher()->synchronize(queue); + } + + // Synchronize with all Queue Dispatcher threads + bslmt::Latch latch(1); + d_cluster_p->dispatcher()->execute( + mqbi::Dispatcher::ProcessorFunctor(), // empty + mqbi::DispatcherClientType::e_QUEUE, + bdlf::BindUtil::bind(&bslmt::Latch::arrive, &latch)); + + latch.wait(); + + if (result == 0) { + BALL_LOG_INFO << d_cluster_p->description() + << ": no unconfirmed message(s)"; + + completionCallback(); + return; + } + + bsls::TimeInterval t = bsls::SystemTime::now( + bsls::SystemClockType::e_MONOTONIC); + + if (t < whenToStop) { + BALL_LOG_INFO << d_cluster_p->description() << ": waiting for " + << result << " unconfirmed message(s)"; + + t.addSeconds(1); + if (t > whenToStop) { + t = whenToStop; + } + bdlmt::EventScheduler::EventHandle eventHandle; + // Never cancel the timer + d_clusterData_p->scheduler()->scheduleEvent( + &eventHandle, + t, + bdlf::BindUtil::bind(&ClusterQueueHelper::checkUnconfirmedV2, + this, + whenToStop, + completionCallback)); + + return; // RETURN + } + else { + BALL_LOG_WARN << d_cluster_p->description() << ": giving up on " + << result << " unconfirmed message(s)"; + completionCallback(); + } +} + void ClusterQueueHelper::checkUnconfirmedQueueDispatched( const bsl::shared_ptr& contextSp, const QueueContextSp& queueContextSp, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 1508caef69..efbb9021df 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -169,6 +169,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, // State of the upstream bdlmt::EventScheduler::EventHandle d_timer; + // TEMPORARY, remove 'after switching to StopRequest V2 // (timer handle 1s) when waiting for // unconfirmed. This is to cancel the timer in // the case when this broker stops while @@ -486,6 +487,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, StopContexts d_stopContexts; + bool d_suppportShutdownV2; + private: // PRIVATE MANIPULATORS @@ -758,7 +761,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, void notifyQueue(QueueContext* queueContext, unsigned int upstreamSubQueueId, bsls::Types::Uint64 generationCount, - bool isOpen); + bool isOpen, + bool isWriterOnly = false); void configureQueueDispatched( const bmqt::Uri& uri, @@ -854,6 +858,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, void deconfigureQueue(const bsl::shared_ptr& contextSp, const QueueContextSp& queueContextSp); + // TEMPORARY, remove 'after switching to StopRequest V2 /// Second step of StopRequest / CLOSING node advisory processing /// (after de-configure response). Start timer to wait the configured /// `stopTimeoutMs` is there are any pending PUSH messages to collect @@ -871,6 +876,7 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, unsigned int subId, bsls::TimeInterval& t); + // TEMPORARY, remove 'after switching to StopRequest V2 void checkUnconfirmed(const bsl::shared_ptr& contextSp, const QueueContextSp& queueContextSp, unsigned int subId); @@ -881,6 +887,10 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, const QueueContextSp& queueContextSp, unsigned int subId); + void checkUnconfirmedV2Dispatched( + const bsls::TimeInterval& whenToStop, + const bsl::function& completionCallback); + void waitForUnconfirmedDispatched(const bsl::shared_ptr& contextSp, const QueueContextSp& queueContextSp, @@ -905,6 +915,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, /// Send StopResponse to the request in the specified 'context. void finishStopSequenceDispatched(StopContext* context); + void onDeconfiguredHandle(const bsl::shared_ptr& contextSp); + // PRIVATE ACCESSORS /// Return true if for the specified `partitionId`, there is currently a @@ -1082,6 +1094,13 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, /// Delete and unregister all queues which have no clients. void processShutdownEvent(); + /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and + /// sending PUTs and ACKs. + void requestToStopPushing(); + + void checkUnconfirmedV2(const bsls::TimeInterval& whenToStop, + const bsl::function& completionCallback); + /// Garbage-collect all queues which meet the criteria, and have /// expired. If the optionally specified `immediate` flag is true, /// delete the qualified queues immediately instead of marking them for @@ -1111,8 +1130,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver, void processNodeStoppingNotification( mqbnet::ClusterNode* clusterNode, const bmqp_ctrlmsg::ControlMessage* request, - const bsl::vector* partitions = 0, - const VoidFunctor& callback = VoidFunctor()); + mqbc::ClusterNodeSession* ns, + const VoidFunctor& callback = VoidFunctor()); void onLeaderAvailable(); // Called upon leader becoming available. diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.cpp b/src/groups/mqb/mqbblp/mqbblp_queue.cpp index 19606582d6..39b43699a1 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queue.cpp @@ -576,7 +576,8 @@ void Queue::onOpenFailure(unsigned int subQueueId) } void Queue::onOpenUpstream(bsls::Types::Uint64 genCount, - unsigned int subQueueId) + unsigned int subQueueId, + bool isWriterOnly) { // executed by the *QUEUE* dispatcher thread @@ -584,7 +585,7 @@ void Queue::onOpenUpstream(bsls::Types::Uint64 genCount, BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); if (d_remoteQueue_mp) { - d_remoteQueue_mp->onOpenUpstream(genCount, subQueueId); + d_remoteQueue_mp->onOpenUpstream(genCount, subQueueId, isWriterOnly); } } @@ -926,6 +927,11 @@ bsls::Types::Int64 Queue::countUnconfirmed(unsigned int subId) // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); + if (subId == bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID) { + return d_state.handleCatalog().countUnconfirmed(); // RETURN + } + + // Temporary, remove after switching all to shutdown version 2 struct local { static void sum(bsls::Types::Int64* sum, mqbi::QueueHandle* handle, @@ -949,5 +955,12 @@ bsls::Types::Int64 Queue::countUnconfirmed(unsigned int subId) return result; } +void Queue::stopPushing() +{ + d_state.routingContext().shutdown(); + + queueEngine()->resetState(true); // keepConfirming +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h index 29411db51b..15785dc49e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.h +++ b/src/groups/mqb/mqbblp/mqbblp_queue.h @@ -265,6 +265,10 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { bsls::Types::Int64 countUnconfirmed(unsigned int subId) BSLS_KEYWORD_OVERRIDE; + /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and + /// sending PUTs and ACKs. + void stopPushing() BSLS_KEYWORD_OVERRIDE; + void onPushMessage( const bmqt::MessageGUID& msgGUID, const bsl::shared_ptr& appData, @@ -314,7 +318,8 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// /// THREAD: This method is called from the Queue's dispatcher thread. void onOpenUpstream(bsls::Types::Uint64 genCount, - unsigned int upstreamSubQueueId) BSLS_KEYWORD_OVERRIDE; + unsigned int upstreamSubQueueId, + bool isWriterOnly = false) BSLS_KEYWORD_OVERRIDE; /// Notify the (remote) queue about reopen failure. The queue NACKs all /// pending and incoming PUTs and drops CONFIRMs related to to the diff --git a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp index e9ab00da51..f22c3ceb2f 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queueengineutil.cpp @@ -875,8 +875,10 @@ QueueEngineUtil_AppState::QueueEngineUtil_AppState( QueueEngineUtil_AppState::~QueueEngineUtil_AppState() { // PRECONDITIONS - BSLS_ASSERT_SAFE(!hasConsumers()); BSLS_ASSERT_SAFE(!d_throttleEventHandle); + + // In the case of `convertToLocal`, the new `RootQueueEngine` can reuse the + // existing `RelayQueueEngine` routing contexts. } size_t diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp index 2ae391ea6f..965d49b8ca 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp @@ -659,7 +659,7 @@ void QueueHandle::registerSubscription(unsigned int downstreamSubId, // Ceil the limits values, so that if max redeliveries is 1, it will // compute ok const bsls::Types::Int64 lowWatermarkBytes = - static_cast( + static_cast( bsl::ceil(ci.maxUnconfirmedBytes() * k_WATERMARK_RATIO)); // We only care about whether we are at or above the `capacity` diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp index c23e4ae829..43b2ff6eeb 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.cpp @@ -122,7 +122,7 @@ void QueueHandleCatalog::queueHandleDeleter(mqbi::QueueHandle* handle) QueueHandleCatalog::QueueHandleCatalog(mqbi::Queue* queue, bslma::Allocator* allocator) : d_queue_p(queue) -, d_handleFactory_mp(new (*allocator) DefaultHandleFactory(), allocator) +, d_handleFactory_mp(new(*allocator) DefaultHandleFactory(), allocator) , d_handles(allocator) , d_allocator_p(allocator) { @@ -364,5 +364,25 @@ void QueueHandleCatalog::loadInternals( } } +bsls::Types::Int64 QueueHandleCatalog::countUnconfirmed() const +{ + // executed by the *QUEUE* dispatcher thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE(d_queue_p->dispatcher()->inDispatcherThread(d_queue_p)); + + bsls::Types::Int64 result = 0; + + for (HandleMap::const_iterator cit = d_handles.begin(); + cit != d_handles.end(); + ++cit) { + const mqbi::QueueHandle* handle(cit->value().get()); + + result += handle->countUnconfirmed( + bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID); + } + return result; +} + } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h index dfe44cbc1a..47dad6cd55 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h +++ b/src/groups/mqb/mqbblp/mqbblp_queuehandlecatalog.h @@ -30,9 +30,9 @@ // must be executed by the dispatcher thread of the associated queue. // MQB - #include #include +#include // BMQ #include @@ -170,7 +170,8 @@ class QueueHandleCatalog { // CREATOR /// Create a new object associated to the specified `queue`. Use the - /// specified `allocator` for any memory allocations. + /// specified `allocator` for any memory allocations. Use the specified + /// 'counter' to aggregate the counting of unconfirmed by each handle. QueueHandleCatalog(mqbi::Queue* queue, bslma::Allocator* allocator); /// Destructor. @@ -240,6 +241,8 @@ class QueueHandleCatalog { /// Load into the specified `out` list the internal details about the /// handles managed by this catalog. void loadInternals(bsl::vector* out) const; + + bsls::Types::Int64 countUnconfirmed() const; }; // ============================================================================ diff --git a/src/groups/mqb/mqbblp/mqbblp_queuestate.h b/src/groups/mqb/mqbblp/mqbblp_queuestate.h index 3ac02ce32a..2366c66ffe 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queuestate.h +++ b/src/groups/mqb/mqbblp/mqbblp_queuestate.h @@ -203,7 +203,9 @@ class QueueState { /// Create a new `QueueState` associated to the specified `queue` and /// having the specified `uri`, `id`, `key`, `partitionId` and `domain`. - /// Use the specified `allocator` for any memory allocations. + /// Use the specified `allocator` for any memory allocations. Use the + /// specified 'unconfirmedCounter' to aggregate the counting of + /// unconfirmed by each queue handle. QueueState(mqbi::Queue* queue, const bmqt::Uri& uri, unsigned int id, diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp index 6e7b7fafde..6a74d286b0 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp @@ -909,15 +909,17 @@ int RelayQueueEngine::configure( return 0; } -void RelayQueueEngine::resetState() +void RelayQueueEngine::resetState(bool keepConfirming) { - d_self.reset(this); + // d_self.reset(this); for (AppsMap::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { it->second->reset(); - it->second->d_routing_sp.reset(); + // Keep the routing which new engine can reuse + } + if (!keepConfirming) { + d_apps.clear(); } - d_apps.clear(); } int RelayQueueEngine::rebuildInternalState( diff --git a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h index bb67b825f9..3fd2503b34 100644 --- a/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h @@ -376,8 +376,10 @@ class RelayQueueEngine : public mqbi::QueueEngine { virtual int configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; - /// Reset the internal state of this engine. - virtual void resetState() BSLS_KEYWORD_OVERRIDE; + /// Reset the internal state of this engine. If the optionally specified + /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs + /// processing. + virtual void resetState(bool keepConfirming = false) BSLS_KEYWORD_OVERRIDE; /// Rebuild the internal state of this engine. This method is invoked /// when the queue this engine is associated with is created from an diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp index fc11d89a40..7df2d78e6c 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp @@ -523,6 +523,7 @@ RemoteQueue::RemoteQueue(QueueState* state, , d_unackedPutCounter(0) , d_subStreams(allocator) , d_statePool_p(statePool) +, d_producerState() , d_allocator_p(allocator) { // PRECONDITIONS @@ -948,8 +949,7 @@ void RemoteQueue::postMessage(const bmqp::PutHeader& putHeaderIn, return; // RETURN } - SubStreamContext& ctx = subStreamContext( - bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID); + SubStreamContext& ctx = d_producerState; if (ctx.d_state == SubStreamContext::e_NONE) { BALL_LOG_WARN << "#CLIENT_IMPROPER_BEHAVIOR " << d_state_p->uri() @@ -1590,6 +1590,9 @@ void RemoteQueue::onOpenFailure(unsigned int upstreamSubQueueId) ctx.d_state = SubStreamContext::e_CLOSED; ctx.d_genCount = 0; + d_producerState.d_state = SubStreamContext::e_CLOSED; + d_producerState.d_genCount = 0; + if (upstreamSubQueueId == bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) { size_t numMessages = d_pendingMessages.size(); if (numMessages) { @@ -1669,11 +1672,15 @@ void RemoteQueue::onLostUpstream() i->d_genCount = 0; } + d_producerState.d_state = SubStreamContext::e_STOPPED; + d_producerState.d_genCount = 0; + BALL_LOG_INFO << d_state_p->uri() << ": has lost the upstream"; } void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount, - unsigned int upstreamSubQueueId) + unsigned int upstreamSubQueueId, + bool isWriterOnly) { // executed by the *DISPATCHER* thread @@ -1681,7 +1688,9 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount, BSLS_ASSERT_SAFE(d_state_p->queue()->dispatcher()->inDispatcherThread( d_state_p->queue())); - SubStreamContext& ctx = subStreamContext(upstreamSubQueueId); + SubStreamContext& ctx = isWriterOnly + ? d_producerState + : subStreamContext(upstreamSubQueueId); if (genCount == 0) { // This is a result of StopRequest processing. @@ -1689,7 +1698,7 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount, // Until then, we buffer. if (ctx.d_state == SubStreamContext::e_OPENED) { if (upstreamSubQueueId == bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) { - if (d_state_p->hasMultipleSubStreams()) { + if (isWriterOnly) { BALL_LOG_INFO << d_state_p->uri() << ": buffering PUTs with generation count " << ctx.d_genCount @@ -1700,6 +1709,8 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount, << ": buffering PUTs and CONFIRMs with" << " generation count " << ctx.d_genCount << " because upstream is stopping."; + d_producerState.d_state = SubStreamContext::e_STOPPED; + d_producerState.d_genCount = 0; } } else { @@ -1734,6 +1745,9 @@ void RemoteQueue::onOpenUpstream(bsls::Types::Uint64 genCount, ctx.d_state = SubStreamContext::e_OPENED; if (upstreamSubQueueId == bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) { + d_producerState.d_genCount = genCount; + d_producerState.d_state = SubStreamContext::e_OPENED; + retransmitPendingMessagesDispatched(genCount); } retransmitPendingConfirmsDispatched(upstreamSubQueueId); diff --git a/src/groups/mqb/mqbblp/mqbblp_remotequeue.h b/src/groups/mqb/mqbblp/mqbblp_remotequeue.h index 177c95b035..ba6cb1cf47 100644 --- a/src/groups/mqb/mqbblp/mqbblp_remotequeue.h +++ b/src/groups/mqb/mqbblp/mqbblp_remotequeue.h @@ -255,6 +255,10 @@ class RemoteQueue { StateSpPool* d_statePool_p; + SubStreamContext d_producerState; + // To discern consumer and producer which share the same + // `k_DEFAULT_SUBQUEUE_ID` in the priority mode. + bslma::Allocator* d_allocator_p; // Allocator to use private: @@ -457,7 +461,8 @@ class RemoteQueue { /// /// THREAD: This method is called from the Queue's dispatcher thread. void onOpenUpstream(bsls::Types::Uint64 genCount, - unsigned int upstreamSubQueueId); + unsigned int upstreamSubQueueId, + bool isWriterOnly = false); /// Notify the (remote) queue about reopen failure. The queue NACKs all /// pending and incoming PUTs and drops CONFIRMs related to to the diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp index 2e5b853a70..996ab048c8 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp @@ -425,10 +425,18 @@ int RootQueueEngine::initializeAppId(const bsl::string& appId, return 0; } -void RootQueueEngine::resetState() +void RootQueueEngine::resetState(bool keepConfirming) { + for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) { + it->value()->reset(); + it->value()->d_routing_sp->reset(); + } + d_consumptionMonitor.reset(); - d_apps.clear(); + + if (!keepConfirming) { + d_apps.clear(); + } } void RootQueueEngine::rebuildSelectedApp( diff --git a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h index 8e75982471..5e4a4c725a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h +++ b/src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h @@ -258,8 +258,10 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine { virtual int configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; - /// Reset the internal state of this engine. - virtual void resetState() BSLS_KEYWORD_OVERRIDE; + /// Reset the internal state of this engine. If the optionally specified + /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs + /// processing. + virtual void resetState(bool keepConfirming = false) BSLS_KEYWORD_OVERRIDE; /// Rebuild the internal state of this engine. This method is invoked /// when the queue this engine is associated with is created from an diff --git a/src/groups/mqb/mqbblp/mqbblp_routers.h b/src/groups/mqb/mqbblp/mqbblp_routers.h index a7d814294f..7a960f5915 100644 --- a/src/groups/mqb/mqbblp/mqbblp_routers.h +++ b/src/groups/mqb/mqbblp/mqbblp_routers.h @@ -595,6 +595,8 @@ class Routers { bmqeval::EvaluationContext d_evaluationContext; + bool d_isShuttingDown; + bslma::Allocator* d_allocator_p; QueueRoutingContext(bmqp::SchemaLearner& schemaLearner, @@ -607,6 +609,8 @@ class Routers { bool onUsable(unsigned int* upstreamSubQueueId, unsigned int upstreamSubscriptionId); + void shutdown(); + void loadInternals(mqbcmd::Routing* out) const; }; @@ -842,9 +846,10 @@ inline Routers::QueueRoutingContext::QueueRoutingContext( : d_expressions(allocator) , d_nextSubscriptionId(0) , d_groupIds(allocator) -, d_preader(new (*allocator) MessagePropertiesReader(schemaLearner, allocator), +, d_preader(new(*allocator) MessagePropertiesReader(schemaLearner, allocator), allocator) , d_evaluationContext(0, allocator) +, d_isShuttingDown(false) , d_allocator_p(allocator) { d_evaluationContext.setPropertiesReader(d_preader.get()); @@ -880,6 +885,11 @@ Routers::QueueRoutingContext::onUsable(unsigned int* upstreamSubQueueId, return false; } +inline void Routers::QueueRoutingContext::shutdown() +{ + d_isShuttingDown = true; +} + // ----------------------------- // struct Routers::Expression // ----------------------------- diff --git a/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp index 9322c56039..b0ef644e28 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstateledgerutil.cpp @@ -142,7 +142,7 @@ int ClusterStateLedgerUtil::validateFileHeader( const ClusterStateFileHeader& header, const mqbu::StorageKey& expectedLogId) { - if (static_cast(header.protocolVersion()) != + if (static_cast(header.protocolVersion()) != ClusterStateLedgerProtocol::k_VERSION) { return ClusterStateLedgerUtilRc::e_INVALID_PROTOCOL_VERSION; // RETURN } diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 3a93208c20..e12abaa0ba 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -257,8 +257,12 @@ class Cluster : public DispatcherClient { /// Initiate the shutdown of the cluster and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence. It /// is expected that `stop()` will be called soon after this routine is - /// invoked. - virtual void initiateShutdown(const VoidFunctor& callback) = 0; + /// invoked. If the optional (temporary) specified 'suppportShutdownV2' is + /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes + /// deconfigure queues and he shutting down node (not downstream) wait for + /// CONFIRMS. + virtual void initiateShutdown(const VoidFunctor& callback, + bool suppportShutdownV2 = false) = 0; /// Stop the `Cluster`; this is the counterpart of the `start()` /// operation. diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h index 26547f5fd2..9476c2be7f 100644 --- a/src/groups/mqb/mqbi/mqbi_queue.h +++ b/src/groups/mqb/mqbi/mqbi_queue.h @@ -803,6 +803,10 @@ class Queue : public DispatcherClient { /// `specified `subId'. virtual bsls::Types::Int64 countUnconfirmed(unsigned int subId) = 0; + /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and + /// sending PUTs and ACKs. + virtual void stopPushing() = 0; + /// Called when a message with the specified `msgGUID`, `appData`, /// `options`, `compressionAlgorithmType` payload is pushed to this /// queue. Note that depending upon the location of the queue instance, @@ -855,7 +859,8 @@ class Queue : public DispatcherClient { /// /// THREAD: This method is called from the Queue's dispatcher thread. virtual void onOpenUpstream(bsls::Types::Uint64 genCount, - unsigned int upstreamSubQueueId) = 0; + unsigned int upstreamSubQueueId, + bool isWriterOnly = false) = 0; /// Notify the (remote) queue about (re)open failure. The queue NACKs /// all pending and incoming PUTs and drops CONFIRMs related to to the diff --git a/src/groups/mqb/mqbi/mqbi_queueengine.h b/src/groups/mqb/mqbi/mqbi_queueengine.h index b6400d43c5..7fe7734ed1 100644 --- a/src/groups/mqb/mqbi/mqbi_queueengine.h +++ b/src/groups/mqb/mqbi/mqbi_queueengine.h @@ -74,8 +74,10 @@ class QueueEngine { /// otherwise and populate the specified `errorDescription`. virtual int configure(bsl::ostream& errorDescription) = 0; - /// Reset the internal state of this engine. - virtual void resetState() = 0; + /// Reset the internal state of this engine. If the optionally specified + /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs + /// processing. + virtual void resetState(bool keepConfirming = false) = 0; /// Rebuild the internal state of this engine. This method is invoked /// when the queue this engine is associated with is created from an diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index c0ab9f225c..2f82459b72 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -322,7 +322,8 @@ int Cluster::start(BSLS_ANNOTATION_UNUSED bsl::ostream& errorDescription) } void Cluster::initiateShutdown( - BSLS_ANNOTATION_UNUSED const VoidFunctor& callback) + BSLS_ANNOTATION_UNUSED const VoidFunctor& callback, + BSLS_ANNOTATION_UNUSED bool suppportShutdownV2) { // PRECONDITIONS BSLS_ASSERT_OPT(!d_isStarted && diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index 07a319a7ef..a2a5a7f8b9 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -298,11 +298,16 @@ class Cluster : public mqbi::Cluster { /// error. int start(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; - /// Initiate the shutdown of the cluster. It is expected that `stop()` - /// will be called soon after this routine is invoked. Invoke the - /// specified `callback` upon completion of (asynchronous) shutdown - /// sequence. - void initiateShutdown(const VoidFunctor& callback) BSLS_KEYWORD_OVERRIDE; + /// Initiate the shutdown of the cluster and invoke the specified + /// `callback` upon completion of (asynchronous) shutdown sequence. It + /// is expected that `stop()` will be called soon after this routine is + /// invoked. If the optional (temporary) specified 'suppportShutdownV2' is + /// 'true' execute shutdown logic V2 where upstream (not downstream) nodes + /// deconfigure queues and he shutting down node (not downstream) wait for + /// CONFIRMS. + void + initiateShutdown(const VoidFunctor& callback, + bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Stop the `Cluster`. void stop() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.cpp b/src/groups/mqb/mqbmock/mqbmock_queue.cpp index 26d6b4d753..76505fcf69 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queue.cpp @@ -251,6 +251,11 @@ Queue::countUnconfirmed(BSLS_ANNOTATION_UNUSED unsigned int subId) return 0; } +void Queue::stopPushing() +{ + // NOT IMPLENTED +} + void Queue::onPushMessage( BSLS_ANNOTATION_UNUSED const bmqt::MessageGUID& msgGUID, BSLS_ANNOTATION_UNUSED const bsl::shared_ptr& appData, @@ -348,7 +353,8 @@ void Queue::onLostUpstream() } void Queue::onOpenUpstream(BSLS_ANNOTATION_UNUSED bsls::Types::Uint64 genCount, - BSLS_ANNOTATION_UNUSED unsigned int subQueueId) + BSLS_ANNOTATION_UNUSED unsigned int subQueueId, + BSLS_ANNOTATION_UNUSED bool isWriterOnly) { // NOTHING } diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h index 4ce103c79a..cc79eed542 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.h +++ b/src/groups/mqb/mqbmock/mqbmock_queue.h @@ -249,6 +249,10 @@ class Queue : public mqbi::Queue { bsls::Types::Int64 countUnconfirmed(unsigned int subId) BSLS_KEYWORD_OVERRIDE; + /// Stop sending PUSHes but continue receiving CONFIRMs, receiving and + /// sending PUTs and ACKs. + void stopPushing() BSLS_KEYWORD_OVERRIDE; + /// Called when a message with the specified `msgGUID`, `appData`, /// `options` and compressionAlgorithmType payload is pushed to this /// queue. Note that depending upon the location of the queue instance, @@ -302,7 +306,8 @@ class Queue : public mqbi::Queue { /// /// THREAD: This method is called from the Queue's dispatcher thread. void onOpenUpstream(bsls::Types::Uint64 genCount, - unsigned int upstreamSubQueueId) BSLS_KEYWORD_OVERRIDE; + unsigned int upstreamSubQueueId, + bool isWriterOnly = false) BSLS_KEYWORD_OVERRIDE; /// Notify the (remote) queue about reopen failure. The queue NACKs all /// pending and incoming PUTs and drops CONFIRMs related to to the diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp index 4dcf068e5e..208a65d224 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.cpp @@ -46,7 +46,7 @@ int QueueEngine::configure( return 0; } -void QueueEngine::resetState() +void QueueEngine::resetState(BSLS_ANNOTATION_UNUSED bool keepConfirming) { // NOTHING } diff --git a/src/groups/mqb/mqbmock/mqbmock_queueengine.h b/src/groups/mqb/mqbmock/mqbmock_queueengine.h index a52a1ef510..b156ba56a6 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queueengine.h +++ b/src/groups/mqb/mqbmock/mqbmock_queueengine.h @@ -88,8 +88,10 @@ class QueueEngine : public mqbi::QueueEngine { virtual int configure(bsl::ostream& errorDescription) BSLS_KEYWORD_OVERRIDE; - /// Reset the internal state of this engine. - virtual void resetState() BSLS_KEYWORD_OVERRIDE; + /// Reset the internal state of this engine. If the optionally specified + /// 'keepConfirming' is 'true', keep the data structures for CONFIRMs + /// processing. + virtual void resetState(bool keepConfirming = false) BSLS_KEYWORD_OVERRIDE; /// Rebuild the internal state of this engine. This method is invoked /// when the queue this engine is associated with is created from an diff --git a/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp b/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp index 2407963f86..4dcb2e3c82 100644 --- a/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_dummysession.cpp @@ -58,7 +58,8 @@ void DummySession::tearDown( void DummySession::initiateShutdown( BSLS_ANNOTATION_UNUSED const ShutdownCb& callback, - BSLS_ANNOTATION_UNUSED const bsls::TimeInterval& timeout) + BSLS_ANNOTATION_UNUSED const bsls::TimeInterval& timeout, + BSLS_ANNOTATION_UNUSED bool suppportShutdownV2) { // NOTHING } diff --git a/src/groups/mqb/mqbnet/mqbnet_dummysession.h b/src/groups/mqb/mqbnet/mqbnet_dummysession.h index 402709d8f2..3db79d8898 100644 --- a/src/groups/mqb/mqbnet/mqbnet_dummysession.h +++ b/src/groups/mqb/mqbnet/mqbnet_dummysession.h @@ -135,10 +135,14 @@ class DummySession : public Session { /// Initiate the shutdown of the session and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence or - /// if the specified `timeout` is expired. + /// if the specified `timeout` is expired. If the optional (temporary) + /// specified 'suppportShutdownV2' is 'true' execute shutdown logic V2 + /// where upstream (not downstream) nodes deconfigure queues and the + /// shutting down node (not downstream) waits for CONFIRMS. void initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE; + const bsls::TimeInterval& timeout, + bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE; /// Make the session abandon any work it has. void invalidate() BSLS_KEYWORD_OVERRIDE; diff --git a/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h b/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h index db7a67b362..577ad3124b 100644 --- a/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h +++ b/src/groups/mqb/mqbnet/mqbnet_multirequestmanager.h @@ -252,6 +252,8 @@ class MultiRequestManager { void sendRequest(const RequestContextSp& context, bsls::TimeInterval timeout); + + void processResponse(const bmqp_ctrlmsg::ControlMessage& message); }; // ============================================================================ @@ -486,6 +488,13 @@ void MultiRequestManager::sendRequest( } } +template +inline void MultiRequestManager::processResponse( + const bmqp_ctrlmsg::ControlMessage& response) +{ + d_requestManager_p->processResponse(response); +}; + template inline const bsl::string& MultiRequestManager::targetDescription( diff --git a/src/groups/mqb/mqbnet/mqbnet_session.h b/src/groups/mqb/mqbnet/mqbnet_session.h index bde6867ac4..e702e192cb 100644 --- a/src/groups/mqb/mqbnet/mqbnet_session.h +++ b/src/groups/mqb/mqbnet/mqbnet_session.h @@ -128,9 +128,13 @@ class Session : public SessionEventProcessor { /// Initiate the shutdown of the session and invoke the specified /// `callback` upon completion of (asynchronous) shutdown sequence or - /// if the specified `timeout` is expired. + /// if the specified `timeout` is expired. If the optional (temporary) + /// specified 'suppportShutdownV2' is 'true' execute shutdown logic V2 + /// where upstream (not downstream) nodes deconfigure queues and the + /// shutting down node (not downstream) waits for CONFIRMS. virtual void initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout) = 0; + const bsls::TimeInterval& timeout, + bool suppportShutdownV2 = false) = 0; /// Make the session abandon any work it has. virtual void invalidate() = 0; diff --git a/src/groups/mqb/mqbnet/mqbnet_session.t.cpp b/src/groups/mqb/mqbnet/mqbnet_session.t.cpp index fb5c17b428..7775b21b99 100644 --- a/src/groups/mqb/mqbnet/mqbnet_session.t.cpp +++ b/src/groups/mqb/mqbnet/mqbnet_session.t.cpp @@ -71,7 +71,8 @@ struct SessionTestImp : bsls::ProtocolTestImp { void initiateShutdown(const ShutdownCb& callback, - const bsls::TimeInterval& timeout) BSLS_KEYWORD_OVERRIDE + const bsls::TimeInterval& timeout, + bool suppportShutdownV2 = false) BSLS_KEYWORD_OVERRIDE { markDone(); } diff --git a/src/integration-tests/test_graceful_shutdown.py b/src/integration-tests/test_graceful_shutdown.py index 8f9e3cb287..68fdaf77d3 100644 --- a/src/integration-tests/test_graceful_shutdown.py +++ b/src/integration-tests/test_graceful_shutdown.py @@ -186,7 +186,7 @@ def kill_wait_unconfirmed(self, peer): # start graceful shutdown peer.exit_gracefully() - capture = peer.capture(r"Waiting for (-?\d+) unconfirmed messages", timeout=2) + capture = peer.capture(r"waiting for (-?\d+) unconfirmed message", timeout=2) assert capture num_messages = int(capture[1]) @@ -198,7 +198,7 @@ def kill_wait_unconfirmed(self, peer): consumer.drain() - capture = peer.capture(r"Waiting for (-?\d+) unconfirmed messages", timeout=2) + capture = peer.capture(r"waiting for (-?\d+) unconfirmed message", timeout=2) assert capture num_messages = int(capture[1]) @@ -209,14 +209,9 @@ def kill_wait_unconfirmed(self, peer): consumer.drain() - capture = peer.capture( - r"finish shutdown sequence having (-?\d+) unconfirmed messages", timeout=2 - ) + capture = peer.capture(r"no unconfirmed message", timeout=2) assert capture - num_messages = int(capture[1]) - assert num_messages == 0 - peer.wait() def setup_cluster(self, cluster): @@ -259,11 +254,8 @@ def test_wait_unconfirmed_replica( replica = cluster.process(self.replica_proxy.get_active_node()) self.kill_wait_unconfirmed(replica) - @tweak.cluster.queue_operations.stop_timeout_ms(3000) - @tweak.cluster.queue_operations.shutdown_timeout_ms(2000) - def test_cancel_unconfirmed_timer( - self, multi_node # pylint: disable=unused-argument - ): + @tweak.cluster.queue_operations.shutdown_timeout_ms(1000) + def test_give_up_unconfirmed(self, multi_node): # pylint: disable=unused-argument uriWrite = tc.URI_FANOUT uriRead = tc.URI_FANOUT_FOO @@ -287,23 +279,19 @@ def test_cancel_unconfirmed_timer( # start graceful shutdown leader.exit_gracefully() - capture = replica.capture(r"waiting for 2 unconfirmed message", timeout=2) + capture = leader.capture(r"waiting for 2 unconfirmed message", timeout=2) assert capture - leader.force_stop() + capture = replica.capture(r"giving up on 2 unconfirmed message", timeout=2) + assert not capture - replica.drain() + leader.force_stop() # wait for the queue to recover self.producer.post(uriWrite, payload=["msg3"], succeed=True) consumer.wait_push_event() - # the timer should be cancelled - capture = replica.capture(r"giving up on 2 unconfirmed message", timeout=2) - assert not capture - - @tweak.cluster.queue_operations.stop_timeout_ms(3000) - @tweak.cluster.queue_operations.shutdown_timeout_ms(2000) + @tweak.cluster.queue_operations.shutdown_timeout_ms(1000) def test_multiple_stop_requests(self, multi_cluster: Cluster): cluster = multi_cluster @@ -326,63 +314,13 @@ def test_multiple_stop_requests(self, multi_cluster: Cluster): consumer.wait_push_event() assert wait_until(lambda: len(consumer.list(uriRead, block=True)) == 2, 2) - # start graceful shutdown - for node in cluster.virtual_nodes(): - node.exit_gracefully() - - capture = self.replica_proxy.capture( - r"waiting for 2 unconfirmed message", timeout=2 - ) - assert capture - - @tweak.cluster.queue_operations.stop_timeout_ms(999999) - @tweak.cluster.queue_operations.shutdown_timeout_ms(999999) - def test_active_node_down_stop_requests(self, multi_cluster: Cluster): - """ - Ticket 169782591 - We have: Consumer -> Proxy -> active_node -> upstream_node. - Start shutting down active_node (one of cluster.virtual_nodes()) - Because there are unconfirmed, Proxy lingers with StopResponse. - Kill upstream_node. That event should not cancel StopRequest! - """ - cluster = multi_cluster - - uriWrite = tc.URI_FANOUT - uriRead = tc.URI_FANOUT_FOO - active_node = cluster.process(self.replica_proxy.get_active_node()) - assert active_node in cluster.virtual_nodes() - - upstream_node = cluster.process(active_node.get_active_node()) - - # post 2 PUTs - self.producer.post(uriWrite, payload=["msg1"], succeed=True) - self.producer.post(uriWrite, payload=["msg2"], succeed=True) - - # start consumer - consumer = self.replica_proxy.create_client("consumer") - - consumer.open(uriRead, flags=["read"], succeed=True) - - # receive messages - consumer.wait_push_event() - assert wait_until(lambda: len(consumer.list(uriRead, block=True)) == 2, 2) # start graceful shutdown - active_node.exit_gracefully() - - capture = self.replica_proxy.capture( - r"waiting for 2 unconfirmed message", timeout=2 - ) - assert capture - - upstream_node.force_stop() - - consumer.confirm(uriRead, "*", succeed=True) + for node in cluster.virtual_nodes(): + node.exit_gracefully() - capture = active_node.capture( - r"Received control message: \[ rId = (-?\d+) choice = \[ clusterMessage = \[ choice = \[ stopResponse" - ) + capture = active_node.capture(r"waiting for 2 unconfirmed message", timeout=2) assert capture @tweak.cluster.queue_operations.stop_timeout_ms(999999) diff --git a/src/integration-tests/test_maxunconfirmed.py b/src/integration-tests/test_maxunconfirmed.py index 2938e6eed7..7cfdb7b8a9 100644 --- a/src/integration-tests/test_maxunconfirmed.py +++ b/src/integration-tests/test_maxunconfirmed.py @@ -48,7 +48,7 @@ def post_n_msgs(self, uri, n): ) return all(res == Client.e_SUCCESS for res in results) - @tweak.cluster.queue_operations.stop_timeout_ms(1000) + @tweak.cluster.queue_operations.shutdown_timeout_ms(1000) def test_maxunconfirmed(self, multi_node: Cluster): # Post 100 messages assert self.post_n_msgs(tc.URI_PRIORITY, 100)