Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Split quorum contrib data out of evodb #5481

Merged
merged 3 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/llmq/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
#include <llmq/signing_shares.h>

LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CDeterministicMNManager& dmnman, CEvoDB& evo_db,
CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman, CTxMemPool& mempool,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const std::unique_ptr<PeerManager>& peerman, bool unit_tests, bool wipe) :
CMasternodeMetaMan& mn_metaman, CMNHFManager& mnhfman, CSporkManager& sporkman,
CTxMemPool& mempool, const CActiveMasternodeManager* const mn_activeman,
const CMasternodeSync& mn_sync, const std::unique_ptr<PeerManager>& peerman, bool unit_tests,
bool wipe) :
is_masternode{mn_activeman != nullptr},
bls_worker{std::make_shared<CBLSWorker>()},
dkg_debugman{std::make_unique<llmq::CDKGDebugManager>()},
quorum_block_processor{std::make_unique<llmq::CQuorumBlockProcessor>(chainstate, dmnman, evo_db, peerman)},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, dmnman, *dkg_debugman, mn_metaman, *quorum_block_processor, mn_activeman, sporkman, peerman, unit_tests, wipe)},
qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainstate, connman, dmnman, *qdkgsman, evo_db, *quorum_block_processor, mn_activeman, mn_sync, sporkman)},
qdkgsman{std::make_unique<llmq::CDKGSessionManager>(*bls_worker, chainstate, connman, dmnman, *dkg_debugman,
mn_metaman, *quorum_block_processor, mn_activeman, sporkman,
peerman, unit_tests, wipe)},
qman{std::make_unique<llmq::CQuorumManager>(*bls_worker, chainstate, connman, dmnman, *qdkgsman, evo_db,
*quorum_block_processor, mn_activeman, mn_sync, sporkman, unit_tests,
wipe)},
sigman{std::make_unique<llmq::CSigningManager>(connman, mn_activeman, chainstate, *qman, peerman, unit_tests, wipe)},
shareman{std::make_unique<llmq::CSigSharesManager>(connman, *sigman, mn_activeman, *qman, sporkman, peerman)},
clhandler{[&]() -> llmq::CChainLocksHandler* const {
Expand All @@ -39,7 +44,8 @@ LLMQContext::LLMQContext(CChainState& chainstate, CConnman& connman, CDeterminis
llmq::quorumInstantSendManager = std::make_unique<llmq::CInstantSendManager>(*llmq::chainLocksHandler, chainstate, connman, *qman, *sigman, *shareman, sporkman, mempool, mn_sync, peerman, is_masternode, unit_tests, wipe);
return llmq::quorumInstantSendManager.get();
}()},
ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainstate, mnhfman, *sigman, *shareman, mempool, *qman, sporkman, peerman)}
ehfSignalsHandler{std::make_unique<llmq::CEHFSignalsHandler>(chainstate, mnhfman, *sigman, *shareman, mempool,
*qman, sporkman, peerman)}
{
// NOTE: we use this only to wipe the old db, do NOT use it for anything else
// TODO: remove it in some future version
Expand Down
95 changes: 81 additions & 14 deletions src/llmq/quorums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ int CQuorum::GetMemberIndex(const uint256& proTxHash) const
return -1;
}

void CQuorum::WriteContributions(CEvoDB& evoDb) const
void CQuorum::WriteContributions(CDBWrapper& db) const
{
uint256 dbKey = MakeQuorumKey(*this);

Expand All @@ -175,19 +175,19 @@ void CQuorum::WriteContributions(CEvoDB& evoDb) const
for (auto& pubkey : *quorumVvec) {
s << CBLSPublicKeyVersionWrapper(pubkey, false);
}
evoDb.GetRawDB().Write(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s);
db.Write(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s);
}
if (skShare.IsValid()) {
evoDb.GetRawDB().Write(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare);
db.Write(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare);
}
}

