diff --git a/src/coinjoin/client.cpp b/src/coinjoin/client.cpp index 4c15f98f4a518..9804b1be5bee9 100644 --- a/src/coinjoin/client.cpp +++ b/src/coinjoin/client.cpp @@ -132,7 +132,7 @@ PeerMsgRet CCoinJoinClientQueueManager::ProcessDSQueue(const CNode& peer, CDataS WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq)); } } // cs_ProcessDSQueue - dsq.Relay(connman, *peerman); + peerman->RelayDSQ(dsq); return {}; } diff --git a/src/coinjoin/client.h b/src/coinjoin/client.h index c4e22b71230ce..17239b9a71b7d 100644 --- a/src/coinjoin/client.h +++ b/src/coinjoin/client.h @@ -28,6 +28,7 @@ class CMasternodeSync; class CNode; class CoinJoinWalletManager; class CTxMemPool; +class PeerManager; class UniValue; diff --git a/src/coinjoin/coinjoin.cpp b/src/coinjoin/coinjoin.cpp index 338b4cf054c42..74fc644a70911 100644 --- a/src/coinjoin/coinjoin.cpp +++ b/src/coinjoin/coinjoin.cpp @@ -14,7 +14,6 @@ #include <masternode/node.h> #include <masternode/sync.h> #include <messagesigner.h> -#include <net_processing.h> #include <netmessagemaker.h> #include <txmempool.h> #include <util/moneystr.h> @@ -71,19 +70,6 @@ bool CCoinJoinQueue::CheckSignature(const CBLSPublicKey& blsPubKey) const return true; } -bool CCoinJoinQueue::Relay(CConnman& connman, PeerManager& peerman) -{ - CInv inv(MSG_DSQ, GetHash()); - peerman.RelayInv(inv, DSQ_INV_VERSION); - connman.ForEachNode([&connman, this](CNode* pnode) { - CNetMsgMaker msgMaker(pnode->GetCommonVersion()); - if (pnode->fSendDSQueue && pnode->nVersion < DSQ_INV_VERSION) { - connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSQUEUE, (*this))); - } - }); - return true; -} - bool CCoinJoinQueue::IsTimeOutOfBounds(int64_t current_time) const { return current_time - nTime > COINJOIN_QUEUE_TIMEOUT || diff --git a/src/coinjoin/coinjoin.h b/src/coinjoin/coinjoin.h index 5680b8abc20fc..68e2f40c05549 100644 --- a/src/coinjoin/coinjoin.h +++ b/src/coinjoin/coinjoin.h @@ -24,14 +24,12 @@ class CActiveMasternodeManager; class CChainState; -class CConnman; class CBLSPublicKey; class CBlockIndex; class ChainstateManager; class CMasternodeSync; class CTxMemPool; class TxValidationState; -class PeerManager; namespace llmq { class CChainLocksHandler; @@ -221,8 +219,6 @@ class CCoinJoinQueue /// Check if we have a valid Masternode address [[nodiscard]] bool CheckSignature(const CBLSPublicKey& blsPubKey) const; - bool Relay(CConnman& connman, PeerManager& peerman); - /// Check if a queue is too old or too far into the future [[nodiscard]] bool IsTimeOutOfBounds(int64_t current_time = GetAdjustedTime()) const; diff --git a/src/coinjoin/server.cpp b/src/coinjoin/server.cpp index 856cf5fc7ed5e..bb65990dd9006 100644 --- a/src/coinjoin/server.cpp +++ b/src/coinjoin/server.cpp @@ -186,7 +186,7 @@ PeerMsgRet CCoinJoinServer::ProcessDSQUEUE(const CNode& peer, CDataStream& vRecv TRY_LOCK(cs_vecqueue, lockRecv); if (!lockRecv) return {}; vecCoinJoinQueue.push_back(dsq); - dsq.Relay(connman, *m_peerman); + m_peerman->RelayDSQ(dsq); } return {}; } @@ -519,7 +519,7 @@ void CCoinJoinServer::CheckForCompleteQueue() LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckForCompleteQueue -- queue is ready, signing and relaying (%s) " /* Continued */ "with %d participants\n", dsq.ToString(), vecSessionCollaterals.size()); dsq.Sign(*m_mn_activeman); - dsq.Relay(connman, *m_peerman); + m_peerman->RelayDSQ(dsq); } } @@ -732,7 +732,7 @@ bool CCoinJoinServer::CreateNewSession(const CCoinJoinAccept& dsa, PoolMessage& GetAdjustedTime(), false); LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateNewSession -- signing and relaying new queue: %s\n", dsq.ToString()); dsq.Sign(*m_mn_activeman); - dsq.Relay(connman, *m_peerman); + m_peerman->RelayDSQ(dsq); LOCK(cs_vecqueue); vecCoinJoinQueue.push_back(dsq); } diff --git a/src/coinjoin/server.h b/src/coinjoin/server.h index e7f3a2340b640..c33e98fbf8e70 100644 --- a/src/coinjoin/server.h +++ b/src/coinjoin/server.h @@ -11,6 +11,7 @@ class CActiveMasternodeManager; class CCoinJoinServer; +class CConnman; class CDataStream; class CDeterministicMNManager; class CDSTXManager; diff --git a/src/net_processing.cpp b/src/net_processing.cpp index b6f75dfa00155..0997044e504d2 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -366,6 +366,15 @@ struct Peer { /** Whether the peer has signaled support for receiving ADDRv2 (BIP155) * messages, indicating a preference to receive ADDRv2 instead of ADDR ones. */ std::atomic_bool m_wants_addrv2{false}; + + enum class WantsDSQ { + NONE, // Peer doesn't want DSQs + INV, // Peer will be notified of DSQs over Inventory System (see: DSQ_INV_VERSION) + ALL, // Peer will be notified of all DSQs, by simply sending them the DSQ + }; + + std::atomic<WantsDSQ> m_wants_dsq{WantsDSQ::NONE}; + /** Whether this peer has already sent us a getaddr message. */ bool m_getaddr_recvd GUARDED_BY(NetEventsInterface::g_msgproc_mutex){false}; /** Number of addresses that can be processed from this peer. Start at 1 to @@ -607,6 +616,7 @@ class PeerManagerImpl final : public PeerManager void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void RelayInvFiltered(CInv &inv, const uint256 &relatedTxHash, const int minProtoVersion) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void RelayTransaction(const uint256& txid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void RelayDSQ(const CCoinJoinQueue& queue) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SetBestHeight(int height) override { m_best_height = height; }; void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, @@ -2276,6 +2286,34 @@ void PeerManagerImpl::RelayInv(CInv &inv, const int minProtoVersion) }); } +void PeerManagerImpl::RelayDSQ(const CCoinJoinQueue& queue) +{ + CInv inv{MSG_DSQ, queue.GetHash()}; + std::vector<NodeId> nodes_send_all; + { + LOCK(m_peer_mutex); + for (const auto& [nodeid, peer] : m_peer_map) { + switch (peer->m_wants_dsq) { + case Peer::WantsDSQ::NONE: + break; + case Peer::WantsDSQ::INV: + PushInv(*peer, inv); + break; + case Peer::WantsDSQ::ALL: + nodes_send_all.push_back(nodeid); + break; + } + } + } + for (auto nodeId : nodes_send_all) { + m_connman.ForNode(nodeId, [&](CNode* pnode) -> bool { + CNetMsgMaker msgMaker(pnode->GetCommonVersion()); + m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::DSQUEUE, queue)); + return true; + }); + } +} + void PeerManagerImpl::RelayInvFiltered(CInv &inv, const CTransaction& relatedTx, const int minProtoVersion) { // TODO: Migrate to iteration through m_peer_map @@ -3940,7 +3978,13 @@ void PeerManagerImpl::ProcessMessage( { bool b; vRecv >> b; - pfrom.fSendDSQueue = b; + if (!b) { + peer->m_wants_dsq = Peer::WantsDSQ::NONE; + } else if (pfrom.GetCommonVersion() < DSQ_INV_VERSION) { + peer->m_wants_dsq = Peer::WantsDSQ::ALL; + } else { + peer->m_wants_dsq = Peer::WantsDSQ::INV; + } return; } diff --git a/src/net_processing.h b/src/net_processing.h index a857201516053..58925a6db7218 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -16,6 +16,7 @@ class CActiveMasternodeManager; class AddrMan; class CTxMemPool; +class CCoinJoinQueue; class CDeterministicMNManager; class CMasternodeMetaMan; class CMasternodeSync; @@ -93,6 +94,9 @@ class PeerManager : public CValidationInterface, public NetEventsInterface /** Broadcast inventory message to a specific peer. */ virtual void PushInventory(NodeId nodeid, const CInv& inv) = 0; + /** Relay DSQ based on peer preference */ + virtual void RelayDSQ(const CCoinJoinQueue& queue) = 0; + /** Relay inventories to all peers */ virtual void RelayInv(CInv &inv, const int minProtoVersion = MIN_PEER_PROTO_VERSION) = 0; virtual void RelayInvFiltered(CInv &inv, const CTransaction &relatedTx,