Skip to content

Commit

Permalink
feat(networking): integrate gossipsub scoring (#1769)
Browse files Browse the repository at this point in the history
  • Loading branch information
alrevuelta authored Jun 6, 2023
1 parent 44bcf0f commit 34a9263
Show file tree
Hide file tree
Showing 17 changed files with 296 additions and 248 deletions.
11 changes: 3 additions & 8 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -564,16 +564,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =

# Subscribe to a topic, if relay is mounted
if conf.relay:
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
trace "Hit subscribe handler", topic

let decoded = WakuMessage.decode(data)

if decoded.isOk():
if decoded.get().contentTopic == chat.contentTopic:
chat.printReceivedMessage(decoded.get())
else:
trace "Invalid encoded WakuMessage", error = decoded.error
if msg.contentTopic == chat.contentTopic:
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
node.subscribe(topic, handler)
Expand Down
11 changes: 5 additions & 6 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,14 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =

# Always mount relay for bridge
# `triggerSelf` is false on a `bridge` to avoid duplicates
await cmb.nodev2.mountRelay(triggerSelf = false)
await cmb.nodev2.mountRelay()
cmb.nodev2.wakuRelay.triggerSelf = false

# Bridging
# Handle messages on Waku v2 and bridge to Matterbridge
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe, raises: [Defect].} =
let msg = WakuMessage.decode(data)
if msg.isOk():
trace "Bridging message from Chat2 to Matterbridge", msg=msg[]
cmb.toMatterbridge(msg[])
proc relayHandler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg)

cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)

Expand Down
15 changes: 5 additions & 10 deletions apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,8 @@ proc subscribeAndHandleMessages(node: WakuNode,
msgPerContentTopic: ContentTopicMessageTableRef) =

# handle function
proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let messageRes = WakuMessage.decode(data)
if messageRes.isErr():
warn "could not decode message", data=data, pubsubTopic=pubsubTopic

let message = messageRes.get()
trace "rx message", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
trace "rx message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic

# If we reach a table limit size, remove c topics with the least messages.
let tableSize = 100
Expand All @@ -322,10 +317,10 @@ proc subscribeAndHandleMessages(node: WakuNode,

# TODO: Will overflow at some point
# +1 if content topic existed, init to 1 otherwise
if msgPerContentTopic.hasKey(message.contentTopic):
msgPerContentTopic[message.contentTopic] += 1
if msgPerContentTopic.hasKey(msg.contentTopic):
msgPerContentTopic[msg.contentTopic] += 1
else:
msgPerContentTopic[message.contentTopic] = 1
msgPerContentTopic[msg.contentTopic] = 1

node.subscribe(pubsubTopic, handler)

Expand Down
12 changes: 6 additions & 6 deletions apps/wakubridge/wakubridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ proc start*(bridge: WakuBridge) {.async.} =

# Always mount relay for bridge.
# `triggerSelf` is false on a `bridge` to avoid duplicates
await bridge.nodev2.mountRelay(triggerSelf = false)
await bridge.nodev2.mountRelay()
bridge.nodev2.wakuRelay.triggerSelf = false

# Bridging
# Handle messages on Waku v1 and bridge to Waku v2
Expand All @@ -275,12 +276,11 @@ proc start*(bridge: WakuBridge) {.async.} =
bridge.nodev1.registerEnvReceivedHandler(handleEnvReceived)

# Handle messages on Waku v2 and bridge to Waku v1
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk() and msg.get().isBridgeable():
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
if msg.isBridgeable():
try:
trace "Bridging message from V2 to V1", msg=msg.tryGet()
bridge.toWakuV1(msg.tryGet())
trace "Bridging message from V2 to V1", msg=msg
bridge.toWakuV1(msg)
except ValueError:
trace "Failed to convert message to Waku v1. Check content-topic format.", msg=msg
waku_bridge_dropped.inc(labelValues = ["value_error"])
Expand Down
11 changes: 5 additions & 6 deletions examples/v2/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
# any content topic can be chosen. make sure it matches the publisher
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")

proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.decode(data).value
let payloadStr = string.fromBytes(message.payload)
if message.contentTopic == contentTopic:
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let payloadStr = string.fromBytes(msg.payload)
if msg.contentTopic == contentTopic:
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp
contentTopic=msg.contentTopic,
timestamp=msg.timestamp
node.subscribe(pubSubTopic, handler)

when isMainModule:
Expand Down
13 changes: 5 additions & 8 deletions tests/v2/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,11 @@ suite "WakuNode":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node2.subscribe(pubSubTopic, relayHandler)
Expand Down
5 changes: 2 additions & 3 deletions tests/v2/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ suite "WakuNode - Lightpush":
let message = fakeWakuMessage()

var completionFutRelay = newFuture[bool]()
proc relayHandler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data).get()
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
pubsubTopic == DefaultPubsubTopic
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler)
Expand Down
12 changes: 6 additions & 6 deletions tests/v2/waku_relay/test_waku_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import
../testlib/wakucore


