Skip to content

Commit

Permalink
fix(evidence): send evidence only once
Browse files Browse the repository at this point in the history
Right now, evidence is sent to all peers, which is not necessary
and can generate a huge load during a chain halt.

Reproduction scenario:

1. Stop majority of chain nodes (except one)
2. Remove WAL from stopped nodes
3. Break the code so that the chain will be halted
4. Start stopped nodes

As a result, these nodes will generate a huge amount of evidence
messages, which will be sent to all peers repeatedly.
On a big enough network, this will cause significant load.
  • Loading branch information
lklimek committed Sep 14, 2023
1 parent a94c22e commit 7f1046a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 67 deletions.
119 changes: 53 additions & 66 deletions internal/evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"fmt"
"runtime/debug"
"time"

sync "github.com/sasha-s/go-deadlock"

clist "github.com/dashpay/tenderdash/internal/libs/clist"
"github.com/dashpay/tenderdash/internal/p2p"
"github.com/dashpay/tenderdash/libs/log"
"github.com/dashpay/tenderdash/libs/service"
Expand All @@ -22,12 +20,6 @@ const (
EvidenceChannel = p2p.ChannelID(0x38)

maxMsgSize = 1048576 // 1MB TODO make it configurable

// broadcast all uncommitted evidence this often. This sets when the reactor
// goes back to the start of the list and begins sending the evidence again.
// Most evidence should be committed in the very next block that is why we wait
// just over the block production rate before sending evidence again.
broadcastEvidenceIntervalS = 10
)

// GetChannelDescriptor produces an instance of a descriptor for this
Expand All @@ -49,6 +41,8 @@ type Reactor struct {

evpool *Pool
chCreator p2p.ChannelCreator
evidenceCh p2p.Channel

peerEvents p2p.PeerEventSubscriber

mtx sync.Mutex
Expand Down Expand Up @@ -82,14 +76,14 @@ func NewReactor(
// envelopes on each. In addition, it also listens for peer updates and handles
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
ch, err := r.chCreator(ctx, GetChannelDescriptor())
func (r *Reactor) OnStart(ctx context.Context) (err error) {
r.evidenceCh, err = r.chCreator(ctx, GetChannelDescriptor())
if err != nil {
return err
}

go r.processEvidenceCh(ctx, ch)
go r.processPeerUpdates(ctx, r.peerEvents(ctx, "evidence"), ch)
go r.processEvidenceCh(ctx)
go r.processPeerUpdates(ctx, r.peerEvents(ctx, "evidence"))

return nil
}
Expand All @@ -111,23 +105,30 @@ func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envel
// Evidence is sent and received one by one
ev, err := types.EvidenceFromProto(msg)
if err != nil {
logger.Error("failed to convert evidence", "err", err)
logger.Error("failed to convert evidence", "error", err)
return err
}
if err := r.evpool.AddEvidence(ctx, ev); err != nil {
// If we're given invalid evidence by the peer, notify the router that
// we should remove this peer by returning an error.
if _, ok := err.(*types.ErrInvalidEvidence); ok {
return err

// If the evidence is already pending or committed, we don't need to
// broadcast it again.
if !r.evpool.isPending(ev) && !r.evpool.isCommitted(ev) {
if err := r.evpool.AddEvidence(ctx, ev); err != nil {
// If we're given invalid evidence by the peer, notify the router that
// we should remove this peer by returning an error.
if _, ok := err.(*types.ErrInvalidEvidence); ok {
return err
}
logger.Error("failed to add evidence", "error", err)
}

return r.broadcastEvidence(ctx, *msg, r.evidenceCh)
}
logger.Debug("evidence already pending", "evidence", ev)
return nil

default:
return fmt.Errorf("received unknown message: %T", msg)
}

return nil
}

// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
Expand Down Expand Up @@ -159,13 +160,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er

// processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh.
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) {
iter := evidenceCh.Receive(ctx)
func (r *Reactor) processEvidenceCh(ctx context.Context) {
iter := r.evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
if serr := evidenceCh.SendError(ctx, p2p.PeerError{
if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
Expand All @@ -186,7 +187,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel)
// connects/disconnects frequently from the broadcasting peer(s).
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

r.mtx.Lock()
Expand All @@ -209,7 +210,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
if !ok {
pctx, pcancel := context.WithCancel(ctx)
r.peerRoutines[peerUpdate.NodeID] = pcancel
go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID, evidenceCh)
go r.syncEvidence(pctx, peerUpdate.NodeID)
}

case p2p.PeerStatusDown:
Expand All @@ -227,31 +228,23 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
select {
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, evidenceCh)
r.processPeerUpdate(ctx, peerUpdate)
case <-ctx.Done():
return
}
}
}

