Merge master to unstable (#570)

* Revisit Floodsub (#543)

Fixes #525

add coverage to unsubscribeAll and testing

* add mounted protos to identify message (#546)

* add stable/unstable auto bumps

* fix auto-bump CI

* merge nbc auto bump with CI in order to bump only on CI success

* put conditional locks on nbc bump (#549)

* Fix minor exception issues (#550)

Makes code compatible with
status-im/nim-chronos#166 without requiring it.

* fix nimbus ref for auto-bump stable's PR

* use a builder pattern to build the switch (#551)

* use a builder pattern to build the switch

* with with

* more refs

* builders (#559)

* More builders (#560)

* address some issues pointed out in review

* re-add to prevent breaking other projects

* mem usage cleanups for pubsub (#564)

In `async` functions, a closure environment is created for variables that
cross an await boundary. This closure environment is kept in memory for the
lifetime of the associated future, which means that although _some_ variables
are no longer used, they still take up memory for a long time.

In Nimbus, message validation is processed in batches, meaning the future of
an incoming gossip message stays around for quite a while. This leads to
memory consumption peaks of 100-200 MB when there are many attestations in
the pipeline.

To avoid excessive memory usage, it's generally better to move non-async code
into procs so that the variables therein can be released earlier. This
includes the many hidden variables introduced by macro and template expansion
(e.g. chronicles, which does expensive exception handling).
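A minimal Nim sketch of the pattern described above (the names `buildPayload` and `process` are hypothetical, not code from this PR): the allocation-heavy work runs in a plain proc, so its temporaries never enter the async closure environment and are freed as soon as the proc returns; only the small result crosses the await boundary.

```nim
import chronos

# Hypothetical helper: the large `scratch` buffer lives only for the duration
# of this plain proc call, not for the lifetime of any future.
proc buildPayload(data: seq[byte]): seq[byte] =
  var scratch = newSeq[byte](data.len * 4)  # big temporary
  for i, b in data:
    scratch[i] = b
  scratch.setLen(data.len)
  scratch

proc process(data: seq[byte]) {.async.} =
  # Only `payload` crosses the await boundary and ends up in the closure
  # environment; `scratch` above has already been released.
  let payload = buildPayload(data)
  await sleepAsync(10.milliseconds)         # stand-in for batched validation
  echo "validated ", payload.len, " bytes"

when isMainModule:
  waitFor process(@[1'u8, 2, 3])
```

If `buildPayload` were inlined into `process`, `scratch` would be captured in the closure environment and kept alive until the future completes, which is exactly the peak-memory behaviour described above.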

* move seen table salt to floodsub, use there as well
* shorten seen table salt to size of hash
* avoid unnecessary memory allocations and copies in a few places
* factor out message scoring
* avoid reencoding outgoing message for every peer
* keep checking validators until reject (in case there are both reject and ignore results; see the sketch after this list)
* `readOnce` avoids `readExactly` overhead for single-byte read
* genericAssign -> assign2
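The validator item above ("keep checking validators until reject") can be illustrated with a rough sketch. The `ValidationResult` values mirror the verdicts pubsub validators return, but `runValidators` below is a hypothetical illustration of the combination rule, not the library's implementation: every validator is consulted even after an ignore, because a later reject is stronger and must win, and only a reject short-circuits the loop.

```nim
import chronos

type
  ValidationResult {.pure.} = enum
    Accept, Reject, Ignore

  # A validator inspects a message on a topic and returns a verdict.
  Validator = proc(topic: string, data: seq[byte]): Future[ValidationResult] {.gcsafe.}

proc runValidators(validators: seq[Validator], topic: string,
                   data: seq[byte]): Future[ValidationResult] {.async.} =
  # Keep evaluating after an Ignore so a later Reject is not missed;
  # Reject is the strongest verdict and stops the loop immediately.
  var verdict = ValidationResult.Accept
  for validate in validators:
    let res = await validate(topic, data)
    case res
    of ValidationResult.Reject:
      return ValidationResult.Reject
    of ValidationResult.Ignore:
      verdict = ValidationResult.Ignore
    of ValidationResult.Accept:
      discard
  return verdict
```

Calling `waitFor runValidators(vs, topic, data)` thus yields Reject if any validator rejects, Ignore if none reject but at least one ignores, and Accept otherwise.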

* More gossip coverage (#553)

* add floodPublish test

* test delivery via control Iwant/have mechanics

* fix issues in control, and add testing

* fix possible backoff issue with pruned routine overriding it

* fix control messages (#566)

* remove unused control graft check in handleControl

* avoid sending empty Iwant messages

* Split dialer (#542)

* extracting dialing logic to dialer

* exposing upgrade methods on transport

* cleanup

* fixing tests to use new interfaces

* add comments

* add base exception class and fix hierarchy

* fix imports

* Merge master (#555)

* Revisit Floodsub (#543)

Fixes #525

add coverage to unsubscribeAll and testing

* add mounted protos to identify message (#546)

* add stable/unstable auto bumps

* fix auto-bump CI

* merge nbc auto bump with CI in order to bump only on CI success

* put conditional locks on nbc bump (#549)

* Fix minor exception issues (#550)

Makes code compatible with
status-im/nim-chronos#166 without requiring it.

* fix nimbus ref for auto-bump stable's PR

* Split dialer (#542)

* extracting dialing logic to dialer

* exposing upgrade methods on transport

* cleanup

* fixing tests to use new interfaces

* add comments

* add base exception class and fix hierarchy

* fix imports

* `doAssert` is `ValueError` not `AssertionError`?

* revert back to `AssertionError`

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka <jacek@status.im>

* Builders (#558)

* use a builder pattern to build the switch (#551)

* use a builder pattern to build the switch

* with with

* more refs

* Merge master (#555)

* Revisit Floodsub (#543)

Fixes #525

add coverage to unsubscribeAll and testing

* add mounted protos to identify message (#546)

* add stable/unstable auto bumps

* fix auto-bump CI

* merge nbc auto bump with CI in order to bump only on CI success

* put conditional locks on nbc bump (#549)

* Fix minor exception issues (#550)

Makes code compatible with
status-im/nim-chronos#166 without requiring it.

* fix nimbus ref for auto-bump stable's PR

* Split dialer (#542)

* extracting dialing logic to dialer

* exposing upgrade methods on transport

* cleanup

* fixing tests to use new interfaces

* add comments

* add base exception class and fix hierarchy

* fix imports

* `doAssert` is `ValueError` not `AssertionError`?

* revert back to `AssertionError`

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka <jacek@status.im>

* `doAssert` is `ValueError` not `AssertionError`?

* revert back to `AssertionError`

* fix builders

* more builder stuff

* more builders

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka <jacek@status.im>

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka <jacek@status.im>
3 people authored May 6, 2021
1 parent bc72594 commit 0e9f72c
Showing 14 changed files with 641 additions and 368 deletions.
30 changes: 23 additions & 7 deletions libp2p/protocols/pubsub/floodsub.nim
@@ -7,8 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.

import std/[sequtils, sets, tables]
import chronos, chronicles, metrics
import std/[sequtils, sets, hashes, tables]
import chronos, chronicles, metrics, bearssl
import ./pubsub,
./pubsubpeer,
./timedcache,
@@ -27,7 +27,17 @@ const FloodSubCodec* = "/floodsub/1.0.0"
type
FloodSub* = ref object of PubSub
floodsub*: PeerTable # topic to remote peer map
seen*: TimedCache[MessageID] # list of messages forwarded to peers
seen*: TimedCache[MessageID] # message ids already seen on the network
seenSalt*: seq[byte]

proc hasSeen*(f: FloodSub, msgId: MessageID): bool =
f.seenSalt & msgId in f.seen

proc addSeen*(f: FloodSub, msgId: MessageID): bool =
# Salting the seen hash helps avoid attacks against the hash function used
# in the nim hash table
# Return true if the message has already been seen
f.seen.put(f.seenSalt & msgId)

method subscribeTopic*(f: FloodSub,
topic: string,
@@ -88,7 +98,7 @@ method rpcHandler*(f: FloodSub,
for msg in rpcMsg.messages: # for every message
let msgId = f.msgIdProvider(msg)

if f.seen.put(msgId):
if f.addSeen(msgId):
trace "Dropping already-seen message", msgId, peer
continue

Expand Down Expand Up @@ -118,13 +128,15 @@ method rpcHandler*(f: FloodSub,

var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t notin f.topics:
continue
f.floodsub.withValue(t, peers): toSendPeers.incl(peers[])

await handleData(f, t, msg.data)

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg]))
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "Forwared message to peers", peers = toSendPeers.len

method init*(f: FloodSub) =
Expand Down Expand Up @@ -157,7 +169,7 @@ method publish*(f: FloodSub,
debug "Empty topic, skipping publish", topic
return 0

let peers = toSeq(f.floodsub.getOrDefault(topic))
let peers = f.floodsub.getOrDefault(topic)

if peers.len == 0:
debug "No peers for topic, skipping publish", topic
@@ -175,7 +187,7 @@
trace "Created new message",
msg = shortLog(msg), peers = peers.len, topic, msgId

if f.seen.put(msgId):
if f.addSeen(msgId):
# custom msgid providers might cause this
trace "Dropping already-seen message", msgId, topic
return 0
Expand Down Expand Up @@ -206,4 +218,8 @@ method unsubscribeAll*(f: FloodSub, topic: string) =
method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub()
f.seen = TimedCache[MessageID].init(2.minutes)
var rng = newRng()
f.seenSalt = newSeqUninitialized[byte](sizeof(Hash))
brHmacDrbgGenerate(rng[], f.seenSalt)

f.init()
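The salted seen-cache introduced above (`seenSalt`, `hasSeen`, `addSeen`) can be shown in isolation with a small sketch; `SeenCache` and its procs are hypothetical stand-ins, not part of the library. Prepending a per-node random salt to every message ID before it is hashed means a remote peer cannot predict how IDs map onto the node's hash table, so it cannot craft a stream of message IDs that all collide and degrade lookups.

```nim
import std/[sets, hashes]

type
  SeenCache = object
    salt: seq[byte]          # per-node secret, generated once at startup
    ids: HashSet[seq[byte]]  # salted message ids seen so far

proc initSeenCache(salt: seq[byte]): SeenCache =
  SeenCache(salt: salt, ids: initHashSet[seq[byte]]())

proc addSeen(c: var SeenCache, msgId: seq[byte]): bool =
  ## Returns true if the message id was already seen, false if it is new.
  let salted = c.salt & msgId
  if salted in c.ids:
    return true
  c.ids.incl(salted)
  false

when isMainModule:
  # In floodsub the salt comes from bearssl's HMAC-DRBG (see the diff above);
  # this hard-coded value is only for the sketch.
  var cache = initSeenCache(@[0x1f'u8, 0x8b, 0x42, 0x07, 0xaa, 0x3c, 0x91, 0x5e])
  echo cache.addSeen(@[1'u8, 2, 3])  # false: first sighting
  echo cache.addSeen(@[1'u8, 2, 3])  # true: already seen
```

In the real floodsub code a `TimedCache` plays the role of `ids`, so salted entries also expire after two minutes.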
162 changes: 59 additions & 103 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -8,7 +8,7 @@
## those terms.

import std/[tables, sets, options, sequtils, random]
import chronos, chronicles, metrics, bearssl
import chronos, chronicles, metrics
import ./pubsub,
./floodsub,
./pubsubpeer,
@@ -98,27 +98,6 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
else:
ok()

proc init*(_: type[TopicParams]): TopicParams =
TopicParams(
topicWeight: 0.0, # disabled by default
timeInMeshWeight: 0.01,
timeInMeshQuantum: 1.seconds,
timeInMeshCap: 10.0,
firstMessageDeliveriesWeight: 1.0,
firstMessageDeliveriesDecay: 0.5,
firstMessageDeliveriesCap: 10.0,
meshMessageDeliveriesWeight: -1.0,
meshMessageDeliveriesDecay: 0.5,
meshMessageDeliveriesCap: 10,
meshMessageDeliveriesThreshold: 1,
meshMessageDeliveriesWindow: 5.milliseconds,
meshMessageDeliveriesActivation: 10.seconds,
meshFailurePenaltyWeight: -1.0,
meshFailurePenaltyDecay: 0.5,
invalidMessageDeliveriesWeight: -1.0,
invalidMessageDeliveriesDecay: 0.5
)

proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0:
err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value")
@@ -262,6 +241,44 @@ method subscribeTopic*(g: GossipSub,

trace "gossip peers", peers = g.gossipsub.peers(topic), topic

proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) =
if rpcMsg.control.isSome:
let control = rpcMsg.control.get()
g.handlePrune(peer, control.prune)

var respControl: ControlMessage
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIDs.len > 0:
respControl.iwant.add(iwant)
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages

for smsg in messages:
for topic in smsg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

for prune in respControl.prune:
if g.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))

method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
@@ -283,26 +300,12 @@ method rpcHandler*(g: GossipSub,
# prevent the remote peer from controlling the seen table hashing
# by adding random bytes to the ID we ensure we randomize the IDs
# we do only for seen as this is the great filter from the external world
if g.seen.put(msgId & g.randomBytes):
if g.addSeen(msgId):
trace "Dropping already-seen message", msgId = shortLog(msgId), peer

# make sure to update score tho before continuing
for t in msg.topicIDs:
if t notin g.topics:
continue
# for every topic in the message
let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())
# if in mesh add more delivery score
g.withPeerStats(peer.peerId) do (stats: var PeerStats):
stats.topicInfos.withValue(t, tstats):
if tstats[].inMesh:
# TODO: take into account meshMessageDeliveriesWindow
# score only if messages are not too old.
tstats[].meshMessageDeliveries += 1
if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
do: # make sure we don't lose this information
stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1)
# TODO: take into account meshMessageDeliveriesWindow
# score only if messages are not too old.
g.rewardDelivered(peer, msg.topicIDs, false)

# onto the next message
continue
@@ -346,73 +349,29 @@ method rpcHandler*(g: GossipSub,
# store in cache only after validation
g.mcache.put(msgId, msg)

g.rewardDelivered(peer, msg.topicIDs, true)

var toSendPeers = initHashSet[PubSubPeer]()
for t in msg.topicIDs: # for every topic in the message
if t notin g.topics:
continue

let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init())

g.withPeerStats(peer.peerId) do(stats: var PeerStats):
stats.topicInfos.withValue(t, tstats):
# contribute to peer score first delivery
tstats[].firstMessageDeliveries += 1
if tstats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap:
tstats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap

# if in mesh add more delivery score
if tstats[].inMesh:
tstats[].meshMessageDeliveries += 1
if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap:
tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap
do: # make sure we don't lose this information
stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1)

g.floodsub.withValue(t, peers): toSendPeers.incl(peers[])
g.mesh.withValue(t, peers): toSendPeers.incl(peers[])

await handleData(g, t, msg.data)

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
let sendingTo = toSeq(toSendPeers)
g.broadcast(sendingTo, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = sendingTo.len, msgId, peer
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "forwared message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = [topic])
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic])
else:
libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = ["generic"])

if rpcMsg.control.isSome:
let control = rpcMsg.control.get()
g.handlePrune(peer, control.prune)
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])

var respControl: ControlMessage
respControl.iwant.add(g.handleIHave(peer, control.ihave))
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if respControl.graft.len > 0 or respControl.prune.len > 0 or
respControl.ihave.len > 0 or messages.len > 0:
# iwant and prunes from here, also messages

for smsg in messages:
for topic in smsg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
g.handleControl(peer, rpcMsg)

method subscribe*(g: GossipSub,
topic: string,
@@ -437,7 +396,7 @@ proc unsubscribe*(g: GossipSub, topic: string) =
# remove mesh peers from gpeers, we send 2 different messages
gpeers = gpeers - mpeers
# send to peers NOT in mesh first
g.broadcast(toSeq(gpeers), msg)
g.broadcast(gpeers, msg)

for peer in mpeers:
trace "pruning unsubscribeAll call peer", peer, score = peer.score
@@ -452,9 +411,9 @@ proc unsubscribe*(g: GossipSub, topic: string) =
backoff: g.parameters.pruneBackoff.seconds.uint64)]))

# send to peers IN mesh now
g.broadcast(toSeq(mpeers), msg)
g.broadcast(mpeers, msg)
else:
g.broadcast(toSeq(gpeers), msg)
g.broadcast(gpeers, msg)

g.topicParams.del(topic)

@@ -540,21 +499,21 @@ method publish*(g: GossipSub,

trace "Created new message", msg = shortLog(msg), peers = peers.len

if g.seen.put(msgId & g.randomBytes):
if g.addSeen(msgId):
# custom msgid providers might cause this
trace "Dropping already-seen message"
return 0

g.mcache.put(msgId, msg)

let peerSeq = toSeq(peers)
g.broadcast(peerSeq, RPCMsg(messages: @[msg]))
g.broadcast(peers, RPCMsg(messages: @[msg]))

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = [topic])
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
else:
libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = ["generic"])
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"])

trace "Published message to peers"
trace "Published message to peers", peers=peers.len

return peers.len

@@ -618,6 +577,3 @@ method initPubSub*(g: GossipSub) =

# init gossip stuff
g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength)
var rng = newRng()
g.randomBytes = newSeqUninitialized[byte](32)
brHmacDrbgGenerate(rng[], g.randomBytes)
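One of the pubsub cleanups listed in the commit message is "avoid reencoding outgoing message for every peer". That change is not part of the diff shown here, so the sketch below is a generic, hypothetical illustration of the idea (the types and procs are stand-ins, not the library's API): the message is serialized once and the same wire bytes are handed to every peer, instead of re-running the encoder inside the per-peer loop.

```nim
type
  Peer = object
    id: string

  Message = object
    topic: string
    data: seq[byte]

# Stand-in for the real (comparatively costly) protobuf encoding step.
proc encode(msg: Message): seq[byte] =
  result = newSeq[byte](msg.topic.len + msg.data.len)
  for i, c in msg.topic:
    result[i] = byte(c)
  for i, b in msg.data:
    result[msg.topic.len + i] = b

# Stand-in for queueing bytes on a peer's connection.
proc send(p: Peer, wire: seq[byte]) =
  echo "sending ", wire.len, " bytes to ", p.id

proc broadcast(peers: openArray[Peer], msg: Message) =
  let wire = encode(msg)  # encode once...
  for p in peers:
    p.send(wire)          # ...and reuse the same bytes for every peer

when isMainModule:
  broadcast([Peer(id: "a"), Peer(id: "b")],
            Message(topic: "test", data: @[1'u8, 2, 3]))
```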