Skip to content

Commit

Permalink
lightning/external: port file.go and necessary codes (#45795)
Browse files Browse the repository at this point in the history
ref #45719
  • Loading branch information
lance6716 authored Aug 7, 2023
1 parent 0aa67b2 commit de30804
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 3 deletions.
14 changes: 11 additions & 3 deletions br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "external",
srcs = ["byte_reader.go"],
srcs = [
"byte_reader.go",
"file.go",
"sharedisk.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/external",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -16,11 +20,15 @@ go_library(
go_test(
name = "external_test",
timeout = "short",
srcs = ["byte_reader_test.go"],
srcs = [
"byte_reader_test.go",
"file_test.go",
],
embed = [":external"],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//br/pkg/storage",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
186 changes: 186 additions & 0 deletions br/pkg/lightning/backend/external/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"encoding/binary"
"path/filepath"
"strconv"
"strings"

"github.com/pingcap/tidb/br/pkg/storage"
)

// KeyValueStore stores key-value pairs and maintains the range properties.
type KeyValueStore struct {
dataWriter storage.ExternalFileWriter

rc *rangePropertiesCollector
ctx context.Context
writerID int
seq int
offset uint64
}

// NewKeyValueStore creates a new KeyValueStore. The data will be written to the
// given dataWriter and range properties will be maintained in the given
// rangePropertiesCollector.
func NewKeyValueStore(
ctx context.Context,
dataWriter storage.ExternalFileWriter,
rangePropertiesCollector *rangePropertiesCollector,
writerID int,
seq int,
) (*KeyValueStore, error) {
kvStore := &KeyValueStore{
dataWriter: dataWriter,
ctx: ctx,
rc: rangePropertiesCollector,
writerID: writerID,
seq: seq,
}
return kvStore, nil
}

// AddKeyValue saves a key-value pair to the KeyValueStore. If the accumulated
// size or key count exceeds the given distance, a new range property will be
// appended to the rangePropertiesCollector with current status.
func (s *KeyValueStore) AddKeyValue(key, value []byte) error {
kvLen := len(key) + len(value) + 16
var b [8]byte

// data layout: keyLen + key + valueLen + value
_, err := s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:], uint64(len(key))),
)
if err != nil {
return err
}
_, err = s.dataWriter.Write(s.ctx, key)
if err != nil {
return err
}
_, err = s.dataWriter.Write(
s.ctx,
binary.BigEndian.AppendUint64(b[:], uint64(len(value))),
)
if err != nil {
return err
}
_, err = s.dataWriter.Write(s.ctx, value)
if err != nil {
return err
}

if len(s.rc.currProp.key) == 0 {
s.rc.currProp.key = key
s.rc.currProp.writerID = s.writerID
s.rc.currProp.dataSeq = s.seq
}

s.offset += uint64(kvLen)
s.rc.currProp.size += uint64(len(key) + len(value))
s.rc.currProp.keys++

if s.rc.currProp.size >= s.rc.propSizeIdxDistance ||
s.rc.currProp.keys >= s.rc.propKeysIdxDistance {
newProp := *s.rc.currProp
s.rc.props = append(s.rc.props, &newProp)

s.rc.currProp.key = nil
s.rc.currProp.offset = s.offset
s.rc.currProp.keys = 0
s.rc.currProp.size = 0
}

return nil
}

var statSuffix = filepath.Join("_stat", "0")

// GetAllFileNames returns a FilePathHandle that contains all data file paths
// and a slice of stat file paths.
func GetAllFileNames(
ctx context.Context,
store storage.ExternalStorage,
subDir string,
) (FilePathHandle, []string, error) {
var dataFilePaths FilePathHandle
var stats []string

err := store.WalkDir(ctx,
&storage.WalkOption{SubDir: subDir},
func(path string, size int64) error {
if strings.HasSuffix(path, statSuffix) {
stats = append(stats, path)
} else {
dir, file := filepath.Split(path)
writerID, err := strconv.Atoi(filepath.Base(dir))
if err != nil {
return err
}
seq, err := strconv.Atoi(file)
if err != nil {
return err
}
dataFilePaths.set(writerID, seq, path)
}
return nil
})
if err != nil {
return dataFilePaths, nil, err
}
return dataFilePaths, stats, nil
}

