Skip to content

Commit

Permalink
refactor: move PeerManager out of CInstantSendManager ctor
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed Dec 5, 2024
1 parent 82d1aed commit d9e5cc7
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/dsnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
{
assert(m_cj_ctx && m_llmq_ctx);

m_llmq_ctx->isman->TransactionAddedToMempool(ptx);
m_llmq_ctx->isman->TransactionAddedToMempool(m_peerman, ptx);
m_llmq_ctx->clhandler->TransactionAddedToMempool(ptx, nAcceptTime);
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
}
Expand Down
6 changes: 3 additions & 3 deletions src/llmq/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ LLMQContext::LLMQContext(ChainstateManager& chainman, CConnman& connman, CDeterm
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler,
chainman.ActiveChainstate(), *qman,
*sigman, *shareman, sporkman,
mempool, mn_sync, peerman,
is_masternode, unit_tests, wipe);
mempool, mn_sync, is_masternode,
unit_tests, wipe);
return llmq::quorumInstantSendManager.get();
}()},
ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainman, mnhfman, *sigman, *shareman, *qman)}
Expand Down Expand Up @@ -87,7 +87,7 @@ void LLMQContext::Start(CConnman& connman, PeerManager& peerman)
sigman->StartWorkerThread(peerman);

llmq::chainLocksHandler->Start();
llmq::quorumInstantSendManager->Start();
llmq::quorumInstantSendManager->Start(peerman);
}

void LLMQContext::Stop() {
Expand Down
52 changes: 29 additions & 23 deletions src/llmq/instantsend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,14 @@ std::vector<uint256> CInstantSendDb::RemoveChainedInstantSendLocks(const uint256
////////////////


void CInstantSendManager::Start()
void CInstantSendManager::Start(PeerManager& peerman)
{
// can't start new thread if we have one running already
if (workThread.joinable()) {
assert(false);
}

workThread = std::thread(&util::TraceThread, "isman", [this] { WorkThreadMain(); });
workThread = std::thread(&util::TraceThread, "isman", [this, &peerman] { WorkThreadMain(peerman); });

sigman.RegisterRecoveredSigsListener(this);
}
Expand Down Expand Up @@ -732,21 +732,23 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco
pendingInstantSendLocks.emplace(hash, std::make_pair(-1, islock));
}

PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv)
PeerMsgRet CInstantSendManager::ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type,
CDataStream& vRecv)
{
if (IsInstantSendEnabled() && msg_type == NetMsgType::ISDLOCK) {
const auto islock = std::make_shared<CInstantSendLock>();
vRecv >> *islock;
return ProcessMessageInstantSendLock(pfrom, islock);
return ProcessMessageInstantSendLock(pfrom, peerman, islock);
}
return {};
}

PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, const llmq::CInstantSendLockPtr& islock)
PeerMsgRet CInstantSendManager::ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman,
const llmq::CInstantSendLockPtr& islock)
{
auto hash = ::SerializeHash(*islock);

WITH_LOCK(::cs_main, Assert(m_peerman)->EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash)));
WITH_LOCK(::cs_main, peerman.EraseObjectRequest(pfrom.GetId(), CInv(MSG_ISDLOCK, hash)));

if (!islock->TriviallyValid()) {
return tl::unexpected{100};
Expand Down Expand Up @@ -800,7 +802,7 @@ bool CInstantSendLock::TriviallyValid() const
return true;
}

bool CInstantSendManager::ProcessPendingInstantSendLocks()
bool CInstantSendManager::ProcessPendingInstantSendLocks(PeerManager& peerman)
{
decltype(pendingInstantSendLocks) pend;
bool fMoreWork{false};
Expand Down Expand Up @@ -845,7 +847,7 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
auto dkgInterval = llmq_params.dkgInterval;

// First check against the current active set and don't ban
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, 0, pend, false);
auto badISLocks = ProcessPendingInstantSendLocks(llmq_params, peerman, /*signOffset=*/0, pend, false);
if (!badISLocks.empty()) {
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- doing verification on old active set\n", __func__);

Expand All @@ -858,13 +860,15 @@ bool CInstantSendManager::ProcessPendingInstantSendLocks()
}
}
// Now check against the previous active set and perform banning if this fails
ProcessPendingInstantSendLocks(llmq_params, dkgInterval, pend, true);
ProcessPendingInstantSendLocks(llmq_params, peerman, dkgInterval, pend, true);
}

return fMoreWork;
}

