Skip to content

Commit

Permalink
Merge #8969: Decouple peer-processing-logic from block-connection-log…
Browse files Browse the repository at this point in the history
…ic (ElementsProject#2)

f5b960b Move nTimeBestReceived updating into net processing code (Matt Corallo)
d8670fb Move all calls to CheckBlockIndex out of net-processing logic (Matt Corallo)
d6ea737 Remove network state wipe from UnloadBlockIndex. (Matt Corallo)
fc0c24f Move MarkBlockAsReceived out of ProcessNewMessage (Matt Corallo)
65f35eb Move FlushStateToDisk call out of ProcessMessages::TX into ATMP (Matt Corallo)
  • Loading branch information
laanwj committed Nov 3, 2016
2 parents fcf61b8 + f5b960b commit 3665483
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 29 deletions.
4 changes: 4 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,10 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
return false;
#endif
// ********************************************************* Step 6: network initialization
// Note that we absolutely cannot open any actual connections
// until the very end ("start node") as the UTXO/block state
// is not yet setup and may end up being set up twice if we
// need to reindex later.

assert(!g_connman);
g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max())));
Expand Down
72 changes: 44 additions & 28 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ CCriticalSection cs_main;
BlockMap mapBlockIndex;
CChain chainActive;
CBlockIndex *pindexBestHeader = NULL;
int64_t nTimeBestReceived = 0;
int64_t nTimeBestReceived = 0; // Used only to inform the wallet of when we last received a block
CWaitableCriticalSection csBestBlock;
CConditionVariable cvBlockChange;
int nScriptCheckThreads = 0;
Expand Down Expand Up @@ -691,6 +691,16 @@ CBlockIndex* FindForkInGlobalIndex(const CChain& chain, const CBlockLocator& loc
CCoinsViewCache *pcoinsTip = NULL;
CBlockTreeDB *pblocktree = NULL;

enum FlushStateMode {
FLUSH_STATE_NONE,
FLUSH_STATE_IF_NEEDED,
FLUSH_STATE_PERIODIC,
FLUSH_STATE_ALWAYS
};

// See definition for documentation
bool static FlushStateToDisk(CValidationState &state, FlushStateMode mode);

//////////////////////////////////////////////////////////////////////////////
//
// mapOrphanTransactions
Expand Down Expand Up @@ -1581,6 +1591,9 @@ bool AcceptToMemoryPoolWithTime(CTxMemPool& pool, CValidationState &state, const
BOOST_FOREACH(const uint256& hashTx, vHashTxToUncache)
pcoinsTip->Uncache(hashTx);
}
// After we've (potentially) uncached entries, ensure our coins cache is still within its size limits
CValidationState stateDummy;
FlushStateToDisk(stateDummy, FLUSH_STATE_PERIODIC);
return res;
}

Expand Down Expand Up @@ -2565,13 +2578,6 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
return true;
}

enum FlushStateMode {
FLUSH_STATE_NONE,
FLUSH_STATE_IF_NEEDED,
FLUSH_STATE_PERIODIC,
FLUSH_STATE_ALWAYS
};

