Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize GossipScore IP colocation calculation #248

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.libp2p.pubsub.gossip

import io.libp2p.core.InternalErrorException
import io.libp2p.core.PeerId
import io.libp2p.core.multiformats.Multiaddr
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.*
import io.libp2p.etc.util.P2PService
Expand Down Expand Up @@ -57,6 +58,7 @@ const val MaxIWantRequestsEntries = 10 * 1024

typealias CurrentTimeSupplier = () -> Long

fun P2PService.PeerHandler.getRemoteAddress(): Multiaddr = streamHandler.stream.connection.remoteAddress()
fun P2PService.PeerHandler.isOutbound() = streamHandler.stream.connection.isInitiator

fun P2PService.PeerHandler.getPeerProtocol(): PubsubProtocol {
Expand Down Expand Up @@ -156,7 +158,7 @@ open class GossipRouter(

override fun onPeerActive(peer: PeerHandler) {
super.onPeerActive(peer)
eventBroadcaster.notifyConnected(peer.peerId, peer.getIP())
eventBroadcaster.notifyConnected(peer.peerId, peer.getRemoteAddress())
heartbeatTask.hashCode() // force lazy initialization
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.libp2p.pubsub.gossip

import io.libp2p.core.PeerId
import io.libp2p.core.multiformats.Multiaddr
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.Topic
Expand All @@ -11,7 +12,7 @@ interface GossipRouterEventListener {

fun notifyDisconnected(peerId: PeerId)

fun notifyConnected(peerId: PeerId, ipAddress: String?)
fun notifyConnected(peerId: PeerId, peerAddress: Multiaddr)

fun notifyUnseenMessage(peerId: PeerId, msg: PubsubMessage)

Expand All @@ -35,8 +36,8 @@ class GossipRouterEventBroadcaster : GossipRouterEventListener {
listeners.forEach { it.notifyDisconnected(peerId) }
}

override fun notifyConnected(peerId: PeerId, ipAddress: String?) {
listeners.forEach { it.notifyConnected(peerId, ipAddress) }
override fun notifyConnected(peerId: PeerId, peerAddress: Multiaddr) {
listeners.forEach { it.notifyConnected(peerId, peerAddress) }
}

override fun notifyUnseenMessage(peerId: PeerId, msg: PubsubMessage) {
Expand Down
63 changes: 47 additions & 16 deletions src/main/kotlin/io/libp2p/pubsub/gossip/GossipScore.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package io.libp2p.pubsub.gossip

import com.google.common.annotations.VisibleForTesting
import io.libp2p.core.PeerId
import io.libp2p.core.multiformats.Multiaddr
import io.libp2p.core.multiformats.Protocol
import io.libp2p.core.pubsub.ValidationResult
import io.libp2p.etc.types.cappedDouble
import io.libp2p.etc.types.createLRUMap
import io.libp2p.etc.types.millis
import io.libp2p.etc.util.P2PService
import io.libp2p.pubsub.PubsubMessage
import io.libp2p.pubsub.Topic
import java.util.Optional
Expand All @@ -19,9 +19,6 @@ import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow

fun P2PService.PeerHandler.getIP(): String? =
streamHandler.stream.connection.remoteAddress().getFirstComponent(Protocol.IP4)?.stringValue

interface GossipScore {

fun updateTopicParams(topicScoreParams: Map<String, GossipTopicScoreParams>)
Expand All @@ -37,6 +34,13 @@ class DefaultGossipScore(
val curTimeMillis: () -> Long
) : GossipScore, GossipRouterEventListener {

data class PeerIP(val ip4String: String) {
companion object {
fun fromMultiaddr(multiaddr: Multiaddr): PeerIP? =
multiaddr.getFirstComponent(Protocol.IP4)?.stringValue?.let { PeerIP(it) }
}
}

inner class TopicScores(val topic: Topic) {
private val params: GossipTopicScoreParams
get() = topicParams[topic]
Expand Down Expand Up @@ -129,7 +133,6 @@ class DefaultGossipScore(
@Volatile
var cachedScore: Double = 0.0

val ips = mutableSetOf<String>()
var connectedTimeMillis: Long = 0
var disconnectedTimeMillis: Long = 0

Expand All @@ -147,9 +150,11 @@ class DefaultGossipScore(
val topicParams = params.topicsScoreParams

private val validationTime: MutableMap<PubsubMessage, Long> = createLRUMap(1024)

@VisibleForTesting
val peerScores = ConcurrentHashMap<PeerId, PeerScores>()
private val activePeerIP = mutableMapOf<PeerId, String>()
private val peerIdToIP = mutableMapOf<PeerId, PeerIP>()
private val peerIPToId = PeerColocations()

private val refreshTask: ScheduledFuture<*>

Expand All @@ -161,7 +166,7 @@ class DefaultGossipScore(
private fun getPeerScores(peerId: PeerId) =
peerScores.computeIfAbsent(peerId) { PeerScores() }

private fun getPeerIp(peerId: PeerId): String? = activePeerIP[peerId]
private fun getPeerIp(peerId: PeerId): PeerIP? = peerIdToIP[peerId]

private fun getTopicScores(peerId: PeerId, topic: Topic) =
getPeerScores(peerId).topicScores.computeIfAbsent(topic) { TopicScores(it) }
Expand All @@ -184,10 +189,7 @@ class DefaultGossipScore(
)
val appScore = peerParams.appSpecificScore(peerId) * peerParams.appSpecificWeight

val peersInIp: Int = getPeerIp(peerId)?.let { thisIp ->
if (peerParams.ipWhitelisted(thisIp)) 0 else
peerScores.values.count { thisIp in it.ips }
} ?: 0
val peersInIp: Int = getPeerIp(peerId)?.let { peerIPToId.getPeerCountForIp(it) } ?: 0
val ipColocationPenalty = max(
0,
(peersInIp - peerParams.ipColocationFactorThreshold)
Expand All @@ -205,7 +207,18 @@ class DefaultGossipScore(

@VisibleForTesting
fun refreshScores() {
peerScores.values.removeIf { it.isDisconnected() && it.getDisconnectDuration() > peerParams.retainScore }
val peersToBury = peerScores
.filterValues {
it.isDisconnected() && it.getDisconnectDuration() > peerParams.retainScore
}
.keys
peersToBury.forEach { peerId ->
peerIdToIP.remove(peerId)?.also { peerIp ->
peerIPToId.remove(peerId, peerIp)
}
}
peerScores -= peersToBury

peerScores.values.forEach {
it.topicScores.values.forEach { it.decayScores() }
it.behaviorPenalty *= peerParams.behaviourPenaltyDecay
Expand All @@ -222,16 +235,20 @@ class DefaultGossipScore(
}

getPeerScores(peerId).disconnectedTimeMillis = curTimeMillis()
activePeerIP -= peerId
}

override fun notifyConnected(peerId: PeerId, ipAddress: String?) {
override fun notifyConnected(peerId: PeerId, peerAddress: Multiaddr) {
val ipAddress = PeerIP.fromMultiaddr(peerAddress)
ipAddress?.also { peerIp ->
activePeerIP[peerId] = peerIp
val maybePeerIP = peerIdToIP[peerId]
maybePeerIP?.also {
peerIPToId.remove(peerId, peerIp)
}
peerIdToIP[peerId] = peerIp
}
getPeerScores(peerId).apply {
connectedTimeMillis = curTimeMillis()
getPeerIp(peerId)?.also { ips += it }
getPeerIp(peerId)?.also { peerIPToId.add(peerId, it) }
}
}

Expand Down Expand Up @@ -286,4 +303,18 @@ class DefaultGossipScore(
fun stop() {
refreshTask.cancel(false)
}

internal class PeerColocations {
private val colocatedPeers = mutableMapOf<PeerIP, MutableSet<PeerId>>()

fun add(peerId: PeerId, peerIp: PeerIP) {
colocatedPeers.computeIfAbsent(peerIp) { mutableSetOf() } += peerId
}

fun remove(peerId: PeerId, peerIp: PeerIP) {
colocatedPeers[peerIp]?.also { it -= peerId }
}

fun getPeerCountForIp(ip: PeerIP) = colocatedPeers[ip]?.size ?: 0
}
}
17 changes: 10 additions & 7 deletions src/test/kotlin/io/libp2p/pubsub/gossip/DefaultGossipScoreTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.libp2p.pubsub.gossip

import com.google.protobuf.ByteString
import io.libp2p.core.PeerId
import io.libp2p.core.multiformats.Multiaddr
import io.libp2p.etc.types.hours
import io.libp2p.etc.types.millis
import io.libp2p.etc.types.minutes
Expand Down Expand Up @@ -732,6 +733,8 @@ class DefaultGossipScoreTest {
@Test
fun `test IP colocation penalty`() {

val addr1 = Multiaddr.fromString("/ip4/0.0.0.1")
val addr2 = Multiaddr.fromString("/ip4/0.0.0.2")
val peer1 = PeerId.random()
val peer2 = PeerId.random()
val peer3 = PeerId.random()
Expand All @@ -756,24 +759,24 @@ class DefaultGossipScoreTest {

// Check initial value
val score = DefaultGossipScore(scoreParams, executor, { timeController.time })
score.notifyConnected(peer1, "0.0.0.1")
score.notifyConnected(peer2, "0.0.0.1")
score.notifyConnected(peer1, addr1)
score.notifyConnected(peer2, addr1)
assertEquals(0.0, score.score(peer1))
assertEquals(0.0, score.score(peer2))

score.notifyConnected(peer3, "0.0.0.2")
score.notifyConnected(peer3, addr2)
assertEquals(0.0, score.score(peer1))
assertEquals(0.0, score.score(peer2))
assertEquals(0.0, score.score(peer3))

score.notifyConnected(peer4, "0.0.0.1")
score.notifyConnected(peer4, addr1)

assertEquals(-1.0, score.score(peer1))
assertEquals(-1.0, score.score(peer2))
assertEquals(0.0, score.score(peer3))
assertEquals(-1.0, score.score(peer4))

score.notifyConnected(peer5, "0.0.0.1")
score.notifyConnected(peer5, addr1)

assertEquals(-4.0, score.score(peer1))
assertEquals(-4.0, score.score(peer2))
Expand Down Expand Up @@ -809,7 +812,7 @@ class DefaultGossipScoreTest {
assertEquals(0.0, score.score(peer4))
assertEquals(0.0, score.score(peer5))

score.notifyConnected(peer1, "0.0.0.1")
score.notifyConnected(peer1, addr1)

assertEquals(-1.0, score.score(peer1))
assertEquals(0.0, score.score(peer3))
Expand All @@ -829,7 +832,7 @@ class DefaultGossipScoreTest {
val peerId = PeerId.random()
val peer = mockk<P2PService.PeerHandler>()
every { peer.peerId } returns peerId
every { peer.getIP() } returns "127.0.0.1"
every { peer.getRemoteAddress() } returns Multiaddr.fromString("/ip4/127.0.0.1")
return peer
}
}