Skip to content

Commit 4629656

Browse files
committed
connman is in charge of pushing messages
The changes here are dense and subtle, but hopefully all is more explicit than before. - CConnman is now in charge of sending data rather than the nodes themselves. This is necessary because many decisions need to be made with all nodes in mind, and a model that requires the nodes calling up to their manager quickly turns to spaghetti. - The per-node-serializer (ssSend) has been replaced with a (quasi-)const send-version. Since the send version for serialization can only change once per connection, we now explicitly tag messages with INIT_PROTO_VERSION if they are sent before the handshake. With this done, there's no need to lock for access to nSendVersion. Also, a new stream is used for each message, so there's no need to lock during the serialization process. - This takes care of accounting for optimistic sends, so the nOptimisticBytesWritten hack can be removed. - -dropmessagestest and -fuzzmessagestest have not been preserved, as I suspect they haven't been used in years.
1 parent db78f31 commit 4629656

File tree

4 files changed

+135
-32
lines changed

4 files changed

+135
-32
lines changed

src/main.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5036,7 +5036,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
50365036
// Each connection can only send one version message
50375037
if (pfrom->nVersion != 0)
50385038
{
5039-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
5039+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
50405040
LOCK(cs_main);
50415041
Misbehaving(pfrom->GetId(), 1);
50425042
return false;
@@ -5056,7 +5056,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
50565056
if (pfrom->nServicesExpected & ~pfrom->nServices)
50575057
{
50585058
LogPrint("net", "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom->id, pfrom->nServices, pfrom->nServicesExpected);
5059-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
5059+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
50605060
strprintf("Expected to offer services %08x", pfrom->nServicesExpected));
50615061
pfrom->fDisconnect = true;
50625062
return false;
@@ -5066,7 +5066,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
50665066
{
50675067
// disconnect from peers older than this proto version
50685068
LogPrintf("peer=%d using obsolete version %i; disconnecting\n", pfrom->id, pfrom->nVersion);
5069-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
5069+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
50705070
strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION));
50715071
pfrom->fDisconnect = true;
50725072
return false;
@@ -5107,7 +5107,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
51075107

51085108
// Be shy and don't send version until we hear
51095109
if (pfrom->fInbound)
5110-
pfrom->PushVersion();
5110+
connman.PushVersion(pfrom, GetAdjustedTime());
51115111

51125112
pfrom->fClient = !(pfrom->nServices & NODE_NETWORK);
51135113

@@ -5124,8 +5124,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
51245124
}
51255125

51265126
// Change version
5127-
pfrom->PushMessage(NetMsgType::VERACK);
5128-
pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
5127+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::VERACK);
5128+
pfrom->SetSendVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
51295129

51305130
if (!pfrom->fInbound)
51315131
{
@@ -6375,7 +6375,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
63756375
}
63766376
catch (const std::ios_base::failure& e)
63776377
{
6378-
pfrom->PushMessage(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
6378+
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
63796379
if (strstr(e.what(), "end of data"))
63806380
{
63816381
// Allow exceptions from under-length message on vRecv

src/net.cpp

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
394394
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
395395
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
396396

397+
398+
PushVersion(pnode, GetTime());
399+
397400
GetNodeSignals().InitializeNode(pnode->GetId(), pnode);
398401
pnode->AddRef();
399402

@@ -415,6 +418,24 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
415418
return NULL;
416419
}
417420

421+
void CConnman::PushVersion(CNode* pnode, int64_t nTime)
422+
{
423+
ServiceFlags nLocalNodeServices = pnode->GetLocalServices();
424+
CAddress addrYou = (pnode->addr.IsRoutable() && !IsProxy(pnode->addr) ? pnode->addr : CAddress(CService(), pnode->addr.nServices));
425+
CAddress addrMe = CAddress(CService(), nLocalNodeServices);
426+
uint64_t nonce = pnode->GetLocalNonce();
427+
int nNodeStartingHeight = pnode->nMyStartingHeight;
428+
NodeId id = pnode->GetId();
429+
430+
PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
431+
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes);
432+
433+
if (fLogIPs)
434+
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
435+
else
436+
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), id);
437+
}
438+
418439
void CConnman::DumpBanlist()
419440
{
420441
SweepBanned(); // clean unused entries (if bantime has expired)
@@ -450,23 +471,6 @@ void CNode::CloseSocketDisconnect()
450471
vRecvMsg.clear();
451472
}
452473

