Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Gossip simulator module #269

Merged
merged 36 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
82a519d
Move schedulers to a separate module
Nashatyrev Feb 17, 2023
a44f53b
Add simulator module
Nashatyrev Feb 20, 2023
d50f817
Remove obsolete classes
Nashatyrev Feb 20, 2023
fdf2269
Sort unit tests
Nashatyrev Feb 20, 2023
654060e
Subscribe peers to topics in GossipSimulation instead of GossipSimPee…
Nashatyrev Feb 20, 2023
20b3083
Minor Network interface clean up
Nashatyrev Feb 20, 2023
0f4282c
BlobDecouplingSimulation minor clean up
Nashatyrev Feb 20, 2023
2a59318
Remove HostSimPeer, WireSimPeer and their accompanying classes: they …
Nashatyrev Feb 20, 2023
163d260
Refactor Simulation2 to GossipScoreTestSimulation
Nashatyrev Feb 21, 2023
2d9c734
Get rid of StreamSimPeer.currentTime default value for fast fail
Nashatyrev Feb 21, 2023
2f31ada
Minor clean up
Nashatyrev Feb 21, 2023
8ca97d3
Add ConnectionsMessageCollector.clear() function
Nashatyrev Feb 21, 2023
4b2e02b
Field rename
Nashatyrev Feb 21, 2023
5402940
Rename property
Nashatyrev Feb 21, 2023
6c90ac8
Get rid of GlobalNetworkStatsCollector in favor of GossipMessageColle…
Nashatyrev Feb 21, 2023
da20dce
Add Slf4j binding runtime dependency for simulator
Nashatyrev Feb 21, 2023
369ec5d
Move Simulation1 to sources
Nashatyrev Feb 21, 2023
b61cc94
Lint
Nashatyrev Feb 21, 2023
984e13c
Add SimpleSimulation and integration test. Clean up other tests
Nashatyrev Feb 21, 2023
529262a
Add ability to run simulations from gradle CLI
Nashatyrev Feb 21, 2023
8ce24ed
Add simulator README.md
Nashatyrev Feb 21, 2023
c19c1df
Rename the gradle application placeholder
Nashatyrev Feb 21, 2023
919af60
Add simulator README reference from the main README
Nashatyrev Feb 21, 2023
6f8d76b
Minor README wording fix
Nashatyrev Feb 21, 2023
95df73d
Extract GroupByRangeAggregator class
Nashatyrev Feb 22, 2023
be4d5a2
Add SimPeer.simPeerId
Nashatyrev Feb 22, 2023
6fc6814
Derive GossipPubDeliveryResult from generic messages of GossipMessage…
Nashatyrev Feb 22, 2023
05e4068
Refactor ConnectionsMessageCollector
Nashatyrev Feb 22, 2023
3d91d76
Add SimpleSimulationIntegrationTest testcase
Nashatyrev Feb 22, 2023
fae7519
Get rid of collecting ApiMessages inside GossipSimPeer
Nashatyrev Feb 23, 2023
df63e86
Adjust MiscParamsOptimizationSimulation to use GossipSimulation class
Nashatyrev Feb 23, 2023
ff7cfca
Replace java Duration with kotlin Duration
Nashatyrev Feb 23, 2023
5a5360c
Add more nice looking ScheduledExecutorService.schedule() extension
Nashatyrev Feb 23, 2023
b077411
Linting
Nashatyrev Feb 23, 2023
05b83b9
Return back allWarningsAsErrors option
Nashatyrev Feb 23, 2023
587afaf
Fix compiler warnings
Nashatyrev Feb 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ We are explicitly leaving out the peerstore, DHT, pubsub, connection manager,
etc. and other subsystems or concepts that are internal to implementations and
do not impact the ability to hold communications with other libp2p processes.

## Gossip simulator

Deterministic Gossip simulator which may simulate networks as large as 10000 of peers

Please check the Simulator [README](tools/simulator/README.md) for more details

## Adding as a dependency to your project

