Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Gossip peers <=> topics map #251

Merged
merged 2 commits into from
Aug 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/main/kotlin/io/libp2p/etc/types/MultiBiMap.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.libp2p.etc.types

fun <TValue1, TValue2> mutableMultiBiMap(): MutableMultiBiMap<TValue1, TValue2> = HashSetMultiBiMap()

/**
* Associates values of type [TFirst] to a set of values of type [TSecond]
* and back: associates values of type [TSecond] to a set of values of type [TFirst]
*/
interface MultiBiMap<TFirst, TSecond> {

fun getByFirst(first: TFirst): Set<TSecond>
fun getBySecond(second: TSecond): Set<TFirst>

fun valuesFirst(): Set<TFirst>
fun valuesSecond(): Set<TSecond>

fun asFirstToSecondMap(): Map<TFirst, Set<TSecond>> = valuesFirst().associateWith { getByFirst(it) }
fun asSecondToFirstMap(): Map<TSecond, Set<TFirst>> = valuesSecond().associateWith { getBySecond(it) }
}

interface MutableMultiBiMap<TFirst, TSecond> : MultiBiMap<TFirst, TSecond> {

fun add(first: TFirst, second: TSecond)

fun remove(first: TFirst, second: TSecond)
fun removeAllByFirst(first: TFirst)
fun removeAllBySecond(second: TSecond)
}

