Skip to content

Commit

Permalink
Backport Bitcoin PR#9289: net: drop boost::thread_group (#1568)
Browse files Browse the repository at this point in the history
* net: a few small cleanups before replacing boost threads

- Drop the interruption point directly after the pnode allocation. This would
    be leaky if hit.
- Rearrange thread creation so that the socket handler comes first

* net: add CThreadInterrupt and InterruptibleSleep

* net: make net interruptible

Also now that net threads are interruptible, switch them to use std
threads/binds/mutexes/condvars.

* net: make net processing interruptible

* net: remove thread_interrupted catch

This is now a std::thread, so there's no hope of catching a boost interruption
point.

* net: make proxy receives interruptible

* net: misc header cleanups
  • Loading branch information
OlegGirko authored and UdjinM6 committed Aug 9, 2017
1 parent df6d458 commit 42c784d
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 82 deletions.
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ BITCOIN_CORE_H = \
support/pagelocker.h \
sync.h \
threadsafety.h \
threadinterrupt.h \
timedata.h \
tinyformat.h \
torcontrol.h \
Expand Down Expand Up @@ -387,6 +388,7 @@ libbitcoin_util_a_SOURCES = \
support/cleanse.cpp \
sync.cpp \
uint256.cpp \
threadinterrupt.cpp \
util.cpp \
utilmoneystr.cpp \
utilstrencodings.cpp \
Expand Down
4 changes: 3 additions & 1 deletion src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ void Interrupt(boost::thread_group& threadGroup)
InterruptRPC();
InterruptREST();
InterruptTorControl();
if (g_connman)
g_connman->Interrupt();
threadGroup.interrupt_all();
}

Expand Down Expand Up @@ -2025,7 +2027,7 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
connOptions.nSendBufferMaxSize = 1000*GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER);
connOptions.nReceiveFloodSize = 1000*GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER);

if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions))
if (!connman.Start(scheduler, strNodeError, connOptions))
return InitError(strNodeError);

// Generate coins in the background
Expand Down
130 changes: 85 additions & 45 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
#include <miniupnpc/upnperrors.h>
#endif

#include <boost/filesystem.hpp>
#include <boost/thread.hpp>

#include <math.h>

Expand Down Expand Up @@ -1071,7 +1069,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
void CConnman::ThreadSocketHandler()
{
unsigned int nPrevNodeCount = 0;
while (true)
while (!interruptNet)
{
//
// Disconnect nodes
Expand Down Expand Up @@ -1211,7 +1209,8 @@ void CConnman::ThreadSocketHandler()

int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
boost::this_thread::interruption_point();
if (interruptNet)
return;

if (nSelect == SOCKET_ERROR)
{
Expand All @@ -1224,7 +1223,8 @@ void CConnman::ThreadSocketHandler()
}
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
MilliSleep(timeout.tv_usec/1000);
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
return;
}

//
Expand All @@ -1244,7 +1244,8 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = CopyNodeVector();
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
boost::this_thread::interruption_point();
if (interruptNet)
return;

//
// Receive
Expand All @@ -1266,7 +1267,7 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
if(notify)
messageHandlerCondition.notify_one();
condMsgProc.notify_one();
pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
Expand Down Expand Up @@ -1471,7 +1472,8 @@ void CConnman::ThreadDNSAddressSeed()
// goal: only query DNS seeds if address need is acute
if ((addrman.size() > 0) &&
(!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) {
MilliSleep(11 * 1000);
if (!interruptNet.sleep_for(std::chrono::seconds(11)))
return;

LOCK(cs_vNodes);
if (vNodes.size() >= 2) {
Expand Down Expand Up @@ -1569,10 +1571,12 @@ void CConnman::ThreadOpenConnections()
OpenNetworkConnection(addr, NULL, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++)
{
MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}
MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}

Expand All @@ -1581,14 +1585,16 @@ void CConnman::ThreadOpenConnections()

// Minimum time before next feeler connection (in microseconds).
int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL);
while (true)
while (!interruptNet)
{
ProcessOneShot();

MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;

CSemaphoreGrant grant(*semOutbound);
boost::this_thread::interruption_point();
if (interruptNet)
return;

// Add seed nodes if DNS seeds are all down (an infrastructure attack?).
if (addrman.size() == 0 && (GetTime() - nStart > 60)) {
Expand Down Expand Up @@ -1645,7 +1651,7 @@ void CConnman::ThreadOpenConnections()

int64_t nANow = GetAdjustedTime();
int nTries = 0;
while (true)
while (!interruptNet)
{
CAddrInfo addr = addrman.Select(fFeeler);

Expand Down Expand Up @@ -1684,7 +1690,8 @@ void CConnman::ThreadOpenConnections()
if (fFeeler) {
// Add small amount of random noise before connection to avoid synchronization.
int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000);
MilliSleep(randsleep);
if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep)))
return;
LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString());
}

Expand Down Expand Up @@ -1762,11 +1769,12 @@ void CConnman::ThreadOpenAddedConnections()
// OpenNetworkConnection can detect existing connections to that IP/port.
CService service(info.strAddedNode, Params().GetDefaultPort());
OpenNetworkConnection(CAddress(service, NODE_NONE), &grant, info.strAddedNode.c_str(), false);
MilliSleep(500);
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
return;
}
}

MilliSleep(120000); // Retry every 2 minutes
if (!interruptNet.sleep_for(std::chrono::minutes(2)))
return;
}
}

