111 changes: 88 additions & 23 deletions src/llmq/instantsend.cpp
@@ -830,7 +830,7 @@ void CInstantSendManager::ProcessMessageInstantSendLock(const CNode* pfrom, cons
}

LOCK(cs);
if (pendingInstantSendLocks.count(hash) || db.KnownInstantSendLock(hash)) {
if (pendingInstantSendLocks.count(hash) || pendingNoTxInstantSendLocks.count(hash) || db.KnownInstantSendLock(hash)) {
return;
}

@@ -1086,10 +1086,17 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
}
}

db.WriteNewInstantSendLock(hash, *islock);
if (pindexMined) {
db.WriteInstantSendLockMined(hash, pindexMined->nHeight);
if (tx == nullptr) {
// put it in a separate pending map and try again later
LOCK(cs);
pendingNoTxInstantSendLocks.try_emplace(hash, std::make_pair(from, islock));
} else {
db.WriteNewInstantSendLock(hash, *islock);
if (pindexMined) {
db.WriteInstantSendLockMined(hash, pindexMined->nHeight);
}
}

{
LOCK(cs);

@@ -1111,13 +1118,16 @@ }
}

ResolveBlockConflicts(hash, *islock);
RemoveMempoolConflictsForLock(hash, *islock);

if (tx != nullptr) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about an in-time lock for tx %s\n", __func__, tx->GetHash().ToString());
RemoveMempoolConflictsForLock(hash, *islock);
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about lock %s for tx %s\n", __func__,
hash.ToString(), tx->GetHash().ToString());
GetMainSignals().NotifyTransactionLock(tx, islock);
// bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
mempool.AddTransactionsUpdated(1);
} else {
AskNodesForLockedTx(islock->txid);
}
}
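The flow this hunk introduces is worth spelling out: a fully verified islock whose transaction is unknown locally is no longer written to the islock database right away; it is parked in the new map and the transaction is requested from peers via AskNodesForLockedTx. A minimal sketch of the two-queue pattern, using simplified stand-in types rather than the real uint256/CInstantSendLock, with locking omitted:

```cpp
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>

// Simplified stand-ins for uint256 / CInstantSendLock / NodeId -- sketch only.
using Hash = uint64_t;
using NodeId = int64_t;
struct ISLock { Hash txid; };
using ISLockPtr = std::shared_ptr<ISLock>;
using LockQueue = std::unordered_map<Hash, std::pair<NodeId, ISLockPtr>>;

LockQueue pendingLocks;      // mirrors pendingInstantSendLocks
LockQueue pendingNoTxLocks;  // mirrors pendingNoTxInstantSendLocks

// Called once an islock's signature has been verified.
void OnVerifiedLock(Hash islockHash, NodeId from, const ISLockPtr& islock, bool haveTx)
{
    if (!haveTx) {
        // Transaction not seen yet: park the lock instead of persisting it;
        // the real code then calls AskNodesForLockedTx(islock->txid).
        pendingNoTxLocks.try_emplace(islockHash, std::make_pair(from, islock));
        return;
    }
    // Transaction known: persist the lock, resolve conflicts and notify
    // listeners (elided here; see the hunk above).
}
```

The companion half — moving a parked lock back once the transaction shows up — appears in TransactionAddedToMempool and AddNonLockedTx below.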

@@ -1127,26 +1137,30 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
return;
}

CInstantSendLockPtr islock = db.GetInstantSendLockByTxid(tx->GetHash());
CInstantSendLockPtr islock{nullptr};
{
LOCK(cs);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
// we received an islock earlier
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
islock = it->second.second;
pendingInstantSendLocks.try_emplace(it->first, it->second);
pendingNoTxInstantSendLocks.erase(it);
break;
}
++it;
}
}

if (islock == nullptr) {
ProcessTx(*tx, false, Params().GetConsensus());
// TX is not locked, so make sure it is tracked
AddNonLockedTx(tx, nullptr);
} else {
{
// TX is locked, so make sure we don't track it anymore
LOCK(cs);
RemoveNonLockedTx(tx->GetHash(), true);
}
// In case the islock was received before the TX, filtered announcement might have missed this islock because
// we were unable to check for filter matches deep inside the TX. Now we have the TX, so we should retry.
CInv inv(islock->IsDeterministic() ? MSG_ISDLOCK : MSG_ISLOCK, ::SerializeHash(*islock));
g_connman->RelayInvFiltered(inv, *tx, islock->IsDeterministic() ? ISDLOCK_PROTO_VERSION : LLMQS_PROTO_VERSION);
// If the islock was received before the TX, we know we were not able to send
// the notification at that time, so we need to do it now.
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about an earlier received lock for tx %s\n", __func__, tx->GetHash().ToString());
GetMainSignals().NotifyTransactionLock(tx, islock);
RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock);
}
}
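Continuing the sketch above with the same stand-in types, this is the replay half of the pattern: when a transaction finally arrives, any parked islock referencing it is moved back into the normal pending queue (and, in the mempool path above, listeners are notified immediately). The linear scan is needed because both maps are keyed by islock hash rather than txid; presumably that is acceptable while the parked set stays small.

```cpp
// Replay half of the park-and-replay pattern (sketch only, stand-in types
// as above). Returns the parked lock, if any, that referenced this txid.
ISLockPtr ReplayParkedLock(Hash txid)
{
    for (auto it = pendingNoTxLocks.begin(); it != pendingNoTxLocks.end(); ++it) {
        if (it->second.second->txid == txid) {
            ISLockPtr islock = it->second.second;
            // Back into the normal queue so regular processing picks it up.
            pendingLocks.try_emplace(it->first, it->second);
            pendingNoTxLocks.erase(it);
            return islock;
        }
    }
    return nullptr;  // no parked lock was waiting for this tx
}
```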

@@ -1221,6 +1235,19 @@ void CInstantSendManager::AddNonLockedTx(const CTransactionRef& tx, const CBlock
}
}

auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == tx->GetHash()) {
// we received an islock earlier, let's put it back into pending and verify/lock
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
tx->GetHash().ToString(), it->first.ToString());
pendingInstantSendLocks.try_emplace(it->first, it->second);
pendingNoTxInstantSendLocks.erase(it);
break;
}
++it;
}

LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, pindexMined=%s\n", __func__,
tx->GetHash().ToString(), pindexMined ? pindexMined->GetBlockHash().ToString() : "");
}
@@ -1443,6 +1470,8 @@ void CInstantSendManager::ResolveBlockConflicts(const uint256& islockHash, const
return;
}

bool isLockedTxKnown = WITH_LOCK(cs, return pendingNoTxInstantSendLocks.find(islockHash) == pendingNoTxInstantSendLocks.end());

bool activateBestChain = false;
for (const auto& p : conflicts) {
auto pindex = p.first;
@@ -1464,7 +1493,13 @@
// This should not have happened and we are in a state where it's not safe to continue anymore
assert(false);
}
activateBestChain = true;
if (isLockedTxKnown) {
activateBestChain = true;
} else {
LogPrintf("CInstantSendManager::%s -- resetting block %s\n", __func__, pindex2->GetBlockHash().ToString());
LOCK(cs_main);
ResetBlockFailureFlags(pindex2);
}
}

if (activateBestChain) {
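A note on the new isLockedTxKnown guard: if the islock is still parked in pendingNoTxInstantSendLocks (meaning the locked transaction itself is unknown), re-activating the best chain immediately would be premature, so the code only clears the failure flags on the blocks it just invalidated, leaving the reorg for when the transaction arrives. A compact sketch of that decision, with simplified stand-ins and no cs_main locking shown:

```cpp
// Sketch only: 'BlockIndex' and the helper are simplified stand-ins.
struct BlockIndex { /* chain metadata elided */ };

void ClearFailureFlags(BlockIndex*) { /* ResetBlockFailureFlags in the PR */ }

// Returns whether best-chain selection should run right away.
bool AfterInvalidatingConflict(BlockIndex* conflictBlock, bool lockedTxKnown)
{
    if (lockedTxKnown) {
        return true;  // safe to reorg to the islock-compatible chain now
    }
    // Still waiting for the locked tx: keep the blocks eligible for
    // reconsideration, but defer chain activation until the tx arrives.
    ClearFailureFlags(conflictBlock);
    return false;
}
```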
@@ -1585,7 +1620,7 @@ bool CInstantSendManager::AlreadyHave(const CInv& inv) const
}

LOCK(cs);
return pendingInstantSendLocks.count(inv.hash) != 0 || db.KnownInstantSendLock(inv.hash);
return pendingInstantSendLocks.count(inv.hash) != 0 || pendingNoTxInstantSendLocks.count(inv.hash) != 0 || db.KnownInstantSendLock(inv.hash);
}

bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CInstantSendLock& ret) const
@@ -1596,7 +1631,18 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CI

auto islock = db.GetInstantSendLockByHash(hash);
if (!islock) {
return false;
LOCK(cs);
auto it = pendingInstantSendLocks.find(hash);
if (it != pendingInstantSendLocks.end()) {
islock = it->second.second;
} else {
auto itNoTx = pendingNoTxInstantSendLocks.find(hash);
if (itNoTx != pendingNoTxInstantSendLocks.end()) {
islock = itNoTx->second.second;
} else {
return false;
}
}
}
ret = *islock;
return true;
@@ -1635,6 +1681,25 @@ bool CInstantSendManager::IsConflicted(const CTransaction& tx) const
return GetConflictingLock(tx) != nullptr;
}

bool CInstantSendManager::IsWaitingForTx(const uint256& txHash) const
{
if (!IsInstantSendEnabled()) {
return false;
}

LOCK(cs);
auto it = pendingNoTxInstantSendLocks.begin();
while (it != pendingNoTxInstantSendLocks.end()) {
if (it->second.second->txid == txHash) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s\n", __func__,
txHash.ToString(), it->first.ToString());
return true;
}
++it;
}
return false;
}

CInstantSendLockPtr CInstantSendManager::GetConflictingLock(const CTransaction& tx) const
{
if (!IsInstantSendEnabled()) {
3 changes: 3 additions & 0 deletions src/llmq/instantsend.h
@@ -196,6 +196,8 @@ class CInstantSendManager : public CRecoveredSigsListener

// Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingInstantSendLocks GUARDED_BY(cs);
// Tried to verify but there is no tx yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher> pendingNoTxInstantSendLocks GUARDED_BY(cs);

// TXs which are neither IS locked nor ChainLocked. We use this to determine for which TXs we need to retry IS locking
// of child TXs
@@ -251,6 +253,7 @@ class CInstantSendManager : public CRecoveredSigsListener

public:
bool IsLocked(const uint256& txHash) const;
bool IsWaitingForTx(const uint256& txHash) const;
CInstantSendLockPtr GetConflictingLock(const CTransaction& tx) const;

void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override;
4 changes: 3 additions & 1 deletion src/net_processing.cpp
@@ -1395,7 +1395,9 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
// masternode would not be able to exploit this to spam the network with specially
// crafted invalid DSTX-es and potentially cause high load cheaply, because
// corresponding checks in ProcessMessage won't let it send DSTX-es too often.
bool fIgnoreRecentRejects = llmq::quorumInstantSendManager->IsLocked(inv.hash) || inv.type == MSG_DSTX;
bool fIgnoreRecentRejects = inv.type == MSG_DSTX ||
llmq::quorumInstantSendManager->IsWaitingForTx(inv.hash) ||
llmq::quorumInstantSendManager->IsLocked(inv.hash);

return (!fIgnoreRecentRejects && recentRejects->contains(inv.hash)) ||
(inv.type == MSG_DSTX && static_cast<bool>(CCoinJoin::GetDSTX(inv.hash))) ||
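The practical effect here, as I read it: a transaction can land in recentRejects before its islock shows up (for instance as a mempool conflict with a competing tx), and without the IsWaitingForTx check the node would refuse to re-fetch exactly the transaction it now needs to complete the lock. A self-contained sketch of the predicate, with hypothetical simplified types rather than the real net_processing signatures:

```cpp
// Hypothetical stand-ins -- illustration only, not the real types.
enum class Inv { TX, DSTX };
struct ISQueries {
    bool waitingForTx;  // some parked islock references this txid
    bool isLocked;      // this txid is already islocked
};

// recentRejects is bypassed for DSTX-es (rate-limited elsewhere) and for
// any txid an islock references, whether the lock is pending or complete.
bool IgnoreRecentRejects(Inv type, const ISQueries& q)
{
    return type == Inv::DSTX || q.waitingForTx || q.isLocked;
}
```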
19 changes: 12 additions & 7 deletions src/validation.cpp
@@ -587,14 +587,19 @@ static bool AcceptToMemoryPoolWorker(const CChainParams& chainparams, CTxMemPool
REJECT_INVALID, "tx-txlock-conflict");
}

// Check for conflicts with in-memory transactions
for (const CTxIn &txin : tx.vin)
{
const CTransaction* ptxConflicting = pool.GetConflictTx(txin.prevout);
if (ptxConflicting )
if (llmq::quorumInstantSendManager->IsWaitingForTx(hash)) {
pool.removeConflicts(tx);
pool.removeProTxConflicts(tx);
} else {
// Check for conflicts with in-memory transactions
for (const CTxIn &txin : tx.vin)
{
// Transaction conflicts with mempool and RBF doesn't exist in Dash
return state.Invalid(false, REJECT_DUPLICATE, "txn-mempool-conflict");
const CTransaction* ptxConflicting = pool.GetConflictTx(txin.prevout);
if (ptxConflicting)
{
// Transaction conflicts with mempool and RBF doesn't exist in Dash
return state.Invalid(false, REJECT_DUPLICATE, "txn-mempool-conflict");
}
}
}
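The conflict handling in AcceptToMemoryPoolWorker is now asymmetric: a transaction that an islock is already waiting for wins against whatever conflicts with it in the mempool, while any other transaction is still rejected on conflict, since Dash has no RBF. A sketch of the branch with simplified stand-in types:

```cpp
#include <cstdint>
#include <vector>

// Simplified stand-ins for CTxMemPool / CTransaction -- sketch only.
struct OutPoint { uint64_t hash; uint32_t n; };
struct TxIn { OutPoint prevout; };
struct Tx { std::vector<TxIn> vin; };
struct Pool {
    bool HasConflict(const OutPoint&) const { return false; }  // stub
    void RemoveConflicts(const Tx&) {}                         // stub
};

enum class Result { Accept, RejectMempoolConflict };

Result HandleMempoolConflicts(Pool& pool, const Tx& tx, bool islockWaitsForTx)
{
    if (islockWaitsForTx) {
        // The network already locked this txid: evict competing entries
        // (removeConflicts / removeProTxConflicts in the PR) and proceed.
        pool.RemoveConflicts(tx);
        return Result::Accept;
    }
    for (const TxIn& in : tx.vin) {
        if (pool.HasConflict(in.prevout)) {
            // No RBF in Dash: plain mempool conflicts are rejected outright.
            return Result::RejectMempoolConflict;
        }
    }
    return Result::Accept;
}
```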

16 changes: 13 additions & 3 deletions test/functional/feature_llmq_is_cl_conflicts.py
@@ -232,14 +232,24 @@ def test_chainlock_overrides_islock_overrides_nonchainlock(self, deterministic):
self.sync_all()
assert self.nodes[0].getbestblockhash() == cl_block.hash

# Send the ISLOCK, which should result in the last 2 blocks to be invalidated, even though the nodes don't know
# the locked transaction yet
# Send the ISLOCK, which should result in the last 2 blocks being disconnected,
# even though the nodes don't know the locked transaction yet
self.test_node.send_islock(islock, deterministic)
for node in self.nodes:
wait_until(lambda: node.getbestblockhash() == good_tip, timeout=10, sleep=0.5)
# islock for tx2 is incomplete, so tx1 should return to the mempool now that blocks are disconnected
assert rawtx1_txid in set(node.getrawmempool())

# Send the actual transaction and mine it
# Should drop tx1 and accept tx2 because there is an islock waiting for it
self.nodes[0].sendrawtransaction(rawtx2)
# bump mocktime to force tx relay
self.bump_mocktime(60)
for node in self.nodes:
self.wait_for_instantlock(rawtx2_txid, node)

# Should not allow competing txes now
assert_raises_rpc_error(-26, "tx-txlock-conflict", self.nodes[0].sendrawtransaction, rawtx1)

islock_tip = self.nodes[0].generate(1)[0]
self.sync_all()

25 changes: 15 additions & 10 deletions test/functional/p2p_instantsend.py
@@ -4,7 +4,7 @@
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

from test_framework.test_framework import DashTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error, hash256, hex_str_to_bytes, isolate_node, reconnect_isolated_node
from test_framework.util import assert_equal, assert_raises_rpc_error, isolate_node, reconnect_isolated_node

'''
p2p_instantsend.py
@@ -85,6 +85,8 @@ def test_mempool_doublespend(self):
sender = self.nodes[self.sender_idx]
receiver = self.nodes[self.receiver_idx]
isolated = self.nodes[self.isolated_idx]
connected_nodes = self.nodes.copy()
del connected_nodes[self.isolated_idx]

# feed the sender with some balance
sender_addr = sender.getnewaddress()
@@ -95,28 +97,31 @@

# create doublespending transaction, but don't relay it
dblspnd_tx = self.create_raw_tx(sender, isolated, 0.5, 1, 100)
dblspnd_txid = hash256(hex_str_to_bytes(dblspnd_tx['hex']))[::-1].hex()
# isolate one node from network
isolate_node(isolated)
# send doublespend transaction to isolated node
isolated.sendrawtransaction(dblspnd_tx['hex'])
dblspnd_txid = isolated.sendrawtransaction(dblspnd_tx['hex'])
assert dblspnd_txid in set(isolated.getrawmempool())
# let isolated node rejoin the network
# The previously isolated node should NOT relay the doublespending TX
reconnect_isolated_node(isolated, 0)
for node in self.nodes:
if node is not isolated:
assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", node.getrawtransaction, dblspnd_txid)
# instantsend to receiver. The previously isolated node should prune the doublespend TX and request the correct
# TX from other nodes.
for node in connected_nodes:
assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", node.getrawtransaction, dblspnd_txid)
# Instantsend to receiver. The previously isolated node won't accept the tx but it should
# request the correct TX from other nodes once the corresponding lock is received.
# And this time the doublespend TX should be pruned once the correct tx is received.
receiver_addr = receiver.getnewaddress()
is_id = sender.sendtoaddress(receiver_addr, 0.9)
# wait for the transaction to propagate
self.sync_mempools()
for node in self.nodes:
self.wait_for_instantlock(is_id, node)
assert_raises_rpc_error(-5, "No such mempool or blockchain transaction", isolated.getrawtransaction, dblspnd_txid)
assert dblspnd_txid not in set(isolated.getrawmempool())
# send coins back to the controller node without waiting for confirmations
receiver.sendtoaddress(self.nodes[0].getnewaddress(), 0.9, "", "", True)
sentback_id = receiver.sendtoaddress(self.nodes[0].getnewaddress(), 0.9, "", "", True)
self.sync_mempools()
for node in self.nodes:
self.wait_for_instantlock(sentback_id, node)
assert_equal(receiver.getwalletinfo()["balance"], 0)
# mine more blocks
self.bump_mocktime(1)