Skip to content

Commit

Permalink
Add Gossip simulator module (#269)
Browse files Browse the repository at this point in the history
* Extract libp2p.testFixtures schedulers package to a dedicated :tools:schedulers module
* Add :tools:simulator module
* Simulation main classes are located in the package io.libp2p.simulate.main
* Any simulation could be run from the gradle command line (gradle application plugin applied in the module)
* Simulations are configured programmatically only
* Add simulator README and the reference from the main README
  • Loading branch information
Nashatyrev authored Feb 24, 2023
1 parent d0b86ff commit 70f712b
Show file tree
Hide file tree
Showing 88 changed files with 4,588 additions and 0 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ We are explicitly leaving out the peerstore, DHT, pubsub, connection manager,
etc. and other subsystems or concepts that are internal to implementations and
do not impact the ability to hold communications with other libp2p processes.

## Gossip simulator

Deterministic Gossip simulator which may simulate networks as large as 10000 of peers

Please check the Simulator [README](tools/simulator/README.md) for more details

## Adding as a dependency to your project

Hosting of artefacts is graciously provided by [Cloudsmith](https://cloudsmith.com).
Expand Down
3 changes: 3 additions & 0 deletions libp2p/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ dependencies {
implementation("org.bouncycastle:bcprov-jdk15on")
implementation("org.bouncycastle:bcpkix-jdk15on")

testImplementation(project(":tools:schedulers"))

jmhImplementation(project(":tools:schedulers"))
jmhImplementation("org.openjdk.jmh:jmh-core")
jmhAnnotationProcessor("org.openjdk.jmh:jmh-generator-annprocess")
}
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
rootProject.name = 'jvm-libp2p-minimal'

include ':libp2p'
include ':tools:schedulers'
include ':tools:simulator'
include ':examples:chatter'
include ':examples:cli-chatter'
include ':examples:pinger'
33 changes: 33 additions & 0 deletions tools/simulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# jvm-libp2p Gossip simulator

## Description

This is Gossip simulator which may simulate networks as large as 10000 peers

The simulator is _deterministic_. That is:
- yields 100% identical results across different runs with the same configuration and the same random seed
- a simulation may forward current time as needed

## Configuring simulation

All simulations are configured programmatically inside the simulation code.

You could make a copy of an existing simulation (from `io.libp2p.simulate.main` package) or create a new one
and change simulation configuration right in the Kotlin class

## Running simulation with gradle

Any main function could be run from CLI with gradle using the following syntax:
```shell
> gradle :tools:simulator:run -PmainClass=<your.main.Class> [--args="you args"]
```

For example to run the sample simulation use the command below:
```shell
> gradle :tools:simulator:run -PmainClass=io.libp2p.simulate.main.SimpleSimulationKt
```

## License

Dual-licensed under MIT and ASLv2, by way of the [Permissive License
Stack](https://protocol.ai/blog/announcing-the-permissive-license-stack/).
30 changes: 30 additions & 0 deletions tools/simulator/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
apply plugin: "application"

dependencies {

api project(':libp2p')
api project(':tools:schedulers')

implementation("org.jgrapht:jgrapht-core:1.3.1")
api("org.apache.commons:commons-math3:3.6.1")
implementation("org.jetbrains.kotlin:kotlin-reflect:1.3.0")

runtimeOnly("org.apache.logging.log4j:log4j-core")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}


if (project.hasProperty("mainClass")) {
ext.javaMainClass = project.getProperty("mainClass")
} else {
ext.javaMainClass = "io.libp2p.simulate.util.gradle.DefaultGradleApplicationKt"
}

application {
mainClassName = javaMainClass
applicationDefaultJvmArgs = ['-Xmx12G', '-XX:+HeapDumpOnOutOfMemoryError']

if ( project.hasProperty('jvmArgs') ) {
applicationDefaultJvmArgs = applicationDefaultJvmArgs + project.jvmArgs.split('\\s+').toList()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.libp2p.simulate

import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.ArrayList

abstract class AbstractSimPeer : SimPeer {

override val simPeerId = counter.getAndIncrement()

override val connections: MutableList<SimConnection> = Collections.synchronizedList(ArrayList())

override fun connect(other: SimPeer): CompletableFuture<SimConnection> {
return connectImpl(other).thenApply { conn ->
val otherAbs = other as? AbstractSimPeer
connections += conn
otherAbs?.connections?.add(conn)
conn.closed.thenAccept {
connections -= conn
otherAbs?.connections?.remove(conn)
}
conn
}
}

abstract fun connectImpl(other: SimPeer): CompletableFuture<SimConnection>

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as AbstractSimPeer
return name == other.name
}

override fun hashCode(): Int = name.hashCode()

companion object {
val counter = AtomicInteger()
}
}
32 changes: 32 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/Bandwidth.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.libp2p.simulate

import java.util.concurrent.CompletableFuture
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

data class Bandwidth(val bytesPerSecond: Long) {
fun getTransmitTimeMillis(size: Long): Long = (size * 1000 / bytesPerSecond)
fun getTransmitTime(size: Long): Duration = getTransmitTimeMillis(size).milliseconds

fun getTransmitSize(timeMillis: Long): Long =
bytesPerSecond * timeMillis / 1000

operator fun div(d: Int) = Bandwidth(bytesPerSecond / d)

companion object {
val UNLIM = Bandwidth(Long.MAX_VALUE)
fun mbitsPerSec(mbsec: Int) = Bandwidth(mbsec.toLong() * (1 shl 20) / 10)
}
}

interface BandwidthDelayer : MessageDelayer {

val totalBandwidth: Bandwidth

companion object {
val UNLIM_BANDWIDTH = object : BandwidthDelayer {
override val totalBandwidth = Bandwidth.UNLIM
override fun delay(size: Long) = CompletableFuture.completedFuture(Unit)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.libp2p.simulate

import java.util.concurrent.CompletableFuture

fun interface MessageDelayer {
fun delay(size: Long): CompletableFuture<Unit>

companion object {
val NO_DELAYER = MessageDelayer { CompletableFuture.completedFuture(null) }
}
}
17 changes: 17 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/Network.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.libp2p.simulate

interface Network {

val peers: List<SimPeer>

val activeConnections: List<SimConnection>

val topologyGraph: TopologyGraph
}

class ImmutableNetworkImpl(
override val activeConnections: List<SimConnection>,
override val topologyGraph: TopologyGraph
) : Network {
override val peers = activeConnections.map { it.dialer }.distinct()
}
33 changes: 33 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/RandomValue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.libp2p.simulate

import java.util.Random

interface RandomValue {

fun next(): Double

companion object {
fun const(constVal: Double) = object : RandomValue {
override fun next() = constVal
}
fun uniform(from: Double, to: Double, rnd: Random) = object : RandomValue {
override fun next() = from + rnd.nextDouble() * (to - from)
}
}
}

interface RandomDistribution {
fun newValue(rnd: Random): RandomValue

companion object {
fun const(constVal: Double) = ConstRandomDistr(constVal)
fun uniform(from: Double, to: Double) = UniformRandomDistr(from, to)
}

data class ConstRandomDistr(val constVal: Double) : RandomDistribution {
override fun newValue(rnd: Random) = RandomValue.const(constVal)
}
data class UniformRandomDistr(val from: Double, val to: Double) : RandomDistribution {
override fun newValue(rnd: Random) = RandomValue.uniform(from, to, rnd)
}
}
20 changes: 20 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/SimChannel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.libp2p.simulate

interface SimChannel {

val stream: SimStream
val isStreamInitiator: Boolean

val peer get() =
when {
isStreamInitiator -> stream.streamInitiatorPeer
else -> stream.streamAcceptorPeer
}

val msgVisitors: MutableList<SimChannelMessageVisitor>
}

interface SimChannelMessageVisitor {
fun onInbound(message: Any)
fun onOutbound(message: Any)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.libp2p.simulate

import java.util.concurrent.CompletableFuture

interface SimConnection {

val dialer: SimPeer
val listener: SimPeer

val streams: List<SimStream>

val closed: CompletableFuture<Unit>

var connectionLatency: MessageDelayer

fun close()
}
23 changes: 23 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/SimPeer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.libp2p.simulate

import io.libp2p.core.PeerId
import java.util.concurrent.CompletableFuture

typealias SimPeerId = Int

interface SimPeer {

val simPeerId: SimPeerId
val name: String get() = simPeerId.toString()
val peerId: PeerId
val connections: List<SimConnection>

var inboundBandwidth: BandwidthDelayer
var outboundBandwidth: BandwidthDelayer

fun start() = CompletableFuture.completedFuture(Unit)

fun connect(other: SimPeer): CompletableFuture<SimConnection>

fun stop(): CompletableFuture<Unit> = CompletableFuture.completedFuture(Unit)
}
26 changes: 26 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/SimStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.libp2p.simulate

import io.libp2p.core.multistream.ProtocolId

interface SimStream {

enum class StreamInitiator { CONNECTION_DIALER, CONNECTION_LISTENER }

val connection: SimConnection
val streamInitiator: StreamInitiator
val streamProtocol: ProtocolId

val streamInitiatorPeer get() =
when (streamInitiator) {
StreamInitiator.CONNECTION_DIALER -> connection.dialer
StreamInitiator.CONNECTION_LISTENER -> connection.listener
}
val streamAcceptorPeer get() =
when (streamInitiator) {
StreamInitiator.CONNECTION_DIALER -> connection.listener
StreamInitiator.CONNECTION_LISTENER -> connection.dialer
}

val initiatorChannel: SimChannel
val acceptorChannel: SimChannel
}
38 changes: 38 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/Topology.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.libp2p.simulate

import io.libp2p.simulate.topology.CustomTopologyGraph
import java.util.Random

interface Topology {

var random: Random

fun generateGraph(verticesCount: Int): TopologyGraph
}

interface TopologyGraph {
data class Edge(val srcV: Int, val destV: Int)

val edges: Collection<Edge>

val vertices get() =
edges.flatMap { listOf(it.srcV, it.destV) }.distinct().sorted()

fun calcDiameter(): Int

fun connect(peers: List<SimPeer>): Network {
require(peers.size == vertices.size)
return edges
.map { peers[it.srcV].connect(peers[it.destV]).join() }
.let { ImmutableNetworkImpl(it, this) }
}

companion object {
fun customTopology(vararg vertices: Pair<Int, Int>) =
CustomTopologyGraph(
vertices.map { Edge(it.first, it.second) }
)
}
}

fun Topology.generateAndConnect(peers: List<SimPeer>) = generateGraph(peers.size).connect(peers)
Loading

0 comments on commit 70f712b

Please sign in to comment.