Skip to content

Commit

Permalink
config: support temporary storage usage limitation (pingcap#15700)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored and sre-bot committed Apr 8, 2020
1 parent 251a3cc commit 76a93cb
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 15 deletions.
49 changes: 34 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sys/storage"
tracing "github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -67,21 +68,24 @@ var (

// Config contains configuration options.
type Config struct {
Host string `toml:"host" json:"host"`
AdvertiseAddress string `toml:"advertise-address" json:"advertise-address"`
Port uint `toml:"port" json:"port"`
Cors string `toml:"cors" json:"cors"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMUseTmpStorage bool `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"`
TempStoragePath string `toml:"tmp-storage-path" json:"tmp-storage-path"`
OOMAction string `toml:"oom-action" json:"oom-action"`
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
Host string `toml:"host" json:"host"`
AdvertiseAddress string `toml:"advertise-address" json:"advertise-address"`
Port uint `toml:"port" json:"port"`
Cors string `toml:"cors" json:"cors"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMUseTmpStorage bool `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"`
TempStoragePath string `toml:"tmp-storage-path" json:"tmp-storage-path"`
OOMAction string `toml:"oom-action" json:"oom-action"`
MemQuotaQuery int64 `toml:"mem-quota-query" json:"mem-quota-query"`
// TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled
// If the quota exceed the capacity of the TempStoragePath, the tidb-server would exit with fatal error
TempStorageQuota int64 `toml:"temp-storage-quota" json:"temp-storage-quota"` // Bytes
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
EnableBatchDML bool `toml:"enable-batch-dml" json:"enable-batch-dml"`
TxnLocalLatches TxnLocalLatches `toml:"-" json:"-"`
Expand Down Expand Up @@ -532,6 +536,7 @@ var defaultConf = Config{
Lease: "45s",
TokenLimit: 1000,
OOMUseTmpStorage: true,
TempStorageQuota: -1,
TempStoragePath: tempStorageDirName,
OOMAction: OOMActionCancel,
MemQuotaQuery: 1 << 30,
Expand Down Expand Up @@ -863,6 +868,20 @@ func (c *Config) Valid() error {
}
}

// check capacity and the quota when OOMUseTmpStorage is enabled
if c.OOMUseTmpStorage {
if c.TempStorageQuota < 0 {
// means unlimited, do nothing
} else {
capacityByte, err := storage.GetTargetDirectoryCapacity(c.TempStoragePath)
if err != nil {
return err
} else if capacityByte > uint64(c.TempStorageQuota) {
return fmt.Errorf("value of [temp-storage-quota](%d byte) exceeds the capacity(%d byte) of the [%s] directory", c.TempStorageQuota, capacityByte, c.TempStoragePath)
}
}
}

// test log level
l := zap.NewAtomicLevel()
return l.UnmarshalText([]byte(c.Log.Level))
Expand Down
5 changes: 5 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ oom-use-tmp-storage = true
# It only takes effect when `oom-use-tmp-storage` is `true`.
# tmp-storage-path = "/tmp/tidb/NDAwMC8xMDA4MA==/tmp-storage"

# Specifies the maximum use of temporary storage (bytes) for all active queries when `oom-use-tmp-storage` is enabled.
# If the `temp-storage-quota` exceeds the capacity of the temporary storage directory, tidb-server would return an error and exit.
# The default value of temp-storage-quota is under 0 which means tidb-server wouldn't check the capacity.
temp-storage-quota = -1

# Specifies what operation TiDB performs when a single SQL statement exceeds the memory quota specified by mem-quota-query and cannot be spilled over to disk.
# Valid options: ["log", "cancel"]
oom-action = "cancel"
Expand Down
4 changes: 4 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (a *recordSet) Close() error {
pps := types.CloneRow(sessVars.PreparedParams)
sessVars.PrevStmt = FormatSQL(a.stmt.OriginText(), pps)
a.stmt.logAudit()
// Detach the disk tracker from GlobalDiskUsageTracker after every execution
if stmtCtx := a.stmt.Ctx.GetSessionVars().StmtCtx; stmtCtx != nil && stmtCtx.DiskTracker != nil {
stmtCtx.DiskTracker.Detach()
}
return err
}

Expand Down
39 changes: 39 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ var (
_ Executor = &TableScanExec{}
_ Executor = &TopNExec{}
_ Executor = &UnionExec{}

// GlobalDiskUsageTracker is the ancestor of all the Executors' disk tracker
GlobalDiskUsageTracker *disk.Tracker
)

type baseExecutor struct {
Expand All @@ -98,6 +101,35 @@ type baseExecutor struct {
runtimeStats *execdetails.RuntimeStats
}

// globalPanicOnExceed panics when GlobalDisTracker storage usage exceeds storage quota.
type globalPanicOnExceed struct {
mutex sync.Mutex // For synchronization.
}

// SetLogHook sets a hook for PanicOnExceed.
func (a *globalPanicOnExceed) SetLogHook(hook func(uint64)) {}

// Action panics when storage usage exceeds storage quota.
func (a *globalPanicOnExceed) Action(t *memory.Tracker) {
a.mutex.Lock()
defer a.mutex.Unlock()
panic(globalPanicStorageExceed)
}

// SetFallback sets a fallback action.
func (a *globalPanicOnExceed) SetFallback(memory.ActionOnExceed) {}

const (
// globalPanicStorageExceed represents the panic message when out of storage quota.
globalPanicStorageExceed string = "Out Of Global Storage Quota!"
)

func init() {
GlobalDiskUsageTracker = disk.NewTracker(stringutil.StringerStr("GlobalStorageLabel"), -1)
action := &globalPanicOnExceed{}
GlobalDiskUsageTracker.SetActionOnExceed(action)
}

// base returns the baseExecutor of an executor, don't override this method!
func (e *baseExecutor) base() *baseExecutor {
return e
Expand Down Expand Up @@ -1522,11 +1554,18 @@ func (e *UnionExec) Close() error {
// Before every execution, we must clear statement context.
func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars := ctx.GetSessionVars()
// Detach the disk tracker for the previous stmtctx from GlobalDiskUsageTracker
if vars.StmtCtx != nil && vars.StmtCtx.DiskTracker != nil {
vars.StmtCtx.DiskTracker.Detach()
}
sc := &stmtctx.StatementContext{
TimeZone: vars.Location(),
MemTracker: memory.NewTracker(stringutil.MemoizeStr(s.Text), vars.MemQuotaQuery),
DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1),
}
if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
sc.DiskTracker.AttachTo(GlobalDiskUsageTracker)
}
switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
Expand Down
2 changes: 2 additions & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -571,6 +572,7 @@ func setGlobalVars() {
tikv.RegionCacheTTLSec = int64(cfg.TiKVClient.RegionCacheTTL)
domainutil.RepairInfo.SetRepairMode(cfg.RepairMode)
domainutil.RepairInfo.SetRepairTableList(cfg.RepairTableList)
executor.GlobalDiskUsageTracker.SetBytesLimit(config.GetGlobalConfig().TempStorageQuota)
}

func setupLog() {
Expand Down
11 changes: 11 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ func (t *Tracker) AttachTo(parent *Tracker) {
t.parent.Consume(t.BytesConsumed())
}

// Detach de-attach the tracker child from its parent, then set its parent property as nil
func (t *Tracker) Detach() {
if t.parent == nil {
return
}
t.parent.remove(t)
t.mu.Lock()
defer t.mu.Unlock()
t.parent = nil
}

func (t *Tracker) remove(oldChild *Tracker) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
17 changes: 17 additions & 0 deletions util/memory/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,23 @@ func (s *testSuite) TestAttachTo(c *C) {
c.Assert(len(oldParent.mu.children), Equals, 0)
}

func (s *testSuite) TestDetach(c *C) {
parent := NewTracker(stringutil.StringerStr("parent"), -1)
child := NewTracker(stringutil.StringerStr("child"), -1)
child.Consume(100)
child.AttachTo(parent)
c.Assert(child.BytesConsumed(), Equals, int64(100))
c.Assert(parent.BytesConsumed(), Equals, int64(100))
c.Assert(len(parent.mu.children), Equals, 1)
c.Assert(parent.mu.children[0], DeepEquals, child)

child.Detach()
c.Assert(child.BytesConsumed(), Equals, int64(100))
c.Assert(parent.BytesConsumed(), Equals, int64(0))
c.Assert(len(parent.mu.children), Equals, 0)
c.Assert(child.parent, IsNil)
}

func (s *testSuite) TestReplaceChild(c *C) {
oldChild := NewTracker(stringutil.StringerStr("old child"), -1)
oldChild.Consume(100)
Expand Down
23 changes: 23 additions & 0 deletions util/sys/storage/sys_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !linux,!windows,!darwin

package storage

import "math"

// GetTargetDirectoryCapacity get the capacity (bytes) of directory
func GetTargetDirectoryCapacity(path string) (uint64, error) {
return math.MaxInt64, nil
}
29 changes: 29 additions & 0 deletions util/sys/storage/sys_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// +build linux darwin

package storage

import "syscall"

// GetTargetDirectoryCapacity get the capacity (bytes) of directory
func GetTargetDirectoryCapacity(path string) (uint64, error) {
var stat syscall.Statfs_t
err := syscall.Statfs(path, &stat)
if err != nil {
return 0, err
}
c := stat.Bavail * uint64(stat.Bsize)
return c, nil
}
36 changes: 36 additions & 0 deletions util/sys/storage/sys_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package storage_test

import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/util/sys/storage"
)

func TestT(t *testing.T) {
TestingT(t)
}

func TestGetTargetDirectoryCapacity(t *testing.T) {
r, err := storage.GetTargetDirectoryCapacity(".")
if err != nil {
t.Fatal(t)
}
if r < 1 {
t.Fatalf("couldn't get capacity")
}
//TODO: check the value of r with `df` in linux
}
34 changes: 34 additions & 0 deletions util/sys/storage/sys_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// +build windows

package storage

import (
"syscall"
"unsafe"
)

// GetTargetDirectoryCapacity get the capacity (bytes) of directory
func GetTargetDirectoryCapacity(path string) (uint64, error) {
h := syscall.MustLoadDLL("kernel32.dll")
c := h.MustFindProc("GetDiskFreeSpaceExW")
var freeBytes int64
_, _, err := c.Call(uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(path))),
uintptr(unsafe.Pointer(&freeBytes)))
if err != nil {
return 0, err
}
return uint64(freeBytes), nil
}

0 comments on commit 76a93cb

Please sign in to comment.