From 0e9f72c6b9aa62e89e9965be1a6f31ac61217afb Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Thu, 6 May 2021 14:41:49 -0600
Subject: [PATCH] Merge master to unstable (#570)

* Revisit Floodsub (#543)

Fixes #525

add coverage to unsubscribeAll and testing

* add mounted protos to identify message (#546)
* add stable/unstable auto bumps
* fix auto-bump CI
* merge nbc auto bump with CI in order to bump only on CI success
* put conditional locks on nbc bump (#549)
* Fix minor exception issues (#550)

Makes code compatible with https://github.com/status-im/nim-chronos/pull/166 without requiring it.

* fix nimbus ref for auto-bump stable's PR
* use a builder pattern to build the switch (#551)
* use a builder pattern to build the switch
* with with
* more refs
* builders (#559)
* More builders (#560)
* address some issues pointed out in review
* re-add to prevent breaking other projects
* mem usage cleanups for pubsub (#564)

In `async` functions, a closure environment is created for variables that cross an await boundary - this closure environment is kept in memory for the lifetime of the associated future - this means that although _some_ variables are no longer used, they still take up memory for a long time.

In Nimbus, message validation is processed in batches meaning the future of an incoming gossip message stays around for quite a while - this leads to memory consumption peaks of 100-200 mb when there are many attestations in the pipeline.

To avoid excessive memory usage, it's generally better to move non-async code into proc's such that the variables therein can be released earlier - this includes the many hidden variables introduced by macro and template expansion (ie chronicles that does expensive exception handling)

* move seen table salt to floodsub, use there as well
* shorten seen table salt to size of hash
* avoid unnecessary memory allocations and copies in a few places
* factor out message scoring
* avoid reencoding outgoing message for every peer
* keep checking validators until reject (in case there's both reject and ignore)
* `readOnce` avoids `readExactly` overhead for single-byte read
* genericAssign -> assign2
* More gossip coverage (#553)
* add floodPublish test
* test delivery via control Iwant/have mechanics
* fix issues in control, and add testing
* fix possible backoff issue with pruned routine overriding it
* fix control messages (#566)
* remove unused control graft check in handleControl
* avoid sending empty Iwant messages
* Split dialer (#542)
* extracting dialing logic to dialer
* exposing upgrade methods on transport
* cleanup
* fixing tests to use new interfaces
* add comments
* add base exception class and fix hierarchy
* fix imports
* Merge master (#555)
* Revisit Floodsub (#543)

Fixes #525

add coverage to unsubscribeAll and testing

* add mounted protos to identify message (#546)
* add stable/unstable auto bumps
* fix auto-bump CI
* merge nbc auto bump with CI in order to bump only on CI success
* put conditional locks on nbc bump (#549)
* Fix minor exception issues (#550)

Makes code compatible with https://github.com/status-im/nim-chronos/pull/166 without requiring it.

* fix nimbus ref for auto-bump stable's PR
* Split dialer (#542)
* extracting dialing logic to dialer
* exposing upgrade methods on transport
* cleanup
* fixing tests to use new interfaces
* add comments
* add base exception class and fix hierarchy
* fix imports
* `doAssert` is `ValueError` not `AssertionError`?
* revert back to `AssertionError`

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka

* Builders (#558)

* use a builder pattern to build the switch (#551)
* use a builder pattern to build the switch
* with with
* more refs
* Merge master (#555)
* Revisit Floodsub (#543)

Fixes #525

add coverage to unsubscribeAll and testing

* add mounted protos to identify message (#546)
* add stable/unstable auto bumps
* fix auto-bump CI
* merge nbc auto bump with CI in order to bump only on CI success
* put conditional locks on nbc bump (#549)
* Fix minor exception issues (#550)

Makes code compatible with https://github.com/status-im/nim-chronos/pull/166 without requiring it.

* fix nimbus ref for auto-bump stable's PR
* Split dialer (#542)
* extracting dialing logic to dialer
* exposing upgrade methods on transport
* cleanup
* fixing tests to use new interfaces
* add comments
* add base exception class and fix hierarchy
* fix imports
* `doAssert` is `ValueError` not `AssertionError`?
* revert back to `AssertionError`

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka

* `doAssert` is `ValueError` not `AssertionError`?
* revert back to `AssertionError`
* fix builders
* more builder stuff
* more builders

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka

Co-authored-by: Giovanni Petrantoni <7008900+sinkingsugar@users.noreply.github.com>
Co-authored-by: Jacek Sieka
---
 libp2p/protocols/pubsub/floodsub.nim          |  30 +-
 libp2p/protocols/pubsub/gossipsub.nim         | 162 +++------
 .../protocols/pubsub/gossipsub/behavior.nim   |  55 +--
 libp2p/protocols/pubsub/gossipsub/scoring.nim |  44 +++
 libp2p/protocols/pubsub/gossipsub/types.nim   |   2 -
 libp2p/protocols/pubsub/pubsub.nim            |  33 +-
 libp2p/protocols/pubsub/pubsubpeer.nim        |  29 +-
 libp2p/protocols/pubsub/rpc/protobuf.nim      |  13 +-
 libp2p/stream/lpstream.nim                    |  12 +-
 tests/pubsub/testgossipinternal.nim           |  37 ++
 tests/pubsub/testgossipsub.nim                | 254 +++----------
 tests/pubsub/testgossipsub2.nim               | 335 ++++++++++++++++++
 tests/pubsub/testpubsub.nim                   |   1 +
 tests/pubsub/utils.nim                        |   2 +-
 14 files changed, 641 insertions(+), 368 deletions(-)
 create mode 100644 tests/pubsub/testgossipsub2.nim

diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim
index 48bc711c57..004b384132 100644
--- a/libp2p/protocols/pubsub/floodsub.nim
+++ b/libp2p/protocols/pubsub/floodsub.nim
@@ -7,8 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
-import std/[sequtils, sets, tables] -import chronos, chronicles, metrics +import std/[sequtils, sets, hashes, tables] +import chronos, chronicles, metrics, bearssl import ./pubsub, ./pubsubpeer, ./timedcache, @@ -27,7 +27,17 @@ const FloodSubCodec* = "/floodsub/1.0.0" type FloodSub* = ref object of PubSub floodsub*: PeerTable # topic to remote peer map - seen*: TimedCache[MessageID] # list of messages forwarded to peers + seen*: TimedCache[MessageID] # message id:s already seen on the network + seenSalt*: seq[byte] + +proc hasSeen*(f: FloodSub, msgId: MessageID): bool = + f.seenSalt & msgId in f.seen + +proc addSeen*(f: FloodSub, msgId: MessageID): bool = + # Salting the seen hash helps avoid attacks against the hash function used + # in the nim hash table + # Return true if the message has already been seen + f.seen.put(f.seenSalt & msgId) method subscribeTopic*(f: FloodSub, topic: string, @@ -88,7 +98,7 @@ method rpcHandler*(f: FloodSub, for msg in rpcMsg.messages: # for every message let msgId = f.msgIdProvider(msg) - if f.seen.put(msgId): + if f.addSeen(msgId): trace "Dropping already-seen message", msgId, peer continue @@ -118,13 +128,15 @@ method rpcHandler*(f: FloodSub, var toSendPeers = initHashSet[PubSubPeer]() for t in msg.topicIDs: # for every topic in the message + if t notin f.topics: + continue f.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) await handleData(f, t, msg.data) # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - f.broadcast(toSeq(toSendPeers), RPCMsg(messages: @[msg])) + f.broadcast(toSendPeers, RPCMsg(messages: @[msg])) trace "Forwared message to peers", peers = toSendPeers.len method init*(f: FloodSub) = @@ -157,7 +169,7 @@ method publish*(f: FloodSub, debug "Empty topic, skipping publish", topic return 0 - let peers = toSeq(f.floodsub.getOrDefault(topic)) + let peers = f.floodsub.getOrDefault(topic) if peers.len == 0: debug "No peers for topic, skipping publish", topic @@ -175,7 +187,7 @@ method publish*(f: FloodSub, trace "Created new message", msg = shortLog(msg), peers = peers.len, topic, msgId - if f.seen.put(msgId): + if f.addSeen(msgId): # custom msgid providers might cause this trace "Dropping already-seen message", msgId, topic return 0 @@ -206,4 +218,8 @@ method unsubscribeAll*(f: FloodSub, topic: string) = method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() f.seen = TimedCache[MessageID].init(2.minutes) + var rng = newRng() + f.seenSalt = newSeqUninitialized[byte](sizeof(Hash)) + brHmacDrbgGenerate(rng[], f.seenSalt) + f.init() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2cf50a262a..b44c1842da 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -8,7 +8,7 @@ ## those terms. 
import std/[tables, sets, options, sequtils, random] -import chronos, chronicles, metrics, bearssl +import chronos, chronicles, metrics import ./pubsub, ./floodsub, ./pubsubpeer, @@ -98,27 +98,6 @@ proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] = else: ok() -proc init*(_: type[TopicParams]): TopicParams = - TopicParams( - topicWeight: 0.0, # disabled by default - timeInMeshWeight: 0.01, - timeInMeshQuantum: 1.seconds, - timeInMeshCap: 10.0, - firstMessageDeliveriesWeight: 1.0, - firstMessageDeliveriesDecay: 0.5, - firstMessageDeliveriesCap: 10.0, - meshMessageDeliveriesWeight: -1.0, - meshMessageDeliveriesDecay: 0.5, - meshMessageDeliveriesCap: 10, - meshMessageDeliveriesThreshold: 1, - meshMessageDeliveriesWindow: 5.milliseconds, - meshMessageDeliveriesActivation: 10.seconds, - meshFailurePenaltyWeight: -1.0, - meshFailurePenaltyDecay: 0.5, - invalidMessageDeliveriesWeight: -1.0, - invalidMessageDeliveriesDecay: 0.5 - ) - proc validateParameters*(parameters: TopicParams): Result[void, cstring] = if parameters.timeInMeshWeight <= 0.0 or parameters.timeInMeshWeight > 1.0: err("gossipsub: timeInMeshWeight parameter error, Must be a small positive value") @@ -262,6 +241,44 @@ method subscribeTopic*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic +proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) = + if rpcMsg.control.isSome: + let control = rpcMsg.control.get() + g.handlePrune(peer, control.prune) + + var respControl: ControlMessage + let iwant = g.handleIHave(peer, control.ihave) + if iwant.messageIDs.len > 0: + respControl.iwant.add(iwant) + respControl.prune.add(g.handleGraft(peer, control.graft)) + let messages = g.handleIWant(peer, control.iwant) + + if + respControl.prune.len > 0 or + respControl.iwant.len > 0 or + messages.len > 0: + # iwant and prunes from here, also messages + + for smsg in messages: + for topic in smsg.topicIDs: + if g.knownTopics.contains(topic): + libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) + else: + libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) + + libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) + + for prune in respControl.prune: + if g.knownTopics.contains(prune.topicID): + libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID]) + else: + libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"]) + + trace "sending control message", msg = shortLog(respControl), peer + g.send( + peer, + RPCMsg(control: some(respControl), messages: messages)) + method rpcHandler*(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) {.async.} = @@ -283,26 +300,12 @@ method rpcHandler*(g: GossipSub, # avoid the remote peer from controlling the seen table hashing # by adding random bytes to the ID we ensure we randomize the IDs # we do only for seen as this is the great filter from the external world - if g.seen.put(msgId & g.randomBytes): + if g.addSeen(msgId): trace "Dropping already-seen message", msgId = shortLog(msgId), peer - # make sure to update score tho before continuing - for t in msg.topicIDs: - if t notin g.topics: - continue - # for every topic in the message - let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) - # if in mesh add more delivery score - g.withPeerStats(peer.peerId) do (stats: var PeerStats): - stats.topicInfos.withValue(t, tstats): - if tstats[].inMesh: - # TODO: take into account meshMessageDeliveriesWindow - # score only if messages are not too old. 
- tstats[].meshMessageDeliveries += 1 - if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - do: # make sure we don't loose this information - stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) + # TODO: take into account meshMessageDeliveriesWindow + # score only if messages are not too old. + g.rewardDelivered(peer, msg.topicIDs, false) # onto the next message continue @@ -346,28 +349,13 @@ method rpcHandler*(g: GossipSub, # store in cache only after validation g.mcache.put(msgId, msg) + g.rewardDelivered(peer, msg.topicIDs, true) + var toSendPeers = initHashSet[PubSubPeer]() for t in msg.topicIDs: # for every topic in the message if t notin g.topics: continue - let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) - - g.withPeerStats(peer.peerId) do(stats: var PeerStats): - stats.topicInfos.withValue(t, tstats): - # contribute to peer score first delivery - tstats[].firstMessageDeliveries += 1 - if tstats[].firstMessageDeliveries > topicParams.firstMessageDeliveriesCap: - tstats[].firstMessageDeliveries = topicParams.firstMessageDeliveriesCap - - # if in mesh add more delivery score - if tstats[].inMesh: - tstats[].meshMessageDeliveries += 1 - if tstats[].meshMessageDeliveries > topicParams.meshMessageDeliveriesCap: - tstats[].meshMessageDeliveries = topicParams.meshMessageDeliveriesCap - do: # make sure we don't loose this information - stats.topicInfos[t] = TopicInfo(firstMessageDeliveries: 1, meshMessageDeliveries: 1) - g.floodsub.withValue(t, peers): toSendPeers.incl(peers[]) g.mesh.withValue(t, peers): toSendPeers.incl(peers[]) @@ -375,44 +363,15 @@ method rpcHandler*(g: GossipSub, # In theory, if topics are the same in all messages, we could batch - we'd # also have to be careful to only include validated messages - let sendingTo = toSeq(toSendPeers) - g.broadcast(sendingTo, RPCMsg(messages: @[msg])) - trace "forwared message to peers", peers = sendingTo.len, msgId, peer + g.broadcast(toSendPeers, RPCMsg(messages: @[msg])) + trace "forwared message to peers", peers = toSendPeers.len, msgId, peer for topic in msg.topicIDs: if g.knownTopics.contains(topic): - libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = [topic]) + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = [topic]) else: - libp2p_pubsub_messages_rebroadcasted.inc(sendingTo.len.int64, labelValues = ["generic"]) - - if rpcMsg.control.isSome: - let control = rpcMsg.control.get() - g.handlePrune(peer, control.prune) + libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"]) - var respControl: ControlMessage - respControl.iwant.add(g.handleIHave(peer, control.ihave)) - respControl.prune.add(g.handleGraft(peer, control.graft)) - let messages = g.handleIWant(peer, control.iwant) - - if respControl.graft.len > 0 or respControl.prune.len > 0 or - respControl.ihave.len > 0 or messages.len > 0: - # iwant and prunes from here, also messages - - for smsg in messages: - for topic in smsg.topicIDs: - if g.knownTopics.contains(topic): - libp2p_pubsub_broadcast_messages.inc(labelValues = [topic]) - else: - libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"]) - libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64) - for prune in respControl.prune: - if g.knownTopics.contains(prune.topicID): - libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID]) - else: - libp2p_pubsub_broadcast_prune.inc(labelValues = 
["generic"]) - trace "sending control message", msg = shortLog(respControl), peer - g.send( - peer, - RPCMsg(control: some(respControl), messages: messages)) + g.handleControl(peer, rpcMsg) method subscribe*(g: GossipSub, topic: string, @@ -437,7 +396,7 @@ proc unsubscribe*(g: GossipSub, topic: string) = # remove mesh peers from gpeers, we send 2 different messages gpeers = gpeers - mpeers # send to peers NOT in mesh first - g.broadcast(toSeq(gpeers), msg) + g.broadcast(gpeers, msg) for peer in mpeers: trace "pruning unsubscribeAll call peer", peer, score = peer.score @@ -452,9 +411,9 @@ proc unsubscribe*(g: GossipSub, topic: string) = backoff: g.parameters.pruneBackoff.seconds.uint64)])) # send to peers IN mesh now - g.broadcast(toSeq(mpeers), msg) + g.broadcast(mpeers, msg) else: - g.broadcast(toSeq(gpeers), msg) + g.broadcast(gpeers, msg) g.topicParams.del(topic) @@ -540,21 +499,21 @@ method publish*(g: GossipSub, trace "Created new message", msg = shortLog(msg), peers = peers.len - if g.seen.put(msgId & g.randomBytes): + if g.addSeen(msgId): # custom msgid providers might cause this trace "Dropping already-seen message" return 0 g.mcache.put(msgId, msg) - let peerSeq = toSeq(peers) - g.broadcast(peerSeq, RPCMsg(messages: @[msg])) + g.broadcast(peers, RPCMsg(messages: @[msg])) + if g.knownTopics.contains(topic): - libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = [topic]) + libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic]) else: - libp2p_pubsub_messages_published.inc(peerSeq.len.int64, labelValues = ["generic"]) + libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = ["generic"]) - trace "Published message to peers" + trace "Published message to peers", peers=peers.len return peers.len @@ -618,6 +577,3 @@ method initPubSub*(g: GossipSub) = # init gossip stuff g.mcache = MCache.init(g.parameters.historyGossip, g.parameters.historyLength) - var rng = newRng() - g.randomBytes = newSeqUninitialized[byte](32) - brHmacDrbgGenerate(rng[], g.randomBytes) diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index d734762bcc..f3a4e0a518 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -13,7 +13,7 @@ import std/[tables, sequtils, sets, algorithm] import random # for shuffle import chronos, chronicles, metrics import "."/[types, scoring] -import ".."/[pubsubpeer, peertable, timedcache, mcache, pubsub] +import ".."/[pubsubpeer, peertable, timedcache, mcache, floodsub, pubsub] import "../rpc"/[messages] import "../../.."/[peerid, multiaddress, utility, switch] @@ -39,10 +39,11 @@ proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = trace "grafted", peer=p, topic -proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) {.raises: [Defect].} = - let backoff = Moment.fromNow(g.parameters.pruneBackoff) - g.backingOff - .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff +proc pruned*(g: GossipSub, p: PubSubPeer, topic: string, setBackoff: bool = true) {.raises: [Defect].} = + if setBackoff: + let backoff = Moment.fromNow(g.parameters.pruneBackoff) + g.backingOff + .mgetOrPut(topic, initTable[PeerID, Moment]())[p.peerId] = backoff g.peerStats.withValue(p.peerId, stats): stats.topicInfos.withValue(topic, info): @@ -80,6 +81,7 @@ proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] {.raises: proc handleGraft*(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft]): 
seq[ControlPrune] = # {.raises: [Defect].} TODO chronicles exception on windows + var prunes: seq[ControlPrune] for graft in grafts: let topic = graft.topicID trace "peer grafted topic", peer, topic @@ -90,7 +92,7 @@ proc handleGraft*(g: GossipSub, warn "attempt to graft an explicit peer, peering agreements should be reciprocal", peer, topic # and such an attempt should be logged and rejected with a PRUNE - result.add(ControlPrune( + prunes.add(ControlPrune( topicID: topic, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) @@ -108,7 +110,7 @@ proc handleGraft*(g: GossipSub, .getOrDefault(peer.peerId) > Moment.now(): debug "attempt to graft a backingOff peer", peer, topic # and such an attempt should be logged and rejected with a PRUNE - result.add(ControlPrune( + prunes.add(ControlPrune( topicID: topic, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) @@ -141,7 +143,7 @@ proc handleGraft*(g: GossipSub, else: trace "pruning grafting peer, mesh full", peer, topic, score = peer.score, mesh = g.mesh.peers(topic) - result.add(ControlPrune( + prunes.add(ControlPrune( topicID: topic, peers: g.peerExchangeList(topic), backoff: g.parameters.pruneBackoff.seconds.uint64)) @@ -149,6 +151,8 @@ proc handleGraft*(g: GossipSub, trace "peer grafting topic we're not interested in", peer, topic # gossip 1.1, we do not send a control message prune anymore + return prunes + proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.raises: [Defect].} = for prune in prunes: let topic = prune.topicID @@ -173,7 +177,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r .mgetOrPut(topic, initTable[PeerID, Moment]())[peer.peerId] = backoff trace "pruning rpc received peer", peer, score = peer.score - g.pruned(peer, topic) + g.pruned(peer, topic, setBackoff = false) g.mesh.removePeer(topic, peer) # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that @@ -183,6 +187,7 @@ proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) {.r proc handleIHave*(g: GossipSub, peer: PubSubPeer, ihaves: seq[ControlIHave]): ControlIWant {.raises: [Defect].} = + var res: ControlIWant if peer.score < g.parameters.gossipThreshold: trace "ihave: ignoring low score peer", peer, score = peer.score elif peer.iHaveBudget <= 0: @@ -198,22 +203,23 @@ proc handleIHave*(g: GossipSub, if ihave.topicID in g.mesh: # also avoid duplicates here! let deIhavesMsgs = ihave.messageIDs.deduplicate() - for m in deIhavesMsgs: - let msgId = m & g.randomBytes - if msgId notin g.seen: + for msgId in deIhavesMsgs: + if not g.hasSeen(msgId): if peer.iHaveBudget > 0: - result.messageIDs.add(m) + res.messageIDs.add(msgId) dec peer.iHaveBudget + trace "requested message via ihave", messageID=msgId else: - return - - # shuffling result.messageIDs before sending it out to increase the likelihood + break + # shuffling res.messageIDs before sending it out to increase the likelihood # of getting an answer if the peer truncates the list due to internal size restrictions. 
- shuffle(result.messageIDs) + shuffle(res.messageIDs) + return res proc handleIWant*(g: GossipSub, peer: PubSubPeer, iwants: seq[ControlIWant]): seq[Message] {.raises: [Defect].} = + var messages: seq[Message] if peer.score < g.parameters.gossipThreshold: trace "iwant: ignoring low score peer", peer, score = peer.score elif peer.iWantBudget <= 0: @@ -228,10 +234,11 @@ proc handleIWant*(g: GossipSub, if msg.isSome: # avoid spam if peer.iWantBudget > 0: - result.add(msg.get()) + messages.add(msg.get()) dec peer.iWantBudget else: - return + break + return messages proc commitMetrics(metrics: var MeshMetrics) {.raises: [Defect].} = libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics) @@ -486,9 +493,10 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: ## var cacheWindowSize = 0 + var control: Table[PubSubPeer, ControlMessage] - trace "getting gossip peers (iHave)" let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys)) + trace "getting gossip peers (iHave)", ntopics=topics.len for topic in topics: if topic notin g.gossipsub: trace "topic not in gossip array, skipping", topicID = topic @@ -496,12 +504,15 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: let mids = g.mcache.window(topic) if not(mids.len > 0): + trace "no messages to emit" continue var midsSeq = toSeq(mids) cacheWindowSize += midsSeq.len + trace "got messages to emit", size=midsSeq.len + # not in spec # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101 # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582 @@ -531,10 +542,12 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.raises: allPeers.setLen(target) for peer in allPeers: - result.mGetOrPut(peer, ControlMessage()).ihave.add(ihave) + control.mGetOrPut(peer, ControlMessage()).ihave.add(ihave) libp2p_gossipsub_cache_window_size.set(cacheWindowSize.int64) + return control + proc onHeartbeat(g: GossipSub) {.raises: [Defect].} = # reset IWANT budget # reset IHAVE cap diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index 3c82d5671b..79ff50a394 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -25,6 +25,27 @@ declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) +proc init*(_: type[TopicParams]): TopicParams = + TopicParams( + topicWeight: 0.0, # disabled by default + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1.seconds, + timeInMeshCap: 10.0, + firstMessageDeliveriesWeight: 1.0, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10.0, + meshMessageDeliveriesWeight: -1.0, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 1, + meshMessageDeliveriesWindow: 5.milliseconds, + meshMessageDeliveriesActivation: 10.seconds, + meshFailurePenaltyWeight: -1.0, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1.0, + invalidMessageDeliveriesDecay: 0.5 + ) + proc withPeerStats*( g: GossipSub, peerId: PeerId, action: proc (stats: var PeerStats) {.gcsafe, raises: 
[Defect].}) = @@ -274,3 +295,26 @@ proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) # update stats g.withPeerStats(peer.peerId) do (stats: var PeerStats): stats.topicInfos.mgetOrPut(t, TopicInfo()).invalidMessageDeliveries += 1 + +proc addCapped*[T](stat: var T, diff, cap: T) = + stat += min(diff, cap - stat) + +proc rewardDelivered*( + g: GossipSub, peer: PubSubPeer, topics: openArray[string], first: bool) = + for t in topics: + if t notin g.topics: + continue + let topicParams = g.topicParams.mgetOrPut(t, TopicParams.init()) + # if in mesh add more delivery score + + g.withPeerStats(peer.peerId) do (stats: var PeerStats): + stats.topicInfos.withValue(t, tstats): + if tstats[].inMesh: + if first: + tstats[].firstMessageDeliveries.addCapped( + 1, topicParams.firstMessageDeliveriesCap) + + tstats[].meshMessageDeliveries.addCapped( + 1, topicParams.meshMessageDeliveriesCap) + do: # make sure we don't loose this information + stats.topicInfos[t] = TopicInfo(meshMessageDeliveries: 1) diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index bd3a67b9d1..05b67769bf 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -150,8 +150,6 @@ type heartbeatEvents*: seq[AsyncEvent] - randomBytes*: seq[byte] - MeshMetrics* = object # scratch buffers for metrics otherPeersPerTopicMesh*: int64 diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index a608bfc4e2..41ccdc87d8 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -9,8 +9,8 @@ import std/[tables, sequtils, sets, strutils] import chronos, chronicles, metrics -import pubsubpeer, - rpc/[message, messages], +import ./pubsubpeer, + ./rpc/[message, messages, protobuf], ../../switch, ../protocol, ../../stream/connection, @@ -128,7 +128,7 @@ proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [Defect].} = proc broadcast*( p: PubSub, - sendPeers: openArray[PubSubPeer], + sendPeers: auto, # Iteratble[PubSubPeer] msg: RPCMsg) {.raises: [Defect].} = ## Attempt to send `msg` to the given peers @@ -174,8 +174,15 @@ proc broadcast*( trace "broadcasting messages to peers", peers = sendPeers.len, msg = shortLog(msg) - for peer in sendPeers: - p.send(peer, msg) + + if anyIt(sendPeers, it.hasObservers): + for peer in sendPeers: + p.send(peer, msg) + else: + # Fast path that only encodes message once + let encoded = encodeRpcMsg(msg, p.anonymize) + for peer in sendPeers: + peer.sendEncoded(encoded) proc sendSubs*(p: PubSub, peer: PubSubPeer, @@ -205,7 +212,7 @@ method subscribeTopic*(p: PubSub, method rpcHandler*(p: PubSub, peer: PubSubPeer, - rpcMsg: RPCMsg) {.async, base.} = + rpcMsg: RPCMsg): Future[void] {.base.} = ## handle rpc messages trace "processing RPC message", msg = rpcMsg.shortLog, peer for i in 0.. 
B": + var passed: Future[bool] = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + passed.complete(true) let - nodes = generateNodes(runs, gossip = true, triggerSelf = true) - nodesFut = nodes.mapIt(it.switch.start()) + nodes = generateNodes( + 2, + gossip = true) - await allFuturesThrowing(nodes.mapIt(it.start())) - await subscribeNodes(nodes) + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) - var seen: Table[string, int] - var seenFut = newFuture[void]() - for i in 0..= runs: - seenFut.complete() + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) - dialer.subscribe("foobar", handler) - await waitSub(nodes[0], dialer, "foobar") + var gossip1: GossipSub = GossipSub(nodes[0]) + gossip1.parameters.floodPublish = true + var gossip2: GossipSub = GossipSub(nodes[1]) + gossip2.parameters.floodPublish = true - tryPublish await wait(nodes[0].publish("foobar", - toBytes("from node " & - $nodes[0].peerInfo.peerId)), - 1.minutes), 1, 5.seconds + await subscribeNodes(nodes) - await wait(seenFut, 2.minutes) - check: seen.len >= runs - for k, v in seen.pairs: - check: v >= 1 + # nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") - for node in nodes: - var gossip = GossipSub(node) + tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 - check: - "foobar" in gossip.gossipsub + check await passed + + check: + "foobar" in gossip1.gossipsub + "foobar" notin gossip2.gossipsub + not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.peerId) + not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.peerId) await allFuturesThrowing( - nodes.mapIt( - allFutures( - it.stop(), - it.switch.stop()))) + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) - await allFuturesThrowing(nodesFut) + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) - asyncTest "e2e - GossipSub with multiple peers (sparse)": + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "e2e - GossipSub with multiple peers": var runs = 10 let @@ -614,7 +621,7 @@ suite "GossipSub": nodesFut = nodes.mapIt(it.switch.start()) await allFuturesThrowing(nodes.mapIt(it.start())) - await subscribeSparseNodes(nodes) + await subscribeNodes(nodes) var seen: Table[string, int] var seenFut = newFuture[void]() @@ -639,17 +646,16 @@ suite "GossipSub": $nodes[0].peerInfo.peerId)), 1.minutes), 1, 5.seconds - await wait(seenFut, 5.minutes) + await wait(seenFut, 2.minutes) check: seen.len >= runs for k, v in seen.pairs: check: v >= 1 for node in nodes: var gossip = GossipSub(node) + check: "foobar" in gossip.gossipsub - gossip.fanout.len == 0 - gossip.mesh["foobar"].len > 0 await allFuturesThrowing( nodes.mapIt( @@ -659,118 +665,7 @@ suite "GossipSub": await allFuturesThrowing(nodesFut) - asyncTest "GossipSub invalid topic subscription": - var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - handlerFut.complete(true) - - let - nodes = generateNodes(2, gossip = true) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - # start pubsub - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - var gossip = GossipSub(nodes[0]) - let invalidDetected = newFuture[void]() - gossip.subscriptionValidator = - proc(topic: string): 
bool = - if topic == "foobar": - try: - invalidDetected.complete() - except: - raise newException(Defect, "Exception during subscriptionValidator") - false - else: - true - - await subscribeNodes(nodes) - - nodes[0].subscribe("foobar", handler) - nodes[1].subscribe("foobar", handler) - - await invalidDetected.wait(10.seconds) - - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() - ) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() - ) - - await allFuturesThrowing(nodesFut.concat()) - - asyncTest "GossipSub test directPeers": - var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - check topic == "foobar" - handlerFut.complete(true) - - let - nodes = generateNodes(2, gossip = true) - - # start switches - nodesFut = await allFinished( - nodes[0].switch.start(), - nodes[1].switch.start(), - ) - - var gossip = GossipSub(nodes[0]) - gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs - - # start pubsub - await allFuturesThrowing( - allFinished( - nodes[0].start(), - nodes[1].start(), - )) - - let invalidDetected = newFuture[void]() - gossip.subscriptionValidator = - proc(topic: string): bool = - if topic == "foobar": - try: - invalidDetected.complete() - except: - raise newException(Defect, "Exception during subscriptionValidator") - false - else: - true - - # DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN - ### await subscribeNodes(nodes) - - nodes[0].subscribe("foobar", handler) - nodes[1].subscribe("foobar", handler) - - await invalidDetected.wait(10.seconds) - - await allFuturesThrowing( - nodes[0].switch.stop(), - nodes[1].switch.stop() - ) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() - ) - - await allFuturesThrowing(nodesFut.concat()) - - asyncTest "GossipsSub peers disconnections mechanics": + asyncTest "e2e - GossipSub with multiple peers (sparse)": var runs = 10 let @@ -778,7 +673,7 @@ suite "GossipSub": nodesFut = nodes.mapIt(it.switch.start()) await allFuturesThrowing(nodes.mapIt(it.start())) - await subscribeNodes(nodes) + await subscribeSparseNodes(nodes) var seen: Table[string, int] var seenFut = newFuture[void]() @@ -798,10 +693,6 @@ suite "GossipSub": dialer.subscribe("foobar", handler) await waitSub(nodes[0], dialer, "foobar") - # ensure peer stats are stored properly and kept properly - check: - GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self - tryPublish await wait(nodes[0].publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId)), @@ -819,47 +710,6 @@ suite "GossipSub": gossip.fanout.len == 0 gossip.mesh["foobar"].len > 0 - # Removing some subscriptions - - for i in 0.. 
0, "waitSub timeout!") + +template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped = + var + limit = times + pubs = 0 + while pubs < require and limit > 0: + pubs = pubs + call + await sleepAsync(wait) + limit.dec() + if limit == 0: + doAssert(false, "Failed to publish!") + +suite "GossipSub": + teardown: + checkTrackers() + + asyncTest "e2e - GossipSub with multiple peers - control deliver (sparse)": + var runs = 10 + + let + nodes = generateNodes(runs, gossip = true, triggerSelf = true) + nodesFut = nodes.mapIt(it.switch.start()) + + await allFuturesThrowing(nodes.mapIt(it.start())) + await subscribeSparseNodes(nodes) + + var seen: Table[string, int] + var seenFut = newFuture[void]() + for i in 0..= runs: + seenFut.complete() + + dialer.subscribe("foobar", handler) + await waitSub(nodes[0], dialer, "foobar") + + # we want to test ping pong deliveries via control Iwant/Ihave, so we publish just in a tap + let publishedTo = nodes[0] + .publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId)) + .await + check: + publishedTo != 0 + publishedTo != runs + + await wait(seenFut, 5.minutes) + check: seen.len >= runs + for k, v in seen.pairs: + check: v >= 1 + + await allFuturesThrowing( + nodes.mapIt( + allFutures( + it.stop(), + it.switch.stop()))) + + await allFuturesThrowing(nodesFut) + + asyncTest "GossipSub invalid topic subscription": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + var gossip = GossipSub(nodes[0]) + let invalidDetected = newFuture[void]() + gossip.subscriptionValidator = + proc(topic: string): bool = + if topic == "foobar": + try: + invalidDetected.complete() + except: + raise newException(Defect, "Exception during subscriptionValidator") + false + else: + true + + await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + await invalidDetected.wait(10.seconds) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "GossipSub test directPeers": + var handlerFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + handlerFut.complete(true) + + let + nodes = generateNodes(2, gossip = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), + nodes[1].switch.start(), + ) + + var gossip = GossipSub(nodes[0]) + gossip.parameters.directPeers[nodes[1].switch.peerInfo.peerId] = nodes[1].switch.peerInfo.addrs + + # start pubsub + await allFuturesThrowing( + allFinished( + nodes[0].start(), + nodes[1].start(), + )) + + let invalidDetected = newFuture[void]() + gossip.subscriptionValidator = + proc(topic: string): bool = + if topic == "foobar": + try: + invalidDetected.complete() + except: + raise newException(Defect, "Exception during subscriptionValidator") + false + else: + true + + # DO NOT SUBSCRIBE, CONNECTION SHOULD HAPPEN + ### await subscribeNodes(nodes) + + nodes[0].subscribe("foobar", handler) + nodes[1].subscribe("foobar", handler) + + 
await invalidDetected.wait(10.seconds) + + await allFuturesThrowing( + nodes[0].switch.stop(), + nodes[1].switch.stop() + ) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + await allFuturesThrowing(nodesFut.concat()) + + asyncTest "GossipsSub peers disconnections mechanics": + var runs = 10 + + let + nodes = generateNodes(runs, gossip = true, triggerSelf = true) + nodesFut = nodes.mapIt(it.switch.start()) + + await allFuturesThrowing(nodes.mapIt(it.start())) + await subscribeNodes(nodes) + + var seen: Table[string, int] + var seenFut = newFuture[void]() + for i in 0..= runs: + seenFut.complete() + + dialer.subscribe("foobar", handler) + await waitSub(nodes[0], dialer, "foobar") + + # ensure peer stats are stored properly and kept properly + check: + GossipSub(nodes[0]).peerStats.len == runs - 1 # minus self + + tryPublish await wait(nodes[0].publish("foobar", + toBytes("from node " & + $nodes[0].peerInfo.peerId)), + 1.minutes), 1, 5.seconds + + await wait(seenFut, 5.minutes) + check: seen.len >= runs + for k, v in seen.pairs: + check: v >= 1 + + for node in nodes: + var gossip = GossipSub(node) + check: + "foobar" in gossip.gossipsub + gossip.fanout.len == 0 + gossip.mesh["foobar"].len > 0 + + # Removing some subscriptions + + for i in 0..