From 8a8264ab4f5429495850732ff43333adb12a8c74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=BD=AC?= Date: Mon, 23 Dec 2019 00:29:31 +0800 Subject: [PATCH] [FAB-17289] Fix gossip goroutine leak when reading msg (#439) Signed-off-by: Shitaibin Change-Id: I52acd1631b996ac44cd76a2f6c1319ae3a18cfb3 --- gossip/comm/conn.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/gossip/comm/conn.go b/gossip/comm/conn.go index 84edf318123..5555ac1b32a 100644 --- a/gossip/comm/conn.go +++ b/gossip/comm/conn.go @@ -217,7 +217,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go clientStream: cs, serverStream: ss, stopFlag: int32(0), - stopChan: make(chan struct{}, 1), + stopChan: make(chan struct{}), recvBuffSize: config.RecvBuffSize, } return connection @@ -257,7 +257,7 @@ func (conn *connection) close() { return } - conn.stopChan <- struct{}{} + close(conn.stopChan) conn.drainOutputBuffer() conn.Lock() @@ -306,21 +306,18 @@ func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error), func (conn *connection) serviceConnection() error { errChan := make(chan error, 1) msgChan := make(chan *proto.SignedGossipMessage, conn.recvBuffSize) - quit := make(chan struct{}) // Call stream.Recv() asynchronously in readFromStream(), // and wait for either the Recv() call to end, // or a signal to close the connection, which exits // the method and makes the Recv() call to fail in the // readFromStream() method - go conn.readFromStream(errChan, quit, msgChan) + go conn.readFromStream(errChan, msgChan) go conn.writeToStream() for !conn.toDie() { select { - case stop := <-conn.stopChan: - conn.logger.Debug("Closing reading from stream") - conn.stopChan <- stop + case <-conn.stopChan: return nil case err := <-errChan: return err @@ -346,9 +343,8 @@ func (conn *connection) writeToStream() { return } conn.metrics.SentMessages.Add(1) - case stop := <-conn.stopChan: + case <-conn.stopChan: conn.logger.Debug("Closing writing to stream") - conn.stopChan <- stop return } } @@ -366,7 +362,7 @@ func (conn *connection) drainOutputBuffer() { } } -func (conn *connection) readFromStream(errChan chan error, quit chan struct{}, msgChan chan *proto.SignedGossipMessage) { +func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage) { for !conn.toDie() { stream := conn.getStream() if stream == nil { @@ -392,7 +388,8 @@ func (conn *connection) readFromStream(errChan chan error, quit chan struct{}, m } select { case msgChan <- msg: - case <-quit: + case <-conn.stopChan: + conn.logger.Debug("Closing reading from stream") return } }