Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: add mutex for Handle.globalMap and Handle.feedback #30550

Merged
merged 3 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,25 @@ func (m *QueryFeedbackMap) append(k QueryFeedbackKey, qs []*QueryFeedback) bool
return true
}

// SiftFeedbacks eliminates feedbacks which are overlapped with others. It is a tradeoff between
// feedback accuracy and its overhead.
func (m *QueryFeedbackMap) SiftFeedbacks() {
sc := &stmtctx.StatementContext{TimeZone: time.UTC}
for k, qs := range m.Feedbacks {
fbs := make([]Feedback, 0, len(qs)*2)
for _, q := range qs {
fbs = append(fbs, q.Feedback...)
}
if len(fbs) == 0 {
delete(m.Feedbacks, k)
continue
}
m.Feedbacks[k] = m.Feedbacks[k][:1]
m.Feedbacks[k][0].Feedback, _ = NonOverlappedFeedbacks(sc, fbs)
}
m.Size = len(m.Feedbacks)
}

// Merge combines 2 collections of feedbacks.
func (m *QueryFeedbackMap) Merge(r *QueryFeedbackMap) {
for k, qs := range r.Feedbacks {
Expand Down
29 changes: 21 additions & 8 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,15 @@ type Handle struct {
// listHead contains all the stats collector required by session.
listHead *SessionStatsCollector
// globalMap contains all the delta map from collectors when we dump them to KV.
globalMap tableDeltaMap
globalMap struct {
sync.Mutex
data tableDeltaMap
}
// feedback is used to store query feedback info.
feedback *statistics.QueryFeedbackMap
feedback struct {
sync.Mutex
data *statistics.QueryFeedbackMap
}

lease atomic2.Duration

Expand Down Expand Up @@ -159,6 +165,7 @@ func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string,

// Clear the statsCache, only for test.
func (h *Handle) Clear() {
// TODO: Here h.mu seems to protect all the fields of Handle. Is is reasonable?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this pr, the h.mu doesn't protect all the fields of the Handle, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, h.mu never protects all the fields of the Handle. It is only responsible for the fields in its own struct(ctx, rateMap, pid2tid and schemaVersion). h.statsCache has its own mutex and is not protected by h.mu.

h.mu.Lock()
h.statsCache.Lock()
h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
Expand All @@ -167,13 +174,17 @@ func (h *Handle) Clear() {
for len(h.ddlEventCh) > 0 {
<-h.ddlEventCh
}
h.feedback = statistics.NewQueryFeedbackMap()
h.feedback.Lock()
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
h.mu.ctx.GetSessionVars().InitChunkSize = 1
h.mu.ctx.GetSessionVars().MaxChunkSize = 1
h.mu.ctx.GetSessionVars().EnableChunkRPC = false
h.mu.ctx.GetSessionVars().SetProjectionConcurrency(0)
h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}
h.globalMap = make(tableDeltaMap)
h.globalMap.Lock()
h.globalMap.data = make(tableDeltaMap)
h.globalMap.Unlock()
h.mu.rateMap = make(errorRateDeltaMap)
h.mu.Unlock()
}
Expand All @@ -188,8 +199,6 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*
handle := &Handle{
ddlEventCh: make(chan *util.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
globalMap: make(tableDeltaMap),
feedback: statistics.NewQueryFeedbackMap(),
idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
pool: pool,
}
Expand All @@ -199,6 +208,8 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*
handle.mu.ctx = ctx
handle.mu.rateMap = make(errorRateDeltaMap)
handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
handle.globalMap.data = make(tableDeltaMap)
handle.feedback.data = statistics.NewQueryFeedbackMap()
err := handle.RefreshVars()
if err != nil {
return nil, err
Expand All @@ -218,10 +229,12 @@ func (h *Handle) SetLease(lease time.Duration) {

// GetQueryFeedback gets the query feedback. It is only used in test.
func (h *Handle) GetQueryFeedback() *statistics.QueryFeedbackMap {
h.feedback.Lock()
defer func() {
h.feedback = statistics.NewQueryFeedbackMap()
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
}()
return h.feedback
return h.feedback.data
}

// DurationToTS converts duration to timestamp.
Expand Down
82 changes: 46 additions & 36 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[i
m[id] = item
}

func (m tableDeltaMap) merge(deltaMap tableDeltaMap) {
for id, item := range deltaMap {
m.update(id, item.Delta, item.Count, &item.ColSize)
}
}

type errorRateDelta struct {
PkID int64
PkErrorRate *statistics.ErrorRate
Expand Down Expand Up @@ -125,14 +131,12 @@ func (m errorRateDeltaMap) clear(tableID int64, histID int64, isIndex bool) {
m[tableID] = item
}

func (h *Handle) merge(s *SessionStatsCollector, rateMap errorRateDeltaMap) {
for id, item := range s.mapper {
h.globalMap.update(id, item.Delta, item.Count, &item.ColSize)
}
func merge(s *SessionStatsCollector, deltaMap tableDeltaMap, rateMap errorRateDeltaMap, feedback *statistics.QueryFeedbackMap) {
deltaMap.merge(s.mapper)
s.mapper = make(tableDeltaMap)
rateMap.merge(s.rateMap)
s.rateMap = make(errorRateDeltaMap)
h.feedback.Merge(s.feedback)
feedback.Merge(s.feedback)
s.feedback = statistics.NewQueryFeedbackMap()
}

Expand Down Expand Up @@ -375,13 +379,15 @@ const (
// sweepList will loop over the list, merge each session's local stats into handle
// and remove closed session's collector.
func (h *Handle) sweepList() {
deltaMap := make(tableDeltaMap)
errorRateMap := make(errorRateDeltaMap)
feedback := statistics.NewQueryFeedbackMap()
prev := h.listHead
prev.Lock()
errorRateMap := make(errorRateDeltaMap)
for curr := prev.next; curr != nil; curr = curr.next {
curr.Lock()
// Merge the session stats into handle and error rate map.
h.merge(curr, errorRateMap)
// Merge the session stats into deltaMap, errorRateMap and feedback respectively.
merge(curr, deltaMap, errorRateMap, feedback)
if curr.deleted {
prev.next = curr.next
// Since the session is already closed, we can safely unlock it here.
Expand All @@ -393,37 +399,34 @@ func (h *Handle) sweepList() {
}
}
prev.Unlock()
h.globalMap.Lock()
h.globalMap.data.merge(deltaMap)
h.globalMap.Unlock()
h.mu.Lock()
h.mu.rateMap.merge(errorRateMap)
h.mu.Unlock()
h.siftFeedbacks()
}

// siftFeedbacks eliminates feedbacks which are overlapped with others. It is a tradeoff between
// feedback accuracy and its overhead.
func (h *Handle) siftFeedbacks() {
sc := &stmtctx.StatementContext{TimeZone: time.UTC}
for k, qs := range h.feedback.Feedbacks {
fbs := make([]statistics.Feedback, 0, len(qs)*2)
for _, q := range qs {
fbs = append(fbs, q.Feedback...)
}
if len(fbs) == 0 {
delete(h.feedback.Feedbacks, k)
continue
}
h.feedback.Feedbacks[k] = h.feedback.Feedbacks[k][:1]
h.feedback.Feedbacks[k][0].Feedback, _ = statistics.NonOverlappedFeedbacks(sc, fbs)
}
h.feedback.Size = len(h.feedback.Feedbacks)
h.feedback.Lock()
h.feedback.data.Merge(feedback)
h.feedback.data.SiftFeedbacks()
h.feedback.Unlock()
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV.
// If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio.
func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
h.sweepList()
h.globalMap.Lock()
deltaMap := h.globalMap.data
h.globalMap.data = make(tableDeltaMap)
h.globalMap.Unlock()
defer func() {
h.globalMap.Lock()
deltaMap.merge(h.globalMap.data)
h.globalMap.data = deltaMap
h.globalMap.Unlock()
}()
currentTime := time.Now()
for id, item := range h.globalMap {
for id, item := range deltaMap {
if mode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) {
continue
}
Expand All @@ -432,17 +435,17 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
return errors.Trace(err)
}
if updated {
h.globalMap.update(id, -item.Delta, -item.Count, nil)
deltaMap.update(id, -item.Delta, -item.Count, nil)
}
if err = h.dumpTableStatColSizeToKV(id, item); err != nil {
return errors.Trace(err)
}
if updated {
delete(h.globalMap, id)
delete(deltaMap, id)
} else {
m := h.globalMap[id]
m := deltaMap[id]
m.ColSize = nil
h.globalMap[id] = m
deltaMap[id] = m
}
}
return nil
Expand Down Expand Up @@ -522,8 +525,12 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e

// DumpStatsFeedbackToKV dumps the stats feedback to KV.
func (h *Handle) DumpStatsFeedbackToKV() error {
h.feedback.Lock()
feedback := h.feedback.data
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
var err error
for _, fbs := range h.feedback.Feedbacks {
for _, fbs := range feedback.Feedbacks {
for _, fb := range fbs {
if fb.Tp == statistics.PkType {
err = h.DumpFeedbackToKV(fb)
Expand All @@ -548,7 +555,6 @@ func (h *Handle) DumpStatsFeedbackToKV() error {
}
}
}
h.feedback = statistics.NewQueryFeedbackMap()
return errors.Trace(err)
}

Expand Down Expand Up @@ -581,8 +587,12 @@ func (h *Handle) DumpFeedbackToKV(fb *statistics.QueryFeedback) error {
// feedback locally on this tidb-server, so it could be used more timely.
func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) {
h.sweepList()
h.feedback.Lock()
feedback := h.feedback.data
h.feedback.data = statistics.NewQueryFeedbackMap()
h.feedback.Unlock()
OUTER:
for _, fbs := range h.feedback.Feedbacks {
for _, fbs := range feedback.Feedbacks {
for _, fb := range fbs {
h.mu.Lock()
table, ok := h.getTableByPhysicalID(is, fb.PhysicalID)
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func TestInsertAndDelete(t *testing.T) {
t.Parallel()
h := Handle{
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)},
feedback: statistics.NewQueryFeedbackMap(),
}
h.feedback.data = statistics.NewQueryFeedbackMap()
var items []*SessionStatsCollector
for i := 0; i < 5; i++ {
items = append(items, h.NewSessionStatsCollector())
Expand Down