Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat: coalesce and queue connection event handling #565

Merged
merged 3 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

bs.pqm.Startup()
network.SetDelegate(bs)
network.Start(bs)

// Start up bitswaps async worker routines
bs.startWorkers(ctx, px)
Expand All @@ -316,6 +316,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sm.Shutdown()
cancelFunc()
notif.Shutdown()
network.Stop()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first

Expand Down
183 changes: 145 additions & 38 deletions network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,96 +11,203 @@ type ConnectionListener interface {
PeerDisconnected(peer.ID)
}

type state byte

const (
stateDisconnected = iota
stateResponsive
stateUnresponsive
)

type connectEventManager struct {
connListener ConnectionListener
lk sync.RWMutex
conns map[peer.ID]*connState
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []peer.ID
stop bool
done chan struct{}
}

type connState struct {
refs int
responsive bool
type peerState struct {
newState, curState state
pending bool
}

func newConnectEventManager(connListener ConnectionListener) *connectEventManager {
return &connectEventManager{
evtManager := &connectEventManager{
connListener: connListener,
conns: make(map[peer.ID]*connState),
peers: make(map[peer.ID]*peerState),
done: make(chan struct{}),
}
evtManager.cond = sync.Cond{L: &evtManager.lk}
return evtManager
}

func (c *connectEventManager) Connected(p peer.ID) {
func (c *connectEventManager) Start() {
go c.worker()
}

func (c *connectEventManager) Stop() {
c.lk.Lock()
defer c.lk.Unlock()
c.stop = true
c.lk.Unlock()
c.cond.Broadcast()

state, ok := c.conns[p]
<-c.done
}

func (c *connectEventManager) getState(p peer.ID) state {
if state, ok := c.peers[p]; ok {
return state.newState
} else {
return stateDisconnected
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
state, ok := c.peers[p]
if !ok {
state = &connState{responsive: true}
c.conns[p] = state
state = new(peerState)
c.peers[p] = state
}
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
c.cond.Broadcast()
}
state.refs++
}

if state.refs == 1 && state.responsive {
c.connListener.PeerConnected(p)
// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
// connect event manager has been stopped.
func (c *connectEventManager) waitChange() bool {
for !c.stop && len(c.changeQueue) == 0 {
c.cond.Wait()
}
return !c.stop
}

func (c *connectEventManager) Disconnected(p peer.ID) {
func (c *connectEventManager) worker() {
c.lk.Lock()
defer c.lk.Unlock()
defer close(c.done)

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
continue
}

state, ok := c.conns[p]
if !ok {
// Should never happen
// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue
}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
c.connListener.PeerDisconnected(pid)
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
c.connListener.PeerConnected(pid)
c.lk.Lock()
}
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
return
}
state.refs--
c.setState(p, stateResponsive)
}

if state.refs == 0 {
if state.responsive {
c.connListener.PeerDisconnected(p)
}
delete(c.conns, p)
// Called when we drop the final connection to a peer.
func (c *connectEventManager) Disconnected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !disconnected -> disconnected

if c.getState(p) == stateDisconnected {
return
}

c.setState(p, stateDisconnected)
}

// Called whenever a peer is unresponsive.
func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

state, ok := c.conns[p]
if !ok || !state.responsive {
// responsive -> unresponsive

if c.getState(p) != stateResponsive {
return
}
state.responsive = false

c.connListener.PeerDisconnected(p)
c.setState(p, stateUnresponsive)
}

// Called whenever we receive a message from a peer.
//
// - When we're connected to the peer, this will mark the peer as responsive (from unresponsive).
// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process
// the "on message" event, so we can't treat this as evidence of a connection.
func (c *connectEventManager) OnMessage(p peer.ID) {
// This is a frequent operation so to avoid different message arrivals
// getting blocked by a write lock, first take a read lock to check if
// we need to modify state
c.lk.RLock()
state, ok := c.conns[p]
responsive := ok && state.responsive
unresponsive := c.getState(p) == stateUnresponsive
c.lk.RUnlock()

if !ok || responsive {
// Only continue if both connected, and unresponsive.
if !unresponsive {
return
}

// unresponsive -> responsive

// We need to make a modification so now take a write lock
c.lk.Lock()
defer c.lk.Unlock()

// Note: state may have changed in the time between when read lock
// was released and write lock taken, so check again
state, ok = c.conns[p]
if !ok || state.responsive {
if c.getState(p) != stateUnresponsive {
return
}

state.responsive = true
c.connListener.PeerConnected(p)
c.setState(p, stateResponsive)
}
Loading