Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

Fix race condition which permanently pauses sync #5865

Merged
merged 3 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- Fixed: [#5852](https://github.com/ethereum/aleth/pull/5852) Output correct original opcodes instead of synthetic `PUSHC`/`JUMPC`/`JUMPCI` in VM trace.
- Fixed: [#5829](https://github.com/ethereum/aleth/pull/5829) web3.eth.getBlock now returns block size in bytes. This requires a (automatic) database rebuild which can take a while depending on how many blocks are in the local chain.
- Fixed: [#5866](https://github.com/ethereum/aleth/pull/5866) Update output of `debug_accountRangeAt` and `eth_getTransactionCount` RPC functions to conform to Geth's output.
- Fixed: [#5865](https://github.com/ethereum/aleth/pull/5865) Fix bug which causes syncing to become permanently stuck.

## [1.7.2] - 2019-11-22

Expand Down
30 changes: 17 additions & 13 deletions libethereum/BlockChainSync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,18 @@ BlockChainSync::BlockChainSync(EthereumCapability& _host)
m_lastImportedBlock(m_startingBlock),
m_lastImportedBlockHash(_host.chain().currentHash())
{
m_bqRoomAvailable = host().bq().onRoomAvailable([this]() {
// Ensure that syncing occurs on the network thread (since the block queue onRoomAvailable
// handler can be called on the client thread)
host().capabilityHost().postWork([this]() {
RecursiveGuard l(x_sync);
m_state = SyncState::Blocks;
continueSync();
});
m_bqBlocksDrained = host().bq().onBlocksDrained([this]() {
if (isSyncPaused() && !host().bq().knownFull())
{
// Draining freed up space in the block queue. Let's resume syncing.
// Ensure that syncing occurs on the network thread (since the block queue handler is
// called on the client thread
host().capabilityHost().postWork([this]() {
RecursiveGuard l(x_sync);
m_state = SyncState::Blocks;
continueSync();
});
}
});
}

Expand Down Expand Up @@ -250,10 +254,8 @@ void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
u256 td = host().chain().details().totalDifficulty;
if (host().bq().isActive())
td += host().bq().difficulty();

u256 syncingDifficulty = std::max(m_syncingTotalDifficulty, td);

u256 peerTotalDifficulty = peer.totalDifficulty();
u256 const syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
u256 const peerTotalDifficulty = peer.totalDifficulty();

if (_force || peerTotalDifficulty > syncingDifficulty)
{
Expand All @@ -269,7 +271,9 @@ void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
LOG(m_logger) << "Syncing with peer " << peer.id();
m_state = SyncState::Blocks;
}
peer.requestBlockHeaders(peer.latestHash(), 1, 0, false);

// Request tip of peer's chain
peer.requestBlockHeaders(peer.latestHash(), 1 /* count */, 0 /* skip */, false /* reverse */);
peer.setWaitingForTransactions(true);
return;
}
Expand Down
7 changes: 6 additions & 1 deletion libethereum/BlockChainSync.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ class BlockChainSync final: public HasInvariants
};

EthereumCapability& m_host;
Handler<> m_bqRoomAvailable; ///< Triggered once block queue has space for more blocks

// Triggered once blocks have been drained from the block queue, potentially freeing up space
// for more blocks. Note that the block queue can still be full after a drain, depending on how
// many blocks are in the queue vs how many are being drained.
Handler<> m_bqBlocksDrained;

mutable RecursiveMutex x_sync;
/// Peers to which we've sent DAO request
std::set<NodeID> m_daoChallengedPeers;
Expand Down
5 changes: 1 addition & 4 deletions libethereum/BlockQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,9 @@ std::size_t BlockQueue::unknownCount() const

void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
{
bool wasFull = false;
DEV_WRITE_GUARDED(m_lock)
{
DEV_INVARIANT_CHECK;
wasFull = knownFull();
if (m_drainingSet.empty())
{
m_drainingDifficulty = 0;
Expand All @@ -437,8 +435,7 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
}
}
}
if (wasFull && !knownFull())
m_onRoomAvailable();
m_onBlocksDrained();
}

bool BlockQueue::invariants() const
Expand Down
48 changes: 24 additions & 24 deletions libethereum/BlockQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class SizedBlockQueue
}

std::deque<T> m_queue;
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
};

template<class KeyType>
Expand Down Expand Up @@ -196,7 +196,7 @@ class SizedBlockMap
}

