Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EC-343] Add block header validation during fast sync #380

Merged
merged 6 commits into from
Jan 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,11 @@ mantis {
# on - missing state node will be redownloaded from a peer and block execution will be retried. This can repeat
# several times until block execution succeeds
redownload-missing-state-nodes = on

# See: https://github.com/ethereum/go-ethereum/pull/1889
fast-sync-block-validation-k = 100
fast-sync-block-validation-n = 2048
fast-sync-block-validation-x = 50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did you choose this X value? In the PR (white paper) X=24

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, so you doubled it because we treat target/pivot differently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was not sure how to interpret that sentence, but my intuition is that validating pivot - 2x to pivot is the same as validating pivot - x to pivot + x. (and validation after pivot happens anyway in regular sync).

}

pruning {
Expand Down
111 changes: 83 additions & 28 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.validators.Validators
import org.spongycastle.util.encoders.Hex

import scala.annotation.tailrec
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Random, Success, Try}

class FastSync(
val fastSyncStateStorage: FastSyncStateStorage,
Expand Down Expand Up @@ -153,11 +154,86 @@ class FastSync(
assignedHandlers -= handler
}

private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]) = {
if (checkHeadersChain(headers)) insertHeaders(headers)
else blacklist(peer.id, blacklistDuration, "error in block headers response")
private def insertHeader(header: BlockHeader): Boolean = {
val parentTdOpt = blockchain.getTotalDifficultyByHash(header.parentHash)
parentTdOpt foreach { parentTotalDifficulty =>
blockchain.save(header)
blockchain.save(header.hash, parentTotalDifficulty + header.difficulty)

processSyncing()
if (header.number > syncState.bestBlockHeaderNumber) {
syncState = syncState.copy(bestBlockHeaderNumber = header.number)
}

syncState = syncState
.enqueueBlockBodies(Seq(header.hash))
.enqueueReceipts(Seq(header.hash))
}

parentTdOpt.isDefined
}

private def discardLastBlocks(startBlock: BigInt, blocksToDiscard: Int): Unit = {
(startBlock to ((startBlock - blocksToDiscard) max 1) by -1).foreach { n =>
blockchain.getBlockHeaderByNumber(n).foreach { headerToRemove =>
blockchain.removeBlock(headerToRemove.hash, saveParentAsBestBlock = false)
}
}
blockchain.saveBestBlockNumber((startBlock - blocksToDiscard - 1) max 0)
}

@tailrec
private def processHeaders(peer: Peer, headers: Seq[BlockHeader]): Boolean = {
import syncConfig.{fastSyncBlockValidationK => K, fastSyncBlockValidationN => N, fastSyncBlockValidationX => X}
if (headers.nonEmpty) {
val header = headers.head
val remaining = headers.tail

val shouldValidate = header.number >= syncState.nextBlockToFullyValidate
if (shouldValidate) {
syncState = syncState.copy(
nextBlockToFullyValidate =
if (syncState.bestBlockHeaderNumber >= syncState.targetBlock.number - X) header.number + 1
else header.number + K / 2 + Random.nextInt(K))

validators.blockHeaderValidator.validate(header, blockchain) match {
case Right(_) =>
if (insertHeader(header)) processHeaders(peer, remaining)
else true

case Left(error) =>
log.warning(s"Block header validation failed during fast sync at block ${header.number}: $error")

if (header.number == syncState.targetBlock.number) {
// target validation failed, declare a failure and stop syncing
log.warning(s"Sync failure! Block header validation failed at fast sync target block. Blockchain state may be invalid.")
sys.exit(1)
false
} else {
// validation failed at non-target block, discard N blocks and resume
blacklist(peer.id, blacklistDuration, "block header validation failed")
discardLastBlocks(header.number, N)
syncState = syncState.copy(
blockBodiesQueue = Seq.empty,
receiptsQueue = Seq.empty,
bestBlockHeaderNumber = (header.number - N - 1) max 0,
nextBlockToFullyValidate = (header.number - N) max 1)
true // do not process remaining headers and resume
}
}
} else {
if (insertHeader(header)) processHeaders(peer, remaining)
else true // do not process remaining headers and resume
}
} else true
}

private def handleBlockHeaders(peer: Peer, headers: Seq[BlockHeader]) = {
if (checkHeadersChain(headers)) {
if (processHeaders(peer, headers)) processSyncing()
} else {
blacklist(peer.id, blacklistDuration, "error in block headers response")
processSyncing()
}
}

private def handleBlockBodies(peer: Peer, requestedHashes: Seq[ByteString], blockBodies: Seq[BlockBody]) = {
Expand Down Expand Up @@ -384,28 +460,6 @@ class FastSync(
}
}

private def insertHeaders(headers: Seq[BlockHeader]): Unit = {
val blockHeadersObtained = headers.takeWhile { header =>
val parentTd: Option[BigInt] = blockchain.getTotalDifficultyByHash(header.parentHash)
parentTd foreach { parentTotalDifficulty =>
blockchain.save(header)
blockchain.save(header.hash, parentTotalDifficulty + header.difficulty)
}
parentTd.isDefined
}

blockHeadersObtained.lastOption.foreach { lastHeader =>
if (lastHeader.number > syncState.bestBlockHeaderNumber) {
syncState = syncState.copy(bestBlockHeaderNumber = lastHeader.number)
}
}

val blockHashes = blockHeadersObtained.map(_.hash)
syncState = syncState
.enqueueBlockBodies(blockHashes)
.enqueueReceipts(blockHashes)
}

def processSyncing(): Unit = {
if (fullySynced) {
finish()
Expand Down Expand Up @@ -595,7 +649,8 @@ object FastSync {
blockBodiesQueue: Seq[ByteString] = Nil,
receiptsQueue: Seq[ByteString] = Nil,
downloadedNodesCount: Int = 0,
bestBlockHeaderNumber: BigInt = 0) {
bestBlockHeaderNumber: BigInt = 0,
nextBlockToFullyValidate: BigInt = 1) {

def enqueueBlockBodies(blockBodies: Seq[ByteString]): SyncState =
copy(blockBodiesQueue = blockBodiesQueue ++ blockBodies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ trait SyncBlocksValidator {
result
}


def checkHeadersChain(headers: Seq[BlockHeader]): Boolean =
if (headers.length > 1) headers.zip(headers.tail).forall { case (parent, child) => parent.hash == child.parentHash && parent.number + 1 == child.number }
else true
Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/io/iohk/ethereum/utils/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ object Config {
maxNewBlockHashAge: Int,
maxNewHashes: Int,

redownloadMissingStateNodes: Boolean
redownloadMissingStateNodes: Boolean,

fastSyncBlockValidationK: Int,
fastSyncBlockValidationN: Int,
fastSyncBlockValidationX: Int
)

object SyncConfig {
Expand Down Expand Up @@ -182,7 +186,11 @@ object Config {
maxNewBlockHashAge = syncConfig.getInt("max-new-block-hash-age"),
maxNewHashes = syncConfig.getInt("max-new-hashes"),

redownloadMissingStateNodes = syncConfig.getBoolean("redownload-missing-state-nodes")
redownloadMissingStateNodes = syncConfig.getBoolean("redownload-missing-state-nodes"),

fastSyncBlockValidationK = syncConfig.getInt("fast-sync-block-validation-k"),
fastSyncBlockValidationN = syncConfig.getInt("fast-sync-block-validation-n"),
fastSyncBlockValidationX = syncConfig.getInt("fast-sync-block-validation-x")
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,10 @@ class RegularSyncSpec extends TestKit(ActorSystem("RegularSync_system")) with Wo
maxQueuedBlockNumberBehind = 10,
maxNewBlockHashAge = 20,
maxNewHashes = 64,
redownloadMissingStateNodes = true
redownloadMissingStateNodes = true,
fastSyncBlockValidationK = 100,
fastSyncBlockValidationN = 2048,
fastSyncBlockValidationX = 50
)

val syncConfig = defaultSyncConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBody, _}
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, GetReceipts, NodeData, Receipts}
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer}
import io.iohk.ethereum.utils.Config.SyncConfig
import io.iohk.ethereum.validators.BlockHeaderError.HeaderPoWError
import io.iohk.ethereum.validators.{BlockHeaderValidator, Validators}
import io.iohk.ethereum.{Fixtures, Mocks}
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
Expand Down Expand Up @@ -172,6 +174,66 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))
}

