WithWaitUntilQueued option for topic.Publish #232

Closed
Changes from 2 commits
19 changes: 19 additions & 0 deletions floodsub.go
@@ -79,6 +79,11 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
}

out := rpcWithMessages(msg)

// fetch listener to send notification to after we attempt to add a message to a peer's outbound queue
listener, hasListener := fs.p.msgQueuedEventListeners[msgID(msg)]

// start adding the message to the outbound queue for receiver peers
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
@@ -89,12 +94,26 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
continue
}

var success bool
select {
case mch <- out:
success = true
default:
success = false
log.Infof("dropping message to peer %s: queue full", pid)
// Drop it. The peer is too slow.
}

if hasListener {
select {
case <-listener.ctx.Done():
case listener.notifChan <- &msgQueuedNotification{pid, success}:
vyzo (Collaborator) commented on Nov 17, 2019:
This is way too heavy; it is going to spam with events for every message sent.
Also note that the channel could block the event loop, which is a no-no.

aarshkshah1992 (Contributor, author) commented on Nov 17, 2019:
@vyzo

I see what you mean.

We could use a buffered channel here & drop the event if the buffer is full.
But that could lead to lost events, & Publish would end up erroneously reporting that it wasn't able to fulfil the target.

vyzo (Collaborator) commented:

So yeah, this seems quite impossible to implement correctly.
We can't block the event loop and we don't want to erroneously report non-delivery (or not report at all).

aarshkshah1992 (Contributor, author) commented on Nov 17, 2019:
@raulk @aschmahmann Wdyt? Can we use a buffered channel here & do the WaitUntilQueued on a "best effort" basis?

Or is there a smarter way to do this?

vyzo (Collaborator) commented:

We could launch a goroutine for every channel put, which would side-step the problem. But I worry about a potentially unbounded number of goroutines running if the receiver is stalling.

aarshkshah1992 (Contributor, author) commented on Nov 18, 2019:
@vyzo @aschmahmann Aha, here is one thing that can work:

For WaitUntilNQueued:

  • Buffer size = N, and the event loop ONLY writes success cases to the channel
  • The event loop drops notifications if the buffer is full

For WaitUntilAllQueued:
This is slightly tricky, as I've explained here. But we can do it like so:

  • Buffer size = len(pubsub.topics[topic]) (the number of peers interested in the topic) when Publish is called
  • The event loop writes both success & failure cases to the channel
  • The event loop spins up a new goroutine if the buffer is full. Since we have sized the buffer from the number of peers known to be interested in the topic when Publish is called, this effectively prevents spinning up too many goroutines.

Let me know what you think :)
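
A minimal sketch of the buffered-channel approach proposed above, not code from this PR: the event loop first tries a non-blocking send into the buffered notification channel and only falls back to a goroutine when the buffer is full, so it never blocks. The helper name notifyQueued is hypothetical; msgQueuedEventListener and msgQueuedNotification are the types added in pubsub.go in this diff.

```go
// notifyQueued (hypothetical helper): deliver a queue-attempt notification
// from the event loop without blocking it.
func (p *PubSub) notifyQueued(l *msgQueuedEventListener, pid peer.ID, success bool) {
	notif := &msgQueuedNotification{pid, success}
	select {
	case l.notifChan <- notif:
		// fast path: the buffered send succeeded immediately
	case <-l.ctx.Done():
		// the publisher has gone away; drop the notification
	default:
		// buffer full: hand the blocking send to a goroutine so the event
		// loop keeps running; the number of such goroutines is bounded by
		// the peer count used to size the buffer when Publish was called
		go func() {
			select {
			case l.notifChan <- notif:
			case <-l.ctx.Done():
			}
		}()
	}
}
```

One caveat with the fallback goroutine: closing notifChan (as the routers do once all sends have been attempted) would need to wait for any pending goroutines, e.g. via a sync.WaitGroup, to avoid a send on a closed channel.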

aarshkshah1992 (Contributor, author) commented:
Hey @aschmahmann @vyzo

Please take a look at this when you can.

}
}
}

if hasListener {
close(listener.notifChan)
}
}

2 changes: 2 additions & 0 deletions go.mod
@@ -10,6 +10,8 @@ require (
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/multiformats/go-multiaddr v0.1.1
github.com/multiformats/go-multistream v0.1.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
)

28 changes: 25 additions & 3 deletions gossipsub.go
@@ -278,12 +278,30 @@ func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) {
}

out := rpcWithMessages(msg)

// fetch listener to send notification to after we attempt to add a message to a peer's outbound queue
listener, hasListener := gs.p.msgQueuedEventListeners[msgID(msg)]

// add message to the outbound queues for receiver peers
for pid := range tosend {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}

gs.sendRPC(pid, out)
success := gs.sendRPC(pid, out)

// send notification to the listener about the message queue attempt
if hasListener {
select {
case <-listener.ctx.Done():
case listener.notifChan <- &msgQueuedNotification{pid, success}:
}
}
}

// close the listener channel because we have no more notifications to send for this message
if hasListener {
close(listener.notifChan)
}
}

@@ -353,7 +371,8 @@ func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
gs.sendRPC(p, out)
}

func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
// returns true if the rpc was successfully added to the peer's outbound queue
func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) bool {
// do we own the RPC?
own := false

@@ -379,11 +398,12 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {

mch, ok := gs.p.peers[p]
if !ok {
return
return false
}

select {
case mch <- out:
return true
default:
log.Infof("dropping message to peer %s: queue full", p)
// push control messages that need to be retried
@@ -392,6 +412,8 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
gs.pushControl(p, ctl)
}
}

return false
}

func (gs *GossipSubRouter) heartbeatTimer() {
80 changes: 47 additions & 33 deletions pubsub.go
@@ -88,6 +88,9 @@ type PubSub struct {
// The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{}

// listeners for when we attempt to add a message to a peer's outbound queue
msgQueuedEventListeners map[string]*msgQueuedEventListener

// The set of topics we are interested in
myTopics map[string]*Topic

@@ -174,39 +177,40 @@ type Option func(*PubSub) error
// NewPubSub returns a new PubSub management object.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
ps := &PubSub{
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
msgQueuedEventListeners: make(map[string]*msgQueuedEventListener),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
}

for _, opt := range opts {
@@ -759,6 +763,16 @@ func (p *PubSub) publishMessage(msg *Message) {
p.rt.Publish(msg.ReceivedFrom, msg.Message)
}

type msgQueuedNotification struct {
peer peer.ID
success bool
}

type msgQueuedEventListener struct {
ctx context.Context // context of the listener
notifChan chan *msgQueuedNotification // channel we will write the event notification to
}

type addTopicReq struct {
topic *Topic
resp chan *Topic
125 changes: 124 additions & 1 deletion topic.go
@@ -6,14 +6,21 @@ import (
"fmt"
"sync"

logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
pkgerrors "github.com/pkg/errors"
)

// ErrTopicClosed is returned if a Topic is utilized after it has been closed
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

// ErrFailedToAddToPeerQueue is returned if we fail to achieve the desired target for WithWaitUntilQueued because of full outbound peer queues
var ErrFailedToAddToPeerQueue = errors.New("failed to achieve desired WithWaitUntilQueued target")

var topicHandleLog = logging.Logger("topicHandle")

// Topic is the handle for a pubsub topic
type Topic struct {
p *PubSub
@@ -125,7 +132,8 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
type RouterReady func(rt PubSubRouter, topic string) (bool, error)

type PublishOptions struct {
ready RouterReady
ready RouterReady
nQueuedNotifs int
}

type PubOpt func(pub *PublishOptions) error
@@ -166,12 +174,110 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
}

var waitForMsgQueuedNotifications bool
var msgQueuedTargetAchieved chan error
// setup for receiving notifications when a message is added to a peer's outbound queue
if pub.nQueuedNotifs != 0 {
waitForMsgQueuedNotifications = true

// create & register the listener
listenerContext, cancel := context.WithCancel(ctx)
notifChan := make(chan *msgQueuedNotification)
listener := &msgQueuedEventListener{listenerContext, notifChan}

done := make(chan struct{}, 1)
select {
case t.p.eval <- func() {
t.p.msgQueuedEventListeners[msgID(m)] = listener
done <- struct{}{}
}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}
<-done

// remove the listener & cancel the listener context before we return
defer func() {
cancel()

done := make(chan struct{}, 1)
select {
case t.p.eval <- func() {
delete(t.p.msgQueuedEventListeners, msgID(m))
done <- struct{}{}
}:
case <-t.p.ctx.Done():
return
}
<-done
}()

// start listening to notifications
msgQueuedTargetAchieved = make(chan error, 1)
go func() {
nSuccess := 0
var failedPeers []peer.ID

for {
select {
case <-listenerContext.Done():
return
case notif, ok := <-notifChan:
if !ok {
// notification channel is closed

if pub.nQueuedNotifs == -1 {
if len(failedPeers) == 0 {
msgQueuedTargetAchieved <- nil
} else {
topicHandleLog.Warningf("Publish: failed on the -1/ALL option: failed to add messageID %s to queues for peers %v",
msgID(m), failedPeers)

msgQueuedTargetAchieved <- ErrFailedToAddToPeerQueue
}
} else {
topicHandleLog.Warningf("Publish: did not achieve desired count: "+
"failed to add messageID %s to queues for peers %v, success count is %d", msgID(m), failedPeers, nSuccess)

msgQueuedTargetAchieved <- ErrFailedToAddToPeerQueue
}
return
} else {
if !notif.success {
failedPeers = append(failedPeers, notif.peer)
} else {
nSuccess++
if pub.nQueuedNotifs != -1 {
if nSuccess == pub.nQueuedNotifs {
msgQueuedTargetAchieved <- nil
return
}
}
}
}
}
}
}()
}

select {
case t.p.publish <- &Message{m, id, nil}:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}

// wait for msg queued notifications
if waitForMsgQueuedNotifications {
select {
case err := <-msgQueuedTargetAchieved:
return err
case <-ctx.Done():
return pkgerrors.Wrap(ctx.Err(), "context expired while waiting for msg queued notifs")
case <-t.p.ctx.Done():
return pkgerrors.Wrap(t.p.ctx.Err(), "pubsub context expired while waiting for msg queued notifs")
}
}

return nil
}

@@ -184,6 +290,23 @@ func WithReadiness(ready RouterReady) PubOpt {
}
}

// WithWaitUntilQueued blocks the publish until the message has been added to the outbound message queues for
// as many peers as the argument indicates.
// A value of -1 means all peers in the mesh/fanout for gossipsub & all subscribed peers in floodsub.
// Please note that if nPeers is -1, the behavior is not fail-fast.
aarshkshah1992 (Contributor, author) commented:

Does a "non fail fast" behaviour make sense here? If the caller is willing to wait for the message to be added to the queues for all peers, I figured we might as well wait till the end so we can log all failures for better debugging later.

// If we fail to achieve the desired target because of full outbound peer queues, Publish will return ErrFailedToAddToPeerQueue.
// However, the message could still have been added to the outbound queues of other peers.
func WithWaitUntilQueued(nPeers int) PubOpt {
return func(pub *PublishOptions) error {
if nPeers < -1 {
return errors.New("nPeers should be greater than or equal to -1, please refer to the docs for WithWaitUntilQueued")
}

pub.nQueuedNotifs = nPeers
return nil
}
}
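
A usage sketch, not part of this diff, assuming an existing *Topic handle, a context, and a data payload (names hypothetical); WithWaitUntilQueued and ErrFailedToAddToPeerQueue are the identifiers added in this PR:

```go
// Block until the message has been queued for at least 3 peers.
if err := topic.Publish(ctx, data, WithWaitUntilQueued(3)); err == ErrFailedToAddToPeerQueue {
	// the requested target of 3 peer queues was not met, but the message
	// may still have been queued for some peers
}

// A value of -1 waits for all mesh/fanout peers (gossipsub) or all
// subscribed peers (floodsub); note this is not fail-fast.
if err := topic.Publish(ctx, data, WithWaitUntilQueued(-1)); err != nil {
	// handle a context expiry or a queueing failure
}
```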

// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {