Skip to content

Commit

Permalink
Merge branch 'master' into parse_time_remove_exp_tz
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Nov 15, 2023
2 parents bfc4792 + 1c8d383 commit 79212ca
Show file tree
Hide file tree
Showing 114 changed files with 3,724 additions and 3,475 deletions.
25 changes: 14 additions & 11 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ component_management:
statuses:
- type: project # in this case every component that doesn't have a status defined will have a project type one
target: auto
informational: # resulting status will pass no matter what the coverage is or what other settings are specified.
individual_components:
- component_id: component_dumpling # this is an identifier that should not be changed
name: dumpling # this is a display name, and can be changed freely
Expand All @@ -64,8 +65,10 @@ flag_management:
statuses:
- type: project
target: auto
informational: # resulting status will pass no matter what the coverage is or what other settings are specified.
- type: patch
target: 85%
target: auto
informational: # resulting status will pass no matter what the coverage is or what other settings are specified.

ignore:
- "LICENSES"
Expand All @@ -76,15 +79,15 @@ ignore:
- "cmd/.*"
- "docs/.*"
- "vendor/.*"
- "ddl/failtest/.*"
- "ddl/testutil/.*"
- "executor/seqtest/.*"
- "metrics/.*"
- "expression/generator/.*"
- "pkg/ddl/failtest/.*"
- "pkg/ddl/testutil/.*"
- "pkg/executor/seqtest/.*"
- "pkg/metrics/.*"
- "pkg/expression/generator/.*"
- "br/pkg/mock/.*"
- "testkit/.*"
- "server/internal/testutil/.*"
- "statistics/handle/cache/internal/testutil/.*"
- "session/testutil.go"
- "store/mockstore/unistore/testutil.go"
- "pkg/testkit/.*"
- "pkg/server/internal/testutil/.*"
- "pkg/statistics/handle/cache/internal/testutil/.*"
- "pkg/session/testutil.go"
- "pkg/store/mockstore/unistore/testutil.go"

24 changes: 12 additions & 12 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7119,26 +7119,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "b689432454a504f8ba1ad166ebf901584155edc64eed4119a30c07ab52e3af8f",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231030120815-1362f1e87566",
sha256 = "285edca3320cc8847aceffb5d5471fe7483c49f66795622f71ed819c72635d00",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231114060955-8fc8a528217e",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "79a51a978de81c8a893e2b33c279ac84e3f95ccdf70cb7f62fef9a76472cd92b",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20230912103610-2f57a9f050eb",
sha256 = "cb510944ce56555f005fff2d891af3fefa667f37955779b89c35fd40f51deace",
strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231114041114-86831ce71865",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20230912103610-2f57a9f050eb.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20230912103610-2f57a9f050eb.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20230912103610-2f57a9f050eb.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20230912103610-2f57a9f050eb.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip",
],
)
go_repository(
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 45,
shard_count = 46,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand All @@ -89,6 +89,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
"@org_golang_x_exp//rand",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
],
)
230 changes: 223 additions & 7 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"context"
"flag"
"fmt"
"io"
"os"
"runtime/pprof"
"sync"
"testing"
"time"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"golang.org/x/sync/errgroup"
)

var testingStorageURI = flag.String("testing-storage-uri", "", "the URI of the storage used for testing")
Expand Down Expand Up @@ -93,8 +96,7 @@ func (s *ascendingKeySource) outputSize() int {
return s.totalSize
}

