Skip to content

Commit

Permalink
WithWaitUntilQueued option for topic.Publish
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Nov 17, 2019
1 parent 534fe2f commit f8a2f96
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 36 deletions.
19 changes: 19 additions & 0 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
}

out := rpcWithMessages(msg)

// fetch the listener whose channel we write to after each attempt to add the message to a peer's outbound queue
listener, hasListener := fs.p.msgQueuedEventListeners[msgID(msg)]

// start adding the message to the outbound queue for receiver peers
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
Expand All @@ -89,12 +94,26 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
continue
}

var success bool
select {
case mch <- out:
success = true
default:
success = false
log.Infof("dropping message to peer %s: queue full", pid)
// Drop it. The peer is too slow.
}

if hasListener {
select {
case <-listener.ctx.Done():
case listener.notifChan <- &msgQueuedNotification{pid, success}:
}
}
}

if hasListener {
close(listener.notifChan)
}
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ require (
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/multiformats/go-multiaddr v0.1.1
github.com/multiformats/go-multistream v0.1.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
)

Expand Down
28 changes: 25 additions & 3 deletions gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,30 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
}

out := rpcWithMessages(msg)

// fetch the listener whose channel we write to after each attempt to add the message to a peer's outbound queue
listener, hasListener := gs.p.msgQueuedEventListeners[msgID(msg)]

// add message to the outbound queues for receiver peers
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}

gs.sendRPC(pid, out)
success := gs.sendRPC(pid, out)

// send notification to the listener about the message queue attempt
if hasListener {
select {
case <-listener.ctx.Done():
case listener.notifChan <- &msgQueuedNotification{pid, success}:
}
}
}

// close the listener channel because we have no more notifications to send for this message
if hasListener {
close(listener.notifChan)
}
}

Expand Down Expand Up @@ -353,7 +371,8 @@ func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
// returns true if the rpc was successfully added to the peer's outbound queue
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) bool {
// do we own the RPC?
own := false

Expand All @@ -379,11 +398,12 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {

mch, ok := gs.p.peers[p]
if !ok {
return
return false
}

select {
case mch <- out:
return true
default:
log.Infof("dropping message to peer %s: queue full", p)
// push control messages that need to be retried
Expand All @@ -392,6 +412,8 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
gs.pushControl(p, ctl)
}
}

return false
}

