external: make global sort common.Range unencoded (pingcap#51900)
tangenta authored Mar 20, 2024
1 parent f676a1a commit 735adb0
Showing 9 changed files with 73 additions and 53 deletions.
40 changes: 37 additions & 3 deletions br/pkg/lightning/backend/external/engine.go
@@ -266,13 +266,15 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, startKey, endKey []byt
sortDurHist := metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("sort")

readStart := time.Now()
readDtStartKey := e.keyAdapter.Encode(nil, startKey, common.MinRowID)
readDtEndKey := e.keyAdapter.Encode(nil, endKey, common.MinRowID)
err := readAllData(
ctx,
e.storage,
e.dataFiles,
e.statsFiles,
startKey,
endKey,
readDtStartKey,
readDtEndKey,
e.smallBlockBufPool,
e.largeBlockBufPool,
&e.memKVsAndBuffers,
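
The two re-encoded keys above are where the commit's change surfaces at the read path: common.Range now carries raw, unencoded keys, so loadBatchRegionData has to encode the range ends with the engine's key adapter before comparing them against the externally sorted files, whose keys were written through that adapter. A minimal sketch of that step, assuming the Encode(buf, key, rowID) signature used in the diff; encodeRangeEnds is a hypothetical helper, not part of the commit:

// encodeRangeEnds re-encodes a raw [start, end) range for lookups in encoded,
// externally sorted data. common.MinRowID is used as the row-ID suffix so the
// encoded bounds sort no later than any real row sharing the same raw key.
// Assumes "github.com/pingcap/tidb/br/pkg/lightning/common".
func encodeRangeEnds(adapter common.KeyAdapter, start, end []byte) ([]byte, []byte) {
    encStart := adapter.Encode(nil, start, common.MinRowID)
    encEnd := adapter.Encode(nil, end, common.MinRowID)
    return encStart, encEnd
}
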
@@ -402,7 +404,32 @@ func (e *Engine) ID() string {

// GetKeyRange implements common.Engine.
func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
return e.startKey, e.endKey, nil
if _, ok := e.keyAdapter.(common.NoopKeyAdapter); ok {
return e.startKey, e.endKey, nil
}

// when duplicate detection feature is enabled, the end key comes from
// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check the
// error.

start, err := e.keyAdapter.Decode(nil, e.startKey)
if err != nil {
return nil, nil, err
}
end, err := e.keyAdapter.Decode(nil, e.endKey)
if err == nil {
return start, end, nil
}
// handle the case that end key is from Key.Next()
if e.endKey[len(e.endKey)-1] != 0 {
return nil, nil, err
}
endEncoded := e.endKey[:len(e.endKey)-1]
end, err = e.keyAdapter.Decode(nil, endEncoded)
if err != nil {
return nil, nil, err
}
return start, kv.Key(end).Next(), nil
}
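
The fallback branch above exists because an engine's end key can be produced two ways when duplicate detection is on: directly by DupDetectKeyAdapter.Encode, or by calling Key.Next() on an already encoded key, which appends a trailing 0x00 byte that the adapter cannot decode. A sketch of the same decision, under the assumption (taken from the code above) that the trailing zero byte is the only extra state to strip; decodeEndKey is a hypothetical helper:

// decodeEndKey returns the raw end key for an encoded engine end key.
// Assumes "github.com/pingcap/tidb/br/pkg/lightning/common" and
// "github.com/pingcap/tidb/pkg/kv".
func decodeEndKey(adapter common.KeyAdapter, endKey []byte) ([]byte, error) {
    end, err := adapter.Decode(nil, endKey)
    if err == nil {
        return end, nil // endKey came straight from DupDetectKeyAdapter.Encode
    }
    // Key.Next() on an encoded key appends a 0x00 byte; anything else is a real decode error.
    if len(endKey) == 0 || endKey[len(endKey)-1] != 0 {
        return nil, err
    }
    end, err = adapter.Decode(nil, endKey[:len(endKey)-1])
    if err != nil {
        return nil, err
    }
    return kv.Key(end).Next(), nil
}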

// SplitRanges split the ranges by split keys provided by external engine.
@@ -412,6 +439,13 @@ func (e *Engine) SplitRanges(
_ log.Logger,
) ([]common.Range, error) {
splitKeys := e.splitKeys
for i, k := range e.splitKeys {
var err error
splitKeys[i], err = e.keyAdapter.Decode(nil, k)
if err != nil {
return nil, err
}
}
ranges := make([]common.Range, 0, len(splitKeys)+1)
ranges = append(ranges, common.Range{Start: startKey})
for i := 0; i < len(splitKeys); i++ {
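
SplitRanges gets the mirror-image treatment: e.splitKeys are stored in encoded form, so they are decoded in place before the loop (partially elided by the diff) assembles common.Range values from them. As a generic illustration of that assembly step only, not the commit's exact code:

// rangesFromSplitKeys builds consecutive half-open ranges [start, k1), [k1, k2),
// ..., [kn, end) from ordered split keys. A generic illustration; the exact
// loop body in SplitRanges is elided by the diff above.
// Assumes "github.com/pingcap/tidb/br/pkg/lightning/common".
func rangesFromSplitKeys(start, end []byte, splitKeys [][]byte) []common.Range {
    ranges := make([]common.Range, 0, len(splitKeys)+1)
    cur := start
    for _, k := range splitKeys {
        ranges = append(ranges, common.Range{Start: cur, End: k})
        cur = k
    }
    return append(ranges, common.Range{Start: cur, End: end})
}
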
38 changes: 1 addition & 37 deletions br/pkg/lightning/backend/external/util.go
@@ -17,7 +17,6 @@ package external
import (
"bytes"
"context"
"fmt"
"io"
"slices"
"sort"
@@ -41,11 +40,6 @@ import (
// file reader, read, parse and skip few smaller keys, and then locate the needed
// data.
//
// To avoid potential data loss, it also checks at least one statistic file has a
// key larger than or equal to the start key. If not, we are afraid that some
// paths are missing, and the data between [start key, min(first key of
// statistic files)) are lost.
//
// Caller can specify multiple ascending keys and seekPropsOffsets will return
// the offsets list per file for each key.
func seekPropsOffsets(
@@ -64,9 +58,7 @@ func seekPropsOffsets(
for i := range offsetsPerFile {
offsetsPerFile[i] = make([]uint64, len(starts))
}
// Record first key if it is smaller than first key of "starts" key argument for
// each file, and check all files afterward.
firstKeyTooSmallCheckers := make([]kv.Key, len(paths))

eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
for i := range paths {
i := i
@@ -80,7 +72,6 @@ func seekPropsOffsets(
}
defer r.Close()

moved := false
keyIdx := 0
curKey := starts[keyIdx]

@@ -100,19 +91,13 @@ func seekPropsOffsets(
}
propKey := kv.Key(p.firstKey)
for propKey.Cmp(curKey) > 0 {
if !moved {
if firstKeyTooSmallCheckers[i] == nil {
firstKeyTooSmallCheckers[i] = propKey
}
}
keyIdx++
if keyIdx >= len(starts) {
return nil
}
offsetsPerFile[i][keyIdx] = offsetsPerFile[i][keyIdx-1]
curKey = starts[keyIdx]
}
moved = true
offsetsPerFile[i][keyIdx] = p.offset
p, err3 = r.nextProp()
}
@@ -123,27 +108,6 @@ func seekPropsOffsets(
return nil, err
}

hasNil := false
for _, k := range firstKeyTooSmallCheckers {
if k == nil {
hasNil = true
break
}
}
if !hasNil {
minKey := firstKeyTooSmallCheckers[0]
for _, k := range firstKeyTooSmallCheckers[1:] {
if k.Cmp(minKey) < 0 {
minKey = k
}
}
return nil, fmt.Errorf("start key %s is too small for stat files %v, propKey %s",
starts[0].String(),
paths,
minKey.String(),
)
}

// TODO(lance6716): change the caller so we don't need to transpose the result
offsetsPerKey := make([][]uint64, len(starts))
for i := range starts {
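
With the firstKeyTooSmallCheckers bookkeeping removed, a start key that sorts before every recorded property key no longer fails with "start key ... is too small"; it simply keeps the zero-initialized offset for that file, so reading starts at the beginning of the file. The updated test below relies on exactly this. A hypothetical call shape inside the package's test (values taken from the TestSeekPropsOffsets fixture below):

// offsets[k][f] is, roughly, the offset in stats file f from which to start
// reading for starts[k]; keys smaller than every property key now map to 0.
// Assumes "github.com/pingcap/tidb/pkg/kv".
offsets, err := seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
// err == nil
// offsets == [][]uint64{{0, 0}, {10, 0}} with the fixture used in TestSeekPropsOffsets
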
10 changes: 6 additions & 4 deletions br/pkg/lightning/backend/external/util_test.go
@@ -87,15 +87,17 @@ func TestSeekPropsOffsets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 20}, {30, 20}}, got)

_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{0, 0}}, got)

got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key1")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{10, 0}}, got)

_, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
require.ErrorContains(t, err, "start key 6b657930 is too small for stat files [/test1 /test2]")
got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key0"), []byte("key1")}, []string{file1, file2}, store)
require.NoError(t, err)
require.Equal(t, [][]uint64{{0, 0}, {10, 0}}, got)

got, err = seekPropsOffsets(ctx, []kv.Key{[]byte("key999")}, []string{file1, file2}, store)
require.NoError(t, err)
9 changes: 8 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -1013,9 +1013,16 @@ func (local *Backend) generateJobForRange(
return nil, err
}
if pairStart == nil {
log.FromContext(ctx).Info("There is no pairs in range",
logFn := log.FromContext(ctx).Info
if _, ok := data.(*external.MemoryIngestData); ok {
logFn = log.FromContext(ctx).Warn
}
logFn("There is no pairs in range",
logutil.Key("start", start),
logutil.Key("end", end))
// trigger cleanup
data.IncRef()
data.DecRef()
return nil, nil
}

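
The immediate IncRef/DecRef pair is how cleanup is forced for an empty range: the idea (an assumption about MemoryIngestData's contract, not spelled out in this diff) is that the data releases its buffers once its reference count drops back to zero, so bumping and instantly dropping the count triggers that release without generating any ingest job. A small sketch of such a reference-counting contract:

// refCounted sketches the contract assumed above: the caller that drops the
// count back to zero releases the held resources.
// Assumes "sync/atomic".
type refCounted struct {
    cnt     atomic.Int64
    release func() // e.g. return KV buffers to their pools
}

func (r *refCounted) IncRef() { r.cnt.Add(1) }

func (r *refCounted) DecRef() {
    if r.cnt.Add(-1) == 0 && r.release != nil {
        r.release()
    }
}

Under that reading, IncRef followed by DecRef on freshly loaded data is a no-op for accounting but guarantees the release path runs before generateJobForRange returns nil.
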
8 changes: 6 additions & 2 deletions br/pkg/lightning/common/engine.go
@@ -38,9 +38,13 @@ type Engine interface {
KVStatistics() (totalKVSize int64, totalKVCount int64)
// ImportedStatistics returns the imported kv size and imported kv count.
ImportedStatistics() (importedKVSize int64, importedKVCount int64)
// GetKeyRange returns the key range [startKey, endKey) of the engine.
// GetKeyRange returns the key range [startKey, endKey) of the engine. If the
// duplicate detection is enabled, the keys in engine are encoded by duplicate
// detection but the returned keys should not be encoded.
GetKeyRange() (startKey []byte, endKey []byte, err error)
// SplitRanges splits the range [startKey, endKey) into multiple ranges.
// SplitRanges splits the range [startKey, endKey) into multiple ranges. If the
// duplicate detection is enabled, the keys in engine are encoded by duplicate
// detection but the returned keys should not be encoded.
SplitRanges(startKey, endKey []byte, sizeLimit, keysLimit int64, logger log.Logger) ([]Range, error)
Close() error
}
3 changes: 3 additions & 0 deletions br/pkg/storage/ks3.go
@@ -530,6 +530,9 @@ type ks3ObjectReader struct {
func (r *ks3ObjectReader) Read(p []byte) (n int, err error) {
retryCnt := 0
maxCnt := r.rangeInfo.End + 1 - r.pos
if maxCnt == 0 {
return 0, io.EOF
}
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
6 changes: 3 additions & 3 deletions br/pkg/storage/s3.go
@@ -898,12 +898,12 @@ type s3ObjectReader struct {
func (r *s3ObjectReader) Read(p []byte) (n int, err error) {
retryCnt := 0
maxCnt := r.rangeInfo.End + 1 - r.pos
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
if maxCnt == 0 {
return 0, io.EOF
}
if maxCnt > int64(len(p)) {
maxCnt = int64(len(p))
}
n, err = r.reader.Read(p[:maxCnt])
// TODO: maybe we should use !errors.Is(err, io.EOF) here to avoid error lint, but currently, pingcap/errors
// doesn't implement this method yet.
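
Both object readers now make the same two checks in the same order: first decide whether the tracked byte range is exhausted (and report io.EOF if so), and only then clamp the read size to the caller's buffer. Done the other way round, as s3ObjectReader previously did, a zero-length destination buffer would also drive maxCnt to zero and be misreported as EOF. A standalone sketch of the corrected ordering, with pos and rangeEnd standing in for the readers' internal fields:

// readAtMost mirrors the corrected check order in s3ObjectReader.Read and
// ks3ObjectReader.Read: rangeEnd is the inclusive end offset of the tracked
// range and pos is the current offset within it.
// Assumes "io".
func readAtMost(r io.Reader, p []byte, pos, rangeEnd int64) (int, error) {
    maxCnt := rangeEnd + 1 - pos
    if maxCnt == 0 {
        return 0, io.EOF // the range is exhausted, regardless of len(p)
    }
    if maxCnt > int64(len(p)) {
        maxCnt = int64(len(p)) // never read past the caller's buffer
    }
    return r.Read(p[:maxCnt])
}
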
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
@@ -375,14 +375,15 @@ func generateGlobalSortIngestPlan(
if err != nil {
return nil, err
}
iCnt := int64(len(instanceIDs))
metaArr := make([][]byte, 0, 16)
for i, g := range kvMetaGroups {
if g == nil {
logger.Error("meet empty kv group when getting subtask summary",
zap.Int64("taskID", task.ID))
return nil, errors.Errorf("subtask kv group %d is empty", i)
}
newMeta, err := splitSubtaskMetaForOneKVMetaGroup(ctx, store, g, cloudStorageURI, int64(len(instanceIDs)), logger)
newMeta, err := splitSubtaskMetaForOneKVMetaGroup(ctx, store, g, cloudStorageURI, iCnt, logger)
if err != nil {
return nil, errors.Trace(err)
}
9 changes: 7 additions & 2 deletions tests/realtikvtest/addindextest2/global_sort_test.go
@@ -116,18 +116,23 @@ func TestGlobalSortBasic(t *testing.T) {
dom.DDL().SetHook(hook)

tk.MustExec("alter table t add index idx(a);")
dom.DDL().SetHook(origin)
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
checkFileCleaned(t, jobID, cloudStorageURI)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()"))
tk.MustExec("alter table t add index idx1(a);")
dom.DDL().SetHook(origin)
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
checkFileCleaned(t, jobID, cloudStorageURI)

tk.MustExec("alter table t add unique index idx2(a);")
tk.MustExec("admin check table t;")
<-scheduler.WaitCleanUpFinished
checkFileCleaned(t, jobID, cloudStorageURI)

dom.DDL().SetHook(origin)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/scheduler/WaitCleanUpFinished"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/forceMergeSort"))
}
