Skip to content

Commit

Permalink
Merge branch 'master' into keep_order_hint
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Dec 19, 2022
2 parents e4d8302 + 0c18082 commit 319b6dd
Show file tree
Hide file tree
Showing 263 changed files with 17,970 additions and 11,804 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
startup --host_jvm_args=-Xmx5g
startup --host_jvm_args=-Xmx8g
startup --unlimit_coredumps

run:ci --color=yes
Expand Down
14 changes: 14 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
day: "friday"
time: "18:00"
timezone: "Asia/Shanghai"
allow:
- dependency-name: "golang.org/*"
- dependency-name: "github.com/golangci/golangci-lint"
open-pull-requests-limit: 2

12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4425,8 +4425,8 @@ def go_deps():
name = "org_golang_x_net",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/net",
sum = "h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=",
version = "v0.2.0",
sum = "h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=",
version = "v0.4.0",
)
go_repository(
name = "org_golang_x_oauth2",
Expand All @@ -4453,15 +4453,15 @@ def go_deps():
name = "org_golang_x_term",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/term",
sum = "h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM=",
version = "v0.2.0",
sum = "h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_text",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/text",
sum = "h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=",
version = "v0.4.0",
sum = "h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=",
version = "v0.5.0",
)
go_repository(
name = "org_golang_x_time",
Expand Down
105 changes: 0 additions & 105 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,108 +1388,3 @@ func TestDropBindBySQLDigest(t *testing.T) {
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for '1'")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")
}

func TestCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(id int primary key, a int, b int, key(a))")
tk.MustExec("create table t2(id int primary key, a int, b int, key(a))")

var testCases = []struct {
sqls []string
hint string
}{
{
sqls: []string{
"select %s * from t1, t2 where t1.id = t2.id",
"select %s * from test.t1, t2 where t1.id = t2.id",
"select %s * from test.t1, test.t2 where t1.id = t2.id",
"select %s * from t1, test.t2 where t1.id = t2.id",
},
hint: "/*+ merge_join(t1, t2) */",
},
{
sqls: []string{
"select %s * from t1 where a = 1",
"select %s * from test.t1 where a = 1",
},
hint: "/*+ ignore_index(t, a) */",
},
}

for _, testCase := range testCases {
for _, bind := range testCase.sqls {
stmtsummary.StmtSummaryByDigestMap.Clear()
bindSQL := fmt.Sprintf(bind, testCase.hint)
tk.MustExec(bindSQL)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", bindSQL)).Rows()
tk.MustExec(fmt.Sprintf("create session binding from history using plan digest '%s'", planDigest[0][0]))
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
for _, sql := range testCase.sqls {
tk.MustExec(fmt.Sprintf(sql, ""))
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}
}
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
tk.MustExec(fmt.Sprintf("drop binding for sql digest '%s'", showRes[0][9]))
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", "1"), "can't find any plans for '1'")
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", ""), "plan digest is empty")
tk.MustExec("create binding for select * from t1, t2 where t1.id = t2.id using select /*+ merge_join(t1, t2) */ * from t1, t2 where t1.id = t2.id")
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, showRes[0][10], "") // plan digest should be nil by create for
}

func TestCreateBindingForPrepareFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, a int, key(a))")

tk.MustExec("prepare stmt from 'select /*+ ignore_index(t,a) */ * from t where a = ?'")
tk.MustExec("set @a = 1")
tk.MustExec("execute stmt using @a")
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", "select /*+ ignore_index(t,a) */ * from t where a = ? [arguments: 1]")).Rows()
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 0)
tk.MustExec(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]))
showRes = tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
tk.MustExec("execute stmt using @a")
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}

func TestErrorCasesCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
tk.MustExec("create table t3(id int)")

sql := "select * from t1 where t1.id in (select id from t2)"
tk.MustExec(sql)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with sub query")

sql = "select * from t1, t2, t3 where t1.id = t2.id and t2.id = t3.id"
tk.MustExec(sql)
planDigest = tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with more than two table join")
}
2 changes: 1 addition & 1 deletion br/COMPATIBILITY_TEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Background

We had some incompatibility issues in the past, which made BR cannot restore backed up data in some situations.
So we need a test workflow to check the compatiblity.
So we need a test workflow to check the compatibility.

## Goal

Expand Down
6 changes: 5 additions & 1 deletion br/pkg/aws/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ go_test(
name = "aws_test",
srcs = ["ebs_test.go"],
embed = [":aws"],
deps = ["@com_github_stretchr_testify//require"],
deps = [
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//service/ec2",
"@com_github_stretchr_testify//require",
],
)
31 changes: 20 additions & 11 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,9 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
return 0, errors.Trace(err)
}

