Skip to content

Commit

Permalink
Backport Bitcoin PR#8708: net: have CConnman handle message sending (#…
Browse files Browse the repository at this point in the history
…1553)

* serialization: teach serializers variadics

Also add a variadic CDataStream ctor for ease-of-use.

* connman is in charge of pushing messages

The changes here are dense and subtle, but hopefully all is more explicit
than before.

- CConnman is now in charge of sending data rather than the nodes themselves.
  This is necessary because many decisions need to be made with all nodes in
  mind, and a model that requires the nodes calling up to their manager quickly
  turns to spaghetti.

- The per-node-serializer (ssSend) has been replaced with a (quasi-)const
  send-version. Since the send version for serialization can only change once
  per connection, we now explicitly tag messages with INIT_PROTO_VERSION if
  they are sent before the handshake. With this done, there's no need to lock
  for access to nSendVersion.

  Also, a new stream is used for each message, so there's no need to lock
  during the serialization process.

- This takes care of accounting for optimistic sends, so the
  nOptimisticBytesWritten hack can be removed.

- -dropmessagestest and -fuzzmessagestest have not been preserved, as I suspect
  they haven't been used in years.

* net: switch all callers to connman for pushing messages

Drop all of the old stuff.

* drop the optimistic write counter hack

This is now handled properly in realtime.

* net: remove now-unused ssSend and Fuzz

* net: construct CNodeStates in place

* net: handle version push in InitializeNode
  • Loading branch information
OlegGirko authored and UdjinM6 committed Jul 27, 2017
1 parent 8b7dffb commit b621cfb
Show file tree
Hide file tree
Showing 18 changed files with 363 additions and 452 deletions.
4 changes: 2 additions & 2 deletions src/alert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ bool CAlert::AppliesToMe() const
return AppliesTo(PROTOCOL_VERSION, FormatSubVersion(CLIENT_NAME, CLIENT_VERSION, std::vector<std::string>()));
}

bool CAlert::RelayTo(CNode* pnode) const
bool CAlert::RelayTo(CNode* pnode, CConnman& connman) const
{
if (!IsInEffect())
return false;
Expand All @@ -139,7 +139,7 @@ bool CAlert::RelayTo(CNode* pnode) const
AppliesToMe() ||
GetAdjustedTime() < nRelayUntil)
{
pnode->PushMessage(NetMsgType::ALERT, *this);
connman.PushMessage(pnode, NetMsgType::ALERT, *this);
return true;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/alert.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

class CAlert;
class CNode;
class CConnman;
class uint256;

extern std::map<uint256, CAlert> mapAlerts;
Expand Down Expand Up @@ -99,7 +100,7 @@ class CAlert : public CUnsignedAlert
bool Cancels(const CAlert& alert) const;
bool AppliesTo(int nVersion, const std::string& strSubVerIn) const;
bool AppliesToMe() const;
bool RelayTo(CNode* pnode) const;
bool RelayTo(CNode* pnode, CConnman& connman) const;
bool Sign();
bool CheckSignature(const std::vector<unsigned char>& alertKey) const;
bool ProcessAlert(const std::vector<unsigned char>& alertKey, bool fThread = true); // fThread means run -alertnotify in a free-running thread
Expand Down
8 changes: 4 additions & 4 deletions src/governance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -834,8 +834,8 @@ void CGovernanceManager::Sync(CNode* pfrom, const uint256& nProp, const CBloomFi
}
}

pfrom->PushMessage(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ, nObjCount);
pfrom->PushMessage(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ_VOTE, nVoteCount);
g_connman->PushMessage(pfrom, NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ, nObjCount);
g_connman->PushMessage(pfrom, NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_GOVOBJ_VOTE, nVoteCount);
LogPrintf("CGovernanceManager::Sync -- sent %d objects and %d votes to peer=%d\n", nObjCount, nVoteCount, pfrom->id);
}

Expand Down Expand Up @@ -1147,7 +1147,7 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH
LogPrint("gobject", "CGovernanceObject::RequestGovernanceObject -- hash = %s (peer=%d)\n", nHash.ToString(), pfrom->GetId());

if(pfrom->nVersion < GOVERNANCE_FILTER_PROTO_VERSION) {
pfrom->PushMessage(NetMsgType::MNGOVERNANCESYNC, nHash);
g_connman->PushMessage(pfrom, NetMsgType::MNGOVERNANCESYNC, nHash);
return;
}