BlockMultimap m_map;
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
};

/**
Expand Down Expand Up @@ -251,13 +251,13 @@ class BlockQueue: HasInvariants
QueueStatus blockStatus(h256 const& _h) const;

Handler<> onReady(std::function<void(void)> _t) { return m_onReady.add(_t); }
Handler<> onRoomAvailable(std::function<void(void)> _t) { return m_onRoomAvailable.add(_t); }
Handler<> onBlocksDrained(std::function<void(void)> _t) { return m_onBlocksDrained.add(_t); }

template <class T> void setOnBad(T const& _t) { m_onBad = _t; }

bool knownFull() const;
bool unknownFull() const;
u256 difficulty() const; // Total difficulty of queueud blocks
u256 difficulty() const; // Total difficulty of queueud blocks
bool isActive() const;

private:
Expand All @@ -282,31 +282,31 @@ class BlockQueue: HasInvariants
std::size_t unknownSize() const;
std::size_t unknownCount() const;

BlockChain const* m_bc; ///< The blockchain into which our imports go.
BlockChain const* m_bc; ///< The blockchain into which our imports go.

mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown.
h256Hash m_drainingSet; ///< All blocks being imported.
h256Hash m_readySet; ///< All blocks ready for chain import.
h256Hash m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain.
SizedBlockMap<h256> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
h256Hash m_knownBad; ///< Set of blocks that we know will never be valid.
SizedBlockMap<time_t> m_future; ///< Set of blocks that are not yet valid. Ordered by timestamp
mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown.
h256Hash m_drainingSet; ///< All blocks being imported.
h256Hash m_readySet; ///< All blocks ready for chain import.
h256Hash m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain.
SizedBlockMap<h256> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
h256Hash m_knownBad; ///< Set of blocks that we know will never be valid.
SizedBlockMap<time_t> m_future; ///< Set of blocks that are not yet valid. Ordered by timestamp
h256Hash m_futureSet; ///< Set of all blocks that are not yet valid.
Signal<> m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast.
Signal<> m_onRoomAvailable; ///< Called when space for new blocks becomes availabe after a drain. Be nice and exit fast.
Signal<> m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast.
Signal<> m_onBlocksDrained; ///< Called when blocks have been drained from the block queue. Be nice and exit fast.

mutable Mutex m_verification; ///< Mutex that allows writing to m_verified, m_verifying and m_unverified.
std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry.
SizedBlockQueue<VerifiedBlock> m_verified; ///< List of blocks, in correct order, verified and ready for chain-import.
SizedBlockQueue<VerifiedBlock> m_verifying; ///< List of blocks being verified; as long as the block component (bytes) is empty, it's not finished.
SizedBlockQueue<UnverifiedBlock> m_unverified; ///< List of <block hash, parent hash, block data> in correct order, ready for verification.
mutable Mutex m_verification; ///< Mutex that allows writing to m_verified, m_verifying and m_unverified.
std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry.
SizedBlockQueue<VerifiedBlock> m_verified; ///< List of blocks, in correct order, verified and ready for chain-import.
SizedBlockQueue<VerifiedBlock> m_verifying; ///< List of blocks being verified; as long as the block component (bytes) is empty, it's not finished.
SizedBlockQueue<UnverifiedBlock> m_unverified; ///< List of <block hash, parent hash, block data> in correct order, ready for verification.

std::vector<std::thread> m_verifiers; ///< Threads who only verify.
std::atomic<bool> m_deleting = {false}; ///< Exit condition for verifiers.
std::vector<std::thread> m_verifiers; ///< Threads who only verify.
std::atomic<bool> m_deleting = {false}; ///< Exit condition for verifiers.

std::function<void(Exception&)> m_onBad; ///< Called if we have a block that doesn't verify.
u256 m_difficulty; ///< Total difficulty of blocks in the queue
u256 m_drainingDifficulty; ///< Total difficulty of blocks in draining
std::function<void(Exception&)> m_onBad; ///< Called if we have a block that doesn't verify.
u256 m_difficulty; ///< Total difficulty of blocks in the queue
u256 m_drainingDifficulty; ///< Total difficulty of blocks in draining

Logger m_logger{createLogger(VerbosityDebug, "bq")};
Logger m_loggerDetail{createLogger(VerbosityTrace, "bq")};
Expand Down