// FilePathHandle handles data file paths under a prefix path.
type FilePathHandle struct {
paths [][]string
}

func (p *FilePathHandle) set(writerID, seq int, path string) {
if writerID >= len(p.paths) {
p.paths = append(p.paths, make([][]string, writerID-len(p.paths)+1)...)
}
if seq >= len(p.paths[writerID]) {
p.paths[writerID] = append(p.paths[writerID], make([]string, seq-len(p.paths[writerID])+1)...)
}
p.paths[writerID][seq] = path
}

// Get returns the path of the data file with the given writerID and seq.
func (p *FilePathHandle) Get(writerID, seq int) string {
return p.paths[writerID][seq]
}

// ForEach applies the given function to each data file path.
func (p *FilePathHandle) ForEach(f func(writerID, seq int, path string)) {
for writerID, paths := range p.paths {
for seq, path := range paths {
f(writerID, seq, path)
}
}
}

// FlatSlice returns a flat slice of all data file paths.
func (p *FilePathHandle) FlatSlice() []string {
var paths []string
p.ForEach(func(writerID, seq int, path string) {
paths = append(paths, path)
})
return paths
}
113 changes: 113 additions & 0 deletions br/pkg/lightning/backend/external/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
"context"
"testing"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/stretchr/testify/require"
)

func TestAddKeyValueMaintainRangeProperty(t *testing.T) {
ctx := context.Background()
memStore := storage.NewMemStorage()
writer, err := memStore.Create(ctx, "/test", nil)
require.NoError(t, err)
rc := &rangePropertiesCollector{
propSizeIdxDistance: 100,
propKeysIdxDistance: 2,
}
rc.reset()
initRC := *rc
kvStore, err := NewKeyValueStore(ctx, writer, rc, 1, 1)
require.NoError(t, err)

require.Equal(t, &initRC, rc)
encoded := rc.encode()
require.Len(t, encoded, 0)

k1, v1 := []byte("key1"), []byte("value1")
err = kvStore.AddKeyValue(k1, v1)
require.NoError(t, err)
// when not accumulated enough data, no range property will be added.
require.Equal(t, &initRC, rc)

// propKeysIdxDistance = 2, so after adding 2 keys, a new range property will be added.
k2, v2 := []byte("key2"), []byte("value2")
err = kvStore.AddKeyValue(k2, v2)
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected := &rangeProperty{
key: k1,
offset: 0,
writerID: 1,
dataSeq: 1,
size: uint64(len(k1) + len(v1) + len(k2) + len(v2)),
keys: 2,
}
require.Equal(t, expected, rc.props[0])
encoded = rc.encode()
require.Greater(t, len(encoded), 0)

// when not accumulated enough data, no range property will be added.
k3, v3 := []byte("key3"), []byte("value3")
err = kvStore.AddKeyValue(k3, v3)
require.NoError(t, err)
require.Len(t, rc.props, 1)

err = writer.Close(ctx)
require.NoError(t, err)

writer, err = memStore.Create(ctx, "/test2", nil)
require.NoError(t, err)
rc = &rangePropertiesCollector{
propSizeIdxDistance: 1,
propKeysIdxDistance: 100,
}
rc.reset()
kvStore, err = NewKeyValueStore(ctx, writer, rc, 2, 2)
require.NoError(t, err)
err = kvStore.AddKeyValue(k1, v1)
require.NoError(t, err)
require.Len(t, rc.props, 1)
expected = &rangeProperty{
key: k1,
offset: 0,
writerID: 2,
dataSeq: 2,
size: uint64(len(k1) + len(v1)),
keys: 1,
}
require.Equal(t, expected, rc.props[0])

err = kvStore.AddKeyValue(k2, v2)
require.NoError(t, err)
require.Len(t, rc.props, 2)
expected = &rangeProperty{
key: k2,
offset: uint64(len(k1) + len(v1) + 16),
writerID: 2,
dataSeq: 2,
size: uint64(len(k2) + len(v2)),
keys: 1,
}
require.Equal(t, expected, rc.props[1])
err = writer.Close(ctx)
require.NoError(t, err)
}

// TODO(lance6716): add more tests when the usage of other functions are merged into master.
Loading

0 comments on commit de30804

Please sign in to comment.