Skip to content
Merged
4 changes: 2 additions & 2 deletions src/activemasternode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ void CActiveMasternode::ManageStateInitial()
if(!fFoundLocal) {
bool empty = true;
// If we have some peers, let's try to find our local address from one of them
g_connman->ForEachNodeContinueIf([&fFoundLocal, &empty, this](CNode* pnode) {
g_connman->ForEachNodeContinueIf(CConnman::AllNodes, [&fFoundLocal, &empty, this](CNode* pnode) {
empty = false;
if (pnode->fSuccessfullyConnected && pnode->addr.IsIPv4())
if (pnode->addr.IsIPv4())
fFoundLocal = GetLocal(service, &pnode->addr) && CMasternode::IsValidNetAddr(service);
return !fFoundLocal;
});
Expand Down
4 changes: 2 additions & 2 deletions src/masternode-sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void CMasternodeSync::SwitchToNextAsset()
// TRY_LOCK(cs_vNodes, lockRecv);
// if(lockRecv) { ... }

g_connman->ForEachNode([](CNode* pnode) {
g_connman->ForEachNode(CConnman::AllNodes, [](CNode* pnode) {
netfulfilledman.AddFulfilledRequest(pnode->addr, "full-sync");
});
LogPrintf("CMasternodeSync::SwitchToNextAsset -- Sync has finished\n");
Expand Down Expand Up @@ -132,7 +132,7 @@ void CMasternodeSync::ClearFulfilledRequests()
// TRY_LOCK(cs_vNodes, lockRecv);
// if(!lockRecv) return;

g_connman->ForEachNode([](CNode* pnode) {
g_connman->ForEachNode(CConnman::AllNodes, [](CNode* pnode) {
netfulfilledman.RemoveFulfilledRequest(pnode->addr, "spork-sync");
netfulfilledman.RemoveFulfilledRequest(pnode->addr, "masternode-list-sync");
netfulfilledman.RemoveFulfilledRequest(pnode->addr, "masternode-payment-sync");
Expand Down
2 changes: 1 addition & 1 deletion src/masternodeman.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ void CMasternodeMan::ProcessMasternodeConnections()
//we don't care about this for regtest
if(Params().NetworkIDString() == CBaseChainParams::REGTEST) return;

g_connman->ForEachNode([](CNode* pnode) {
g_connman->ForEachNode(CConnman::AllNodes, [](CNode* pnode) {
if(pnode->fMasternode) {
if(privateSendClient.infoMixingMasternode.fInfoValid && pnode->addr == privateSendClient.infoMixingMasternode.addr) return true;
LogPrintf("Closing Masternode connection: peer=%d, addr=%s\n", pnode->id, pnode->addr.ToString());
Expand Down
43 changes: 39 additions & 4 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@

const static std::string NET_MESSAGE_COMMAND_OTHER = "*other*";

constexpr const CConnman::CFullyConnectedOnly CConnman::FullyConnectedOnly;
constexpr const CConnman::CAllNodes CConnman::AllNodes;

//
// Global state variables
//
Expand Down Expand Up @@ -719,6 +722,33 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
return true;
}

void CNode::SetSendVersion(int nVersionIn)
{
// Send version may only be changed in the version message, and
// only one version message is allowed per session. We can therefore
// treat this value as const and even atomic as long as it's only used
// once a version message has been successfully processed. Any attempt to
// set this twice is an error.
if (nSendVersion != 0) {
error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn);
} else {
nSendVersion = nVersionIn;
}
}

int CNode::GetSendVersion() const
{
// The send version should always be explicitly set to
// INIT_PROTO_VERSION rather than using this value until SetSendVersion
// has been called.
if (nSendVersion == 0) {
error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION);
return INIT_PROTO_VERSION;
}
return nSendVersion;
}


int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
{
// copy data to temporary parsing buffer
Expand Down Expand Up @@ -2725,6 +2755,11 @@ void CNode::AskFor(const CInv& inv)
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}

bool CConnman::NodeFullyConnected(const CNode* pnode)
{
return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
}

std::vector<unsigned char> CNode::CalculateKeyedNetGroup(CAddress& address)
{
if(vchSecretKey.size() == 0) {
Expand Down Expand Up @@ -2792,7 +2827,7 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& s
RecordBytesSent(nBytesSent);
}

bool CConnman::ForNode(const CService& addr, std::function<bool(CNode* pnode)> func)
bool CConnman::ForNode(const CService& addr, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;
LOCK(cs_vNodes);
Expand All @@ -2802,10 +2837,10 @@ bool CConnman::ForNode(const CService& addr, std::function<bool(CNode* pnode)> f
break;
}
}
return found != nullptr && func(found);
return found != nullptr && cond(found) && func(found);
}

bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
bool CConnman::ForNode(NodeId id, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;
LOCK(cs_vNodes);
Expand All @@ -2815,7 +2850,7 @@ bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
break;
}
}
return found != nullptr && func(found);
return found != nullptr && cond(found) && func(found);
}

int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) {
Expand Down
156 changes: 93 additions & 63 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,34 @@ class CConnman
// because it's used in many Dash-specific places (masternode, privatesend).
CNode* ConnectNode(CAddress addrConnect, const char *pszDest = NULL, bool fConnectToMasternode = false);

bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
bool ForNode(const CService& addr, std::function<bool(CNode* pnode)> func);
struct CFullyConnectedOnly {
bool operator() (const CNode* pnode) const {
return NodeFullyConnected(pnode);
}
};

constexpr static const CFullyConnectedOnly FullyConnectedOnly{};

struct CAllNodes {
bool operator() (const CNode*) const {return true;}
};

constexpr static const CAllNodes AllNodes{};

bool ForNode(NodeId id, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func);
bool ForNode(const CService& addr, std::function<bool(const CNode* pnode)> cond, std::function<bool(CNode* pnode)> func);

template<typename Callable>
bool ForNode(const CService& addr, Callable&& func)
{
return ForNode(addr, FullyConnectedOnly, func);
}

template<typename Callable>
bool ForNode(NodeId id, Callable&& func)
{
return ForNode(id, FullyConnectedOnly, func);
}

template <typename... Args>
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
Expand Down Expand Up @@ -173,87 +199,105 @@ class CConnman
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
}

template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func)
template<typename Condition, typename Callable>
bool ForEachNodeContinueIf(const Condition& cond, Callable&& func)
{
LOCK(cs_vNodes);
for (auto&& node : vNodes)
if(!func(node))
return false;
if (cond(node))
if(!func(node))
return false;
return true;
};

template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func) const
bool ForEachNodeContinueIf(Callable&& func)
{
return ForEachNodeContinueIf(FullyConnectedOnly, func);
}

template<typename Condition, typename Callable>
bool ForEachNodeContinueIf(const Condition& cond, Callable&& func) const
{
LOCK(cs_vNodes);
for (const auto& node : vNodes)
if(!func(node))
return false;
if (cond(node))
if(!func(node))
return false;
return true;
};

template<typename Callable, typename CallableAfter>
bool ForEachNodeContinueIfThen(Callable&& pre, CallableAfter&& post)
template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func) const
{
bool ret = true;
LOCK(cs_vNodes);
for (auto&& node : vNodes)
if(!pre(node)) {
ret = false;
break;
}
post();
return ret;
};
return ForEachNodeContinueIf(FullyConnectedOnly, func);
}

template<typename Callable, typename CallableAfter>
bool ForEachNodeContinueIfThen(Callable&& pre, CallableAfter&& post) const
template<typename Condition, typename Callable>
void ForEachNode(const Condition& cond, Callable&& func)
{
bool ret = true;
LOCK(cs_vNodes);
for (const auto& node : vNodes)
if(!pre(node)) {
ret = false;
break;
}
post();
return ret;
for (auto&& node : vNodes) {
if (cond(node))
func(node);
}
};

template<typename Callable>
void ForEachNode(Callable&& func)
{
ForEachNode(FullyConnectedOnly, func);
}

template<typename Condition, typename Callable>
void ForEachNode(const Condition& cond, Callable&& func) const
{
LOCK(cs_vNodes);
for (auto&& node : vNodes)
func(node);
for (auto&& node : vNodes) {
if (cond(node))
func(node);
}
};

template<typename Callable>
void ForEachNode(Callable&& func) const
{
ForEachNode(FullyConnectedOnly, func);
}

template<typename Condition, typename Callable, typename CallableAfter>
void ForEachNodeThen(const Condition& cond, Callable&& pre, CallableAfter&& post)
{
LOCK(cs_vNodes);
for (const auto& node : vNodes)
func(node);
for (auto&& node : vNodes) {
if (cond(node))
pre(node);
}
post();
};

template<typename Callable, typename CallableAfter>
void ForEachNodeThen(Callable&& pre, CallableAfter&& post)
{
ForEachNodeThen(FullyConnectedOnly, pre, post);
}

template<typename Condition, typename Callable, typename CallableAfter>
void ForEachNodeThen(const Condition& cond, Callable&& pre, CallableAfter&& post) const
{
LOCK(cs_vNodes);
for (auto&& node : vNodes)
pre(node);
for (auto&& node : vNodes) {
if (cond(node))
pre(node);
}
post();
};

template<typename Callable, typename CallableAfter>
void ForEachNodeThen(Callable&& pre, CallableAfter&& post) const
{
LOCK(cs_vNodes);
for (const auto& node : vNodes)
pre(node);
post();
};
ForEachNodeThen(FullyConnectedOnly, pre, post);
}

std::vector<CNode*> CopyNodeVector();
void ReleaseNodeVector(const std::vector<CNode*>& vecNodes);
Expand Down Expand Up @@ -391,6 +435,9 @@ class CConnman
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);

// Whether the node should be passed out in ForEach* callbacks
static bool NodeFullyConnected(const CNode* pnode);

// Network usage totals
CCriticalSection cs_totalBytesRecv;
CCriticalSection cs_totalBytesSent;
Expand Down Expand Up @@ -634,7 +681,7 @@ class CNode
std::string addrName;
CService addrLocal;
int nNumWarningsSkipped;
int nVersion;
std::atomic<int> nVersion;
// strSubVer is whatever byte array we read from the wire. However, this field is intended
// to be printed out, displayed to humans in various forms and so on. So we sanitize it and
// store the sanitized version in cleanSubVer. The original should be used when dealing with
Expand All @@ -646,7 +693,7 @@ class CNode
bool fClient;
bool fInbound;
bool fNetworkNode;
bool fSuccessfullyConnected;
std::atomic_bool fSuccessfullyConnected;
bool fDisconnect;
// We use fRelayTxes for two purposes -
// a) it allows us to not relay tx invs before receiving the peer's version message
Expand Down Expand Up @@ -763,25 +810,8 @@ class CNode
BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
msg.SetVersion(nVersionIn);
}
void SetSendVersion(int nVersionIn)
{
// Send version may only be changed in the version message, and
// only one version message is allowed per session. We can therefore
// treat this value as const and even atomic as long as it's only used
// once the handshake is complete. Any attempt to set this twice is an
// error.
assert(nSendVersion == 0);
nSendVersion = nVersionIn;
}

int GetSendVersion() const
{
// The send version should always be explicitly set to
// INIT_PROTO_VERSION rather than using this value until the handshake
// is complete. See PushMessageWithVersion().
assert(nSendVersion != 0);
return nSendVersion;
}
void SetSendVersion(int nVersionIn);
int GetSendVersion() const;

CNode* AddRef()
{
Expand Down
Loading