Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

external: make global sort common.Range unencoded (#51900) #51969

Merged
40 changes: 37 additions & 3 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,15 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, startKey, endKey []byt
sortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("sort")

readStart := time.Now()
readDtStartKey := e.keyAdapter.Encode(nil, startKey, common.MinRowID)
readDtEndKey := e.keyAdapter.Encode(nil, endKey, common.MinRowID)
err := readAllData(
ctx,
e.storage,
e.dataFiles,
e.statsFiles,
startKey,
endKey,
readDtStartKey,
readDtEndKey,
e.smallBlockBufPool,
e.largeBlockBufPool,
&e.memKVsAndBuffers,
Expand Down Expand Up @@ -402,7 +404,32 @@ func (e *Engine) ID() string {

// GetKeyRange implements common.Engine.
func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
return e.startKey, e.endKey, nil
if _, ok := e.keyAdapter.(common.NoopKeyAdapter); ok {
return e.startKey, e.endKey, nil
}

// when duplicate detection feature is enabled, the end key comes from
// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check the
// error.

start, err := e.keyAdapter.Decode(nil, e.startKey)
if err != nil {
return nil, nil, err
}
end, err := e.keyAdapter.Decode(nil, e.endKey)
if err == nil {
return start, end, nil
}
// handle the case that end key is from Key.Next()
if e.endKey[len(e.endKey)-1] != 0 {
return nil, nil, err
}
endEncoded := e.endKey[:len(e.endKey)-1]
end, err = e.keyAdapter.Decode(nil, endEncoded)
if err != nil {
return nil, nil, err
}
return start, kv.Key(end).Next(), nil
}

// SplitRanges split the ranges by split keys provided by external engine.
Expand All @@ -412,6 +439,13 @@ func (e *Engine) SplitRanges(
_ log.Logger,
) ([]common.Range, error) {
splitKeys := e.splitKeys
for i, k := range e.splitKeys {
var err error
splitKeys[i], err = e.keyAdapter.Decode(nil, k)
if err != nil {
return nil, err
}
}
ranges := make([]common.Range, 0, len(splitKeys)+1)
ranges = append(ranges, common.Range{Start: startKey})
for i := 0; i < len(splitKeys); i++ {
Expand Down
38 changes: 1 addition & 37 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package external
import (
"bytes"
"context"
"fmt"
"io"
"slices"
"sort"
Expand All @@ -41,11 +40,6 @@ import (
// file reader, read, parse and skip few smaller keys, and then locate the needed
// data.
//
// To avoid potential data loss, it also checks at least one statistic file has a
// key larger than or equal to the start key. If not, we are afraid that some
// paths are missing, and the data between [start key, min(first key of
// statistic files)) are lost.
//
// Caller can specify multiple ascending keys and seekPropsOffsets will return
// the offsets list per file for each key.
func seekPropsOffsets(
Expand All @@ -64,9 +58,7 @@ func seekPropsOffsets(
for i := range offsetsPerFile {
offsetsPerFile[i] = make([]uint64, len(starts))
}
// Record first key if it is smaller than first key of "starts" key argument for
// each file, and check all files afterward.
firstKeyTooSmallCheckers := make([]kv.Key, len(paths))

eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
for i := range paths {
i := i
Expand All @@ -80,7 +72,6 @@ func seekPropsOffsets(
}
defer r.Close()

moved := false
keyIdx := 0
curKey := starts[keyIdx]

Expand All @@ -100,19 +91,13 @@ func seekPropsOffsets(
}
propKey := kv.Key(p.firstKey)
for propKey.Cmp(curKey) > 0 {
if !moved {
if firstKeyTooSmallCheckers[i] == nil {
firstKeyTooSmallCheckers[i] = propKey
}
}
keyIdx++
if keyIdx >= len(starts) {
return nil
}
offsetsPerFile[i][keyIdx] = offsetsPerFile[i][keyIdx-1]
curKey = starts[keyIdx]
}
moved = true
offsetsPerFile[i][keyIdx] = p.offset
p, err3 = r.nextProp()
}
Expand All @@ -123,27 +108,6 @@ func seekPropsOffsets(
return nil, err
}

hasNil := false
for _, k := range firstKeyTooSmallCheckers {
if k == nil {
hasNil = true
break
}
}
if !hasNil {
minKey := firstKeyTooSmallCheckers[0]
for _, k := range firstKeyTooSmallCheckers[1:] {
if k.Cmp(minKey) < 0 {
minKey = k
}
}
return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s",
starts[0].String(),
paths,
minKey.String(),
)
}

// TODO(lance6716): change the caller so we don't need to transpose the result
offsetsPerKey := make([][]uint64, len(starts))
for i := range starts {
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/lightning/backend/external/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,17 @@ func TestSeekPropsOffsets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 20}, {30, 20}}, got)

_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{0, 0}}, got)

got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 0}}, got)

