Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: data-race when sending a message from consensus/reactor to a peer via p2p.Channel #334

5 changes: 5 additions & 0 deletions internal/consensus/peer_state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consensus

import (
"errors"
"fmt"
"time"

Expand All @@ -16,6 +17,10 @@ import (
"github.com/tendermint/tendermint/types"
)

var (
// errPeerClosed is the sentinel error reporting that a peer's state has been
// closed; Reactor.send returns it once ps.closer is done, so callers can
// detect the condition with errors.Is.
errPeerClosed = errors.New("peer is closed")
)

// peerStateStats holds internal statistics for a peer.
type peerStateStats struct {
Votes int `json:"votes"`
Expand Down
31 changes: 16 additions & 15 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consensus

import (
"errors"
"fmt"
"runtime/debug"
"sync"
Expand Down Expand Up @@ -81,6 +82,8 @@ var (
},
},
}

errReactorClosed = errors.New("reactor is closed")
)

const (
Expand Down Expand Up @@ -663,30 +666,28 @@ func (r *Reactor) sendCommit(ps *PeerState, commit *types.Commit) error {
// send delivers msg to the single peer described by ps over the given p2p
// channel.
//
// It returns errPeerClosed if the peer state has already been closed and
// errReactorClosed if the reactor is shutting down. Otherwise the envelope is
// handed to channel.Send, and whatever error that call reports (nil on
// success) is returned unchanged.
//
// The envelope is deliberately NOT written to channel.Out directly: a raw
// send races with Channel.Close and panics if Out has been closed, whereas
// channel.Send performs the closed-check under the channel's own lock.
func (r *Reactor) send(ps *PeerState, channel *p2p.Channel, msg proto.Message) error {
	select {
	case <-ps.closer.Done():
		return errPeerClosed
	case <-r.closeCh:
		return errReactorClosed
	default:
		// Neither the peer nor the reactor is closed; delegate the
		// (possibly blocking) send to the channel.
		return channel.Send(p2p.Envelope{
			To:      ps.peerID,
			Message: msg,
		})
	}
}

// broadcast sends msg as a broadcast envelope to all peers connected to the
// `channel`.
//
// It returns errReactorClosed if the reactor is shutting down; otherwise it
// returns the result of channel.Send (nil on success). The envelope goes
// through channel.Send rather than a raw write to channel.Out so that a
// concurrently closed channel yields an error instead of a panic.
func (r *Reactor) broadcast(channel *p2p.Channel, msg proto.Message) error {
	select {
	case <-r.closeCh:
		return errReactorClosed
	default:
		return channel.Send(p2p.Envelope{
			Broadcast: true,
			Message:   msg,
		})
	}
}

// logResult creates a log that depends on value of err
Expand Down
29 changes: 26 additions & 3 deletions internal/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (

// queueBufferDefault is a default of 32; presumably the buffer size used for
// queues in this package — NOTE(review): confirm against its usages.
const queueBufferDefault = 32

var (
// ErrPeerChannelClosed is returned when an envelope is sent or received after the peer's channel has already been closed (for example, because the peer disconnected).
ErrPeerChannelClosed = errors.New("a peer channel is closed")
)

// ChannelID is an arbitrary channel ID.
type ChannelID uint16

Expand Down Expand Up @@ -64,9 +69,9 @@ type Channel struct {
Out chan<- Envelope // outbound messages (reactors to peers)
Error chan<- PeerError // peer error reporting

mtx sync.RWMutex
messageType proto.Message // the channel's message type, used for unmarshaling
closeCh chan struct{}
closeOnce sync.Once
}

// NewChannel creates a new channel. It is primarily for internal and test
Expand All @@ -88,15 +93,33 @@ func NewChannel(
}
}

// Send pushes the envelope e onto the channel's outbound queue (Out).
//
// The read lock is held for the whole call, so Close, which takes the write
// lock, cannot close Out while a send is in flight. If the channel has
// already been closed, ErrPeerChannelClosed is returned and nothing is sent.
// Otherwise the call blocks until Out accepts the envelope and then returns
// nil.
func (c *Channel) Send(e Envelope) error {
	c.mtx.RLock()
	defer c.mtx.RUnlock()

	// Non-blocking probe of the close signal.
	select {
	case <-c.closeCh:
		return ErrPeerChannelClosed
	default:
	}

	c.Out <- e
	return nil
}

// Close closes the channel. Future sends on Out and Error will panic. The In
// channel remains open to avoid having to synchronize Router senders, which
// should use Done() to detect channel closure instead.
func (c *Channel) Close() {
c.closeOnce.Do(func() {
c.mtx.Lock()
defer c.mtx.Unlock()
select {
case <-c.closeCh:
return
default:
close(c.closeCh)
close(c.Out)
close(c.Error)
})
}
}

// Done returns a channel that's closed when Channel.Close() is called.
Expand Down