std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params, int signOffset, const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
{
CBLSBatchVerifier<NodeId, uint256> batchVerifier(false, true, 8);
std::unordered_map<uint256, CRecoveredSig, StaticSaltedHasher> recSigs;
Expand Down Expand Up @@ -936,7 +940,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
for (const auto& nodeId : batchVerifier.badSources) {
// Let's not be too harsh, as the peer might simply be unlucky and might have sent us an old lock which
// does not validate anymore due to changed quorums
Assert(m_peerman)->Misbehaving(nodeId, 20);
peerman.Misbehaving(nodeId, 20);
}
}
for (const auto& p : pend) {
Expand All @@ -951,7 +955,7 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
continue;
}

ProcessInstantSendLock(nodeId, hash, islock);
ProcessInstantSendLock(nodeId, peerman, hash, islock);

// See comment further on top. We pass a reconstructed recovered sig to the signing manager to avoid
// double-verification of the sig.
Expand All @@ -969,7 +973,8 @@ std::unordered_set<uint256, StaticSaltedHasher> CInstantSendManager::ProcessPend
return badISLocks;
}

void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock)
void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash,
const CInstantSendLockPtr& islock)
{
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n", __func__,
islock->txid.ToString(), hash.ToString(), from);
Expand Down Expand Up @@ -1030,28 +1035,28 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has

CInv inv(MSG_ISDLOCK, hash);
if (tx != nullptr) {
Assert(m_peerman)->RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
peerman.RelayInvFiltered(inv, *tx, ISDLOCK_PROTO_VERSION);
} else {
// we don't have the TX yet, so we only filter based on txid. Later when that TX arrives, we will re-announce
// with the TX taken into account.
Assert(m_peerman)->RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
peerman.RelayInvFiltered(inv, islock->txid, ISDLOCK_PROTO_VERSION);
}

ResolveBlockConflicts(hash, *islock);

if (tx != nullptr) {
RemoveMempoolConflictsForLock(hash, *islock);
RemoveMempoolConflictsForLock(peerman, hash, *islock);
LogPrint(BCLog::INSTANTSEND, "CInstantSendManager::%s -- notify about lock %s for tx %s\n", __func__,
hash.ToString(), tx->GetHash().ToString());
GetMainSignals().NotifyTransactionLock(tx, islock);
// bump mempool counter to make sure newly locked txes are picked up by getblocktemplate
mempool.AddTransactionsUpdated(1);
} else {
m_peerman->AskPeersForTransaction(islock->txid, m_is_masternode);
peerman.AskPeersForTransaction(islock->txid, m_is_masternode);
}
}

void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
void CInstantSendManager::TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx)
{
if (!IsInstantSendEnabled() || !m_mn_sync.IsBlockchainSynced() || tx->vin.empty()) {
return;
Expand Down Expand Up @@ -1080,7 +1085,7 @@ void CInstantSendManager::TransactionAddedToMempool(const CTransactionRef& tx)
// TX is not locked, so make sure it is tracked
AddNonLockedTx(tx, nullptr);
} else {
RemoveMempoolConflictsForLock(::SerializeHash(*islock), *islock);
RemoveMempoolConflictsForLock(peerman, ::SerializeHash(*islock), *islock);
}
}

Expand Down Expand Up @@ -1292,7 +1297,8 @@ void CInstantSendManager::HandleFullyConfirmedBlock(const CBlockIndex* pindex)
}
}

void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock)
void CInstantSendManager::RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash,
const CInstantSendLock& islock)
{
std::unordered_map<uint256, CTransactionRef, StaticSaltedHasher> toDelete;

Expand Down Expand Up @@ -1321,7 +1327,7 @@ void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, con
for (const auto& p : toDelete) {
RemoveConflictedTx(*p.second);
}
m_peerman->AskPeersForTransaction(islock.txid, m_is_masternode);
peerman.AskPeersForTransaction(islock.txid, m_is_masternode);
}
}

Expand Down Expand Up @@ -1583,10 +1589,10 @@ size_t CInstantSendManager::GetInstantSendLockCount() const
return db.GetInstantSendLockCount();
}