Hosting of artefacts is graciously provided by [Cloudsmith](https://cloudsmith.com).
Expand Down
3 changes: 3 additions & 0 deletions libp2p/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ dependencies {
implementation("org.bouncycastle:bcprov-jdk15on")
implementation("org.bouncycastle:bcpkix-jdk15on")

testImplementation(project(":tools:schedulers"))

jmhImplementation(project(":tools:schedulers"))
jmhImplementation("org.openjdk.jmh:jmh-core")
jmhAnnotationProcessor("org.openjdk.jmh:jmh-generator-annprocess")
}
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
rootProject.name = 'jvm-libp2p-minimal'

include ':libp2p'
include ':tools:schedulers'
include ':tools:simulator'
include ':examples:chatter'
include ':examples:cli-chatter'
include ':examples:pinger'
33 changes: 33 additions & 0 deletions tools/simulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# jvm-libp2p Gossip simulator

## Description

This is Gossip simulator which may simulate networks as large as 10000 peers

The simulator is _deterministic_. That is:
- yields 100% identical results across different runs with the same configuration and the same random seed
- a simulation may forward current time as needed

## Configuring simulation

All simulations are configured programmatically inside the simulation code.

You could make a copy of an existing simulation (from `io.libp2p.simulate.main` package) or create a new one
and change simulation configuration right in the Kotlin class

## Running simulation with gradle

Any main function could be run from CLI with gradle using the following syntax:
```shell
> gradle :tools:simulator:run -PmainClass=<your.main.Class> [--args="you args"]
```

For example to run the sample simulation use the command below:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simple simulation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sample simple simulation :)

```shell
> gradle :tools:simulator:run -PmainClass=io.libp2p.simulate.main.SimpleSimulationKt
```

## License

