diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp index 4d64aa0e0..6e1a39224 100644 --- a/src/MQClientFactory.cpp +++ b/src/MQClientFactory.cpp @@ -83,7 +83,8 @@ void MQClientFactory::start() { } } -void MQClientFactory::updateTopicRouteInfo(boost::system::error_code& ec, boost::asio::deadline_timer* t) { +void MQClientFactory::updateTopicRouteInfo(boost::system::error_code& ec, + boost::shared_ptr t) { if ((getConsumerTableSize() == 0) && (getProducerTableSize() == 0)) { return; } @@ -784,7 +785,8 @@ void MQClientFactory::sendHeartbeatToAllBroker() { brokerTable.clear(); } -void MQClientFactory::persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer* t) { +void MQClientFactory::persistAllConsumerOffset(boost::system::error_code& ec, + boost::shared_ptr t) { { boost::lock_guard lock(m_consumerTableMutex); if (m_consumerTable.size() > 0) { @@ -814,7 +816,8 @@ HeartbeatData* MQClientFactory::prepareHeartbeatData() { return pHeartbeatData; } -void MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec, boost::asio::deadline_timer* t) { +void MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec, + boost::shared_ptr t) { sendHeartbeatToAllBroker(); boost::system::error_code e; @@ -822,7 +825,8 @@ void MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code t->async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this, ec, t)); } -void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer* t) { +void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code& ec, + boost::shared_ptr t) { cleanOfflineBrokers(); boost::system::error_code e; @@ -830,7 +834,8 @@ void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code& ec, t->async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec, t)); } -void MQClientFactory::fetchNameServerAddr(boost::system::error_code& ec, boost::asio::deadline_timer* t) { +void MQClientFactory::fetchNameServerAddr(boost::system::error_code& ec, + boost::shared_ptr t) { m_pClientAPIImpl->fetchNameServerAddr(m_nameSrvDomain); boost::system::error_code e; @@ -845,21 +850,24 @@ void MQClientFactory::startScheduledTask(bool startFetchNSService) { // callback boost::system::error_code ec1; - boost::asio::deadline_timer t1(m_async_ioService, boost::posix_time::seconds(3)); - t1.async_wait(boost::bind(&MQClientFactory::updateTopicRouteInfo, this, ec1, &t1)); + boost::shared_ptr t1 = + boost::make_shared(m_async_ioService, boost::posix_time::seconds(3)); + t1->async_wait(boost::bind(&MQClientFactory::updateTopicRouteInfo, this, ec1, t1)); boost::system::error_code ec2; - boost::asio::deadline_timer t2(m_async_ioService, boost::posix_time::milliseconds(10)); - t2.async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this, ec2, &t2)); + boost::shared_ptr t2 = + boost::make_shared(m_async_ioService, boost::posix_time::milliseconds(10)); + t2->async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this, ec2, t2)); boost::system::error_code ec3; - boost::asio::deadline_timer t3(m_async_ioService, boost::posix_time::seconds(3)); - t3.async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec3, &t3)); + boost::shared_ptr t3 = + boost::make_shared(m_async_ioService, boost::posix_time::seconds(3)); + t3->async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec3, t3)); if (startFetchNSService) { boost::system::error_code ec5; - boost::asio::deadline_timer* t5 = - new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::seconds(60 * 2)); + boost::shared_ptr t5 = + boost::make_shared(m_async_ioService, boost::posix_time::seconds(60 * 2)); t5->async_wait(boost::bind(&MQClientFactory::fetchNameServerAddr, this, ec5, t5)); } @@ -885,19 +893,22 @@ void MQClientFactory::consumer_timerOperation() { // callback boost::system::error_code ec1; - boost::asio::deadline_timer t(m_consumer_async_ioService, boost::posix_time::seconds(10)); - t.async_wait(boost::bind(&MQClientFactory::timerCB_doRebalance, this, ec1, &t)); + boost::shared_ptr t1 = + boost::make_shared(m_consumer_async_ioService, boost::posix_time::seconds(10)); + t1->async_wait(boost::bind(&MQClientFactory::timerCB_doRebalance, this, ec1, t1)); boost::system::error_code ec2; - boost::asio::deadline_timer t2(m_consumer_async_ioService, boost::posix_time::seconds(5)); - t2.async_wait(boost::bind(&MQClientFactory::persistAllConsumerOffset, this, ec2, &t2)); + boost::shared_ptr t2 = + boost::make_shared(m_consumer_async_ioService, boost::posix_time::seconds(5)); + t2->async_wait(boost::bind(&MQClientFactory::persistAllConsumerOffset, this, ec2, t2)); boost::system::error_code ec; m_consumer_async_ioService.run(ec); LOG_INFO("clientFactory:%s stop consumer_timerOperation", m_clientId.c_str()); } -void MQClientFactory::timerCB_doRebalance(boost::system::error_code& ec, boost::asio::deadline_timer* t) { +void MQClientFactory::timerCB_doRebalance(boost::system::error_code& ec, + boost::shared_ptr t) { doRebalance(); boost::system::error_code e; diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h index b5e544172..32c337cd8 100644 --- a/src/MQClientFactory.h +++ b/src/MQClientFactory.h @@ -129,17 +129,18 @@ class MQClientFactory { void startScheduledTask(bool startFetchNSService = true); // t); + void updateTopicRouteInfo(boost::system::error_code& ec, boost::shared_ptr t); + void timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec, + boost::shared_ptr t); - void timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer* t); + void timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::shared_ptr t); // consumer related operation void consumer_timerOperation(); - void persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer* t); + void persistAllConsumerOffset(boost::system::error_code& ec, boost::shared_ptr t); void doRebalance(); - void timerCB_doRebalance(boost::system::error_code& ec, boost::asio::deadline_timer* t); + void timerCB_doRebalance(boost::system::error_code& ec, boost::shared_ptr t); bool getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials); bool addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer); void eraseConsumerFromTable(const string& consumerName); diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp index 068b8c4ff..f638f6f44 100644 --- a/src/common/MQClient.cpp +++ b/src/common/MQClient.cpp @@ -150,6 +150,7 @@ void MQClient::start() { } void MQClient::shutdown() { + m_clientFactory->shutdown(); m_clientFactory = NULL; }