diff --git a/src/llmq/blockprocessor.h b/src/llmq/blockprocessor.h index 92c9036978703e..d45e2cb0e105b1 100644 --- a/src/llmq/blockprocessor.h +++ b/src/llmq/blockprocessor.h @@ -35,7 +35,7 @@ class CQuorumBlockProcessor CEvoDB& evoDb; // TODO cleanup - mutable CCriticalSection minableCommitmentsCs; + mutable Mutex minableCommitmentsCs; std::map, uint256> minableCommitmentsByQuorum GUARDED_BY(minableCommitmentsCs); std::map minableCommitments GUARDED_BY(minableCommitmentsCs); diff --git a/src/llmq/debug.h b/src/llmq/debug.h index d4cab07390cd0f..591b5a28701b8f 100644 --- a/src/llmq/debug.h +++ b/src/llmq/debug.h @@ -89,7 +89,7 @@ class CDKGDebugStatus class CDKGDebugManager { private: - mutable CCriticalSection cs; + mutable Mutex cs; CDKGDebugStatus localStatus GUARDED_BY(cs); public: diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index 5ffd35968ebcc2..34653786432a18 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -33,8 +33,8 @@ namespace llmq // - commit-omit // - commit-lie -static CCriticalSection cs_simDkgError; -static std::map simDkgErrorMap; +static Mutex cs_simDkgError; +static std::map simDkgErrorMap GUARDED_BY(cs_simDkgError); void SetSimulatedDKGErrorRate(const std::string& type, double rate) { @@ -65,7 +65,6 @@ CDKGMember::CDKGMember(const CDeterministicMNCPtr& _dmn, size_t _idx) : idx(_idx), id(_dmn->proTxHash) { - } bool CDKGSession::Init(const CBlockIndex* _pQuorumBaseBlockIndex, const std::vector& mns, const uint256& _myProTxHash) diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index c8809c69eb92fb..4e47b879b1d4b6 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -43,7 +43,7 @@ class CDKGPendingMessages using BinaryMessage = std::pair>; private: - mutable CCriticalSection cs; + mutable Mutex cs; const int invType; size_t maxMessagesPerNode GUARDED_BY(cs); std::list pendingMessages GUARDED_BY(cs); @@ -54,10 +54,10 @@ class CDKGPendingMessages explicit CDKGPendingMessages(size_t _maxMessagesPerNode, int _invType) : invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {}; - void PushPendingMessage(NodeId from, CDataStream& vRecv); - std::list PopPendingMessages(size_t maxCount); - bool HasSeen(const uint256& hash) const; - void Clear(); + void PushPendingMessage(NodeId from, CDataStream& vRecv) LOCKS_EXCLUDED(cs); + std::list PopPendingMessages(size_t maxCount) LOCKS_EXCLUDED(cs); + bool HasSeen(const uint256& hash) const LOCKS_EXCLUDED(cs); + void Clear() LOCKS_EXCLUDED(cs); template void PushPendingMessage(NodeId from, Message& msg) diff --git a/src/llmq/dkgsessionmgr.cpp b/src/llmq/dkgsessionmgr.cpp index c4922f6f8baba6..e5671a1e0edf94 100644 --- a/src/llmq/dkgsessionmgr.cpp +++ b/src/llmq/dkgsessionmgr.cpp @@ -306,7 +306,6 @@ void CDKGSessionManager::WriteEncryptedContributions(Consensus::LLMQType llmqTyp bool CDKGSessionManager::GetVerifiedContributions(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, const std::vector& validMembers, std::vector& memberIndexesRet, std::vector& vvecsRet, BLSSecretKeyVector& skContributionsRet) const { - LOCK(contributionsCacheCs); auto members = CLLMQUtils::GetAllQuorumMembers(GetLLMQParams(llmqType), pQuorumBaseBlockIndex); memberIndexesRet.clear(); @@ -315,6 +314,8 @@ bool CDKGSessionManager::GetVerifiedContributions(Consensus::LLMQType llmqType, memberIndexesRet.reserve(members.size()); vvecsRet.reserve(members.size()); skContributionsRet.reserve(members.size()); + + LOCK(contributionsCacheCs); for (size_t i = 0; i < members.size(); i++) { if (validMembers[i]) { const uint256& proTxHash = members[i]->proTxHash; @@ -371,8 +372,8 @@ bool CDKGSessionManager::GetEncryptedContributions(Consensus::LLMQType llmqType, void CDKGSessionManager::CleanupCache() const { - LOCK(contributionsCacheCs); auto curTime = GetTimeMillis(); + LOCK(contributionsCacheCs); for (auto it = contributionsCache.begin(); it != contributionsCache.end(); ) { if (curTime - it->second.entryTime > MAX_CONTRIBUTION_CACHE_TIME) { it = contributionsCache.erase(it); diff --git a/src/llmq/dkgsessionmgr.h b/src/llmq/dkgsessionmgr.h index 35533172cfb5d6..e7fecf8d38f16a 100644 --- a/src/llmq/dkgsessionmgr.h +++ b/src/llmq/dkgsessionmgr.h @@ -26,7 +26,6 @@ class CDKGSessionManager std::map dkgSessionHandlers; - mutable CCriticalSection contributionsCacheCs; struct ContributionsCacheKey { Consensus::LLMQType llmqType; uint256 quorumHash; @@ -43,6 +42,7 @@ class CDKGSessionManager BLSVerificationVectorPtr vvec; CBLSSecretKey skContribution; }; + mutable Mutex contributionsCacheCs; mutable std::map contributionsCache GUARDED_BY(contributionsCacheCs); public: @@ -64,7 +64,10 @@ class CDKGSessionManager // Contributions are written while in the DKG void WriteVerifiedVvecContribution(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, const uint256& proTxHash, const BLSVerificationVectorPtr& vvec); void WriteVerifiedSkContribution(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, const uint256& proTxHash, const CBLSSecretKey& skContribution); - bool GetVerifiedContributions(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, const std::vector& validMembers, std::vector& memberIndexesRet, std::vector& vvecsRet, BLSSecretKeyVector& skContributionsRet) const; + bool GetVerifiedContributions(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, + const std::vector& validMembers, std::vector& memberIndexesRet, + std::vector& vvecsRet, + BLSSecretKeyVector& skContributionsRet) const LOCKS_EXCLUDED(contributionsCacheCs); /// Write encrypted (unverified) DKG contributions for the member with the given proTxHash to the llmqDb void WriteEncryptedContributions(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex, const uint256& proTxHash, const CBLSIESMultiRecipientObjects& contributions); /// Read encrypted (unverified) DKG contributions for the member with the given proTxHash from the llmqDb @@ -72,7 +75,7 @@ class CDKGSessionManager private: void MigrateDKG(); - void CleanupCache() const; + void CleanupCache() const LOCKS_EXCLUDED(contributionsCacheCs); }; bool IsQuorumDKGEnabled(); diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index ad29222fc52f9f..bb49e339ee5b9e 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -31,7 +31,7 @@ static const std::string DB_QUORUM_QUORUM_VVEC = "q_Qqvvec"; CQuorumManager* quorumManager; -CCriticalSection cs_data_requests; +Mutex cs_data_requests; static std::unordered_map, CQuorumDataRequest, StaticSaltedHasher> mapQuorumDataRequests GUARDED_BY(cs_data_requests); static uint256 MakeQuorumKey(const CQuorum& q) @@ -285,7 +285,6 @@ void CQuorumManager::EnsureQuorumConnections(const Consensus::LLMQParams& llmqPa CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex) const { - AssertLockHeld(quorumsCacheCs); assert(pQuorumBaseBlockIndex); const uint256& quorumHash{pQuorumBaseBlockIndex->GetBlockHash()}; @@ -321,7 +320,7 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l StartCachePopulatorThread(quorum); } - mapQuorumsCache[llmqType].insert(quorumHash, quorum); + WITH_LOCK(quorumsCacheCs, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); return quorum; } @@ -336,20 +335,22 @@ bool CQuorumManager::BuildQuorumContributions(const CFinalCommitmentPtr& fqc, co } cxxtimer::Timer t2(true); - LOCK(quorum->cs); - quorum->quorumVvec = blsWorker.BuildQuorumVerificationVector(vvecs); - if (!quorum->HasVerificationVector()) { - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- failed to build quorumVvec\n", __func__); - // without the quorum vvec, there can't be a skShare, so we fail here. Failure is not fatal here, as it still - // allows to use the quorum as a non-member (verification through the quorum pub key) - return false; - } - quorum->skShare = blsWorker.AggregateSecretKeys(skContributions); - if (!quorum->skShare.IsValid()) { - quorum->skShare.Reset(); - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- failed to build skShare\n", __func__); - // We don't bail out here as this is not a fatal error and still allows us to recover public key shares (as we - // have a valid quorum vvec at this point) + { + LOCK(quorum->cs); + quorum->quorumVvec = blsWorker.BuildQuorumVerificationVector(vvecs); + if (!quorum->HasVerificationVector()) { + LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- failed to build quorumVvec\n", __func__); + // without the quorum vvec, there can't be a skShare, so we fail here. Failure is not fatal here, as it still + // allows to use the quorum as a non-member (verification through the quorum pub key) + return false; + } + quorum->skShare = blsWorker.AggregateSecretKeys(skContributions); + if (!quorum->skShare.IsValid()) { + quorum->skShare.Reset(); + LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- failed to build skShare\n", __func__); + // We don't bail out here as this is not a fatal error and still allows us to recover public key shares (as we + // have a valid quorum vvec at this point) + } } t2.stop(); @@ -493,9 +494,8 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, const CBlock return nullptr; } - LOCK(quorumsCacheCs); CQuorumPtr pQuorum; - if (mapQuorumsCache[llmqType].get(quorumHash, pQuorum)) { + if (WITH_LOCK(quorumsCacheCs, return mapQuorumsCache[llmqType].get(quorumHash, pQuorum))) { return pQuorum; } @@ -649,12 +649,9 @@ void CQuorumManager::ProcessMessage(CNode* pFrom, const std::string& strCommand, } CQuorumPtr pQuorum; - { - LOCK(quorumsCacheCs); - if (!mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) { - errorHandler("Quorum not found", 0); // Don't bump score because we asked for it - return; - } + if (LOCK(quorumsCacheCs); !mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) { + errorHandler("Quorum not found", 0); // Don't bump score because we asked for it + return; } // Check if request has QUORUM_VERIFICATION_VECTOR data diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index 784128e63b9e6d..c44515c2190e99 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -191,7 +191,7 @@ class CQuorumManager CBLSWorker& blsWorker; CDKGSessionManager& dkgManager; - mutable CCriticalSection quorumsCacheCs; + mutable Mutex quorumsCacheCs; mutable std::map> mapQuorumsCache GUARDED_BY(quorumsCacheCs); mutable std::map, StaticSaltedHasher>> scanQuorumsCache GUARDED_BY(quorumsCacheCs); @@ -226,7 +226,7 @@ class CQuorumManager // all private methods here are cs_main-free void EnsureQuorumConnections(const Consensus::LLMQParams& llmqParams, const CBlockIndex *pindexNew) const; - CQuorumPtr BuildQuorumFromCommitment(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex) const EXCLUSIVE_LOCKS_REQUIRED(quorumsCacheCs); + CQuorumPtr BuildQuorumFromCommitment(Consensus::LLMQType llmqType, const CBlockIndex* pQuorumBaseBlockIndex) const; bool BuildQuorumContributions(const CFinalCommitmentPtr& fqc, const std::shared_ptr& quorum) const; CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const CBlockIndex* pindex) const; diff --git a/src/llmq/signing.cpp b/src/llmq/signing.cpp index e17c5a6f579f84..47d1022c1a2bf3 100644 --- a/src/llmq/signing.cpp +++ b/src/llmq/signing.cpp @@ -358,8 +358,6 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteHashKey, bool deleteTimeKey) { - AssertLockHeld(cs); - CRecoveredSig recSig; if (!ReadRecoveredSig(llmqType, id, recSig)) { return; @@ -389,6 +387,7 @@ void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType l } } + LOCK(cs); hasSigForIdCache.erase(std::make_pair(recSig.llmqType, recSig.id)); hasSigForSessionCache.erase(signHash); if (deleteHashKey) { @@ -399,7 +398,6 @@ void CRecoveredSigsDb::RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType l // Completely remove any traces of the recovered sig void CRecoveredSigsDb::RemoveRecoveredSig(Consensus::LLMQType llmqType, const uint256& id) { - LOCK(cs); CDBBatch batch(*db); RemoveRecoveredSig(batch, llmqType, id, true, true); db->WriteBatch(batch); @@ -409,7 +407,6 @@ void CRecoveredSigsDb::RemoveRecoveredSig(Consensus::LLMQType llmqType, const ui // This will leave the byHash key in-place so that HasRecoveredSigForHash still returns true void CRecoveredSigsDb::TruncateRecoveredSig(Consensus::LLMQType llmqType, const uint256& id) { - LOCK(cs); CDBBatch batch(*db); RemoveRecoveredSig(batch, llmqType, id, false, false); db->WriteBatch(batch); @@ -448,15 +445,12 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) } CDBBatch batch(*db); - { - LOCK(cs); - for (const auto& e : toDelete) { - RemoveRecoveredSig(batch, e.first, e.second, true, false); + for (const auto& e : toDelete) { + RemoveRecoveredSig(batch, e.first, e.second, true, false); - if (batch.SizeEstimate() >= (1 << 24)) { - db->WriteBatch(batch); - batch.Clear(); - } + if (batch.SizeEstimate() >= (1 << 24)) { + db->WriteBatch(batch); + batch.Clear(); } } diff --git a/src/llmq/signing.h b/src/llmq/signing.h index 0142d59b979f04..1af12e95d81541 100644 --- a/src/llmq/signing.h +++ b/src/llmq/signing.h @@ -78,7 +78,7 @@ class CRecoveredSigsDb private: std::unique_ptr db{nullptr}; - mutable CCriticalSection cs; + mutable Mutex cs; mutable unordered_lru_cache, bool, StaticSaltedHasher, 30000> hasSigForIdCache GUARDED_BY(cs); mutable unordered_lru_cache hasSigForSessionCache GUARDED_BY(cs); mutable unordered_lru_cache hasSigForHashCache GUARDED_BY(cs); @@ -113,7 +113,7 @@ class CRecoveredSigsDb void MigrateRecoveredSigs(); bool ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret) const; - void RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteHashKey, bool deleteTimeKey) EXCLUSIVE_LOCKS_REQUIRED(cs); + void RemoveRecoveredSig(CDBBatch& batch, Consensus::LLMQType llmqType, const uint256& id, bool deleteHashKey, bool deleteTimeKey); }; class CRecoveredSigsListener @@ -134,10 +134,10 @@ class CSigningManager static constexpr int SIGN_HEIGHT_OFFSET{8}; private: - mutable CCriticalSection cs; - CRecoveredSigsDb db; + mutable CCriticalSection cs; + // Incoming and not verified yet std::unordered_map>> pendingRecoveredSigs GUARDED_BY(cs); std::unordered_map, StaticSaltedHasher> pendingReconstructedRecoveredSigs GUARDED_BY(cs); diff --git a/src/llmq/signing_shares.cpp b/src/llmq/signing_shares.cpp index 823c34920d2f7f..36e0646849b794 100644 --- a/src/llmq/signing_shares.cpp +++ b/src/llmq/signing_shares.cpp @@ -322,7 +322,7 @@ bool CSigSharesManager::ProcessMessageSigSesAnn(const CNode* pfrom, const CSigSe return true; // let's still try other announcements from the same message } - LOCK(cs); + LOCK(cs_nodeStates); auto& nodeState = nodeStates[pfrom->GetId()]; auto& session = nodeState.GetOrCreateSessionFromAnn(ann); nodeState.sessionByRecvId.erase(session.recvSessionId); @@ -365,7 +365,7 @@ bool CSigSharesManager::ProcessMessageSigSharesInv(const CNode* pfrom, const CSi return true; } - LOCK(cs); + LOCK(cs_nodeStates); auto& nodeState = nodeStates[pfrom->GetId()]; auto session = nodeState.GetSessionByRecvId(inv.sessionId); if (!session) { @@ -395,7 +395,7 @@ bool CSigSharesManager::ProcessMessageGetSigShares(const CNode* pfrom, const CSi LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId()); - LOCK(cs); + LOCK(cs_nodeStates); auto& nodeState = nodeStates[pfrom->GetId()]; auto session = nodeState.GetSessionByRecvId(inv.sessionId); if (!session) { @@ -421,7 +421,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode* pfrom, const sigSharesToProcess.reserve(batchedSigShares.sigShares.size()); { - LOCK(cs); + LOCK(cs_nodeStates); auto& nodeState = nodeStates[pfrom->GetId()]; for (const auto& sigSharetmp : batchedSigShares.sigShares) { @@ -432,7 +432,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode* pfrom, const // It's important to only skip seen *valid* sig shares here. If a node sends us a // batch of mostly valid sig shares with a single invalid one and thus batched // verification fails, we'd skip the valid ones in the future if received from other nodes - if (sigShares.Has(sigShare.GetKey())) { + if (LOCK(cs_sigShares); sigShares.Has(sigShare.GetKey())) { continue; } @@ -452,7 +452,7 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(const CNode* pfrom, const return true; } - LOCK(cs); + LOCK(cs_nodeStates); auto& nodeState = nodeStates[pfrom->GetId()]; for (const auto& s : sigSharesToProcess) { nodeState.pendingIncomingSigShares.Add(s.GetKey(), s); @@ -492,17 +492,15 @@ void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& s return; } - { - LOCK(cs); - - if (sigShares.Has(sigShare.GetKey())) { - return; - } - - if (quorumSigningManager->HasRecoveredSigForId(sigShare.llmqType, sigShare.id)) { - return; - } + if (LOCK(cs_sigShares); sigShares.Has(sigShare.GetKey())) { + return; + } + if (quorumSigningManager->HasRecoveredSigForId(sigShare.llmqType, sigShare.id)) { + return; + } + { + LOCK(cs_nodeStates); auto& nodeState = nodeStates[fromId]; nodeState.pendingIncomingSigShares.Add(sigShare.GetKey(), sigShare); } @@ -558,7 +556,7 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) { { - LOCK(cs); + LOCK(cs_nodeStates); if (nodeStates.empty()) { return; } @@ -570,23 +568,24 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( // the whole verification process std::unordered_set, StaticSaltedHasher> uniqueSignHashes; - CLLMQUtils::IterateNodesRandom(nodeStates, [&]() { - return uniqueSignHashes.size() < maxUniqueSessions; - }, [&](NodeId nodeId, CSigSharesNodeState& ns) { - if (ns.pendingIncomingSigShares.Empty()) { - return false; - } - auto& sigShare = *ns.pendingIncomingSigShares.GetFirst(); - - AssertLockHeld(cs); - if (const bool alreadyHave = this->sigShares.Has(sigShare.GetKey()); !alreadyHave) { - uniqueSignHashes.emplace(nodeId, sigShare.GetSignHash()); - retSigShares[nodeId].emplace_back(sigShare); - } - ns.pendingIncomingSigShares.Erase(sigShare.GetKey()); - return !ns.pendingIncomingSigShares.Empty(); - }, rnd); + { + LOCK(cs_rnd); + CLLMQUtils::IterateNodesRandom(nodeStates, [&]() { + return uniqueSignHashes.size() < maxUniqueSessions; + }, [&](NodeId nodeId, CSigSharesNodeState &ns) { + if (ns.pendingIncomingSigShares.Empty()) { + return false; + } + auto &sigShare = *ns.pendingIncomingSigShares.GetFirst(); + if (const bool alreadyHave = WITH_LOCK(cs_sigShares, return this->sigShares.Has(sigShare.GetKey())); !alreadyHave) { + uniqueSignHashes.emplace(nodeId, sigShare.GetSignHash()); + retSigShares[nodeId].emplace_back(sigShare); + } + ns.pendingIncomingSigShares.Erase(sigShare.GetKey()); + return !ns.pendingIncomingSigShares.Empty(); + }, rnd); + } if (retSigShares.empty()) { return; } @@ -712,21 +711,21 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnma } { - LOCK(cs); - - if (!sigShares.Add(sigShare.GetKey(), sigShare)) { + if (LOCK(cs_sigShares); !sigShares.Add(sigShare.GetKey(), sigShare)) { return; } if (!CLLMQUtils::IsAllMembersConnectedEnabled(llmqType)) { + LOCK(cs_sigSharesQueuedToAnnounce); sigSharesQueuedToAnnounce.Add(sigShare.GetKey(), true); } // Update the time we've seen the last sigShare - timeSeenForSessions[sigShare.GetSignHash()] = GetAdjustedTime(); + WITH_LOCK(cs_timeSeenForSessions, timeSeenForSessions[sigShare.GetSignHash()] = GetAdjustedTime()); if (!quorumNodes.empty()) { // don't announce and wait for other nodes to request this share and directly send it to them // there is no way the other nodes know about this share as this is the one created on this node + LOCK(cs_nodeStates); for (auto otherNodeId : quorumNodes) { auto& nodeState = nodeStates[otherNodeId]; auto& session = nodeState.GetOrCreateSessionFromShare(sigShare); @@ -736,7 +735,7 @@ void CSigSharesManager::ProcessSigShare(const CSigShare& sigShare, const CConnma } } - size_t sigShareCount = sigShares.CountForSignHash(sigShare.GetSignHash()); + size_t sigShareCount = WITH_LOCK(cs_sigShares, return sigShares.CountForSignHash(sigShare.GetSignHash())); if (sigShareCount >= size_t(quorum->params.threshold)) { canTryRecovery = true; } @@ -756,9 +755,8 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& std::vector sigSharesForRecovery; std::vector idsForRecovery; { - LOCK(cs); - auto signHash = CLLMQUtils::BuildSignHash(quorum->params.type, quorum->qc->quorumHash, id, msgHash); + LOCK(cs_sigShares); auto sigSharesForSignHash = sigShares.GetAllForSignHash(signHash); if (!sigSharesForSignHash) { return; @@ -826,7 +824,7 @@ CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPt void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) { - AssertLockHeld(cs); + LOCK(cs_nodeStates); int64_t now = GetAdjustedTime(); const size_t maxRequestsForNode = 32; @@ -840,7 +838,8 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_mapsecond >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != p->first) { - // other node timed out, re-request from this node - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- other node timeout while waiting for %s-%d, re-request from=%d, node=%d\n", __func__, - k.first.ToString(), k.second, nodeId, p->first); - } else { - continue; + { + LOCK(cs_sigSharesRequested); + if (const auto p = sigSharesRequested.Get(k)) { + if (now - p->second >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != p->first) { + // other node timed out, re-request from this node + LogPrint(BCLog::LLMQ_SIGS, + "CSigSharesManager::%s -- other node timeout while waiting for %s-%d, re-request from=%d, node=%d\n", + __func__, + k.first.ToString(), k.second, nodeId, p->first); + } else { + continue; + } } - } - // if we got this far we should do a request + // if we got this far we should do a request - // track when we initiated the request so that we can detect timeouts - nodeState.requestedSigShares.Add(k, now); + // track when we initiated the request so that we can detect timeouts + nodeState.requestedSigShares.Add(k, now); - // don't request it from other nodes until a timeout happens - auto& r = sigSharesRequested.GetOrAdd(k); - r.first = nodeId; - r.second = now; + // don't request it from other nodes until a timeout happens + auto &r = sigSharesRequested.GetOrAdd(k); + r.first = nodeId; + r.second = now; + } if (!invMap) { invMap = &sigSharesToRequest[nodeId]; @@ -921,7 +925,7 @@ void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToSend) { - AssertLockHeld(cs); + LOCK(cs_nodeStates); for (auto& [nodeId, nodeState] : nodeStates) { if (nodeState.banned) { @@ -948,6 +952,7 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) { - AssertLockHeld(cs); - std::unordered_map proTxToNode; for (const auto& pnode : vNodes) { auto verifiedProRegTxHash = pnode->GetVerifiedProRegTxHash(); @@ -984,46 +987,51 @@ void CSigSharesManager::CollectSigSharesToSendConcentrated(std::unordered_map().count(); - for (auto& [_, signedSession] : signedSessions) { - if (!CLLMQUtils::IsAllMembersConnectedEnabled(signedSession.quorum->params.type)) { - continue; - } - - if (signedSession.attempt > signedSession.quorum->params.recoveryMembers) { - continue; - } - - if (curTime >= signedSession.nextAttemptTime) { - int64_t waitTime = exp2(signedSession.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT; - waitTime = std::min(MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime); - signedSession.nextAttemptTime = curTime + waitTime; - auto dmn = SelectMemberForRecovery(signedSession.quorum, signedSession.sigShare.id, signedSession.attempt); - signedSession.attempt++; - - LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, sending to %s, attempt=%d\n", __func__, - signedSession.sigShare.GetSignHash().ToString(), dmn->proTxHash.ToString(), signedSession.attempt); + { + LOCK(cs_signedSessions); + for (auto& [_, signedSession]: signedSessions) { + if (!CLLMQUtils::IsAllMembersConnectedEnabled(signedSession.quorum->params.type)) { + continue; + } - auto it = proTxToNode.find(dmn->proTxHash); - if (it == proTxToNode.end()) { + if (signedSession.attempt > signedSession.quorum->params.recoveryMembers) { continue; } - auto& m = sigSharesToSend[it->second->GetId()]; - m.emplace_back(signedSession.sigShare); + if (curTime >= signedSession.nextAttemptTime) { + int64_t waitTime = exp2(signedSession.attempt) * EXP_SEND_FOR_RECOVERY_TIMEOUT; + waitTime = std::min(MAX_SEND_FOR_RECOVERY_TIMEOUT, waitTime); + signedSession.nextAttemptTime = curTime + waitTime; + auto dmn = SelectMemberForRecovery(signedSession.quorum, signedSession.sigShare.id, + signedSession.attempt); + signedSession.attempt++; + + LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, sending to %s, attempt=%d\n", + __func__, + signedSession.sigShare.GetSignHash().ToString(), dmn->proTxHash.ToString(), + signedSession.attempt); + + auto it = proTxToNode.find(dmn->proTxHash); + if (it == proTxToNode.end()) { + continue; + } + + auto &m = sigSharesToSend[it->second->GetId()]; + m.emplace_back(signedSession.sigShare); + } } } } void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) { - AssertLockHeld(cs); - std::unordered_map, std::unordered_set, StaticSaltedHasher> quorumNodesMap; + LOCK(cs_sigSharesQueuedToAnnounce); sigSharesQueuedToAnnounce.ForEach([this, &quorumNodesMap, &sigSharesToAnnounce](const SigShareKey& sigShareKey, bool) { - AssertLockHeld(cs); const auto& signHash = sigShareKey.first; auto quorumMember = sigShareKey.second; + LOCK(cs_sigShares); const CSigShare* sigShare = sigShares.Get(sigShareKey); if (!sigShare) { return; @@ -1039,6 +1047,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_mapsecond; + LOCK(cs_nodeStates); for (auto& nodeId : quorumNodes) { auto& nodeState = nodeStates[nodeId]; @@ -1075,7 +1084,7 @@ bool CSigSharesManager::SendMessages() std::unordered_map> sigSessionAnnouncements; auto addSigSesAnnIfNeeded = [&](NodeId nodeId, const uint256& signHash) { - AssertLockHeld(cs); + AssertLockHeld(cs_nodeStates); auto& nodeState = nodeStates[nodeId]; auto session = nodeState.GetSessionBySignHash(signHash); assert(session); @@ -1097,12 +1106,12 @@ bool CSigSharesManager::SendMessages() std::vector vNodesCopy = g_connman->CopyNodeVector(CConnman::FullyConnectedOnly); { - LOCK(cs); CollectSigSharesToRequest(sigSharesToRequest); CollectSigSharesToSend(sigShareBatchesToSend); CollectSigSharesToAnnounce(sigSharesToAnnounce); CollectSigSharesToSendConcentrated(sigSharesToSend, vNodesCopy); + LOCK(cs_nodeStates); for (auto& [nodeId, sigShareMap] : sigSharesToRequest) { for (auto& [hash, sigShareInv] : sigShareMap) { sigShareInv.sessionId = addSigSesAnnIfNeeded(nodeId, hash); @@ -1233,7 +1242,7 @@ bool CSigSharesManager::SendMessages() bool CSigSharesManager::GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo) { - LOCK(cs); + LOCK(cs_nodeStates); return nodeStates[nodeId].GetSessionInfoByRecvId(sessionId, retInfo); } @@ -1265,7 +1274,7 @@ void CSigSharesManager::Cleanup() std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; { - LOCK(cs); + LOCK(cs_sigShares); sigShares.ForEach([&quorums](const SigShareKey&, const CSigShare& sigShare) { quorums.try_emplace(std::make_pair(sigShare.llmqType, sigShare.quorumHash), nullptr); }); @@ -1283,7 +1292,7 @@ void CSigSharesManager::Cleanup() { // Now delete sessions which are for inactive quorums - LOCK(cs); + LOCK(cs_sigShares); std::unordered_set inactiveQuorumSessions; sigShares.ForEach([&quorums, &inactiveQuorumSessions](const SigShareKey&, const CSigShare& sigShare) { if (!quorums.count(std::make_pair(sigShare.llmqType, sigShare.quorumHash))) { @@ -1296,7 +1305,7 @@ void CSigSharesManager::Cleanup() } { - LOCK(cs); + LOCK(cs_sigShares); // Remove sessions which were successfully recovered std::unordered_set doneSessions; @@ -1314,11 +1323,15 @@ void CSigSharesManager::Cleanup() // Remove sessions which timed out std::unordered_set timeoutSessions; - for (const auto& [signHash, lastSeenTime] : timeSeenForSessions) { - if (now - lastSeenTime >= SESSION_NEW_SHARES_TIMEOUT) { - timeoutSessions.emplace(signHash); + { + LOCK(cs_timeSeenForSessions); + for (const auto& [signHash, lastSeenTime]: timeSeenForSessions) { + if (now - lastSeenTime >= SESSION_NEW_SHARES_TIMEOUT) { + timeoutSessions.emplace(signHash); + } } } + LOCK(cs_sigShares); for (auto& signHash : timeoutSessions) { if (const size_t count = sigShares.CountForSignHash(signHash); count > 0) { @@ -1353,7 +1366,7 @@ void CSigSharesManager::Cleanup() // Find node states for peers that disappeared from CConnman std::unordered_set nodeStatesToDelete; { - LOCK(cs); + LOCK(cs_nodeStates); for (const auto& [nodeId, _] : nodeStates) { nodeStatesToDelete.emplace(nodeId); } @@ -1363,17 +1376,20 @@ void CSigSharesManager::Cleanup() }); // Now delete these node states - LOCK(cs); + LOCK(cs_nodeStates); for (const auto& nodeId : nodeStatesToDelete) { auto it = nodeStates.find(nodeId); if (it == nodeStates.end()) { continue; } // remove global requested state to force a re-request from another node - it->second.requestedSigShares.ForEach([this](const SigShareKey& k, bool) { - AssertLockHeld(cs); - sigSharesRequested.Erase(k); - }); + { + LOCK(cs_sigSharesRequested); + it->second.requestedSigShares.ForEach([this](const SigShareKey &k, bool) { + AssertLockHeld(cs_sigSharesRequested); + sigSharesRequested.Erase(k); + }); + } nodeStates.erase(nodeId); } @@ -1382,31 +1398,35 @@ void CSigSharesManager::Cleanup() void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) { - AssertLockHeld(cs); - - for (auto& [_, nodeState] : nodeStates) { - nodeState.RemoveSession(signHash); + { + LOCK(cs_nodeStates); + for (auto& [_, nodeState] : nodeStates) { + nodeState.RemoveSession(signHash); + } } - sigSharesRequested.EraseAllForSignHash(signHash); - sigSharesQueuedToAnnounce.EraseAllForSignHash(signHash); - sigShares.EraseAllForSignHash(signHash); - signedSessions.erase(signHash); - timeSeenForSessions.erase(signHash); + WITH_LOCK(cs_sigSharesRequested, sigSharesRequested.EraseAllForSignHash(signHash)); + WITH_LOCK(cs_sigSharesQueuedToAnnounce, sigSharesQueuedToAnnounce.EraseAllForSignHash(signHash)); + WITH_LOCK(cs_sigShares, sigShares.EraseAllForSignHash(signHash)); + WITH_LOCK(cs_signedSessions, signedSessions.erase(signHash)); + WITH_LOCK(cs_timeSeenForSessions, timeSeenForSessions.erase(signHash)); } void CSigSharesManager::RemoveBannedNodeStates() { // Called regularly to cleanup local node states for banned nodes - LOCK2(cs_main, cs); + LOCK2(cs_main, cs_nodeStates); for (auto it = nodeStates.begin(); it != nodeStates.end();) { if (IsBanned(it->first)) { // re-request sigshares from other nodes - it->second.requestedSigShares.ForEach([this](const SigShareKey& k, int64_t) { - AssertLockHeld(cs); - sigSharesRequested.Erase(k); - }); + { + LOCK(cs_sigSharesRequested); + it->second.requestedSigShares.ForEach([this](const SigShareKey &k, int64_t) { + AssertLockHeld(cs_sigSharesRequested); + sigSharesRequested.Erase(k); + }); + } it = nodeStates.erase(it); } else { ++it; @@ -1425,7 +1445,7 @@ void CSigSharesManager::BanNode(NodeId nodeId) Misbehaving(nodeId, 100); } - LOCK(cs); + LOCK(cs_nodeStates); auto it = nodeStates.find(nodeId); if (it == nodeStates.end()) { return; @@ -1433,10 +1453,13 @@ void CSigSharesManager::BanNode(NodeId nodeId) auto& nodeState = it->second; // Whatever we requested from him, let's request it from someone else now - nodeState.requestedSigShares.ForEach([this](const SigShareKey& k, int64_t) { - AssertLockHeld(cs); - sigSharesRequested.Erase(k); - }); + { + LOCK(cs_sigSharesRequested); + nodeState.requestedSigShares.ForEach([this](const SigShareKey &k, int64_t) { + AssertLockHeld(cs_sigSharesRequested); + sigSharesRequested.Erase(k); + }); + } nodeState.requestedSigShares.Clear(); nodeState.banned = true; @@ -1478,7 +1501,7 @@ void CSigSharesManager::WorkThreadMain() void CSigSharesManager::AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash) { - LOCK(cs); + LOCK(cs_pendingSigns); pendingSigns.emplace_back(quorum, id, msgHash); } @@ -1486,7 +1509,7 @@ void CSigSharesManager::SignPendingSigShares() { std::vector v; { - LOCK(cs); + LOCK(cs_pendingSigns); v = std::move(pendingSigns); } @@ -1498,7 +1521,7 @@ void CSigSharesManager::SignPendingSigShares() ProcessSigShare(sigShare, *g_connman, pQuorum); if (CLLMQUtils::IsAllMembersConnectedEnabled(pQuorum->params.type)) { - LOCK(cs); + LOCK(cs_signedSessions); auto& session = signedSessions[sigShare.GetSignHash()]; session.sigShare = sigShare; session.quorum = pQuorum; @@ -1560,14 +1583,15 @@ void CSigSharesManager::ForceReAnnouncement(const CQuorumCPtr& quorum, Consensus return; } - LOCK(cs); auto signHash = CLLMQUtils::BuildSignHash(llmqType, quorum->qc->quorumHash, id, msgHash); - if (const auto sigs = sigShares.GetAllForSignHash(signHash)) { + if (LOCK(cs_sigShares); const auto sigs = sigShares.GetAllForSignHash(signHash)) { + LOCK(cs_sigSharesQueuedToAnnounce); for (const auto& [quorumMemberIndex, _] : *sigs) { // re-announce every sigshare to every node sigSharesQueuedToAnnounce.Add(std::make_pair(signHash, quorumMemberIndex), true); } } + LOCK(cs_nodeStates); for (auto& [_, nodeState] : nodeStates) { auto session = nodeState.GetSessionBySignHash(signHash); if (!session) { @@ -1582,7 +1606,6 @@ void CSigSharesManager::ForceReAnnouncement(const CQuorumCPtr& quorum, Consensus void CSigSharesManager::HandleNewRecoveredSig(const llmq::CRecoveredSig& recoveredSig) { - LOCK(cs); RemoveSigSharesForSession(CLLMQUtils::BuildSignHash(recoveredSig)); } diff --git a/src/llmq/signing_shares.h b/src/llmq/signing_shares.h index 2bc1b156e74c08..574dfb90e87a19 100644 --- a/src/llmq/signing_shares.h +++ b/src/llmq/signing_shares.h @@ -347,20 +347,25 @@ class CSigSharesManager : public CRecoveredSigsListener static constexpr int64_t MAX_SEND_FOR_RECOVERY_TIMEOUT{10000}; static constexpr size_t MAX_MSGS_SIG_SHARES{32}; - CCriticalSection cs; - std::thread workThread; CThreadInterrupt workInterrupt; - SigShareMap sigShares GUARDED_BY(cs); - std::unordered_map signedSessions GUARDED_BY(cs); + Mutex cs_sigShares; + SigShareMap sigShares GUARDED_BY(cs_sigShares); + + Mutex cs_signedSessions; + std::unordered_map signedSessions GUARDED_BY(cs_signedSessions); // stores time of last receivedSigShare. Used to detect timeouts - std::unordered_map timeSeenForSessions GUARDED_BY(cs); + Mutex cs_timeSeenForSessions; + std::unordered_map timeSeenForSessions GUARDED_BY(cs_timeSeenForSessions); - std::unordered_map nodeStates GUARDED_BY(cs); - SigShareMap> sigSharesRequested GUARDED_BY(cs); - SigShareMap sigSharesQueuedToAnnounce GUARDED_BY(cs); + Mutex cs_nodeStates; + std::unordered_map nodeStates GUARDED_BY(cs_nodeStates); + Mutex cs_sigSharesRequested; + SigShareMap> sigSharesRequested GUARDED_BY(cs_sigSharesRequested); + Mutex cs_sigSharesQueuedToAnnounce; + SigShareMap sigSharesQueuedToAnnounce GUARDED_BY(cs_sigSharesQueuedToAnnounce); struct PendingSignatureData { const CQuorumCPtr quorum; @@ -370,11 +375,13 @@ class CSigSharesManager : public CRecoveredSigsListener PendingSignatureData(CQuorumCPtr quorum, const uint256& id, const uint256& msgHash) : quorum(std::move(quorum)), id(id), msgHash(msgHash){} }; - std::vector pendingSigns GUARDED_BY(cs); + Mutex cs_pendingSigns; + std::vector pendingSigns GUARDED_BY(cs_pendingSigns); - FastRandomContext rnd GUARDED_BY(cs); + Mutex cs_rnd; + FastRandomContext rnd GUARDED_BY(cs_rnd); - int64_t lastCleanupTime{0}; + std::atomic lastCleanupTime{0}; std::atomic recoveredSigsCounter{0}; public: @@ -427,16 +434,16 @@ class CSigSharesManager : public CRecoveredSigsListener static CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const std::pair& in); void Cleanup(); - void RemoveSigSharesForSession(const uint256& signHash) EXCLUSIVE_LOCKS_REQUIRED(cs); + void RemoveSigSharesForSession(const uint256& signHash); void RemoveBannedNodeStates(); void BanNode(NodeId nodeId); bool SendMessages(); - void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) EXCLUSIVE_LOCKS_REQUIRED(cs); - void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend) EXCLUSIVE_LOCKS_REQUIRED(cs); - void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes) EXCLUSIVE_LOCKS_REQUIRED(cs); - void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) EXCLUSIVE_LOCKS_REQUIRED(cs); + void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest); + void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend); + void CollectSigSharesToSendConcentrated(std::unordered_map>& sigSharesToSend, const std::vector& vNodes); + void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce); void SignPendingSigShares(); void WorkThreadMain(); };