Skip to content

Commit abb79a4

Browse files
committed
remove boost mutex, timed_mutex and condition_variable in TcpRemotingClient, TcpTransport and ReponseFunture.
1 parent 62176ab commit abb79a4

File tree

6 files changed

+54
-49
lines changed

6 files changed

+54
-49
lines changed

src/transport/ResponseFuture.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
* limitations under the License.
1616
*/
1717
#include "ResponseFuture.h"
18+
19+
#include <chrono>
20+
1821
#include "Logging.h"
1922
#include "TcpRemotingClient.h"
2023

@@ -55,12 +58,12 @@ void ResponseFuture::releaseThreadCondition() {
5558
}
5659

5760
RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) {
58-
boost::unique_lock<boost::mutex> eventLock(m_defaultEventLock);
61+
std::unique_lock<std::mutex> eventLock(m_defaultEventLock);
5962
if (!m_haveResponse) {
6063
if (timeoutMillis <= 0) {
6164
timeoutMillis = m_timeout;
6265
}
63-
if (!m_defaultEvent.timed_wait(eventLock, boost::posix_time::milliseconds(timeoutMillis))) {
66+
if (m_defaultEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
6467
LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, m_opaque);
6568
m_haveResponse = true;
6669
}
@@ -69,7 +72,7 @@ RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) {
6972
}
7073

7174
bool ResponseFuture::setResponse(RemotingCommand* pResponseCommand) {
72-
boost::unique_lock<boost::mutex> eventLock(m_defaultEventLock);
75+
std::unique_lock<std::mutex> eventLock(m_defaultEventLock);
7376

7477
if (m_haveResponse) {
7578
return false;

src/transport/ResponseFuture.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
#ifndef __RESPONSEFUTURE_H__
1818
#define __RESPONSEFUTURE_H__
1919

20-
#include <boost/atomic.hpp>
21-
#include <boost/thread/condition_variable.hpp>
20+
#include <atomic>
21+
#include <condition_variable>
2222

2323
#include "AsyncCallbackWrap.h"
2424
#include "RemotingCommand.h"
@@ -81,11 +81,11 @@ class ResponseFuture {
8181
AsyncCallbackWrap* m_pCallbackWrap;
8282

8383
AsyncCallbackStatus m_asyncCallbackStatus;
84-
boost::mutex m_asyncCallbackLock;
84+
std::mutex m_asyncCallbackLock;
8585

8686
bool m_haveResponse;
87-
boost::mutex m_defaultEventLock;
88-
boost::condition_variable_any m_defaultEvent;
87+
std::mutex m_defaultEventLock;
88+
std::condition_variable m_defaultEvent;
8989

9090
int64 m_beginTimestamp;
9191
bool m_sendRequestOK;

src/transport/TcpRemotingClient.cpp

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void TcpRemotingClient::stopAllTcpTransportThread() {
9191
m_threadpool.join_all();
9292

9393
{
94-
boost::lock_guard<boost::mutex> lock(m_futureTableLock);
94+
std::lock_guard<std::mutex> lock(m_futureTableLock);
9595
for (const auto& future : m_futureTable) {
9696
if (future.second)
9797
future.second->releaseThreadCondition();
@@ -108,9 +108,9 @@ void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
108108
return;
109109
}
110110

111-
boost::unique_lock<boost::timed_mutex> lock(m_namesrvLock, boost::try_to_lock);
111+
std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
112112
if (!lock.owns_lock()) {
113-
if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(10))) {
113+
if (!lock.try_lock_for(std::chrono::seconds(10))) {
114114
LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
115115
return;
116116
}
@@ -273,9 +273,9 @@ std::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& a
273273
{
274274
// try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
275275
// long time, if could not get m_tcpLock, return NULL
276-
boost::unique_lock<boost::timed_mutex> lock(m_tcpTableLock, boost::try_to_lock);
276+
std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
277277
if (!lock.owns_lock()) {
278-
if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
278+
if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
279279
LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
280280
std::shared_ptr<TcpTransport> pTcp;
281281
return pTcp;
@@ -338,9 +338,9 @@ std::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameServerTransport(bool
338338
// try get m_tcpLock until m_tcpTransportTryLockTimeout to avoid blocking long
339339
// time, if could not get m_namesrvlock, return NULL
340340
LOG_DEBUG("--CreateNameserverTransport--");
341-
boost::unique_lock<boost::timed_mutex> lock(m_namesrvLock, boost::try_to_lock);
341+
std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
342342
if (!lock.owns_lock()) {
343-
if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
343+
if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
344344
LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
345345
std::shared_ptr<TcpTransport> pTcp;
346346
return pTcp;
@@ -375,9 +375,9 @@ bool TcpRemotingClient::CloseTransport(const string& addr, std::shared_ptr<TcpTr
375375
return CloseNameServerTransport(pTcp);
376376
}
377377

378-
boost::unique_lock<boost::timed_mutex> lock(m_tcpTableLock, boost::try_to_lock);
378+
std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
379379
if (!lock.owns_lock()) {
380-
if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
380+
if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
381381
LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
382382
return false;
383383
}
@@ -411,9 +411,9 @@ bool TcpRemotingClient::CloseTransport(const string& addr, std::shared_ptr<TcpTr
411411
}
412412

413413
bool TcpRemotingClient::CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp) {
414-
boost::unique_lock<boost::timed_mutex> lock(m_namesrvLock, boost::try_to_lock);
414+
std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
415415
if (!lock.owns_lock()) {
416-
if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
416+
if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
417417
LOG_ERROR("CreateNameServerTransport get timed_mutex timeout");
418418
return false;
419419
}
@@ -545,14 +545,14 @@ void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const strin
545545
}
546546

547547
void TcpRemotingClient::addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture) {
548-
boost::lock_guard<boost::mutex> lock(m_futureTableLock);
548+
std::lock_guard<std::mutex> lock(m_futureTableLock);
549549
m_futureTable[opaque] = pFuture;
550550
}
551551

552552
// Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will
553553
// be erased, so caller must ensure the life cycle of returned shared_ptr;
554554
std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
555-
boost::lock_guard<boost::mutex> lock(m_futureTableLock);
555+
std::lock_guard<std::mutex> lock(m_futureTableLock);
556556
std::shared_ptr<ResponseFuture> pResponseFuture;
557557
if (m_futureTable.find(opaque) != m_futureTable.end()) {
558558
pResponseFuture = m_futureTable[opaque];
@@ -562,14 +562,14 @@ std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(i
562562
}
563563

564564
void TcpRemotingClient::addAsyncResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture) {
565-
boost::lock_guard<boost::mutex> lock(m_asyncFutureTableLock);
565+
std::lock_guard<std::mutex> lock(m_asyncFutureTableLock);
566566
m_asyncFutureTable[opaque] = pFuture;
567567
}
568568

569569
// Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will
570570
// be erased, so caller must ensure the life cycle of returned shared_ptr;
571571
std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
572-
boost::lock_guard<boost::mutex> lock(m_asyncFutureTableLock);
572+
std::lock_guard<std::mutex> lock(m_asyncFutureTableLock);
573573
std::shared_ptr<ResponseFuture> pResponseFuture;
574574
if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) {
575575
pResponseFuture = m_asyncFutureTable[opaque];
@@ -585,7 +585,7 @@ void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemot
585585
}
586586

587587
void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) {
588-
boost::lock_guard<boost::mutex> lock(m_asyncTimerTableLock);
588+
std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
589589
if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
590590
LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque);
591591
boost::asio::deadline_timer* old_t = m_asyncTimerTable[opaque];
@@ -601,7 +601,7 @@ void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opa
601601
}
602602

603603
void TcpRemotingClient::eraseTimerCallback(int opaque) {
604-
boost::lock_guard<boost::mutex> lock(m_asyncTimerTableLock);
604+
std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
605605
if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
606606
LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
607607
boost::asio::deadline_timer* t = m_asyncTimerTable[opaque];
@@ -611,7 +611,7 @@ void TcpRemotingClient::eraseTimerCallback(int opaque) {
611611
}
612612

613613
void TcpRemotingClient::cancelTimerCallback(int opaque) {
614-
boost::lock_guard<boost::mutex> lock(m_asyncTimerTableLock);
614+
std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
615615
if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
616616
LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque);
617617
boost::asio::deadline_timer* t = m_asyncTimerTable[opaque];
@@ -626,7 +626,7 @@ void TcpRemotingClient::cancelTimerCallback(int opaque) {
626626
}
627627

628628
void TcpRemotingClient::removeAllTimerCallback() {
629-
boost::lock_guard<boost::mutex> lock(m_asyncTimerTableLock);
629+
std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
630630
for (const auto& timer : m_asyncTimerTable) {
631631
boost::asio::deadline_timer* t = timer.second;
632632
try {

src/transport/TcpRemotingClient.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define __TCPREMOTINGCLIENT_H__
1919

2020
#include <map>
21+
#include <mutex>
2122

2223
#include <boost/asio.hpp>
2324
#include <boost/asio/io_service.hpp>
@@ -99,23 +100,23 @@ class TcpRemotingClient {
99100
RequestMap m_requestTable;
100101

101102
TcpMap m_tcpTable; //<! addr->tcp;
102-
boost::timed_mutex m_tcpTableLock;
103+
std::timed_mutex m_tcpTableLock;
103104

104105
ResMap m_futureTable; //<! id->future;
105-
boost::mutex m_futureTableLock;
106+
std::mutex m_futureTableLock;
106107

107108
ResMap m_asyncFutureTable;
108-
boost::mutex m_asyncFutureTableLock;
109+
std::mutex m_asyncFutureTableLock;
109110

110111
AsyncTimerMap m_asyncTimerTable;
111-
boost::mutex m_asyncTimerTableLock;
112+
std::mutex m_asyncTimerTableLock;
112113

113114
int m_pullThreadNum;
114115
uint64_t m_tcpConnectTimeout; // ms
115116
uint64_t m_tcpTransportTryLockTimeout; // s
116117

117118
//<! NameServer
118-
boost::timed_mutex m_namesrvLock;
119+
std::timed_mutex m_namesrvLock;
119120
vector<string> m_namesrvAddrList;
120121
string m_namesrvAddrChoosed;
121122
unsigned int m_namesrvIndex;

src/transport/TcpTransport.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
#include "TcpTransport.h"
1818

19+
#include <chrono>
20+
1921
#ifndef WIN32
2022
#include <arpa/inet.h> // for sockaddr_in and inet_ntoa...
2123
#include <netinet/tcp.h>
@@ -65,9 +67,9 @@ TcpConnectStatus TcpTransport::getTcpConnectStatus() {
6567
}
6668

6769
TcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillis) {
68-
boost::unique_lock<boost::mutex> eventLock(m_connectEventLock);
70+
std::unique_lock<std::mutex> eventLock(m_connectEventLock);
6971
if (m_tcpConnectStatus == TCP_CONNECT_STATUS_WAIT) {
70-
if (!m_connectEvent.timed_wait(eventLock, boost::posix_time::milliseconds(timeoutMillis))) {
72+
if (m_connectEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
7173
LOG_INFO("connect timeout");
7274
}
7375
}
@@ -76,9 +78,9 @@ TcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillis) {
7678

7779
// internal method
7880
void TcpTransport::setTcpConnectEvent(TcpConnectStatus connectStatus) {
79-
TcpConnectStatus baseStatus = m_tcpConnectStatus.exchange(connectStatus, boost::memory_order_relaxed);
81+
TcpConnectStatus baseStatus = m_tcpConnectStatus.exchange(connectStatus, std::memory_order_relaxed);
8082
if (baseStatus == TCP_CONNECT_STATUS_WAIT) {
81-
boost::unique_lock<boost::mutex> eventLock(m_connectEventLock);
83+
std::unique_lock<std::mutex> eventLock(m_connectEventLock);
8284
m_connectEvent.notify_all();
8385
}
8486
}
@@ -125,7 +127,7 @@ u_long TcpTransport::getInetAddr(string& hostname) {
125127

126128
void TcpTransport::disconnect(const string& addr) {
127129
// disconnect is idempotent.
128-
boost::lock_guard<boost::mutex> lock(m_eventLock);
130+
std::lock_guard<std::mutex> lock(m_eventLock);
129131
if (getTcpConnectStatus() != TCP_CONNECT_STATUS_INIT) {
130132
LOG_INFO("disconnect:%s start. event:%p", addr.c_str(), m_event.get());
131133
freeBufferEvent();
@@ -144,7 +146,7 @@ TcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeoutMi
144146
}
145147

146148
{
147-
boost::lock_guard<boost::mutex> lock(m_eventLock);
149+
std::lock_guard<std::mutex> lock(m_eventLock);
148150

149151
struct sockaddr_in sin;
150152
memset(&sin, 0, sizeof(sin));
@@ -175,7 +177,7 @@ TcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeoutMi
175177
if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
176178
LOG_WARN("can not connect to server:%s", strServerURL.c_str());
177179

178-
boost::lock_guard<boost::mutex> lock(m_eventLock);
180+
std::lock_guard<std::mutex> lock(m_eventLock);
179181
freeBufferEvent();
180182
setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
181183
return TCP_CONNECT_STATUS_FAILED;
@@ -268,7 +270,7 @@ void TcpTransport::messageReceived(const MemoryBlock& mem, const std::string& ad
268270
}
269271

270272
bool TcpTransport::sendMessage(const char* pData, size_t len) {
271-
boost::lock_guard<boost::mutex> lock(m_eventLock);
273+
std::lock_guard<std::mutex> lock(m_eventLock);
272274
if (getTcpConnectStatus() != TCP_CONNECT_STATUS_SUCCESS) {
273275
return false;
274276
}
@@ -281,7 +283,7 @@ bool TcpTransport::sendMessage(const char* pData, size_t len) {
281283
}
282284

283285
const string TcpTransport::getPeerAddrAndPort() {
284-
boost::lock_guard<boost::mutex> lock(m_eventLock);
286+
std::lock_guard<std::mutex> lock(m_eventLock);
285287
return m_event ? m_event->getPeerAddrPort() : "";
286288
}
287289

src/transport/TcpTransport.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
#ifndef __TCPTRANSPORT_H__
1818
#define __TCPTRANSPORT_H__
1919

20-
#include <boost/atomic.hpp>
21-
#include <boost/thread/condition_variable.hpp>
22-
#include <boost/thread/mutex.hpp>
23-
#include <boost/thread/thread.hpp>
20+
#include <atomic>
21+
#include <condition_variable>
22+
#include <mutex>
2423

2524
#include "EventLoop.h"
2625
#include "dataBlock.h"
@@ -77,11 +76,11 @@ class TcpTransport : public std::enable_shared_from_this<TcpTransport> {
7776
uint64_t m_startTime;
7877

7978
std::shared_ptr<BufferEvent> m_event; // NOTE: use m_event in callback is unsafe.
80-
boost::mutex m_eventLock;
81-
boost::atomic<TcpConnectStatus> m_tcpConnectStatus;
79+
std::mutex m_eventLock;
80+
std::atomic<TcpConnectStatus> m_tcpConnectStatus;
8281

83-
boost::mutex m_connectEventLock;
84-
boost::condition_variable_any m_connectEvent;
82+
std::mutex m_connectEventLock;
83+
std::condition_variable m_connectEvent;
8584

8685
//<! read data callback
8786
TcpTransportReadCallback m_readCallback;

0 commit comments

Comments
 (0)