internal class HashSetMultiBiMap<TFirst, TSecond> : MutableMultiBiMap<TFirst, TSecond> {
private val firstToSecondMap = mutableMapOf<TFirst, MutableSet<TSecond>>()
private val secondToFirstMap = mutableMapOf<TSecond, MutableSet<TFirst>>()

override fun getByFirst(first: TFirst): Set<TSecond> = firstToSecondMap[first] ?: emptySet()
override fun getBySecond(second: TSecond): Set<TFirst> = secondToFirstMap[second] ?: emptySet()
override fun valuesFirst(): Set<TFirst> = firstToSecondMap.keys
override fun valuesSecond(): Set<TSecond> = secondToFirstMap.keys
override fun asFirstToSecondMap(): Map<TFirst, Set<TSecond>> = firstToSecondMap
override fun asSecondToFirstMap(): Map<TSecond, Set<TFirst>> = secondToFirstMap

override fun add(first: TFirst, second: TSecond) {
firstToSecondMap.computeIfAbsent(first) { mutableSetOf() } += second
secondToFirstMap.computeIfAbsent(second) { mutableSetOf() } += first
}

private fun removeFromFirstToSecondMap(first: TFirst, second: TSecond) {
firstToSecondMap.compute(first) { _, curSecondValues ->
if (curSecondValues != null) {
curSecondValues -= second
if (curSecondValues.isNotEmpty()) {
curSecondValues
} else {
null
}
} else {
null
}
}
}

private fun removeFromSecondToFirstMap(first: TFirst, second: TSecond) {
secondToFirstMap.compute(second) { _, curFirstValues ->
if (curFirstValues != null) {
curFirstValues -= first
if (curFirstValues.isNotEmpty()) {
curFirstValues
} else {
null
}
} else {
null
}
}
}

override fun remove(first: TFirst, second: TSecond) {
removeFromFirstToSecondMap(first, second)
removeFromSecondToFirstMap(first, second)
}

override fun removeAllByFirst(first: TFirst) {
val removedSecondValues = firstToSecondMap.remove(first) ?: emptySet()
removedSecondValues.forEach { removeFromSecondToFirstMap(first, it) }
}

override fun removeAllBySecond(second: TSecond) {
val removedFirstValues = secondToFirstMap.remove(second) ?: emptySet()
removedFirstValues.forEach { removeFromFirstToSecondMap(it, second) }
}
}
32 changes: 10 additions & 22 deletions src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@ import io.libp2p.core.BadPeerException
import io.libp2p.core.PeerId
import io.libp2p.core.Stream
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.MultiSet
import io.libp2p.etc.types.completedExceptionally
import io.libp2p.etc.types.copy
import io.libp2p.etc.types.forward
import io.libp2p.etc.types.thenApplyAll
import io.libp2p.etc.types.toWBytes
import io.libp2p.etc.types.*
import io.libp2p.etc.util.P2PServiceSemiDuplex
import io.libp2p.etc.util.netty.protobuf.LimitedProtobufVarint32FrameDecoder
import io.netty.channel.ChannelHandler
Expand Down Expand Up @@ -51,7 +46,7 @@ abstract class AbstractRouter(

protected var msgHandler: PubsubMessageHandler = { throw IllegalStateException("Message handler is not initialized for PubsubRouter") }

protected open val peerTopics = MultiSet<PeerHandler, Topic>()
protected open val peersTopics = mutableMultiBiMap<PeerHandler, Topic>()
protected open val subscribedTopics = linkedSetOf<Topic>()
protected open val pendingRpcParts = PendingRpcPartsMap<RpcPartsQueue> { DefaultRpcPartsQueue() }
protected open val pendingMessagePromises = MultiSet<PeerHandler, CompletableFuture<Unit>>()
Expand Down Expand Up @@ -176,7 +171,8 @@ abstract class AbstractRouter(

try {
val subscriptions = msg.subscriptionsList.map { PubsubSubscription(it.topicid, it.subscribe) }
subscriptionFilter.filterIncomingSubscriptions(subscriptions, peerTopics[peer])
subscriptionFilter
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
.forEach { handleMessageSubscriptions(peer, it) }
} catch (e: Exception) {
logger.debug("Subscription filter error, ignoring message from peer $peer", e)
Expand Down Expand Up @@ -274,7 +270,7 @@ abstract class AbstractRouter(

override fun onPeerDisconnected(peer: PeerHandler) {
super.onPeerDisconnected(peer)
peerTopics.removeAll(peer)
peersTopics.removeAllByFirst(peer)
}

override fun onPeerWireException(peer: PeerHandler?, cause: Throwable) {
Expand All @@ -293,19 +289,15 @@ abstract class AbstractRouter(

private fun handleMessageSubscriptions(peer: PeerHandler, msg: PubsubSubscription) {
if (msg.subscribe) {
peerTopics[peer] += msg.topic
peersTopics.add(peer, msg.topic)
} else {
peerTopics[peer] -= msg.topic
peersTopics.remove(peer, msg.topic)
}
}

protected fun getTopicsPeers(topics: Collection<String>) =
activePeers.filter { topics.intersect(peerTopics[it]).isNotEmpty() }
protected fun getTopicPeers(topic: Topic) = peersTopics.getBySecond(topic)

protected fun getTopicPeers(topic: String) =
activePeers.filter { topic in peerTopics[it] }

override fun subscribe(vararg topics: String) {
override fun subscribe(vararg topics: Topic) {
runOnEventThread {
topics.forEach(::subscribe)
flushAllPending()
Expand All @@ -331,11 +323,7 @@ abstract class AbstractRouter(

override fun getPeerTopics(): CompletableFuture<Map<PeerId, Set<Topic>>> {
return submitOnEventThread {
val topicsByPeerId = hashMapOf<PeerId, Set<Topic>>()
peerTopics.forEach { entry ->
topicsByPeerId[entry.key.peerId] = HashSet(entry.value)
}
topicsByPeerId
peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId }
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/main/kotlin/io/libp2p/pubsub/flood/FloodRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ class FloodRouter(executor: ScheduledExecutorService = Executors.newSingleThread
}

private fun broadcast(msg: PubsubMessage, receivedFrom: PeerHandler?): CompletableFuture<Unit> {
val sentFutures = getTopicsPeers(msg.topics)
val peers = msg.topics
.map { getTopicPeers(it) }
.reduce { p1, p2 -> p1 + p2 }
val sentFutures = peers
.filter { it != receivedFrom }
.map { submitPublishMessage(it, msg) }
return anyComplete(sentFutures)
Expand Down
190 changes: 190 additions & 0 deletions src/test/kotlin/io/libp2p/etc/types/MultiBiMapTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package io.libp2p.etc.types

import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow

class MultiBiMapTest {

val map = mutableMultiBiMap<String, Int>()

@Test
fun `empty map test`() {
checkEmpty()
assertThat(map.getByFirst("any")).isEmpty()
assertThat(map.getBySecond(111)).isEmpty()
}

private fun checkEmpty() {
assertThat(map.valuesFirst()).isEmpty()
assertThat(map.valuesSecond()).isEmpty()
assertThat(map.asFirstToSecondMap()).isEmpty()
assertThat(map.asSecondToFirstMap()).isEmpty()
}

@Test
fun `add one test`() {
map.add("a", 1)

assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a")
assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1)
assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(1)))
assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("a")))
assertThat(map.getByFirst("b")).isEmpty()
assertThat(map.getByFirst("a")).isEqualTo(setOf(1))
assertThat(map.getBySecond(2)).isEmpty()
assertThat(map.getBySecond(1)).isEqualTo(setOf("a"))
}

@Test
fun `add-remove one test`() {
map.add("a", 1)
map.remove("a", 1)

checkEmpty()
assertThat(map.getByFirst("a")).isEmpty()
assertThat(map.getBySecond(1)).isEmpty()
}

@Test
fun `add-remove two test`() {
map.add("a", 1)
map.add("a", 2)
map.remove("a", 1)
map.remove("a", 2)

checkEmpty()
assertThat(map.getByFirst("a")).isEmpty()
assertThat(map.getBySecond(1)).isEmpty()
assertThat(map.getBySecond(2)).isEmpty()
}

@Test
fun `add two remove by first test`() {
map.add("a", 1)
map.add("a", 2)
map.removeAllByFirst("a")

checkEmpty()
assertThat(map.getByFirst("a")).isEmpty()
assertThat(map.getBySecond(1)).isEmpty()
assertThat(map.getBySecond(2)).isEmpty()
}

@Test
fun `add two remove by second test`() {
map.add("a", 1)
map.add("b", 1)
map.removeAllBySecond(1)

checkEmpty()
assertThat(map.getByFirst("a")).isEmpty()
assertThat(map.getBySecond(1)).isEmpty()
assertThat(map.getBySecond(2)).isEmpty()
}

@Test
fun `add two test`() {
map.add("a", 1)
map.add("a", 2)

assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a")
assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1, 2)
assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(1, 2)))
assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("a"), 2 to setOf("a")))
assertThat(map.getByFirst("b")).isEmpty()
assertThat(map.getByFirst("a")).isEqualTo(setOf(1, 2))
assertThat(map.getBySecond(1)).isEqualTo(setOf("a"))
assertThat(map.getBySecond(2)).isEqualTo(setOf("a"))
assertThat(map.getBySecond(3)).isEmpty()
}

@Test
fun `add two remove one test`() {
map.add("a", 1)
map.add("a", 2)
map.remove("a", 1)

assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a")
assertThat(map.valuesSecond()).containsExactlyInAnyOrder(2)
assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(2)))
assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(2 to setOf("a")))
assertThat(map.getByFirst("b")).isEmpty()
assertThat(map.getByFirst("a")).isEqualTo(setOf(2))
assertThat(map.getBySecond(1)).isEmpty()
assertThat(map.getBySecond(2)).isEqualTo(setOf("a"))
}

