From bff7dac52bb27ee4d54052a5c29ce076d13e106a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 4 Jun 2019 13:10:00 +0800 Subject: [PATCH] store/tikv: fix goroutine leak in gcworker (#10622) (#10683) --- store/tikv/range_task.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/store/tikv/range_task.go b/store/tikv/range_task.go index 1fac00a3a2588..dcf3d72069219 100644 --- a/store/tikv/range_task.go +++ b/store/tikv/range_task.go @@ -129,8 +129,6 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe key := startKey for { select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) case <-statLogTicker.C: logutil.Logger(ctx).Info("range task in progress", zap.String("name", s.name), @@ -168,7 +166,12 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe } pushTaskStartTime := time.Now() - taskCh <- task + + select { + case taskCh <- task: + case <-ctx.Done(): + break + } metrics.TiKVRangeTaskPushDuration.WithLabelValues(s.name).Observe(time.Since(pushTaskStartTime).Seconds()) if isLast { @@ -247,8 +250,6 @@ func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) { } completedRegions, err := w.handler(ctx, *r) - atomic.AddInt32(w.completedRegions, int32(completedRegions)) - if err != nil { logutil.Logger(ctx).Info("canceling range task because of error", zap.String("name", w.name), @@ -259,5 +260,6 @@ func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) { cancel() break } + atomic.AddInt32(w.completedRegions, int32(completedRegions)) } }