453-
void CNode::PushVersion()
454-
{
455-
int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime());
456-
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
457-
CAddress addrMe = CAddress(CService(), nLocalServices);
458-
if (fLogIPs)
459-
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id);
460-
else
461-
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id);
462-
PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe,
463-
nLocalHostNonce, strSubVersion, nMyStartingHeight, ::fRelayTxes);
464-
}
465-
466-
467-
468-
469-
470474
void CConnman::ClearBanned()
471475
{
472476
{
@@ -2530,7 +2534,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25302534
filterInventoryKnown(50000, 0.000001),
25312535
nLocalHostNonce(nLocalHostNonceIn),
25322536
nLocalServices(nLocalServicesIn),
2533-
nMyStartingHeight(nMyStartingHeightIn)
2537+
nMyStartingHeight(nMyStartingHeightIn),
2538+
nSendVersion(0)
25342539
{
25352540
nServices = NODE_NONE;
25362541
nServicesExpected = NODE_NONE;
@@ -2587,10 +2592,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
25872592
LogPrint("net", "Added connection to %s peer=%d\n", addrName, id);
25882593
else
25892594
LogPrint("net", "Added connection peer=%d\n", id);
2590-
2591-
// Be shy and don't send version until we hear
2592-
if (hSocket != INVALID_SOCKET && !fInbound)
2593-
PushVersion();
25942595
}
25952596

25962597
CNode::~CNode()
@@ -2696,6 +2697,52 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend)
26962697
LEAVE_CRITICAL_SECTION(cs_vSend);
26972698
}
26982699

