Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 78 additions & 51 deletions src/coinjoin/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
#include <masternode/node.h>
#include <masternode/sync.h>
#include <net.h>
#include <netmessagemaker.h>
#include <net_processing.h>
#include <netmessagemaker.h>
#include <scheduler.h>
#include <script/interpreter.h>
#include <shutdown.h>
#include <streams.h>
Expand All @@ -23,17 +24,17 @@

#include <univalue.h>

CCoinJoinServer::CCoinJoinServer(ChainstateManager& chainman, CConnman& _connman, CDeterministicMNManager& dmnman,
CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman, CTxMemPool& mempool,
PeerManager& peerman, const CActiveMasternodeManager& mn_activeman,
CCoinJoinServer::CCoinJoinServer(PeerManagerInternal* peer_manager, ChainstateManager& chainman, CConnman& _connman,
CDeterministicMNManager& dmnman, CDSTXManager& dstxman, CMasternodeMetaMan& mn_metaman,
CTxMemPool& mempool, const CActiveMasternodeManager& mn_activeman,
const CMasternodeSync& mn_sync, const llmq::CInstantSendManager& isman) :
NetHandler(peer_manager),
m_chainman{chainman},
connman{_connman},
m_dmnman{dmnman},
m_dstxman{dstxman},
m_mn_metaman{mn_metaman},
mempool{mempool},
m_peerman{peerman},
m_mn_activeman{mn_activeman},
m_mn_sync{mn_sync},
m_isman{isman},
Expand All @@ -44,20 +45,19 @@ CCoinJoinServer::CCoinJoinServer(ChainstateManager& chainman, CConnman& _connman

CCoinJoinServer::~CCoinJoinServer() = default;

MessageProcessingResult CCoinJoinServer::ProcessMessage(CNode& peer, std::string_view msg_type, CDataStream& vRecv)
void CCoinJoinServer::ProcessMessage(CNode& peer, const std::string& msg_type, CDataStream& vRecv)
{
if (!m_mn_sync.IsBlockchainSynced()) return {};
if (!m_mn_sync.IsBlockchainSynced()) return;

if (msg_type == NetMsgType::DSACCEPT) {
ProcessDSACCEPT(peer, vRecv);
} else if (msg_type == NetMsgType::DSQUEUE) {
return ProcessDSQUEUE(peer.GetId(), vRecv);
ProcessDSQUEUE(peer.GetId(), vRecv);
} else if (msg_type == NetMsgType::DSVIN) {
ProcessDSVIN(peer, vRecv);
} else if (msg_type == NetMsgType::DSSIGNFINALTX) {
ProcessDSSIGNFINALTX(vRecv);
}
return {};
}

void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv)
Expand Down Expand Up @@ -126,87 +126,85 @@ void CCoinJoinServer::ProcessDSACCEPT(CNode& peer, CDataStream& vRecv)
}
}

