Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Util: Fix global tracker mutex contention (#16298) #17234

Merged
merged 3 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
a.logAudit()
// Detach the disk tracker from GlobalDiskUsageTracker after every execution
if stmtCtx := a.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil && stmtCtx.DiskTracker != nil {
stmtCtx.DiskTracker.Detach()
stmtCtx.DiskTracker.DetachFromGlobalTracker()
}
}

Expand Down
6 changes: 3 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ const (
)

func init() {
GlobalDiskUsageTracker = disk.NewTracker(stringutil.StringerStr("GlobalStorageLabel"), -1)
GlobalDiskUsageTracker = disk.NewGlobalTrcaker(stringutil.StringerStr("GlobalStorageLabel"), -1)
action := &globalPanicOnExceed{}
GlobalDiskUsageTracker.SetActionOnExceed(action)
}
Expand Down Expand Up @@ -1556,7 +1556,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars := ctx.GetSessionVars()
// Detach the disk tracker for the previous stmtctx from GlobalDiskUsageTracker
if vars.StmtCtx != nil && vars.StmtCtx.DiskTracker != nil {
vars.StmtCtx.DiskTracker.Detach()
vars.StmtCtx.DiskTracker.DetachFromGlobalTracker()
}
sc := &stmtctx.StatementContext{
TimeZone: vars.Location(),
Expand All @@ -1565,7 +1565,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
TaskID: stmtctx.AllocateTaskID(),
}
if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
sc.DiskTracker.AttachTo(GlobalDiskUsageTracker)
sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
Expand Down
3 changes: 3 additions & 0 deletions util/disk/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ type Tracker = memory.Tracker
// 1. "label" is the label used in the usage string.
// 2. "bytesLimit <= 0" means no limit.
var NewTracker = memory.NewTracker

// NewGlobalTrcaker creates a global disk tracker.
var NewGlobalTrcaker = memory.NewGlobalTracker
53 changes: 52 additions & 1 deletion util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ import (
type Tracker struct {
mu struct {
sync.Mutex
children []*Tracker // The children memory trackers
// The children memory trackers. If the Tracker is the Global Tracker, like executor.GlobalDiskUsageTracker,
// we wouldn't maintain its children in order to avoiding mutex contention.
children []*Tracker
}
actionMu struct {
sync.Mutex
Expand All @@ -52,17 +54,31 @@ type Tracker struct {
bytesLimit int64 // bytesLimit <= 0 means no limit.
maxConsumed int64 // max number of bytes consumed during execution.
parent *Tracker // The parent memory tracker.
isGlobal bool // isGlobal indicates whether this tracker is global tracker
}

// NewTracker creates a memory tracker.
// 1. "label" is the label used in the usage string.
// 2. "bytesLimit <= 0" means no limit.
// For the common tracker, isGlobal is default as false
func NewTracker(label fmt.Stringer, bytesLimit int64) *Tracker {
t := &Tracker{
label: label,
bytesLimit: bytesLimit,
}
t.actionMu.actionOnExceed = &LogOnExceed{}
t.isGlobal = false
return t
}

// NewGlobalTracker creates a global tracker, its isGlobal is default as true
func NewGlobalTracker(label fmt.Stringer, bytesLimit int64) *Tracker {
t := &Tracker{
label: label,
bytesLimit: bytesLimit,
}
t.actionMu.actionOnExceed = &LogOnExceed{}
t.isGlobal = true
return t
}

Expand Down Expand Up @@ -275,3 +291,38 @@ func (t *Tracker) BytesToString(numBytes int64) string {

return fmt.Sprintf("%v Bytes", numBytes)
}

// AttachToGlobalTracker attach the tracker to the global tracker
// AttachToGlobalTracker should be called at the initialization for the session executor's tracker
func (t *Tracker) AttachToGlobalTracker(globalTracker *Tracker) {
if globalTracker == nil {
return
}
if !globalTracker.isGlobal {
panic("Attach to a non-GlobalTracker")
}
if t.parent != nil {
if t.parent.isGlobal {
t.parent.Consume(-t.BytesConsumed())
} else {
t.parent.remove(t)
}
}
t.parent = globalTracker
t.parent.Consume(t.BytesConsumed())
}

// DetachFromGlobalTracker detach itself from its parent
// Note that only the parent of this tracker is Global Tracker could call this function
// Otherwise it should use Detach
func (t *Tracker) DetachFromGlobalTracker() {
if t.parent == nil {
return
}
if !t.parent.isGlobal {
panic("Detach from a non-GlobalTracker")
}
parent := t.parent
parent.Consume(-t.BytesConsumed())
t.parent = nil
}
49 changes: 49 additions & 0 deletions util/memory/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,55 @@ func (s *testSuite) TestMaxConsumed(c *C) {
}
}

func (s *testSuite) TestGlobalTracker(c *C) {
r := NewGlobalTracker(stringutil.StringerStr("root"), -1)
c1 := NewTracker(stringutil.StringerStr("child 1"), -1)
c2 := NewTracker(stringutil.StringerStr("child 2"), -1)
c1.Consume(100)
c2.Consume(200)

c1.AttachToGlobalTracker(r)
c2.AttachToGlobalTracker(r)
c.Assert(r.BytesConsumed(), Equals, int64(300))
c.Assert(c1.parent, DeepEquals, r)
c.Assert(c2.parent, DeepEquals, r)
c.Assert(len(r.mu.children), Equals, 0)

c1.DetachFromGlobalTracker()
c2.DetachFromGlobalTracker()
c.Assert(r.BytesConsumed(), Equals, int64(0))
c.Assert(c1.parent, IsNil)
c.Assert(c2.parent, IsNil)
c.Assert(len(r.mu.children), Equals, 0)

defer func() {
v := recover()
c.Assert(v, Equals, "Attach to a non-GlobalTracker")
}()
commonTracker := NewTracker(stringutil.StringerStr("common"), -1)
c1.AttachToGlobalTracker(commonTracker)

c1.AttachTo(commonTracker)
c.Assert(commonTracker.BytesConsumed(), Equals, int64(100))
c.Assert(len(commonTracker.mu.children), Equals, 1)
c.Assert(c1.parent, DeepEquals, commonTracker)

c1.AttachToGlobalTracker(r)
c.Assert(commonTracker.BytesConsumed(), Equals, int64(0))
c.Assert(len(commonTracker.mu.children), Equals, 0)
c.Assert(r.BytesConsumed(), Equals, int64(100))
c.Assert(c1.parent, DeepEquals, r)
c.Assert(len(r.mu.children), Equals, 0)

defer func() {
v := recover()
c.Assert(v, Equals, "Detach from a non-GlobalTracker")
}()
c2.AttachTo(commonTracker)
c2.DetachFromGlobalTracker()

}

func BenchmarkConsume(b *testing.B) {
tracker := NewTracker(stringutil.StringerStr("root"), -1)
b.RunParallel(func(pb *testing.PB) {
Expand Down