_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{0, 0}, {10, 0}}, got)

got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store)
require.NoError(t, err)
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,9 +1008,16 @@ func (local *Backend) generateJobForRange(
return nil, err
}
if pairStart == nil {
log.FromContext(ctx).Info("There is no pairs in range",
logFn := log.FromContext(ctx).Info
if _, ok := data.(*external.MemoryIngestData); ok {
logFn = log.FromContext(ctx).Warn
}
logFn("There is no pairs in range",
logutil.Key("start", start),
logutil.Key("end", end))
// trigger cleanup
data.IncRef()
data.DecRef()
return nil, nil
}

Expand Down
8 changes: 6 additions & 2 deletions br/pkg/lightning/common/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ type Engine interface {
KVStatistics() (totalKVSize int64, totalKVCount int64)
// ImportedStatistics returns the imported kv size and imported kv count.
ImportedStatistics() (importedKVSize int64, importedKVCount int64)
// GetKeyRange returns the key range [startKey, endKey) of the engine.
// GetKeyRange returns the key range [startKey, endKey) of the engine. If the
// duplicate detection is enabled, the keys in engine are encoded by duplicate
// detection but the returned keys should not be encoded.
GetKeyRange() (startKey []byte, endKey []byte, err error)
// SplitRanges splits the range [startKey, endKey) into multiple ranges.
// SplitRanges splits the range [startKey, endKey) into multiple ranges. If the
// duplicate detection is enabled, the keys in engine are encoded by duplicate
// detection but the returned keys should not be encoded.
SplitRanges(startKey, endKey []byte, sizeLimit, keysLimit int64, logger log.Logger) ([]Range, error)
Close() error
}
3 changes: 3 additions & 0 deletions br/pkg/storage/ks3.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ type ks3ObjectReader struct {
func (r *ks3ObjectReader) Read(p []byte) (n int, err error) {
retryCnt := 0
maxCnt := r.rangeInfo.End + 1 - r.pos
if maxCnt == 0 {
return 0, io.EOF
}
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,12 +898,12 @@ type s3ObjectReader struct {
func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
retryCnt := 0
maxCnt := r.rangeInfo.End + 1 - r.pos
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
if maxCnt == 0 {
return 0, io.EOF
}
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
n, err = r.reader.Read(p[:maxCnt])
// TODO: maybe we should use !errors.Is(err, io.EOF) here to avoid error lint, but currently, pingcap/errors
// doesn't implement this method yet.
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,14 +375,15 @@ func generateGlobalSortIngestPlan(
if err != nil {
return nil, err
}
iCnt := int64(len(instanceIDs))
metaArr := make([][]byte, 0, 16)
for i, g := range kvMetaGroups {
if g == nil {
logger.Error("meet empty kv group when getting subtask summary",
zap.Int64("taskID", task.ID))
return nil, errors.Errorf("subtask kv group %d is empty", i)
}
newMeta, err := splitSubtaskMetaForOneKVMetaGroup(ctx, store, g, cloudStorageURI, int64(len(instanceIDs)), logger)
newMeta, err := splitSubtaskMetaForOneKVMetaGroup(ctx, store, g, cloudStorageURI, iCnt, logger)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
9 changes: 7 additions & 2 deletions tests/realtikvtest/addindextest2/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,23 @@ func TestGlobalSortBasic(t *testing.T) {
dom.DDL().SetHook(hook)

tk.MustExec("alter table t add index idx(a);")
dom.DDL().SetHook(origin)
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
checkFileCleaned(t, jobID, cloudStorageURI)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()"))
tk.MustExec("alter table t add index idx1(a);")
dom.DDL().SetHook(origin)
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
checkFileCleaned(t, jobID, cloudStorageURI)

tk.MustExec("alter table t add unique index idx2(a);")
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
checkFileCleaned(t, jobID, cloudStorageURI)

dom.DDL().SetHook(origin)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort"))
}
Expand Down
Loading