MessageProcessingResult CCoinJoinServer::ProcessDSQUEUE(NodeId from, CDataStream& vRecv)
void CCoinJoinServer::ProcessDSQUEUE(NodeId from, CDataStream& vRecv)
{
assert(m_mn_metaman.IsValid());

CCoinJoinQueue dsq;
vRecv >> dsq;

MessageProcessingResult ret{};
ret.m_to_erase = CInv{MSG_DSQ, dsq.GetHash()};
WITH_LOCK(cs_main, m_peer_manager->PeerEraseObjectRequest(from, CInv{MSG_DSQ, dsq.GetHash()}));

// Validate denomination first
if (!CoinJoin::IsValidDenomination(dsq.nDenom)) {
LogPrint(BCLog::COINJOIN, "DSQUEUE -- invalid denomination %d from peer %d\n", dsq.nDenom, from);
ret.m_error = MisbehavingError{10};
return ret;
m_peer_manager->PeerMisbehaving(from, 10);
return;
}

if (dsq.masternodeOutpoint.IsNull() && dsq.m_protxHash.IsNull()) {
ret.m_error = MisbehavingError{100};
return ret;
m_peer_manager->PeerMisbehaving(from, 100);
return;
}

const auto tip_mn_list = m_dmnman.GetListAtChainTip();
if (dsq.masternodeOutpoint.IsNull()) {
if (auto dmn = tip_mn_list.GetValidMN(dsq.m_protxHash)) {
dsq.masternodeOutpoint = dmn->collateralOutpoint;
} else {
ret.m_error = MisbehavingError{10};
return ret;
m_peer_manager->PeerMisbehaving(from, 10);
return;
}
}

{
TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return ret;
if (!lockRecv) return;

// process every dsq only once
for (const auto& q : vecCoinJoinQueue) {
if (q == dsq) {
return ret;
return;
}
if (q.fReady == dsq.fReady && q.masternodeOutpoint == dsq.masternodeOutpoint) {
// no way the same mn can send another dsq with the same readiness this soon
LogPrint(BCLog::COINJOIN, "DSQUEUE -- Peer %d is sending WAY too many dsq messages for a masternode with collateral %s\n", from, dsq.masternodeOutpoint.ToStringShort());
return ret;
return;
}
}
} // cs_vecqueue

LogPrint(BCLog::COINJOIN, "DSQUEUE -- %s new\n", dsq.ToString());

if (dsq.IsTimeOutOfBounds()) return ret;
if (dsq.IsTimeOutOfBounds()) return;

auto dmn = tip_mn_list.GetValidMNByCollateral(dsq.masternodeOutpoint);
if (!dmn) return ret;
if (!dmn) return;

if (dsq.m_protxHash.IsNull()) {
dsq.m_protxHash = dmn->proTxHash;
}

if (!dsq.CheckSignature(dmn->pdmnState->pubKeyOperator.Get())) {
ret.m_error = MisbehavingError{10};
return ret;
m_peer_manager->PeerMisbehaving(from, 10);
return;
}

if (!dsq.fReady) {
//don't allow a few nodes to dominate the queuing process
if (m_mn_metaman.IsMixingThresholdExceeded(dmn->proTxHash, tip_mn_list.GetValidMNsCount())) {
LogPrint(BCLog::COINJOIN, "DSQUEUE -- node sending too many dsq messages, masternode=%s\n", dmn->proTxHash.ToString());
return ret;
return;
}
m_mn_metaman.AllowMixing(dmn->proTxHash);

LogPrint(BCLog::COINJOIN, "DSQUEUE -- new CoinJoin queue, masternode=%s, queue=%s\n", dmn->proTxHash.ToString(), dsq.ToString());

TRY_LOCK(cs_vecqueue, lockRecv);
if (!lockRecv) return ret;
if (!lockRecv) return;
vecCoinJoinQueue.push_back(dsq);
ret.m_dsq.push_back(dsq);
m_peer_manager->PeerRelayDSQ(dsq);
}
return ret;
}

