Skip to content

Commit

Permalink
peer: Implement stall detection.
Browse files Browse the repository at this point in the history
This commit implements stall detection logic at the peer level to detect
and disconnect peers that are either not following the protocol in
regards to expected response messages or have otherwise stalled.  This
is accomplished by setting deadlines for each message type which expects
a response and periodically checking them while properly taking into
account processing time.

There are an increasing number of nodes on the network which claim to be
full nodes, but don't actually properly implement the entire p2p
protocol even though they implement it enough to cause properly
implemented nodes to make data requests to which they never respond.

Since btcd currently only syncs new blocks via single sync peer and,
prior to this commit only had very basic stall detection, this could
lead to a situation where the block download became stalled indefinitely
due to one of these misbehaving peers.  This commit fixes that issue
since the stalled peer will now be detected and disconnected which leads
to a new sync peer being selected.

This logic will also fit nicely with the future multi-peer sync model
which is on the roadmap and for which infrastructure work is underway.

Closes btcsuite#486.
  • Loading branch information
davecgh committed Oct 20, 2015
1 parent 520949b commit b2d0972
Showing 1 changed file with 244 additions and 13 deletions.
257 changes: 244 additions & 13 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ const (
// MaxProtocolVersion is the max protocol version the peer supports.
MaxProtocolVersion = 70011

// BlockStallTimeout is the number of seconds we will wait for a
// "block" response after we send out a "getdata" for an announced
// block before we deem the peer inactive, and disconnect it.
BlockStallTimeout = 5 * time.Second

// outputBufferSize is the number of elements the output channels use.
outputBufferSize = 50

Expand All @@ -54,6 +49,16 @@ const (
// idleTimeout is the duration of inactivity before we time out a peer.
idleTimeout = 5 * time.Minute

// stallTickInterval is the interval of time between each check for
// stalled peers.
stallTickInterval = 15 * time.Second

// stallResponseTimeout is the base maximum amount of time messages that
// expect a response will wait before disconnecting the peer for
// stalling. The deadlines are adjusted for callback running times and
// only checked on each stall tick interval.
stallResponseTimeout = 30 * time.Second

// trickleTimeout is the duration of the ticker which trickles down the
// inventory to a peer.
trickleTimeout = 10 * time.Second
Expand Down Expand Up @@ -84,8 +89,8 @@ var (
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call any
// blocking calls (such as WaitForShutdown) on the peer instance since the
// inHandler goroutine blocks until the callback has completed. Doing so will
// result in a deadlock situation.
// input handler goroutine blocks until the callback has completed. Doing so
// will result in a deadlock.
type MessageListeners struct {
// OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)
Expand Down Expand Up @@ -288,6 +293,32 @@ type outMsg struct {
doneChan chan struct{}
}

// stallControlCmd represents the command of a stall control message.
type stallControlCmd uint8

// Constants for the command of a stall control message.
const (
// sccSendMessage indicates a message is being sent to the remote peer.
sccSendMessage stallControlCmd = iota

// sccReceiveMessage indicates a message has been received from the
// remote peer.
sccReceiveMessage

// sccHandlerStart indicates a callback handler is about to be invoked.
sccHandlerStart

// sccHandlerStart indicates a callback handler has completed.
sccHandlerDone
)

// stallControlMsg is used to signal the stall handler about specific events
// so it can properly detect and handle stalled remote peers.
type stallControlMsg struct {
command stallControlCmd
message wire.Message
}

// stats is the collection of stats related to a peer.
type stats struct {
statsMtx sync.RWMutex // protects all statistics below here.
Expand Down Expand Up @@ -393,12 +424,16 @@ type Peer struct {
prevGetHdrsMtx sync.Mutex
prevGetHdrsBegin *wire.ShaHash
prevGetHdrsStop *wire.ShaHash
outputQueue chan outMsg
sendQueue chan outMsg
sendDoneQueue chan struct{}
outputInvChan chan *wire.InvVect
queueQuit chan struct{}
quit chan struct{}

stallControl chan stallControlMsg
outputQueue chan outMsg
sendQueue chan outMsg
sendDoneQueue chan struct{}
outputInvChan chan *wire.InvVect
inQuit chan struct{}
queueQuit chan struct{}
outQuit chan struct{}
quit chan struct{}

stats
}
Expand Down Expand Up @@ -1221,6 +1256,191 @@ func (p *Peer) shouldHandleReadError(err error) bool {
return true
}

// maybeAddDeadline potentially adds a deadline for the appropriate expected
// response for the passed wire protocol command to the pending responses map.
func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {

This comment has been minimized.

Copy link
@Roasbeef

Roasbeef Oct 21, 2015

This function is missing a case for wire.CmdGetBlocks.

Correct me if I'm wrong, but in the most common case, each outgoing getblocks message should receive an inv in response. However, the exists an edge case when the best height of a new peer is identical to our best height. Meaning, they're caught up to the latest block on the main-chain as far as we know. In this scenario the remote peer detects that the first block hash within the block locator (the latest block hash) is part of the main-chain. It will then attempt to send invs for the "rest" of the main-chain, but, this block hash is of the latest known block. Therefore an inv message will not be sent because both peers are at the "edge" of the block horizon. If unaccounted for, this edge case will result in the unnecessary booting of new connections between fully synced nodes.

If accounted for, I believe this PR should also fix btcsuite#229.

This comment has been minimized.

Copy link
@davecgh

davecgh Oct 21, 2015

Author Owner

I need to confirm, but I think there is also no guarantee that a given getblocks message will result in an inv either (say it's on a side chain the remote doesn't know about). That was my reasoning for not adding a case for it. Let me take a look at 229 to refresh my memory.

// Setup a deadline for each message being sent that expects a response.
deadline := time.Now().Add(stallResponseTimeout)
switch msgCmd {
case wire.CmdVersion:
// Expects a verack message.
pendingResponses[wire.CmdVerAck] = deadline

case wire.CmdGetAddr:
// Expects an addr message.
pendingResponses[wire.CmdAddr] = deadline

case wire.CmdPing:
// Expects a pong message in later protocol versions.
if p.ProtocolVersion() > wire.BIP0031Version {
pendingResponses[wire.CmdPong] = deadline
}

case wire.CmdMemPool:
// Expects an inv message.
pendingResponses[wire.CmdInv] = deadline

case wire.CmdGetData:
// Expects a block, tx, or notfound message.
pendingResponses[wire.CmdBlock] = deadline
pendingResponses[wire.CmdTx] = deadline
pendingResponses[wire.CmdNotFound] = deadline

case wire.CmdGetHeaders:
// Expects a headers message. Use a longer deadline since it
// can take a while for the remote peer to load all of the
// headers.
deadline = time.Now().Add(stallResponseTimeout * 3)
pendingResponses[wire.CmdHeaders] = deadline
}
}

// stallHandler handles stall detection for the peer. This entails keeping
// track of expected responses and assigning them deadlines while accounting for
// the time spent in callbacks. It must be run as a goroutine.
func (p *Peer) stallHandler() {
// These variables are used to adjust the deadline times forward by the
// time it takes callbacks to execute. This is done because new
// messages aren't read until the previous one is finished processing
// (which includes callbacks), so the deadline for receiving a response
// for a given message must account for the processing time as well.
var handlerActive bool
var handlersStartTime time.Time
var deadlineOffset time.Duration

// pendingResponses tracks the expected response deadline times.
pendingResponses := make(map[string]time.Time)

// stallTicker is used to periodically check pending responses that have
// exceeded the expected deadline and disconncet the peer due to
// stalling.
stallTicker := time.NewTicker(stallTickInterval)
defer stallTicker.Stop()

// ioStopped is used to detect when both the input and output handler
// goroutines are done.
var ioStopped bool
out:
for {
select {
case msg := <-p.stallControl:
switch msg.command {
case sccSendMessage:
// Add a deadline for the expected response
// message if needed.
p.maybeAddDeadline(pendingResponses,
msg.message.Command())

case sccReceiveMessage:
// Move received messages from the expected
// reponse map. Since certain commands expect
// one of a group of responses, remove everyting
// in the expected group accordingly.
switch msgCmd := msg.message.Command(); msgCmd {
case wire.CmdBlock:
fallthrough
case wire.CmdTx:
fallthrough
case wire.CmdNotFound:
delete(pendingResponses, wire.CmdBlock)
delete(pendingResponses, wire.CmdTx)
delete(pendingResponses, wire.CmdNotFound)

default:
delete(pendingResponses, msgCmd)
}

case sccHandlerStart:
// Warn on unbalanced callback signalling.
if handlerActive {
log.Warn("Received handler start " +
"control command while a " +
"handler is already active")
continue
}

handlerActive = true
handlersStartTime = time.Now()

case sccHandlerDone:
// Warn on unbalanced callback signalling.
if !handlerActive {
log.Warn("Received handler done " +
"control command when a " +
"handler is not already active")
continue
}

// Extend active deadlines by the time it took
// to execute the callback.
duration := time.Now().Sub(handlersStartTime)
deadlineOffset += duration
handlerActive = false

default:
log.Warnf("Unsupported message command %v",
msg.command)
}

case <-stallTicker.C:
// Calculate the offset to apply to the deadline based
// on how long the handlers have taken to execute since
// the last tick.
now := time.Now()
offset := deadlineOffset
if handlerActive {
offset += now.Sub(handlersStartTime)
}

// Disconnect the peer if any of the pending responses
// don't arrive by their adjusted deadline.
for command, deadline := range pendingResponses {
if now.Before(deadline.Add(offset)) {
continue
}

log.Infof("Peer %s appears to be stalled or "+
"misbehaving since it did not send "+
"expected reply %s by deadline -- "+
"disconnecting", p, command)
p.Disconnect()
break
}

// Reset the deadline offset for the next tick.
deadlineOffset = 0

case <-p.inQuit:
// The stall handler can exit one both the input and

This comment has been minimized.

Copy link
@Roasbeef

Roasbeef Oct 21, 2015

one -> once

The same applies below.

This comment has been minimized.

Copy link
@davecgh

davecgh Oct 21, 2015

Author Owner

Fixed.

// output handler goroutines are done.
if ioStopped {
break out
}
ioStopped = true

case <-p.outQuit:
// The stall handler can exit one both the input and
// output handler goroutines are done.
if ioStopped {
break out
}
ioStopped = true
}
}

// Drain any wait channels before going away so there is nothing left
// waiting on this goroutine.
cleanup:
for {
select {
case <-p.stallControl:
default:
break cleanup
}
}
log.Tracef("Peer stall handler done for %s", p)
}

// inHandler handles all incoming messages for the peer. It must be run as a
// goroutine.
func (p *Peer) inHandler() {
Expand All @@ -1237,6 +1457,7 @@ func (p *Peer) inHandler() {
}
p.Disconnect()
})

out:
for atomic.LoadInt32(&p.disconnect) == 0 {
// Read a message and stop the idle timer as soon as the read
Expand Down Expand Up @@ -1281,6 +1502,7 @@ out:
p.statsMtx.Lock()
p.lastRecv = time.Now()
p.statsMtx.Unlock()
p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

// Ensure version message comes first.
if vmsg, ok := rmsg.(*wire.MsgVersion); !ok && !p.VersionKnown() {
Expand All @@ -1295,6 +1517,7 @@ out:
}

// Handle each supported message type.
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
switch msg := rmsg.(type) {
case *wire.MsgVersion:
p.handleVersionMsg(msg)
Expand Down Expand Up @@ -1432,6 +1655,7 @@ out:
log.Debugf("Received unhandled message of type %v:",
rmsg.Command())
}
p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}

// A message was received so reset the idle timer.
idleTimer.Reset(idleTimeout)
Expand All @@ -1443,6 +1667,7 @@ out:
// Ensure connection is closed.
p.Disconnect()

close(p.inQuit)
log.Tracef("Peer input handler done for %s", p)
}

Expand Down Expand Up @@ -1604,6 +1829,7 @@ out:
}
}

p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}
p.writeMessage(msg.msg)
p.statsMtx.Lock()
p.lastSend = time.Now()
Expand Down Expand Up @@ -1644,6 +1870,7 @@ cleanup:
break cleanup
}
}
close(p.outQuit)
log.Tracef("Peer output handler done for %s", p)
}

Expand Down Expand Up @@ -1746,6 +1973,7 @@ func (p *Peer) Start() error {
}

// Start processing input and output.
go p.stallHandler()
go p.inHandler()
go p.queueHandler()
go p.outHandler()
Expand Down Expand Up @@ -1785,11 +2013,14 @@ func newPeerBase(cfg *Config, inbound bool) *Peer {
p := Peer{
inbound: inbound,
knownInventory: NewMruInventoryMap(maxKnownInventory),
stallControl: make(chan stallControlMsg, 1), // nonblocking sync
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
outputInvChan: make(chan *wire.InvVect, outputBufferSize),
inQuit: make(chan struct{}),
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
stats: stats{},
cfg: *cfg, // Copy so caller can't mutate.
Expand Down

0 comments on commit b2d0972

Please sign in to comment.