it should "handle blocks that fail validation" in new TestSetup(validators = new Mocks.MockValidatorsAlwaysSucceed {
override val blockHeaderValidator: BlockHeaderValidator = { (blockHeader, getBlockHeaderByHash) => Left(HeaderPoWError) }
}) {
val peer2TestProbe: TestProbe = TestProbe()(system)
val peer2 = Peer(new InetSocketAddress("127.0.0.1", 0), peer2TestProbe.ref, false)


val expectedTargetBlock = 399500
val targetBlockHeader: BlockHeader = baseBlockHeader.copy(
number = expectedTargetBlock,
stateRoot = ByteString(Hex.decode("deae1dfad5ec8dcef15915811e1f044d2543674fd648f94345231da9fc2646cc")))
val bestBlockHeaderNumber: BigInt = targetBlockHeader.number - 2
storagesInstance.storages.fastSyncStateStorage.putSyncState(SyncState(targetBlockHeader)
.copy(bestBlockHeaderNumber = bestBlockHeaderNumber,
pendingMptNodes = Seq(StateMptNodeHash(targetBlockHeader.stateRoot))))

Thread.sleep(1.seconds.toMillis)

val peer2Status = Status(1, 1, 20, ByteString("peer2_bestHash"), ByteString("unused"))

syncController ! SyncController.Start

val handshakedPeers = HandshakedPeers(Map(
peer2 -> PeerInfo(peer2Status, forkAccepted = true, totalDifficulty = peer2Status.totalDifficulty, maxBlockNumber = 0)))

etcPeerManager.send(syncController.getSingleChild("fast-sync").getChild(Seq("target-block-selector").toIterator), handshakedPeers)
etcPeerManager.send(syncController.getSingleChild("fast-sync"), handshakedPeers)

val stateMptLeafWithAccount =
ByteString(Hex.decode("f86d9e328415c225a782bb339b22acad1c739e42277bc7ef34de3623114997ce78b84cf84a0186cb7d8738d800a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a0c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470"))

val watcher = TestProbe()
watcher.watch(syncController)

etcPeerManager.expectMsg(
EtcPeerManagerActor.SendMessage(GetNodeData(Seq(targetBlockHeader.stateRoot)), peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer2.id))))
peerMessageBus.reply(MessageFromPeer(NodeData(Seq(stateMptLeafWithAccount)), peer2.id))
peerMessageBus.expectMsg(Unsubscribe())

