Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto: cleanup sorted files, writer memory quota, and test #47092

Merged
merged 16 commits into from
Sep 20, 2023
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 34,
shard_count = 36,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"//kv",
Expand Down
74 changes: 74 additions & 0 deletions br/pkg/lightning/backend/external/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,77 @@ func TestGetMaxOverlapping(t *testing.T) {
}
require.EqualValues(t, 3, GetMaxOverlapping(points))
}

// TestSortedKVMeta checks that a SortedKVMeta built from a WriterSummary
// carries over the key range, total size and data/stat file names, and that
// MergeSummary and Merge both extend those fields in the same way.
func TestSortedKVMeta(t *testing.T) {
	// Two summaries with disjoint key ranges and two (data, stat) file pairs each.
	first := &WriterSummary{
		Min:       []byte("a"),
		Max:       []byte("b"),
		TotalSize: 123,
		MultipleFilesStats: []MultipleFilesStat{
			{
				Filenames: [][2]string{
					{"f1", "stat1"},
					{"f2", "stat2"},
				},
			},
		},
	}
	second := &WriterSummary{
		Min:       []byte("x"),
		Max:       []byte("y"),
		TotalSize: 177,
		MultipleFilesStats: []MultipleFilesStat{
			{
				Filenames: [][2]string{
					{"f3", "stat3"},
					{"f4", "stat4"},
				},
			},
		},
	}

	// A meta built from a single summary mirrors that summary.
	merged := NewSortedKVMeta(first)
	require.Equal(t, []byte("a"), merged.MinKey)
	require.Equal(t, []byte("b"), merged.MaxKey)
	require.Equal(t, uint64(123), merged.TotalKVSize)
	require.Equal(t, []string{"f1", "f2"}, merged.DataFiles)
	require.Equal(t, []string{"stat1", "stat2"}, merged.StatFiles)

	other := NewSortedKVMeta(second)
	require.Equal(t, []byte("x"), other.MinKey)
	require.Equal(t, []byte("y"), other.MaxKey)
	require.Equal(t, uint64(177), other.TotalKVSize)
	require.Equal(t, []string{"f3", "f4"}, other.DataFiles)
	require.Equal(t, []string{"stat3", "stat4"}, other.StatFiles)

	// Merging the second summary widens the key range, sums the sizes and
	// appends the file lists in order.
	merged.MergeSummary(second)
	require.Equal(t, []byte("a"), merged.MinKey)
	require.Equal(t, []byte("y"), merged.MaxKey)
	require.Equal(t, uint64(300), merged.TotalKVSize)
	require.Equal(t, []string{"f1", "f2", "f3", "f4"}, merged.DataFiles)
	require.Equal(t, []string{"stat1", "stat2", "stat3", "stat4"}, merged.StatFiles)

	// Merging two metas must give the same result as merging a summary.
	viaMerge := NewSortedKVMeta(first)
	viaMerge.Merge(other)
	require.Equal(t, merged, viaMerge)
}

