Skip to content

Commit

Permalink
Backport Bitcoin PR#9441: Net: Massive speedup. Net locks overhaul (#…
Browse files Browse the repository at this point in the history
…1586)

* net: fix typo causing the wrong receive buffer size

Surprisingly this hasn't been causing me any issues while testing, probably
because it requires lots of large blocks to be flying around.

Send/Recv corks need tests!

* net: make vRecvMsg a list so that we can use splice()

* net: make GetReceiveFloodSize public

This will be needed so that the message processor can cork incoming messages

* net: only disconnect if fDisconnect has been set

These conditions are problematic to check without locking, and we shouldn't be
relying on the refcount to disconnect.

* net: wait until the node is destroyed to delete its recv buffer

when vRecvMsg becomes a private buffer, it won't make sense to allow other
threads to mess with it anymore.

* net: set message deserialization version when it's actually time to deserialize

We'll soon no longer have access to vRecvMsg, and this is more intuitive anyway.

* net: handle message accounting in ReceiveMsgBytes

This allows locking to be pushed down to only where it's needed

Also reuse the current time rather than checking multiple times.

* net: record bytes written before notifying the message processor

* net: Add a simple function for waking the message handler

This may be used publicly in the future

* net: remove useless comments

* net: remove redundant max sendbuffer size check

This is left-over from before there was proper accounting. Hitting 2x the
sendbuffer size should not be possible.

* net: rework the way that the messagehandler sleeps

In order to sleep accurately, the message handler needs to know if _any_ node
has more processing that it should do before the entire thread sleeps.

Rather than returning a value that represents whether ProcessMessages
encountered a message that should trigger a disconnnect, interpret the return
value as whether or not that node has more work to do.

Also, use a global fProcessWake value that can be set by other threads,
which takes precedence (for one cycle) over the messagehandler's decision.

Note that the previous behavior was to only process one message per loop
(except in the case of a bad checksum or invalid header). That was changed in
PR #3180.

The only change here in that regard is that the current node now falls to the
back of the processing queue for the bad checksum/invalid header cases.

* net: add a new message queue for the message processor

This separates the storage of messages from the net and queued messages for
processing, allowing the locks to be split.

* net: add a flag to indicate when a node's process queue is full

Messages are dumped very quickly from the socket handler to the processor, so
it's the depth of the processing queue that's interesting.

The socket handler checks the process queue's size during the brief message
hand-off and pauses if necessary, and the processor possibly unpauses each time
a message is popped off of its queue.

* net: add a flag to indicate when a node's send buffer is full

Similar to the recv flag, but this one indicates whether or not the net's send
buffer is full.

The socket handler checks the send queue when a new message is added and pauses
if necessary, and possibly unpauses after each message is drained from its buffer.

* net: remove cs_vRecvMsg

vRecvMsg is now only touched by the socket handler thread.

The accounting vars (nRecvBytes/nLastRecv/mapRecvBytesPerMsgCmd) are also
only used by the socket handler thread, with the exception of queries from
rpc/gui. These accesses are not threadsafe, but they never were. This needs to
be addressed separately.

Also, update comment describing data flow
  • Loading branch information
OlegGirko authored and UdjinM6 committed Aug 23, 2017
1 parent b9c6725 commit 2472999
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 124 deletions.
116 changes: 58 additions & 58 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,6 @@ void CNode::CloseSocketDisconnect()
LogPrint("net", "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}

// in case this fails, we'll empty the recv buffer when the CNode is deleted
TRY_LOCK(cs_vRecvMsg, lockRecv);
if (lockRecv)
vRecvMsg.clear();
}

void CConnman::ClearBanned()
Expand Down Expand Up @@ -673,16 +668,18 @@ void CNode::copyStats(CNodeStats &stats)
}
#undef X

// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
int64_t nTimeMicros = GetTimeMicros();
nLastRecv = nTimeMicros / 1000000;
nRecvBytes += nBytes;
while (nBytes > 0) {

// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));

CNetMessage& msg = vRecvMsg.back();

Expand Down Expand Up @@ -714,7 +711,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;

msg.nTime = GetTimeMicros();
msg.nTime = nTimeMicros;
complete = true;
}
}
Expand Down Expand Up @@ -805,7 +802,7 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)


// requires LOCK(cs_vSend)
size_t SocketSendData(CNode *pnode)
size_t CConnman::SocketSendData(CNode *pnode)
{
std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
Expand All @@ -822,6 +819,7 @@ size_t SocketSendData(CNode *pnode)
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
Expand Down Expand Up @@ -1110,8 +1108,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect ||
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
if (pnode->fDisconnect)
{
LogPrintf("ThreadSocketHandler -- removing node: peer=%d addr=%s nRefCount=%d fNetworkNode=%d fInbound=%d fMasternode=%d\n",
pnode->id, pnode->addr.ToString(), pnode->GetRefCount(), pnode->fNetworkNode, pnode->fInbound, pnode->fMasternode);
Expand Down Expand Up @@ -1148,13 +1145,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv)
fDelete = true;
}
}
}
if (fDelete)
Expand Down Expand Up @@ -1209,15 +1202,10 @@ void CConnman::ThreadSocketHandler()
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is no (complete) message in the receive buffer,
// or there is space left in the buffer, select() for receiving data.
// * (if neither of the above applies, there is certainly one message
// in the receiver buffer ready to be processed).
// Together, that means that at least one of the following is always possible,
// so we don't deadlock:
// * We send some data.
// * We wait for data to be received (and disconnect after timeout).
// * We process a message in the buffer (message handler thread).
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
Expand All @@ -1228,10 +1216,7 @@ void CConnman::ThreadSocketHandler()
}
}
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv && (
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
if (!pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
Expand Down Expand Up @@ -1284,8 +1269,6 @@ void CConnman::ThreadSocketHandler()
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
{
// typical socket buffer is 8K-64K
Expand All @@ -1296,11 +1279,23 @@ void CConnman::ThreadSocketHandler()
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
if(notify)
condMsgProc.notify_one();
pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
if (notify) {
size_t nSizeAdded = 0;
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
pnode->nProcessQueueSize += nSizeAdded;
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
WakeMessageHandler();
}
}
else if (nBytes == 0)
{
Expand Down Expand Up @@ -1334,8 +1329,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
size_t nBytes = SocketSendData(pnode);
if (nBytes)
if (nBytes) {
RecordBytesSent(nBytes);
}
}
}

Expand Down Expand Up @@ -1371,8 +1367,14 @@ void CConnman::ThreadSocketHandler()
}
}



void CConnman::WakeMessageHandler()
{
{
std::lock_guard<std::mutex> lock(mutexMsgProc);
fMsgProcWake = true;
}
condMsgProc.notify_one();
}



Expand Down Expand Up @@ -1892,30 +1894,16 @@ void CConnman::ThreadMessageHandler()
{
std::vector<CNode*> vNodesCopy = CopyNodeVector();

bool fSleep = true;
bool fMoreWork = false;

BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect)
continue;

// Receive messages
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
pnode->fDisconnect = true;

if (pnode->nSendSize < GetSendBufferSize())
{
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
{
fSleep = false;
}
}
}
}
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;

Expand All @@ -1931,10 +1919,11 @@ void CConnman::ThreadMessageHandler()

ReleaseNodeVector(vNodesCopy);

if (fSleep) {
std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutexMsgProc);
if (!fMoreWork) {
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
fMsgProcWake = false;
}
}

Expand Down Expand Up @@ -2129,7 +2118,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nMaxFeeler = connOptions.nMaxFeeler;

nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
nReceiveFloodSize = connOptions.nSendBufferMaxSize;
nReceiveFloodSize = connOptions.nReceiveFloodSize;

SetBestHeight(connOptions.nBestHeight);