func (gs *GossipSubRouter) heartbeatTimer() {
Expand Down
78 changes: 46 additions & 32 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type PubSub struct {
// The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{}

// listeners for when we attempt to add a message to a peer's outbound queue
msgQueuedEventListeners map[string]*msgQueuedEventListener

// The set of topics we are interested in
myTopics map[string]*Topic

Expand Down Expand Up @@ -169,38 +172,39 @@ type Option func(*PubSub) error
// NewPubSub returns a new PubSub management object.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
ps := &PubSub{
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
msgQueuedEventListeners: make(map[string]*msgQueuedEventListener),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
}

for _, opt := range opts {
Expand Down Expand Up @@ -741,6 +745,16 @@ func (p *PubSub) publishMessage(msg *Message) {
p.rt.Publish(msg.ReceivedFrom, msg.Message)
}

// msgQueuedNotification reports the outcome of one attempt to add a message to
// a single peer's outbound queue.
type msgQueuedNotification struct {
	// peer is the peer whose outbound queue the attempt targeted
	peer peer.ID
	// success is true if the message was added to that peer's outbound queue
	success bool
}

// msgQueuedEventListener receives msgQueuedNotifications for a single message.
// Listeners are registered in PubSub.msgQueuedEventListeners, keyed by message ID.
type msgQueuedEventListener struct {
	// context of the listener; the routers stop waiting to deliver a
	// notification once this context is done
	ctx context.Context
	// channel we will write the event notification to; the router closes it
	// once every receiver peer has been attempted
	notifChan chan *msgQueuedNotification
}

type addTopicReq struct {
topic *Topic
resp chan *Topic
Expand Down
126 changes: 125 additions & 1 deletion topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@ import (
"fmt"
"sync"

logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
pkgerrors "github.com/pkg/errors"
)

var (
	// ErrTopicClosed is returned if a Topic is utilized after it has been closed.
	ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

	// ErrFailedToAddToPeerQueue is returned by Publish when the target requested
	// through WithWaitUntilQueued was not reached before the router finished
	// attempting every peer, i.e. too many outbound peer queues were full.
	ErrFailedToAddToPeerQueue = errors.New("failed to achieve desired WithWaitUntilQueued target")

	// topicHandleLog is the logger used by the topic handle.
	topicHandleLog = logging.Logger("topicHandle")
)

// Topic is the handle for a pubsub topic
type Topic struct {
p *PubSub
Expand Down Expand Up @@ -125,7 +132,8 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
type RouterReady func(rt PubSubRouter, topic string) (bool, error)

type PublishOptions struct {
ready RouterReady
ready RouterReady
nQueuedNotifs int
}

type PubOpt func(pub *PublishOptions) error
Expand Down Expand Up @@ -166,12 +174,111 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
}

var waitForMsgQueuedNotifications bool
var notifChan chan *msgQueuedNotification
var msgQueuedTargetAchieved chan error
// setup for receiving notifications when a message is added to a peer's outbound queue
if pub.nQueuedNotifs != 0 {
waitForMsgQueuedNotifications = true

// create & register the listener
listenerContext, cancel := context.WithCancel(ctx)
notifChan = make(chan *msgQueuedNotification)
listener := &msgQueuedEventListener{listenerContext, notifChan}

done := make(chan struct{}, 1)
select {
case t.p.eval <- func() {
t.p.msgQueuedEventListeners[msgID(m)] = listener
done <- struct{}{}
}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}
<-done

// remove the listener & cancel the listener context before we return
defer func() {
cancel()

done := make(chan struct{}, 1)
select {
case t.p.eval <- func() {
delete(t.p.msgQueuedEventListeners, msgID(m))
done <- struct{}{}
}:
case <-t.p.ctx.Done():
return
}
<-done
}()

// start listening to notifications
msgQueuedTargetAchieved = make(chan error, 1)
go func() {
nSuccess := 0
var failedPeers []peer.ID

for {
select {
case <-listenerContext.Done():
return
case notif, ok := <-notifChan:
if !ok {
// notification channel is closed

if pub.nQueuedNotifs == -1 {
if len(failedPeers) == 0 {
msgQueuedTargetAchieved <- nil
} else {
topicHandleLog.Warningf("Publish: failed on the -1/ALL option: failed to add messageID %s to queues for peers %v",
msgID(m), failedPeers)

msgQueuedTargetAchieved <- ErrFailedToAddToPeerQueue
}
} else {
topicHandleLog.Warningf("Publish: did not achieve desired count: "+
"failed to add messageID %s to queues for peers %v, success count is %d", msgID(m), failedPeers, nSuccess)

msgQueuedTargetAchieved <- ErrFailedToAddToPeerQueue
}
return
} else {
if !notif.success {
failedPeers = append(failedPeers, notif.peer)
} else {
nSuccess++
if pub.nQueuedNotifs != -1 {
if nSuccess == pub.nQueuedNotifs {
msgQueuedTargetAchieved <- nil
return
}
}
}
}
}
}
}()
}

select {
case t.p.publish <- &Message{m, id}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}

// wait for msg queued notifications
if waitForMsgQueuedNotifications {
select {
case err := <-msgQueuedTargetAchieved:
return err
case <-ctx.Done():
return pkgerrors.Wrap(ctx.Err(), "context expired while waiting for msg queued notifs")
case <-t.p.ctx.Done():
return pkgerrors.Wrap(t.p.ctx.Err(), "pubsub context expired while waiting for msg queued notifs")
}
}

return nil
}

Expand All @@ -184,6 +291,23 @@ func WithReadiness(ready RouterReady) PubOpt {
}
}

// WithWaitUntilQueued returns a PubOpt that makes Publish block until the
// message has been added to the outbound message queue of nPeers peers.
//
// A value of -1 means all peers in mesh/fanout for gossipsub and all subscribed
// peers in floodsub. With -1 the behavior is not fail fast: the result is only
// known once every peer has been attempted. A value of 0 leaves waiting
// disabled, so Publish returns as soon as the message is handed off.
//
// If the desired target is not achieved because of full outbound peer queues,
// Publish returns ErrFailedToAddToPeerQueue. The message could still have been
// added to the outbound queue of other peers in that case.
//
// The option itself returns an error when nPeers is less than -1.
func WithWaitUntilQueued(nPeers int) PubOpt {
	// allPeers is the smallest accepted value; it selects the "every peer" mode.
	const allPeers = -1

	return func(opts *PublishOptions) error {
		if nPeers >= allPeers {
			opts.nQueuedNotifs = nPeers
			return nil
		}

		return errors.New("nPeers should be greater than or equal to -1, please refer to the docs for WithWaitUntilQueued")
	}
}

// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
Expand Down
Loading

0 comments on commit f8a2f96

Please sign in to comment.