proc noopRawHandler(): PubsubRawHandler =
var handler: PubsubRawHandler
handler = proc(pubsubTopic: PubsubTopic, data: seq[byte]): Future[void] {.gcsafe, noSideEffect.} = discard
proc noopRawHandler(): WakuRelayHandler =
var handler: WakuRelayHandler
handler = proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = discard
handler


proc newTestWakuRelay(switch = newTestSwitch(), self = true): Future[WakuRelay] {.async.} =
let proto = WakuRelay.new(switch, triggerSelf = self).tryGet()
proc newTestWakuRelay(switch = newTestSwitch()): Future[WakuRelay] {.async.} =
let proto = WakuRelay.new(switch).tryGet()
await proto.start()

let protocolMatcher = proc(proto: string): bool {.gcsafe.} =
Expand Down Expand Up @@ -85,7 +85,7 @@ suite "Waku Relay":
topics.contains(networkC)

## When
nodeA.unsubscribeAll(networkA)
nodeA.unsubscribe(networkA)

## Then
check:
Expand Down
144 changes: 86 additions & 58 deletions tests/v2/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,11 @@ suite "WakuNode - Relay":
)

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node3.subscribe(pubSubTopic, relayHandler)
Expand Down Expand Up @@ -182,19 +179,14 @@ suite "WakuNode - Relay":
node2.wakuRelay.addValidator(pubSubTopic, validator)

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "relayed pubsub topic:", topic
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
# check that only messages with contentTopic1 is relayed (but not contentTopic2)
val.contentTopic == contentTopic1
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
# check that only messages with contentTopic1 is relayed (but not contentTopic2)
msg.contentTopic == contentTopic1
# relay handler is called
completionFut.complete(true)


node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(500.millis)

Expand Down Expand Up @@ -269,14 +261,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
Expand Down Expand Up @@ -313,14 +302,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
Expand Down Expand Up @@ -361,14 +347,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
Expand Down Expand Up @@ -404,14 +387,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
Expand Down Expand Up @@ -447,14 +427,11 @@ suite "WakuNode - Relay":
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.decode(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)

node1.subscribe(pubSubTopic, relayHandler)
Expand All @@ -468,3 +445,54 @@ suite "WakuNode - Relay":
(await completionFut.withTimeout(5.seconds)) == true
await node1.stop()
await node2.stop()

asyncTest "Bad peers with low reputation are disconnected":
# Create 5 nodes
let nodes = toSeq(0..<5).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))

# subscribe all nodes to a topic
let topic = "topic"
for node in nodes: node.wakuRelay.subscribe(topic, nil)
await sleepAsync(500.millis)

# connect nodes in full mesh
for i in 0..<5:
for j in 0..<5:
if i == j:
continue
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
require connOk

# connection triggers different actions, wait for them
await sleepAsync(1.seconds)

# all peers are connected in a mesh, 4 conns each
for i in 0..<5:
check:
nodes[i].peerManager.switch.connManager.getConnections().len == 4

# node[0] publishes wrong messages (random bytes not decoding into WakuMessage)
for j in 0..<50:
discard await nodes[0].wakuRelay.publish(topic, urandom(1*(10^3)))

# long wait, must be higher than the configured decayInterval (how often score is updated)
await sleepAsync(20.seconds)

# all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes)
for i in 1..<5:
check:
nodes[i].wakuRelay.peerStats[nodes[0].switch.peerInfo.peerId].score == -249999.9

# nodes[0] was blacklisted from all other peers, no connections
check:
nodes[0].peerManager.switch.connManager.getConnections().len == 0

# the rest of the nodes now have 1 conn less (kicked nodes[0] out)
for i in 1..<5:
check:
nodes[i].peerManager.switch.connManager.getConnections().len == 3

# Stop all nodes
await allFutures(nodes.mapIt(it.stop()))
Loading

0 comments on commit 34a9263

Please sign in to comment.