void CCoinJoinServer::ProcessDSVIN(CNode& peer, CDataStream& vRecv)
Expand Down Expand Up @@ -275,7 +273,8 @@ void CCoinJoinServer::SetNull()
//
void CCoinJoinServer::CheckPool()
{
if (int entries = GetEntriesCount(); entries != 0) LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckPool -- entries count %lu\n", entries);
if (int entries = GetEntriesCount(); entries != 0)
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckPool -- entries count %lu\n", entries);

// If we have an entry for each collateral, then create final tx
if (nState == POOL_STATE_ACCEPTING_ENTRIES && size_t(GetEntriesCount()) == vecSessionCollaterals.size()) {
Expand All @@ -286,8 +285,8 @@ void CCoinJoinServer::CheckPool()

// Check for Time Out
// If we timed out while accepting entries, then if we have more than minimum, create final tx
if (nState == POOL_STATE_ACCEPTING_ENTRIES && CCoinJoinServer::HasTimedOut()
&& GetEntriesCount() >= CoinJoin::GetMinPoolParticipants()) {
if (nState == POOL_STATE_ACCEPTING_ENTRIES && CCoinJoinServer::HasTimedOut() &&
GetEntriesCount() >= CoinJoin::GetMinPoolParticipants()) {
// Punish misbehaving participants
ChargeFees();
// Try to complete this session ignoring the misbehaving ones
Expand Down Expand Up @@ -326,7 +325,8 @@ void CCoinJoinServer::CreateFinalTransaction()
sort(txNew.vout.begin(), txNew.vout.end(), CompareOutputBIP69());

finalMutableTransaction = txNew;
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateFinalTransaction -- finalMutableTransaction=%s", txNew.ToString()); /* Continued */
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateFinalTransaction -- finalMutableTransaction=%s", /* Continued */
txNew.ToString());

// request signatures from clients
SetState(POOL_STATE_SIGNING);
Expand All @@ -340,14 +340,16 @@ void CCoinJoinServer::CommitFinalTransaction()
CTransactionRef finalTransaction = WITH_LOCK(cs_coinjoin, return MakeTransactionRef(finalMutableTransaction));
uint256 hashTx = finalTransaction->GetHash();

LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- finalTransaction=%s", finalTransaction->ToString()); /* Continued */
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- finalTransaction=%s", /* Continued */
finalTransaction->ToString());

{
// See if the transaction is valid
TRY_LOCK(::cs_main, lockMain);
mempool.PrioritiseTransaction(hashTx, 0.1 * COIN);
if (!lockMain || !ATMPIfSaneFee(m_chainman, finalTransaction)) {
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- ATMPIfSaneFee() error: Transaction not valid\n");
LogPrint(BCLog::COINJOIN, /* Continued */
"CCoinJoinServer::CommitFinalTransaction -- ATMPIfSaneFee() error: Transaction not valid\n");
WITH_LOCK(cs_coinjoin, SetNull());
// not much we can do in this case, just notify clients
RelayCompletedTransaction(ERR_INVALID_TX);
Expand All @@ -368,7 +370,7 @@ void CCoinJoinServer::CommitFinalTransaction()
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CommitFinalTransaction -- TRANSMITTING DSTX\n");

CInv inv(MSG_DSTX, hashTx);
m_peerman.RelayInv(inv);
m_peer_manager->PeerRelayInv(inv);

// Tell the clients it was successful
RelayCompletedTransaction(MSG_SUCCESS);
Expand Down Expand Up @@ -411,7 +413,9 @@ void CCoinJoinServer::ChargeFees() const

// This queue entry didn't send us the promised transaction
if (!fFound) {
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't send transaction), found offence\n");
LogPrint(BCLog::COINJOIN, /* Continued */
"CCoinJoinServer::ChargeFees -- found uncooperative node (didn't send transaction), found "
"offence\n");
vecOffendersCollaterals.push_back(txCollateral);
}
}
Expand All @@ -423,7 +427,8 @@ void CCoinJoinServer::ChargeFees() const
for (const auto& entry : vecEntries) {
for (const auto& txdsin : entry.vecTxDSIn) {
if (!txdsin.fHasSig) {
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't sign), found offence\n");
LogPrint(BCLog::COINJOIN, /* Continued */
"CCoinJoinServer::ChargeFees -- found uncooperative node (didn't sign), found offence\n");
vecOffendersCollaterals.push_back(entry.txCollateral);
}
}
Expand All @@ -443,8 +448,9 @@ void CCoinJoinServer::ChargeFees() const
Shuffle(vecOffendersCollaterals.begin(), vecOffendersCollaterals.end(), FastRandomContext());

if (nState == POOL_STATE_ACCEPTING_ENTRIES || nState == POOL_STATE_SIGNING) {
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeFees -- found uncooperative node (didn't %s transaction), charging fees: %s", /* Continued */
(nState == POOL_STATE_SIGNING) ? "sign" : "send", vecOffendersCollaterals[0]->ToString());
LogPrint(BCLog::COINJOIN, /* Continued */
"CCoinJoinServer::ChargeFees -- found uncooperative node (didn't %s transaction), charging fees: %s",
(nState == POOL_STATE_SIGNING) ? "sign" : "send", vecOffendersCollaterals[0]->ToString());
ConsumeCollateral(vecOffendersCollaterals[0]);
}
}
Expand All @@ -465,7 +471,8 @@ void CCoinJoinServer::ChargeRandomFees() const
{
for (const auto& txCollateral : vecSessionCollaterals) {
if (GetRand<int>(/*nMax=*/100) > 10) return;
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::ChargeRandomFees -- charging random fees, txCollateral=%s", txCollateral->ToString()); /* Continued */
LogPrint(BCLog::COINJOIN, /* Continued */
"CCoinJoinServer::ChargeRandomFees -- charging random fees, txCollateral=%s", txCollateral->ToString());
ConsumeCollateral(txCollateral);
}
}
Expand All @@ -476,7 +483,7 @@ void CCoinJoinServer::ConsumeCollateral(const CTransactionRef& txref) const
if (!ATMPIfSaneFee(m_chainman, txref)) {
LogPrint(BCLog::COINJOIN, "%s -- ATMPIfSaneFee failed\n", __func__);
} else {
m_peerman.RelayTransaction(txref->GetHash());
m_peer_manager->PeerRelayTransaction(txref->GetHash());
LogPrint(BCLog::COINJOIN, "%s -- Collateral was consumed\n", __func__);
}
}
Expand Down Expand Up @@ -521,7 +528,7 @@ void CCoinJoinServer::CheckForCompleteQueue()
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CheckForCompleteQueue -- queue is ready, signing and relaying (%s) " /* Continued */
"with %d participants\n", dsq.ToString(), vecSessionCollaterals.size());
dsq.vchSig = m_mn_activeman.SignBasic(dsq.GetSignatureHash());
m_peerman.RelayDSQ(dsq);
m_peer_manager->PeerRelayDSQ(dsq);
WITH_LOCK(cs_vecqueue, vecCoinJoinQueue.push_back(dsq));
}
}
Expand Down Expand Up @@ -731,7 +738,7 @@ bool CCoinJoinServer::CreateNewSession(const CCoinJoinAccept& dsa, PoolMessage&
GetAdjustedTime(), false);
LogPrint(BCLog::COINJOIN, "CCoinJoinServer::CreateNewSession -- signing and relaying new queue: %s\n", dsq.ToString());
dsq.vchSig = m_mn_activeman.SignBasic(dsq.GetSignatureHash());
m_peerman.RelayDSQ(dsq);
m_peer_manager->PeerRelayDSQ(dsq);
LOCK(cs_vecqueue);
vecCoinJoinQueue.push_back(dsq);
}
Expand Down Expand Up @@ -891,14 +898,18 @@ void CCoinJoinServer::SetState(PoolState nStateNew)
nState = nStateNew;
}

