Skip to content

Commit

Permalink
Cherry-pick v20.03: Fix panic for sending on a closed channel (#5479) (
Browse files Browse the repository at this point in the history
…#5490)

(cherry picked from commit 1e2f0e7)
(cherry picked from commit f5f4257)
  • Loading branch information
parasssh authored May 20, 2020
1 parent 61bb530 commit 6bc33f1
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions worker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,9 @@ func (e *executor) shutdown() {
}
}

func (e *executor) getChannel(pred string) (ch chan *subMutation) {
e.RLock()
// getChannelUnderLock obtains the channel for the given pred. It must be called under e.Lock().
func (e *executor) getChannelUnderLock(pred string) (ch chan *subMutation) {
ch, ok := e.predChan[pred]
e.RUnlock()
if ok {
return ch
}

// Create a new channel for `pred`.
e.Lock()
defer e.Unlock()
ch, ok = e.predChan[pred]
if ok {
return ch
}
Expand All @@ -123,7 +114,17 @@ func (e *executor) addEdges(ctx context.Context, startTs uint64, edges []*pb.Dir
payload.edges = append(payload.edges, edge)
}

for attr, payload := range payloadMap {
e.getChannel(attr) <- payload
// Lock() in case the channel gets closed from underneath us.
e.Lock()
defer e.Unlock()
select {
case <-e.closer.HasBeenClosed():
return
default:
// Closer is not closed. And we have the Lock, so sending on channel should be safe.
for attr, payload := range payloadMap {
e.getChannelUnderLock(attr) <- payload
}
}

}

0 comments on commit 6bc33f1

Please sign in to comment.