// TestKeyMinMax pins the results of NotNilMin and NotNilMax for nil, empty
// and non-empty keys. When neither operand is a non-empty key, the expected
// result is the second operand, so nil and empty slices are kept distinct.
func TestKeyMinMax(t *testing.T) {
	type keyPairCase struct {
		lhs, rhs []byte
		want     []byte
	}

	minCases := []keyPairCase{
		{nil, nil, []byte(nil)},
		{nil, []byte{}, []byte{}},
		{[]byte{}, nil, []byte(nil)},
		{[]byte("a"), nil, []byte("a")},
		{[]byte("a"), []byte{}, []byte("a")},
		{nil, []byte("a"), []byte("a")},
		{[]byte("a"), []byte("b"), []byte("a")},
		{[]byte("b"), []byte("a"), []byte("a")},
	}
	for _, c := range minCases {
		require.Equal(t, c.want, NotNilMin(c.lhs, c.rhs))
	}

	maxCases := []keyPairCase{
		{nil, nil, []byte(nil)},
		{nil, []byte{}, []byte{}},
		{[]byte{}, nil, []byte(nil)},
		{[]byte("a"), nil, []byte("a")},
		{[]byte("a"), []byte{}, []byte("a")},
		{nil, []byte("a"), []byte("a")},
		{[]byte("a"), []byte("b"), []byte("b")},
		{[]byte("b"), []byte("a"), []byte("b")},
	}
	for _, c := range maxCases {
		require.Equal(t, c.want, NotNilMax(c.lhs, c.rhs))
	}
}
14 changes: 11 additions & 3 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ import (

var multiFileStatNum = 500

const (
// DefaultMemSizeLimit is the default memory size limit for writer.
// NewWriterBuilder uses it as the initial memSizeLimit of a WriterBuilder.
DefaultMemSizeLimit = 256 * size.MB
)

// rangePropertiesCollector collects range properties for each range. The zero
// value of rangePropertiesCollector is not ready to use; call reset() on it
// first.
Expand All @@ -61,8 +66,11 @@ func (rc *rangePropertiesCollector) encode() []byte {

// WriterSummary is the summary of a writer.
type WriterSummary struct {
WriterID string
Seq int
WriterID string
Seq int
// Min and Max are the min and max keys written by this writer; both are
// inclusive, i.e. [Min, Max].
// Both will be empty if no key is written.
Min tidbkv.Key
Max tidbkv.Key
TotalSize uint64
Expand Down Expand Up @@ -90,7 +98,7 @@ type WriterBuilder struct {
// NewWriterBuilder creates a WriterBuilder.
func NewWriterBuilder() *WriterBuilder {
return &WriterBuilder{
memSizeLimit: 256 * size.MB,
memSizeLimit: DefaultMemSizeLimit,
writeBatchCount: 8 * 1024,
propSizeDist: 1 * size.MB,
propKeysDist: 8 * 1024,
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/lightning/backend/external/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
dbkv "github.com/pingcap/tidb/kv"
Expand All @@ -41,11 +42,13 @@ func TestWriter(t *testing.T) {
ctx := context.Background()
memStore := storage.NewMemStorage()

writer := NewWriterBuilder().
w := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
Build(memStore, "/test", "0")

writer := NewEngineWriter(w)

kvCnt := rand.Intn(10) + 10
kvs := make([]common.KvPair, kvCnt)
for i := 0; i < kvCnt; i++ {
Expand All @@ -58,12 +61,9 @@ func TestWriter(t *testing.T) {
_, err = rand.Read(kvs[i].Val)
require.NoError(t, err)
}
for _, pair := range kvs {
err := writer.WriteRow(ctx, pair.Key, pair.Val, nil)
require.NoError(t, err)
}

err := writer.Close(ctx)
require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
_, err := writer.Close(ctx)
require.NoError(t, err)

slices.SortFunc(kvs, func(i, j common.KvPair) int {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_library(
"//util/codec",
"//util/engine",
"//util/hack",
"//util/intest",
"//util/mathutil",
"//util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -939,7 +940,11 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
if err != nil {
return err
}
store, err := storage.New(ctx, storeBackend, nil)
opt := &storage.ExternalStorageOptions{}
if intest.InTest {
opt.NoCredentials = true
}
store, err := storage.New(ctx, storeBackend, opt)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2255,3 +2255,13 @@ func TestExternalEngine(t *testing.T) {
}
require.Equal(t, 100, kvIdx)
}

// TestGetExternalEngineKVStatistics checks that asking for the KV statistics
// of an engine UUID that was never registered yields a zero size and count.
func TestGetExternalEngineKVStatistics(t *testing.T) {
	// Backend with an empty external-engine map, so every lookup misses.
	be := Backend{
		externalEngine: make(map[uuid.UUID]common.Engine),
	}

	unknownID := uuid.New()
	kvSize, kvCount := be.GetExternalEngineKVStatistics(unknownID)
	require.Zero(t, kvSize)
	require.Zero(t, kvCount)
}
11 changes: 10 additions & 1 deletion disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//meta/autoid",
"//metrics",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//resourcemanager/pool/workerpool",
"//resourcemanager/util",
Expand All @@ -59,7 +60,9 @@ go_library(
"//util/logutil",
"//util/mathutil",
"//util/promutil",
"//util/size",
"//util/sqlexec",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
Expand All @@ -78,6 +81,7 @@ go_test(
"dispatcher_test.go",
"dispatcher_testkit_test.go",
"encode_and_sort_operator_test.go",
"job_testkit_test.go",
"metrics_test.go",
"planner_test.go",
"subtask_executor_test.go",
Expand All @@ -86,12 +90,14 @@ go_test(
embed = [":importinto"],
flaky = True,
race = "on",
shard_count = 8,
shard_count = 11,
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/external",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//ddl",
"//disttask/framework/dispatcher",
"//disttask/framework/planner",
"//disttask/framework/proto",
Expand All @@ -101,9 +107,12 @@ go_test(
"//domain/infosync",
"//executor/importer",
"//meta/autoid",
"//parser",
"//parser/ast",
"//parser/model",
"//testkit",
"//util/logutil",
"//util/mock",
"//util/sqlexec",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down
Loading
Loading