Skip to content

Commit

Permalink
stmtsummary: fix issue of concurrent map read and write (#35367) (#35384
Browse files Browse the repository at this point in the history
)

close #35340
  • Loading branch information
ti-srebot authored Jun 17, 2022
1 parent eab2b9b commit 60be3f4
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
32 changes: 32 additions & 0 deletions infoschema/cluster_tables_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -82,6 +83,7 @@ func TestClusterTables(t *testing.T) {
t.Run("StmtSummaryHistoryTable", SubTestStmtSummaryHistoryTable(s))
t.Run("Issue26379", SubTestIssue26379(s))
t.Run("SubTestStmtSummaryResultRows", SubTestStmtSummaryResultRows(s))
t.Run("SubTestStmtSummaryIssue35340", SubTestStmtSummaryIssue35340(s))
t.Run("SubTestSlowQueryOOM", SubTestSlowQueryOOM(s))
}

Expand Down Expand Up @@ -493,6 +495,36 @@ func SubTestStmtSummaryResultRows(s *clusterTablesSuite) func(t *testing.T) {
}
}

func SubTestStmtSummaryIssue35340(s *clusterTablesSuite) func(t *testing.T) {
return func(t *testing.T) {
tk := s.newTestKitWithRoot(t)
tk.MustExec("set global tidb_stmt_summary_refresh_interval=1800")
tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 3000")
for i := 0; i < 100; i++ {
user := "user" + strconv.Itoa(i)
tk.MustExec(fmt.Sprintf("create user '%v'@'localhost'", user))
}
tk.MustExec("flush privileges")
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
tk := s.newTestKitWithRoot(t)
for j := 0; j < 100; j++ {
user := "user" + strconv.Itoa(j)
require.True(t, tk.Session().Auth(&auth.UserIdentity{
Username: user,
Hostname: "localhost",
}, nil, nil))
tk.MustQuery("select count(*) from information_schema.statements_summary;")
}
}()
}
wg.Wait()
}
}

func SubTestSlowQueryOOM(s *clusterTablesSuite) func(t *testing.T) {
return func(t *testing.T) {
var clean func()
Expand Down
28 changes: 16 additions & 12 deletions util/stmtsummary/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,7 @@ func (ssr *stmtSummaryReader) getStmtByDigestRow(ssbd *stmtSummaryByDigest, begi

// `ssElement` is lazy expired, so expired elements could also be read.
// `beginTime` won't change since `ssElement` is created, so locking is not needed here.
isAuthed := true
if ssr.user != nil && !ssr.hasProcessPriv && ssElement != nil {
_, isAuthed = ssElement.authUsers[ssr.user.Username]
}
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval || !isAuthed {
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval {
return nil
}
return ssr.getStmtByDigestElementRow(ssElement, ssbd)
Expand All @@ -151,6 +147,14 @@ func (ssr *stmtSummaryReader) getStmtByDigestRow(ssbd *stmtSummaryByDigest, begi
func (ssr *stmtSummaryReader) getStmtByDigestElementRow(ssElement *stmtSummaryByDigestElement, ssbd *stmtSummaryByDigest) []types.Datum {
ssElement.Lock()
defer ssElement.Unlock()
isAuthed := true
if ssr.user != nil && !ssr.hasProcessPriv {
_, isAuthed = ssElement.authUsers[ssr.user.Username]
}
if !isAuthed {
return nil
}

datums := make([]types.Datum, len(ssr.columnValueFactories))
for i, factory := range ssr.columnValueFactories {
datums[i] = types.NewDatum(factory(ssElement, ssbd))
Expand All @@ -164,12 +168,9 @@ func (ssr *stmtSummaryReader) getStmtByDigestHistoryRow(ssbd *stmtSummaryByDiges

rows := make([][]types.Datum, 0, len(ssElements))
for _, ssElement := range ssElements {
isAuthed := true
if ssr.user != nil && !ssr.hasProcessPriv {
_, isAuthed = ssElement.authUsers[ssr.user.Username]
}
if isAuthed {
rows = append(rows, ssr.getStmtByDigestElementRow(ssElement, ssbd))
record := ssr.getStmtByDigestElementRow(ssElement, ssbd)
if record != nil {
rows = append(rows, record)
}
}
return rows
Expand Down Expand Up @@ -200,7 +201,10 @@ func (ssr *stmtSummaryReader) getStmtEvictedOtherHistoryRow(ssbde *stmtSummaryBy

ssbd := new(stmtSummaryByDigest)
for _, seElement := range seElements {
rows = append(rows, ssr.getStmtByDigestElementRow(seElement.otherSummary, ssbd))
record := ssr.getStmtByDigestElementRow(seElement.otherSummary, ssbd)
if record != nil {
rows = append(rows, record)
}
}
return rows
}
Expand Down

0 comments on commit 60be3f4

Please sign in to comment.