Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(networking): disconnect due to colocation ip in conn handler #1821

Merged
merged 1 commit into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -743,29 +743,28 @@ procSuite "Peer Manager":

let pInfos = nodes.mapIt(it.switch.peerInfo.toRemotePeerInfo())

# force max 1 conn per ip
nodes[0].peerManager.colocationLimit = 1

# 2 in connections
discard await nodes[1].peerManager.connectRelay(pInfos[0])
discard await nodes[2].peerManager.connectRelay(pInfos[0])

# but one is pruned
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# 2 out connections
discard await nodes[0].peerManager.connectRelay(pInfos[3])
discard await nodes[0].peerManager.connectRelay(pInfos[4])

# force max 1 conn per ip
nodes[0].peerManager.colocationLimit = 1
nodes[0].peerManager.updateIpTable()
# they are also prunned 
check nodes[0].peerManager.switch.connManager.getConnections().len == 1

# table is updated and we have 4 conns (2in 2out)
check:
nodes[0].peerManager.ipTable["127.0.0.1"].len == 4
nodes[0].peerManager.switch.connManager.getConnections().len == 4
nodes[0].peerManager.peerStore.peers().len == 4

await nodes[0].peerManager.pruneConnsByIp()

# peers are pruned, max 1 conn per ip
nodes[0].peerManager.updateIpTable()
# we should have 4 peers (2in/2out) but due to collocation limit
# they are pruned to max 1
check:
nodes[0].peerManager.ipTable["127.0.0.1"].len == 1
nodes[0].peerManager.switch.connManager.getConnections().len == 1
nodes[0].peerManager.peerStore.peers().len == 1

await allFutures(nodes.mapIt(it.stop()))
83 changes: 34 additions & 49 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ const
# How often metrics and logs are shown/updated
LogAndMetricsInterval = chronos.seconds(60)

# Prune by ip interval
PruneByIpInterval = chronos.seconds(30)

# Max peers that we allow from the same IP
ColocationLimit = 5

Expand Down Expand Up @@ -285,6 +282,18 @@ proc canBeConnected*(pm: PeerManager,
# Initialisation #
##################

proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
if pm.switch.connManager.getConnections().hasKey(peerId):
let conns = pm.switch.connManager.getConnections().getOrDefault(peerId)
if conns.len != 0:
let observedAddr = conns[0].connection.observedAddr
let ip = observedAddr.get.getHostname()
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
return some(ip)
return none(string)

# called when a connection i) is created or ii) is closed
proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
case event.kind
Expand All @@ -302,36 +311,36 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected

let ip = pm.getPeerIp(peerId)
if ip.isSome:
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)

let peersBehindIp = pm.ipTable[ip.get]
if peersBehindIp.len > pm.colocationLimit:
# in theory this should always be one, but just in case
for peerId in peersBehindIp[0..<(peersBehindIp.len - pm.colocationLimit)]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)

elif event.kind == PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect

# note we cant access the peerId ip here as the connection was already closed
for ip, peerIds in pm.ipTable.pairs:
if peerIds.contains(peerId):
pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId)
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break

pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)

proc updateIpTable*(pm: PeerManager) =
# clean table
pm.ipTable = initTable[string, seq[PeerId]]()

# populate ip->peerIds from existing out/in connections
for peerId, conn in pm.switch.connManager.getConnections():
if conn.len == 0:
continue

# we may want to enable it only in inbound peers
#if conn[0].connection.transportDir != In:
# continue

# assumes just one physical connection per peer
let observedAddr = conn[0].connection.observedAddr
if observedAddr.isSome:
# TODO: think if circuit relay ips should be handled differently
let ip = observedAddr.get.getHostname()
pm.ipTable.mgetOrPut(ip, newSeq[PeerId]()).add(peerId)


proc new*(T: type PeerManager,
switch: Switch,
maxRelayPeers: int = 50,
Expand Down Expand Up @@ -556,24 +565,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =
let connsToPrune = min(amount, inRelayPeers.len)

for p in inRelayPeers[0..<connsToPrune]:
await pm.switch.disconnect(p)

proc pruneConnsByIp*(pm: PeerManager) {.async.} =
## prunes connections based on ip colocation, allowing no more
## than ColocationLimit inbound connections from same ip
##

# update the table tracking ip and the connected peers
pm.updateIpTable()

# trigger disconnections based on colocationLimit
for ip, peersInIp in pm.ipTable.pairs:
if peersInIp.len > pm.colocationLimit:
let connsToPrune = peersInIp.len - pm.colocationLimit
for peerId in peersInIp[0..<connsToPrune]:
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
await pm.switch.disconnect(peerId)
pm.peerStore.delete(peerId)
asyncSpawn(pm.switch.disconnect(p))

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
Expand Down Expand Up @@ -658,12 +650,6 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

proc pruneConnsByIpLoop(pm: PeerManager) {.async.} =
debug "Starting prune peer by ip loop"
while pm.started:
await pm.pruneConnsByIp()
await sleepAsync(PruneByIpInterval)

# Prunes peers from peerstore to remove old/stale ones
proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
debug "Starting prune peerstore loop"
Expand Down Expand Up @@ -709,7 +695,6 @@ proc start*(pm: PeerManager) =
pm.started = true
asyncSpawn pm.relayConnectivityLoop()
asyncSpawn pm.prunePeerStoreLoop()
asyncSpawn pm.pruneConnsByIpLoop()
asyncSpawn pm.logAndMetrics()

proc stop*(pm: PeerManager) =
Expand Down