Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FAB-17289] Fix gossip goroutine leak when reading msg #439

Merged
merged 1 commit into from
Dec 22, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
clientStream: cs,
serverStream: ss,
stopFlag: int32(0),
stopChan: make(chan struct{}, 1),
stopChan: make(chan struct{}),
recvBuffSize: config.RecvBuffSize,
}
return connection
Expand Down Expand Up @@ -257,7 +257,7 @@ func (conn *connection) close() {
return
}

conn.stopChan <- struct{}{}
close(conn.stopChan)

conn.drainOutputBuffer()
conn.Lock()
Expand Down Expand Up @@ -306,21 +306,18 @@ func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error),
func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.SignedGossipMessage, conn.recvBuffSize)
quit := make(chan struct{})
// Call stream.Recv() asynchronously in readFromStream(),
// and wait for either the Recv() call to end,
// or a signal to close the connection, which exits
// the method and makes the Recv() call to fail in the
// readFromStream() method
go conn.readFromStream(errChan, quit, msgChan)
go conn.readFromStream(errChan, msgChan)

go conn.writeToStream()

for !conn.toDie() {
select {
case stop := <-conn.stopChan:
conn.logger.Debug("Closing reading from stream")
conn.stopChan <- stop
case <-conn.stopChan:
return nil
case err := <-errChan:
return err
Expand All @@ -346,9 +343,8 @@ func (conn *connection) writeToStream() {
return
}
conn.metrics.SentMessages.Add(1)
case stop := <-conn.stopChan:
case <-conn.stopChan:
conn.logger.Debug("Closing writing to stream")
conn.stopChan <- stop
return
}
}
Expand All @@ -366,7 +362,7 @@ func (conn *connection) drainOutputBuffer() {
}
}

func (conn *connection) readFromStream(errChan chan error, quit chan struct{}, msgChan chan *proto.SignedGossipMessage) {
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage) {
for !conn.toDie() {
stream := conn.getStream()
if stream == nil {
Expand All @@ -392,7 +388,8 @@ func (conn *connection) readFromStream(errChan chan error, quit chan struct{}, m
}
select {
case msgChan <- msg:
case <-quit:
case <-conn.stopChan:
conn.logger.Debug("Closing reading from stream")
return
}
}
Expand Down