type testSuite struct {
t *testing.T
type writeTestSuite struct {
store storage.ExternalStorage
source kvSource
memoryLimit int
Expand All @@ -103,7 +105,7 @@ type testSuite struct {
afterWriterClose func()
}

func writePlainFile(s *testSuite) {
func writePlainFile(s *writeTestSuite) {
ctx := context.Background()
buf := make([]byte, s.memoryLimit)
offset := 0
Expand Down Expand Up @@ -139,7 +141,7 @@ func writePlainFile(s *testSuite) {
}
}

func writeExternalFile(s *testSuite) {
func writeExternalFile(s *writeTestSuite) {
ctx := context.Background()
builder := NewWriterBuilder().
SetMemorySizeLimit(uint64(s.memoryLimit))
Expand All @@ -164,7 +166,7 @@ func writeExternalFile(s *testSuite) {
}
}

func TestCompare(t *testing.T) {
func TestCompareWriter(t *testing.T) {
store := openTestingStorage(t)
source := newAscendingKeySource(20, 100, 10000000)
memoryLimit := 64 * 1024 * 1024
Expand Down Expand Up @@ -195,8 +197,7 @@ func TestCompare(t *testing.T) {
pprof.StopCPUProfile()
}

suite := &testSuite{
t: t,
suite := &writeTestSuite{
store: store,
source: source,
memoryLimit: memoryLimit,
Expand All @@ -214,3 +215,218 @@ func TestCompare(t *testing.T) {
writerSpeed := float64(source.outputSize()) / elapsed.Seconds() / 1024 / 1024
t.Logf("writer speed for %d bytes: %.2f MB/s", source.outputSize(), writerSpeed)
}

// readTestSuite bundles the storage handle, size/concurrency knobs, and
// profiling hooks shared by the read-benchmark helpers
// (readFileSequential, readFileConcurrently, readMergeIter).
type readTestSuite struct {
	store       storage.ExternalStorage // storage holding the files under the "evenly_distributed" prefix
	totalKVCnt  int                     // expected number of KV pairs; verified by readMergeIter
	concurrency int                     // upper bound on parallel readers in readFileConcurrently
	memoryLimit int                     // total read-buffer budget in bytes, split across readers
	// beforeCreateReader runs once before any reader is opened (e.g. start CPU profile).
	beforeCreateReader func()
	// beforeReaderClose runs once shortly before readers finish (e.g. dump heap profile).
	beforeReaderClose func()
	// afterReaderClose runs once after all readers are closed (e.g. stop CPU profile, record elapsed time).
	afterReaderClose func()
}

// readFileSequential reads every file under the "evenly_distributed" prefix
// one after another, each to EOF, reusing a single buffer of s.memoryLimit
// bytes. The beforeReaderClose hook fires just before the last file's reader
// is closed; afterReaderClose fires once all readers are done.
func readFileSequential(s *readTestSuite) {
	ctx := context.Background()
	files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
	intest.Assert(err == nil)

	scratch := make([]byte, s.memoryLimit)
	if s.beforeCreateReader != nil {
		s.beforeCreateReader()
	}
	lastIdx := len(files) - 1
	for idx, name := range files {
		r, openErr := s.store.Open(ctx, name, nil)
		intest.Assert(openErr == nil)
		// Drain the file; Read returns io.EOF when exhausted.
		var readErr error
		for readErr == nil {
			_, readErr = r.Read(scratch)
		}
		intest.Assert(readErr == io.EOF)
		if idx == lastIdx && s.beforeReaderClose != nil {
			s.beforeReaderClose()
		}
		intest.Assert(r.Close() == nil)
	}
	if s.afterReaderClose != nil {
		s.afterReaderClose()
	}
}

// readFileConcurrently reads every file under the "evenly_distributed" prefix
// to EOF with at most min(s.concurrency, len(files)) files in flight at once.
// Each worker allocates an equal share of s.memoryLimit as its read buffer.
// The beforeReaderClose hook fires exactly once, from whichever worker first
// reaches EOF; afterReaderClose fires after all workers complete.
func readFileConcurrently(s *readTestSuite) {
	ctx := context.Background()
	files, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
	intest.Assert(err == nil)

	conc := min(s.concurrency, len(files))
	var eg errgroup.Group
	eg.SetLimit(conc)
	var once sync.Once

	if s.beforeCreateReader != nil {
		s.beforeCreateReader()
	}
	for _, file := range files {
		// Shadow the loop variable: before Go 1.22 all closures share a single
		// `file`, so without this the goroutines may read the same file several
		// times and skip others.
		file := file
		eg.Go(func() error {
			buf := make([]byte, s.memoryLimit/conc)
			reader, err := s.store.Open(ctx, file, nil)
			intest.Assert(err == nil)
			_, err = reader.Read(buf)
			for err == nil {
				_, err = reader.Read(buf)
			}
			intest.Assert(err == io.EOF)
			once.Do(func() {
				if s.beforeReaderClose != nil {
					s.beforeReaderClose()
				}
			})
			err = reader.Close()
			intest.Assert(err == nil)
			return nil
		})
	}
	err = eg.Wait()
	intest.Assert(err == nil)
	if s.afterReaderClose != nil {
		s.afterReaderClose()
	}
}

// createEvenlyDistributedFiles wipes any leftover files under the
// "evenly_distributed" prefix and writes fileCount fresh files of roughly
// fileSize bytes each. Keys ("key_%09d") are round-robined across files so
// the key space is evenly interleaved. It returns the storage together with
// the total number of KV pairs written.
func createEvenlyDistributedFiles(
	t *testing.T,
	fileSize, fileCount int,
) (storage.ExternalStorage, int) {
	store := openTestingStorage(t)
	ctx := context.Background()

	// Delete data and stat files from previous runs so the benchmark only
	// sees the files produced below.
	dataFiles, statFiles, err := GetAllFileNames(ctx, store, "evenly_distributed")
	intest.Assert(err == nil)
	intest.Assert(store.DeleteFiles(ctx, dataFiles) == nil)
	intest.Assert(store.DeleteFiles(ctx, statFiles) == nil)

	payload := make([]byte, 100)
	totalKV := 0
	for fileNo := 0; fileNo < fileCount; fileNo++ {
		// Allow 10% headroom over fileSize so a file is flushed as one unit.
		w := NewWriterBuilder().
			SetMemorySizeLimit(uint64(float64(fileSize) * 1.1)).
			Build(store, "evenly_distributed", fmt.Sprintf("%d", fileNo))

		written := 0
		// File fileNo owns keys fileNo, fileNo+fileCount, fileNo+2*fileCount, ...
		for keyID := fileNo; written < fileSize; keyID += fileCount {
			k := fmt.Sprintf("key_%09d", keyID)
			intest.Assert(w.WriteRow(ctx, []byte(k), payload, nil) == nil)
			written += len(k) + len(payload)
			totalKV++
		}
		intest.Assert(w.Close(ctx) == nil)
	}
	return store, totalKV
}

// readMergeIter reads every KV pair under the "evenly_distributed" prefix
// through a MergeKVIter, splitting s.memoryLimit evenly across the per-file
// read buffers. The beforeReaderClose hook fires once, halfway through the
// expected KV count; the total count is asserted against s.totalKVCnt.
func readMergeIter(s *readTestSuite) {
	ctx := context.Background()
	dataFiles, _, err := GetAllFileNames(ctx, s.store, "evenly_distributed")
	intest.Assert(err == nil)

	if s.beforeCreateReader != nil {
		s.beforeCreateReader()
	}

	perFileBufSize := s.memoryLimit / len(dataFiles)
	startOffsets := make([]uint64, len(dataFiles))
	iter, err := NewMergeKVIter(ctx, dataFiles, startOffsets, s.store, perFileBufSize, false)
	intest.Assert(err == nil)

	seen := 0
	halfway := s.totalKVCnt / 2
	for iter.Next() {
		seen++
		if seen == halfway && s.beforeReaderClose != nil {
			s.beforeReaderClose()
		}
	}
	intest.Assert(seen == s.totalKVCnt)
	intest.Assert(iter.Close() == nil)
	if s.afterReaderClose != nil {
		s.afterReaderClose()
	}
}

// TestCompareReader benchmarks three strategies for reading back the files
// produced by createEvenlyDistributedFiles — sequential whole-file reads,
// concurrent whole-file reads, and a MergeKVIter — and logs the throughput of
// each. A CPU profile is captured around every strategy and a heap profile is
// dumped just before the readers close, so memory usage can be inspected
// offline.
func TestCompareReader(t *testing.T) {
	fileSize := 50 * 1024 * 1024
	fileCnt := 24
	store, kvCnt := createEvenlyDistributedFiles(t, fileSize, fileCnt)
	memoryLimit := 64 * 1024 * 1024
	fileIdx := 0
	var (
		now     time.Time
		elapsed time.Duration
		// cpuProfFile must stay open until pprof.StopCPUProfile has flushed it.
		cpuProfFile *os.File
		err         error
	)
	beforeTest := func() {
		fileIdx++
		cpuProfFile, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
		intest.Assert(err == nil)
		err = pprof.StartCPUProfile(cpuProfFile)
		intest.Assert(err == nil)
		now = time.Now()
	}
	beforeClose := func() {
		// Use a dedicated handle for the heap profile and close it immediately;
		// previously this overwrote the still-in-use CPU-profile handle, leaking
		// both file descriptors.
		heapProfFile, heapErr := os.Create(fmt.Sprintf("heap-profile-%d.prof", fileIdx))
		intest.Assert(heapErr == nil)
		// check heap profile to see the memory usage is expected
		heapErr = pprof.WriteHeapProfile(heapProfFile)
		intest.Assert(heapErr == nil)
		intest.Assert(heapProfFile.Close() == nil)
	}
	afterClose := func() {
		elapsed = time.Since(now)
		pprof.StopCPUProfile()
		// The CPU profile is fully written now; release the handle.
		intest.Assert(cpuProfFile.Close() == nil)
	}

	suite := &readTestSuite{
		store:              store,
		totalKVCnt:         kvCnt,
		concurrency:        100,
		memoryLimit:        memoryLimit,
		beforeCreateReader: beforeTest,
		beforeReaderClose:  beforeClose,
		afterReaderClose:   afterClose,
	}
	readFileSequential(suite)
	t.Logf(
		"sequential read speed for %d bytes: %.2f MB/s",
		fileSize*fileCnt,
		float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
	)

	readFileConcurrently(suite)
	t.Logf(
		"concurrent read speed for %d bytes: %.2f MB/s",
		fileSize*fileCnt,
		float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
	)

	readMergeIter(suite)
	t.Logf(
		"merge iter read speed for %d bytes: %.2f MB/s",
		fileSize*fileCnt,
		float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
	)
}
Loading

0 comments on commit 79212ca

Please sign in to comment.