From 1b71c7bfe30c33e1e22828d6237809eaf6724025 Mon Sep 17 00:00:00 2001 From: Jin Date: Sun, 12 Jan 2025 18:58:00 +0800 Subject: [PATCH 1/2] fix:handshake stuck by long connection --- p2p/synch/longconn.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/p2p/synch/longconn.go b/p2p/synch/longconn.go index b5695b86..23a53c02 100644 --- a/p2p/synch/longconn.go +++ b/p2p/synch/longconn.go @@ -50,16 +50,19 @@ func (s *Sync) establishLongConnection(pe *peers.Peer) error { if err != nil { return common.NewErrorStr(common.ErrStreamBase, fmt.Sprintf("open stream on topic %v failed", topic)).ToError() } - defer func() { - err := stream.Close() + common.EgressConnectMeter.Mark(1) + + go func() { + err = pe.Run(stream, s.p2p.Encoding()) + if err != nil { + log.Warn(err.Error()) + } + err = stream.Close() if err != nil { log.Warn(err.Error()) } }() - - common.EgressConnectMeter.Mark(1) - - return pe.Run(stream, s.p2p.Encoding()) + return nil } func (s *Sync) registerLongConnection() { From ae539f185135623fbf08944e61e32711dbf0fe6c Mon Sep 17 00:00:00 2001 From: Jin Date: Sun, 12 Jan 2025 19:43:35 +0800 Subject: [PATCH 2/2] feat:optimize send message --- p2p/peers/message.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/p2p/peers/message.go b/p2p/peers/message.go index eb64b505..ca050ea6 100644 --- a/p2p/peers/message.go +++ b/p2p/peers/message.go @@ -107,8 +107,14 @@ func (p *ConnMsgRW) Send(msgcode uint64, data interface{}, respondID uint64) (in msg.Reply = make(chan interface{}) } log.Debug("Send message", "msg", msg.String()) + ctx, can := context.WithTimeout(context.Background(), HandleTimeout) + defer can() + select { case p.w <- msg: + case <-ctx.Done(): + go p.Close() + return nil, fmt.Errorf("ConnMsgRW send message timeout:%s", msg.String()) case <-p.closing: return nil, ErrConnClosed } @@ -116,6 +122,9 @@ func (p *ConnMsgRW) Send(msgcode uint64, data interface{}, respondID uint64) (in select { case ret := <-msg.Reply: return ret, nil + case <-ctx.Done(): + go p.Close() + return nil, fmt.Errorf("ConnMsgRW wait respond message timeout:%s", msg.String()) case <-p.closing: return nil, ErrConnClosed }