From 6bc33f1d6784e31d66f67fbed4e725fa2fbb5290 Mon Sep 17 00:00:00 2001 From: parasssh Date: Wed, 20 May 2020 14:28:44 -0700 Subject: [PATCH] Cherry-pick v20.03: Fix panic for sending on a closed channel (#5479) (#5490) (cherry picked from commit 1e2f0e7) (cherry picked from commit f5f42579a28906bba8f434e01bbe93e2e6fc3674) --- worker/executor.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/worker/executor.go b/worker/executor.go index 6e0d27175c6..94ea0d42cca 100644 --- a/worker/executor.go +++ b/worker/executor.go @@ -86,18 +86,9 @@ func (e *executor) shutdown() { } } -func (e *executor) getChannel(pred string) (ch chan *subMutation) { - e.RLock() +// getChannelUnderLock obtains the channel for the given pred. It must be called under e.Lock(). +func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) { ch, ok := e.predChan[pred] - e.RUnlock() - if ok { - return ch - } - - // Create a new channel for `pred`. - e.Lock() - defer e.Unlock() - ch, ok = e.predChan[pred] if ok { return ch } @@ -123,7 +114,17 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir payload.edges = append(payload.edges, edge) } - for attr, payload := range payloadMap { - e.getChannel(attr) <- payload + // Lock() in case the channel gets closed from underneath us. + e.Lock() + defer e.Unlock() + select { + case <-e.closer.HasBeenClosed(): + return + default: + // Closer is not closed. And we have the Lock, so sending on channel should be safe. + for attr, payload := range payloadMap { + e.getChannelUnderLock(attr) <- payload + } } + }