Expand All @@ -1776,12 +1784,14 @@ void CConnman::ThreadMnbRequestConnections()
if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0)
return;

while (true)
while (!interruptNet)
{
MilliSleep(1000);
if (!interruptNet.sleep_for(std::chrono::milliseconds(1000)))
return;

CSemaphoreGrant grant(*semMasternodeOutbound);
boost::this_thread::interruption_point();
if (interruptNet)
return;

std::pair<CService, std::set<uint256> > p = mnodeman.PopScheduledMnbRequestConnection();
if(p.first == CService() || p.second.empty()) continue;
Expand Down Expand Up @@ -1820,7 +1830,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGran
//
// Initiate outbound network connection
//
boost::this_thread::interruption_point();
if (interruptNet) {
return false;
}
if (!pszDest) {
if (IsLocal(addrConnect) ||
FindNode((CNetAddr)addrConnect) || IsBanned(addrConnect) ||
Expand All @@ -1830,7 +1842,6 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGran
return false;

CNode* pnode = ConnectNode(addrConnect, pszDest);
boost::this_thread::interruption_point();

if (!pnode)
return false;
Expand All @@ -1844,14 +1855,10 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGran
return true;
}


void CConnman::ThreadMessageHandler()
{
boost::mutex condition_mutex;
boost::unique_lock<boost::mutex> lock(condition_mutex);

SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL);
while (true)
while (!flagInterruptMsgProc)
{
std::vector<CNode*> vNodesCopy = CopyNodeVector();

Expand All @@ -1867,7 +1874,7 @@ void CConnman::ThreadMessageHandler()
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
if (!GetNodeSignals().ProcessMessages(pnode, *this))
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
pnode->fDisconnect = true;

if (pnode->nSendSize < GetSendBufferSize())
Expand All @@ -1879,21 +1886,25 @@ void CConnman::ThreadMessageHandler()
}
}
}
boost::this_thread::interruption_point();
if (flagInterruptMsgProc)
return;

// Send messages
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
GetNodeSignals().SendMessages(pnode, *this);
GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
}
boost::this_thread::interruption_point();
if (flagInterruptMsgProc)
return;
}

ReleaseNodeVector(vNodesCopy);

if (fSleep)
messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100));
if (fSleep) {
std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
}
}
}

Expand Down Expand Up @@ -2064,14 +2075,15 @@ CConnman::CConnman()
nMaxOutbound = 0;
nBestHeight = 0;
clientInterface = NULL;
flagInterruptMsgProc = false;
}

NodeId CConnman::GetNewNodeId()
{
return nLastNodeId.fetch_add(1, std::memory_order_relaxed);
}

bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions)
bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions)
{
nTotalBytesRecv = 0;
nTotalBytesSent = 0;
Expand Down Expand Up @@ -2145,26 +2157,29 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st
//
// Start threads
//
InterruptSocks5(false);
interruptNet.reset();
flagInterruptMsgProc = false;

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

if (!GetBoolArg("-dnsseed", true))
LogPrintf("DNS seeding disabled\n");
else
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", boost::function<void()>(boost::bind(&CConnman::ThreadDNSAddressSeed, this))));

// Send and receive from sockets, accept connections
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "net", boost::function<void()>(boost::bind(&CConnman::ThreadSocketHandler, this))));
threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));

// Initiate outbound connections from -addnode
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "addcon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenAddedConnections, this))));
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));

// Initiate outbound connections
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "opencon", boost::function<void()>(boost::bind(&CConnman::ThreadOpenConnections, this))));
threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this)));

// Initiate masternode connections
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "mnbcon", boost::function<void()>(boost::bind(&CConnman::ThreadMnbRequestConnections, this))));
threadMnbRequestConnections = std::thread(&TraceThread<std::function<void()> >, "mnbcon", std::function<void()>(std::bind(&CConnman::ThreadMnbRequestConnections, this)));

// Process messages
threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "msghand", boost::function<void()>(boost::bind(&CConnman::ThreadMessageHandler, this))));
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));

// Dump network addresses
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
Expand Down Expand Up @@ -2195,12 +2210,36 @@ void CExplicitNetCleanup::callCleanup()
delete tmp; // Stroustrup's gonna kill me for that
}

void CConnman::Stop()
void CConnman::Interrupt()
{
LogPrintf("%s\n",__func__);
{
std::lock_guard<std::mutex> lock(mutexMsgProc);
flagInterruptMsgProc = true;
}
condMsgProc.notify_all();

interruptNet();
InterruptSocks5(true);

if (semOutbound)
for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++)
semOutbound->post();
}

void CConnman::Stop()
{
if (threadMessageHandler.joinable())
threadMessageHandler.join();
if (threadMnbRequestConnections.joinable())
threadMnbRequestConnections.join();
if (threadOpenConnections.joinable())
threadOpenConnections.join();
if (threadOpenAddedConnections.joinable())
threadOpenAddedConnections.join();
if (threadDNSAddressSeed.joinable())
threadDNSAddressSeed.join();
if (threadSocketHandler.joinable())
threadSocketHandler.join();

if (semMasternodeOutbound)
for (int i=0; i<MAX_OUTBOUND_MASTERNODE_CONNECTIONS; i++)
Expand Down Expand Up @@ -2252,6 +2291,7 @@ void CConnman::DeleteNode(CNode* pnode)

CConnman::~CConnman()
{
Interrupt();
Stop();
}

Expand Down
Loading

0 comments on commit 42c784d

Please sign in to comment.