Skip to content

Commit

Permalink
lightning: add function about write-then-read to external engine (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Aug 18, 2023
1 parent 6174973 commit 6188778
Show file tree
Hide file tree
Showing 9 changed files with 490 additions and 91 deletions.
7 changes: 6 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ go_library(
srcs = [
"byte_reader.go",
"codec.go",
"engine.go",
"file.go",
"iter.go",
"kv_reader.go",
"stat_reader.go",
"util.go",
"writer.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
Expand All @@ -21,6 +23,7 @@ go_library(
"//br/pkg/membuf",
"//br/pkg/storage",
"//kv",
"//util/hack",
"//util/logutil",
"//util/mathutil",
"//util/size",
Expand All @@ -36,13 +39,15 @@ go_test(
srcs = [
"byte_reader_test.go",
"codec_test.go",
"engine_test.go",
"file_test.go",
"iter_test.go",
"util_test.go",
"writer_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 15,
shard_count = 19,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand Down
60 changes: 60 additions & 0 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"encoding/hex"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// Engine stored sorted key/value pairs in an external storage.
type Engine struct {
storage storage.ExternalStorage
dataFiles []string
statsFiles []string
}

func (e *Engine) createMergeIter(ctx context.Context, start kv.Key) (*MergeKVIter, error) {
logger := logutil.Logger(ctx)

var offsets []uint64
if len(e.statsFiles) == 0 {
offsets = make([]uint64, len(e.dataFiles))
logger.Info("no stats files",
zap.String("startKey", hex.EncodeToString(start)))
} else {
offs, err := seekPropsOffsets(ctx, start, e.statsFiles, e.storage)
if err != nil {
return nil, errors.Trace(err)
}
offsets = offs
logger.Info("seek props offsets",
zap.Uint64s("offsets", offsets),
zap.String("startKey", hex.EncodeToString(start)),
zap.Strings("dataFiles", prettyFileNames(e.dataFiles)),
zap.Strings("statsFiles", prettyFileNames(e.statsFiles)))
}
iter, err := NewMergeKVIter(ctx, e.dataFiles, offsets, e.storage, 64*1024)
if err != nil {
return nil, errors.Trace(err)
}
return iter, nil
}
109 changes: 109 additions & 0 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"bytes"
"context"
"slices"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

func TestIter(t *testing.T) {
seed := time.Now().Unix()
rand.Seed(uint64(seed))
t.Logf("seed: %d", seed)

totalKV := 300
kvPairs := make([]common.KvPair, totalKV)
for i := range kvPairs {
keyBuf := make([]byte, rand.Intn(10)+1)
rand.Read(keyBuf)
// make sure the key is unique
kvPairs[i].Key = append(keyBuf, byte(i/255), byte(i%255))
valBuf := make([]byte, rand.Intn(10)+1)
rand.Read(valBuf)
kvPairs[i].Val = valBuf
}

sortedKVPairs := make([]common.KvPair, totalKV)
copy(sortedKVPairs, kvPairs)
slices.SortFunc(sortedKVPairs, func(i, j common.KvPair) int {
return bytes.Compare(i.Key, j.Key)
})

ctx := context.Background()
store := storage.NewMemStorage()

for i := 0; i < 3; i++ {
w := NewWriterBuilder().
SetMemorySizeLimit(uint64(rand.Intn(100)+1)).
SetPropSizeDistance(uint64(rand.Intn(50)+1)).
SetPropKeysDistance(uint64(rand.Intn(10)+1)).
Build(store, i, "/subtask")
kvStart := i * 100
kvEnd := (i + 1) * 100
err := w.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvPairs[kvStart:kvEnd]))
require.NoError(t, err)
_, err = w.Close(ctx)
require.NoError(t, err)
}

dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/subtask")
require.NoError(t, err)

engine := Engine{
storage: store,
dataFiles: dataFiles,
statsFiles: statFiles,
}
iter, err := engine.createMergeIter(ctx, sortedKVPairs[0].Key)
require.NoError(t, err)
got := make([]common.KvPair, 0, totalKV)
for iter.Next() {
got = append(got, common.KvPair{
Key: iter.Key(),
Val: iter.Value(),
})
}
require.NoError(t, iter.Error())
require.Equal(t, sortedKVPairs, got)

pickStartIdx := rand.Intn(len(sortedKVPairs))
startKey := sortedKVPairs[pickStartIdx].Key
iter, err = engine.createMergeIter(ctx, startKey)
require.NoError(t, err)
got = make([]common.KvPair, 0, totalKV)
for iter.Next() {
got = append(got, common.KvPair{
Key: iter.Key(),
Val: iter.Value(),
})
}
require.NoError(t, iter.Error())
// got keys must be ascending
for i := 1; i < len(got); i++ {
require.True(t, bytes.Compare(got[i-1].Key, got[i].Key) < 0)
}
// the first key must be less than or equal to startKey
require.True(t, bytes.Compare(got[0].Key, startKey) <= 0)
}
78 changes: 1 addition & 77 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package external
import (
"context"
"encoding/binary"
"path/filepath"
"strconv"
"strings"

"github.com/pingcap/tidb/br/pkg/storage"
)
Expand Down Expand Up @@ -117,77 +114,4 @@ func (s *KeyValueStore) Close() {
}
}

var statSuffix = filepath.Join("_stat", "0")

// GetAllFileNames returns a FilePathHandle that contains all data file paths
// and a slice of stat file paths.
func GetAllFileNames(
ctx context.Context,
store storage.ExternalStorage,
subDir string,
) (FilePathHandle, []string, error) {
var dataFilePaths FilePathHandle
var stats []string

err := store.WalkDir(ctx,
&storage.WalkOption{SubDir: subDir},
func(path string, size int64) error {
if strings.HasSuffix(path, statSuffix) {
stats = append(stats, path)
} else {
dir, file := filepath.Split(path)
writerID, err := strconv.Atoi(filepath.Base(dir))
if err != nil {
return err
}
seq, err := strconv.Atoi(file)
if err != nil {
return err
}
dataFilePaths.set(writerID, seq, path)
}
return nil
})
if err != nil {
return dataFilePaths, nil, err
}
return dataFilePaths, stats, nil
}

// FilePathHandle handles data file paths under a prefix path.
type FilePathHandle struct {
paths [][]string
}

func (p *FilePathHandle) set(writerID, seq int, path string) {
if writerID >= len(p.paths) {
p.paths = append(p.paths, make([][]string, writerID-len(p.paths)+1)...)
}
if seq >= len(p.paths[writerID]) {
p.paths[writerID] = append(p.paths[writerID], make([]string, seq-len(p.paths[writerID])+1)...)
}
p.paths[writerID][seq] = path
}

// Get returns the path of the data file with the given writerID and seq.
func (p *FilePathHandle) Get(writerID, seq int) string {
return p.paths[writerID][seq]
}

// ForEach applies the given function to each data file path.
func (p *FilePathHandle) ForEach(f func(writerID, seq int, path string)) {
for writerID, paths := range p.paths {
for seq, path := range paths {
f(writerID, seq, path)
}
}
}

// FlatSlice returns a flat slice of all data file paths.
func (p *FilePathHandle) FlatSlice() []string {
var paths []string
p.ForEach(func(writerID, seq int, path string) {
paths = append(paths, path)
})
return paths
}
const statSuffix = "_stat"
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/external/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ func (i *MergePropIter) prop() *rangeProperty {
return i.iter.curr
}

func (i *MergePropIter) readerIndex() int {
return i.iter.lastReaderIdx
}

// Close closes the iterator.
func (i *MergePropIter) Close() error {
return i.iter.close()
Expand Down
Loading

0 comments on commit 6188778

Please sign in to comment.