diff --git a/src/main/kotlin/io/libp2p/etc/types/MultiBiMap.kt b/src/main/kotlin/io/libp2p/etc/types/MultiBiMap.kt new file mode 100644 index 000000000..d6ce27f19 --- /dev/null +++ b/src/main/kotlin/io/libp2p/etc/types/MultiBiMap.kt @@ -0,0 +1,90 @@ +package io.libp2p.etc.types + +fun mutableMultiBiMap(): MutableMultiBiMap = HashSetMultiBiMap() + +/** + * Associates values of type [TFirst] to a set of values of type [TSecond] + * and back: associates values of type [TSecond] to a set of values of type [TFirst] + */ +interface MultiBiMap { + + fun getByFirst(first: TFirst): Set + fun getBySecond(second: TSecond): Set + + fun valuesFirst(): Set + fun valuesSecond(): Set + + fun asFirstToSecondMap(): Map> = valuesFirst().associateWith { getByFirst(it) } + fun asSecondToFirstMap(): Map> = valuesSecond().associateWith { getBySecond(it) } +} + +interface MutableMultiBiMap : MultiBiMap { + + fun add(first: TFirst, second: TSecond) + + fun remove(first: TFirst, second: TSecond) + fun removeAllByFirst(first: TFirst) + fun removeAllBySecond(second: TSecond) +} + +internal class HashSetMultiBiMap : MutableMultiBiMap { + private val firstToSecondMap = mutableMapOf>() + private val secondToFirstMap = mutableMapOf>() + + override fun getByFirst(first: TFirst): Set = firstToSecondMap[first] ?: emptySet() + override fun getBySecond(second: TSecond): Set = secondToFirstMap[second] ?: emptySet() + override fun valuesFirst(): Set = firstToSecondMap.keys + override fun valuesSecond(): Set = secondToFirstMap.keys + override fun asFirstToSecondMap(): Map> = firstToSecondMap + override fun asSecondToFirstMap(): Map> = secondToFirstMap + + override fun add(first: TFirst, second: TSecond) { + firstToSecondMap.computeIfAbsent(first) { mutableSetOf() } += second + secondToFirstMap.computeIfAbsent(second) { mutableSetOf() } += first + } + + private fun removeFromFirstToSecondMap(first: TFirst, second: TSecond) { + firstToSecondMap.compute(first) { _, curSecondValues -> + if (curSecondValues != null) { + curSecondValues -= second + if (curSecondValues.isNotEmpty()) { + curSecondValues + } else { + null + } + } else { + null + } + } + } + + private fun removeFromSecondToFirstMap(first: TFirst, second: TSecond) { + secondToFirstMap.compute(second) { _, curFirstValues -> + if (curFirstValues != null) { + curFirstValues -= first + if (curFirstValues.isNotEmpty()) { + curFirstValues + } else { + null + } + } else { + null + } + } + } + + override fun remove(first: TFirst, second: TSecond) { + removeFromFirstToSecondMap(first, second) + removeFromSecondToFirstMap(first, second) + } + + override fun removeAllByFirst(first: TFirst) { + val removedSecondValues = firstToSecondMap.remove(first) ?: emptySet() + removedSecondValues.forEach { removeFromSecondToFirstMap(first, it) } + } + + override fun removeAllBySecond(second: TSecond) { + val removedFirstValues = secondToFirstMap.remove(second) ?: emptySet() + removedFirstValues.forEach { removeFromFirstToSecondMap(it, second) } + } +} diff --git a/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt b/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt index 91a8c1d23..9d6dfbb27 100644 --- a/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt @@ -4,12 +4,7 @@ import io.libp2p.core.BadPeerException import io.libp2p.core.PeerId import io.libp2p.core.Stream import io.libp2p.core.pubsub.ValidationResult -import io.libp2p.etc.types.MultiSet -import io.libp2p.etc.types.completedExceptionally -import io.libp2p.etc.types.copy -import io.libp2p.etc.types.forward -import io.libp2p.etc.types.thenApplyAll -import io.libp2p.etc.types.toWBytes +import io.libp2p.etc.types.* import io.libp2p.etc.util.P2PServiceSemiDuplex import io.libp2p.etc.util.netty.protobuf.LimitedProtobufVarint32FrameDecoder import io.netty.channel.ChannelHandler @@ -51,7 +46,7 @@ abstract class AbstractRouter( protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") } - protected open val peerTopics = MultiSet() + protected open val peersTopics = mutableMultiBiMap() protected open val subscribedTopics = linkedSetOf() protected open val pendingRpcParts = PendingRpcPartsMap { DefaultRpcPartsQueue() } protected open val pendingMessagePromises = MultiSet>() @@ -176,7 +171,8 @@ abstract class AbstractRouter( try { val subscriptions = msg.subscriptionsList.map { PubsubSubscription(it.topicid, it.subscribe) } - subscriptionFilter.filterIncomingSubscriptions(subscriptions, peerTopics[peer]) + subscriptionFilter + .filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer)) .forEach { handleMessageSubscriptions(peer, it) } } catch (e: Exception) { logger.debug("Subscription filter error, ignoring message from peer $peer", e) @@ -274,7 +270,7 @@ abstract class AbstractRouter( override fun onPeerDisconnected(peer: PeerHandler) { super.onPeerDisconnected(peer) - peerTopics.removeAll(peer) + peersTopics.removeAllByFirst(peer) } override fun onPeerWireException(peer: PeerHandler?, cause: Throwable) { @@ -293,19 +289,15 @@ abstract class AbstractRouter( private fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) { if (msg.subscribe) { - peerTopics[peer] += msg.topic + peersTopics.add(peer, msg.topic) } else { - peerTopics[peer] -= msg.topic + peersTopics.remove(peer, msg.topic) } } - protected fun getTopicsPeers(topics: Collection) = - activePeers.filter { topics.intersect(peerTopics[it]).isNotEmpty() } + protected fun getTopicPeers(topic: Topic) = peersTopics.getBySecond(topic) - protected fun getTopicPeers(topic: String) = - activePeers.filter { topic in peerTopics[it] } - - override fun subscribe(vararg topics: String) { + override fun subscribe(vararg topics: Topic) { runOnEventThread { topics.forEach(::subscribe) flushAllPending() @@ -331,11 +323,7 @@ abstract class AbstractRouter( override fun getPeerTopics(): CompletableFuture>> { return submitOnEventThread { - val topicsByPeerId = hashMapOf>() - peerTopics.forEach { entry -> - topicsByPeerId[entry.key.peerId] = HashSet(entry.value) - } - topicsByPeerId + peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId } } } diff --git a/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt b/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt index 7cc380b61..9bed00ddd 100644 --- a/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt +++ b/src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt @@ -37,7 +37,10 @@ class FloodRouter(executor: ScheduledExecutorService = Executors.newSingleThread } private fun broadcast(msg: PubsubMessage, receivedFrom: PeerHandler?): CompletableFuture { - val sentFutures = getTopicsPeers(msg.topics) + val peers = msg.topics + .map { getTopicPeers(it) } + .reduce { p1, p2 -> p1 + p2 } + val sentFutures = peers .filter { it != receivedFrom } .map { submitPublishMessage(it, msg) } return anyComplete(sentFutures) diff --git a/src/test/kotlin/io/libp2p/etc/types/MultiBiMapTest.kt b/src/test/kotlin/io/libp2p/etc/types/MultiBiMapTest.kt new file mode 100644 index 000000000..9bec1dfc2 --- /dev/null +++ b/src/test/kotlin/io/libp2p/etc/types/MultiBiMapTest.kt @@ -0,0 +1,190 @@ +package io.libp2p.etc.types + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow + +class MultiBiMapTest { + + val map = mutableMultiBiMap() + + @Test + fun `empty map test`() { + checkEmpty() + assertThat(map.getByFirst("any")).isEmpty() + assertThat(map.getBySecond(111)).isEmpty() + } + + private fun checkEmpty() { + assertThat(map.valuesFirst()).isEmpty() + assertThat(map.valuesSecond()).isEmpty() + assertThat(map.asFirstToSecondMap()).isEmpty() + assertThat(map.asSecondToFirstMap()).isEmpty() + } + + @Test + fun `add one test`() { + map.add("a", 1) + + assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a") + assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1) + assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(1))) + assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("a"))) + assertThat(map.getByFirst("b")).isEmpty() + assertThat(map.getByFirst("a")).isEqualTo(setOf(1)) + assertThat(map.getBySecond(2)).isEmpty() + assertThat(map.getBySecond(1)).isEqualTo(setOf("a")) + } + + @Test + fun `add-remove one test`() { + map.add("a", 1) + map.remove("a", 1) + + checkEmpty() + assertThat(map.getByFirst("a")).isEmpty() + assertThat(map.getBySecond(1)).isEmpty() + } + + @Test + fun `add-remove two test`() { + map.add("a", 1) + map.add("a", 2) + map.remove("a", 1) + map.remove("a", 2) + + checkEmpty() + assertThat(map.getByFirst("a")).isEmpty() + assertThat(map.getBySecond(1)).isEmpty() + assertThat(map.getBySecond(2)).isEmpty() + } + + @Test + fun `add two remove by first test`() { + map.add("a", 1) + map.add("a", 2) + map.removeAllByFirst("a") + + checkEmpty() + assertThat(map.getByFirst("a")).isEmpty() + assertThat(map.getBySecond(1)).isEmpty() + assertThat(map.getBySecond(2)).isEmpty() + } + + @Test + fun `add two remove by second test`() { + map.add("a", 1) + map.add("b", 1) + map.removeAllBySecond(1) + + checkEmpty() + assertThat(map.getByFirst("a")).isEmpty() + assertThat(map.getBySecond(1)).isEmpty() + assertThat(map.getBySecond(2)).isEmpty() + } + + @Test + fun `add two test`() { + map.add("a", 1) + map.add("a", 2) + + assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a") + assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1, 2) + assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(1, 2))) + assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("a"), 2 to setOf("a"))) + assertThat(map.getByFirst("b")).isEmpty() + assertThat(map.getByFirst("a")).isEqualTo(setOf(1, 2)) + assertThat(map.getBySecond(1)).isEqualTo(setOf("a")) + assertThat(map.getBySecond(2)).isEqualTo(setOf("a")) + assertThat(map.getBySecond(3)).isEmpty() + } + + @Test + fun `add two remove one test`() { + map.add("a", 1) + map.add("a", 2) + map.remove("a", 1) + + assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a") + assertThat(map.valuesSecond()).containsExactlyInAnyOrder(2) + assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(2))) + assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(2 to setOf("a"))) + assertThat(map.getByFirst("b")).isEmpty() + assertThat(map.getByFirst("a")).isEqualTo(setOf(2)) + assertThat(map.getBySecond(1)).isEmpty() + assertThat(map.getBySecond(2)).isEqualTo(setOf("a")) + } + + @Test + fun `remove missing values does nothing`() { + map.add("a", 1) + + assertDoesNotThrow { map.remove("a", 2) } + assertDoesNotThrow { map.remove("b", 1) } + assertDoesNotThrow { map.removeAllByFirst("b") } + assertDoesNotThrow { map.removeAllBySecond(2) } + + assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a") + assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1) + assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(1))) + assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("a"))) + assertThat(map.getByFirst("b")).isEmpty() + assertThat(map.getByFirst("a")).isEqualTo(setOf(1)) + assertThat(map.getBySecond(2)).isEmpty() + assertThat(map.getBySecond(1)).isEqualTo(setOf("a")) + } + + @Test + fun `add four remove one test`() { + map.add("a", 1) + map.add("a", 2) + map.add("b", 1) + map.add("b", 2) + map.remove("a", 1) + + assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a", "b") + assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1, 2) + assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(2), "b" to setOf(1, 2))) + assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("b"), 2 to setOf("a", "b"))) + assertThat(map.getByFirst("a")).isEqualTo(setOf(2)) + assertThat(map.getByFirst("b")).isEqualTo(setOf(1, 2)) + assertThat(map.getBySecond(1)).isEqualTo(setOf("b")) + assertThat(map.getBySecond(2)).isEqualTo(setOf("a", "b")) + } + + @Test + fun `add four remove all by first test`() { + map.add("a", 1) + map.add("a", 2) + map.add("b", 1) + map.add("b", 2) + map.removeAllByFirst("a") + + assertThat(map.valuesFirst()).containsExactlyInAnyOrder("b") + assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1, 2) + assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("b" to setOf(1, 2))) + assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("b"), 2 to setOf("b"))) + assertThat(map.getByFirst("a")).isEmpty() + assertThat(map.getByFirst("b")).isEqualTo(setOf(1, 2)) + assertThat(map.getBySecond(1)).isEqualTo(setOf("b")) + assertThat(map.getBySecond(2)).isEqualTo(setOf("b")) + } + + @Test + fun `add four remove all by second test`() { + map.add("a", 1) + map.add("a", 2) + map.add("b", 1) + map.add("b", 2) + map.removeAllBySecond(1) + + assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a", "b") + assertThat(map.valuesSecond()).containsExactlyInAnyOrder(2) + assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(2), "b" to setOf(2))) + assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(2 to setOf("a", "b"))) + assertThat(map.getByFirst("a")).isEqualTo(setOf(2)) + assertThat(map.getByFirst("b")).isEqualTo(setOf(2)) + assertThat(map.getBySecond(1)).isEmpty() + assertThat(map.getBySecond(2)).isEqualTo(setOf("a", "b")) + } +}