gossipsub: unsubscribe fixes (#569)
* gossipsub: unsubscribe fixes

  * fix KeyError when updating the metric of an unsubscribed topic (see the
    table-access sketch after this list)
  * fix the unsubscribe message not being sent to all peers, causing them to
    keep thinking we're still subscribed
  * release memory earlier in a few places

* floodsub fix
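For context, a minimal sketch of the table-access pattern behind the KeyError fix, using Nim's std/tables and std/sets (topic and peer names are illustrative, not from the codebase):

import std/[tables, sets]

var subs = initTable[string, HashSet[string]]() # topic -> subscribed peer ids

# Subscribe: mgetOrPut inserts an empty set the first time a topic shows up,
# so the key is guaranteed to exist when we add the peer.
subs.mgetOrPut("topic-a", HashSet[string]()).incl("peer-1")

# Unsubscribe: withValue runs its body only if the key is present, so
# removing a peer from a never-seen topic is a harmless no-op.
subs.withValue("topic-a", peers):
  peers[].excl("peer-1")

# The old pattern raises KeyError when the topic is missing:
# subs["topic-b"].excl("peer-1")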
arnetheduck authored May 6, 2021
1 parent 9f30196 commit 83a20a9
Showing 6 changed files with 227 additions and 259 deletions.
libp2p/protocols/pubsub/floodsub.nim (42 changes: 14 additions & 28 deletions)
@@ -39,10 +39,10 @@ proc addSeen*(f: FloodSub, msgId: MessageID): bool =
   # Return true if the message has already been seen
   f.seen.put(f.seenSalt & msgId)
 
-method subscribeTopic*(f: FloodSub,
-                       topic: string,
-                       subscribe: bool,
-                       peer: PubsubPeer) {.gcsafe.} =
+proc handleSubscribe*(f: FloodSub,
+                      peer: PubsubPeer,
+                      topic: string,
+                      subscribe: bool) =
   logScope:
     peer
     topic
@@ -61,21 +61,16 @@ method subscribeTopic*(f: FloodSub,
     return
 
   if subscribe:
-    if topic notin f.floodsub:
-      f.floodsub[topic] = initHashSet[PubSubPeer]()
-
     trace "adding subscription for topic", peer, topic
 
     # subscribe the peer to the topic
-    f.floodsub[topic].incl(peer)
+    f.floodsub.mgetOrPut(topic, HashSet[PubSubPeer]()).incl(peer)
   else:
-    if topic notin f.floodsub:
-      return
-
-    trace "removing subscription for topic", peer, topic
+    f.floodsub.withValue(topic, peers):
+      trace "removing subscription for topic", peer, topic
 
-    # unsubscribe the peer from the topic
-    f.floodsub[topic].excl(peer)
+      # unsubscribe the peer from the topic
+      peers[].excl(peer)
 
 method unsubscribePeer*(f: FloodSub, peer: PeerID) =
   ## handle peer disconnects
@@ -93,7 +88,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerID) =
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
                    rpcMsg: RPCMsg) {.async.} =
-  await procCall PubSub(f).rpcHandler(peer, rpcMsg)
+  for i in 0..<min(f.topicsHigh, rpcMsg.subscriptions.len):
+    template sub: untyped = rpcMsg.subscriptions[i]
+    f.handleSubscribe(peer, sub.topic, sub.subscribe)
 
   for msg in rpcMsg.messages: # for every message
     let msgId = f.msgIdProvider(msg)
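The new subscription loop caps work at f.topicsHigh and aliases each entry through a template instead of copying it. The same pattern in isolation (SubOpts, maxTopics and process here are stand-ins, not the library's definitions):

type SubOpts = object
  topic: string
  subscribe: bool

proc process(topic: string, subscribe: bool) =
  echo topic, ": ", subscribe

let
  maxTopics = 2
  subs = @[
    SubOpts(topic: "a", subscribe: true),
    SubOpts(topic: "b", subscribe: true),
    SubOpts(topic: "c", subscribe: false)] # ignored: over the cap

# Bounding the loop keeps a peer that floods us with subscription entries
# from costing unbounded work.
for i in 0..<min(maxTopics, subs.len):
  # The template expands at compile time: sub is an alias for subs[i],
  # not a copy of it.
  template sub: untyped = subs[i]
  process(sub.topic, sub.subscribe)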
@@ -139,6 +136,8 @@ method rpcHandler*(f: FloodSub,
     f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
     trace "Forwared message to peers", peers = toSendPeers.len
 
+  f.updateMetrics(rpcMsg)
+
 method init*(f: FloodSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
     ## main protocol handler that gets triggered on every
@@ -202,19 +201,6 @@ method publish*(f: FloodSub,
 
   return peers.len
 
-method unsubscribe*(f: FloodSub,
-                    topics: seq[TopicPair]) =
-  procCall PubSub(f).unsubscribe(topics)
-
-  for p in f.peers.values:
-    f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)
-
-method unsubscribeAll*(f: FloodSub, topic: string) =
-  procCall PubSub(f).unsubscribeAll(topic)
-
-  for p in f.peers.values:
-    f.sendSubs(p, @[topic], false)
-
 method initPubSub*(f: FloodSub) =
   procCall PubSub(f).initPubSub()
   f.seen = TimedCache[MessageID].init(2.minutes)
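The deleted unsubscribe/unsubscribeAll overrides were floodsub's own attempt at announcing an unsubscribe, and only reached the peers they iterated here. After this commit the announcement is made by the base PubSub for every connected peer, which is the fix for peers thinking we were still subscribed. A hypothetical sketch of that centralized duty (sendUnsubscribe is an invented name; only RPCMsg.withSubs, peers and send are taken from the diff):

proc sendUnsubscribe(p: PubSub, topic: string) =
  # Announce to every connected peer, not just the topic's members.
  let msg = RPCMsg.withSubs(@[topic], subscribe = false)
  for peer in p.peers.values:
    p.send(peer, msg)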
libp2p/protocols/pubsub/gossipsub.nim (182 changes: 78 additions & 104 deletions)
@@ -200,29 +200,29 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
 
   procCall FloodSub(g).unsubscribePeer(peer)
 
-method subscribeTopic*(g: GossipSub,
-                       topic: string,
-                       subscribe: bool,
-                       peer: PubSubPeer) {.gcsafe.} =
+proc handleSubscribe*(g: GossipSub,
+                      peer: PubSubPeer,
+                      topic: string,
+                      subscribe: bool) =
   logScope:
     peer
     topic
 
-  # this is a workaround for a race condition
-  # that can happen if we disconnect the peer very early
-  # in the future we might use this as a test case
-  # and eventually remove this workaround
-  if subscribe and peer.peerId notin g.peers:
-    trace "ignoring unknown peer"
-    return
-
-  if subscribe and not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)):
-    # this is a violation, so warn should be in order
-    trace "ignoring invalid topic subscription", topic, peer
-    libp2p_gossipsub_invalid_topic_subscription.inc()
-    return
-
   if subscribe:
+    # this is a workaround for a race condition
+    # that can happen if we disconnect the peer very early
+    # in the future we might use this as a test case
+    # and eventually remove this workaround
+    if peer.peerId notin g.peers:
+      trace "ignoring unknown peer"
+      return
+
+    if not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)):
+      # this is a violation, so warn should be in order
+      trace "ignoring invalid topic subscription", topic, peer
+      libp2p_gossipsub_invalid_topic_subscription.inc()
+      return
+
     trace "peer subscribed to topic"
 
     # subscribe remote peer to the topic
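The subscriptionValidator hook used above lets the application veto subscriptions to topics it does not serve. A minimal sketch, assuming the hook is a plain proc (topic: string): bool as the call site suggests (topic names are illustrative):

import std/sets

let allowedTopics = toHashSet(["/my/app/blocks", "/my/app/attestations"])

g.subscriptionValidator = proc(topic: string): bool =
  # Rejected topics make the caller bump
  # libp2p_gossipsub_invalid_topic_subscription and drop the entry.
  topic in allowedTopics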
@@ -241,50 +241,48 @@ method subscribeTopic*(g: GossipSub,
 
   trace "gossip peers", peers = g.gossipsub.peers(topic), topic
 
-proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) =
-  if rpcMsg.control.isSome:
-    let control = rpcMsg.control.get()
-    g.handlePrune(peer, control.prune)
-
-    var respControl: ControlMessage
-    let iwant = g.handleIHave(peer, control.ihave)
-    if iwant.messageIDs.len > 0:
-      respControl.iwant.add(iwant)
-    respControl.prune.add(g.handleGraft(peer, control.graft))
-    let messages = g.handleIWant(peer, control.iwant)
-
-    if
-      respControl.prune.len > 0 or
-      respControl.iwant.len > 0 or
-      messages.len > 0:
-      # iwant and prunes from here, also messages
-
-      for smsg in messages:
-        for topic in smsg.topicIDs:
-          if g.knownTopics.contains(topic):
-            libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
-          else:
-            libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
-
-      libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
-
-      for prune in respControl.prune:
-        if g.knownTopics.contains(prune.topicID):
-          libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
+proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
+  g.handlePrune(peer, control.prune)
+
+  var respControl: ControlMessage
+  let iwant = g.handleIHave(peer, control.ihave)
+  if iwant.messageIDs.len > 0:
+    respControl.iwant.add(iwant)
+  respControl.prune.add(g.handleGraft(peer, control.graft))
+  let messages = g.handleIWant(peer, control.iwant)
+
+  if
+    respControl.prune.len > 0 or
+    respControl.iwant.len > 0 or
+    messages.len > 0:
+    # iwant and prunes from here, also messages
+
+    for smsg in messages:
+      for topic in smsg.topicIDs:
+        if g.knownTopics.contains(topic):
+          libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
         else:
-          libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
+          libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
 
-      trace "sending control message", msg = shortLog(respControl), peer
-      g.send(
-        peer,
-        RPCMsg(control: some(respControl), messages: messages))
+    libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
+
+    for prune in respControl.prune:
+      if g.knownTopics.contains(prune.topicID):
+        libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
+      else:
+        libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
+
+    trace "sending control message", msg = shortLog(respControl), peer
+    g.send(
+      peer,
+      RPCMsg(control: some(respControl), messages: messages))
 
 method rpcHandler*(g: GossipSub,
                    peer: PubSubPeer,
                    rpcMsg: RPCMsg) {.async.} =
-  # base will check the amount of subscriptions and process subscriptions
-  # also will update some metrics
-  await procCall PubSub(g).rpcHandler(peer, rpcMsg)
+  for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
+    template sub: untyped = rpcMsg.subscriptions[i]
+    g.handleSubscribe(peer, sub.topic, sub.subscribe)
 
   # the above call applied limtis to subs number
   # in gossipsub we want to apply scoring as well
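handleControl now takes the ControlMessage itself, so the Option is unwrapped exactly once at the call site with unsafeGet behind an isSome guard, instead of being re-checked inside the proc. The idiom in isolation:

import std/options

let ctrl = some(42)

if ctrl.isSome():
  # Safe only because of the guard above: unsafeGet skips the presence
  # check that get() would repeat.
  echo ctrl.unsafeGet()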
@@ -294,7 +292,8 @@ method rpcHandler*(g: GossipSub,
       limit = g.topicsHigh
     peer.behaviourPenalty += 0.1
 
-  for msg in rpcMsg.messages: # for every message
+  for i in 0..<rpcMsg.messages.len(): # for every message
+    template msg: untyped = rpcMsg.messages[i]
     let msgId = g.msgIdProvider(msg)
 
     # avoid the remote peer from controlling the seen table hashing
@@ -371,66 +370,41 @@ method rpcHandler*(g: GossipSub,
     else:
       libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])
 
-  g.handleControl(peer, rpcMsg)
-
-method subscribe*(g: GossipSub,
-                  topic: string,
-                  handler: TopicHandler) =
-  procCall PubSub(g).subscribe(topic, handler)
+  if rpcMsg.control.isSome():
+    g.handleControl(peer, rpcMsg.control.unsafeGet())
 
-  # if we have a fanout on this topic break it
-  if topic in g.fanout:
-    g.fanout.del(topic)
+  g.updateMetrics(rpcMsg)
 
-  # rebalance but don't update metrics here, we do that only in the heartbeat
-  g.rebalanceMesh(topic, metrics = nil)
+method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
+  if subscribed:
+    procCall PubSub(g).onTopicSubscription(topic, subscribed)
 
-proc unsubscribe*(g: GossipSub, topic: string) =
-  var
-    msg = RPCMsg.withSubs(@[topic], subscribe = false)
-    gpeers = g.gossipsub.getOrDefault(topic)
+    # if we have a fanout on this topic break it
+    if topic in g.fanout:
+      g.fanout.del(topic)
 
-  if topic in g.mesh:
+    # rebalance but don't update metrics here, we do that only in the heartbeat
+    g.rebalanceMesh(topic, metrics = nil)
+  else:
     let mpeers = g.mesh.getOrDefault(topic)
 
-    # remove mesh peers from gpeers, we send 2 different messages
-    gpeers = gpeers - mpeers
-    # send to peers NOT in mesh first
-    g.broadcast(gpeers, msg)
+    # Remove peers from the mesh since we're no longer both interested
+    # in the topic
+    let msg = RPCMsg(control: some(ControlMessage(
+      prune: @[ControlPrune(
+        topicID: topic,
+        peers: g.peerExchangeList(topic),
+        backoff: g.parameters.pruneBackoff.seconds.uint64)])))
+    g.broadcast(mpeers, msg)
 
     for peer in mpeers:
      trace "pruning unsubscribeAll call peer", peer, score = peer.score
      g.pruned(peer, topic)
 
     g.mesh.del(topic)
 
-    msg.control =
-      some(ControlMessage(prune:
-        @[ControlPrune(topicID: topic,
-          peers: g.peerExchangeList(topic),
-          backoff: g.parameters.pruneBackoff.seconds.uint64)]))
-
-    # send to peers IN mesh now
-    g.broadcast(mpeers, msg)
-  else:
-    g.broadcast(gpeers, msg)
-
-  g.topicParams.del(topic)
-
-method unsubscribeAll*(g: GossipSub, topic: string) =
-  g.unsubscribe(topic)
-  # finally let's remove from g.topics, do that by calling PubSub
-  procCall PubSub(g).unsubscribeAll(topic)
-
-method unsubscribe*(g: GossipSub,
-                    topics: seq[TopicPair]) =
-  procCall PubSub(g).unsubscribe(topics)
-
-  for (topic, handler) in topics:
-    # delete from mesh only if no handlers are left
-    # (handlers are removed in pubsub unsubscribe above)
-    if topic notin g.topics:
-      g.unsubscribe(topic)
+  # Send unsubscribe (in reverse order to sub/graft)
+  procCall PubSub(g).onTopicSubscription(topic, subscribed)
 
 method publish*(g: GossipSub,
                 topic: string,
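Taken together: unsubscribing now prunes mesh peers with a peer-exchange list and backoff, and the base class announces the unsubscribe to every peer. The application-facing flow is unchanged; a usage sketch (topic name and handler body are illustrative, API shapes as in nim-libp2p of this period):

proc handler(topic: string, data: seq[byte]) {.async.} =
  echo "got ", data.len, " bytes on ", topic

g.subscribe("my-topic", handler)  # sends SUBSCRIBE, then grafts the mesh
# ... later ...
g.unsubscribeAll("my-topic")      # prunes the mesh, then sends UNSUBSCRIBE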
(diffs for the remaining 4 changed files not shown)