//wait for peers throttle
Thread.sleep(syncConfig.fastSyncThrottle.toMillis)
//trigger scheduling

etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(
GetBlockHeaders(Left(targetBlockHeader.number - 1), expectedTargetBlock - bestBlockHeaderNumber, 0, reverse = false),
peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))
peerMessageBus.reply(MessageFromPeer(BlockHeaders(Seq(targetBlockHeader.copy(number = targetBlockHeader.number - 1))), peer2.id))
peerMessageBus.expectMsg(Unsubscribe())

syncController.getSingleChild("fast-sync") ! FastSync.PersistSyncState
Thread.sleep(200)
val syncState = storagesInstance.storages.fastSyncStateStorage.getSyncState().get
syncState.bestBlockHeaderNumber shouldBe (bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN)
syncState.blockBodiesQueue.isEmpty shouldBe true
syncState.receiptsQueue.isEmpty shouldBe true
syncState.nextBlockToFullyValidate shouldBe (bestBlockHeaderNumber - syncConfig.fastSyncBlockValidationN + 1)
}

it should "throttle requests to peer" in new TestSetup() {
val peerTestProbe: TestProbe = TestProbe()(system)
val peer = Peer(new InetSocketAddress("127.0.0.1", 0), peerTestProbe.ref, incomingConnection = false)
Expand Down Expand Up @@ -389,7 +451,7 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
EtcPeerManagerActor.SendMessage(GetNodeData(Seq(ByteString("node_hash"))), peer.id))
}

class TestSetup(blocksForWhichLedgerFails: Seq[BigInt] = Nil) extends EphemBlockchainTestSetup {
class TestSetup(blocksForWhichLedgerFails: Seq[BigInt] = Nil, validators: Validators = new Mocks.MockValidatorsAlwaysSucceed) extends EphemBlockchainTestSetup {

private def isNewBlock(msg: Message): Boolean = msg match {
case _: NewBlock => true
Expand Down Expand Up @@ -438,7 +500,10 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
maxQueuedBlockNumberBehind = 10,
maxNewBlockHashAge = 20,
maxNewHashes = 64,
redownloadMissingStateNodes = false
redownloadMissingStateNodes = false,
fastSyncBlockValidationK = 100,
fastSyncBlockValidationN = 2048,
fastSyncBlockValidationX = 50
)

lazy val syncConfig = defaultSyncConfig
Expand All @@ -448,7 +513,7 @@ class SyncControllerSpec extends FlatSpec with Matchers with BeforeAndAfter with
blockchain,
storagesInstance.storages.fastSyncStateStorage,
ledger,
new Mocks.MockValidatorsAlwaysSucceed,
validators,
peerMessageBus.ref, pendingTransactionsManager.ref, ommersPool.ref, etcPeerManager.ref,
syncConfig,
() => (),
Expand Down