Skip to content

Commit fd29759

Browse files
authored
feat(consumer): remove event if consumer service shutdown (#233)
1 parent f968f57 commit fd29759

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

src/consumer/ConsumeMessageConcurrentlyService.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,11 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
7373
request->m_messageQueue.toString().c_str());
7474
return;
7575
}
76-
if (!request->isDropped()) {
76+
if (!request->isDropped() && !m_ioService.stopped()) {
7777
m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs));
78+
} else {
79+
LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post ConsumeRequest.",
80+
request->m_messageQueue.toString().c_str());
7881
}
7982
}
8083
void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest> pullRequest,
@@ -93,13 +96,16 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_pt
9396
(request->m_messageQueue).toString().c_str());
9497
return;
9598
}
96-
if (!request->isDropped()) {
99+
if (!request->isDropped() && !m_ioService.stopped()) {
97100
boost::asio::deadline_timer* t =
98101
new boost::asio::deadline_timer(m_ioService, boost::posix_time::milliseconds(millis));
99102
t->async_wait(
100103
boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest), this, t, request, msgs));
101104
LOG_INFO("Submit Message to Consumer [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(),
102105
millis);
106+
} else {
107+
LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post delay ConsumeRequest.",
108+
request->m_messageQueue.toString().c_str());
103109
}
104110
}
105111

src/consumer/DefaultMQPushConsumer.cpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -575,11 +575,17 @@ bool DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr<PullRequest>
575575
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", request->m_messageQueue.toString().c_str());
576576
return false;
577577
}
578-
boost::asio::deadline_timer* t =
579-
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis));
580-
t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest), this, t, request));
581-
LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(), millis);
582-
return true;
578+
if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
579+
boost::asio::deadline_timer* t =
580+
new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis));
581+
t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest), this, t, request));
582+
LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(), millis);
583+
return true;
584+
} else {
585+
LOG_WARN("Service or TaskQueue shutdown, produce PullRequest of mq:%s failed",
586+
request->m_messageQueue.toString().c_str());
587+
return false;
588+
}
583589
}
584590

585591
bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest) {

0 commit comments

Comments
 (0)