diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go
index b86b45feed694..b9eb9bd2eb958 100644
--- a/br/pkg/lightning/backend/external/engine.go
+++ b/br/pkg/lightning/backend/external/engine.go
@@ -266,13 +266,15 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, startKey, endKey []byt
 	sortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("sort")
 
 	readStart := time.Now()
+	readDtStartKey := e.keyAdapter.Encode(nil, startKey, common.MinRowID)
+	readDtEndKey := e.keyAdapter.Encode(nil, endKey, common.MinRowID)
 	err := readAllData(
 		ctx,
 		e.storage,
 		e.dataFiles,
 		e.statsFiles,
-		startKey,
-		endKey,
+		readDtStartKey,
+		readDtEndKey,
 		e.smallBlockBufPool,
 		e.largeBlockBufPool,
 		&e.memKVsAndBuffers,
@@ -402,7 +404,32 @@ func (e *Engine) ID() string {
 
 // GetKeyRange implements common.Engine.
 func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
-	return e.startKey, e.endKey, nil
+	if _, ok := e.keyAdapter.(common.NoopKeyAdapter); ok {
+		return e.startKey, e.endKey, nil
+	}
+
+	// When the duplicate detection feature is enabled, the end key comes from
+	// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check
+	// the error.
+
+	start, err := e.keyAdapter.Decode(nil, e.startKey)
+	if err != nil {
+		return nil, nil, err
+	}
+	end, err := e.keyAdapter.Decode(nil, e.endKey)
+	if err == nil {
+		return start, end, nil
+	}
+	// handle the case where the end key comes from Key.Next()
+	if e.endKey[len(e.endKey)-1] != 0 {
+		return nil, nil, err
+	}
+	endEncoded := e.endKey[:len(e.endKey)-1]
+	end, err = e.keyAdapter.Decode(nil, endEncoded)
+	if err != nil {
+		return nil, nil, err
+	}
+	return start, kv.Key(end).Next(), nil
 }
 
 // SplitRanges split the ranges by split keys provided by external engine.
@@ -412,6 +439,13 @@ func (e *Engine) SplitRanges(
 	_ log.Logger,
 ) ([]common.Range, error) {
 	splitKeys := e.splitKeys
+	for i, k := range e.splitKeys {
+		var err error
+		splitKeys[i], err = e.keyAdapter.Decode(nil, k)
+		if err != nil {
+			return nil, err
+		}
+	}
 	ranges := make([]common.Range, 0, len(splitKeys)+1)
 	ranges = append(ranges, common.Range{Start: startKey})
 	for i := 0; i < len(splitKeys); i++ {
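For context on the GetKeyRange change above: an end key may have been produced by kv.Key.Next() on an already-encoded key, in which case decoding fails until the trailing zero byte that Next() appends is stripped. The sketch below is a minimal, self-contained illustration of that decode path; keyAdapter and prefixAdapter are hypothetical stand-ins for the real e.keyAdapter, not the actual TiDB implementation.

package main

import (
	"errors"
	"fmt"
)

// keyAdapter is a hypothetical stand-in for the real key adapter: Encode wraps
// a key, Decode unwraps it and fails on input that is not a valid encoding.
type keyAdapter interface {
	Encode(dst, key []byte) []byte
	Decode(dst, data []byte) ([]byte, error)
}

// prefixAdapter "encodes" by wrapping the key in marker bytes. The real
// dup-detection encoding also appends a row ID, but the decode-or-strip
// logic below is the same.
type prefixAdapter struct{}

func (prefixAdapter) Encode(dst, key []byte) []byte {
	dst = append(dst, 0xFE)
	dst = append(dst, key...)
	return append(dst, 0xFF)
}

func (prefixAdapter) Decode(dst, data []byte) ([]byte, error) {
	if len(data) < 2 || data[0] != 0xFE || data[len(data)-1] != 0xFF {
		return nil, errors.New("not an encoded key")
	}
	return append(dst, data[1:len(data)-1]...), nil
}

// decodeEndKey mirrors the logic in Engine.GetKeyRange: try a plain decode
// first; if that fails and the key ends with 0x00, it was produced by
// kv.Key.Next() on an encoded key, so strip the zero byte, decode, and
// re-apply Next() (append 0x00) to the decoded key.
func decodeEndKey(ka keyAdapter, endKey []byte) ([]byte, error) {
	if end, err := ka.Decode(nil, endKey); err == nil {
		return end, nil
	}
	if len(endKey) == 0 || endKey[len(endKey)-1] != 0 {
		return nil, errors.New("end key is neither encoded nor Next() of an encoded key")
	}
	end, err := ka.Decode(nil, endKey[:len(endKey)-1])
	if err != nil {
		return nil, err
	}
	return append(end, 0), nil
}

func main() {
	ka := prefixAdapter{}
	encoded := ka.Encode(nil, []byte("key1"))

	end, _ := decodeEndKey(ka, encoded)
	fmt.Printf("plain encoded end key decodes to %q\n", end) // "key1"

	next := append(encoded, 0) // what kv.Key.Next() produces
	end, _ = decodeEndKey(ka, next)
	fmt.Printf("Next()-derived end key decodes to %q\n", end) // "key1\x00"
}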
diff --git a/br/pkg/lightning/backend/external/util.go b/br/pkg/lightning/backend/external/util.go
index 8433dc262aeae..2ee8d9942becb 100644
--- a/br/pkg/lightning/backend/external/util.go
+++ b/br/pkg/lightning/backend/external/util.go
@@ -17,7 +17,6 @@ package external
 import (
 	"bytes"
 	"context"
-	"fmt"
 	"io"
 	"slices"
 	"sort"
@@ -41,11 +40,6 @@ import (
 // file reader, read, parse and skip few smaller keys, and then locate the needed
 // data.
 //
-// To avoid potential data loss, it also checks at least one statistic file has a
-// key larger than or equal to the start key. If not, we are afraid that some
-// paths are missing, and the data between [start key, min(first key of
-// statistic files)) are lost.
-//
 // Caller can specify multiple ascending keys and seekPropsOffsets will return
 // the offsets list per file for each key.
 func seekPropsOffsets(
@@ -64,9 +58,7 @@ func seekPropsOffsets(
 	for i := range offsetsPerFile {
 		offsetsPerFile[i] = make([]uint64, len(starts))
 	}
-	// Record first key if it is smaller than first key of "starts" key argument for
-	// each file, and check all files afterward.
-	firstKeyTooSmallCheckers := make([]kv.Key, len(paths))
+
 	eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
 	for i := range paths {
 		i := i
@@ -80,7 +72,6 @@ func seekPropsOffsets(
 			}
 			defer r.Close()
 
-			moved := false
 			keyIdx := 0
 			curKey := starts[keyIdx]
 
@@ -100,11 +91,6 @@ func seekPropsOffsets(
 			}
 			propKey := kv.Key(p.firstKey)
 			for propKey.Cmp(curKey) > 0 {
-				if !moved {
-					if firstKeyTooSmallCheckers[i] == nil {
-						firstKeyTooSmallCheckers[i] = propKey
-					}
-				}
 				keyIdx++
 				if keyIdx >= len(starts) {
 					return nil
@@ -112,7 +98,6 @@ func seekPropsOffsets(
 				offsetsPerFile[i][keyIdx] = offsetsPerFile[i][keyIdx-1]
 				curKey = starts[keyIdx]
 			}
-			moved = true
 			offsetsPerFile[i][keyIdx] = p.offset
 			p, err3 = r.nextProp()
 		}
@@ -123,27 +108,6 @@ func seekPropsOffsets(
 		return nil, err
 	}
 
-	hasNil := false
-	for _, k := range firstKeyTooSmallCheckers {
-		if k == nil {
-			hasNil = true
-			break
-		}
-	}
-	if !hasNil {
-		minKey := firstKeyTooSmallCheckers[0]
-		for _, k := range firstKeyTooSmallCheckers[1:] {
-			if k.Cmp(minKey) < 0 {
-				minKey = k
-			}
-		}
-		return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s",
-			starts[0].String(),
-			paths,
-			minKey.String(),
-		)
-	}
-
 	// TODO(lance6716): change the caller so we don't need to transpose the result
 	offsetsPerKey := make([][]uint64, len(starts))
 	for i := range starts {
diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go
index 1800a27d23c1a..163d908a2e5c3 100644
--- a/br/pkg/lightning/backend/external/util_test.go
+++ b/br/pkg/lightning/backend/external/util_test.go
@@ -87,15 +87,17 @@ func TestSeekPropsOffsets(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{10, 20}, {30, 20}}, got)
 
-	_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
-	require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
+	require.NoError(t, err)
+	require.Equal(t, [][]uint64{{0, 0}}, got)
 
 	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store)
 	require.NoError(t, err)
 	require.Equal(t, [][]uint64{{10, 0}}, got)
 
-	_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
-	require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
+	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
+	require.NoError(t, err)
+	require.Equal(t, [][]uint64{{0, 0}, {10, 0}}, got)
 
 	got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store)
 	require.NoError(t, err)
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 8f41ca6102244..27b422b0481eb 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1008,9 +1008,16 @@ func (local *Backend) generateJobForRange(
 		return nil, err
 	}
 	if pairStart == nil {
-		log.FromContext(ctx).Info("There is no pairs in range",
+		logFn := log.FromContext(ctx).Info
+		if _, ok := data.(*external.MemoryIngestData); ok {
+			logFn = log.FromContext(ctx).Warn
+		}
+		logFn("There is no pairs in range",
 			logutil.Key("start", start),
 			logutil.Key("end", end))
+		// trigger cleanup
+		data.IncRef()
+		data.DecRef()
 		return nil, nil
 	}
 
diff --git a/br/pkg/lightning/common/engine.go b/br/pkg/lightning/common/engine.go
index 559f0058e37ab..5920d76fa5d86 100644
--- a/br/pkg/lightning/common/engine.go
+++ b/br/pkg/lightning/common/engine.go
@@ -38,9 +38,13 @@ type Engine interface {
 	KVStatistics() (totalKVSize int64, totalKVCount int64)
 	// ImportedStatistics returns the imported kv size and imported kv count.
 	ImportedStatistics() (importedKVSize int64, importedKVCount int64)
-	// GetKeyRange returns the key range [startKey, endKey) of the engine.
+	// GetKeyRange returns the key range [startKey, endKey) of the engine. If
+	// duplicate detection is enabled, the keys in the engine are encoded by
+	// duplicate detection, but the returned keys must not be encoded.
 	GetKeyRange() (startKey []byte, endKey []byte, err error)
-	// SplitRanges splits the range [startKey, endKey) into multiple ranges.
+	// SplitRanges splits the range [startKey, endKey) into multiple ranges. If
+	// duplicate detection is enabled, the keys in the engine are encoded by
+	// duplicate detection, but the returned keys must not be encoded.
 	SplitRanges(startKey, endKey []byte, sizeLimit, keysLimit int64, logger log.Logger) ([]Range, error)
 	Close() error
 }
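The seekPropsOffsets change above alters behavior rather than just deleting dead code: a start key smaller than every property key in a file now resolves to offset 0 for that file instead of raising a "start key is too small" error, which the updated TestSeekPropsOffsets expectations encode. Below is a rough sketch of the new seek semantics over an in-memory property list; the prop type and seekOffsets helper are illustrative assumptions, simplified to a single file rather than the per-file error group in the real function.

package main

import (
	"bytes"
	"fmt"
)

// prop is a hypothetical stand-in for one range property: the first key of a
// block and that block's offset in the data file.
type prop struct {
	firstKey []byte
	offset   uint64
}

// seekOffsets returns, for each ascending search key, the offset of the last
// property whose firstKey is <= that key. A key smaller than every firstKey
// maps to offset 0 (read the file from the beginning) instead of being
// treated as an error, mirroring the new seekPropsOffsets behavior.
func seekOffsets(props []prop, starts [][]byte) []uint64 {
	offsets := make([]uint64, len(starts))
	keyIdx := 0
	for _, p := range props {
		// Advance to the next search key while this property's first key is
		// beyond the current one; the previous key's offset carries over.
		for bytes.Compare(p.firstKey, starts[keyIdx]) > 0 {
			keyIdx++
			if keyIdx >= len(starts) {
				return offsets
			}
			offsets[keyIdx] = offsets[keyIdx-1]
		}
		offsets[keyIdx] = p.offset
	}
	// Remaining keys are >= the last property's first key.
	for i := keyIdx + 1; i < len(starts); i++ {
		offsets[i] = offsets[i-1]
	}
	return offsets
}

func main() {
	props := []prop{
		{[]byte("key1"), 10},
		{[]byte("key3"), 30},
	}
	// "key0" sorts before every property, so it now resolves to offset 0.
	fmt.Println(seekOffsets(props, [][]byte{[]byte("key0"), []byte("key2")}))
	// Output: [0 10]
}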
diff --git a/br/pkg/storage/ks3.go b/br/pkg/storage/ks3.go
index 1ecbbb73d4266..919da5e3aa760 100644
--- a/br/pkg/storage/ks3.go
+++ b/br/pkg/storage/ks3.go
@@ -530,6 +530,9 @@ type ks3ObjectReader struct {
 func (r *ks3ObjectReader) Read(p []byte) (n int, err error) {
 	retryCnt := 0
 	maxCnt := r.rangeInfo.End + 1 - r.pos
+	if maxCnt == 0 {
+		return 0, io.EOF
+	}
 	if maxCnt > int64(len(p)) {
 		maxCnt = int64(len(p))
 	}
diff --git a/br/pkg/storage/s3.go b/br/pkg/storage/s3.go
index a1e9b8caed401..f5ffdbf463601 100644
--- a/br/pkg/storage/s3.go
+++ b/br/pkg/storage/s3.go
@@ -898,12 +898,12 @@ type s3ObjectReader struct {
 func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
 	retryCnt := 0
 	maxCnt := r.rangeInfo.End + 1 - r.pos
-	if maxCnt > int64(len(p)) {
-		maxCnt = int64(len(p))
-	}
 	if maxCnt == 0 {
 		return 0, io.EOF
 	}
+	if maxCnt > int64(len(p)) {
+		maxCnt = int64(len(p))
+	}
 	n, err = r.reader.Read(p[:maxCnt])
 	// TODO: maybe we should use !errors.Is(err, io.EOF) here to avoid error lint, but currently, pingcap/errors
 	// doesn't implement this method yet.
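Both storage fixes address the same Read edge case: the ks3 reader gains the previously missing end-of-range check, and the s3 reader now performs that check before clamping maxCnt to len(p), so a Read with an empty buffer while bytes remain no longer reports a spurious io.EOF. A condensed sketch of the corrected ordering follows; rangeReader and its src/pos/rangeEnd fields are illustrative, not the real reader struct.

package main

import (
	"fmt"
	"io"
)

// rangeReader is an illustrative stand-in for the s3/ks3 object readers: it
// serves bytes from src within the half-open range [pos, rangeEnd+1).
type rangeReader struct {
	src      []byte
	pos      int64
	rangeEnd int64 // inclusive index of the last byte in range
}

func (r *rangeReader) Read(p []byte) (int, error) {
	maxCnt := r.rangeEnd + 1 - r.pos
	// Check exhaustion BEFORE clamping to len(p). If the clamp came first,
	// a Read with len(p) == 0 would force maxCnt to 0 and the check below
	// would report io.EOF even though bytes remain in the range.
	if maxCnt == 0 {
		return 0, io.EOF
	}
	if maxCnt > int64(len(p)) {
		maxCnt = int64(len(p))
	}
	n := copy(p[:maxCnt], r.src[r.pos:])
	r.pos += int64(n)
	return n, nil
}

func main() {
	r := &rangeReader{src: []byte("abcdef"), rangeEnd: 5}

	// Empty buffer with data remaining: 0 bytes read, no EOF.
	n, err := r.Read(nil)
	fmt.Println(n, err) // 0 <nil>

	buf := make([]byte, 8)
	n, err = r.Read(buf)
	fmt.Println(n, err, string(buf[:n])) // 6 <nil> abcdef

	// Range exhausted: EOF.
	n, err = r.Read(buf)
	fmt.Println(n, err) // 0 EOF
}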
diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go
index a027dd12b20a1..4d9b58b530947 100644
--- a/pkg/ddl/backfilling_dist_scheduler.go
+++ b/pkg/ddl/backfilling_dist_scheduler.go
@@ -375,6 +375,7 @@ func generateGlobalSortIngestPlan(
 	if err != nil {
 		return nil, err
 	}
+	iCnt := int64(len(instanceIDs))
 	metaArr := make([][]byte, 0, 16)
 	for i, g := range kvMetaGroups {
 		if g == nil {
@@ -382,7 +383,7 @@ func generateGlobalSortIngestPlan(
 				zap.Int64("taskID", task.ID))
 			return nil, errors.Errorf("subtask kv group %d is empty", i)
 		}
-		newMeta, err := splitSubtaskMetaForOneKVMetaGroup(ctx, store, g, cloudStorageURI, int64(len(instanceIDs)), logger)
+		newMeta, err := splitSubtaskMetaForOneKVMetaGroup(ctx, store, g, cloudStorageURI, iCnt, logger)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
diff --git a/tests/realtikvtest/addindextest2/global_sort_test.go b/tests/realtikvtest/addindextest2/global_sort_test.go
index f75c03dbee35c..39e324bb5562a 100644
--- a/tests/realtikvtest/addindextest2/global_sort_test.go
+++ b/tests/realtikvtest/addindextest2/global_sort_test.go
@@ -116,18 +116,23 @@ func TestGlobalSortBasic(t *testing.T) {
 	dom.DDL().SetHook(hook)
 
 	tk.MustExec("alter table t add index idx(a);")
-	dom.DDL().SetHook(origin)
 	tk.MustExec("admin check table t;")
 	<-scheduler.WaitCleanUpFinished
 	checkFileCleaned(t, jobID, cloudStorageURI)
 
 	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()"))
 	tk.MustExec("alter table t add index idx1(a);")
-	dom.DDL().SetHook(origin)
 	tk.MustExec("admin check table t;")
 	<-scheduler.WaitCleanUpFinished
+	checkFileCleaned(t, jobID, cloudStorageURI)
 
+	tk.MustExec("alter table t add unique index idx2(a);")
+	tk.MustExec("admin check table t;")
+	<-scheduler.WaitCleanUpFinished
 	checkFileCleaned(t, jobID, cloudStorageURI)
+
+	dom.DDL().SetHook(origin)
+
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort"))
 }
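The test reshuffle keeps the DDL hook installed and the WaitCleanUpFinished failpoint enabled across all three add-index runs (including the new unique index), restoring the hook and disabling the failpoints only once at the end; previously the hook was restored right after the first DDL. For readers unfamiliar with the pattern: `<-scheduler.WaitCleanUpFinished` blocks until the cleanup path signals a package-level channel, so checkFileCleaned only runs once the sorted files are actually removable. A schematic of that pattern is sketched below; the non-blocking send and the buffered channel are assumptions about how the real failpoint body signals, not the actual scheduler code.

package main

import (
	"fmt"
	"time"
)

// WaitCleanUpFinished mimics the package-level channel the test reads from:
// the cleanup path signals it (in TiDB, inside a failpoint body) once per
// finished cleanup, and tests block on it before asserting side effects.
var WaitCleanUpFinished = make(chan struct{}, 1)

// cleanUp stands in for the scheduler's cleanup routine. The non-blocking
// send keeps the routine unaffected when nobody is listening, which is how a
// failpoint-gated notification can stay out of the production path.
func cleanUp(jobID int64) {
	time.Sleep(10 * time.Millisecond) // pretend to delete sorted files
	fmt.Printf("cleanup for job %d done\n", jobID)
	select {
	case WaitCleanUpFinished <- struct{}{}:
	default:
	}
}

func main() {
	go cleanUp(1)
	<-WaitCleanUpFinished // test code: block until cleanup really finished
	fmt.Println("safe to assert files are gone")
}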