2700+
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
2701+
{
2702+
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
2703+
}
2704+
2705+
void CConnman::EndMessage(CDataStream& strm)
2706+
{
2707+
// Set the size
2708+
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
2709+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2710+
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
2711+
// Set the checksum
2712+
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
2713+
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
2714+
2715+
}
2716+
2717+
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
2718+
{
2719+
if(strm.empty())
2720+
return;
2721+
2722+
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
2723+
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
2724+
2725+
size_t nBytesSent = 0;
2726+
{
2727+
LOCK(pnode->cs_vSend);
2728+
if(pnode->hSocket == INVALID_SOCKET) {
2729+
return;
2730+
}
2731+
bool optimisticSend(pnode->vSendMsg.empty());
2732+
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
2733+
2734+
//log total amount of bytes per command
2735+
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
2736+
pnode->nSendSize += strm.size();
2737+
2738+
// If write queue empty, attempt "optimistic write"
2739+
if (optimisticSend == true)
2740+
nBytesSent = SocketSendData(pnode);
2741+
}
2742+
if (nBytesSent)
2743+
RecordBytesSent(nBytesSent);
2744+
}
2745+
26992746
bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
27002747
{
27012748
CNode* found = nullptr;

src/net.h

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,36 @@ class CConnman
136136

137137
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
138138

139+
template <typename... Args>
140+
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
141+
{
142+
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
143+
::SerializeMany(msg, msg.nType, msg.nVersion, std::forward<Args>(args)...);
144+
EndMessage(msg);
145+
PushMessage(pnode, msg, sCommand);
146+
}
147+
148+
template <typename... Args>
149+
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
150+
{
151+
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
152+
}
153+
154+
template <typename... Args>
155+
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
156+
{
157+
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
158+
}
159+
160+
template <typename... Args>
161+
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
162+
{
163+
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
164+
}
165+
166+
void PushVersion(CNode* pnode, int64_t nTime);
167+
168+
139169
template<typename Callable>
140170
bool ForEachNodeContinueIf(Callable&& func)
141171
{
@@ -345,6 +375,10 @@ class CConnman
345375

346376
unsigned int GetReceiveFloodSize() const;
347377

378+
CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
379+
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
380+
void EndMessage(CDataStream& strm);
381+
348382
// Network stats
349383
void RecordBytesRecv(uint64_t bytes);
350384
void RecordBytesSent(uint64_t bytes);
@@ -553,6 +587,7 @@ class CNetMessage {
553587
/** Information about a peer */
554588
class CNode
555589
{
590+
friend class CConnman;
556591
public:
557592
// socket
558593
ServiceFlags nServices;
@@ -681,6 +716,7 @@ class CNode
681716
// Services offered to this peer
682717
const ServiceFlags nLocalServices;
683718
const int nMyStartingHeight;
719+
int nSendVersion;
684720
public:
685721

686722
NodeId GetId() const {
@@ -716,6 +752,25 @@ class CNode
716752
BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
717753
msg.SetVersion(nVersionIn);
718754
}
755+
void SetSendVersion(int nVersionIn)
756+
{
757+
// Send version may only be changed in the version message, and
758+
// only one version message is allowed per session. We can therefore
759+
// treat this value as const and even atomic as long as it's only used
760+
// once the handshake is complete. Any attempt to set this twice is an
761+
// error.
762+
assert(nSendVersion == 0);
763+
nSendVersion = nVersionIn;
764+
}
765+
766+
int GetSendVersion() const
767+
{
768+
// The send version should always be explicitly set to
769+
// INIT_PROTO_VERSION rather than using this value until the handshake
770+
// is complete. See PushMessageWithVersion().
771+
assert(nSendVersion != 0);
772+
return nSendVersion;
773+
}
719774

720775
CNode* AddRef()
721776
{
@@ -787,9 +842,6 @@ class CNode
787842
// TODO: Document the precondition of this function. Is cs_vSend locked?
788843
void EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend);
789844

790-
void PushVersion();
791-
792-
793845
void PushMessage(const char* pszCommand)
794846
{
795847
try

src/test/DoS_tests.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
4949
connman->ClearBanned();
5050
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
5151
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true);
52+
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
5253
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
5354
dummyNode1.nVersion = 1;
5455
Misbehaving(dummyNode1.GetId(), 100); // Should get banned
@@ -58,6 +59,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
5859

5960
CAddress addr2(ip(0xa0b0c002), NODE_NONE);
6061
CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, "", true);
62+
dummyNode2.SetSendVersion(PROTOCOL_VERSION);
6163
GetNodeSignals().InitializeNode(dummyNode2.GetId(), &dummyNode2);
6264
dummyNode2.nVersion = 1;
6365
Misbehaving(dummyNode2.GetId(), 50);
@@ -75,6 +77,7 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
7577
mapArgs["-banscore"] = "111"; // because 11 is my favorite number
7678
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
7779
CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, "", true);
80+
dummyNode1.SetSendVersion(PROTOCOL_VERSION);
7881
GetNodeSignals().InitializeNode(dummyNode1.GetId(), &dummyNode1);
7982
dummyNode1.nVersion = 1;
8083
Misbehaving(dummyNode1.GetId(), 100);
@@ -97,6 +100,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
97100

98101
CAddress addr(ip(0xa0b0c001), NODE_NONE);
99102
CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, "", true);
103+
dummyNode.SetSendVersion(PROTOCOL_VERSION);
100104
GetNodeSignals().InitializeNode(dummyNode.GetId(), &dummyNode);
101105
dummyNode.nVersion = 1;
102106

0 commit comments

Comments
 (0)