Skip to content

Commit

Permalink
Merge branch 'master' into mx/fixPanicByConcurrentMapIterationAndWriteInMemoryUsageAlarm
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxin9014 committed Nov 1, 2022
2 parents 6d10d95 + 37ed0ab commit b83be6c
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 176 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3429,8 +3429,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:5KLqhDGLc/mtemdS/odfOP717rn8ttsTj3jzZ8TZn9A=",
version = "v2.0.1-0.20221017092635-91be9c6ce6c0",
sum = "h1:s8eJEGI4p/fxFwMBkoJ+4FAEQNQhHR47TZmVW+EEtOE=",
version = "v2.0.1-0.20221026083454-6c9c7c7c5815",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
195 changes: 25 additions & 170 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (

var id uint64

// metaMaker builds a backuppb.Metadata wrapping the given data files. The
// tests use it to run one shared scenario against multiple metadata builders
// (m and m2 — presumably the V1 and V2 encodings; see the MetaV1/MetaV2
// subtests below — TODO confirm).
type metaMaker = func(files ...*backuppb.DataFileInfo) *backuppb.Metadata

func wm(start, end, minBegin uint64) *backuppb.DataFileInfo {
i := wr(start, end, minBegin)
i.IsMeta = true
Expand Down Expand Up @@ -155,7 +157,7 @@ func (b *mockMetaBuilder) b(useV2 bool) (*storage.LocalStorage, string) {
return s, path
}

func TestReadMetaBetweenTS(t *testing.T) {
func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
log.SetLevel(zapcore.DebugLevel)
type Case struct {
items []*backuppb.Metadata
Expand Down Expand Up @@ -251,103 +253,12 @@ func TestReadMetaBetweenTS(t *testing.T) {
}
}

func TestReadMetaBetweenTSV2(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
type Case struct {
items []*backuppb.Metadata
startTS uint64
endTS uint64
expectedShiftTS uint64
expected []int
}

cases := []Case{
{
items: []*backuppb.Metadata{
m2(wr(4, 10, 3), wr(5, 13, 5)),
m2(dr(1, 3)),
m2(wr(10, 42, 9), dr(6, 9)),
},
startTS: 4,
endTS: 5,
expectedShiftTS: 3,
expected: []int{0, 1},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
},
startTS: 50,
endTS: 99,
expectedShiftTS: 1,
expected: []int{0},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
m2(wr(200, 300, 200), dr(200, 300)),
},
startTS: 150,
endTS: 199,
expectedShiftTS: 98,
expected: []int{1, 0},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5)),
m2(wr(101, 200, 101), dr(100, 200)),
m2(wr(200, 300, 200), dr(200, 300)),
},
startTS: 150,
endTS: 199,
expectedShiftTS: 101,
expected: []int{1},
},
}

run := func(t *testing.T, c Case) {
req := require.New(t)
ctx := context.Background()
loc, temp := (&mockMetaBuilder{
metas: c.items,
}).b(true)
defer func() {
t.Log("temp dir", temp)
if !t.Failed() {
os.RemoveAll(temp)
}
}()
init := LogFileManagerInit{
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
req.NoError(err)
metas, err := cli.readStreamMeta(ctx)
req.NoError(err)
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
expectedStoreIDs = append(expectedStoreIDs, c.items[meta].StoreId)
}
req.ElementsMatch(actualStoreIDs, expectedStoreIDs)
}

for i, c := range cases {
t.Run(fmt.Sprintf("case#%d", i), func(t *testing.T) {
run(t, c)
})
}
func TestReadMetaBetweenTS(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testReadMetaBetweenTSWithVersion(t, m) })
t.Run("MetaV2", func(t *testing.T) { testReadMetaBetweenTSWithVersion(t, m2) })
}