@Test
fun `remove missing values does nothing`() {
map.add("a", 1)

assertDoesNotThrow { map.remove("a", 2) }
assertDoesNotThrow { map.remove("b", 1) }
assertDoesNotThrow { map.removeAllByFirst("b") }
assertDoesNotThrow { map.removeAllBySecond(2) }

assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a")
assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1)
assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(1)))
assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("a")))
assertThat(map.getByFirst("b")).isEmpty()
assertThat(map.getByFirst("a")).isEqualTo(setOf(1))
assertThat(map.getBySecond(2)).isEmpty()
assertThat(map.getBySecond(1)).isEqualTo(setOf("a"))
}

@Test
fun `add four remove one test`() {
map.add("a", 1)
map.add("a", 2)
map.add("b", 1)
map.add("b", 2)
map.remove("a", 1)

assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a", "b")
assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1, 2)
assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(2), "b" to setOf(1, 2)))
assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("b"), 2 to setOf("a", "b")))
assertThat(map.getByFirst("a")).isEqualTo(setOf(2))
assertThat(map.getByFirst("b")).isEqualTo(setOf(1, 2))
assertThat(map.getBySecond(1)).isEqualTo(setOf("b"))
assertThat(map.getBySecond(2)).isEqualTo(setOf("a", "b"))
}

@Test
fun `add four remove all by first test`() {
map.add("a", 1)
map.add("a", 2)
map.add("b", 1)
map.add("b", 2)
map.removeAllByFirst("a")

assertThat(map.valuesFirst()).containsExactlyInAnyOrder("b")
assertThat(map.valuesSecond()).containsExactlyInAnyOrder(1, 2)
assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("b" to setOf(1, 2)))
assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(1 to setOf("b"), 2 to setOf("b")))
assertThat(map.getByFirst("a")).isEmpty()
assertThat(map.getByFirst("b")).isEqualTo(setOf(1, 2))
assertThat(map.getBySecond(1)).isEqualTo(setOf("b"))
assertThat(map.getBySecond(2)).isEqualTo(setOf("b"))
}

@Test
fun `add four remove all by second test`() {
map.add("a", 1)
map.add("a", 2)
map.add("b", 1)
map.add("b", 2)
map.removeAllBySecond(1)

assertThat(map.valuesFirst()).containsExactlyInAnyOrder("a", "b")
assertThat(map.valuesSecond()).containsExactlyInAnyOrder(2)
assertThat(map.asFirstToSecondMap()).isEqualTo(mapOf("a" to setOf(2), "b" to setOf(2)))
assertThat(map.asSecondToFirstMap()).isEqualTo(mapOf(2 to setOf("a", "b")))
assertThat(map.getByFirst("a")).isEqualTo(setOf(2))
assertThat(map.getByFirst("b")).isEqualTo(setOf(2))
assertThat(map.getBySecond(1)).isEmpty()
assertThat(map.getBySecond(2)).isEqualTo(setOf("a", "b"))
}
}