Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NODE-2329 Reestabilish input stream on unexpected read bytes size #3704

Open
wants to merge 4 commits into
base: version-1.4.x
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
package com.wavesplatform

import java.io._
import java.net.{MalformedURLException, URL}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

import akka.actor.ActorSystem
import com.google.common.io.ByteStreams
import com.google.common.primitives.Ints
Expand All @@ -16,7 +9,7 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonB
import com.wavesplatform.block.{Block, BlockHeader}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.consensus.PoSSelector
import com.wavesplatform.database.{openDB, DBExt, KeyTags}
import com.wavesplatform.database.{DBExt, KeyTags, openDB}
import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent}
import com.wavesplatform.extensions.{Context, Extension}
import com.wavesplatform.features.BlockchainFeatures
Expand All @@ -25,12 +18,12 @@ import com.wavesplatform.lang.ValidationError
import com.wavesplatform.mining.Miner
import com.wavesplatform.protobuf.block.PBBlocks
import com.wavesplatform.settings.WavesSettings
import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height}
import com.wavesplatform.state.appender.BlockAppender
import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction}
import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height}
import com.wavesplatform.transaction.TxValidationError.GenericError
import com.wavesplatform.transaction.smart.script.trace.TracedResult
import com.wavesplatform.utils._
import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction}
import com.wavesplatform.utils.*
import com.wavesplatform.utx.{UtxPool, UtxPoolImpl}
import com.wavesplatform.wallet.Wallet
import kamon.Kamon
Expand All @@ -40,6 +33,12 @@ import monix.reactive.{Observable, Observer}
import org.iq80.leveldb.DB
import scopt.OParser

import java.io.*
import java.net.{MalformedURLException, URL}
import scala.concurrent.duration.*
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}

object Importer extends ScorexLogging {
import monix.execution.Scheduler.Implicits.global

Expand All @@ -59,7 +58,7 @@ object Importer extends ScorexLogging {
import scopt.OParser

val builder = OParser.builder[ImportOptions]
import builder._
import builder.*

OParser.sequence(
programName("waves import"),
Expand Down Expand Up @@ -167,12 +166,13 @@ object Importer extends ScorexLogging {
}
}

@volatile private var quit = false
private val lock = new Object
@volatile private var quit = false
@volatile private var inputStream: InputStream = null
private val lock = new Object

//noinspection UnstableApiUsage
// noinspection UnstableApiUsage
def startImport(
inputStream: BufferedInputStream,
getInputStream: () => InputStream,
blockchain: Blockchain,
appendBlock: AppendBlock,
importOptions: ImportOptions,
Expand All @@ -189,13 +189,15 @@ object Importer extends ScorexLogging {
if (blocksToSkip > 0) log.info(s"Skipping $blocksToSkip block(s)")

sys.addShutdownHook {
import scala.concurrent.duration._
import scala.concurrent.duration.*
val millis = (System.nanoTime() - start).nanos.toMillis
log.info(
s"Imported $counter block(s) from $startHeight to ${startHeight + counter} in ${humanReadableDuration(millis)}"
)
}

inputStream = getInputStream()

while (!quit && counter < blocksToApply) lock.synchronized {
val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES)
if (s1 == Ints.BYTES) {
Expand Down Expand Up @@ -236,7 +238,9 @@ object Importer extends ScorexLogging {
}
} else {
log.info(s"$factReadSize != expected $blockSize")
quit = true
log.info(s"reestablishing input stream")
inputStream.close()
inputStream = getInputStream()
}
} else {
if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})")
Expand Down Expand Up @@ -286,7 +290,7 @@ object Importer extends ScorexLogging {
val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, db, actorSystem)
checkGenesis(settings, blockchainUpdater, Miner.Disabled)

val importFileOffset =
def importFileOffset() =
if (importOptions.dryRun) 0
else
importOptions.format match {
Expand All @@ -304,7 +308,9 @@ object Importer extends ScorexLogging {

case _ => 0L
}
val inputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset), 2 * 1024 * 1024)

def establishInputStream() =
new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset()), 2 * 1024 * 1024)

if (importOptions.dryRun) {
def readNextBlock(): Future[Option[Block]] = Future.successful(None)
Expand Down Expand Up @@ -353,10 +359,10 @@ object Importer extends ScorexLogging {
levelDb.close()
db.close()
}
inputStream.close()
if (inputStream != null) inputStream.close()
}

startImport(inputStream, blockchainUpdater, extAppender, importOptions, importFileOffset == 0)
startImport(() => establishInputStream(), blockchainUpdater, extAppender, importOptions, importFileOffset() == 0)
Await.result(Kamon.stopModules(), 10.seconds)
}
}