Skip to content

Commit

Permalink
cleaning
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
  • Loading branch information
dorjesinpo committed Sep 18, 2024
1 parent 383e8d9 commit 3e590a1
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 41 deletions.
47 changes: 28 additions & 19 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5310,11 +5310,13 @@ void ClusterQueueHelper::requestToStopPushing()
}
}

void ClusterQueueHelper::onDeconfiguredHandle(
const bsl::shared_ptr<StopContext>& contextSp)
void ClusterQueueHelper::contextHolder(
const bsl::shared_ptr<StopContext>& contextSp,
const VoidFunctor& action)
{
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": deconfiguring " << " " << contextSp.numReferences();
if (action) {
action();
}
(void)contextSp;
}

Expand Down Expand Up @@ -5408,9 +5410,10 @@ void ClusterQueueHelper::processNodeStoppingNotification(
++cit) {
cit->second.d_handle_p->deconfigureAll(
bdlf::BindUtil::bind(
&ClusterQueueHelper::onDeconfiguredHandle,
&ClusterQueueHelper::contextHolder,
this,
contextSp));
contextSp,
VoidFunctor()));
}
BALL_LOG_INFO << d_clusterData_p->identity().description()
<< ": deconfigured " << handles.size()
Expand Down Expand Up @@ -5476,21 +5479,27 @@ void ClusterQueueHelper::processNodeStoppingNotification(
continue; // CONTINUE
}

VoidFunctor inner = bdlf::BindUtil::bind(
&mqbi::Queue::onOpenUpstream,
queue,
0,
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID,
true);

VoidFunctor outer = bdlf::BindUtil::bind(
&ClusterQueueHelper::contextHolder,
this,
contextSp,
inner);

queue->dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::Queue::onOpenUpstream,
queue,
0,
bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID,
true), // isWriterOnly
queue);
}
outer,
queue,
mqbi::DispatcherEventType::e_DISPATCHER);

// As a way to bind 'contextSp' to all 'onOpenUpstream'
d_cluster_p->dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterQueueHelper::onDeconfiguredHandle,
this,
contextSp),
mqbi::DispatcherClientType::e_QUEUE);
// Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid
// (re)enabling 'd_flushList'
}
}
else {
// TEMPORARY, remove 'after switching to StopRequest V2
Expand Down
3 changes: 2 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,8 @@ class ClusterQueueHelper : public mqbc::ClusterStateObserver,
/// Send StopResponse to the request in the specified 'context.
void finishStopSequenceDispatched(StopContext* context);

void onDeconfiguredHandle(const bsl::shared_ptr<StopContext>& contextSp);
void contextHolder(const bsl::shared_ptr<StopContext>& contextSp,
const VoidFunctor& action);

// PRIVATE ACCESSORS

Expand Down
4 changes: 1 addition & 3 deletions src/groups/mqb/mqbblp/mqbblp_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -957,9 +957,7 @@ bsls::Types::Int64 Queue::countUnconfirmed(unsigned int subId)

void Queue::stopPushing()
{
d_state.routingContext().shutdown();

queueEngine()->resetState(true); // keepConfirming
queueEngine()->resetState(true); // isShuttingDown
}

} // close package namespace
Expand Down
6 changes: 5 additions & 1 deletion src/groups/mqb/mqbblp/mqbblp_queuehandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,11 @@ void QueueHandle::deconfigureAll(
bdlf::BindUtil::bind(&QueueHandle::deconfigureDispatched,
this,
deconfiguredCb),
d_queue_sp.get());
d_queue_sp.get(),
mqbi::DispatcherEventType::e_DISPATCHER);

// Use 'mqbi::DispatcherEventType::e_DISPATCHER' to avoid (re)enabling
// 'd_flushList'
}

void QueueHandle::deconfigureDispatched(
Expand Down
9 changes: 6 additions & 3 deletions src/groups/mqb/mqbblp/mqbblp_relayqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -909,15 +909,18 @@ int RelayQueueEngine::configure(
return 0;
}

void RelayQueueEngine::resetState(bool keepConfirming)
void RelayQueueEngine::resetState(bool isShuttingDown)
{
// d_self.reset(this);

for (AppsMap::iterator it = d_apps.begin(); it != d_apps.end(); ++it) {
it->second->reset();
// Keep the routing which new engine can reuse
if (isShuttingDown) {
it->second->d_routing_sp->reset();
}
// else, keep the routing which new engine can reuse
}
if (!keepConfirming) {
if (!isShuttingDown) {
d_apps.clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_relayqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class RelayQueueEngine : public mqbi::QueueEngine {
/// Reset the internal state of this engine. If the optionally specified
/// 'keepConfirming' is 'true', keep the data structures for CONFIRMs
/// processing.
virtual void resetState(bool keepConfirming = false) BSLS_KEYWORD_OVERRIDE;
virtual void resetState(bool isShuttingDown = false) BSLS_KEYWORD_OVERRIDE;

/// Rebuild the internal state of this engine. This method is invoked
/// when the queue this engine is associated with is created from an
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbblp/mqbblp_rootqueueengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ int RootQueueEngine::initializeAppId(const bsl::string& appId,
return 0;
}

void RootQueueEngine::resetState(bool keepConfirming)
void RootQueueEngine::resetState(bool isShuttingDown)
{
for (Apps::iterator it = d_apps.begin(); it != d_apps.end(); ++it) {
it->value()->reset();
Expand All @@ -434,7 +434,7 @@ void RootQueueEngine::resetState(bool keepConfirming)

d_consumptionMonitor.reset();

if (!keepConfirming) {
if (!isShuttingDown) {
d_apps.clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_rootqueueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class RootQueueEngine BSLS_KEYWORD_FINAL : public mqbi::QueueEngine {
/// Reset the internal state of this engine. If the optionally specified
/// 'keepConfirming' is 'true', keep the data structures for CONFIRMs
/// processing.
virtual void resetState(bool keepConfirming = false) BSLS_KEYWORD_OVERRIDE;
virtual void resetState(bool isShuttingDown = false) BSLS_KEYWORD_OVERRIDE;

/// Rebuild the internal state of this engine. This method is invoked
/// when the queue this engine is associated with is created from an
Expand Down
7 changes: 0 additions & 7 deletions src/groups/mqb/mqbblp/mqbblp_routers.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,6 @@ class Routers {
bool onUsable(unsigned int* upstreamSubQueueId,
unsigned int upstreamSubscriptionId);

void shutdown();

void loadInternals(mqbcmd::Routing* out) const;
};

Expand Down Expand Up @@ -885,11 +883,6 @@ Routers::QueueRoutingContext::onUsable(unsigned int* upstreamSubQueueId,
return false;
}

inline void Routers::QueueRoutingContext::shutdown()
{
d_isShuttingDown = true;
}

// -----------------------------
// struct Routers::Expression
// -----------------------------
Expand Down
6 changes: 3 additions & 3 deletions src/groups/mqb/mqbi/mqbi_queueengine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class QueueEngine {
virtual int configure(bsl::ostream& errorDescription) = 0;

/// Reset the internal state of this engine. If the optionally specified
/// 'keepConfirming' is 'true', keep the data structures for CONFIRMs
/// processing.
virtual void resetState(bool keepConfirming = false) = 0;
/// 'isShuttingDown' is 'true', clear the routing state but keep the Apps
/// state for CONFIRMs processing.
virtual void resetState(bool isShuttingDown = false) = 0;

/// Rebuild the internal state of this engine. This method is invoked
/// when the queue this engine is associated with is created from an
Expand Down

0 comments on commit 3e590a1

Please sign in to comment.