Dual-licensed under MIT and ASLv2, by way of the [Permissive License
Stack](https://protocol.ai/blog/announcing-the-permissive-license-stack/).
30 changes: 30 additions & 0 deletions tools/simulator/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
apply plugin: "application"

dependencies {

api project(':libp2p')
api project(':tools:schedulers')

implementation("org.jgrapht:jgrapht-core:1.3.1")
api("org.apache.commons:commons-math3:3.6.1")
implementation("org.jetbrains.kotlin:kotlin-reflect:1.3.0")

runtimeOnly("org.apache.logging.log4j:log4j-core")
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl")
}


if (project.hasProperty("mainClass")) {
ext.javaMainClass = project.getProperty("mainClass")
} else {
ext.javaMainClass = "io.libp2p.simulate.util.gradle.DefaultGradleApplicationKt"
}

application {
mainClassName = javaMainClass
applicationDefaultJvmArgs = ['-Xmx12G', '-XX:+HeapDumpOnOutOfMemoryError']

if ( project.hasProperty('jvmArgs') ) {
applicationDefaultJvmArgs = applicationDefaultJvmArgs + project.jvmArgs.split('\\s+').toList()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.libp2p.simulate

import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.ArrayList

abstract class AbstractSimPeer : SimPeer {

override val name = counter.getAndIncrement().toString()

override val connections: MutableList<SimConnection> = Collections.synchronizedList(ArrayList())

override fun connect(other: SimPeer): CompletableFuture<SimConnection> {
return connectImpl(other).thenApply { conn ->
val otherAbs = other as? AbstractSimPeer
connections += conn
otherAbs?.connections?.add(conn)
conn.closed.thenAccept {
connections -= conn
otherAbs?.connections?.remove(conn)
}
conn
}
}

abstract fun connectImpl(other: SimPeer): CompletableFuture<SimConnection>

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as AbstractSimPeer
return name == other.name
}

override fun hashCode(): Int = name.hashCode()

companion object {
val counter = AtomicInteger()
}
}
31 changes: 31 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/Bandwidth.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.libp2p.simulate

import java.util.concurrent.CompletableFuture
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

data class Bandwidth(val bytesPerSecond: Long) {
fun getTransmitTimeMillis(size: Long): Long = (size * 1000 / bytesPerSecond)
fun getTransmitTime(size: Long): Duration = getTransmitTimeMillis(size).milliseconds

fun getTransmitSize(timeMillis: Long): Long =
bytesPerSecond * timeMillis / 1000

operator fun div(d: Int) = Bandwidth(bytesPerSecond / d)

companion object {
fun mbitsPerSec(mbsec: Int) = Bandwidth(mbsec.toLong() * (1 shl 20) / 10)
}
}

interface BandwidthDelayer : MessageDelayer {

val totalBandwidth: Bandwidth

companion object {
val UNLIM_BANDWIDTH = object : BandwidthDelayer {
override val totalBandwidth = Bandwidth(Long.MAX_VALUE)
override fun delay(size: Long) = CompletableFuture.completedFuture(Unit)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.libp2p.simulate

import java.util.concurrent.CompletableFuture

fun interface MessageDelayer {
fun delay(size: Long): CompletableFuture<Unit>

companion object {
val NO_DELAYER = MessageDelayer { CompletableFuture.completedFuture(null) }
}
}
17 changes: 17 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/Network.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.libp2p.simulate

interface Network {

val peers: List<SimPeer>

val activeConnections: List<SimConnection>

val topologyGraph: TopologyGraph
}

class ImmutableNetworkImpl(
override val activeConnections: List<SimConnection>,
override val topologyGraph: TopologyGraph
) : Network {
override val peers = activeConnections.map { it.dialer }.distinct()
}
33 changes: 33 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/RandomValue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.libp2p.simulate

import java.util.Random

interface RandomValue {

fun next(): Double

companion object {
fun const(constVal: Double) = object : RandomValue {
override fun next() = constVal
}
fun uniform(from: Double, to: Double, rnd: Random) = object : RandomValue {
override fun next() = from + rnd.nextDouble() * (to - from)
}
}
}

interface RandomDistribution {
fun newValue(rnd: Random): RandomValue

companion object {
fun const(constVal: Double) = ConstRandomDistr(constVal)
fun uniform(from: Double, to: Double) = UniformRandomDistr(from, to)
}

data class ConstRandomDistr(val constVal: Double) : RandomDistribution {
override fun newValue(rnd: Random) = RandomValue.const(constVal)
}
data class UniformRandomDistr(val from: Double, val to: Double) : RandomDistribution {
override fun newValue(rnd: Random) = RandomValue.uniform(from, to, rnd)
}
}
20 changes: 20 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/SimChannel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.libp2p.simulate

interface SimChannel {

val stream: SimStream
val isStreamInitiator: Boolean

val peer get() =
when {
isStreamInitiator -> stream.streamInitiatorPeer
else -> stream.streamAcceptorPeer
}

val msgVisitors: MutableList<SimChannelMessageVisitor>
}

interface SimChannelMessageVisitor {
fun onInbound(message: Any)
fun onOutbound(message: Any)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.libp2p.simulate

import java.util.concurrent.CompletableFuture

interface SimConnection {

val dialer: SimPeer
val listener: SimPeer

val streams: List<SimStream>

val closed: CompletableFuture<Unit>

var connectionLatency: MessageDelayer

fun close()
}
20 changes: 20 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/SimPeer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.libp2p.simulate

import io.libp2p.core.PeerId
import java.util.concurrent.CompletableFuture

interface SimPeer {

val name: String
val peerId: PeerId
val connections: List<SimConnection>

var inboundBandwidth: BandwidthDelayer
var outboundBandwidth: BandwidthDelayer

fun start() = CompletableFuture.completedFuture(Unit)

fun connect(other: SimPeer): CompletableFuture<SimConnection>

fun stop(): CompletableFuture<Unit> = CompletableFuture.completedFuture(Unit)
}
26 changes: 26 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/SimStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.libp2p.simulate

import io.libp2p.core.multistream.ProtocolId

interface SimStream {

enum class StreamInitiator { CONNECTION_DIALER, CONNECTION_LISTENER }

val connection: SimConnection
val streamInitiator: StreamInitiator
val streamProtocol: ProtocolId

val streamInitiatorPeer get() =
when (streamInitiator) {
StreamInitiator.CONNECTION_DIALER -> connection.dialer
StreamInitiator.CONNECTION_LISTENER -> connection.listener
}
val streamAcceptorPeer get() =
when (streamInitiator) {
StreamInitiator.CONNECTION_DIALER -> connection.listener
StreamInitiator.CONNECTION_LISTENER -> connection.dialer
}

val initiatorChannel: SimChannel
val acceptorChannel: SimChannel
}
38 changes: 38 additions & 0 deletions tools/simulator/src/main/kotlin/io/libp2p/simulate/Topology.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.libp2p.simulate

import io.libp2p.simulate.topology.CustomTopologyGraph
import java.util.Random

interface Topology {

var random: Random

fun generateGraph(verticesCount: Int): TopologyGraph
}

interface TopologyGraph {
data class Edge(val srcV: Int, val destV: Int)

val edges: Collection<Edge>

val vertices get() =
edges.flatMap { listOf(it.srcV, it.destV) }.distinct().sorted()

fun calcDiameter(): Int

fun connect(peers: List<SimPeer>): Network {
require(peers.size == vertices.size)
return edges
.map { peers[it.srcV].connect(peers[it.destV]).join() }
.let { ImmutableNetworkImpl(it, this) }
}

companion object {
fun customTopology(vararg vertices: Pair<Int, Int>) =
CustomTopologyGraph(
vertices.map { Edge(it.first, it.second) }
)
}
}

fun Topology.generateAndConnect(peers: List<SimPeer>) = generateGraph(peers.size).connect(peers)
Loading