Skip to content

Commit

Permalink
Fix returning internal mutable set to avoid ConcurrentModificationExc…
Browse files Browse the repository at this point in the history
…eption (#362)
  • Loading branch information
Nashatyrev authored May 6, 2024
1 parent 9895401 commit 9fd7741
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
6 changes: 5 additions & 1 deletion libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,11 @@ abstract class AbstractRouter(

override fun getPeerTopics(): CompletableFuture<Map<PeerId, Set<Topic>>> {
return submitOnEventThread {
peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId }
peersTopics.asFirstToSecondMap()
.map { (key, value) ->
key.peerId to value.toSet()
}
.toMap()
}
}

Expand Down
65 changes: 63 additions & 2 deletions libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package io.libp2p.pubsub

import io.libp2p.core.pubsub.*
import io.libp2p.core.pubsub.MessageApi
import io.libp2p.core.pubsub.RESULT_INVALID
import io.libp2p.core.pubsub.RESULT_VALID
import io.libp2p.core.pubsub.Subscriber
import io.libp2p.core.pubsub.Topic
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.core.pubsub.Validator
import io.libp2p.core.pubsub.createPubsubApi
import io.libp2p.etc.types.seconds
import io.libp2p.etc.types.toByteBuf
import io.libp2p.etc.types.toBytesBigEndian
Expand All @@ -10,6 +16,7 @@ import io.libp2p.pubsub.gossip.GossipRouter
import io.libp2p.tools.TestChannel.TestConnection
import io.netty.handler.logging.LogLevel
import io.netty.util.ResourceLeakDetector
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import pubsub.pb.Rpc
Expand Down Expand Up @@ -279,7 +286,10 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor
doTenNeighborsTopology()
}

fun doTenNeighborsTopology(randomSeed: Int = 0, routerFactory: DeterministicFuzzRouterFactory = this.routerFactory) {
fun doTenNeighborsTopology(
randomSeed: Int = 0,
routerFactory: DeterministicFuzzRouterFactory = this.routerFactory
) {
val fuzz = DeterministicFuzz().also {
it.randomSeed = randomSeed.toLong()
}
Expand Down Expand Up @@ -398,6 +408,7 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor
routers[1].connectSemiDuplex(routers[2], pubsubLogs = LogLevel.ERROR)

val apis = routers.map { createPubsubApi(it.router) }

class RecordingSubscriber : Subscriber {
var count = 0
override fun accept(t: MessageApi) {
Expand Down Expand Up @@ -460,4 +471,54 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor
Assertions.assertEquals(2, subs2[2].count)
Assertions.assertEquals(0, subs2[3].count)
}

@Test
fun `getPeerTopics() should return immutable snapshot`() {
val fuzz = DeterministicFuzz()

fun <T> executeAsyncNow(asyncTask: () -> CompletableFuture<T>): T {
val future = asyncTask()
fuzz.timeController.addTime(Duration.ofMillis(1))
if (!future.isDone) throw AssertionError("Async task was not complete within virtual 1ms")
return future.join()
}

val router1 = fuzz.createTestRouter(routerFactory)
val router2 = fuzz.createTestRouter(routerFactory)
router2.router.subscribe("topic1")

router1.connectSemiDuplex(router2, LogLevel.DEBUG, LogLevel.DEBUG)

val peerTopics1 = executeAsyncNow { router1.router.getPeerTopics() }
val peerTopics1MapIt = peerTopics1.entries.iterator()
val peerTopics1SetIt = peerTopics1.entries.first().value.iterator()

router2.router.subscribe("topic2")

val router3 = fuzz.createTestRouter(routerFactory)
router3.router.subscribe("topic3")
router1.connectSemiDuplex(router3, LogLevel.DEBUG, LogLevel.DEBUG)

val peerTopics2 = executeAsyncNow { router1.router.getPeerTopics() }

assertThat(peerTopics2)
.containsExactlyInAnyOrderEntriesOf(
mapOf(
router2.peerId to setOf("topic1", "topic2"),
router3.peerId to setOf("topic3")
)
)

assertThat(peerTopics1)
.containsExactlyInAnyOrderEntriesOf(
mapOf(
router2.peerId to setOf("topic1")
)
)

assertThat(peerTopics1MapIt.next().key).isEqualTo(router2.peerId)
assertThat(peerTopics1MapIt.hasNext()).isFalse()
assertThat(peerTopics1SetIt.next()).isEqualTo("topic1")
assertThat(peerTopics1SetIt.hasNext()).isFalse()
}
}

0 comments on commit 9fd7741

Please sign in to comment.