Skip to content

Commit

Permalink
stmtsummary: fix issue of concurrent map read and write (#35367) (#35381
Browse files Browse the repository at this point in the history
)

close #35340
  • Loading branch information
ti-srebot authored Sep 19, 2022
1 parent 12f54b9 commit 3aa7f7e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
30 changes: 30 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"net/http/httptest"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -1496,3 +1498,31 @@ func (s *testTableSuite) TestInfoschemaClientErrors(c *C) {
err = tk.ExecToErr("FLUSH CLIENT_ERRORS_SUMMARY")
c.Assert(err.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the RELOAD privilege(s) for this operation")
}

func (s *testTableSuite) TestStmtSummaryIssue35340(c *C) {
tk := s.newTestKitWithRoot(c)
tk.MustExec("set global tidb_stmt_summary_refresh_interval=1800")
tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 3000")
for i := 0; i < 100; i++ {
user := "user" + strconv.Itoa(i)
tk.MustExec(fmt.Sprintf("create user '%v'@'localhost'", user))
}
tk.MustExec("flush privileges")
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
tk := s.newTestKitWithRoot(c)
for j := 0; j < 100; j++ {
user := "user" + strconv.Itoa(j)
c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: user,
Hostname: "localhost",
}, nil, nil), IsTrue)
tk.MustQuery("select count(*) from information_schema.statements_summary;")
}
}()
}
wg.Wait()
}
27 changes: 14 additions & 13 deletions util/stmtsummary/statement_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,10 @@ func (ssbd *stmtSummaryByDigest) toCurrentDatum(beginTimeForCurInterval int64, u

// `ssElement` is lazy expired, so expired elements could also be read.
// `beginTime` won't change since `ssElement` is created, so locking is not needed here.
isAuthed := true
if user != nil && !isSuper {
_, isAuthed = ssElement.authUsers[user.Username]
}
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval || !isAuthed {
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval {
return nil
}
return ssElement.toDatum(ssbd)
return ssElement.toDatum(ssbd, user, isSuper)
}

func (ssbd *stmtSummaryByDigest) toHistoryDatum(historySize int, user *auth.UserIdentity, isSuper bool) [][]types.Datum {
Expand All @@ -588,12 +584,9 @@ func (ssbd *stmtSummaryByDigest) toHistoryDatum(historySize int, user *auth.User

rows := make([][]types.Datum, 0, len(ssElements))
for _, ssElement := range ssElements {
isAuthed := true
if user != nil && !isSuper {
_, isAuthed = ssElement.authUsers[user.Username]
}
if isAuthed {
rows = append(rows, ssElement.toDatum(ssbd))
row := ssElement.toDatum(ssbd, user, isSuper)
if len(row) > 0 {
rows = append(rows, row)
}
}
return rows
Expand Down Expand Up @@ -854,10 +847,18 @@ func (ssElement *stmtSummaryByDigestElement) add(sei *StmtExecInfo, intervalSeco
ssElement.sumWriteSQLRespTotal += sei.StmtExecDetails.WriteSQLRespDuration
}

func (ssElement *stmtSummaryByDigestElement) toDatum(ssbd *stmtSummaryByDigest) []types.Datum {
func (ssElement *stmtSummaryByDigestElement) toDatum(ssbd *stmtSummaryByDigest, user *auth.UserIdentity, isSuper bool) []types.Datum {
ssElement.Lock()
defer ssElement.Unlock()

isAuthed := true
if user != nil && !isSuper {
_, isAuthed = ssElement.authUsers[user.Username]
}
if !isAuthed {
return nil
}

plan, err := plancodec.DecodePlan(ssElement.samplePlan)
if err != nil {
logutil.BgLogger().Error("decode plan in statement summary failed", zap.String("plan", ssElement.samplePlan), zap.String("query", ssElement.sampleSQL), zap.Error(err))
Expand Down

0 comments on commit 3aa7f7e

Please sign in to comment.