void CInstantSendManager::WorkThreadMain()
void CInstantSendManager::WorkThreadMain(PeerManager& peerman)
{
while (!workInterrupt) {
bool fMoreWork = ProcessPendingInstantSendLocks();
bool fMoreWork = ProcessPendingInstantSendLocks(peerman);
ProcessPendingRetryLockTxs();

if (!fMoreWork && !workInterrupt.sleep_for(std::chrono::milliseconds(100))) {
Expand Down
32 changes: 13 additions & 19 deletions src/llmq/instantsend.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ class CInstantSendManager : public CRecoveredSigsListener
CSporkManager& spork_manager;
CTxMemPool& mempool;
const CMasternodeSync& m_mn_sync;
const std::unique_ptr<PeerManager>& m_peerman;

const bool m_is_masternode;

Expand Down Expand Up @@ -256,9 +255,8 @@ class CInstantSendManager : public CRecoveredSigsListener
public:
explicit CInstantSendManager(CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman,
CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman,
CTxMemPool& _mempool, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool is_masternode, bool unitTests,
bool fWipe) :
CTxMemPool& _mempool, const CMasternodeSync& mn_sync, bool is_masternode,
bool unitTests, bool fWipe) :
db(unitTests, fWipe),
clhandler(_clhandler),
m_chainstate(chainstate),
Expand All @@ -268,14 +266,13 @@ class CInstantSendManager : public CRecoveredSigsListener
spork_manager(sporkman),
mempool(_mempool),
m_mn_sync(mn_sync),
m_peerman(peerman),
m_is_masternode{is_masternode}
{
workInterrupt.reset();
}
~CInstantSendManager() = default;

void Start();
void Start(PeerManager& peerman);
void Stop();
void InterruptWorkerThread() { workInterrupt(); };

Expand All @@ -295,18 +292,15 @@ class CInstantSendManager : public CRecoveredSigsListener
const Consensus::Params& params) EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests);
void TrySignInstantSendLock(const CTransaction& tx) EXCLUSIVE_LOCKS_REQUIRED(!cs_creating);

PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, const CInstantSendLockPtr& islock);
bool ProcessPendingInstantSendLocks()
PeerMsgRet ProcessMessageInstantSendLock(const CNode& pfrom, PeerManager& peerman, const CInstantSendLockPtr& islock);
bool ProcessPendingInstantSendLocks(PeerManager& peerman)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);

std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(const Consensus::LLMQParams& llmq_params,
int signOffset,
const std::unordered_map<uint256,
std::pair<NodeId, CInstantSendLockPtr>,
StaticSaltedHasher>& pend,
bool ban)
std::unordered_set<uint256, StaticSaltedHasher> ProcessPendingInstantSendLocks(
const Consensus::LLMQParams& llmq_params, PeerManager& peerman, int signOffset,
const std::unordered_map<uint256, std::pair<NodeId, CInstantSendLockPtr>, StaticSaltedHasher>& pend, bool ban)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLockPtr& islock)
void ProcessInstantSendLock(NodeId from, PeerManager& peerman, const uint256& hash, const CInstantSendLockPtr& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);

void AddNonLockedTx(const CTransactionRef& tx, const CBlockIndex* pindexMined)
Expand All @@ -318,14 +312,14 @@ class CInstantSendManager : public CRecoveredSigsListener
void TruncateRecoveredSigsForInputs(const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests);

void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock)
void RemoveMempoolConflictsForLock(PeerManager& peerman, const uint256& hash, const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);
void ResolveBlockConflicts(const uint256& islockHash, const CInstantSendLock& islock)
EXCLUSIVE_LOCKS_REQUIRED(!cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void ProcessPendingRetryLockTxs()
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingRetry);

void WorkThreadMain()
void WorkThreadMain(PeerManager& peerman)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);

void HandleFullyConfirmedBlock(const CBlockIndex* pindex)
Expand All @@ -339,9 +333,9 @@ class CInstantSendManager : public CRecoveredSigsListener
[[nodiscard]] MessageProcessingResult HandleNewRecoveredSig(const CRecoveredSig& recoveredSig) override
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_pendingLocks);

PeerMsgRet ProcessMessage(const CNode& pfrom, std::string_view msg_type, CDataStream& vRecv);
PeerMsgRet ProcessMessage(const CNode& pfrom, PeerManager& peerman, std::string_view msg_type, CDataStream& vRecv);

void TransactionAddedToMempool(const CTransactionRef& tx)
void TransactionAddedToMempool(PeerManager& peerman, const CTransactionRef& tx)
EXCLUSIVE_LOCKS_REQUIRED(!cs_creating, !cs_inputReqests, !cs_nonLocked, !cs_pendingLocks, !cs_pendingRetry);
void TransactionRemovedFromMempool(const CTransactionRef& tx);
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex)
Expand Down
2 changes: 1 addition & 1 deletion src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5278,7 +5278,7 @@ void PeerManagerImpl::ProcessMessage(
return; // CLSIG
}

ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, msg_type, vRecv), pfrom);
ProcessPeerMsgRet(m_llmq_ctx->isman->ProcessMessage(pfrom, *this, msg_type, vRecv), pfrom);
return;
}

Expand Down

0 comments on commit d9e5cc7

Please sign in to comment.