Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gossipsub: unsubscribe fixes #569

Merged
merged 2 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 14 additions & 28 deletions libp2p/protocols/pubsub/floodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ proc addSeen*(f: FloodSub, msgId: MessageID): bool =
# Return true if the message has already been seen
f.seen.put(f.seenSalt & msgId)

method subscribeTopic*(f: FloodSub,
topic: string,
subscribe: bool,
peer: PubsubPeer) {.gcsafe.} =
proc handleSubscribe*(f: FloodSub,
peer: PubsubPeer,
topic: string,
subscribe: bool) =
logScope:
peer
topic
Expand All @@ -61,21 +61,16 @@ method subscribeTopic*(f: FloodSub,
return

if subscribe:
if topic notin f.floodsub:
f.floodsub[topic] = initHashSet[PubSubPeer]()

trace "adding subscription for topic", peer, topic

# subscribe the peer to the topic
f.floodsub[topic].incl(peer)
f.floodsub.mgetOrPut(topic, HashSet[PubSubPeer]()).incl(peer)
else:
if topic notin f.floodsub:
return

trace "removing subscription for topic", peer, topic
f.floodsub.withValue(topic, peers):
trace "removing subscription for topic", peer, topic

# unsubscribe the peer from the topic
f.floodsub[topic].excl(peer)
# unsubscribe the peer from the topic
peers[].excl(peer)

method unsubscribePeer*(f: FloodSub, peer: PeerID) =
## handle peer disconnects
Expand All @@ -93,7 +88,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerID) =
method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
await procCall PubSub(f).rpcHandler(peer, rpcMsg)
for i in 0..<min(f.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
f.handleSubscribe(peer, sub.topic, sub.subscribe)

for msg in rpcMsg.messages: # for every message
let msgId = f.msgIdProvider(msg)
Expand Down Expand Up @@ -139,6 +136,8 @@ method rpcHandler*(f: FloodSub,
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
trace "Forwared message to peers", peers = toSendPeers.len

f.updateMetrics(rpcMsg)

method init*(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
Expand Down Expand Up @@ -202,19 +201,6 @@ method publish*(f: FloodSub,

return peers.len

method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) =
procCall PubSub(f).unsubscribe(topics)

for p in f.peers.values:
f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false)

method unsubscribeAll*(f: FloodSub, topic: string) =
procCall PubSub(f).unsubscribeAll(topic)

for p in f.peers.values:
f.sendSubs(p, @[topic], false)

method initPubSub*(f: FloodSub) =
procCall PubSub(f).initPubSub()
f.seen = TimedCache[MessageID].init(2.minutes)
Expand Down
182 changes: 78 additions & 104 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -200,29 +200,29 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =

procCall FloodSub(g).unsubscribePeer(peer)

method subscribeTopic*(g: GossipSub,
topic: string,
subscribe: bool,
peer: PubSubPeer) {.gcsafe.} =
proc handleSubscribe*(g: GossipSub,
peer: PubSubPeer,
topic: string,
subscribe: bool) =
logScope:
peer
topic

# this is a workaround for a race condition
# that can happen if we disconnect the peer very early
# in the future we might use this as a test case
# and eventually remove this workaround
if subscribe and peer.peerId notin g.peers:
trace "ignoring unknown peer"
return

if subscribe and not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)):
# this is a violation, so warn should be in order
trace "ignoring invalid topic subscription", topic, peer
libp2p_gossipsub_invalid_topic_subscription.inc()
return

if subscribe:
# this is a workaround for a race condition
# that can happen if we disconnect the peer very early
# in the future we might use this as a test case
# and eventually remove this workaround
if peer.peerId notin g.peers:
trace "ignoring unknown peer"
return

if not(isNil(g.subscriptionValidator)) and not(g.subscriptionValidator(topic)):
# this is a violation, so warn should be in order
trace "ignoring invalid topic subscription", topic, peer
libp2p_gossipsub_invalid_topic_subscription.inc()
return

trace "peer subscribed to topic"