// broadcastEvidenceLoop starts a blocking process that continuously reads pieces
// of evidence off of a linked-list and sends the evidence in a p2p Envelope to
// the given peer by ID. This should be invoked in a goroutine per unique peer
// syncEvidence starts a blocking process that sends all evidence to a newly
// connected peer. This should be invoked in a goroutine per unique peer
// ID via an appropriate PeerUpdate. The goroutine can be signaled to gracefully
// exit by either cancelling the provided context or by the reactor
// signaling to stop.
//
// TODO: This should be refactored so that we do not blindly gossip evidence
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) {
var next *clist.CElement

func (r *Reactor) syncEvidence(ctx context.Context, peerID types.NodeID) {
defer func() {
r.mtx.Lock()
delete(r.peerRoutines, peerID)
Expand All @@ -266,25 +259,9 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
}
}()

timer := time.NewTimer(0)
defer timer.Stop()

for {
// This happens because the CElement we were looking at got garbage
// collected (removed). That is, .NextWaitChan() returned nil. So we can go
// ahead and start from the beginning.
if next == nil {
select {
case <-r.evpool.EvidenceWaitChan(): // wait until next evidence is available
if next = r.evpool.EvidenceFront(); next == nil {
continue
}

case <-ctx.Done():
return
}
}
next := r.evpool.EvidenceFront()

for next != nil {
ev := next.Value.(types.Evidence)
evProto, err := types.EvidenceToProto(ev)
if err != nil {
Expand All @@ -296,25 +273,35 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
// peer may receive this piece of evidence multiple times if it added and
// removed frequently from the broadcasting peer.

if err := evidenceCh.Send(ctx, p2p.Envelope{
if err := r.evidenceCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: evProto,
}); err != nil {
return
}
r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID)
r.logger.Debug("evidence sync: sent evidence to peer", "evidence", ev, "peer", peerID)

select {
case <-timer.C:
// start from the beginning after broadcastEvidenceIntervalS seconds
timer.Reset(time.Second * broadcastEvidenceIntervalS)
next = nil

case <-next.NextWaitChan():
next = next.Next()

case <-ctx.Done():
return
default:
}

next = next.Next()
}
r.logger.Debug("evidence sync finished", "peer", peerID)
}

// broadcastEvidence wraps the given evidence in an envelope flagged for
// broadcast and submits it on evidenceCh.
//
// The evidence is received by value, so the envelope points at this call's
// private copy rather than at the caller's message.
//
// It returns a wrapped error if the channel refuses the envelope; on success
// it emits a debug log entry and returns nil.
func (r *Reactor) broadcastEvidence(ctx context.Context, evidence tmproto.Evidence, evidenceCh p2p.Channel) error {
	envelope := p2p.Envelope{
		Broadcast: true,
		Message:   &evidence,
	}

	err := evidenceCh.Send(ctx, envelope)
	if err != nil {
		return fmt.Errorf("failed to broadcast evidence: %w", err)
	}

	r.logger.Debug("evidence broadcasted", "evidence", evidence)
	return nil
}
2 changes: 1 addition & 1 deletion internal/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (r *Router) routeChannel(
r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds())

case <-q.closed():
r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
r.logger.Debug("dropping message on closed channel", "peer", envelope.To, "channel", chID)

case <-ctx.Done():
return
Expand Down

0 comments on commit 7f1046a

Please sign in to comment.