Skip to content

Commit

Permalink
Refactor from ficus to pureconfig (#3976)
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirlogachev authored Nov 19, 2024
1 parent 0720c42 commit 4e408d5
Show file tree
Hide file tree
Showing 43 changed files with 599 additions and 641 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.wavesplatform.state

import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus._
import pureconfig.ConfigSource
import pureconfig.generic.auto.*

case class Settings(
networkConfigFile: String,
Expand All @@ -15,7 +16,6 @@ case class Settings(

object Settings {
def fromConfig(config: Config): Settings = {
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
config.as[Settings]("waves.benchmark.state")
ConfigSource.fromConfig(config).at("waves.benchmark.state").loadOrThrow[Settings]
}
}
12 changes: 8 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ lazy val `node-tests` = project

lazy val `grpc-server` =
project.dependsOn(node % "compile;runtime->provided", `node-testkit`, `node-tests` % "test->test")
lazy val `ride-runner` = project.dependsOn(node, `grpc-server`, `node-tests` % "test->test")
lazy val `node-it` = project.dependsOn(`repl-jvm`, `grpc-server`, `node-tests` % "test->test")
lazy val `node-generator` = project.dependsOn(node, `node-testkit`, `node-tests` % "compile->test")
lazy val benchmark = project.dependsOn(node, `node-tests` % "test->test")
lazy val `ride-runner` = project.dependsOn(node, `grpc-server`, `node-tests` % "test->test")
lazy val `node-it` = project.dependsOn(`repl-jvm`, `grpc-server`, `node-tests` % "test->test")
lazy val `node-generator` = project
.dependsOn(node, `node-testkit`, `node-tests` % "compile->test")
.settings(
libraryDependencies += "com.iheart" %% "ficus" % "1.5.2"
)
lazy val benchmark = project.dependsOn(node, `node-tests` % "test->test")

lazy val repl = crossProject(JSPlatform, JVMPlatform)
.withoutSuffixFor(JVMPlatform)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import io.grpc.protobuf.services.ProtoReflectionService
import monix.execution.Scheduler
import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase
import net.ceedubs.ficus.Ficus.*
import net.ceedubs.ficus.readers.ArbitraryTypeReader.*
import pureconfig.ConfigSource
import pureconfig.generic.auto.*

import java.net.InetSocketAddress
import java.util.concurrent.Executors
import scala.concurrent.Future

class GRPCServerExtension(context: ExtensionContext) extends Extension with ScorexLogging {
private val settings = context.settings.config.as[GRPCSettings]("waves.grpc")
private val settings = ConfigSource.fromConfig(context.settings.config).at("waves.grpc").loadOrThrow[GRPCSettings]
private val executor = Executors.newFixedThreadPool(settings.workerThreads, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("grpc-server-worker-%d").build())
private implicit val apiScheduler: Scheduler = Scheduler(executor)
private val bindAddress = new InetSocketAddress(settings.host, settings.port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import io.grpc.protobuf.services.ProtoReflectionService
import io.grpc.{Metadata, Server, ServerStreamTracer, Status}
import monix.execution.schedulers.SchedulerService
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase
import net.ceedubs.ficus.Ficus.*
import net.ceedubs.ficus.readers.ArbitraryTypeReader.*
import org.rocksdb.RocksDB
import pureconfig.ConfigSource
import pureconfig.generic.auto.*

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
Expand All @@ -24,7 +23,7 @@ import scala.concurrent.duration.*
import scala.util.Try

class BlockchainUpdates(private val context: Context) extends Extension with ScorexLogging with BlockchainUpdateTriggers {
private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
private[this] val settings = ConfigSource.fromConfig(context.settings.config).at("waves.blockchain-updates").loadOrThrow[BlockchainUpdatesSettings]
private[this] implicit val scheduler: SchedulerService = Schedulers.fixedPool(
settings.workerThreads,
"blockchain-updates",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package com.wavesplatform.generator

import java.io.File
import java.net.InetSocketAddress
import java.net.{InetSocketAddress, URI}
import java.util.concurrent.Executors

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.*
import scala.concurrent.duration.*
import scala.util.{Failure, Random, Success}

import cats.implicits.showInterpolator
import com.typesafe.config.ConfigFactory
import com.typesafe.config.{Config, ConfigFactory}
import com.wavesplatform.account.AddressScheme
import com.wavesplatform.features.EstimatorProvider._
import com.wavesplatform.features.EstimatorProvider.*
import com.wavesplatform.generator.GeneratorSettings.NodeAddress
import com.wavesplatform.generator.Preconditions.{PGenSettings, UniverseHolder}
import com.wavesplatform.generator.cli.ScoptImplicits
Expand All @@ -22,20 +20,24 @@ import com.wavesplatform.transaction.Transaction
import com.wavesplatform.utils.{LoggerFacade, NTP}
import com.wavesplatform.Application
import monix.execution.Scheduler
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.Ficus.*
import net.ceedubs.ficus.readers.{EnumerationReader, NameMapper, ValueReader}
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import net.ceedubs.ficus.readers.ArbitraryTypeReader.*
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.Dsl.asyncHttpClient
import org.slf4j.LoggerFactory
import scopt.OptionParser

object TransactionsGeneratorApp extends App with ScoptImplicits with FicusImplicits with EnumerationReader {

implicit val inetSocketAddressReader: ValueReader[InetSocketAddress] = { (config: Config, path: String) =>
val uri = new URI(s"my://${config.getString(path)}")
new InetSocketAddress(uri.getHost, uri.getPort)
}

// IDEA bugs
implicit val inetSocketAddressReader: ValueReader[InetSocketAddress] = com.wavesplatform.settings.inetSocketAddressReader
implicit val readConfigInHyphen: NameMapper = net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase
implicit val httpClient: AsyncHttpClient = asyncHttpClient()
implicit val readConfigInHyphen: NameMapper = net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase
implicit val httpClient: AsyncHttpClient = asyncHttpClient()

val log = LoggerFacade(LoggerFactory.getLogger("generator"))

Expand Down Expand Up @@ -228,21 +230,20 @@ object TransactionsGeneratorApp extends App with ScoptImplicits with FicusImplic
log.info(s"Universe precondition tail transactions size: ${initialTailTransactions.size}")
log.info(s"Generator precondition tail transactions size: ${initialGenTailTransactions.size}")

val workers = finalConfig.sendTo.map {
case NodeAddress(node, nodeRestUrl) =>
log.info(s"Creating worker: ${node.getHostString}:${node.getPort}")
// new Worker(finalConfig.worker, sender, node, generator, initialTransactions.map(RawBytes.from))
new Worker(
finalConfig.worker,
Iterator.continually(generator.next()).flatten,
sender,
node,
nodeRestUrl,
() => canContinue,
initialUniTransactions ++ initialGenTransactions,
finalConfig.privateKeyAccounts.map(_.toAddress.toString),
initialTailTransactions ++ initialGenTailTransactions
)
val workers = finalConfig.sendTo.map { case NodeAddress(node, nodeRestUrl) =>
log.info(s"Creating worker: ${node.getHostString}:${node.getPort}")
// new Worker(finalConfig.worker, sender, node, generator, initialTransactions.map(RawBytes.from))
new Worker(
finalConfig.worker,
Iterator.continually(generator.next()).flatten,
sender,
node,
nodeRestUrl,
() => canContinue,
initialUniTransactions ++ initialGenTransactions,
finalConfig.privateKeyAccounts.map(_.toAddress.toString),
initialTailTransactions ++ initialGenTailTransactions
)
}

def close(status: Int): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.wavesplatform.history.StorageFactory
import com.wavesplatform.settings.*
import com.wavesplatform.transaction.Asset.Waves
import com.wavesplatform.utils.NTP
import net.ceedubs.ficus.Ficus.*
import pureconfig.ConfigSource

object BaseTargetChecker {
def main(args: Array[String]): Unit = {
Expand Down Expand Up @@ -41,7 +41,7 @@ object BaseTargetChecker {
blockchainUpdater.processBlock(genesisBlock, genesisBlock.header.generationSignature, None)

NodeConfigs.Default.map(_.withFallback(sharedConfig)).collect {
case cfg if cfg.as[Boolean]("waves.miner.enable") =>
case cfg if ConfigSource.fromConfig(cfg).at("waves.miner.enable").loadOrThrow[Boolean] =>
val account = KeyPair.fromSeed(cfg.getString("account-seed")).explicitGet()
val address = account.toAddress
val balance = blockchainUpdater.balance(address, Waves)
Expand Down
24 changes: 15 additions & 9 deletions node-it/src/test/scala/com/wavesplatform/it/Docker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import com.wavesplatform.it.util.GlobalTimer.instance as timer
import com.wavesplatform.settings.*
import com.wavesplatform.utils.ScorexLogging
import monix.eval.Coeval
import net.ceedubs.ficus.Ficus.*
import net.ceedubs.ficus.readers.ArbitraryTypeReader.*
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.commons.compress.archivers.tar.TarArchiveEntry
import org.apache.commons.io.IOUtils
import org.asynchttpclient.Dsl.*
import pureconfig.ConfigSource
import pureconfig.generic.auto.*

import java.io.{FileOutputStream, IOException}
import java.net.{InetAddress, InetSocketAddress, URL}
Expand Down Expand Up @@ -306,7 +306,7 @@ class Docker(
private def getNodeInfo(containerId: String, settings: WavesSettings): NodeInfo = {
val restApiPort = settings.restAPISettings.port
// assume test nodes always have an open port
val networkPort = settings.networkSettings.bindAddress.get.getPort
val networkPort = settings.networkSettings.derivedBindAddress.get.getPort

val containerInfo = inspectContainer(containerId)
val wavesIpAddress = containerInfo.networkSettings().networks().get(wavesNetwork.name()).ipAddress()
Expand Down Expand Up @@ -581,22 +581,28 @@ object Docker {
|}""".stripMargin)

val genesisConfig = timestampOverrides.withFallback(configTemplate)
val gs = genesisConfig.as[GenesisSettings]("waves.blockchain.custom.genesis")
val features = featuresConfig
val gs = ConfigSource.fromConfig(genesisConfig).at("waves.blockchain.custom.genesis").loadOrThrow[GenesisSettings]
val featuresConfigAdjusted = featuresConfig
.map(_.withFallback(configTemplate))
.getOrElse(configTemplate)
.resolve()
.getAs[Map[Short, Int]]("waves.blockchain.custom.functionality.pre-activated-features")
val isRideV6Activated = features.exists(_.get(BlockchainFeatures.RideV6.id).contains(0))
val isTxStateSnapshotActivated = features.exists(_.get(BlockchainFeatures.LightNode.id).contains(0))
val features =
ConfigSource
.fromConfig(featuresConfigAdjusted)
.at("waves.blockchain.custom.functionality.pre-activated-features")
.loadOrThrow[Map[Short, Int]]

val isRideV6Activated = features.get(BlockchainFeatures.RideV6.id).contains(0)
val isTxStateSnapshotActivated = features.get(BlockchainFeatures.LightNode.id).contains(0)

val genesisSignature = Block.genesis(gs, isRideV6Activated, isTxStateSnapshotActivated).explicitGet().id()

parseString(s"waves.blockchain.custom.genesis.signature = $genesisSignature").withFallback(timestampOverrides)
}

AddressScheme.current = new AddressScheme {
override val chainId: Byte = configTemplate.as[String]("waves.blockchain.custom.address-scheme-character").charAt(0).toByte
override val chainId: Byte =
ConfigSource.fromConfig(configTemplate).at("waves.blockchain.custom.address-scheme-character").loadOrThrow[String].charAt(0).toByte
}

def apply(owner: Class[?]): Docker = new Docker(tag = owner.getSimpleName)
Expand Down
2 changes: 1 addition & 1 deletion node-it/src/test/scala/com/wavesplatform/it/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ abstract class Node(val config: Config) extends AutoCloseable {

object Node {
implicit class NodeExt(val n: Node) extends AnyVal {
def name: String = n.settings.networkSettings.nodeName
def name: String = n.settings.networkSettings.derivedNodeName
def publicKeyStr: String = n.publicKey.toString
def fee(txTypeId: Byte): Long = FeeValidation.FeeConstants(TransactionType(txTypeId)) * FeeValidation.FeeUnit
def blockDelay: FiniteDuration = n.settings.blockchainSettings.genesisSettings.averageBlockDelay
Expand Down
8 changes: 4 additions & 4 deletions node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
log.info(s"REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}")
}

for (addr <- settings.networkSettings.declaredAddress if settings.networkSettings.uPnPSettings.enable) {
for (addr <- settings.networkSettings.derivedDeclaredAddress if settings.networkSettings.uPnPSettings.enable) {
upnp.addPort(addr.getPort)
}

Expand All @@ -501,7 +501,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
log.info("Closing REST API")
if (settings.restAPISettings.enable)
Try(Await.ready(serverBinding.unbind(), 2.minutes)).failed.map(e => log.error("Failed to unbind REST API port", e))
for (addr <- settings.networkSettings.declaredAddress if settings.networkSettings.uPnPSettings.enable) upnp.deletePort(addr.getPort)
for (addr <- settings.networkSettings.derivedDeclaredAddress if settings.networkSettings.uPnPSettings.enable) upnp.deletePort(addr.getPort)

log.debug("Closing peer database")
peerDatabase.close()
Expand Down Expand Up @@ -604,12 +604,12 @@ object Application extends ScorexLogging {
}

private[wavesplatform] def loadBlockAt(rdb: RDB, blockchainUpdater: BlockchainUpdaterImpl)(
height: Int
height: Int
): Option[(BlockMeta, Seq[(TxMeta, Transaction)])] =
loadBlockInfoAt(rdb, blockchainUpdater)(height)

private[wavesplatform] def loadBlockInfoAt(rdb: RDB, blockchainUpdater: BlockchainUpdaterImpl)(
height: Int
height: Int
): Option[(BlockMeta, Seq[(TxMeta, Transaction)])] =
loadBlockMetaAt(rdb.db, blockchainUpdater)(height).map { meta =>
meta -> blockchainUpdater
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import com.wavesplatform.consensus.PoSCalculator.{generationSignature, hit}
import com.wavesplatform.consensus.{FairPoSCalculator, NxtPoSCalculator}
import com.wavesplatform.crypto.*
import com.wavesplatform.features.{BlockchainFeature, BlockchainFeatures}
import com.wavesplatform.settings.{FunctionalitySettings, GenesisSettings, GenesisTransactionSettings}
import com.wavesplatform.settings.*
import com.wavesplatform.transaction.{GenesisTransaction, TxNonNegativeAmount}
import com.wavesplatform.utils.*
import com.wavesplatform.wallet.Wallet
import net.ceedubs.ficus.Ficus.*
import net.ceedubs.ficus.readers.ArbitraryTypeReader.*
import pureconfig.ConfigSource
import pureconfig.generic.auto.*

import java.io.{File, FileNotFoundException}
import java.nio.file.Files
Expand Down Expand Up @@ -102,8 +102,7 @@ object GenesisBlockGenerator {
}

def parseSettings(config: Config): Settings = {
import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase
config.as[Settings]("genesis-generator")
ConfigSource.fromConfig(config).at("genesis-generator").loadOrThrow[Settings]
}

def createConfig(settings: Settings): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package com.wavesplatform.account
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.crypto.KeyLength
import play.api.libs.json.{Format, Writes}
import supertagged._
import supertagged.postfix._
import supertagged.*
import supertagged.postfix.*

object PrivateKey extends TaggedType[ByteStr] {
def apply(privateKey: ByteStr): PrivateKey = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object NetworkServer extends ScorexLogging {
peerDatabase: PeerDatabase,
allChannels: ChannelGroup,
peerInfo: ConcurrentHashMap[Channel, PeerInfo],
protocolSpecificPipeline: => Seq[ChannelHandlerAdapter],
protocolSpecificPipeline: => Seq[ChannelHandlerAdapter]
): NetworkServer = {
@volatile var shutdownInitiated = false

Expand All @@ -48,21 +48,21 @@ object NetworkServer extends ScorexLogging {
val handshake = Handshake(
applicationName,
Version.VersionTuple,
networkSettings.nodeName,
networkSettings.nonce,
networkSettings.declaredAddress
networkSettings.derivedNodeName,
networkSettings.derivedNonce,
networkSettings.derivedDeclaredAddress
)

val excludedAddresses: Set[InetSocketAddress] =
networkSettings.bindAddress.fold(Set.empty[InetSocketAddress]) { bindAddress =>
networkSettings.derivedBindAddress.fold(Set.empty[InetSocketAddress]) { bindAddress =>
val isLocal = Option(bindAddress.getAddress).exists(_.isAnyLocalAddress)
val localAddresses = if (isLocal) {
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala.map(a => new InetSocketAddress(a, bindAddress.getPort)))
.toSet
} else Set(bindAddress)

localAddresses ++ networkSettings.declaredAddress.toSet
localAddresses ++ networkSettings.derivedDeclaredAddress.toSet
}

val lengthFieldPrepender = new LengthFieldPrepender(4)
Expand Down Expand Up @@ -90,7 +90,7 @@ object NetworkServer extends ScorexLogging {
) ++ protocolSpecificPipeline ++
Seq(writeErrorHandler, channelClosedHandler, fatalErrorHandler)

val serverChannel = networkSettings.bindAddress.map { bindAddress =>
val serverChannel = networkSettings.derivedBindAddress.map { bindAddress =>
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(classOf[NioServerSocketChannel])
Expand Down Expand Up @@ -203,7 +203,8 @@ object NetworkServer extends ScorexLogging {
}

def scheduleConnectTask(): Unit = if (!shutdownInitiated) {
val delay = (if (peerConnectionsMap.isEmpty || networkSettings.minConnections.exists(_ > peerConnectionsMap.size())) AverageHandshakePeriod else 5.seconds) +
val delay = (if (peerConnectionsMap.isEmpty || networkSettings.minConnections.exists(_ > peerConnectionsMap.size())) AverageHandshakePeriod
else 5.seconds) +
(Random.nextInt(1000) - 500).millis // add some noise so that nodes don't attempt to connect to each other simultaneously

workerGroup.schedule(delay) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PeerDatabaseImpl(settings: NetworkSettings, ticker: Ticker = Ticker.system

override def addCandidate(socketAddress: InetSocketAddress): Boolean = unverifiedPeers.synchronized {
val r = !socketAddress.getAddress.isAnyLocalAddress &&
!(socketAddress.getAddress.isLoopbackAddress && settings.bindAddress.exists(_.getPort == socketAddress.getPort)) &&
!(socketAddress.getAddress.isLoopbackAddress && settings.derivedBindAddress.exists(_.getPort == socketAddress.getPort)) &&
Option(peersPersistence.getIfPresent(socketAddress)).isEmpty &&
!unverifiedPeers.contains(socketAddress)
if (r) unverifiedPeers.add(socketAddress)
Expand Down
Loading

0 comments on commit 4e408d5

Please sign in to comment.