From e1db9a9c37acc031aa0962bdd1af542f1b8eefeb Mon Sep 17 00:00:00 2001 From: shotonoff Date: Fri, 8 Apr 2022 10:18:09 +0200 Subject: [PATCH 1/5] fix: add an additional select block to check whether a channel / peer or reactor is closed --- internal/consensus/peer_state.go | 5 +++++ internal/consensus/reactor.go | 31 ++++++++++++++++++++++++++----- internal/p2p/router.go | 5 +++++ 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/internal/consensus/peer_state.go b/internal/consensus/peer_state.go index 3965ca1c28..625053006c 100644 --- a/internal/consensus/peer_state.go +++ b/internal/consensus/peer_state.go @@ -1,6 +1,7 @@ package consensus import ( + "errors" "fmt" "time" @@ -16,6 +17,10 @@ import ( "github.com/tendermint/tendermint/types" ) +var ( + errPeerClosed = errors.New("peer is closed") +) + // peerStateStats holds internal statistics for a peer. type peerStateStats struct { Votes int `json:"votes"` diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index a329bc8abf..fbe461e319 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -1,6 +1,7 @@ package consensus import ( + "errors" "fmt" "runtime/debug" "sync" @@ -81,6 +82,8 @@ var ( }, }, } + + errReactorClosed = errors.New("reactor is closed") ) const ( @@ -662,30 +665,48 @@ func (r *Reactor) sendCommit(ps *PeerState, commit *types.Commit) error { // If to is nil, message will be broadcasted. 
func (r *Reactor) send(ps *PeerState, channel *p2p.Channel, msg proto.Message) error { select { + case <-channel.Done(): + return p2p.ErrPeerChannelClosed + case <-ps.closer.Done(): + return errPeerClosed + case <-r.closeCh: + return errReactorClosed + default: + } + select { + case <-channel.Done(): + return p2p.ErrPeerChannelClosed case <-ps.closer.Done(): - return fmt.Errorf("peer is closed") + return errPeerClosed case <-r.closeCh: - return fmt.Errorf("reactor is closed") + return errReactorClosed case channel.Out <- p2p.Envelope{ To: ps.peerID, Message: msg, }: } - return nil } // broadcast sends a broadcast message to all peers connected to the `channel`. func (r *Reactor) broadcast(channel *p2p.Channel, msg proto.Message) error { select { + case <-channel.Done(): + return p2p.ErrPeerChannelClosed + case <-r.closeCh: + return errReactorClosed + default: + } + select { + case <-channel.Done(): + return p2p.ErrPeerChannelClosed case <-r.closeCh: - return fmt.Errorf("reactor is closed") + return errReactorClosed case channel.Out <- p2p.Envelope{ Broadcast: true, Message: msg, }: } - return nil } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 80b9b48a66..277958dcce 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -21,6 +21,11 @@ import ( const queueBufferDefault = 32 +var ( + // ErrPeerChannelClosed is for when the send/receive an envelope but a peer was already disconnected + ErrPeerChannelClosed = errors.New("a peer channel is closed") +) + // ChannelID is an arbitrary channel ID. 
type ChannelID uint16 From ad1a4e25aeb4a8ad36c0a7cec0040e187654c030 Mon Sep 17 00:00:00 2001 From: shotonoff Date: Fri, 8 Apr 2022 12:16:06 +0200 Subject: [PATCH 2/5] fix: implement Channel.Send method to safely send messages --- internal/consensus/reactor.go | 16 ++++------------ internal/p2p/router.go | 24 +++++++++++++++++++++--- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index fbe461e319..4e2d610171 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -672,18 +672,10 @@ func (r *Reactor) send(ps *PeerState, channel *p2p.Channel, msg proto.Message) e case <-r.closeCh: return errReactorClosed default: - } - select { - case <-channel.Done(): - return p2p.ErrPeerChannelClosed - case <-ps.closer.Done(): - return errPeerClosed - case <-r.closeCh: - return errReactorClosed - case channel.Out <- p2p.Envelope{ - To: ps.peerID, - Message: msg, - }: + channel.Send(p2p.Envelope{ + To: ps.peerID, + Message: msg, + }) } return nil } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 277958dcce..24ea27f34a 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -69,9 +69,9 @@ type Channel struct { Out chan<- Envelope // outbound messages (reactors to peers) Error chan<- PeerError // peer error reporting + mtx sync.RWMutex messageType proto.Message // the channel's message type, used for unmarshaling closeCh chan struct{} - closeOnce sync.Once } // NewChannel creates a new channel. It is primarily for internal and test @@ -93,15 +93,33 @@ func NewChannel( } } +// Send sends an envelope to a peer through a channel +func (c *Channel) Send(e Envelope) error { + c.mtx.RLock() + defer c.mtx.RUnlock() + select { + case <-c.closeCh: + return ErrPeerChannelClosed + default: + c.Out <- e + } + return nil +} + // Close closes the channel. Future sends on Out and Error will panic. 
The In // channel remains open to avoid having to synchronize Router senders, which // should use Done() to detect channel closure instead. func (c *Channel) Close() { - c.closeOnce.Do(func() { + c.mtx.Lock() + defer c.mtx.Unlock() + select { + case <-c.closeCh: + return + default: close(c.closeCh) close(c.Out) close(c.Error) - }) + } } // Done returns a channel that's closed when Channel.Close() is called. From bf2f2f5c61281c2eab7112f36d07f9e856b79458 Mon Sep 17 00:00:00 2001 From: shotonoff Date: Fri, 8 Apr 2022 15:09:58 +0200 Subject: [PATCH 3/5] fix: add Channel.Send call for broadcasting --- internal/consensus/reactor.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 4e2d610171..f8def525d6 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -672,12 +672,11 @@ func (r *Reactor) send(ps *PeerState, channel *p2p.Channel, msg proto.Message) e case <-r.closeCh: return errReactorClosed default: - channel.Send(p2p.Envelope{ + return channel.Send(p2p.Envelope{ To: ps.peerID, Message: msg, }) } - return nil } // broadcast sends a broadcast message to all peers connected to the `channel`. 
@@ -694,12 +693,12 @@ func (r *Reactor) broadcast(channel *p2p.Channel, msg proto.Message) error { return p2p.ErrPeerChannelClosed case <-r.closeCh: return errReactorClosed - case channel.Out <- p2p.Envelope{ - Broadcast: true, - Message: msg, - }: + default: + return channel.Send(p2p.Envelope{ + Broadcast: true, + Message: msg, + }) } - return nil } // logResult creates a log that depends on value of err From a07c9a6b0978ca08e8af1c63dd7182f2718d05ca Mon Sep 17 00:00:00 2001 From: shotonoff Date: Fri, 8 Apr 2022 16:22:10 +0200 Subject: [PATCH 4/5] fix: remove duplicate checks for a closed channel --- internal/consensus/reactor.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index f8def525d6..fa40564395 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -665,8 +665,6 @@ func (r *Reactor) sendCommit(ps *PeerState, commit *types.Commit) error { // If to is nil, message will be broadcasted. func (r *Reactor) send(ps *PeerState, channel *p2p.Channel, msg proto.Message) error { select { - case <-channel.Done(): - return p2p.ErrPeerChannelClosed case <-ps.closer.Done(): return errPeerClosed case <-r.closeCh: @@ -682,15 +680,11 @@ func (r *Reactor) send(ps *PeerState, channel *p2p.Channel, msg proto.Message) e // broadcast sends a broadcast message to all peers connected to the `channel`. 
func (r *Reactor) broadcast(channel *p2p.Channel, msg proto.Message) error { select { - case <-channel.Done(): - return p2p.ErrPeerChannelClosed case <-r.closeCh: return errReactorClosed default: } select { - case <-channel.Done(): - return p2p.ErrPeerChannelClosed case <-r.closeCh: return errReactorClosed default: From 0914d77dced1d552ed6db1886de8b5dc2008d34b Mon Sep 17 00:00:00 2001 From: shotonoff Date: Fri, 8 Apr 2022 16:45:03 +0200 Subject: [PATCH 5/5] fix: remove first select statement from broadcast method --- internal/consensus/reactor.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index fa40564395..e0a264f08f 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -679,11 +679,6 @@ func (r *Reactor) send(ps *PeerState, channel *p2p.Channel, msg proto.Message) e // broadcast sends a broadcast message to all peers connected to the `channel`. func (r *Reactor) broadcast(channel *p2p.Channel, msg proto.Message) error { - select { - case <-r.closeCh: - return errReactorClosed - default: - } select { case <-r.closeCh: return errReactorClosed