# subscribe remote peer to the topic
Expand All @@ -241,50 +241,48 @@ method subscribeTopic*(g: GossipSub,

trace "gossip peers", peers = g.gossipsub.peers(topic), topic

proc handleControl(g: GossipSub, peer: PubSubPeer, rpcMsg: RPCMsg) =
if rpcMsg.control.isSome:
let control = rpcMsg.control.get()
g.handlePrune(peer, control.prune)

var respControl: ControlMessage
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIDs.len > 0:
respControl.iwant.add(iwant)
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages

for smsg in messages:
for topic in smsg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

for prune in respControl.prune:
if g.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
g.handlePrune(peer, control.prune)

var respControl: ControlMessage
let iwant = g.handleIHave(peer, control.ihave)
if iwant.messageIDs.len > 0:
respControl.iwant.add(iwant)
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages

for smsg in messages:
for topic in smsg.topicIDs:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

for prune in respControl.prune:
if g.knownTopics.contains(prune.topicID):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicID])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))

method rpcHandler*(g: GossipSub,
peer: PubSubPeer,
rpcMsg: RPCMsg) {.async.} =
# base will check the amount of subscriptions and process subscriptions
# also will update some metrics
await procCall PubSub(g).rpcHandler(peer, rpcMsg)
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
g.handleSubscribe(peer, sub.topic, sub.subscribe)

# the above call applied limtis to subs number
# in gossipsub we want to apply scoring as well
Expand All @@ -294,7 +292,8 @@ method rpcHandler*(g: GossipSub,
limit = g.topicsHigh
peer.behaviourPenalty += 0.1

for msg in rpcMsg.messages: # for every message
for i in 0..<rpcMsg.messages.len(): # for every message
template msg: untyped = rpcMsg.messages[i]
let msgId = g.msgIdProvider(msg)

# avoid the remote peer from controlling the seen table hashing
Expand Down Expand Up @@ -371,66 +370,41 @@ method rpcHandler*(g: GossipSub,
else:
libp2p_pubsub_messages_rebroadcasted.inc(toSendPeers.len.int64, labelValues = ["generic"])

g.handleControl(peer, rpcMsg)

method subscribe*(g: GossipSub,
topic: string,
handler: TopicHandler) =
procCall PubSub(g).subscribe(topic, handler)
if rpcMsg.control.isSome():
g.handleControl(peer, rpcMsg.control.unsafeGet())

# if we have a fanout on this topic break it
if topic in g.fanout:
g.fanout.del(topic)
g.updateMetrics(rpcMsg)

# rebalance but don't update metrics here, we do that only in the heartbeat
g.rebalanceMesh(topic, metrics = nil)
method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
if subscribed:
procCall PubSub(g).onTopicSubscription(topic, subscribed)

proc unsubscribe*(g: GossipSub, topic: string) =
var
msg = RPCMsg.withSubs(@[topic], subscribe = false)
gpeers = g.gossipsub.getOrDefault(topic)
# if we have a fanout on this topic break it
if topic in g.fanout:
g.fanout.del(topic)

if topic in g.mesh:
# rebalance but don't update metrics here, we do that only in the heartbeat
g.rebalanceMesh(topic, metrics = nil)
else:
let mpeers = g.mesh.getOrDefault(topic)

# remove mesh peers from gpeers, we send 2 different messages
gpeers = gpeers - mpeers
# send to peers NOT in mesh first
g.broadcast(gpeers, msg)
# Remove peers from the mesh since we're no longer both interested
# in the topic
let msg = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
g.broadcast(mpeers, msg)

for peer in mpeers:
trace "pruning unsubscribeAll call peer", peer, score = peer.score
g.pruned(peer, topic)

g.mesh.del(topic)

msg.control =
some(ControlMessage(prune:
@[ControlPrune(topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)]))

# send to peers IN mesh now
g.broadcast(mpeers, msg)
else:
g.broadcast(gpeers, msg)

g.topicParams.del(topic)

method unsubscribeAll*(g: GossipSub, topic: string) =
g.unsubscribe(topic)
# finally let's remove from g.topics, do that by calling PubSub
procCall PubSub(g).unsubscribeAll(topic)

method unsubscribe*(g: GossipSub,
topics: seq[TopicPair]) =
procCall PubSub(g).unsubscribe(topics)

for (topic, handler) in topics:
# delete from mesh only if no handlers are left
# (handlers are removed in pubsub unsubscribe above)
if topic notin g.topics:
g.unsubscribe(topic)
# Send unsubscribe (in reverse order to sub/graft)
procCall PubSub(g).onTopicSubscription(topic, subscribed)

method publish*(g: GossipSub,
topic: string,
Expand Down
Loading