bool CQuorum::ReadContributions(CEvoDB& evoDb)
bool CQuorum::ReadContributions(const CDBWrapper& db)
{
uint256 dbKey = MakeQuorumKey(*this);
CDataStream s(SER_DISK, CLIENT_VERSION);

if (!evoDb.GetRawDB().ReadDataStream(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s)) {
if (!db.ReadDataStream(std::make_pair(DB_QUORUM_QUORUM_VVEC, dbKey), s)) {
return false;
}

Expand All @@ -203,27 +203,31 @@ bool CQuorum::ReadContributions(CEvoDB& evoDb)
quorumVvec = std::make_shared<std::vector<CBLSPublicKey>>(std::move(qv));
// We ignore the return value here as it is ok if this fails. If it fails, it usually means that we are not a
// member of the quorum but observed the whole DKG process to have the quorum verification vector.
evoDb.GetRawDB().Read(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare);
db.Read(std::make_pair(DB_QUORUM_SK_SHARE, dbKey), skShare);

return true;
}

CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGSessionManager& _dkgManager, CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, const CSporkManager& sporkman) :
CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman,
CDeterministicMNManager& dmnman, CDKGSessionManager& _dkgManager, CEvoDB& _evoDb,
CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const CSporkManager& sporkman, bool unit_tests, bool wipe) :
db(std::make_unique<CDBWrapper>(unit_tests ? "" : (gArgs.GetDataDirNet() / "llmq" / "quorumdb"), 1 << 20,
unit_tests, wipe)),
blsWorker(_blsWorker),
m_chainstate(chainstate),
connman(_connman),
m_dmnman(dmnman),
dkgManager(_dkgManager),
m_evoDb(_evoDb),
quorumBlockProcessor(_quorumBlockProcessor),
m_mn_activeman(mn_activeman),
m_mn_sync(mn_sync),
m_sporkman(sporkman)
{
utils::InitQuorumsCache(mapQuorumsCache, false);
quorumThreadInterrupt.reset();
MigrateOldQuorumDB(_evoDb);
}

void CQuorumManager::Start()
Expand Down Expand Up @@ -404,11 +408,11 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l
quorum->Init(std::move(qc), pQuorumBaseBlockIndex, minedBlockHash, members);

bool hasValidVvec = false;
if (quorum->ReadContributions(m_evoDb)) {
if (WITH_LOCK(cs_db, return quorum->ReadContributions(*db))) {
hasValidVvec = true;
} else {
if (BuildQuorumContributions(quorum->qc, quorum)) {
quorum->WriteContributions(m_evoDb);
WITH_LOCK(cs_db, quorum->WriteContributions(*db));
hasValidVvec = true;
} else {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- llmqType[%d] quorumIndex[%d] quorum.ReadContributions and BuildQuorumContributions for quorumHash[%s] failed\n", __func__, ToUnderlying(llmqType), quorum->qc->quorumIndex, quorum->qc->quorumHash.ToString());
Expand Down Expand Up @@ -846,7 +850,7 @@ PeerMsgRet CQuorumManager::ProcessMessage(CNode& pfrom, const std::string& msg_t
return errorHandler("Invalid secret key share received");
}
}
pQuorum->WriteContributions(m_evoDb);
WITH_LOCK(cs_db, pQuorum->WriteContributions(*db));
return {};
}
return {};
Expand Down Expand Up @@ -1100,13 +1104,76 @@ void CQuorumManager::StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex)
}

if (!quorumThreadInterrupt) {
DataCleanupHelper(m_evoDb.GetRawDB(), dbKeysToSkip);
WITH_LOCK(cs_db, DataCleanupHelper(*db, dbKeysToSkip));
}

LogPrint(BCLog::LLMQ, "CQuorumManager::StartCleanupOldQuorumDataThread -- done. time=%d\n", t.count());
});
}

