From 467e04a2436286e16bf4a235a1ef76a849162d64 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 4 Aug 2021 17:11:35 +0800 Subject: [PATCH 1/5] fix the bug that calculate unfinished ranges may miss some range --- pkg/lightning/backend/local/local.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 36bd08c7f..92c8c8ef8 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -1862,13 +1862,20 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File var allErrLock sync.Mutex var allErr error var wg sync.WaitGroup + metErr := atomic.NewBool(false) wg.Add(len(ranges)) - for _, r := range ranges { + for i, r := range ranges { startKey := r.start endKey := r.end w := local.rangeConcurrency.Apply() + // if meet error here, skip try more here to allow fail fast. + if metErr.Load() { + wg.Add(i - len(ranges)) + local.rangeConcurrency.Recycle(w) + break + } go func(w *worker.Worker) { defer func() { local.rangeConcurrency.Recycle(w) @@ -1895,6 +1902,9 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File allErrLock.Lock() allErr = multierr.Append(allErr, err) allErrLock.Unlock() + if err != nil { + metErr.Store(true) + } }(w) } @@ -2020,11 +2030,15 @@ func sortAndMergeRanges(ranges []Range) []Range { return ranges[:i+1] } -func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range { +func filterOverlapRange(rawRanges []Range, finishedRanges []Range) []Range { if len(finishedRanges) == 0 { - return ranges + return rawRanges } + // copy the ranges because we may change its values + ranges := make([]Range, 0, len(rawRanges)) + ranges = append(ranges, rawRanges...) + result := make([]Range, 0, len(ranges)) rIdx := 0 fIdx := 0 From 303a0132c243b318c46a5b3eb8ba656b5f39522e Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 5 Aug 2021 10:24:14 +0800 Subject: [PATCH 2/5] fix comment --- pkg/lightning/backend/local/local.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 92c8c8ef8..4fcfedb03 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -1864,18 +1864,16 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engineFile *File var wg sync.WaitGroup metErr := atomic.NewBool(false) - wg.Add(len(ranges)) - - for i, r := range ranges { + for _, r := range ranges { startKey := r.start endKey := r.end w := local.rangeConcurrency.Apply() // if meet error here, skip try more here to allow fail fast. if metErr.Load() { - wg.Add(i - len(ranges)) local.rangeConcurrency.Recycle(w) break } + wg.Add(1) go func(w *worker.Worker) { defer func() { local.rangeConcurrency.Recycle(w) From b52c1f9841bdc0f1cbadfbb2fe7277c9dd91d017 Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 5 Aug 2021 10:45:32 +0800 Subject: [PATCH 3/5] avoid copy full slice --- pkg/lightning/backend/local/local.go | 43 ++++++++++++++++------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 4fcfedb03..3c60bdba7 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -2028,46 +2028,53 @@ func sortAndMergeRanges(ranges []Range) []Range { return ranges[:i+1] } -func filterOverlapRange(rawRanges []Range, finishedRanges []Range) []Range { - if len(finishedRanges) == 0 { - return rawRanges +func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range { + if len(ranges) == 0 || len(finishedRanges) == 0 { + return ranges } - // copy the ranges because we may change its values - ranges := make([]Range, 0, len(rawRanges)) - ranges = append(ranges, rawRanges...) - result := make([]Range, 0, len(ranges)) rIdx := 0 fIdx := 0 + rStart := ranges[rIdx].start + incRIdx := func() { + rIdx++ + if rIdx < len(ranges) { + rStart = ranges[rIdx].start + } + } for rIdx < len(ranges) && fIdx < len(finishedRanges) { if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].start) <= 0 { - result = append(result, ranges[rIdx]) - rIdx++ - } else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].end) >= 0 { + result = append(result, Range{start: rStart, end: ranges[rIdx].end}) + incRIdx() + } else if bytes.Compare(rStart, finishedRanges[fIdx].end) >= 0 { fIdx++ - } else if bytes.Compare(ranges[rIdx].start, finishedRanges[fIdx].start) < 0 { - result = append(result, Range{start: ranges[rIdx].start, end: finishedRanges[fIdx].start}) + } else if bytes.Compare(rStart, finishedRanges[fIdx].start) < 0 { + result = append(result, Range{start: rStart, end: finishedRanges[fIdx].start}) switch bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) { case -1: - rIdx++ + incRIdx() case 0: - rIdx++ + incRIdx() fIdx++ case 1: - ranges[rIdx].start = finishedRanges[fIdx].end + rStart = finishedRanges[fIdx].end fIdx++ } } else if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) > 0 { - ranges[rIdx].start = finishedRanges[fIdx].end + rStart = finishedRanges[fIdx].end fIdx++ } else { - rIdx++ + incRIdx() } } if rIdx < len(ranges) { - result = append(result, ranges[rIdx:]...) + result = append(result, Range{start: rStart, end: ranges[rIdx].end}) + rIdx++ + if rIdx < len(ranges) { + result = append(result, ranges[rIdx:]...) + } } return result From c401b68cd1cc1b120b502721059bd053f77e7e39 Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 6 Aug 2021 12:07:19 +0800 Subject: [PATCH 4/5] simplify filterOverlapRange --- pkg/lightning/backend/local/local.go | 57 +++++++++------------------- 1 file changed, 17 insertions(+), 40 deletions(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 3c60bdba7..4c2a128fe 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -2033,50 +2033,27 @@ func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range { return ranges } - result := make([]Range, 0, len(ranges)) - rIdx := 0 - fIdx := 0 - rStart := ranges[rIdx].start - incRIdx := func() { - rIdx++ - if rIdx < len(ranges) { - rStart = ranges[rIdx].start - } - } - for rIdx < len(ranges) && fIdx < len(finishedRanges) { - if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].start) <= 0 { - result = append(result, Range{start: rStart, end: ranges[rIdx].end}) - incRIdx() - } else if bytes.Compare(rStart, finishedRanges[fIdx].end) >= 0 { - fIdx++ - } else if bytes.Compare(rStart, finishedRanges[fIdx].start) < 0 { - result = append(result, Range{start: rStart, end: finishedRanges[fIdx].start}) - switch bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) { - case -1: - incRIdx() - case 0: - incRIdx() - fIdx++ - case 1: - rStart = finishedRanges[fIdx].end - fIdx++ + result := make([]Range, 0) + for _, r := range ranges { + start := r.start + end := r.end + for len(finishedRanges) > 0 && bytes.Compare(finishedRanges[0].start, end) < 0 { + fr := finishedRanges[0] + if bytes.Compare(fr.start, start) > 0 { + result = append(result, Range{start: start, end: fr.start}) } - } else if bytes.Compare(ranges[rIdx].end, finishedRanges[fIdx].end) > 0 { - rStart = finishedRanges[fIdx].end - fIdx++ - } else { - incRIdx() + if bytes.Compare(fr.end, start) > 0 { + start = fr.end + } + if bytes.Compare(fr.end, end) > 0 { + break + } + finishedRanges = finishedRanges[1:] } - } - - if rIdx < len(ranges) { - result = append(result, Range{start: rStart, end: ranges[rIdx].end}) - rIdx++ - if rIdx < len(ranges) { - result = append(result, ranges[rIdx:]...) + if bytes.Compare(start, end) < 0 { + result = append(result, Range{start: start, end: r.end}) } } - return result } From e165999edc142ec87d07dc0f7f4bddfce165a5fa Mon Sep 17 00:00:00 2001 From: glorv Date: Fri, 6 Aug 2021 13:38:07 +0800 Subject: [PATCH 5/5] fix --- pkg/lightning/backend/local/local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index 4c2a128fe..201ab6ec0 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -2051,7 +2051,7 @@ func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range { finishedRanges = finishedRanges[1:] } if bytes.Compare(start, end) < 0 { - result = append(result, Range{start: start, end: r.end}) + result = append(result, Range{start: start, end: end}) } } return result