void CCoinJoinServer::DoMaintenance()
void CCoinJoinServer::Schedule(CScheduler& scheduler)
{
if (!m_mn_sync.IsBlockchainSynced()) return;
if (ShutdownRequested()) return;

CheckForCompleteQueue();
CheckPool();
CheckTimeout();
scheduler.scheduleEvery(
[this]() -> void {
if (!m_mn_sync.IsBlockchainSynced()) return;
if (ShutdownRequested()) return;

CheckForCompleteQueue();
CheckPool();
CheckTimeout();
},
std::chrono::seconds{1});
}

void CCoinJoinServer::GetJsonInfo(UniValue& obj) const
Expand All @@ -910,3 +921,19 @@ void CCoinJoinServer::GetJsonInfo(UniValue& obj) const
obj.pushKV("state", GetStateString());
obj.pushKV("entries_count", GetEntriesCount());
}

bool CCoinJoinServer::AlreadyHave(const CInv& inv)
{
return (inv.type == MSG_DSQ) ? HasQueue(inv.hash) : false;
}

bool CCoinJoinServer::ProcessGetData(CNode& pfrom, const CInv& inv, CConnman& connman, const CNetMsgMaker& msgMaker)
{
if (inv.type != MSG_DSQ) return false;

auto opt_dsq = GetQueueFromHash(inv.hash);
if (!opt_dsq.has_value()) return false;

connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::DSQUEUE, *opt_dsq));
return true;
}
Loading
Loading