diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 20b1f8dbaea4d..fe74fa8ca00f8 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "external", - srcs = ["byte_reader.go"], + srcs = [ + "byte_reader.go", + "file.go", + "sharedisk.go", + ], importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external", visibility = ["//visibility:public"], deps = [ @@ -16,11 +20,15 @@ go_library( go_test( name = "external_test", timeout = "short", - srcs = ["byte_reader_test.go"], + srcs = [ + "byte_reader_test.go", + "file_test.go", + ], embed = [":external"], flaky = True, - shard_count = 3, + shard_count = 4, deps = [ + "//br/pkg/storage", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/br/pkg/lightning/backend/external/file.go b/br/pkg/lightning/backend/external/file.go new file mode 100644 index 0000000000000..d7d6d8a529236 --- /dev/null +++ b/br/pkg/lightning/backend/external/file.go @@ -0,0 +1,186 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "context" + "encoding/binary" + "path/filepath" + "strconv" + "strings" + + "github.com/pingcap/tidb/br/pkg/storage" +) + +// KeyValueStore stores key-value pairs and maintains the range properties. +type KeyValueStore struct { + dataWriter storage.ExternalFileWriter + + rc *rangePropertiesCollector + ctx context.Context + writerID int + seq int + offset uint64 +} + +// NewKeyValueStore creates a new KeyValueStore. The data will be written to the +// given dataWriter and range properties will be maintained in the given +// rangePropertiesCollector. +func NewKeyValueStore( + ctx context.Context, + dataWriter storage.ExternalFileWriter, + rangePropertiesCollector *rangePropertiesCollector, + writerID int, + seq int, +) (*KeyValueStore, error) { + kvStore := &KeyValueStore{ + dataWriter: dataWriter, + ctx: ctx, + rc: rangePropertiesCollector, + writerID: writerID, + seq: seq, + } + return kvStore, nil +} + +// AddKeyValue saves a key-value pair to the KeyValueStore. If the accumulated +// size or key count exceeds the given distance, a new range property will be +// appended to the rangePropertiesCollector with current status. +func (s *KeyValueStore) AddKeyValue(key, value []byte) error { + kvLen := len(key) + len(value) + 16 + var b [8]byte + + // data layout: keyLen + key + valueLen + value + _, err := s.dataWriter.Write( + s.ctx, + binary.BigEndian.AppendUint64(b[:], uint64(len(key))), + ) + if err != nil { + return err + } + _, err = s.dataWriter.Write(s.ctx, key) + if err != nil { + return err + } + _, err = s.dataWriter.Write( + s.ctx, + binary.BigEndian.AppendUint64(b[:], uint64(len(value))), + ) + if err != nil { + return err + } + _, err = s.dataWriter.Write(s.ctx, value) + if err != nil { + return err + } + + if len(s.rc.currProp.key) == 0 { + s.rc.currProp.key = key + s.rc.currProp.writerID = s.writerID + s.rc.currProp.dataSeq = s.seq + } + + s.offset += uint64(kvLen) + s.rc.currProp.size += uint64(len(key) + len(value)) + s.rc.currProp.keys++ + + if s.rc.currProp.size >= s.rc.propSizeIdxDistance || + s.rc.currProp.keys >= s.rc.propKeysIdxDistance { + newProp := *s.rc.currProp + s.rc.props = append(s.rc.props, &newProp) + + s.rc.currProp.key = nil + s.rc.currProp.offset = s.offset + s.rc.currProp.keys = 0 + s.rc.currProp.size = 0 + } + + return nil +} + +var statSuffix = filepath.Join("_stat", "0") + +// GetAllFileNames returns a FilePathHandle that contains all data file paths +// and a slice of stat file paths. +func GetAllFileNames( + ctx context.Context, + store storage.ExternalStorage, + subDir string, +) (FilePathHandle, []string, error) { + var dataFilePaths FilePathHandle + var stats []string + + err := store.WalkDir(ctx, + &storage.WalkOption{SubDir: subDir}, + func(path string, size int64) error { + if strings.HasSuffix(path, statSuffix) { + stats = append(stats, path) + } else { + dir, file := filepath.Split(path) + writerID, err := strconv.Atoi(filepath.Base(dir)) + if err != nil { + return err + } + seq, err := strconv.Atoi(file) + if err != nil { + return err + } + dataFilePaths.set(writerID, seq, path) + } + return nil + }) + if err != nil { + return dataFilePaths, nil, err + } + return dataFilePaths, stats, nil +} + +// FilePathHandle handles data file paths under a prefix path. +type FilePathHandle struct { + paths [][]string +} + +func (p *FilePathHandle) set(writerID, seq int, path string) { + if writerID >= len(p.paths) { + p.paths = append(p.paths, make([][]string, writerID-len(p.paths)+1)...) + } + if seq >= len(p.paths[writerID]) { + p.paths[writerID] = append(p.paths[writerID], make([]string, seq-len(p.paths[writerID])+1)...) + } + p.paths[writerID][seq] = path +} + +// Get returns the path of the data file with the given writerID and seq. +func (p *FilePathHandle) Get(writerID, seq int) string { + return p.paths[writerID][seq] +} + +// ForEach applies the given function to each data file path. +func (p *FilePathHandle) ForEach(f func(writerID, seq int, path string)) { + for writerID, paths := range p.paths { + for seq, path := range paths { + f(writerID, seq, path) + } + } +} + +// FlatSlice returns a flat slice of all data file paths. +func (p *FilePathHandle) FlatSlice() []string { + var paths []string + p.ForEach(func(writerID, seq int, path string) { + paths = append(paths, path) + }) + return paths +} diff --git a/br/pkg/lightning/backend/external/file_test.go b/br/pkg/lightning/backend/external/file_test.go new file mode 100644 index 0000000000000..72d8f7e06c377 --- /dev/null +++ b/br/pkg/lightning/backend/external/file_test.go @@ -0,0 +1,113 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestAddKeyValueMaintainRangeProperty(t *testing.T) { + ctx := context.Background() + memStore := storage.NewMemStorage() + writer, err := memStore.Create(ctx, "/test", nil) + require.NoError(t, err) + rc := &rangePropertiesCollector{ + propSizeIdxDistance: 100, + propKeysIdxDistance: 2, + } + rc.reset() + initRC := *rc + kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1) + require.NoError(t, err) + + require.Equal(t, &initRC, rc) + encoded := rc.encode() + require.Len(t, encoded, 0) + + k1, v1 := []byte("key1"), []byte("value1") + err = kvStore.AddKeyValue(k1, v1) + require.NoError(t, err) + // when not accumulated enough data, no range property will be added. + require.Equal(t, &initRC, rc) + + // propKeysIdxDistance = 2, so after adding 2 keys, a new range property will be added. + k2, v2 := []byte("key2"), []byte("value2") + err = kvStore.AddKeyValue(k2, v2) + require.NoError(t, err) + require.Len(t, rc.props, 1) + expected := &rangeProperty{ + key: k1, + offset: 0, + writerID: 1, + dataSeq: 1, + size: uint64(len(k1) + len(v1) + len(k2) + len(v2)), + keys: 2, + } + require.Equal(t, expected, rc.props[0]) + encoded = rc.encode() + require.Greater(t, len(encoded), 0) + + // when not accumulated enough data, no range property will be added. + k3, v3 := []byte("key3"), []byte("value3") + err = kvStore.AddKeyValue(k3, v3) + require.NoError(t, err) + require.Len(t, rc.props, 1) + + err = writer.Close(ctx) + require.NoError(t, err) + + writer, err = memStore.Create(ctx, "/test2", nil) + require.NoError(t, err) + rc = &rangePropertiesCollector{ + propSizeIdxDistance: 1, + propKeysIdxDistance: 100, + } + rc.reset() + kvStore, err = NewKeyValueStore(ctx, writer, rc, 2, 2) + require.NoError(t, err) + err = kvStore.AddKeyValue(k1, v1) + require.NoError(t, err) + require.Len(t, rc.props, 1) + expected = &rangeProperty{ + key: k1, + offset: 0, + writerID: 2, + dataSeq: 2, + size: uint64(len(k1) + len(v1)), + keys: 1, + } + require.Equal(t, expected, rc.props[0]) + + err = kvStore.AddKeyValue(k2, v2) + require.NoError(t, err) + require.Len(t, rc.props, 2) + expected = &rangeProperty{ + key: k2, + offset: uint64(len(k1) + len(v1) + 16), + writerID: 2, + dataSeq: 2, + size: uint64(len(k2) + len(v2)), + keys: 1, + } + require.Equal(t, expected, rc.props[1]) + err = writer.Close(ctx) + require.NoError(t, err) +} + +// TODO(lance6716): add more tests when the usage of other functions are merged into master. diff --git a/br/pkg/lightning/backend/external/sharedisk.go b/br/pkg/lightning/backend/external/sharedisk.go new file mode 100644 index 0000000000000..55c5848109e62 --- /dev/null +++ b/br/pkg/lightning/backend/external/sharedisk.go @@ -0,0 +1,86 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package external + +import ( + "encoding/binary" +) + +// rangeProperty describes some statistic of a range of a file. +type rangeProperty struct { + key []byte // the first key in the range + offset uint64 // the end offset of the range + writerID int + dataSeq int + size uint64 // total KV size in the range, not considering the file format layout overhead + keys uint64 // total KV count in the range +} + +// rangePropertiesCollector collects range properties for each range. The zero +// value of rangePropertiesCollector is not ready to use, should call reset() +// first. +type rangePropertiesCollector struct { + props []*rangeProperty + currProp *rangeProperty + propSizeIdxDistance uint64 + propKeysIdxDistance uint64 +} + +func (rc *rangePropertiesCollector) reset() { + rc.props = rc.props[:0] + rc.currProp = &rangeProperty{} +} + +// keyLen + p.size + p.keys + p.offset + p.WriterID + p.DataSeq +const propertyLengthExceptKey = 4 + 8 + 8 + 8 + 4 + 4 + +// encode encodes rc.props to a byte slice. +func (rc *rangePropertiesCollector) encode() []byte { + b := make([]byte, 0, 1024) + idx := 0 + for _, p := range rc.props { + // Size. + b = append(b, 0, 0, 0, 0) + binary.BigEndian.PutUint32(b[idx:], uint32(propertyLengthExceptKey+len(p.key))) + idx += 4 + + b = append(b, 0, 0, 0, 0) + binary.BigEndian.PutUint32(b[idx:], uint32(len(p.key))) + idx += 4 + b = append(b, p.key...) + idx += len(p.key) + + b = append(b, 0, 0, 0, 0, 0, 0, 0, 0) + binary.BigEndian.PutUint64(b[idx:], p.size) + idx += 8 + + b = append(b, 0, 0, 0, 0, 0, 0, 0, 0) + binary.BigEndian.PutUint64(b[idx:], p.keys) + idx += 8 + + b = append(b, 0, 0, 0, 0, 0, 0, 0, 0) + binary.BigEndian.PutUint64(b[idx:], p.offset) + idx += 8 + + b = append(b, 0, 0, 0, 0) + binary.BigEndian.PutUint32(b[idx:], uint32(p.writerID)) + idx += 4 + + b = append(b, 0, 0, 0, 0) + binary.BigEndian.PutUint32(b[idx:], uint32(p.dataSeq)) + idx += 4 + } + return b +}