Expand Down Expand Up @@ -2191,6 +2180,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;

{
std::unique_lock<std::mutex> lock(mutexMsgProc);
fMsgProcWake = false;
}

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

Expand Down Expand Up @@ -2684,6 +2678,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
vchKeyedNetGroup = CalculateKeyedNetGroup(addr);
id = idIn;
nLocalServices = nLocalServicesIn;
fPauseRecv = false;
fPauseSend = false;
nProcessQueueSize = 0;

GetRandBytes((unsigned char*)&nLocalHostNonce, sizeof(nLocalHostNonce));
nMyStartingHeight = nMyStartingHeightIn;
Expand Down Expand Up @@ -2819,6 +2816,9 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& s
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
pnode->nSendSize += strm.size();

if (pnode->nSendSize > nSendBufferMaxSize)
pnode->fPauseSend = true;

// If write queue empty, attempt "optimistic write"
if (optimisticSend == true)
nBytesSent = SocketSendData(pnode);
Expand Down
39 changes: 20 additions & 19 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ class CConnman
int GetBestHeight() const;


unsigned int GetReceiveFloodSize() const;
private:
struct ListenSocket {
SOCKET socket;
Expand All @@ -403,6 +404,8 @@ class CConnman
void ThreadDNSAddressSeed();
void ThreadMnbRequestConnections();

void WakeMessageHandler();

CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CSubNet& subNet);
CNode* FindNode(const std::string& addrName);
Expand All @@ -415,6 +418,7 @@ class CConnman

NodeId GetNewNodeId();

size_t SocketSendData(CNode *pnode);
//!check is the banlist has unwritten changes
bool BannedSetIsDirty();
//!set the "dirty" flag for the banlist
Expand All @@ -425,8 +429,6 @@ class CConnman
void DumpData();
void DumpBanlist();

unsigned int GetReceiveFloodSize() const;

CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
void EndMessage(CDataStream& strm);
Expand Down Expand Up @@ -487,6 +489,9 @@ class CConnman
std::atomic<int> nBestHeight;
CClientUIInterface* clientInterface;

/** flag for waking the message processor. */
bool fMsgProcWake;

std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;
Expand All @@ -505,7 +510,6 @@ void Discover(boost::thread_group& threadGroup);
void MapPort(bool fUseUPnP);
unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
size_t SocketSendData(CNode *pnode);

struct CombinerAll
{
Expand Down Expand Up @@ -666,11 +670,13 @@ class CNode
std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend;

CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
size_t nProcessQueueSize;

std::deque<CInv> vRecvGetData;
std::deque<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;
uint64_t nRecvBytes;
int nRecvVersion;
std::atomic<int> nRecvVersion;

int64_t nLastSend;
int64_t nLastRecv;
Expand Down Expand Up @@ -708,6 +714,9 @@ class CNode
CBloomFilter* pfilter;
int nRefCount;
NodeId id;

std::atomic_bool fPauseRecv;
std::atomic_bool fPauseSend;
protected:

mapMsgCmdSize mapSendBytesPerMsgCmd;
Expand Down Expand Up @@ -770,6 +779,7 @@ class CNode
ServiceFlags nLocalServices;
int nMyStartingHeight;
int nSendVersion;
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
public:

NodeId GetId() const {
Expand All @@ -791,24 +801,15 @@ class CNode
return nRefCount;
}

// requires LOCK(cs_vRecvMsg)
unsigned int GetTotalRecvSize()
{
unsigned int total = 0;
BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
total += msg.vRecv.size() + 24;
return total;
}

// requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);

// requires LOCK(cs_vRecvMsg)
void SetRecvVersion(int nVersionIn)
{
nRecvVersion = nVersionIn;
BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
msg.SetVersion(nVersionIn);
}
int GetRecvVersion()
{
return nRecvVersion;
}
void SetSendVersion(int nVersionIn);
int GetSendVersion() const;
Expand Down
Loading

0 comments on commit 2472999

Please sign in to comment.