Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat: coalesce and queue connection event handling #565

Merged
merged 3 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

bs.pqm.Startup()
network.SetDelegate(bs)
network.Start(bs)

// Start up bitswaps async worker routines
bs.startWorkers(ctx, px)
Expand All @@ -316,6 +316,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sm.Shutdown()
cancelFunc()
notif.Shutdown()
network.Stop()
}()
procctx.CloseAfterContext(px, ctx) // parent cancelled first

Expand Down
178 changes: 140 additions & 38 deletions network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,96 +11,198 @@ type ConnectionListener interface {
PeerDisconnected(peer.ID)
}

// state is the connectivity status tracked for a single peer.
type state byte

// The constants are declared with the explicit type state (rather than as
// untyped integers) so they cannot be silently mixed with unrelated integer
// values and keep their type when stored in an interface.
const (
	// stateDisconnected is the zero value of state; it is also what
	// getState reports for a peer that is not being tracked.
	stateDisconnected state = iota
	// stateResponsive marks a connected peer that is considered responsive.
	stateResponsive
	// stateUnresponsive marks a connected peer that was flagged as
	// unresponsive.
	stateUnresponsive
)

type connectEventManager struct {
connListener ConnectionListener
lk sync.RWMutex
conns map[peer.ID]*connState
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []peer.ID
stop bool
done chan struct{}
}

type connState struct {
refs int
responsive bool
type peerState struct {
newState, curState state
pending bool
}

func newConnectEventManager(connListener ConnectionListener) *connectEventManager {
return &connectEventManager{
evtManager := &connectEventManager{
connListener: connListener,
conns: make(map[peer.ID]*connState),
peers: make(map[peer.ID]*peerState),
done: make(chan struct{}),
}
evtManager.cond = sync.Cond{L: &evtManager.lk}
return evtManager
}

func (c *connectEventManager) Connected(p peer.ID) {
func (c *connectEventManager) Start() {
go c.worker()
}

func (c *connectEventManager) Stop() {
c.lk.Lock()
defer c.lk.Unlock()
c.stop = true
c.lk.Unlock()
c.cond.Broadcast()

<-c.done
}

// getState returns the most recently requested state (newState) for p, or
// stateDisconnected when the peer has no entry in c.peers.
//
// It reads c.peers without taking c.lk itself; the callers visible in this
// file acquire the lock before calling it.
func (c *connectEventManager) getState(p peer.ID) state {
	// The local is named ps rather than state so it does not shadow the
	// state type inside the if statement.
	if ps, ok := c.peers[p]; ok {
		return ps.newState
	}
	return stateDisconnected
}

state, ok := c.conns[p]
func (c *connectEventManager) setState(p peer.ID, newState state) {
state, ok := c.peers[p]
if !ok {
state = &connState{responsive: true}
c.conns[p] = state
state = new(peerState)
c.peers[p] = state
}
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
c.cond.Broadcast()
}
}

func (c *connectEventManager) worker() {
c.lk.Lock()
defer c.lk.Unlock()
defer close(c.done)

for {
for !c.stop && len(c.changeQueue) == 0 {
c.cond.Wait()
}

if c.stop {
return
}
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("")
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
// If the peer has already been disconnected and forgotten, skip it. We don't expect to reach this case.
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
continue
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue
}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
c.connListener.PeerDisconnected(pid)
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
c.connListener.PeerConnected(pid)
c.lk.Lock()
}
}
state.refs++
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

if state.refs == 1 && state.responsive {
c.connListener.PeerConnected(p)
// disconnected -> responsive
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

if c.getState(p) != stateDisconnected {
return
}
c.setState(p, stateResponsive)
}

// Called when we drop the final connection to a peer.
func (c *connectEventManager) Disconnected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

state, ok := c.conns[p]
if !ok {
// Should never happen
// !disconnected -> disconnected

if c.getState(p) == stateDisconnected {
return
}
state.refs--

if state.refs == 0 {
if state.responsive {
c.connListener.PeerDisconnected(p)
}
delete(c.conns, p)
}
c.setState(p, stateDisconnected)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}

// Called whenever a peer is unresponsive.
func (c *connectEventManager) MarkUnresponsive(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

state, ok := c.conns[p]
if !ok || !state.responsive {
// responsive -> unresponsive

if c.getState(p) != stateResponsive {
return
}
state.responsive = false

c.connListener.PeerDisconnected(p)
c.setState(p, stateUnresponsive)
}

// Called whenever we receive a message from a peer.
//
// - When we're connected to the peer, this will mark the peer as responsive (from unresponsive).
// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process
// the "on message" event, so we can't treat this as evidence of a connection.
func (c *connectEventManager) OnMessage(p peer.ID) {
// This is a frequent operation so to avoid different message arrivals
// getting blocked by a write lock, first take a read lock to check if
// we need to modify state
c.lk.RLock()
state, ok := c.conns[p]
responsive := ok && state.responsive
unresponsive := c.getState(p) == stateUnresponsive
c.lk.RUnlock()

if !ok || responsive {
// Only continue if the peer is both connected and unresponsive.
if !unresponsive {
return
}

// unresponsive -> responsive

// We need to make a modification so now take a write lock
c.lk.Lock()
defer c.lk.Unlock()

// Note: state may have changed in the time between when read lock
// was released and write lock taken, so check again
state, ok = c.conns[p]
if !ok || state.responsive {
if c.getState(p) != stateUnresponsive {
return
}

state.responsive = true
c.connListener.PeerConnected(p)
c.setState(p, stateResponsive)
}
Loading