diff --git a/src/net.h b/src/net.h index 2032a3aefef4..5b2cbd658d92 100644 --- a/src/net.h +++ b/src/net.h @@ -884,6 +884,8 @@ class CNode // If true, we will send him all quorum related messages, even if he is not a member of our quorums std::atomic qwatch{false}; + std::set orphan_work_set; + CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false); ~CNode(); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 71a0ef1706c6..cc7fc68e3870 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -774,6 +774,8 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphansSize) return nEvicted; } +void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans); + // Requires cs_main. void Misbehaving(NodeId pnode, int howmuch) { @@ -843,13 +845,23 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &schedu } void PeerLogicValidation::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex, const std::vector& vtxConflicted) { - LOCK(g_cs_orphans); + LOCK2(cs_main, g_cs_orphans); std::vector vOrphanErase; + std::set orphanWorkSet; for (const CTransactionRef& ptx : pblock->vtx) { const CTransaction& tx = *ptx; + // Which orphan pool entries we should reprocess and potentially try to accept into mempool again? + for (size_t i = 0; i < tx.vin.size(); i++) { + auto itByPrev = mapOrphanTransactionsByPrev.find(COutPoint(tx.GetHash(), (uint32_t)i)); + if (itByPrev == mapOrphanTransactionsByPrev.end()) continue; + for (const auto& elem : itByPrev->second) { + orphanWorkSet.insert(elem->first); + } + } + // Which orphan pool entries must we evict? for (const auto& txin : tx.vin) { auto itByPrev = mapOrphanTransactionsByPrev.find(txin.prevout); @@ -871,6 +883,11 @@ void PeerLogicValidation::BlockConnected(const std::shared_ptr& pb LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx included or conflicted by block\n", nErased); } + while (!orphanWorkSet.empty()) { + LogPrint(BCLog::MEMPOOL, "Trying to process %d orphans\n", orphanWorkSet.size()); + ProcessOrphanTx(g_connman.get(), orphanWorkSet); + } + g_last_tip_update = GetTime(); } @@ -1649,6 +1666,64 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve return true; } +void static ProcessOrphanTx(CConnman* connman, std::set& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) +{ + AssertLockHeld(cs_main); + AssertLockHeld(g_cs_orphans); + std::set setMisbehaving; + bool done = false; + while (!done && !orphan_work_set.empty()) { + const uint256 orphanHash = *orphan_work_set.begin(); + orphan_work_set.erase(orphan_work_set.begin()); + + auto orphan_it = mapOrphanTransactions.find(orphanHash); + if (orphan_it == mapOrphanTransactions.end()) continue; + + const CTransactionRef porphanTx = orphan_it->second.tx; + const CTransaction& orphanTx = *porphanTx; + NodeId fromPeer = orphan_it->second.fromPeer; + bool fMissingInputs2 = false; + // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan + // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get + // anyone relaying LegitTxX banned) + CValidationState stateDummy; + + if (setMisbehaving.count(fromPeer)) continue; + if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) { + LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); + connman->RelayTransaction(orphanTx); + for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + orphan_work_set.insert(elem->first); + } + } + } + EraseOrphanTx(orphanHash); + done = true; + } else if (!fMissingInputs2) { + int nDos = 0; + if (stateDummy.IsInvalid(nDos) && nDos > 0) { + // Punish peer that gave us an invalid orphan tx + Misbehaving(fromPeer, nDos); + setMisbehaving.insert(fromPeer); + LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); + } + // Has inputs but not accepted to mempool + // Probably non-standard or insufficient fee + LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); + if (!stateDummy.CorruptionPossible()) { + assert(recentRejects); + recentRejects->insert(orphanHash); + } + EraseOrphanTx(orphanHash); + done = true; + } + mempool.check(pcoinsTip); + } +} + bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic& interruptMsgProc) { LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId()); @@ -2341,8 +2416,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - std::deque vWorkQueue; - std::vector vEraseQueue; CTransactionRef ptx; CPrivateSendBroadcastTx dstx; int nInvType = MSG_TX; @@ -2418,7 +2491,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.check(pcoinsTip); connman->RelayTransaction(tx); for (unsigned int i = 0; i < tx.vout.size(); i++) { - vWorkQueue.emplace_back(inv.hash, i); + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + pfrom->orphan_work_set.insert(elem->first); + } + } } pfrom->nLastTXTime = GetTime(); @@ -2429,62 +2507,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.size(), mempool.DynamicMemoryUsage() / 1000); // Recursively process any orphan transactions that depended on this one - std::set setMisbehaving; - while (!vWorkQueue.empty()) { - auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front()); - vWorkQueue.pop_front(); - if (itByPrev == mapOrphanTransactionsByPrev.end()) - continue; - for (auto mi = itByPrev->second.begin(); - mi != itByPrev->second.end(); - ++mi) - { - const CTransactionRef& porphanTx = (*mi)->second.tx; - const CTransaction& orphanTx = *porphanTx; - const uint256& orphanHash = orphanTx.GetHash(); - NodeId fromPeer = (*mi)->second.fromPeer; - bool fMissingInputs2 = false; - // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan - // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get - // anyone relaying LegitTxX banned) - CValidationState stateDummy; - - - if (setMisbehaving.count(fromPeer)) - continue; - if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, true, &fMissingInputs2)) { - LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); - connman->RelayTransaction(orphanTx); - for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { - vWorkQueue.emplace_back(orphanHash, i); - } - vEraseQueue.push_back(orphanHash); - } - else if (!fMissingInputs2) - { - int nDos = 0; - if (stateDummy.IsInvalid(nDos) && nDos > 0) - { - // Punish peer that gave us an invalid orphan tx - Misbehaving(fromPeer, nDos); - setMisbehaving.insert(fromPeer); - LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); - } - // Has inputs but not accepted to mempool - // Probably non-standard or insufficient fee - LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); - vEraseQueue.push_back(orphanHash); - if (!stateDummy.CorruptionPossible()) { - assert(recentRejects); - recentRejects->insert(orphanHash); - } - } - mempool.check(pcoinsTip); - } - } - - for (uint256 hash : vEraseQueue) - EraseOrphanTx(hash); + ProcessOrphanTx(connman, pfrom->orphan_work_set); } else if (fMissingInputs) { @@ -3203,11 +3226,17 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic& inter if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); + if (!pfrom->orphan_work_set.empty()) { + LOCK2(cs_main, g_cs_orphans); + ProcessOrphanTx(connman, pfrom->orphan_work_set); + } + if (pfrom->fDisconnect) return false; // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return true; + if (!pfrom->orphan_work_set.empty()) return true; // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) diff --git a/test/functional/invalidtxrequest.py b/test/functional/invalidtxrequest.py index 8d0e4f10e6d1..5792af2abeec 100755 --- a/test/functional/invalidtxrequest.py +++ b/test/functional/invalidtxrequest.py @@ -62,7 +62,16 @@ def run_test(self): # Save the coinbase for later block1 = block tip = block.sha256 - node.p2p.send_blocks_and_test([block], node, success=True) + + # Create a second one to test orphan resolution via block receival + height += 1 + block_time += 1 + block = create_block(tip, create_coinbase(height), block_time) + block.solve() + # Save the coinbase for later + block2 = block + tip = block.sha256 + node.p2p.send_blocks_and_test([block1, block2], node, success=True) self.log.info("Mature the block.") self.nodes[0].generate(100) @@ -80,10 +89,18 @@ def run_test(self): self.reconnect_p2p(num_connections=2) self.log.info('Test orphan transaction handling ... ') + self.test_orphan_tx_handling(block1.vtx[0].sha256, False) + node.generate(1) # Clear mempool + self.reconnect_p2p(num_connections=2) + self.test_orphan_tx_handling(block2.vtx[0].sha256, True) + + def test_orphan_tx_handling(self, base_tx, resolve_via_block): + node = self.nodes[0] # convenience reference to the node + # Create a root transaction that we withold until all dependend transactions # are sent out and in the orphan cache tx_withhold = CTransaction() - tx_withhold.vin.append(CTxIn(outpoint=COutPoint(block1.vtx[0].sha256, 0))) + tx_withhold.vin.append(CTxIn(outpoint=COutPoint(base_tx, 0))) tx_withhold.vout.append(CTxOut(nValue=50 * COIN - 12000, scriptPubKey=b'\x51')) tx_withhold.calc_sha256() @@ -119,7 +136,17 @@ def run_test(self): assert_equal(2, len(node.getpeerinfo())) # p2ps[1] is still connected self.log.info('Send the withhold tx ... ') - node.p2p.send_txs_and_test([tx_withhold], node, success=True) + if resolve_via_block: + # Test orphan handling/resolution by publishing the withhold TX via a mined block + prev_block = node.getblockheader(node.getbestblockhash()) + block = create_block(int(prev_block['hash'], 16), create_coinbase(prev_block['height'] + 1), prev_block["time"] + 1) + block.vtx.append(tx_withhold) + block.hashMerkleRoot = block.calc_merkle_root() + block.solve() + node.p2p.send_blocks_and_test([block], node, success=True) + else: + # Test orphan handling/resolution by publishing the withhold TX via the mempool + node.p2p.send_txs_and_test([tx_withhold], node, success=True) # Transactions that should end up in the mempool expected_mempool = { @@ -133,6 +160,9 @@ def run_test(self): # Transactions that do not end up in the mempool # tx_orphan_no_fee, because it has too low fee (p2ps[0] is not disconnected for relaying that tx) # tx_orphan_invaid, because it has negative fee (p2ps[1] is disconnected for relaying that tx) + if resolve_via_block: + # This TX has appeared in a block instead of being broadcasted via the mempool + expected_mempool.remove(tx_withhold.hash) wait_until(lambda: 1 == len(node.getpeerinfo()), timeout=12) # p2ps[1] is no longer connected assert_equal(expected_mempool, set(node.getrawmempool()))