ttl: Implement scan and delete task for TTL (#39481) (#39615)
close #39480, close #39554
ti-chi-bot authored Dec 19, 2022
1 parent 61ee84e commit d211731
Showing 16 changed files with 2,265 additions and 4 deletions.
39 changes: 39 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -2127,6 +2127,45 @@ var defaultSysVars = []*SysVar{
s.EnableReuseCheck = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLJobEnable, Value: BoolToOnOff(DefTiDBTTLJobEnable), Type: TypeBool, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
EnableTTLJob.Store(TiDBOptOn(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(EnableTTLJob.Load()), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLScanBatchSize, Value: strconv.Itoa(DefTiDBTTLScanBatchSize), Type: TypeInt, MinValue: DefTiDBTTLScanBatchMinSize, MaxValue: DefTiDBTTLScanBatchMaxSize, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLScanBatchSize.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLScanBatchSize.Load()
return strconv.FormatInt(val, 10), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLDeleteBatchSize, Value: strconv.Itoa(DefTiDBTTLDeleteBatchSize), Type: TypeInt, MinValue: DefTiDBTTLDeleteBatchMinSize, MaxValue: DefTiDBTTLDeleteBatchMaxSize, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLDeleteBatchSize.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLDeleteBatchSize.Load()
return strconv.FormatInt(val, 10), nil
}},
{Scope: ScopeGlobal, Name: TiDBTTLDeleteRateLimit, Value: strconv.Itoa(DefTiDBTTLDeleteRateLimit), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
val, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return err
}
TTLDeleteRateLimit.Store(val)
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
val := TTLDeleteRateLimit.Load()
return strconv.FormatInt(val, 10), nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBStoreBatchSize, Value: strconv.FormatInt(DefTiDBStoreBatchSize, 10),
Type: TypeInt, MinValue: 0, MaxValue: 25000, SetSession: func(s *SessionVars, val string) error {
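
Each of the four TTL variables above follows the same pattern: a global-only SysVar whose SetGlobal hook parses the string and stores it into a package-level atomic, and whose GetGlobal hook formats the atomic back into a string. A standalone sketch of that pattern (illustrative names only; it uses go.uber.org/atomic, the same package behind variable.TTLScanBatchSize):

package main

import (
	"fmt"
	"strconv"

	"go.uber.org/atomic"
)

// scanBatchSize plays the role of variable.TTLScanBatchSize: a process-wide
// value that background workers can read without holding a session.
var scanBatchSize = atomic.NewInt64(500)

// setGlobal mimics the SetGlobal hook: parse the incoming string, then
// publish the value atomically.
func setGlobal(s string) error {
	v, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return err
	}
	scanBatchSize.Store(v)
	return nil
}

// getGlobal mimics the GetGlobal hook: read the atomic and format it back.
func getGlobal() string {
	return strconv.FormatInt(scanBatchSize.Load(), 10)
}

func main() {
	if err := setGlobal("1024"); err != nil {
		panic(err)
	}
	fmt.Println(getGlobal()) // 1024
}
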
20 changes: 20 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -868,6 +868,14 @@ const (
TiDBGOGCTunerThreshold = "tidb_gogc_tuner_threshold"
// TiDBExternalTS is the ts to read through when the `TiDBEnableExternalTsRead` is on
TiDBExternalTS = "tidb_external_ts"
// TiDBTTLJobEnable is used to enable/disable scheduling ttl job
TiDBTTLJobEnable = "tidb_ttl_job_enable"
// TiDBTTLScanBatchSize is used to control the batch size in the SELECT statement for TTL jobs
TiDBTTLScanBatchSize = "tidb_ttl_scan_batch_size"
// TiDBTTLDeleteBatchSize is used to control the batch size in the DELETE statement for TTL jobs
TiDBTTLDeleteBatchSize = "tidb_ttl_delete_batch_size"
// TiDBTTLDeleteRateLimit is used to control the delete rate limit for TTL jobs in each node
TiDBTTLDeleteRateLimit = "tidb_ttl_delete_rate_limit"
// PasswordReuseHistory limit a few passwords to reuse.
PasswordReuseHistory = "password_history"
// PasswordReuseTime limit how long passwords can be reused.
@@ -1115,6 +1123,14 @@ const (
DefTiDBUseAlloc = false
DefTiDBEnablePlanReplayerCapture = false
DefTiDBIndexMergeIntersectionConcurrency = ConcurrencyUnset
DefTiDBTTLJobEnable = true
DefTiDBTTLScanBatchSize = 500
DefTiDBTTLScanBatchMaxSize = 10240
DefTiDBTTLScanBatchMinSize = 1
DefTiDBTTLDeleteBatchSize = 500
DefTiDBTTLDeleteBatchMaxSize = 10240
DefTiDBTTLDeleteBatchMinSize = 1
DefTiDBTTLDeleteRateLimit = 0
DefPasswordReuseHistory = 0
DefPasswordReuseTime = 0
DefTiDBStoreBatchSize = 0
@@ -1180,6 +1196,10 @@ var (
PasswordValidationMixedCaseCount = atomic.NewInt32(1)
PasswordValidtaionNumberCount = atomic.NewInt32(1)
PasswordValidationSpecialCharCount = atomic.NewInt32(1)
EnableTTLJob = atomic.NewBool(DefTiDBTTLJobEnable)
TTLScanBatchSize = atomic.NewInt64(DefTiDBTTLScanBatchSize)
TTLDeleteBatchSize = atomic.NewInt64(DefTiDBTTLDeleteBatchSize)
TTLDeleteRateLimit = atomic.NewInt64(DefTiDBTTLDeleteRateLimit)
PasswordHistory = atomic.NewInt64(DefPasswordReuseHistory)
PasswordReuseInterval = atomic.NewInt64(DefPasswordReuseTime)
IsSandBoxModeEnabled = atomic.NewBool(false)
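
The package-level atomics at the end of the hunk are what the TTL background code is expected to read, since a scan or delete task does not necessarily own a session. A minimal consumer-side sketch (the ttlworker package, the struct, and the helper are assumptions for illustration, not part of this diff):

package ttlworker

import "github.com/pingcap/tidb/sessionctx/variable"

// ttlSettings is an illustrative snapshot of the tunables a scan/delete task
// might capture when it starts.
type ttlSettings struct {
	enabled         bool
	scanBatchSize   int64
	deleteBatchSize int64
	// deleteRateLimit applies per node; 0 is assumed here to mean "no limit".
	deleteRateLimit int64
}

// loadTTLSettings reads the current global values; the sysvar framework has
// already clamped the batch sizes to their [min, max] ranges.
func loadTTLSettings() ttlSettings {
	return ttlSettings{
		enabled:         variable.EnableTTLJob.Load(),
		scanBatchSize:   variable.TTLScanBatchSize.Load(),
		deleteBatchSize: variable.TTLDeleteBatchSize.Load(),
		deleteRateLimit: variable.TTLDeleteRateLimit.Load(),
	}
}
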
7 changes: 7 additions & 0 deletions ttl/cache/table.go
@@ -56,6 +56,8 @@ func getTableKeyColumns(tbl *model.TableInfo) ([]*model.ColumnInfo, []*types.Fie

// PhysicalTable is used to provide some information for a physical table in TTL job
type PhysicalTable struct {
// ID is the physical ID of the table
ID int64
// Schema is the database name of the table
Schema model.CIStr
*model.TableInfo
@@ -92,11 +94,13 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
return nil, err
}

var physicalID int64
var partitionDef *model.PartitionDefinition
if tbl.Partition == nil {
if partition.L != "" {
return nil, errors.Errorf("table '%s.%s' is not a partitioned table", schema, tbl.Name)
}
physicalID = tbl.ID
} else {
if partition.L == "" {
return nil, errors.Errorf("partition name is required, table '%s.%s' is a partitioned table", schema, tbl.Name)
@@ -112,9 +116,12 @@ func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, partition model.
if partitionDef == nil {
return nil, errors.Errorf("partition '%s' is not found in ttl table '%s.%s'", partition.O, schema, tbl.Name)
}

physicalID = partitionDef.ID
}

return &PhysicalTable{
ID: physicalID,
Schema: schema,
TableInfo: tbl,
Partition: partition,
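
The new ID field records the physical ID a TTL task actually scans: the table's own ID for a non-partitioned table, or the partition definition's ID when a partition name is given. A small illustrative wrapper (not part of this commit) showing how a caller resolves it:

package ttlworker

import (
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/ttl/cache"
)

// physicalIDFor is an illustrative helper that wraps cache.NewPhysicalTable
// from this diff and returns the physical ID to scan.
func physicalIDFor(schema model.CIStr, tbl *model.TableInfo, partition model.CIStr) (int64, error) {
	pt, err := cache.NewPhysicalTable(schema, tbl, partition)
	if err != nil {
		return 0, err
	}
	// pt.ID is tbl.ID for a plain table, or the chosen partition's ID.
	return pt.ID, nil
}
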
4 changes: 3 additions & 1 deletion ttl/cache/table_test.go
@@ -110,7 +110,7 @@ func TestNewTTLTable(t *testing.T) {
physicalTbls = append(physicalTbls, ttlTbl)
} else {
for _, partition := range tblInfo.Partition.Definitions {
ttlTbl, err := cache.NewPhysicalTable(model.NewCIStr(c.db), tblInfo, model.NewCIStr(partition.Name.O))
ttlTbl, err := cache.NewPhysicalTable(model.NewCIStr(c.db), tblInfo, partition.Name)
if c.timeCol == "" {
require.Error(t, err)
continue
@@ -131,10 +131,12 @@
require.Same(t, timeColumn, ttlTbl.TimeColumn)

if tblInfo.Partition == nil {
require.Equal(t, ttlTbl.TableInfo.ID, ttlTbl.ID)
require.Equal(t, "", ttlTbl.Partition.L)
require.Nil(t, ttlTbl.PartitionDef)
} else {
def := tblInfo.Partition.Definitions[i]
require.Equal(t, def.ID, ttlTbl.ID)
require.Equal(t, def.Name.L, ttlTbl.Partition.L)
require.Equal(t, def, *(ttlTbl.PartitionDef))
}
5 changes: 4 additions & 1 deletion ttl/session/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"//kv",
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
"//sessiontxn",
"//util/chunk",
"//util/sqlexec",
@@ -22,10 +23,12 @@ go_test(
srcs = [
"main_test.go",
"session_test.go",
"sysvar_test.go",
],
embed = [":session"],
flaky = True,
deps = [
":session",
"//sessionctx/variable",
"//testkit",
"//testkit/testsetup",
"@com_github_pingcap_errors//:errors",
24 changes: 24 additions & 0 deletions ttl/session/session.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
@@ -36,6 +37,8 @@ type Session interface {
ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error)
// RunInTxn executes the specified function in a txn
RunInTxn(ctx context.Context, fn func() error) (err error)
// ResetWithGlobalTimeZone resets the session time zone to global time zone
ResetWithGlobalTimeZone(ctx context.Context) error
// Close closes the session
Close()
}
@@ -112,6 +115,27 @@ func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
return err
}

// ResetWithGlobalTimeZone resets the session time zone to global time zone
func (s *session) ResetWithGlobalTimeZone(ctx context.Context) error {
sessVar := s.GetSessionVars()
globalTZ, err := sessVar.GetGlobalSystemVar(ctx, variable.TimeZone)
if err != nil {
return err
}

tz, err := sessVar.GetSessionOrGlobalSystemVar(ctx, variable.TimeZone)
if err != nil {
return err
}

if globalTZ == tz {
return nil
}

_, err = s.ExecuteSQL(ctx, "SET @@time_zone=@@global.time_zone")
return err
}

// Close closes the session
func (s *session) Close() {
if s.closeFn != nil {
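
ResetWithGlobalTimeZone keeps a pooled worker session's time zone aligned with the cluster-wide setting so TTL expiration is evaluated consistently, and it only issues the SET statement when the two differ. A rough sketch of the intended call pattern (the deleteExpired helper is illustrative, not part of this commit):

package ttlworker

import (
	"context"

	"github.com/pingcap/tidb/ttl/session"
)

// deleteExpired sketches how the Session interface is meant to be used:
// realign the time zone first, then run the statement inside a transaction.
func deleteExpired(ctx context.Context, se session.Session, sql string, args ...interface{}) error {
	if err := se.ResetWithGlobalTimeZone(ctx); err != nil {
		return err
	}
	return se.RunInTxn(ctx, func() error {
		_, err := se.ExecuteSQL(ctx, sql, args...)
		return err
	})
}
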
17 changes: 15 additions & 2 deletions ttl/session/session_test.go
@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package session
package session_test

import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/session"
"github.com/stretchr/testify/require"
)

@@ -28,7 +29,7 @@ func TestSessionRunInTxn(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int primary key, v int)")
se := NewSession(tk.Session(), tk.Session(), nil)
se := session.NewSession(tk.Session(), tk.Session(), nil)
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")

@@ -50,3 +51,15 @@
}))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10", "3 30"))
}

func TestSessionResetTimeZone(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.time_zone='UTC'")
tk.MustExec("set @@time_zone='Asia/Shanghai'")

se := session.NewSession(tk.Session(), tk.Session(), nil)
tk.MustQuery("select @@time_zone").Check(testkit.Rows("Asia/Shanghai"))
require.NoError(t, se.ResetWithGlobalTimeZone(context.TODO()))
tk.MustQuery("select @@time_zone").Check(testkit.Rows("UTC"))
}
125 changes: 125 additions & 0 deletions ttl/session/sysvar_test.go
@@ -0,0 +1,125 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package session_test

import (
"fmt"
"strconv"
"testing"

"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestSysVarTTLJobEnable(t *testing.T) {
origEnableDDL := variable.EnableTTLJob.Load()
defer func() {
variable.EnableTTLJob.Store(origEnableDDL)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_job_enable=0")
require.False(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_job_enable=1")
require.True(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("1"))

tk.MustExec("set @@global.tidb_ttl_job_enable=0")
require.False(t, variable.EnableTTLJob.Load())
tk.MustQuery("select @@global.tidb_ttl_job_enable").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_job_enable").Check(testkit.Rows("0"))
}

func TestSysVarTTLScanBatchSize(t *testing.T) {
origScanBatchSize := variable.TTLScanBatchSize.Load()
defer func() {
variable.TTLScanBatchSize.Store(origScanBatchSize)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_scan_batch_size=789")
require.Equal(t, int64(789), variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows("789"))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows("789"))

tk.MustExec("set @@global.tidb_ttl_scan_batch_size=0")
require.Equal(t, int64(1), variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows("1"))

maxVal := int64(variable.DefTiDBTTLScanBatchMaxSize)
tk.MustExec(fmt.Sprintf("set @@global.tidb_ttl_scan_batch_size=%d", maxVal+1))
require.Equal(t, maxVal, variable.TTLScanBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_scan_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
tk.MustQuery("select @@tidb_ttl_scan_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
}

func TestSysVarTTLScanDeleteBatchSize(t *testing.T) {
origScanBatchSize := variable.TTLScanBatchSize.Load()
defer func() {
variable.TTLScanBatchSize.Store(origScanBatchSize)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_ttl_delete_batch_size=789")
require.Equal(t, int64(789), variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows("789"))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows("789"))

tk.MustExec("set @@global.tidb_ttl_delete_batch_size=0")
require.Equal(t, int64(1), variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows("1"))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows("1"))

maxVal := int64(variable.DefTiDBTTLDeleteBatchMaxSize)
tk.MustExec(fmt.Sprintf("set @@global.tidb_ttl_delete_batch_size=%d", maxVal+1))
require.Equal(t, maxVal, variable.TTLDeleteBatchSize.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
tk.MustQuery("select @@tidb_ttl_delete_batch_size").Check(testkit.Rows(strconv.FormatInt(maxVal, 10)))
}

func TestSysVarTTLScanDeleteLimit(t *testing.T) {
origDeleteLimit := variable.TTLDeleteRateLimit.Load()
defer func() {
variable.TTLDeleteRateLimit.Store(origDeleteLimit)
}()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=100000")
require.Equal(t, int64(100000), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("100000"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("100000"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=0")
require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))

tk.MustExec("set @@global.tidb_ttl_delete_rate_limit=-1")
require.Equal(t, int64(0), variable.TTLDeleteRateLimit.Load())
tk.MustQuery("select @@global.tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_ttl_delete_rate_limit").Check(testkit.Rows("0"))
}