Skip to content

Commit 57c7de7

Browse files
authored
0.9.2 release
2 parents dce532a + 8c8a0fa commit 57c7de7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+429
-31
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -189,3 +189,4 @@ $RECYCLE.BIN/
189189
# Node
190190
node_modules
191191
package-lock.json
192+
/src/jmh/java/generated/

build.gradle.kts

+17-7
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import java.nio.file.Paths
1212
// ./gradlew publish -PcloudsmithUser=<user> -PcloudsmithApiKey=<api-key>
1313

1414
group = "io.libp2p"
15-
version = "0.9.1-RELEASE"
15+
version = "0.9.2-RELEASE"
1616
description = "a minimal implementation of libp2p for the jvm"
1717

1818
plugins {
@@ -26,6 +26,7 @@ plugins {
2626
id("maven-publish")
2727
id("org.jetbrains.dokka").version("1.6.10")
2828
id("org.jmailen.kotlinter").version("3.8.0")
29+
id("java-test-fixtures")
2930
}
3031

3132
repositories {
@@ -36,6 +37,13 @@ repositories {
3637

3738
val log4j2Version = "2.17.1"
3839

40+
sourceSets.create("jmh") {
41+
compileClasspath += sourceSets["main"].runtimeClasspath
42+
compileClasspath += sourceSets["testFixtures"].runtimeClasspath
43+
runtimeClasspath += sourceSets["main"].runtimeClasspath
44+
runtimeClasspath += sourceSets["testFixtures"].runtimeClasspath
45+
}
46+
3947
dependencies {
4048
api("io.netty:netty-all:4.1.69.Final")
4149
api("com.google.protobuf:protobuf-java:3.19.2")
@@ -53,6 +61,9 @@ dependencies {
5361
implementation("org.apache.logging.log4j:log4j-core:${log4j2Version}")
5462
implementation("javax.xml.bind:jaxb-api:2.3.1")
5563

64+
testFixturesImplementation("org.apache.logging.log4j:log4j-api:${log4j2Version}")
65+
testFixturesImplementation("com.google.guava:guava:31.0.1-jre")
66+
5667
testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.2")
5768
testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.2")
5869
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.8.2")
@@ -61,14 +72,13 @@ dependencies {
6172
testImplementation("org.mockito:mockito-junit-jupiter:4.2.0")
6273
testImplementation("org.assertj:assertj-core:3.22.0")
6374

75+
"jmhImplementation"("org.openjdk.jmh:jmh-core:1.35")
76+
"jmhAnnotationProcessor"("org.openjdk.jmh:jmh-generator-annprocess:1.35")
6477
}
6578

66-
sourceSets {
67-
main {
68-
proto {
69-
srcDir("src/main/proto")
70-
}
71-
}
79+
task<JavaExec>("jmh") {
80+
mainClass.set("org.openjdk.jmh.Main")
81+
classpath = sourceSets["jmh"].compileClasspath + sourceSets["jmh"].runtimeClasspath
7282
}
7383

7484
protobuf {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package io.libp2p.pubsub.gossip;
2+
3+
import io.libp2p.core.PeerId;
4+
import io.libp2p.core.multiformats.Multiaddr;
5+
import io.libp2p.core.multiformats.Protocol;
6+
import io.libp2p.pubsub.DefaultPubsubMessage;
7+
import io.libp2p.tools.schedulers.ControlledExecutorServiceImpl;
8+
import io.libp2p.tools.schedulers.TimeController;
9+
import io.libp2p.tools.schedulers.TimeControllerImpl;
10+
import org.openjdk.jmh.annotations.*;
11+
import org.openjdk.jmh.infra.Blackhole;
12+
import pubsub.pb.Rpc;
13+
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.function.Function;
18+
import java.util.stream.Collectors;
19+
import java.util.stream.IntStream;
20+
import java.util.stream.Stream;
21+
22+
@State(Scope.Thread)
23+
@Fork(5)
24+
@Warmup(iterations = 5, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
25+
@Measurement(iterations = 10, time = 1000, timeUnit = TimeUnit.MILLISECONDS)
26+
public class GossipScoreBenchmark {
27+
28+
private final int peerCount = 5000;
29+
private final int connectedCount = 2000;
30+
private final int topicCount = 128;
31+
32+
private final List<String> topics = IntStream
33+
.range(0, topicCount)
34+
.mapToObj(i -> "Topic-" + i)
35+
.collect(Collectors.toList());
36+
37+
private final List<PeerId> peerIds = Stream.generate(PeerId::random).limit(peerCount).collect(Collectors.toList());
38+
private final List<Multiaddr> peerAddresses = IntStream
39+
.range(0, peerCount)
40+
.mapToObj(idx ->
41+
Multiaddr.empty()
42+
.withComponent(Protocol.IP4, new byte[]{(byte) (idx >>> 8 & 0xFF), (byte) (idx & 0xFF), 0, 0})
43+
.withComponent(Protocol.TCP, new byte[]{0x23, 0x28}))
44+
.collect(Collectors.toList());
45+
46+
private final TimeController timeController = new TimeControllerImpl();
47+
private final ControlledExecutorServiceImpl controlledExecutor = new ControlledExecutorServiceImpl();
48+
private final GossipScoreParams gossipScoreParams;
49+
private final DefaultGossipScore score;
50+
51+
public GossipScoreBenchmark() {
52+
Map<String, GossipTopicScoreParams> topicParamMap = topics.stream()
53+
.collect(Collectors.toMap(Function.identity(), __ -> new GossipTopicScoreParams()));
54+
GossipTopicsScoreParams gossipTopicsScoreParams = new GossipTopicsScoreParams(new GossipTopicScoreParams(), topicParamMap);
55+
56+
gossipScoreParams = new GossipScoreParams(new GossipPeerScoreParams(), gossipTopicsScoreParams, 0, 0, 0, 0, 0);
57+
controlledExecutor.setTimeController(timeController);
58+
score = new DefaultGossipScore(gossipScoreParams, controlledExecutor, timeController::getTime);
59+
60+
for (int i = 0; i < peerCount; i++) {
61+
PeerId peerId = peerIds.get(i);
62+
score.notifyConnected(peerId, peerAddresses.get(i));
63+
for (String topic : topics) {
64+
notifyUnseenMessage(peerId, topic);
65+
}
66+
}
67+
68+
for (int i = connectedCount; i < peerCount; i++) {
69+
score.notifyDisconnected(peerIds.get(i));
70+
}
71+
}
72+
73+
private void notifyUnseenMessage(PeerId peerId, String topic) {
74+
Rpc.Message message = Rpc.Message.newBuilder()
75+
.addTopicIDs(topic)
76+
.build();
77+
score.notifyUnseenValidMessage(peerId, new DefaultPubsubMessage(message));
78+
}
79+
80+
@Benchmark
81+
public void scoresDelay0(Blackhole bh) {
82+
for (int i = 0; i < connectedCount; i++) {
83+
double s = score.score(peerIds.get(i));
84+
bh.consume(s);
85+
}
86+
}
87+
88+
@Benchmark
89+
public void scoresDelay100(Blackhole bh) {
90+
timeController.addTime(100);
91+
92+
for (int i = 0; i < connectedCount; i++) {
93+
double s = score.score(peerIds.get(i));
94+
bh.consume(s);
95+
}
96+
}
97+
98+
@Benchmark
99+
public void scoresDelay10000(Blackhole bh) {
100+
timeController.addTime(10000);
101+
102+
for (int i = 0; i < connectedCount; i++) {
103+
double s = score.score(peerIds.get(i));
104+
bh.consume(s);
105+
}
106+
}
107+
108+
/**
109+
* Uncomment for debugging
110+
*/
111+
// public static void main(String[] args) {
112+
// GossipScoreBenchmark benchmark = new GossipScoreBenchmark();
113+
// Blackhole blackhole = new Blackhole("Today's password is swordfish. I understand instantiating Blackholes directly is dangerous.");
114+
// benchmark.scoresDelay0(blackhole);
115+
// }
116+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package io.libp2p.etc.types
2+
3+
fun <TValue1, TValue2> mutableMultiBiMap(): MutableMultiBiMap<TValue1, TValue2> = HashSetMultiBiMap()
4+
5+
/**
6+
* Associates values of type [TFirst] to a set of values of type [TSecond]
7+
* and back: associates values of type [TSecond] to a set of values of type [TFirst]
8+
*/
9+
interface MultiBiMap<TFirst, TSecond> {
10+
11+
fun getByFirst(first: TFirst): Set<TSecond>
12+
fun getBySecond(second: TSecond): Set<TFirst>
13+
14+
fun valuesFirst(): Set<TFirst>
15+
fun valuesSecond(): Set<TSecond>
16+
17+
fun asFirstToSecondMap(): Map<TFirst, Set<TSecond>> = valuesFirst().associateWith { getByFirst(it) }
18+
fun asSecondToFirstMap(): Map<TSecond, Set<TFirst>> = valuesSecond().associateWith { getBySecond(it) }
19+
}
20+
21+
interface MutableMultiBiMap<TFirst, TSecond> : MultiBiMap<TFirst, TSecond> {
22+
23+
fun add(first: TFirst, second: TSecond)
24+
25+
fun remove(first: TFirst, second: TSecond)
26+
fun removeAllByFirst(first: TFirst)
27+
fun removeAllBySecond(second: TSecond)
28+
}
29+
30+
internal class HashSetMultiBiMap<TFirst, TSecond> : MutableMultiBiMap<TFirst, TSecond> {
31+
private val firstToSecondMap = mutableMapOf<TFirst, MutableSet<TSecond>>()
32+
private val secondToFirstMap = mutableMapOf<TSecond, MutableSet<TFirst>>()
33+
34+
override fun getByFirst(first: TFirst): Set<TSecond> = firstToSecondMap[first] ?: emptySet()
35+
override fun getBySecond(second: TSecond): Set<TFirst> = secondToFirstMap[second] ?: emptySet()
36+
override fun valuesFirst(): Set<TFirst> = firstToSecondMap.keys
37+
override fun valuesSecond(): Set<TSecond> = secondToFirstMap.keys
38+
override fun asFirstToSecondMap(): Map<TFirst, Set<TSecond>> = firstToSecondMap
39+
override fun asSecondToFirstMap(): Map<TSecond, Set<TFirst>> = secondToFirstMap
40+
41+
override fun add(first: TFirst, second: TSecond) {
42+
firstToSecondMap.computeIfAbsent(first) { mutableSetOf() } += second
43+
secondToFirstMap.computeIfAbsent(second) { mutableSetOf() } += first
44+
}
45+
46+
private fun removeFromFirstToSecondMap(first: TFirst, second: TSecond) {
47+
firstToSecondMap.compute(first) { _, curSecondValues ->
48+
if (curSecondValues != null) {
49+
curSecondValues -= second
50+
if (curSecondValues.isNotEmpty()) {
51+
curSecondValues
52+
} else {
53+
null
54+
}
55+
} else {
56+
null
57+
}
58+
}
59+
}
60+
61+
private fun removeFromSecondToFirstMap(first: TFirst, second: TSecond) {
62+
secondToFirstMap.compute(second) { _, curFirstValues ->
63+
if (curFirstValues != null) {
64+
curFirstValues -= first
65+
if (curFirstValues.isNotEmpty()) {
66+
curFirstValues
67+
} else {
68+
null
69+
}
70+
} else {
71+
null
72+
}
73+
}
74+
}
75+
76+
override fun remove(first: TFirst, second: TSecond) {
77+
removeFromFirstToSecondMap(first, second)
78+
removeFromSecondToFirstMap(first, second)
79+
}
80+
81+
override fun removeAllByFirst(first: TFirst) {
82+
val removedSecondValues = firstToSecondMap.remove(first) ?: emptySet()
83+
removedSecondValues.forEach { removeFromSecondToFirstMap(first, it) }
84+
}
85+
86+
override fun removeAllBySecond(second: TSecond) {
87+
val removedFirstValues = secondToFirstMap.remove(second) ?: emptySet()
88+
removedFirstValues.forEach { removeFromFirstToSecondMap(it, second) }
89+
}
90+
}

src/main/kotlin/io/libp2p/etc/util/P2PService.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ abstract class P2PService(
166166
}
167167

168168
protected open fun streamDisconnected(stream: StreamHandler) {
169-
val peerHandler = stream.getPeerHandler()
170169
if (stream.aborted) return
170+
val peerHandler = stream.getPeerHandler()
171171
activePeersMutable -= peerHandler
172172
if (peersMutable.remove(peerHandler)) {
173173
onPeerDisconnected(peerHandler)

src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt

+10-22
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@ import io.libp2p.core.BadPeerException
44
import io.libp2p.core.PeerId
55
import io.libp2p.core.Stream
66
import io.libp2p.core.pubsub.ValidationResult
7-
import io.libp2p.etc.types.MultiSet
8-
import io.libp2p.etc.types.completedExceptionally
9-
import io.libp2p.etc.types.copy
10-
import io.libp2p.etc.types.forward
11-
import io.libp2p.etc.types.thenApplyAll
12-
import io.libp2p.etc.types.toWBytes
7+
import io.libp2p.etc.types.*
138
import io.libp2p.etc.util.P2PServiceSemiDuplex
149
import io.libp2p.etc.util.netty.protobuf.LimitedProtobufVarint32FrameDecoder
1510
import io.netty.channel.ChannelHandler
@@ -51,7 +46,7 @@ abstract class AbstractRouter(
5146

5247
protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") }
5348

54-
protected open val peerTopics = MultiSet<PeerHandler, Topic>()
49+
protected open val peersTopics = mutableMultiBiMap<PeerHandler, Topic>()
5550
protected open val subscribedTopics = linkedSetOf<Topic>()
5651
protected open val pendingRpcParts = PendingRpcPartsMap<RpcPartsQueue> { DefaultRpcPartsQueue() }
5752
protected open val pendingMessagePromises = MultiSet<PeerHandler, CompletableFuture<Unit>>()
@@ -176,7 +171,8 @@ abstract class AbstractRouter(
176171

177172
try {
178173
val subscriptions = msg.subscriptionsList.map { PubsubSubscription(it.topicid, it.subscribe) }
179-
subscriptionFilter.filterIncomingSubscriptions(subscriptions, peerTopics[peer])
174+
subscriptionFilter
175+
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
180176
.forEach { handleMessageSubscriptions(peer, it) }
181177
} catch (e: Exception) {
182178
logger.debug("Subscription filter error, ignoring message from peer $peer", e)
@@ -274,7 +270,7 @@ abstract class AbstractRouter(
274270

275271
override fun onPeerDisconnected(peer: PeerHandler) {
276272
super.onPeerDisconnected(peer)
277-
peerTopics.removeAll(peer)
273+
peersTopics.removeAllByFirst(peer)
278274
}
279275

280276
override fun onPeerWireException(peer: PeerHandler?, cause: Throwable) {
@@ -293,19 +289,15 @@ abstract class AbstractRouter(
293289

294290
private fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
295291
if (msg.subscribe) {
296-
peerTopics[peer] += msg.topic
292+
peersTopics.add(peer, msg.topic)
297293
} else {
298-
peerTopics[peer] -= msg.topic
294+
peersTopics.remove(peer, msg.topic)
299295
}
300296
}
301297

302-
protected fun getTopicsPeers(topics: Collection<String>) =
303-
activePeers.filter { topics.intersect(peerTopics[it]).isNotEmpty() }
298+
protected fun getTopicPeers(topic: Topic) = peersTopics.getBySecond(topic)
304299

305-
protected fun getTopicPeers(topic: String) =
306-
activePeers.filter { topic in peerTopics[it] }
307-
308-
override fun subscribe(vararg topics: String) {
300+
override fun subscribe(vararg topics: Topic) {
309301
runOnEventThread {
310302
topics.forEach(::subscribe)
311303
flushAllPending()
@@ -331,11 +323,7 @@ abstract class AbstractRouter(
331323

332324
override fun getPeerTopics(): CompletableFuture<Map<PeerId, Set<Topic>>> {
333325
return submitOnEventThread {
334-
val topicsByPeerId = hashMapOf<PeerId, Set<Topic>>()
335-
peerTopics.forEach { entry ->
336-
topicsByPeerId[entry.key.peerId] = HashSet(entry.value)
337-
}
338-
topicsByPeerId
326+
peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId }
339327
}
340328
}
341329

src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ class FloodRouter(executor: ScheduledExecutorService = Executors.newSingleThread
3737
}
3838

3939
private fun broadcast(msg: PubsubMessage, receivedFrom: PeerHandler?): CompletableFuture<Unit> {
40-
val sentFutures = getTopicsPeers(msg.topics)
40+
val peers = msg.topics
41+
.map { getTopicPeers(it) }
42+
.reduce { p1, p2 -> p1 + p2 }
43+
val sentFutures = peers
4144
.filter { it != receivedFrom }
4245
.map { submitPublishMessage(it, msg) }
4346
return anyComplete(sentFutures)

0 commit comments

Comments
 (0)