Skip to content

Commit

Permalink
Merge branch 'master' into fix-prealloc
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored Dec 26, 2022
2 parents 969829d + 2f6401a commit 28fe235
Show file tree
Hide file tree
Showing 359 changed files with 23,607 additions and 13,537 deletions.
2 changes: 1 addition & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
startup --host_jvm_args=-Xmx5g
startup --host_jvm_args=-Xmx8g
startup --unlimit_coredumps

run:ci --color=yes
Expand Down
14 changes: 14 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version: 2
updates:
- package-ecosystem: "gomod"
directory: "/"
schedule:
interval: "weekly"
day: "friday"
time: "18:00"
timezone: "Asia/Shanghai"
allow:
- dependency-name: "golang.org/*"
- dependency-name: "github.com/golangci/golangci-lint"
open-pull-requests-limit: 2

20 changes: 10 additions & 10 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4425,15 +4425,15 @@ def go_deps():
name = "org_golang_x_net",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/net",
sum = "h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU=",
version = "v0.2.0",
sum = "h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=",
version = "v0.4.0",
)
go_repository(
name = "org_golang_x_oauth2",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/oauth2",
sum = "h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=",
version = "v0.2.0",
sum = "h1:6l90koy8/LaBLmLu8jpHeHexzMwEita0zFfYlggy2F8=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_sync",
Expand All @@ -4453,22 +4453,22 @@ def go_deps():
name = "org_golang_x_term",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/term",
sum = "h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM=",
version = "v0.2.0",
sum = "h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_text",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/text",
sum = "h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=",
version = "v0.4.0",
sum = "h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM=",
version = "v0.5.0",
)
go_repository(
name = "org_golang_x_time",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/time",
sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=",
version = "v0.2.0",
sum = "h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_tools",
Expand Down
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "ae013bf35bd23234d1dea46b079f1e05ba74ac0321423830119d3e787ec73483",
sha256 = "56d8c5a5c91e1af73eca71a6fab2ced959b67c86d12ba37feedb0a2dfea441a6",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
],
)

Expand Down
105 changes: 0 additions & 105 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,108 +1388,3 @@ func TestDropBindBySQLDigest(t *testing.T) {
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for '1'")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")
}

func TestCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(id int primary key, a int, b int, key(a))")
tk.MustExec("create table t2(id int primary key, a int, b int, key(a))")

var testCases = []struct {
sqls []string
hint string
}{
{
sqls: []string{
"select %s * from t1, t2 where t1.id = t2.id",
"select %s * from test.t1, t2 where t1.id = t2.id",
"select %s * from test.t1, test.t2 where t1.id = t2.id",
"select %s * from t1, test.t2 where t1.id = t2.id",
},
hint: "/*+ merge_join(t1, t2) */",
},
{
sqls: []string{
"select %s * from t1 where a = 1",
"select %s * from test.t1 where a = 1",
},
hint: "/*+ ignore_index(t, a) */",
},
}

for _, testCase := range testCases {
for _, bind := range testCase.sqls {
stmtsummary.StmtSummaryByDigestMap.Clear()
bindSQL := fmt.Sprintf(bind, testCase.hint)
tk.MustExec(bindSQL)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", bindSQL)).Rows()
tk.MustExec(fmt.Sprintf("create session binding from history using plan digest '%s'", planDigest[0][0]))
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
for _, sql := range testCase.sqls {
tk.MustExec(fmt.Sprintf(sql, ""))
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}
}
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
tk.MustExec(fmt.Sprintf("drop binding for sql digest '%s'", showRes[0][9]))
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", "1"), "can't find any plans for '1'")
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", ""), "plan digest is empty")
tk.MustExec("create binding for select * from t1, t2 where t1.id = t2.id using select /*+ merge_join(t1, t2) */ * from t1, t2 where t1.id = t2.id")
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, showRes[0][10], "") // plan digest should be nil by create for
}

func TestCreateBindingForPrepareFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, a int, key(a))")

tk.MustExec("prepare stmt from 'select /*+ ignore_index(t,a) */ * from t where a = ?'")
tk.MustExec("set @a = 1")
tk.MustExec("execute stmt using @a")
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", "select /*+ ignore_index(t,a) */ * from t where a = ? [arguments: 1]")).Rows()
showRes := tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 0)
tk.MustExec(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]))
showRes = tk.MustQuery("show bindings").Rows()
require.Equal(t, len(showRes), 1)
require.Equal(t, planDigest[0][0], showRes[0][10])
tk.MustExec("execute stmt using @a")
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
}

func TestErrorCasesCreateBindingFromHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil))

tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, t3")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")
tk.MustExec("create table t3(id int)")

sql := "select * from t1 where t1.id in (select id from t2)"
tk.MustExec(sql)
planDigest := tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with sub query")

sql = "select * from t1, t2, t3 where t1.id = t2.id and t2.id = t3.id"
tk.MustExec(sql)
planDigest = tk.MustQuery(fmt.Sprintf("select plan_digest from information_schema.statements_summary where query_sample_text = '%s'", sql)).Rows()
tk.MustGetErrMsg(fmt.Sprintf("create binding from history using plan digest '%s'", planDigest[0][0]), "can't create binding for query with more than two table join")
}
9 changes: 9 additions & 0 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,15 @@ func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, n
return
}

// SetBindRecordStatusByDigest set a BindRecord's status to the storage and bind cache.
func (h *BindHandle) SetBindRecordStatusByDigest(newStatus, sqlDigest string) (ok bool, err error) {
oldRecord, err := h.GetBindRecordBySQLDigest(sqlDigest)
if err != nil {
return false, err
}
return h.SetBindRecordStatus(oldRecord.OriginalSQL, nil, newStatus)
}

// GCBindRecord physically removes the deleted bind records in mysql.bind_info.
func (h *BindHandle) GCBindRecord() (err error) {
h.bindInfo.Lock()
Expand Down
2 changes: 1 addition & 1 deletion br/COMPATIBILITY_TEST.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Background

We had some incompatibility issues in the past, which made BR cannot restore backed up data in some situations.
So we need a test workflow to check the compatiblity.
So we need a test workflow to check the compatibility.

## Goal

Expand Down
6 changes: 5 additions & 1 deletion br/pkg/aws/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ go_test(
name = "aws_test",
srcs = ["ebs_test.go"],
embed = [":aws"],
deps = ["@com_github_stretchr_testify//require"],
deps = [
"@com_github_aws_aws_sdk_go//aws",
"@com_github_aws_aws_sdk_go//service/ec2",
"@com_github_stretchr_testify//require",
],
)
31 changes: 20 additions & 11 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,9 @@ func (e *EC2Session) WaitVolumesCreated(volumeIDMap map[string]string, progress
return 0, errors.Trace(err)
}

var unfinishedVolumes []*string
for _, volume := range resp.Volumes {
if *volume.State == ec2.VolumeStateAvailable {
log.Info("volume is available", zap.String("id", *volume.SnapshotId))
totalVolumeSize += *volume.Size
progress.Inc()
} else {
log.Debug("volume creating...", zap.Stringer("volume", volume))
unfinishedVolumes = append(unfinishedVolumes, volume.SnapshotId)
}
}
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(resp)
progress.IncBy(int64(len(pendingVolumes) - len(unfinishedVolumes)))
totalVolumeSize += createdVolumeSize
pendingVolumes = unfinishedVolumes
}
log.Info("all pending volume are created.")
Expand Down Expand Up @@ -357,3 +349,20 @@ func (e *EC2Session) DeleteVolumes(volumeIDMap map[string]string) {
func ec2Tag(key, val string) *ec2.Tag {
return &ec2.Tag{Key: &key, Value: &val}
}

func (e *EC2Session) HandleDescribeVolumesResponse(resp *ec2.DescribeVolumesOutput) (int64, []*string) {
totalVolumeSize := int64(0)

var unfinishedVolumes []*string
for _, volume := range resp.Volumes {
if *volume.State == ec2.VolumeStateAvailable {
log.Info("volume is available", zap.String("id", *volume.VolumeId))
totalVolumeSize += *volume.Size
} else {
log.Debug("volume creating...", zap.Stringer("volume", volume))
unfinishedVolumes = append(unfinishedVolumes, volume.VolumeId)
}
}

return totalVolumeSize, unfinishedVolumes
}
53 changes: 45 additions & 8 deletions br/pkg/aws/ebs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,63 @@ package aws
import (
"testing"

awsapi "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/stretchr/testify/require"
)

func TestEC2SessionExtractSnapProgress(t *testing.T) {
strPtr := func(s string) *string {
return &s
}
tests := []struct {
str *string
want int64
}{
{nil, 0},
{strPtr("12.12%"), 12},
{strPtr("44.99%"), 44},
{strPtr(" 89.89% "), 89},
{strPtr("100%"), 100},
{strPtr("111111%"), 100},
{awsapi.String("12.12%"), 12},
{awsapi.String("44.99%"), 44},
{awsapi.String(" 89.89% "), 89},
{awsapi.String("100%"), 100},
{awsapi.String("111111%"), 100},
}
e := &EC2Session{}
for _, tt := range tests {
require.Equal(t, tt.want, e.extractSnapProgress(tt.str))
}
}

func createVolume(snapshotId string, volumeId string, state string) *ec2.Volume {
return &ec2.Volume{
Attachments: nil,
AvailabilityZone: awsapi.String("us-west-2"),
CreateTime: nil,
Encrypted: awsapi.Bool(true),
FastRestored: awsapi.Bool(true),
Iops: awsapi.Int64(3000),
KmsKeyId: nil,
MultiAttachEnabled: awsapi.Bool(true),
OutpostArn: awsapi.String("arn:12342"),
Size: awsapi.Int64(1),
SnapshotId: awsapi.String(snapshotId),
State: awsapi.String(state),
Tags: nil,
Throughput: nil,
VolumeId: awsapi.String(volumeId),
VolumeType: awsapi.String("gp3"),
}
}
func TestHandleDescribeVolumesResponse(t *testing.T) {
curentVolumesStates := &ec2.DescribeVolumesOutput{
NextToken: awsapi.String("fake token"),
Volumes: []*ec2.Volume{
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "creating"),
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "available"),
createVolume("snap-0873674883", "vol-98768979", "available"),
},
}

e := &EC2Session{}
createdVolumeSize, unfinishedVolumes := e.HandleDescribeVolumesResponse(curentVolumesStates)
require.Equal(t, int64(4), createdVolumeSize)
require.Equal(t, 1, len(unfinishedVolumes))
}
28 changes: 25 additions & 3 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,32 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()
if err = d.BatchCreateTableWithInfo(gs.se, schema, info, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
if len(info) == 1 {
return err
}
mid := len(info) / 2
err = gs.SplitBatchCreateTable(schema, info[:mid])
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, info[mid:])
if err != nil {
return err
}
return nil
}
return err
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

// Disable foreign key check when batch create tables.
Expand Down Expand Up @@ -233,8 +256,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...)
if err != nil {
if err := gs.SplitBatchCreateTable(dbName, cloneTables); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C
var genCols []genCol
for i, col := range cols {
if col.GeneratedExpr != nil {
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names)
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names, false)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 28fe235

Please sign in to comment.