Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,8 @@ class CNode
// If true, we will send him all quorum related messages, even if he is not a member of our quorums
std::atomic<bool> qwatch{false};

std::set<uint256> orphan_work_set;

CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false);
~CNode();

Expand Down
149 changes: 89 additions & 60 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,8 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphansSize)
return nEvicted;
}

void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans);

// Requires cs_main.
void Misbehaving(NodeId pnode, int howmuch)
{
Expand Down Expand Up @@ -843,13 +845,23 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &schedu
}

void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex, const std::vector<CTransactionRef>& vtxConflicted) {
LOCK(g_cs_orphans);
LOCK2(cs_main, g_cs_orphans);

std::vector<uint256> vOrphanErase;
std::set<uint256> orphanWorkSet;

for (const CTransactionRef& ptx : pblock->vtx) {
const CTransaction& tx = *ptx;

// Which orphan pool entries we should reprocess and potentially try to accept into mempool again?
Copy link
Member

@PastaPastaPasta PastaPastaPasta Oct 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you squash this commit in af7124e9e7cfe6c825ccb121b7292852b6400880?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

for (size_t i = 0; i < tx.vin.size(); i++) {
auto itByPrev = mapOrphanTransactionsByPrev.find(COutPoint(tx.GetHash(), (uint32_t)i));
if (itByPrev == mapOrphanTransactionsByPrev.end()) continue;
for (const auto& elem : itByPrev->second) {
orphanWorkSet.insert(elem->first);
}
}

// Which orphan pool entries must we evict?
for (const auto& txin : tx.vin) {
auto itByPrev = mapOrphanTransactionsByPrev.find(txin.prevout);
Expand All @@ -871,6 +883,11 @@ void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pb
LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx included or conflicted by block\n", nErased);
}

while (!orphanWorkSet.empty()) {
LogPrint(BCLog::MEMPOOL, "Trying to process %d orphans\n", orphanWorkSet.size());
ProcessOrphanTx(g_connman.get(), orphanWorkSet);
}

g_last_tip_update = GetTime();
}

Expand Down Expand Up @@ -1649,6 +1666,64 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
return true;
}

void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
{
AssertLockHeld(cs_main);
AssertLockHeld(g_cs_orphans);
std::set<NodeId> setMisbehaving;
bool done = false;
while (!done && !orphan_work_set.empty()) {
const uint256 orphanHash = *orphan_work_set.begin();
orphan_work_set.erase(orphan_work_set.begin());

auto orphan_it = mapOrphanTransactions.find(orphanHash);
if (orphan_it == mapOrphanTransactions.end()) continue;

const CTransactionRef porphanTx = orphan_it->second.tx;
const CTransaction& orphanTx = *porphanTx;
NodeId fromPeer = orphan_it->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;

if (setMisbehaving.count(fromPeer)) continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
connman->RelayTransaction(orphanTx);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
orphan_work_set.insert(elem->first);
}
}
}
EraseOrphanTx(orphanHash);
done = true;
} else if (!fMissingInputs2) {
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0) {
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
if (!stateDummy.CorruptionPossible()) {
assert(recentRejects);
recentRejects->insert(orphanHash);
}
EraseOrphanTx(orphanHash);
done = true;
}
mempool.check(pcoinsTip);
}
}

bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
{
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId());
Expand Down Expand Up @@ -2341,8 +2416,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}

std::deque<COutPoint> vWorkQueue;
std::vector<uint256> vEraseQueue;
CTransactionRef ptx;
CPrivateSendBroadcastTx dstx;
int nInvType = MSG_TX;
Expand Down Expand Up @@ -2418,7 +2491,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.check(pcoinsTip);
connman->RelayTransaction(tx);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
vWorkQueue.emplace_back(inv.hash, i);
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
pfrom->orphan_work_set.insert(elem->first);
}
}
}

pfrom->nLastTXTime = GetTime();
Expand All @@ -2429,62 +2507,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.size(), mempool.DynamicMemoryUsage() / 1000);

