Skip to content

Commit

Permalink
ttl: submit ttl scan task to the system table (#40422)
Browse files Browse the repository at this point in the history
close #40362, ref #40363
  • Loading branch information
YangKeao authored Jan 13, 2023
1 parent 17c0d54 commit b619324
Show file tree
Hide file tree
Showing 12 changed files with 428 additions and 21 deletions.
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 44
result := 45
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
2 changes: 2 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,6 +1576,8 @@ func Args2Expressions4Test(args ...interface{}) []Expression {
ft = types.NewFieldType(mysql.TypeVarString)
case types.KindMysqlTime:
ft = types.NewFieldType(mysql.TypeTimestamp)
case types.KindBytes:
ft = types.NewFieldType(mysql.TypeBlob)
default:
exprs[i] = nil
continue
Expand Down
34 changes: 32 additions & 2 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ const (
PRIMARY KEY (Host,User,Password_timestamp )
) COMMENT='Password history for user accounts' `

// CreateTTLTableStatus is a table about TTL task schedule
// CreateTTLTableStatus is a table about TTL job schedule
CreateTTLTableStatus = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_table_status (
table_id bigint(64) PRIMARY KEY,
parent_table_id bigint(64),
Expand All @@ -498,6 +498,24 @@ const (
current_job_state text DEFAULT NULL,
current_job_status varchar(64) DEFAULT NULL,
current_job_status_update_time timestamp NULL DEFAULT NULL);`

// CreateTTLTask is a table about parallel ttl tasks
CreateTTLTask = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_task (
job_id varchar(64) NOT NULL,
table_id bigint(64) NOT NULL,
scan_id int NOT NULL,
scan_range_start BLOB,
scan_range_end BLOB,
expire_time timestamp NOT NULL,
owner_id varchar(64) DEFAULT NULL,
owner_addr varchar(64) DEFAULT NULL,
owner_hb_time timestamp DEFAULT NULL,
status varchar(64) DEFAULT 'waiting',
status_update_time timestamp NULL DEFAULT NULL,
state text,
created_time timestamp NOT NULL,
primary key(job_id, scan_id),
key(created_time));`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -739,11 +757,13 @@ const (
version109 = 109
// version110 sets tidb_enable_gc_aware_memory_track to off when a cluster upgrades from some version lower than v6.5.0.
version110 = 110
// version111 adds the table tidb_ttl_task
version111 = 111
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version109
var currentBootstrapVersion int64 = version111

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -861,6 +881,7 @@ var (
upgradeToVer108,
upgradeToVer109,
upgradeToVer110,
upgradeToVer111,
}
)

Expand Down Expand Up @@ -2213,6 +2234,13 @@ func upgradeToVer110(s Session, ver int64) {
mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableGCAwareMemoryTrack, 0)
}

func upgradeToVer111(s Session, ver int64) {
if ver >= version111 {
return
}
doReentrantDDL(s, CreateTTLTask)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down Expand Up @@ -2319,6 +2347,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateStatsTableLocked)
// Create tidb_ttl_table_status table
mustExecute(s, CreateTTLTableStatus)
// Create tidb_ttl_task table
mustExecute(s, CreateTTLTask)
}

// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
Expand Down
8 changes: 4 additions & 4 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,16 +1018,16 @@ func (store *MVCCStore) buildPrewriteLock(reqCtx *requestCtx, m *kvrpcpb.Mutatio
lock.Op = uint8(kvrpcpb.Op_Put)
}
if rowcodec.IsRowKey(m.Key) && lock.Op == uint8(kvrpcpb.Op_Put) {
if rowcodec.IsNewFormat(m.Value) {
reqCtx.buf = m.Value
} else {
if !rowcodec.IsNewFormat(m.Value) {
reqCtx.buf, err = encodeFromOldRow(m.Value, reqCtx.buf)
if err != nil {
log.Error("encode data failed", zap.Binary("value", m.Value), zap.Binary("key", m.Key), zap.Stringer("op", m.Op), zap.Error(err))
return nil, err
}

lock.Value = make([]byte, len(reqCtx.buf))
copy(lock.Value, reqCtx.buf)
}
lock.Value = reqCtx.buf
}

lock.ForUpdateTS = req.ForUpdateTs
Expand Down
3 changes: 3 additions & 0 deletions ttl/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"base.go",
"infoschema.go",
"table.go",
"task.go",
"ttlstatus.go",
],
importpath = "github.com/pingcap/tidb/ttl/cache",
Expand Down Expand Up @@ -40,6 +41,7 @@ go_test(
"main_test.go",
"split_test.go",
"table_test.go",
"task_test.go",
"ttlstatus_test.go",
],
embed = [":cache"],
Expand All @@ -49,6 +51,7 @@ go_test(
"//kv",
"//parser/model",
"//server",
"//session",
"//store/helper",
"//tablecodec",
"//testkit",
Expand Down
183 changes: 183 additions & 0 deletions ttl/cache/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"encoding/json"
"time"

"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
)

const selectFromTTLTask = `SELECT LOW_PRIORITY
job_id,
table_id,
scan_id,
scan_range_start,
scan_range_end,
expire_time,
owner_id,
owner_addr,
owner_hb_time,
status,
status_update_time,
state,
created_time FROM mysql.tidb_ttl_task`
const insertIntoTTLTask = `INSERT LOW_PRIORITY INTO mysql.tidb_ttl_task SET
job_id = %?,
table_id = %?,
scan_id = %?,
scan_range_start = %?,
scan_range_end = %?,
expire_time = %?,
created_time = %?`

// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
func SelectFromTTLTaskWithID(jobID string) (string, []interface{}) {
return selectFromTTLTask + " WHERE job_id = %?", []interface{}{jobID}
}

// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
func InsertIntoTTLTask(sctx sessionctx.Context, jobID string, tableID int64, scanID int, scanRangeStart []types.Datum, scanRangeEnd []types.Datum, expireTime time.Time, createdTime time.Time) (string, []interface{}, error) {
rangeStart, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeStart...)
if err != nil {
return "", nil, err
}
rangeEnd, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeEnd...)
if err != nil {
return "", nil, err
}
return insertIntoTTLTask, []interface{}{jobID, tableID, int64(scanID), rangeStart, rangeEnd, expireTime, createdTime}, nil
}

// TaskStatus represents the current status of a task
type TaskStatus string

const (
// TaskStatusWaiting means the task hasn't started
TaskStatusWaiting TaskStatus = "waiting"
// TaskStatusRunning means this task is running
TaskStatusRunning = "running"
// TaskStatusFinished means this task has finished
TaskStatusFinished = "finished"
)

// TTLTask is a row recorded in mysql.tidb_ttl_task
type TTLTask struct {
JobID string
TableID int64
ScanID int64
ScanRangeStart []types.Datum
ScanRangeEnd []types.Datum
ExpireTime time.Time
OwnerID string
OwnerAddr string
OwnerHBTime time.Time
Status TaskStatus
StatusUpdateTime time.Time
State *TTLTaskState
CreatedTime time.Time
}

// TTLTaskState records the internal states of the ttl task
type TTLTaskState struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`

ScanTaskErr string `json:"scan_task_err"`
}

// RowToTTLTask converts a row into TTL task
func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
var err error
timeZone := sctx.GetSessionVars().Location()

task := &TTLTask{
JobID: row.GetString(0),
TableID: row.GetInt64(1),
ScanID: row.GetInt64(2),
}
if !row.IsNull(3) {
scanRangeStartBuf := row.GetBytes(3)
// it's still posibble to be nil even this column is not NULL
if scanRangeStartBuf != nil {
task.ScanRangeStart, err = codec.Decode(scanRangeStartBuf, len(scanRangeStartBuf))
if err != nil {
return nil, err
}
}
}
if !row.IsNull(4) {
scanRangeEndBuf := row.GetBytes(4)
// it's still posibble to be nil even this column is not NULL
if scanRangeEndBuf != nil {
task.ScanRangeEnd, err = codec.Decode(scanRangeEndBuf, len(scanRangeEndBuf))
if err != nil {
return nil, err
}
}
}

task.ExpireTime, err = row.GetTime(5).GoTime(timeZone)
if err != nil {
return nil, err
}

if !row.IsNull(6) {
task.OwnerID = row.GetString(6)
}
if !row.IsNull(7) {
task.OwnerAddr = row.GetString(7)
}
if !row.IsNull(8) {
task.OwnerHBTime, err = row.GetTime(8).GoTime(timeZone)
if err != nil {
return nil, err
}
}
if !row.IsNull(9) {
status := row.GetString(9)
if len(status) == 0 {
status = "waiting"
}
task.Status = TaskStatus(status)
}
if !row.IsNull(10) {
task.StatusUpdateTime, err = row.GetTime(10).GoTime(timeZone)
if err != nil {
return nil, err
}
}
if !row.IsNull(11) {
stateStr := row.GetString(11)
state := &TTLTaskState{}
err = json.Unmarshal([]byte(stateStr), state)
if err != nil {
return nil, err
}
task.State = state
}

task.CreatedTime, err = row.GetTime(12).GoTime(timeZone)
if err != nil {
return nil, err
}

return task, nil
}
Loading

0 comments on commit b619324

Please sign in to comment.