// TODO: remove in v23
void CQuorumManager::MigrateOldQuorumDB(CEvoDB& evoDb) const
UdjinM6 marked this conversation as resolved.
Show resolved Hide resolved
{
LOCK(cs_db);
if (!db->IsEmpty()) return;

const auto prefixes = {DB_QUORUM_QUORUM_VVEC, DB_QUORUM_SK_SHARE};

LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- start\n", __func__);

CDBBatch batch(*db);
std::unique_ptr<CDBIterator> pcursor(evoDb.GetRawDB().NewIterator());

for (const auto& prefix : prefixes) {
auto start = std::make_tuple(prefix, uint256());
pcursor->Seek(start);

int count{0};
while (pcursor->Valid()) {
decltype(start) k;
CDataStream s(SER_DISK, CLIENT_VERSION);
CBLSSecretKey sk;

if (!pcursor->GetKey(k) || std::get<0>(k) != prefix) {
break;
}

if (prefix == DB_QUORUM_QUORUM_VVEC) {
if (!evoDb.GetRawDB().ReadDataStream(k, s)) {
break;
}
batch.Write(k, s);
}
if (prefix == DB_QUORUM_SK_SHARE) {
if (!pcursor->GetValue(sk)) {
break;
}
batch.Write(k, sk);
}

if (batch.SizeEstimate() >= (1 << 24)) {
db->WriteBatch(batch);
batch.Clear();
}

++count;
pcursor->Next();
}

db->WriteBatch(batch);

LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s moved %d\n", __func__, prefix, count);
}

pcursor.reset();
db->CompactFull();

DataCleanupHelper(evoDb.GetRawDB(), {});
evoDb.CommitRootTransaction();

LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- done\n", __func__);
}

CQuorumCPtr SelectQuorumForSigning(const Consensus::LLMQParams& llmq_params, const CChain& active_chain, const CQuorumManager& qman,
const uint256& selectionHash, int signHeight, int signOffset)
{
Expand Down
12 changes: 8 additions & 4 deletions src/llmq/quorums.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ class CQuorum

private:
bool HasVerificationVectorInternal() const EXCLUSIVE_LOCKS_REQUIRED(cs_vvec_shShare);
void WriteContributions(CEvoDB& evoDb) const;
bool ReadContributions(CEvoDB& evoDb);
void WriteContributions(CDBWrapper& db) const;
bool ReadContributions(const CDBWrapper& db);
};

/**
Expand All @@ -230,12 +230,14 @@ class CQuorum
class CQuorumManager
{
private:
mutable Mutex cs_db;
std::unique_ptr<CDBWrapper> db GUARDED_BY(cs_db){nullptr};

CBLSWorker& blsWorker;
CChainState& m_chainstate;
CConnman& connman;
CDeterministicMNManager& m_dmnman;
CDKGSessionManager& dkgManager;
CEvoDB& m_evoDb;
CQuorumBlockProcessor& quorumBlockProcessor;
const CActiveMasternodeManager* const m_mn_activeman;
const CMasternodeSync& m_mn_sync;
Expand All @@ -254,7 +256,8 @@ class CQuorumManager
public:
CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate, CConnman& _connman, CDeterministicMNManager& dmnman,
CDKGSessionManager& _dkgManager, CEvoDB& _evoDb, CQuorumBlockProcessor& _quorumBlockProcessor,
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync, const CSporkManager& sporkman);
const CActiveMasternodeManager* const mn_activeman, const CMasternodeSync& mn_sync,
const CSporkManager& sporkman, bool unit_tests, bool wipe);
~CQuorumManager() { Stop(); };

void Start();
Expand Down Expand Up @@ -294,6 +297,7 @@ class CQuorumManager
void StartQuorumDataRecoveryThread(const CQuorumCPtr pQuorum, const CBlockIndex* pIndex, uint16_t nDataMask) const;

void StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex) const;
void MigrateOldQuorumDB(CEvoDB& evoDb) const;
};

// when selecting a quorum for signing and verification, we use CQuorumManager::SelectQuorum with this offset as
Expand Down
Loading