// Recursively process any orphan transactions that depended on this one
std::set<NodeId> setMisbehaving;
while (!vWorkQueue.empty()) {
auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front());
vWorkQueue.pop_front();
if (itByPrev == mapOrphanTransactionsByPrev.end())
continue;
for (auto mi = itByPrev->second.begin();
mi != itByPrev->second.end();
++mi)
{
const CTransactionRef& porphanTx = (*mi)->second.tx;
const CTransaction& orphanTx = *porphanTx;
const uint256& orphanHash = orphanTx.GetHash();
NodeId fromPeer = (*mi)->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;


if (setMisbehaving.count(fromPeer))
continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
connman->RelayTransaction(orphanTx);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
vWorkQueue.emplace_back(orphanHash, i);
}
vEraseQueue.push_back(orphanHash);
}
else if (!fMissingInputs2)
{
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0)
{
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
vEraseQueue.push_back(orphanHash);
if (!stateDummy.CorruptionPossible()) {
assert(recentRejects);
recentRejects->insert(orphanHash);
}
}
mempool.check(pcoinsTip);
}
}

for (uint256 hash : vEraseQueue)
EraseOrphanTx(hash);
ProcessOrphanTx(connman, pfrom->orphan_work_set);
}
else if (fMissingInputs)
{
Expand Down Expand Up @@ -3203,11 +3226,17 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);

if (!pfrom->orphan_work_set.empty()) {
LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(connman, pfrom->orphan_work_set);
}

if (pfrom->fDisconnect)
return false;

// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return true;
if (!pfrom->orphan_work_set.empty()) return true;

// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)
Expand Down
36 changes: 33 additions & 3 deletions test/functional/invalidtxrequest.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,16 @@ def run_test(self):
# Save the coinbase for later
block1 = block
tip = block.sha256
node.p2p.send_blocks_and_test([block], node, success=True)

# Create a second one to test orphan resolution via block receival
height += 1
block_time += 1
block = create_block(tip, create_coinbase(height), block_time)
block.solve()
# Save the coinbase for later
block2 = block
tip = block.sha256
node.p2p.send_blocks_and_test([block1, block2], node, success=True)

self.log.info("Mature the block.")
self.nodes[0].generate(100)
Expand All @@ -80,10 +89,18 @@ def run_test(self):
self.reconnect_p2p(num_connections=2)

self.log.info('Test orphan transaction handling ... ')
self.test_orphan_tx_handling(block1.vtx[0].sha256, False)
node.generate(1) # Clear mempool
self.reconnect_p2p(num_connections=2)
self.test_orphan_tx_handling(block2.vtx[0].sha256, True)

def test_orphan_tx_handling(self, base_tx, resolve_via_block):
node = self.nodes[0] # convenience reference to the node

# Create a root transaction that we withold until all dependend transactions
# are sent out and in the orphan cache
tx_withhold = CTransaction()
tx_withhold.vin.append(CTxIn(outpoint=COutPoint(block1.vtx[0].sha256, 0)))
tx_withhold.vin.append(CTxIn(outpoint=COutPoint(base_tx, 0)))
tx_withhold.vout.append(CTxOut(nValue=50 * COIN - 12000, scriptPubKey=b'\x51'))
tx_withhold.calc_sha256()

Expand Down Expand Up @@ -119,7 +136,17 @@ def run_test(self):
assert_equal(2, len(node.getpeerinfo())) # p2ps[1] is still connected

self.log.info('Send the withhold tx ... ')
node.p2p.send_txs_and_test([tx_withhold], node, success=True)
if resolve_via_block:
# Test orphan handling/resolution by publishing the withhold TX via a mined block
prev_block = node.getblockheader(node.getbestblockhash())
block = create_block(int(prev_block['hash'], 16), create_coinbase(prev_block['height'] + 1), prev_block["time"] + 1)
block.vtx.append(tx_withhold)
block.hashMerkleRoot = block.calc_merkle_root()
block.solve()
node.p2p.send_blocks_and_test([block], node, success=True)
else:
# Test orphan handling/resolution by publishing the withhold TX via the mempool
node.p2p.send_txs_and_test([tx_withhold], node, success=True)

# Transactions that should end up in the mempool
expected_mempool = {
Expand All @@ -133,6 +160,9 @@ def run_test(self):
# Transactions that do not end up in the mempool
# tx_orphan_no_fee, because it has too low fee (p2ps[0] is not disconnected for relaying that tx)
# tx_orphan_invaid, because it has negative fee (p2ps[1] is disconnected for relaying that tx)
if resolve_via_block:
# This TX has appeared in a block instead of being broadcasted via the mempool
expected_mempool.remove(tx_withhold.hash)

wait_until(lambda: 1 == len(node.getpeerinfo()), timeout=12) # p2ps[1] is no longer connected
assert_equal(expected_mempool, set(node.getrawmempool()))
Expand Down