Skip to content

Commit

Permalink
[FAB-17289] Fix gossip goroutine leak when reading msg (hyperledger#439)
Browse files Browse the repository at this point in the history
Signed-off-by: Shitaibin <hz_stb@163.com>
Change-Id: I52acd1631b996ac44cd76a2f6c1319ae3a18cfb3
  • Loading branch information
Shitaibin authored and yacovm committed Dec 24, 2019
1 parent 13554f6 commit 8a8264a
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
clientStream: cs,
serverStream: ss,
stopFlag: int32(0),
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
recvBuffSize: config.RecvBuffSize,
}
return connection
Expand Down Expand Up @@ -257,7 +257,7 @@ func (conn *connection) close() {
return
}

conn.stopChan <- struct{}{}
close(conn.stopChan)

conn.drainOutputBuffer()
conn.Lock()
Expand Down Expand Up @@ -306,21 +306,18 @@ func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error),
func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.SignedGossipMessage, conn.recvBuffSize)
quit := make(chan struct{})
// Call stream.Recv() asynchronously in readFromStream(),
// and wait for either the Recv() call to end,
// or a signal to close the connection, which exits
// the method and makes the Recv() call to fail in the
// readFromStream() method
go conn.readFromStream(errChan, quit, msgChan)
go conn.readFromStream(errChan, msgChan)

go conn.writeToStream()

for !conn.toDie() {
select {
case stop := <-conn.stopChan:
conn.logger.Debug("Closing reading from stream")
conn.stopChan <- stop
case <-conn.stopChan:
return nil
case err := <-errChan:
return err
Expand All @@ -346,9 +343,8 @@ func (conn *connection) writeToStream() {
return
}
conn.metrics.SentMessages.Add(1)
case stop := <-conn.stopChan:
case <-conn.stopChan:
conn.logger.Debug("Closing writing to stream")
conn.stopChan <- stop
return
}
}
Expand All @@ -366,7 +362,7 @@ func (conn *connection) drainOutputBuffer() {
}
}

func (conn *connection) readFromStream(errChan chan error, quit chan struct{}, msgChan chan *proto.SignedGossipMessage) {
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage) {
for !conn.toDie() {
stream := conn.getStream()
if stream == nil {
Expand All @@ -392,7 +388,8 @@ func (conn *connection) readFromStream(errChan chan error, quit chan struct{}, m
}
select {
case msgChan <- msg:
case <-quit:
case <-conn.stopChan:
conn.logger.Debug("Closing reading from stream")
return
}
}
Expand Down

0 comments on commit 8a8264a

Please sign in to comment.