-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix goroutine build up from connected notifications #430
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a couple of comments/questions, generally I get the changes and trust your instincts here.
newPeers := p.newPeersPend | ||
p.newPeersPend = make(map[peer.ID]struct{}) | ||
|
||
for pid := range newPeers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we concerned that if this map is large we could block the event loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, if it gets large enough to take a non trivial amount of time to process, we've got bigger problems! We are getting flooded with connections.
pubsub.go
Outdated
// contention, return and wait for the next notification without blocking the event loop | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like it might be ok, although is it a problem if there's contention for a long time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think so, at worst some peers wont be processed for a while; we can live with that.
pubsub.go
Outdated
@@ -260,6 +264,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option | |||
counter: uint64(time.Now().UnixNano()), | |||
} | |||
|
|||
ps.newPeersSema <- struct{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I generally prefer the pattern where writing takes the lock, and reading unlocks. Just means you can avoid this initialization part.
pubsub.go
Outdated
@@ -231,7 +233,9 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option | |||
signKey: nil, | |||
signPolicy: StrictSign, | |||
incoming: make(chan *RPC, 32), | |||
newPeers: make(chan peer.ID), | |||
newPeers: make(chan struct{}, 1), | |||
newPeersSema: make(chan struct{}, 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd really just use a lock here. We're just protecting reads/writes to a single map.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really -- we can block the event loop indefinitely if there is a lot of contention in the lock; better to use a semaphore that allows us to try lock.
don't block the swarm while waiting for the semaphore.
so that there is no case of infinite accumulation of pending peers in the queue. also adds a connectedness check before adding the peer.
rebased on master and also:
|
As observed in the gateway.
Aggregates notifications and batch processes, while being careful not to block the event loop.