From 9fd77410345f72bb1c327ef8d09a9f3ed41d15be Mon Sep 17 00:00:00 2001 From: Anton Nashatyrev Date: Mon, 6 May 2024 14:55:06 +0400 Subject: [PATCH] Fix returning internal mutable set to avoid ConcurrentModificationException (#362) --- .../kotlin/io/libp2p/pubsub/AbstractRouter.kt | 6 +- .../io/libp2p/pubsub/PubsubRouterTest.kt | 65 ++++++++++++++++++- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt index eb8033ce..4dd76409 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt @@ -323,7 +323,11 @@ abstract class AbstractRouter( override fun getPeerTopics(): CompletableFuture>> { return submitOnEventThread { - peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId } + peersTopics.asFirstToSecondMap() + .map { (key, value) -> + key.peerId to value.toSet() + } + .toMap() } } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt index b06f713b..cc45118e 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt @@ -1,7 +1,13 @@ package io.libp2p.pubsub -import io.libp2p.core.pubsub.* +import io.libp2p.core.pubsub.MessageApi +import io.libp2p.core.pubsub.RESULT_INVALID +import io.libp2p.core.pubsub.RESULT_VALID +import io.libp2p.core.pubsub.Subscriber import io.libp2p.core.pubsub.Topic +import io.libp2p.core.pubsub.ValidationResult +import io.libp2p.core.pubsub.Validator +import io.libp2p.core.pubsub.createPubsubApi import io.libp2p.etc.types.seconds import io.libp2p.etc.types.toByteBuf import io.libp2p.etc.types.toBytesBigEndian @@ -10,6 +16,7 @@ import io.libp2p.pubsub.gossip.GossipRouter import io.libp2p.tools.TestChannel.TestConnection import io.netty.handler.logging.LogLevel import io.netty.util.ResourceLeakDetector +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import pubsub.pb.Rpc @@ -279,7 +286,10 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor doTenNeighborsTopology() } - fun doTenNeighborsTopology(randomSeed: Int = 0, routerFactory: DeterministicFuzzRouterFactory = this.routerFactory) { + fun doTenNeighborsTopology( + randomSeed: Int = 0, + routerFactory: DeterministicFuzzRouterFactory = this.routerFactory + ) { val fuzz = DeterministicFuzz().also { it.randomSeed = randomSeed.toLong() } @@ -398,6 +408,7 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor routers[1].connectSemiDuplex(routers[2], pubsubLogs = LogLevel.ERROR) val apis = routers.map { createPubsubApi(it.router) } + class RecordingSubscriber : Subscriber { var count = 0 override fun accept(t: MessageApi) { @@ -460,4 +471,54 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor Assertions.assertEquals(2, subs2[2].count) Assertions.assertEquals(0, subs2[3].count) } + + @Test + fun `getPeerTopics() should return immutable snapshot`() { + val fuzz = DeterministicFuzz() + + fun executeAsyncNow(asyncTask: () -> CompletableFuture): T { + val future = asyncTask() + fuzz.timeController.addTime(Duration.ofMillis(1)) + if (!future.isDone) throw AssertionError("Async task was not complete within virtual 1ms") + return future.join() + } + + val router1 = fuzz.createTestRouter(routerFactory) + val router2 = fuzz.createTestRouter(routerFactory) + router2.router.subscribe("topic1") + + router1.connectSemiDuplex(router2, LogLevel.DEBUG, LogLevel.DEBUG) + + val peerTopics1 = executeAsyncNow { router1.router.getPeerTopics() } + val peerTopics1MapIt = peerTopics1.entries.iterator() + val peerTopics1SetIt = peerTopics1.entries.first().value.iterator() + + router2.router.subscribe("topic2") + + val router3 = fuzz.createTestRouter(routerFactory) + router3.router.subscribe("topic3") + router1.connectSemiDuplex(router3, LogLevel.DEBUG, LogLevel.DEBUG) + + val peerTopics2 = executeAsyncNow { router1.router.getPeerTopics() } + + assertThat(peerTopics2) + .containsExactlyInAnyOrderEntriesOf( + mapOf( + router2.peerId to setOf("topic1", "topic2"), + router3.peerId to setOf("topic3") + ) + ) + + assertThat(peerTopics1) + .containsExactlyInAnyOrderEntriesOf( + mapOf( + router2.peerId to setOf("topic1") + ) + ) + + assertThat(peerTopics1MapIt.next().key).isEqualTo(router2.peerId) + assertThat(peerTopics1MapIt.hasNext()).isFalse() + assertThat(peerTopics1SetIt.next()).isEqualTo("topic1") + assertThat(peerTopics1SetIt.hasNext()).isFalse() + } }