Skip to content

Commit

Permalink
Merge pull request #887 from lochjin/dev2.0
Browse files Browse the repository at this point in the history
fix:snap-sync stuck
  • Loading branch information
dindinw authored Jan 11, 2025
2 parents 7f27777 + 16ccd18 commit 231d403
Showing 1 changed file with 52 additions and 22 deletions.
74 changes: 52 additions & 22 deletions p2p/peers/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package peers
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -20,6 +21,9 @@ const MaxMessageSize = 10 * 1024 * 1024
const MsgCodeSize = 8
const PacketSize = 8

// HandleTimeout is the maximum time allowed for a handler to complete;
// readLoop uses it to bound the wait for each incoming message.
const HandleTimeout = 20 * time.Second

var ErrConnClosed = errors.New("ConnMsg: read or write on closed message")

type Msg struct {
Expand Down Expand Up @@ -106,14 +110,14 @@ func (p *ConnMsgRW) Send(msgcode uint64, data interface{}, respondID uint64) (in
select {
case p.w <- msg:
case <-p.closing:
return nil, nil
return nil, ErrConnClosed
}
if msg.Reply != nil {
select {
case ret := <-msg.Reply:
return ret, nil
case <-p.closing:
return nil, nil
return nil, ErrConnClosed
}
}
return nil, nil
Expand All @@ -134,7 +138,7 @@ func (p *ConnMsgRW) Run(pe *Peer) error {
return ErrConnClosed
}
var (
readErr = make(chan error, 1)
readErr = make(chan error)
)
p.wg.Add(1)
go p.readLoop(pe, readErr)
Expand Down Expand Up @@ -215,6 +219,7 @@ func (p *ConnMsgRW) Encoder() encoder.NetworkEncoding {
func (p *ConnMsgRW) readLoop(pe *Peer, errc chan<- error) {
defer p.wg.Done()
returnFun := func(err error) {

if err != nil {
select {
case <-p.closing:
Expand All @@ -223,11 +228,20 @@ func (p *ConnMsgRW) readLoop(pe *Peer, errc chan<- error) {
}
}
}
var msg *Msg
for {
msg, err := p.readMsg(pe)
if err != nil {
returnFun(err)
ctx, can := context.WithTimeout(context.Background(), HandleTimeout)
defer can()
ret := make(chan *Msg)
msg = nil
go p.readMsg(pe, ret)
select {
case <-p.closing:
return
case <-ctx.Done():
returnFun(fmt.Errorf("ConnMsgRW read message timeout:%s", pe.GetID()))
return
case msg = <-ret:
}
if msg == nil {
returnFun(fmt.Errorf("No read msg"))
Expand All @@ -250,7 +264,7 @@ func (p *ConnMsgRW) readLoop(pe *Peer, errc chan<- error) {
msgT := reflect.New(ty)
msgd := msgT.Interface()

err = p.en.DecodeWithMaxLength(bytes.NewReader(msg.Payload), msgd)
err := p.en.DecodeWithMaxLength(bytes.NewReader(msg.Payload), msgd)
//
value, ok := p.pending.Load(msg.ID)
if ok {
Expand All @@ -275,44 +289,60 @@ func (p *ConnMsgRW) readLoop(pe *Peer, errc chan<- error) {
}
}

func (p *ConnMsgRW) readMsg(pe *Peer) (*Msg, error) {
func (p *ConnMsgRW) readMsg(pe *Peer, ret chan *Msg) {
returnFun := func(msg *Msg) {
select {
case <-p.closing:
return
case ret <- msg:
}
}

if p.closed.Load() {
return nil, ErrConnClosed
log.Warn(ErrConnClosed.Error())
returnFun(nil)
return
}
dataHead := make([]byte, PacketSize)
size, err := p.rw.Read(dataHead)
if err == io.EOF {
log.Debug("Base Stream closed by peer", "peer", pe.IDWithAddress())
return nil, nil
returnFun(nil)
return
}
if err != nil {
log.Warn("Error reading from base stream", "peer", pe.IDWithAddress(), "error", err)
return nil, err
returnFun(nil)
return
}
if size != PacketSize {
err = fmt.Errorf("Error message head size")
log.Warn(err.Error(), "peer", pe.IDWithAddress())
return nil, err
log.Warn("Error message head size", "peer", pe.IDWithAddress())
returnFun(nil)
return
}
dataSize := binary.BigEndian.Uint64(dataHead)
log.Debug("Receive message head", "peer", pe.IDWithAddress(), "size", dataSize)
if dataSize > MaxMessageSize {
return nil, fmt.Errorf("Too large message size: %d > %d", dataSize, MaxMessageSize)
log.Warn("Too large message", "size", dataSize, "max", MaxMessageSize)
returnFun(nil)
return
}
msgData := make([]byte, dataSize)
size, err = io.ReadFull(p.rw, msgData)
if err == io.EOF {
log.Debug("Base Stream closed by peer", "peer", pe.IDWithAddress())
return nil, nil
returnFun(nil)
return
}
if err != nil {
log.Warn("Error reading from long stream", "peer", pe.IDWithAddress(), "error", err)
return nil, err
returnFun(nil)
return
}
if uint64(size) != dataSize {
err = fmt.Errorf("Receive error size message data")
log.Warn(err.Error(), "peer", pe.IDWithAddress())
return nil, err
log.Warn("Receive error size message data", "peer", pe.IDWithAddress())
returnFun(nil)
return
}
msgIDBs := msgData[:MsgCodeSize]
msgID := binary.BigEndian.Uint64(msgIDBs)
Expand All @@ -321,11 +351,11 @@ func (p *ConnMsgRW) readMsg(pe *Peer) (*Msg, error) {

log.Debug("Receive message", "id", msgID, "code", msgCode, "peer", pe.IDWithAddress(), "size", size)

return &Msg{
returnFun(&Msg{
ID: msgID,
Code: msgCode,
Payload: msgData[MsgCodeSize*2:],
}, nil
})
}

func NewConnRW(stream network.Stream, en encoder.NetworkEncoding) *ConnMsgRW {
Expand Down

0 comments on commit 231d403

Please sign in to comment.