/**
* Update the on-disk chain state.
* The caches and indexes are flushed depending on the mode we're called with
Expand Down Expand Up @@ -2691,7 +2697,6 @@ void static UpdateTip(CBlockIndex *pindexNew, const CChainParams& chainParams) {
chainActive.SetTip(pindexNew);

// New best block
nTimeBestReceived = GetTime();
mempool.AddTransactionsUpdated(1);

cvBlockChange.notify_all();
Expand Down Expand Up @@ -3676,6 +3681,8 @@ static bool AcceptBlockHeader(const CBlockHeader& block, CValidationState& state
if (ppindex)
*ppindex = pindex;

CheckBlockIndex(chainparams.GetConsensus());

return true;
}

Expand Down Expand Up @@ -3703,6 +3710,11 @@ static bool AcceptBlock(const CBlock& block, CValidationState& state, const CCha
// not process unrequested blocks.
bool fTooFarAhead = (pindex->nHeight > int(chainActive.Height() + MIN_BLOCKS_TO_KEEP));

// TODO: Decouple this function from the block download logic by removing fRequested
// This requires some new chain datastructure to efficiently look up if a
// block is in a chain leading to a candidate for best tip, despite not
// being such a candidate itself.

// TODO: deal better with return value and error conditions for duplicate
// and unrequested blocks.
if (fAlreadyHave) return true;
Expand Down Expand Up @@ -3751,13 +3763,11 @@ bool ProcessNewBlock(CValidationState& state, const CChainParams& chainparams, C
{
{
LOCK(cs_main);
bool fRequested = MarkBlockAsReceived(pblock->GetHash());
fRequested |= fForceProcessing;

// Store to disk
CBlockIndex *pindex = NULL;
bool fNewBlock = false;
bool ret = AcceptBlock(*pblock, state, chainparams, &pindex, fRequested, dbp, &fNewBlock);
bool ret = AcceptBlock(*pblock, state, chainparams, &pindex, fForceProcessing, dbp, &fNewBlock);
if (pindex && pfrom) {
mapBlockSource[pindex->GetBlockHash()] = pfrom->GetId();
if (fNewBlock) pfrom->nLastBlockTime = GetTime();
Expand Down Expand Up @@ -4269,6 +4279,9 @@ bool RewindBlockIndex(const CChainParams& params)
return true;
}

// May NOT be used after any connections are up as much
// of the peer-processing logic assumes a consistent
// block index state
void UnloadBlockIndex()
{
LOCK(cs_main);
Expand All @@ -4279,18 +4292,12 @@ void UnloadBlockIndex()
mempool.clear();
mapOrphanTransactions.clear();
mapOrphanTransactionsByPrev.clear();
nSyncStarted = 0;
mapBlocksUnlinked.clear();
vinfoBlockFile.clear();
nLastBlockFile = 0;
nBlockSequenceId = 1;
mapBlockSource.clear();
mapBlocksInFlight.clear();
nPreferredDownload = 0;
setDirtyBlockIndex.clear();
setDirtyFileInfo.clear();
mapNodeState.clear();
recentRejects.reset(NULL);
versionbitscache.Clear();
for (int b = 0; b < VERSIONBITS_NUM_BITS; b++) {
warningcache[b].clear();
Expand All @@ -4315,9 +4322,6 @@ bool InitBlockIndex(const CChainParams& chainparams)
{
LOCK(cs_main);

// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));

// Check whether we're already initialized
if (chainActive.Genesis() != NULL)
return true;
Expand Down Expand Up @@ -4706,6 +4710,11 @@ std::string GetWarnings(const std::string& strFor)
// blockchain -> download logic notification
//

PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn) : connman(connmanIn) {
// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
}

void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
const int nNewHeight = pindexNew->nHeight;
connman->SetBestHeight(nNewHeight);
Expand All @@ -4732,6 +4741,8 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB
}
});
}

nTimeBestReceived = GetTime();
}

void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationState& state) {
Expand Down Expand Up @@ -5690,7 +5701,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
Misbehaving(pfrom->GetId(), nDoS);
}
}
FlushStateToDisk(state, FLUSH_STATE_PERIODIC);
}


Expand Down Expand Up @@ -5826,8 +5836,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman);
}
}

CheckBlockIndex(chainparams.GetConsensus());
}

else if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing
Expand Down Expand Up @@ -5859,12 +5867,16 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
std::vector<CInv> invs;
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus()), resp.blockhash));
pfrom->PushMessage(NetMsgType::GETDATA, invs);
} else
} else {
MarkBlockAsReceived(resp.blockhash); // it is now an empty pointer
fBlockRead = true;
}
} // Don't hold cs_main when we call into ProcessNewBlock
if (fBlockRead) {
CValidationState state;
ProcessNewBlock(state, chainparams, pfrom, &block, false, NULL);
// Since we requested this block (it was in mapBlocksInFlight), force it to be processed,
// even if it would not be a candidate for new tip (missing previous block, chain not long enough, etc)
ProcessNewBlock(state, chainparams, pfrom, &block, true, NULL);
int nDoS;
if (state.IsInvalid(nDoS)) {
assert (state.GetRejectCode() < REJECT_INTERNAL); // Blocks are never rejected with internal reject codes
Expand Down Expand Up @@ -6020,8 +6032,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
}
}

CheckBlockIndex(chainparams.GetConsensus());
}

NotifyHeaderTip();
Expand All @@ -6040,6 +6050,12 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Such an unrequested block may still be processed, subject to the
// conditions in AcceptBlock().
bool forceProcessing = pfrom->fWhitelisted && !IsInitialBlockDownload();
{
LOCK(cs_main);
// Also always process if we requested the block explicitly, as we may
// need it even though it is not a candidate for a new best tip.
forceProcessing |= MarkBlockAsReceived(block.GetHash());
}
ProcessNewBlock(state, chainparams, pfrom, &block, forceProcessing, NULL);
int nDoS;
if (state.IsInvalid(nDoS)) {
Expand Down
2 changes: 1 addition & 1 deletion src/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ class PeerLogicValidation : public CValidationInterface {
CConnman* connman;

public:
PeerLogicValidation(CConnman* connmanIn) : connman(connmanIn) {}
PeerLogicValidation(CConnman* connmanIn);

virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
virtual void BlockChecked(const CBlock& block, const CValidationState& state);
Expand Down

0 comments on commit 3665483

Please sign in to comment.