@@ -67,7 +67,6 @@ void TcpRemotingClient::boost_asio_work() {
67
67
TcpRemotingClient::~TcpRemotingClient () {
68
68
m_tcpTable.clear ();
69
69
m_futureTable.clear ();
70
- m_asyncFutureTable.clear ();
71
70
m_namesrvAddrList.clear ();
72
71
removeAllTimerCallback ();
73
72
}
@@ -93,8 +92,11 @@ void TcpRemotingClient::stopAllTcpTransportThread() {
93
92
{
94
93
std::lock_guard<std::mutex> lock (m_futureTableLock);
95
94
for (const auto & future : m_futureTable) {
96
- if (future.second )
97
- future.second ->releaseThreadCondition ();
95
+ if (future.second ) {
96
+ if (!future.second ->getAsyncFlag ()) {
97
+ future.second ->releaseThreadCondition ();
98
+ }
99
+ }
98
100
}
99
101
}
100
102
@@ -222,7 +224,7 @@ bool TcpRemotingClient::invokeAsync(const string& addr,
222
224
responseFuture->setRetrySendTimes (retrySendTimes);
223
225
responseFuture->setBrokerAddr (addr);
224
226
responseFuture->setRequestCommand (request);
225
- addAsyncResponseFuture (opaque, responseFuture);
227
+ addResponseFuture (opaque, responseFuture);
226
228
227
229
// timeout monitor
228
230
boost::asio::deadline_timer* t =
@@ -468,14 +470,11 @@ void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr)
468
470
469
471
// <!process self;
470
472
if (pRespondCmd->isResponseType ()) {
471
- std::shared_ptr<ResponseFuture> pFuture = findAndDeleteAsyncResponseFuture (opaque);
473
+ std::shared_ptr<ResponseFuture> pFuture = findAndDeleteResponseFuture (opaque);
472
474
if (!pFuture) {
473
- pFuture = findAndDeleteResponseFuture (opaque);
474
- if (!pFuture) {
475
- LOG_DEBUG (" responseFuture was deleted by timeout of opaque:%d" , opaque);
476
- deleteAndZero (pRespondCmd);
477
- return ;
478
- }
475
+ LOG_DEBUG (" responseFuture was deleted by timeout of opaque:%d" , opaque);
476
+ deleteAndZero (pRespondCmd);
477
+ return ;
479
478
}
480
479
481
480
LOG_DEBUG (" find_response opaque:%d" , opaque);
@@ -514,7 +513,7 @@ void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_cod
514
513
515
514
LOG_DEBUG (" handleAsyncRequestTimeout opaque:%d, e_code:%d, msg:%s" , opaque, e.value (), e.message ().data ());
516
515
517
- std::shared_ptr<ResponseFuture> pFuture (findAndDeleteAsyncResponseFuture (opaque));
516
+ std::shared_ptr<ResponseFuture> pFuture (findAndDeleteResponseFuture (opaque));
518
517
if (pFuture) {
519
518
LOG_ERROR (" no response got for opaque:%d" , opaque);
520
519
eraseTimerCallback (opaque);
@@ -560,23 +559,6 @@ std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(i
560
559
return pResponseFuture;
561
560
}
562
561
563
- void TcpRemotingClient::addAsyncResponseFuture (int opaque, std::shared_ptr<ResponseFuture> pFuture) {
564
- std::lock_guard<std::mutex> lock (m_asyncFutureTableLock);
565
- m_asyncFutureTable[opaque] = pFuture;
566
- }
567
-
568
- // Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will
569
- // be erased, so caller must ensure the life cycle of returned shared_ptr;
570
- std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture (int opaque) {
571
- std::lock_guard<std::mutex> lock (m_asyncFutureTableLock);
572
- std::shared_ptr<ResponseFuture> pResponseFuture;
573
- if (m_asyncFutureTable.find (opaque) != m_asyncFutureTable.end ()) {
574
- pResponseFuture = m_asyncFutureTable[opaque];
575
- m_asyncFutureTable.erase (opaque);
576
- }
577
- return pResponseFuture;
578
- }
579
-
580
562
void TcpRemotingClient::registerProcessor (MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor) {
581
563
if (m_requestTable.find (requestCode) != m_requestTable.end ())
582
564
m_requestTable.erase (requestCode);
0 commit comments