Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stmtsummary: fix issue of concurrent map read and write (#35367) #35381

Merged
merged 4 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"net/http/httptest"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -1496,3 +1498,31 @@ func (s *testTableSuite) TestInfoschemaClientErrors(c *C) {
err = tk.ExecToErr("FLUSH CLIENT_ERRORS_SUMMARY")
c.Assert(err.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the RELOAD privilege(s) for this operation")
}

func (s *testTableSuite) TestStmtSummaryIssue35340(c *C) {
tk := s.newTestKitWithRoot(c)
tk.MustExec("set global tidb_stmt_summary_refresh_interval=1800")
tk.MustExec("set global tidb_stmt_summary_max_stmt_count = 3000")
for i := 0; i < 100; i++ {
user := "user" + strconv.Itoa(i)
tk.MustExec(fmt.Sprintf("create user '%v'@'localhost'", user))
}
tk.MustExec("flush privileges")
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
tk := s.newTestKitWithRoot(c)
for j := 0; j < 100; j++ {
user := "user" + strconv.Itoa(j)
c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: user,
Hostname: "localhost",
}, nil, nil), IsTrue)
tk.MustQuery("select count(*) from information_schema.statements_summary;")
}
}()
}
wg.Wait()
}
27 changes: 14 additions & 13 deletions util/stmtsummary/statement_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,10 @@ func (ssbd *stmtSummaryByDigest) toCurrentDatum(beginTimeForCurInterval int64, u

// `ssElement` is lazy expired, so expired elements could also be read.
// `beginTime` won't change since `ssElement` is created, so locking is not needed here.
isAuthed := true
if user != nil && !isSuper {
_, isAuthed = ssElement.authUsers[user.Username]
}
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval || !isAuthed {
if ssElement == nil || ssElement.beginTime < beginTimeForCurInterval {
return nil
}
return ssElement.toDatum(ssbd)
return ssElement.toDatum(ssbd, user, isSuper)
}

func (ssbd *stmtSummaryByDigest) toHistoryDatum(historySize int, user *auth.UserIdentity, isSuper bool) [][]types.Datum {
Expand All @@ -588,12 +584,9 @@ func (ssbd *stmtSummaryByDigest) toHistoryDatum(historySize int, user *auth.User

rows := make([][]types.Datum, 0, len(ssElements))
for _, ssElement := range ssElements {
isAuthed := true
if user != nil && !isSuper {
_, isAuthed = ssElement.authUsers[user.Username]
}
if isAuthed {
rows = append(rows, ssElement.toDatum(ssbd))
row := ssElement.toDatum(ssbd, user, isSuper)
if len(row) > 0 {
rows = append(rows, row)
}
}
return rows
Expand Down Expand Up @@ -854,10 +847,18 @@ func (ssElement *stmtSummaryByDigestElement) add(sei *StmtExecInfo, intervalSeco
ssElement.sumWriteSQLRespTotal += sei.StmtExecDetails.WriteSQLRespDuration
}

func (ssElement *stmtSummaryByDigestElement) toDatum(ssbd *stmtSummaryByDigest) []types.Datum {
func (ssElement *stmtSummaryByDigestElement) toDatum(ssbd *stmtSummaryByDigest, user *auth.UserIdentity, isSuper bool) []types.Datum {
ssElement.Lock()
defer ssElement.Unlock()

isAuthed := true
if user != nil && !isSuper {
_, isAuthed = ssElement.authUsers[user.Username]
}
if !isAuthed {
return nil
}

plan, err := plancodec.DecodePlan(ssElement.samplePlan)
if err != nil {
logutil.BgLogger().Error("decode plan in statement summary failed", zap.String("plan", ssElement.samplePlan), zap.String("query", ssElement.sampleSQL), zap.Error(err))
Expand Down