Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
61413: closedts/sidetransport: fix goroutine leak r=andreimatei a=andreimatei

Fix a goroutine leak caused by gRPC streams. It turns out that
receiving an error from an RPC's stream is not enough to cleanup the
stream, you need to do something else as described here
https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream

This patch starts canceling the stream's context whenever the stream is
discarded.

Release note: None
Release justification: Bug fix for new functionality.

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
  • Loading branch information
craig[bot] and andreimatei committed Mar 3, 2021
2 parents 320dab6 + d9b4a23 commit c140198
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions pkg/kv/kvserver/closedts/sidetransport/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,10 @@ type rpcConn struct {
producer *Sender
nodeID roachpb.NodeID
stream ctpb.SideTransport_PushUpdatesClient
closed int32 // atomic
// cancelStreamCtx cleans up the resources (goroutine) associated with stream.
// It needs to be called whenever stream is discarded.
cancelStreamCtx context.CancelFunc
closed int32 // atomic
}

func newRPCConn(dialer *nodedialer.Dialer, producer *Sender, nodeID roachpb.NodeID) conn {
Expand All @@ -607,6 +610,18 @@ func newRPCConn(dialer *nodedialer.Dialer, producer *Sender, nodeID roachpb.Node
return r
}

// cleanupStream releases the resources associated with r.stream and marks the conn
// as needing a new stream.
func (r *rpcConn) cleanupStream() {
if r.stream == nil {
return
}
_ /* err */ = r.stream.CloseSend()
r.stream = nil
r.cancelStreamCtx()
r.cancelStreamCtx = nil
}

// close makes the connection stop sending messages. The run() goroutine will
// exit asynchronously. The parent Sender is expected to remove this connection
// from its list.
Expand All @@ -620,10 +635,15 @@ func (r *rpcConn) sendMsg(ctx context.Context, msg *ctpb.Update) error {
if err != nil {
return err
}
r.stream, err = ctpb.NewSideTransportClient(conn).PushUpdates(ctx)
streamCtx, cancel := context.WithCancel(ctx)
stream, err := ctpb.NewSideTransportClient(conn).PushUpdates(streamCtx)
if err != nil {
cancel()
return err
}
r.stream = stream
// This will need to be called when we're done with the stream.
r.cancelStreamCtx = cancel
}
return r.stream.Send(msg)
}
Expand All @@ -634,13 +654,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
func(ctx context.Context) {
ctx = r.AnnotateCtx(ctx)

defer func() {
if r.stream != nil {
_ /* err */ = r.stream.CloseSend()
r.stream = nil
}
}()

defer r.cleanupStream()
everyN := log.Every(10 * time.Second)

var lastSent ctpb.SeqNum
Expand Down Expand Up @@ -690,7 +704,7 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
// the circuit breaker if the remote node is still unreachable, we
// should have a blocking version of Dial() that we just leave hanging
// and get a notification when it succeeds.
r.stream = nil
r.cleanupStream()
}
}
})
Expand Down

0 comments on commit c140198

Please sign in to comment.