Expand All @@ -1170,7 +1170,7 @@ void CGovernanceManager::RequestGovernanceObject(CNode* pfrom, const uint256& nH
}

LogPrint("gobject", "CGovernanceManager::RequestGovernanceObject -- nHash %s nVoteCount %d peer=%d\n", nHash.ToString(), nVoteCount, pfrom->id);
pfrom->PushMessage(NetMsgType::MNGOVERNANCESYNC, nHash, filter);
g_connman->PushMessage(pfrom, NetMsgType::MNGOVERNANCESYNC, nHash, filter);
}

int CGovernanceManager::RequestGovernanceObjectVotes(CNode* pnode)
Expand Down
144 changes: 85 additions & 59 deletions src/main.cpp

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/masternode-payments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ void CMasternodePayments::Sync(CNode* pnode)
}

LogPrintf("CMasternodePayments::Sync -- Sent %d votes to peer %d\n", nInvCount, pnode->id);
pnode->PushMessage(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_MNW, nInvCount);
g_connman->PushMessage(pnode, NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_MNW, nInvCount);
}

// Request low data/unknown payment blocks in batches directly from some node instead of/after preliminary Sync.
Expand All @@ -864,7 +864,7 @@ void CMasternodePayments::RequestLowDataPaymentBlocks(CNode* pnode)
// We should not violate GETDATA rules
if(vToFetch.size() == MAX_INV_SZ) {
LogPrintf("CMasternodePayments::SyncLowDataPaymentBlocks -- asking peer %d for %d blocks\n", pnode->id, MAX_INV_SZ);
pnode->PushMessage(NetMsgType::GETDATA, vToFetch);
g_connman->PushMessage(pnode, NetMsgType::GETDATA, vToFetch);
// Start filling new batch
vToFetch.clear();
}
Expand Down Expand Up @@ -912,7 +912,7 @@ void CMasternodePayments::RequestLowDataPaymentBlocks(CNode* pnode)
// We should not violate GETDATA rules
if(vToFetch.size() == MAX_INV_SZ) {
LogPrintf("CMasternodePayments::SyncLowDataPaymentBlocks -- asking peer %d for %d payment blocks\n", pnode->id, MAX_INV_SZ);
pnode->PushMessage(NetMsgType::GETDATA, vToFetch);
g_connman->PushMessage(pnode, NetMsgType::GETDATA, vToFetch);
// Start filling new batch
vToFetch.clear();
}
Expand All @@ -921,7 +921,7 @@ void CMasternodePayments::RequestLowDataPaymentBlocks(CNode* pnode)
// Ask for the rest of it
if(!vToFetch.empty()) {
LogPrintf("CMasternodePayments::SyncLowDataPaymentBlocks -- asking peer %d for %d payment blocks\n", pnode->id, vToFetch.size());
pnode->PushMessage(NetMsgType::GETDATA, vToFetch);
g_connman->PushMessage(pnode, NetMsgType::GETDATA, vToFetch);
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/masternode-sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,12 @@ void CMasternodeSync::ProcessTick()
if(Params().NetworkIDString() == CBaseChainParams::REGTEST)
{
if(nRequestedMasternodeAttempt <= 2) {
pnode->PushMessage(NetMsgType::GETSPORKS); //get current network sporks
g_connman->PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::GETSPORKS); //get current network sporks
} else if(nRequestedMasternodeAttempt < 4) {
mnodeman.DsegUpdate(pnode);
} else if(nRequestedMasternodeAttempt < 6) {
int nMnCount = mnodeman.CountMasternodes();
pnode->PushMessage(NetMsgType::MASTERNODEPAYMENTSYNC, nMnCount); //sync payment votes
g_connman->PushMessage(pnode, NetMsgType::MASTERNODEPAYMENTSYNC, nMnCount); //sync payment votes
SendGovernanceSyncRequest(pnode);
} else {
nRequestedMasternodeAssets = MASTERNODE_SYNC_FINISHED;
Expand All @@ -355,7 +355,7 @@ void CMasternodeSync::ProcessTick()
// only request once from each peer
netfulfilledman.AddFulfilledRequest(pnode->addr, "spork-sync");
// get current network sporks
pnode->PushMessage(NetMsgType::GETSPORKS);
g_connman->PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::GETSPORKS);
LogPrintf("CMasternodeSync::ProcessTick -- nTick %d nRequestedMasternodeAssets %d -- requesting sporks from peer %d\n", nTick, nRequestedMasternodeAssets, pnode->id);
continue; // always get sporks first, switch to the next node without waiting for the next tick
}
Expand Down Expand Up @@ -431,7 +431,7 @@ void CMasternodeSync::ProcessTick()
nRequestedMasternodeAttempt++;

