Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix goroutine build up from connected notifications #430

Merged
merged 8 commits into from
Jul 13, 2021
33 changes: 19 additions & 14 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-msgio/protoio"

"github.com/gogo/protobuf/proto"
ms "github.com/multiformats/go-multistream"
)

// get the initial RPC containing all of our subscriptions to send to new peers
Expand Down Expand Up @@ -88,22 +87,29 @@ func (p *PubSub) handleNewStream(s network.Stream) {
}
}

func (p *PubSub) notifyPeerDead(pid peer.ID) {
p.peerDeadPrioLk.RLock()
p.peerDeadMx.Lock()
p.peerDeadPend[pid] = struct{}{}
p.peerDeadMx.Unlock()
p.peerDeadPrioLk.RUnlock()

select {
case p.peerDead <- struct{}{}:
default:
}
}

func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
if err != nil {
log.Debug("opening new stream to peer: ", err, pid)

var ch chan peer.ID
if err == ms.ErrNotSupported {
ch = p.newPeerError
} else {
ch = p.peerDead
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}

select {
case ch <- pid:
case p.newPeerError <- pid:
case <-ctx.Done():
}

return
}

Expand All @@ -116,18 +122,17 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}

func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
pid := s.Conn().RemotePeer()
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
if err != nil {
select {
case p.peerDead <- s.Conn().RemotePeer():
case <-ctx.Done():
}
p.notifyPeerDead(pid)
return
}
log.Debugf("unexpected message from %s", s.Conn().RemotePeer())

log.Debugf("unexpected message from %s", pid)
}
}

Expand Down
26 changes: 20 additions & 6 deletions notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@ func (p *PubSubNotif) Connected(n network.Network, c network.Conn) {
if c.Stat().Transient {
return
}

go func() {
p.newPeersPrioLk.RLock()
p.newPeersMx.Lock()
p.newPeersPend[c.RemotePeer()] = struct{}{}
p.newPeersMx.Unlock()
p.newPeersPrioLk.RUnlock()

select {
case p.newPeers <- c.RemotePeer():
case <-p.ctx.Done():
case p.newPeers <- struct{}{}:
default:
}
}()
}
Expand All @@ -49,13 +56,20 @@ func (p *PubSubNotif) Initialize() {
return true
}

p.newPeersPrioLk.RLock()
p.newPeersMx.Lock()
for _, pid := range p.host.Network().Peers() {
if isTransient(pid) {
continue
}
select {
case p.newPeers <- pid:
case <-p.ctx.Done():
}

p.newPeersPend[pid] = struct{}{}
}
p.newPeersMx.Unlock()
p.newPeersPrioLk.RUnlock()

select {
case p.newPeers <- struct{}{}:
default:
}
}
142 changes: 94 additions & 48 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,11 @@ type PubSub struct {
// removeTopic is a topic cancellation channel
rmTopic chan *rmTopicReq

// a notification channel for new peer connections
newPeers chan peer.ID
// a notification channel for new peer connections accumulated
newPeers chan struct{}
newPeersPrioLk sync.RWMutex
newPeersMx sync.Mutex
newPeersPend map[peer.ID]struct{}

// a notification channel for new outoging peer streams
newPeerStream chan network.Stream
Expand All @@ -99,7 +102,10 @@ type PubSub struct {
newPeerError chan peer.ID

// a notification channel for when our peers die
peerDead chan peer.ID
peerDead chan struct{}
peerDeadPrioLk sync.RWMutex
peerDeadMx sync.Mutex
peerDeadPend map[peer.ID]struct{}

// The set of topics we are subscribed to
mySubs map[string]map[*Subscription]struct{}
Expand Down Expand Up @@ -231,10 +237,12 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
signKey: nil,
signPolicy: StrictSign,
incoming: make(chan *RPC, 32),
newPeers: make(chan peer.ID),
newPeers: make(chan struct{}, 1),
newPeersPend: make(map[peer.ID]struct{}),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
peerDead: make(chan struct{}, 1),
peerDeadPend: make(map[peer.ID]struct{}),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
Expand Down Expand Up @@ -480,21 +488,8 @@ func (p *PubSub) processLoop(ctx context.Context) {

for {
select {
case pid := <-p.newPeers:
if _, ok := p.peers[pid]; ok {
log.Debug("already have connection to peer: ", pid)
continue
}

if p.blacklist.Contains(pid) {
log.Warn("ignoring connection from blacklisted peer: ", pid)
continue
}

messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages
case <-p.newPeers:
p.handlePendingPeers()

case s := <-p.newPeerStream:
pid := s.Conn().RemotePeer()
Expand All @@ -518,34 +513,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
case pid := <-p.newPeerError:
delete(p.peers, pid)

case pid := <-p.peerDead:
ch, ok := p.peers[pid]
if !ok {
continue
}

close(ch)

if p.host.Network().Connectedness(pid) == network.Connected {
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages
continue
}

delete(p.peers, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}

p.rt.RemovePeer(pid)
case <-p.peerDead:
p.handleDeadPeers()

case treq := <-p.getTopics:
var out []string
Expand Down Expand Up @@ -621,6 +590,83 @@ func (p *PubSub) processLoop(ctx context.Context) {
}
}

func (p *PubSub) handlePendingPeers() {
p.newPeersPrioLk.Lock()

if len(p.newPeersPend) == 0 {
p.newPeersPrioLk.Unlock()
return
}

newPeers := p.newPeersPend
p.newPeersPend = make(map[peer.ID]struct{})
p.newPeersPrioLk.Unlock()

vyzo marked this conversation as resolved.
Show resolved Hide resolved
for pid := range newPeers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we concerned that if this map is large we could block the event loop?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, if it gets large enough to take a non trivial amount of time to process, we've got bigger problems! We are getting flooded with connections.

if p.host.Network().Connectedness(pid) != network.Connected {
continue
}

if _, ok := p.peers[pid]; ok {
log.Debug("already have connection to peer: ", pid)
continue
}

if p.blacklist.Contains(pid) {
log.Warn("ignoring connection from blacklisted peer: ", pid)
continue
}

messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(p.ctx, pid, messages)
p.peers[pid] = messages
}
}

func (p *PubSub) handleDeadPeers() {
p.peerDeadPrioLk.Lock()

if len(p.peerDeadPend) == 0 {
p.peerDeadPrioLk.Unlock()
return
}

deadPeers := p.peerDeadPend
p.peerDeadPend = make(map[peer.ID]struct{})
p.peerDeadPrioLk.Unlock()

for pid := range deadPeers {
ch, ok := p.peers[pid]
if !ok {
continue
}

close(ch)

if p.host.Network().Connectedness(pid) == network.Connected {
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Debugf("peer declared dead but still connected; respawning writer: %s", pid)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(p.ctx, pid, messages)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
p.peers[pid] = messages
continue
}

delete(p.peers, pid)
for t, tmap := range p.topics {
if _, ok := tmap[pid]; ok {
delete(tmap, pid)
p.notifyLeave(t, pid)
}
}

p.rt.RemovePeer(pid)
}
}

// handleAddTopic adds a tracker for a particular topic.
// Only called from processLoop.
func (p *PubSub) handleAddTopic(req *addTopicReq) {
Expand Down