func TestReadFromMetadata(t *testing.T) {
func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {
type Case struct {
items []*backuppb.Metadata
untilTS uint64
Expand Down Expand Up @@ -413,70 +324,9 @@ func TestReadFromMetadata(t *testing.T) {
}
}

func TestReadFromMetadataV2(t *testing.T) {
type Case struct {
items []*backuppb.Metadata
untilTS uint64
expected []int
}

cases := []Case{
{
items: []*backuppb.Metadata{
m2(wr(4, 10, 3), wr(5, 13, 5)),
m2(dr(1, 3)),
m2(wr(10, 42, 9), dr(6, 9)),
},
untilTS: 10,
expected: []int{0, 1, 2},
},
{
items: []*backuppb.Metadata{
m2(wr(1, 100, 1), wr(5, 13, 5), dr(1, 101)),
m2(wr(100, 200, 98), dr(100, 200)),
},
untilTS: 99,
expected: []int{0},
},
}

run := func(t *testing.T, c Case) {
req := require.New(t)
ctx := context.Background()
loc, temp := (&mockMetaBuilder{
metas: c.items,
}).b(true)
defer func() {
t.Log("temp dir", temp)
if !t.Failed() {
os.RemoveAll(temp)
}
}()

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.LoadUntil(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
for _, m := range meta.metadata {
metas = append(metas, m)
}
actualStoreIDs := make([]int64, 0, len(metas))
for _, meta := range metas {
actualStoreIDs = append(actualStoreIDs, meta.StoreId)
}
expectedStoreIDs := make([]int64, 0, len(c.expected))
for _, meta := range c.expected {
expectedStoreIDs = append(expectedStoreIDs, c.items[meta].StoreId)
}
req.ElementsMatch(actualStoreIDs, expectedStoreIDs)
}

for i, c := range cases {
t.Run(fmt.Sprintf("case#%d", i), func(t *testing.T) {
run(t, c)
})
}
func TestReadFromMetadata(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testReadFromMetadataWithVersion(t, m) })
t.Run("MetaV2", func(t *testing.T) { testReadFromMetadataWithVersion(t, m2) })
}

func dataFileInfoMatches(t *testing.T, listA []*backuppb.DataFileInfo, listB ...*backuppb.DataFileInfo) {
Expand Down Expand Up @@ -528,7 +378,7 @@ func formatL(l []*backuppb.DataFileInfo) string {
return "[" + strings.Join(r.Item, ", ") + "]"
}

func TestFileManager(t *testing.T) {
func testFileManagerWithMeta(t *testing.T, m metaMaker) {
type Case struct {
Metadata []*backuppb.Metadata
StartTS int
Expand All @@ -544,9 +394,9 @@ func TestFileManager(t *testing.T) {
cases := []Case{
{
Metadata: []*backuppb.Metadata{
m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 2,
RestoreTS: 60,
Expand All @@ -556,9 +406,9 @@ func TestFileManager(t *testing.T) {
},
{
Metadata: []*backuppb.Metadata{
m2(wm(4, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(4, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 5,
RestoreTS: 80,
Expand All @@ -570,9 +420,9 @@ func TestFileManager(t *testing.T) {
},
{
Metadata: []*backuppb.Metadata{
m2(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m2(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m2(dr(100, 101), wr(102, 104, 100)),
m(wm(5, 10, 1), dm(1, 8), dr(2, 6), wr(4, 5, 2)),
m(wr(50, 54, 42), dr(42, 50), wr(70, 78, 0), wm(80, 81, 0), wm(90, 92, 0)),
m(dr(100, 101), wr(102, 104, 100)),
},
StartTS: 6,
RestoreTS: 80,
Expand Down Expand Up @@ -629,3 +479,8 @@ func TestFileManager(t *testing.T) {
t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { run(t, c) })
}
}

func TestFileManger(t *testing.T) {
t.Run("MetaV1", func(t *testing.T) { testFileManagerWithMeta(t, m) })
t.Run("MetaV2", func(t *testing.T) { testFileManagerWithMeta(t, m2) })
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20221017092635-91be9c6ce6c0
github.com/tikv/client-go/v2 v2.0.1-0.20221026083454-6c9c7c7c5815
github.com/tikv/pd/client v0.0.0-20221010134149-d50e5fe43f14
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -910,8 +910,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20221017092635-91be9c6ce6c0 h1:5KLqhDGLc/mtemdS/odfOP717rn8ttsTj3jzZ8TZn9A=
github.com/tikv/client-go/v2 v2.0.1-0.20221017092635-91be9c6ce6c0/go.mod h1:9hmGJFrWdehClHg0lv2cYgzvCUEhwLZkH67/PHl75tg=
github.com/tikv/client-go/v2 v2.0.1-0.20221026083454-6c9c7c7c5815 h1:s8eJEGI4p/fxFwMBkoJ+4FAEQNQhHR47TZmVW+EEtOE=
github.com/tikv/client-go/v2 v2.0.1-0.20221026083454-6c9c7c7c5815/go.mod h1:9hmGJFrWdehClHg0lv2cYgzvCUEhwLZkH67/PHl75tg=
github.com/tikv/pd/client v0.0.0-20221010134149-d50e5fe43f14 h1:REQOR1XraH1fT9BCoNBPZs1CAe+w7VPLU+d+si7DLYo=
github.com/tikv/pd/client v0.0.0-20221010134149-d50e5fe43f14/go.mod h1:E/7+Fkqzwsrp4duzJ2gLPqFl6awU7QG+5yFRXaQwimM=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
6 changes: 6 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,10 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
if explain, ok := p.(*plannercore.Explain); ok && explain.Analyze && explain.TargetPlan != nil {
p = explain.TargetPlan
}
canExplainAnalyze := false
if _, ok := p.(plannercore.PhysicalPlan); ok {
canExplainAnalyze = true
}
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
Port: s.sessionVars.Port,
Expand All @@ -1581,6 +1585,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
StatsInfo: plannercore.GetStatsInfo,
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
CanExplainAnalyze: canExplainAnalyze,
}
oldPi := s.ShowProcess()
if p == nil {
Expand All @@ -1590,6 +1595,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
pi.Plan = oldPi.Plan
pi.PlanExplainRows = oldPi.PlanExplainRows
pi.RuntimeStatsColl = oldPi.RuntimeStatsColl
_, pi.CanExplainAnalyze = pi.Plan.(plannercore.PhysicalPlan)
}
}
// We set process info before building plan, so we extended execution time.
Expand Down
23 changes: 23 additions & 0 deletions tests/realtikvtest/txntest/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,26 @@ func TestDuplicateErrorMessage(t *testing.T) {
tk2.MustExec("insert into t4 values (1, 2)")
tk.MustContainErrMsg("update t3 set c = c + 1 where v = 1", "Duplicate entry '1' for key 't3.i1'")
}

// TestAssertionWhenPessimisticLockLost checks that when a pessimistic
// transaction's SELECT ... FOR UPDATE on a non-existent row is overtaken by a
// concurrent committed insert (so the lock it believed it held is lost), a
// subsequent commit under tidb_txn_assertion_level=strict does not fail with
// a data-index assertion error. The exact interleaving of the two sessions
// below is the point of the test — do not reorder statements.
func TestAssertionWhenPessimisticLockLost(t *testing.T) {
	store := realtikvtest.CreateMockStoreAndSetup(t)
	tk1 := testkit.NewTestKit(t, store)
	tk2 := testkit.NewTestKit(t, store)
	// Lazy uniqueness checking + strict assertion level is the combination
	// this test exercises.
	tk1.MustExec("set @@tidb_constraint_check_in_place_pessimistic=0")
	tk1.MustExec("set @@tidb_txn_assertion_level=strict")
	tk2.MustExec("set @@tidb_constraint_check_in_place_pessimistic=0")
	tk2.MustExec("set @@tidb_txn_assertion_level=strict")
	tk1.MustExec("use test")
	tk2.MustExec("use test")
	tk1.MustExec("create table t (id int primary key, val text)")
	// Session 1 locks id = 1 while the row does not exist yet.
	tk1.MustExec("begin pessimistic")
	tk1.MustExec("select * from t where id = 1 for update")
	// Session 2 commits rows 1 and 2 underneath session 1's transaction,
	// invalidating the lock session 1 took on the missing row.
	tk2.MustExec("begin pessimistic")
	tk2.MustExec("insert into t values (1, 'b')")
	tk2.MustExec("insert into t values (2, 'b')")
	tk2.MustExec("commit")
	tk1.MustExec("select * from t where id = 2 for update")
	tk1.MustExec("insert into t values (1, 'a') on duplicate key update val = concat(val, 'a')")
	err := tk1.ExecToErr("commit")
	// The commit may legitimately fail (e.g. with a conflict/duplicate error),
	// but it must not fail with an assertion failure. Guard against a nil err:
	// the original code called err.Error() unconditionally, which would panic
	// with a nil-pointer dereference if the commit succeeded.
	if err != nil {
		require.NotContains(t, err.Error(), "assertion")
	}
}
2 changes: 1 addition & 1 deletion util/memoryusagealarm/memoryusagealarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager, recordDir stri
processInfo := sm.ShowProcessList()
pinfo := make([]*util.ProcessInfo, 0, len(processInfo))
for _, info := range processInfo {
if len(info.Info) != 0 {
if len(info.Info) != 0 && info.CanExplainAnalyze {
pinfo = append(pinfo, info)
}
}
Expand Down
1 change: 1 addition & 0 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type ProcessInfo struct {
Command byte
ExceedExpensiveTimeThresh bool
RedactSQL bool
CanExplainAnalyze bool
}

// ToRowForShow returns []interface{} for the row data of "SHOW [FULL] PROCESSLIST".
Expand Down

0 comments on commit b83be6c

Please sign in to comment.