var unfinishedVolumes []*string
for _, volume := range resp.Volumes {
if *volume.State == ec2.VolumeStateAvailable {
log.Info("volume is available", zap.String("id", *volume.SnapshotId))
totalVolumeSize += *volume.Size
progress.Inc()
} else {
log.Debug("volume creating...", zap.Stringer("volume", volume))
unfinishedVolumes = append(unfinishedVolumes, volume.SnapshotId)
}
}
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp)
progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes)))
totalVolumeSize += createdVolumeSize
pendingVolumes = unfinishedVolumes
}
log.Info("all pending volume are created.")
Expand Down Expand Up @@ -357,3 +349,20 @@ func (e *EC2Session) DeleteVolumes(volumeIDMap map[string]string) {
func ec2Tag(key, val string) *ec2.Tag {
return &ec2.Tag{Key: &key, Value: &val}
}

func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) {
totalVolumeSize := int64(0)

var unfinishedVolumes []*string
for _, volume := range resp.Volumes {
if *volume.State == ec2.VolumeStateAvailable {
log.Info("volume is available", zap.String("id", *volume.VolumeId))
totalVolumeSize += *volume.Size
} else {
log.Debug("volume creating...", zap.Stringer("volume", volume))
unfinishedVolumes = append(unfinishedVolumes, volume.VolumeId)
}
}

return totalVolumeSize, unfinishedVolumes
}
53 changes: 45 additions & 8 deletions br/pkg/aws/ebs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,63 @@ package aws
import (
"testing"

awsapi "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/stretchr/testify/require"
)

func TestEC2SessionExtractSnapProgress(t *testing.T) {
strPtr := func(s string) *string {
return &s
}
tests := []struct {
str *string
want int64
}{
{nil, 0},
{strPtr("12.12%"), 12},
{strPtr("44.99%"), 44},
{strPtr(" 89.89% "), 89},
{strPtr("100%"), 100},
{strPtr("111111%"), 100},
{awsapi.String("12.12%"), 12},
{awsapi.String("44.99%"), 44},
{awsapi.String(" 89.89% "), 89},
{awsapi.String("100%"), 100},
{awsapi.String("111111%"), 100},
}
e := &EC2Session{}
for _, tt := range tests {
require.Equal(t, tt.want, e.extractSnapProgress(tt.str))
}
}

func createVolume(snapshotId string, volumeId string, state string) *ec2.Volume {
return &ec2.Volume{
Attachments: nil,
AvailabilityZone: awsapi.String("us-west-2"),
CreateTime: nil,
Encrypted: awsapi.Bool(true),
FastRestored: awsapi.Bool(true),
Iops: awsapi.Int64(3000),
KmsKeyId: nil,
MultiAttachEnabled: awsapi.Bool(true),
OutpostArn: awsapi.String("arn:12342"),
Size: awsapi.Int64(1),
SnapshotId: awsapi.String(snapshotId),
State: awsapi.String(state),
Tags: nil,
Throughput: nil,
VolumeId: awsapi.String(volumeId),
VolumeType: awsapi.String("gp3"),
}
}
func TestHandleDescribeVolumesResponse(t *testing.T) {
curentVolumesStates := &ec2.DescribeVolumesOutput{
NextToken: awsapi.String("fake token"),
Volumes: []*ec2.Volume{
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "creating"),
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "available"),
},
}

e := &EC2Session{}
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(curentVolumesStates)
require.Equal(t, int64(4), createdVolumeSize)
require.Equal(t, 1, len(unfinishedVolumes))
}
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ func openDuplicateDB(storeDir string) (*pebble.DB, error) {
return pebble.Open(dbPath, opts)
}

var (
// RunInTest indicates whether the current process is running in test.
RunInTest bool
// LastAlloc is the last ID allocator.
LastAlloc manual.Allocator
)

// NewLocalBackend creates new connections to tikv.
func NewLocalBackend(
ctx context.Context,
Expand Down Expand Up @@ -461,6 +468,11 @@ func NewLocalBackend(
} else {
writeLimiter = noopStoreWriteLimiter{}
}
alloc := manual.Allocator{}
if RunInTest {
alloc.RefCnt = new(atomic.Int64)
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
Expand All @@ -486,7 +498,7 @@ func NewLocalBackend(
keyAdapter: keyAdapter,
errorMgr: errorMgr,
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
bufferPool: membuf.NewPool(membuf.WithAllocator(alloc)),
writeLimiter: writeLimiter,
logger: log.FromContext(ctx),
encBuilder: NewEncodingBuilder(ctx),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/manual/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ go_library(
cgo = True,
importpath = "github.com/pingcap/tidb/br/pkg/lightning/manual",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_atomic//:atomic"],
)
31 changes: 28 additions & 3 deletions br/pkg/lightning/manual/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,33 @@

package manual

type Allocator struct{}
import (
"fmt"

func (Allocator) Alloc(n int) []byte { return New(n) }
"go.uber.org/atomic"
)

func (Allocator) Free(b []byte) { Free(b) }
type Allocator struct {
RefCnt *atomic.Int64
}

func (a Allocator) Alloc(n int) []byte {
if a.RefCnt != nil {
a.RefCnt.Add(1)
}
return New(n)
}

func (a Allocator) Free(b []byte) {
if a.RefCnt != nil {
a.RefCnt.Add(-1)
}
Free(b)
}

func (a Allocator) CheckRefCnt() error {
if a.RefCnt != nil && a.RefCnt.Load() != 0 {
return fmt.Errorf("memory leak detected, refCnt: %d", a.RefCnt.Load())
}
return nil
}
Binary file not shown.
Loading

0 comments on commit 319b6dd

Please sign in to comment.