// ask node for all payment votes it has (new nodes will only return votes for future payments)
pnode->PushMessage(NetMsgType::MASTERNODEPAYMENTSYNC, mnpayments.GetStorageLimit());
g_connman->PushMessage(pnode, NetMsgType::MASTERNODEPAYMENTSYNC, mnpayments.GetStorageLimit());
// ask node for missing pieces only (old nodes will not be asked)
mnpayments.RequestLowDataPaymentBlocks(pnode);

Expand Down Expand Up @@ -511,10 +511,10 @@ void CMasternodeSync::SendGovernanceSyncRequest(CNode* pnode)
CBloomFilter filter;
filter.clear();

pnode->PushMessage(NetMsgType::MNGOVERNANCESYNC, uint256(), filter);
g_connman->PushMessage(pnode, NetMsgType::MNGOVERNANCESYNC, uint256(), filter);
}
else {
pnode->PushMessage(NetMsgType::MNGOVERNANCESYNC, uint256());
g_connman->PushMessage(pnode, NetMsgType::MNGOVERNANCESYNC, uint256());
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/masternodeman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ void CMasternodeMan::AskForMN(CNode* pnode, const CTxIn &vin)
}
mWeAskedForMasternodeListEntry[vin.prevout][pnode->addr] = GetTime() + DSEG_UPDATE_SECONDS;

pnode->PushMessage(NetMsgType::DSEG, vin);
g_connman->PushMessage(pnode, NetMsgType::DSEG, vin);
}

void CMasternodeMan::Check()
Expand Down Expand Up @@ -438,7 +438,7 @@ void CMasternodeMan::DsegUpdate(CNode* pnode)
}
}

pnode->PushMessage(NetMsgType::DSEG, CTxIn());
g_connman->PushMessage(pnode, NetMsgType::DSEG, CTxIn());
int64_t askAgain = GetTime() + DSEG_UPDATE_SECONDS;
mWeAskedForMasternodeList[pnode->addr] = askAgain;

Expand Down Expand Up @@ -926,7 +926,7 @@ void CMasternodeMan::ProcessMessage(CNode* pfrom, std::string& strCommand, CData
}

if(vin == CTxIn()) {
pfrom->PushMessage(NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_LIST, nInvCount);
g_connman->PushMessage(pfrom, NetMsgType::SYNCSTATUSCOUNT, MASTERNODE_SYNC_LIST, nInvCount);
LogPrintf("DSEG -- Sent %d Masternode invs to peer %d\n", nInvCount, pfrom->id);
return;
}
Expand Down Expand Up @@ -1109,7 +1109,7 @@ bool CMasternodeMan::SendVerifyRequest(const CAddress& addr, const std::vector<C
CMasternodeVerification mnv(addr, GetRandInt(999999), pCurrentBlockIndex->nHeight - 1);
mWeAskedForVerification[addr] = mnv;
LogPrintf("CMasternodeMan::SendVerifyRequest -- verifying node using nonce %d addr=%s\n", mnv.nonce, addr.ToString());
pnode->PushMessage(NetMsgType::MNVERIFY, mnv);
g_connman->PushMessage(pnode, NetMsgType::MNVERIFY, mnv);

return true;
}
Expand Down Expand Up @@ -1150,7 +1150,7 @@ void CMasternodeMan::SendVerifyReply(CNode* pnode, CMasternodeVerification& mnv)
return;
}

pnode->PushMessage(NetMsgType::MNVERIFY, mnv);
g_connman->PushMessage(pnode, NetMsgType::MNVERIFY, mnv);
netfulfilledman.AddFulfilledRequest(pnode->addr, strprintf("%s", NetMsgType::MNVERIFY)+"-reply");
}

Expand Down
Loading

0 comments on commit b621cfb

Please sign in to comment.