diff --git a/domain/domain.go b/domain/domain.go index 66fcf3ca0e3b3..221de43defa09 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -59,6 +59,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/telemetry" + "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" @@ -1057,6 +1058,7 @@ func (do *Domain) Init( return err } + ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool).Start() return nil } diff --git a/kv/option.go b/kv/option.go index 888a1e24f0fa0..94cd679d4ede2 100644 --- a/kv/option.go +++ b/kv/option.go @@ -165,4 +165,6 @@ const ( InternalTxnBR = InternalTxnTools // InternalTxnTrace handles the trace statement. InternalTxnTrace = "Trace" + // InternalTxnTTL handles the TTL jobs + InternalTxnTTL = "TTL" ) diff --git a/ttl/BUILD.bazel b/ttl/BUILD.bazel new file mode 100644 index 0000000000000..8e82a7053df67 --- /dev/null +++ b/ttl/BUILD.bazel @@ -0,0 +1,41 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "ttl", + srcs = [ + "sql.go", + "table.go", + ], + importpath = "github.com/pingcap/tidb/ttl", + visibility = ["//visibility:public"], + deps = [ + "//parser/ast", + "//parser/format", + "//parser/model", + "//parser/mysql", + "//types", + "//util/sqlexec", + "@com_github_pingcap_errors//:errors", + "@com_github_pkg_errors//:errors", + ], +) + +go_test( + name = "ttl_test", + srcs = [ + "main_test.go", + "sql_test.go", + ], + deps = [ + ":ttl", + "//kv", + "//parser/model", + "//parser/mysql", + "//testkit", + "//testkit/testsetup", + "//types", + "//util/sqlexec", + "@com_github_stretchr_testify//require", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/ttl/error.go b/ttl/error.go new file mode 100644 index 0000000000000..5eb720ddccb4c --- /dev/null +++ b/ttl/error.go @@ -0,0 +1,15 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ttl diff --git a/ttl/main_test.go b/ttl/main_test.go new file mode 100644 index 0000000000000..8bda0eb98eeef --- /dev/null +++ b/ttl/main_test.go @@ -0,0 +1,33 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package ttl_test
+
+import (
+	"testing"
+
+	"github.com/pingcap/tidb/testkit/testsetup"
+	"go.uber.org/goleak"
+)
+
+func TestMain(m *testing.M) {
+	testsetup.SetupForCommonTest()
+	opts := []goleak.Option{
+		goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
+		goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
+		goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
+		goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
+	}
+	goleak.VerifyTestMain(m, opts...)
+}
diff --git a/ttl/session.go b/ttl/session.go
new file mode 100644
index 0000000000000..1bf80d6a00c4f
--- /dev/null
+++ b/ttl/session.go
@@ -0,0 +1,115 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttl
+
+import (
+	"context"
+
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/parser/terror"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/chunk"
+	"github.com/pingcap/tidb/util/sqlexec"
+)
+
+// Session wraps a session context with the SQL executor used to run TTL statements
+type Session struct {
+	Sctx     sessionctx.Context
+	Executor sqlexec.SQLExecutor
+	CloseFn  func()
+}
+
+// GetDomainInfoSchema returns the information schema of the domain, unwrapping the session extension if present
+func (s *Session) GetDomainInfoSchema() infoschema.InfoSchema {
+	is, ok := s.Sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+	if !ok {
+		return nil
+	}
+
+	if ext, ok := is.(*infoschema.SessionExtendedInfoSchema); ok {
+		return ext.InfoSchema
+	}
+
+	return is
+}
+
+// ExecuteSQL executes the sql with the TTL internal source type and returns all result rows
+func (s *Session) ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
+	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnTTL)
+	rs, err := s.Executor.ExecuteInternal(ctx, sql, args...)
+	if err != nil {
+		return nil, err
+	}
+
+	if rs == nil {
+		return nil, nil
+	}
+
+	defer func() {
+		terror.Log(rs.Close())
+	}()
+
+	return sqlexec.DrainRecordSet(ctx, rs, 8)
+}
+
+// RunInTxn wraps fn in a transaction. It commits after fn succeeds and rolls back if fn returns an error.
+func (s *Session) RunInTxn(ctx context.Context, fn func() error) (err error) {
+	if _, err = s.ExecuteSQL(ctx, "BEGIN"); err != nil {
+		return err
+	}
+
+	success := false
+	defer func() {
+		if !success {
+			// Use a separate variable so the rollback result does not
+			// overwrite the error returned by fn.
+			_, rollbackErr := s.ExecuteSQL(ctx, "ROLLBACK")
+			terror.Log(rollbackErr)
+		}
+	}()
+
+	if err = fn(); err != nil {
+		return err
+	}
+
+	if _, err = s.ExecuteSQL(ctx, "COMMIT"); err != nil {
+		return err
+	}
+
+	success = true
+	return nil
+}
+
+// ExecuteSQLWithTTLCheck executes the SQL in a transaction and converts the result rows to key datums
+func (s *Session) ExecuteSQLWithTTLCheck(ctx context.Context, tbl *PhysicalTable, sql string) (result [][]types.Datum, retryable bool, err error) {
+	err = s.RunInTxn(ctx, func() error {
+		// TODO: check schema
+		rows, execErr := s.ExecuteSQL(ctx, sql)
+		// TODO: check retryable err
+		if execErr != nil {
+			return execErr
+		}
+
+		result = make([][]types.Datum, len(rows))
+		for i, row := range rows {
+			result[i] = row.GetDatumRow(tbl.KeyFieldTypes)
+		}
+
+		return nil
+	})
+
+	return
+}
+
+// Close returns the session to its pool
+func (s *Session) Close() {
+	if s.CloseFn != nil {
+		s.CloseFn()
+	}
+}
diff --git a/ttl/sql.go b/ttl/sql.go
new file mode 100644
index 0000000000000..28869db8872cf
--- /dev/null
+++ b/ttl/sql.go
@@ -0,0 +1,472 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttl
+
+import (
+	"encoding/hex"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/parser/format"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/sqlexec"
+	"github.com/pkg/errors"
+)
+
+func writeHex(in io.Writer, d types.Datum) error {
+	_, err := fmt.Fprintf(in, "x'%s'", hex.EncodeToString(d.GetBytes()))
+	return err
+}
+
+func writeDatum(in io.Writer, d types.Datum, ft *types.FieldType) error {
+	switch d.Kind() {
+	case types.KindString, types.KindBytes, types.KindBinaryLiteral:
+		if mysql.HasBinaryFlag(ft.GetFlag()) {
+			return writeHex(in, d)
+		}
+		_, err := fmt.Fprintf(in, "'%s'", sqlexec.EscapeString(d.GetString()))
+		return err
+	}
+	ctx := format.NewRestoreCtx(format.DefaultRestoreFlags, in)
+	expr := ast.NewValueExpr(d.GetValue(), ft.GetCharset(), ft.GetCollate())
+	return expr.Restore(ctx)
+}
+
+// FormatSQLDatum formats the datum as a value string in SQL
+func FormatSQLDatum(d types.Datum, ft *types.FieldType) (string, error) {
+	var sb strings.Builder
+	if err := writeDatum(&sb, d, ft); err != nil {
+		return "", err
+	}
+	return sb.String(), nil
+}
+
+type sqlBuilderState int
+
+const (
+	writeBegin sqlBuilderState = iota
+	writeSelOrDel
+	writeWhere
+	writeOrderBy
+	writeLimit
+	writeDone
+)
+
+// SQLBuilder is used to build SQLs for TTL
+type SQLBuilder struct {
+	tbl   *PhysicalTable
+	sb    strings.Builder
+	state sqlBuilderState
+
+	isReadOnly         bool
+	hasWriteExpireCond bool
+}
+
+// NewSQLBuilder creates a new SQLBuilder
+func NewSQLBuilder(tbl *PhysicalTable) *SQLBuilder {
+	return &SQLBuilder{tbl: tbl, state: writeBegin}
+}
+
+// Build builds the final sql
+func (b *SQLBuilder) Build() (string, error) {
+	if b.state == writeBegin {
+		return "", errors.Errorf("invalid state: %v", b.state)
+	}
+
+	if !b.isReadOnly && !b.hasWriteExpireCond {
+		// check that the `time_column < expire_time` condition has been written to make sure this SQL is safe.
+		return "", errors.New("expire condition not written")
+	}
+
+	b.state = writeDone
+	return b.sb.String(), nil
+}
+
+// WriteSelect writes a select statement to select key columns without any condition
+func (b *SQLBuilder) WriteSelect() error {
+	if b.state != writeBegin {
+		return errors.Errorf("invalid state: %v", b.state)
+	}
+	b.sb.WriteString("SELECT LOW_PRIORITY ")
+	b.writeColNames(b.tbl.KeyColumns, false)
+	b.sb.WriteString(" FROM ")
+	b.writeTblName()
+	if par := b.tbl.PartitionDef; par != nil {
+		b.sb.WriteString(" PARTITION(`")
+		b.sb.WriteString(par.Name.O)
+		b.sb.WriteString("`)")
+	}
+	b.state = writeSelOrDel
+	b.isReadOnly = true
+	return nil
+}
+
+// WriteDelete writes a delete statement without any condition
+func (b *SQLBuilder) WriteDelete() error {
+	if b.state != writeBegin {
+		return errors.Errorf("invalid state: %v", b.state)
+	}
+	b.sb.WriteString("DELETE LOW_PRIORITY FROM ")
+	b.writeTblName()
+	if par := b.tbl.PartitionDef; par != nil {
+		b.sb.WriteString(" PARTITION(`")
+		b.sb.WriteString(par.Name.O)
+		b.sb.WriteString("`)")
+	}
+	b.state = writeSelOrDel
+	return nil
+}
+
+// WriteCommonCondition writes a new condition
+func (b *SQLBuilder) WriteCommonCondition(cols []*model.ColumnInfo, op string, dp []types.Datum) error {
+	switch b.state {
+	case writeSelOrDel:
+		b.sb.WriteString(" WHERE ")
+		b.state = writeWhere
+	case writeWhere:
+		b.sb.WriteString(" AND ")
+	default:
+		return errors.Errorf("invalid state: %v", b.state)
+	}
+
+	b.writeColNames(cols, len(cols) > 1)
+	b.sb.WriteRune(' ')
+	b.sb.WriteString(op)
+	b.sb.WriteRune(' ')
+	return b.writeDataPoint(cols, dp)
+}
+
+// WriteExpireCondition writes a condition with the time column
+func (b *SQLBuilder) WriteExpireCondition(expire time.Time) error {
+	switch b.state {
+	case writeSelOrDel:
+		b.sb.WriteString(" WHERE ")
+		b.state = writeWhere
+	case writeWhere:
+		b.sb.WriteString(" AND ")
+	default:
+		return errors.Errorf("invalid state: %v", b.state)
+	}
+
+	b.writeColNames([]*model.ColumnInfo{b.tbl.TimeColumn}, false)
+	b.sb.WriteString(" < ")
+	b.sb.WriteString("'")
+	b.sb.WriteString(expire.Format("2006-01-02 15:04:05.999999"))
+	b.sb.WriteString("'")
+	b.hasWriteExpireCond = true
+	return nil
+}
+
+// WriteInCondition writes an IN condition
+func (b *SQLBuilder) WriteInCondition(cols []*model.ColumnInfo, dps ...[]types.Datum) error {
+	switch b.state {
+	case writeSelOrDel:
+		b.sb.WriteString(" WHERE ")
+		b.state = writeWhere
+	case writeWhere:
+		b.sb.WriteString(" AND ")
+	default:
+		return errors.Errorf("invalid state: %v", b.state)
+	}
+
+	b.writeColNames(cols, len(cols) > 1)
+	b.sb.WriteString(" IN ")
+	b.sb.WriteRune('(')
+	first := true
+	for _, v := range dps {
+		if first {
+			first = false
+		} else {
+			b.sb.WriteString(", ")
+		}
+		if err := b.writeDataPoint(cols, v); err != nil {
+			return err
+		}
+	}
+	b.sb.WriteRune(')')
+	return nil
+}
+
+// WriteOrderBy writes order by
+func (b *SQLBuilder) WriteOrderBy(cols []*model.ColumnInfo, desc bool) error {
+	if b.state != writeSelOrDel && b.state != writeWhere {
+		return errors.Errorf("invalid state: %v", b.state)
+	}
+	b.state = writeOrderBy
+	b.sb.WriteString(" ORDER BY ")
+	b.writeColNames(cols, false)
+	if desc {
+		b.sb.WriteString(" DESC")
+	} else {
+		b.sb.WriteString(" ASC")
+	}
+	return nil
+}
+
+// WriteLimit writes the limit
+func (b *SQLBuilder) WriteLimit(n int) error {
+	if b.state != writeSelOrDel && b.state != writeWhere && b.state != writeOrderBy {
+		return errors.Errorf("invalid state: %v", b.state)
+	
} + b.state = writeLimit + b.sb.WriteString(" LIMIT ") + b.sb.WriteString(strconv.Itoa(n)) + return nil +} + +func (b *SQLBuilder) writeTblName() { + b.sb.WriteRune('`') + b.sb.WriteString(b.tbl.Schema.O) + b.sb.WriteString("`.`") + b.sb.WriteString(b.tbl.Name.O) + b.sb.WriteRune('`') +} + +func (b *SQLBuilder) writeColName(col *model.ColumnInfo) { + b.sb.WriteRune('`') + b.sb.WriteString(col.Name.O) + b.sb.WriteRune('`') +} + +func (b *SQLBuilder) writeColNames(cols []*model.ColumnInfo, writeBrackets bool) { + if writeBrackets { + b.sb.WriteRune('(') + } + + first := true + for _, col := range cols { + if first { + first = false + } else { + b.sb.WriteString(", ") + } + b.writeColName(col) + } + + if writeBrackets { + b.sb.WriteRune(')') + } +} + +func (b *SQLBuilder) writeDataPoint(cols []*model.ColumnInfo, dp []types.Datum) error { + writeBrackets := len(cols) > 1 + if len(cols) != len(dp) { + return errors.Errorf("col count not match %d != %d", len(cols), len(dp)) + } + + if writeBrackets { + b.sb.WriteRune('(') + } + + first := true + for i, d := range dp { + if first { + first = false + } else { + b.sb.WriteString(", ") + } + if err := writeDatum(&b.sb, d, &cols[i].FieldType); err != nil { + return err + } + } + + if writeBrackets { + b.sb.WriteRune(')') + } + + return nil +} + +// ScanQueryGenerator generates SQLs for scan task +type ScanQueryGenerator struct { + tbl *PhysicalTable + expire time.Time + keyRangeStart []types.Datum + keyRangeEnd []types.Datum + stack [][]types.Datum + limit int + exhausted bool +} + +// NewScanQueryGenerator creates a new ScanQueryGenerator +func NewScanQueryGenerator(tbl *PhysicalTable, expire time.Time, rangeStart []types.Datum, rangeEnd []types.Datum) (*ScanQueryGenerator, error) { + if len(rangeStart) > 0 { + if err := tbl.ValidateKey(rangeStart); err != nil { + return nil, err + } + } + + if len(rangeEnd) > 0 { + if err := tbl.ValidateKey(rangeEnd); err != nil { + return nil, err + } + } + + return &ScanQueryGenerator{ + tbl: tbl, + expire: expire, + keyRangeStart: rangeStart, + keyRangeEnd: rangeEnd, + }, nil +} + +func (g *ScanQueryGenerator) SetTimeZone(tz *time.Location) { + g.expire = g.expire.In(tz) +} + +// NextSQL creates next sql of the scan task +func (g *ScanQueryGenerator) NextSQL(continueFromResult [][]types.Datum, nextLimit int) (string, error) { + if g.exhausted { + return "", errors.New("generator is exhausted") + } + + if nextLimit <= 0 { + return "", errors.Errorf("invalid limit '%d'", nextLimit) + } + + if g.stack == nil { + g.stack = make([][]types.Datum, 0, len(g.tbl.KeyColumns)) + } + + if len(continueFromResult) >= g.limit { + var continueFromKey []types.Datum + if cnt := len(continueFromResult); cnt > 0 { + continueFromKey = continueFromResult[cnt-1] + } + if err := g.setStack(continueFromKey); err != nil { + return "", err + } + } else { + if l := len(g.stack); l > 0 { + g.stack = g.stack[:l-1] + } + if len(g.stack) == 0 { + g.exhausted = true + } + } + g.limit = nextLimit + return g.buildSQL() +} + +// IsExhausted returns whether the generator is exhausted +func (g *ScanQueryGenerator) IsExhausted() bool { + return g.exhausted +} + +func (g *ScanQueryGenerator) setStack(key []types.Datum) error { + if key == nil { + key = g.keyRangeStart + } + + if key == nil { + g.stack = g.stack[:0] + return nil + } + + if err := g.tbl.ValidateKey(key); err != nil { + return err + } + + g.stack = g.stack[:cap(g.stack)] + for i := 0; i < len(key); i++ { + g.stack[i] = key[0 : i+1] + } + return nil +} + +func (g *ScanQueryGenerator) 
buildSQL() (string, error) { + if g.limit <= 0 { + return "", errors.Errorf("invalid limit '%d'", g.limit) + } + + if g.exhausted { + return "", nil + } + + b := NewSQLBuilder(g.tbl) + if err := b.WriteSelect(); err != nil { + return "", err + } + if len(g.stack) > 0 { + for i, d := range g.stack[len(g.stack)-1] { + col := []*model.ColumnInfo{g.tbl.KeyColumns[i]} + val := []types.Datum{d} + var err error + if i < len(g.stack)-1 { + err = b.WriteCommonCondition(col, "=", val) + } else { + err = b.WriteCommonCondition(col, ">", val) + } + if err != nil { + return "", err + } + } + } + + if len(g.keyRangeEnd) > 0 { + if err := b.WriteCommonCondition(g.tbl.KeyColumns, "<=", g.keyRangeEnd); err != nil { + return "", err + } + } + + if err := b.WriteExpireCondition(g.expire); err != nil { + return "", err + } + + if err := b.WriteOrderBy(g.tbl.KeyColumns, false); err != nil { + return "", err + } + + if err := b.WriteLimit(g.limit); err != nil { + return "", err + } + + return b.Build() +} + +// BuildDeleteSQL builds a delete SQL +func BuildDeleteSQL(tbl *PhysicalTable, rows [][]types.Datum, expire time.Time) (string, error) { + if len(rows) == 0 { + return "", errors.New("Cannot build delete SQL with empty rows") + } + + b := NewSQLBuilder(tbl) + if err := b.WriteDelete(); err != nil { + return "", err + } + + if err := b.WriteInCondition(tbl.KeyColumns, rows...); err != nil { + return "", err + } + + if err := b.WriteExpireCondition(expire); err != nil { + return "", err + } + + if err := b.WriteLimit(len(rows)); err != nil { + return "", err + } + + return b.Build() +} diff --git a/ttl/sql_test.go b/ttl/sql_test.go new file mode 100644 index 0000000000000..7efcb3a3790a2 --- /dev/null +++ b/ttl/sql_test.go @@ -0,0 +1,688 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ttl_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/ttl" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/stretchr/testify/require" +) + +func TestFormatSQLDatum(t *testing.T) { + cases := []struct { + ft string + values []interface{} + hex bool + notSupport bool + }{ + { + ft: "int", + values: []interface{}{1, 2, 3, -12}, + }, + { + ft: "decimal(5, 2)", + values: []interface{}{"0.3", "128.71", "-245.32"}, + }, + { + ft: "varchar(32) CHARACTER SET latin1", + values: []interface{}{ + "aa';delete from t where 1;", + string([]byte{0xf1, 0xf2}), + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + }, + }, + { + ft: "char(32) CHARACTER SET utf8mb4", + values: []interface{}{ + "demo", + "\n123", + "aa';delete from t where 1;", + "你好👋", + }, + }, + { + ft: "varchar(32) CHARACTER SET utf8mb4", + values: []interface{}{ + "demo", + "aa';delete from t where 1;", + "你好👋", + }, + }, + { + ft: "varchar(32) CHARACTER SET binary", + values: []interface{}{ + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + "你好👋", + "abcdef", + }, + hex: true, + }, + { + ft: "binary(8)", + values: []interface{}{ + string([]byte{0xf1, 0xf2}), + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + }, + hex: true, + }, + { + ft: "blob", + values: []interface{}{ + string([]byte{0xf1, 0xf2}), + string([]byte{0xf1, 0xf2, 0xf3, 0xf4}), + }, + hex: true, + }, + { + ft: "bit(1)", + values: []interface{}{0, 1}, + hex: true, + }, + { + ft: "date", + values: []interface{}{"2022-01-02", "1900-12-31"}, + }, + { + ft: "time", + values: []interface{}{"00:00", "01:23", "13:51:22"}, + }, + { + ft: "datetime", + values: []interface{}{"2022-01-02 12:11:11", "2022-01-02"}, + }, + { + ft: "timestamp", + values: []interface{}{"2022-01-02 12:11:11", "2022-01-02"}, + }, + { + ft: "json", + values: []interface{}{"{}"}, + notSupport: true, + }, + } + + store, do := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + // create a table with n columns + var sb strings.Builder + sb.WriteString("CREATE TABLE t (id varchar(32) primary key") + for i, c := range cases { + _, err := fmt.Fprintf(&sb, ",\n col%d %s DEFAULT NULL", i, c.ft) + require.NoError(t, err) + } + sb.WriteString("\n);") + tk.MustExec(sb.String()) + + tbl, err := do.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + + for i, c := range cases { + for j, v := range c.values { + tk.MustExec(fmt.Sprintf("insert into t (id, col%d) values ('%d-%d', ?)", i, i, j), v) + } + } + + ctx := kv.WithInternalSourceType(context.TODO(), kv.InternalTxnOthers) + for i, c := range cases { + for j := range c.values { + rowID := fmt.Sprintf("%d-%d", i, j) + colName := fmt.Sprintf("col%d", i) + exec, ok := tk.Session().(sqlexec.SQLExecutor) + require.True(t, ok) + selectSQL := fmt.Sprintf("select %s from t where id='%s'", colName, rowID) + rs, err := exec.ExecuteInternal(ctx, selectSQL) + require.NoError(t, err, selectSQL) + rows, err := sqlexec.DrainRecordSet(ctx, rs, 1) + require.NoError(t, err, selectSQL) + require.Equal(t, 1, len(rows), selectSQL) + col := tbl.Meta().FindPublicColumnByName(colName) + d := rows[0].GetDatum(0, &col.FieldType) + s, err := ttl.FormatSQLDatum(d, &col.FieldType) + if c.notSupport { + require.Error(t, err) + } else { + require.NoError(t, err) + //fmt.Printf("%s: 
%s\n", c.ft, s) + tk.MustQuery("select id from t where " + colName + "=" + s).Check(testkit.Rows(rowID)) + } + if c.hex { + require.True(t, strings.HasPrefix(s, "x'"), s) + } + } + } +} + +func TestSQLBuilder(t *testing.T) { + must := func(err error) { + require.NoError(t, err) + } + + mustBuild := func(b *ttl.SQLBuilder, str string) { + s, err := b.Build() + require.NoError(t, err) + require.Equal(t, str, s) + } + + var b *ttl.SQLBuilder + + t1 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t1"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + t2 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test2"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t2"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + tp := &ttl.PhysicalTable{ + Schema: model.NewCIStr("testp"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("tp"), + }, + KeyColumns: t1.KeyColumns, + TimeColumn: t1.TimeColumn, + PartitionDef: &model.PartitionDefinition{ + Name: model.NewCIStr("p1"), + }, + } + + // test build select queries + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1`") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1"))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1"))) + must(b.WriteCommonCondition(t1.KeyColumns, "<=", d("c3"))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1' AND `id` <= 'c3'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + shLoc, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + must(b.WriteExpireCondition(time.UnixMilli(0).In(shLoc))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 08:00:00'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1"))) + must(b.WriteCommonCondition(t1.KeyColumns, "<=", d("c3"))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1' AND `id` <= 'c3' AND `time` < '1970-01-01 00:00:00'") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteOrderBy(t1.KeyColumns, false)) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` ORDER BY `id` ASC") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteOrderBy(t1.KeyColumns, true)) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` ORDER BY `id` DESC") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteOrderBy(t1.KeyColumns, false)) + must(b.WriteLimit(128)) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` ORDER BY `id` ASC LIMIT 128") + + b = ttl.NewSQLBuilder(t1) + must(b.WriteSelect()) + must(b.WriteCommonCondition(t1.KeyColumns, ">", d("';``~?%\"\n"))) + mustBuild(b, "SELECT LOW_PRIORITY `id` 
FROM `test`.`t1` WHERE `id` > '\\';``~?%\\\"\\n'")
+
+	b = ttl.NewSQLBuilder(t1)
+	must(b.WriteSelect())
+	must(b.WriteCommonCondition(t1.KeyColumns, ">", d("a1';'")))
+	must(b.WriteCommonCondition(t1.KeyColumns, "<=", d("a2\"")))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	must(b.WriteOrderBy(t1.KeyColumns, false))
+	must(b.WriteLimit(128))
+	mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 'a1\\';\\'' AND `id` <= 'a2\\\"' AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 128")
+
+	b = ttl.NewSQLBuilder(t2)
+	must(b.WriteSelect())
+	must(b.WriteCommonCondition(t2.KeyColumns, ">", d("x1", 20)))
+	mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE (`a`, `b`) > ('x1', 20)")
+
+	b = ttl.NewSQLBuilder(t2)
+	must(b.WriteSelect())
+	must(b.WriteCommonCondition(t2.KeyColumns, "<=", d("x2", 21)))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	must(b.WriteOrderBy(t2.KeyColumns, false))
+	must(b.WriteLimit(100))
+	mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE (`a`, `b`) <= ('x2', 21) AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b` ASC LIMIT 100")
+
+	b = ttl.NewSQLBuilder(t2)
+	must(b.WriteSelect())
+	must(b.WriteCommonCondition(t2.KeyColumns[0:1], "=", d("x3")))
+	must(b.WriteCommonCondition(t2.KeyColumns[1:2], ">", d(31)))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	must(b.WriteOrderBy(t2.KeyColumns, false))
+	must(b.WriteLimit(100))
+	mustBuild(b, "SELECT LOW_PRIORITY `a`, `b` FROM `test2`.`t2` WHERE `a` = 'x3' AND `b` > 31 AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b` ASC LIMIT 100")
+
+	// test build delete queries
+	b = ttl.NewSQLBuilder(t1)
+	must(b.WriteDelete())
+	_, err = b.Build()
+	require.EqualError(t, err, "expire condition not written")
+
+	b = ttl.NewSQLBuilder(t1)
+	must(b.WriteDelete())
+	must(b.WriteInCondition(t1.KeyColumns, d("a")))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN ('a') AND `time` < '1970-01-01 00:00:00'")
+
+	b = ttl.NewSQLBuilder(t1)
+	must(b.WriteDelete())
+	must(b.WriteInCondition(t1.KeyColumns, d("a"), d("b")))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN ('a', 'b') AND `time` < '1970-01-01 00:00:00'")
+
+	b = ttl.NewSQLBuilder(t1)
+	must(b.WriteDelete())
+	must(b.WriteInCondition(t2.KeyColumns, d("a", 1)))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	must(b.WriteLimit(100))
+	mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1)) AND `time` < '1970-01-01 00:00:00' LIMIT 100")
+
+	b = ttl.NewSQLBuilder(t1)
+	must(b.WriteDelete())
+	must(b.WriteInCondition(t2.KeyColumns, d("a", 1), d("b", 2)))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	must(b.WriteLimit(100))
+	mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < '1970-01-01 00:00:00' LIMIT 100")
+
+	b = ttl.NewSQLBuilder(t1)
+	must(b.WriteDelete())
+	must(b.WriteInCondition(t2.KeyColumns, d("a", 1), d("b", 2)))
+	must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC)))
+	mustBuild(b, "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE (`a`, `b`) IN (('a', 1), ('b', 2)) AND `time` < '1970-01-01 00:00:00'")
+
+	// test select partition table
+	b = ttl.NewSQLBuilder(tp)
+	must(b.WriteSelect())
+	must(b.WriteCommonCondition(tp.KeyColumns, ">", d("a1")))
+	
must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "SELECT LOW_PRIORITY `id` FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` > 'a1' AND `time` < '1970-01-01 00:00:00'") + + b = ttl.NewSQLBuilder(tp) + must(b.WriteDelete()) + must(b.WriteInCondition(tp.KeyColumns, d("a"), d("b"))) + must(b.WriteExpireCondition(time.UnixMilli(0).In(time.UTC))) + mustBuild(b, "DELETE LOW_PRIORITY FROM `testp`.`tp` PARTITION(`p1`) WHERE `id` IN ('a', 'b') AND `time` < '1970-01-01 00:00:00'") +} + +func TestScanQueryGenerator(t *testing.T) { + t1 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t1"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + t2 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test2"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t2"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + {Name: model.NewCIStr("c"), FieldType: types.NewFieldTypeBuilder().SetType(mysql.TypeString).SetFlag(mysql.BinaryFlag).Build()}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + result := func(last []types.Datum, n int) [][]types.Datum { + ds := make([][]types.Datum, n) + ds[n-1] = last + return ds + } + + cases := []struct { + tbl *ttl.PhysicalTable + expire time.Time + rangeStart []types.Datum + rangeEnd []types.Datum + path [][]interface{} + }{ + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + nil, 5, "", + }, + }, + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + [][]types.Datum{}, 5, "", + }, + }, + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + rangeStart: d(1), + rangeEnd: d(100), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 1 AND `id` <= 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + result(d(10), 3), 5, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 10 AND `id` <= 100 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", + }, + { + result(d(15), 4), 5, + "", + }, + }, + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 3, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 3", + }, + { + result(d(2), 3), 5, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 2 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 5", + }, + { + result(d(4), 5), 6, + "SELECT LOW_PRIORITY `id` FROM `test`.`t1` WHERE `id` > 4 AND `time` < '1970-01-01 00:00:00' ORDER BY `id` ASC LIMIT 6", + }, + { + result(d(7), 5), 5, "", + }, + }, + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + path: [][]interface{}{ + { + nil, 5, + "SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' 
ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					nil, 5, "",
+				},
+			},
+		},
+		{
+			tbl:    t2,
+			expire: time.UnixMilli(0).In(time.UTC),
+			path: [][]interface{}{
+				{
+					nil, 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					[][]types.Datum{}, 5, "",
+				},
+			},
+		},
+		{
+			tbl:    t2,
+			expire: time.UnixMilli(0).In(time.UTC),
+			path: [][]interface{}{
+				{
+					nil, 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(1, "x", []byte{0xf0}), 4), 5, "",
+				},
+			},
+		},
+		{
+			tbl:        t2,
+			expire:     time.UnixMilli(0).In(time.UTC),
+			rangeStart: d(1, "x", []byte{0xe}),
+			rangeEnd:   d(100, "z", []byte{0xff}),
+			path: [][]interface{}{
+				{
+					nil, 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'0e' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(1, "x", []byte{0x1a}), 5), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'x' AND `c` > x'1a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(1, "x", []byte{0x20}), 4), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'x' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(1, "y", []byte{0x0a}), 5), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` = 'y' AND `c` > x'0a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(1, "y", []byte{0x11}), 4), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 1 AND `b` > 'y' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(1, "z", []byte{0x02}), 4), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 1 AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(3, "a", []byte{0x01}), 5), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` = 'a' AND `c` > x'01' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(3, "a", []byte{0x11}), 4), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` = 3 AND `b` > 'a' AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(3, "c", []byte{0x12}), 4), 5,
+					"SELECT LOW_PRIORITY `a`, `b`, `c` FROM `test2`.`t2` WHERE `a` > 3 AND (`a`, `b`, `c`) <= (100, 'z', x'ff') AND `time` < '1970-01-01 00:00:00' ORDER BY `a`, `b`, `c` ASC LIMIT 5",
+				},
+				{
+					result(d(5, "e", []byte{0xa1}), 4), 5, "",
+				},
+			},
+		},
+	}
+
+	for i, c := range cases {
+		g, err := ttl.NewScanQueryGenerator(c.tbl, c.expire, c.rangeStart, c.rangeEnd)
+		require.NoError(t, err, fmt.Sprintf("%d", i))
+		for j, p := range c.path {
+			msg :=
fmt.Sprintf("%d-%d", i, j) + var result [][]types.Datum + require.Equal(t, 3, len(p), msg) + if arg := p[0]; arg != nil { + r, ok := arg.([][]types.Datum) + require.True(t, ok, msg) + result = r + } + limit, ok := p[1].(int) + require.True(t, ok, msg) + sql, ok := p[2].(string) + require.True(t, ok, msg) + s, err := g.NextSQL(result, limit) + require.NoError(t, err, msg) + require.Equal(t, sql, s, msg) + require.Equal(t, s == "", g.IsExhausted(), msg) + } + } +} + +func TestBuildDeleteSQL(t *testing.T) { + t1 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t1"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("id"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + t2 := &ttl.PhysicalTable{ + Schema: model.NewCIStr("test2"), + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t2"), + }, + KeyColumns: []*model.ColumnInfo{ + {Name: model.NewCIStr("a"), FieldType: *types.NewFieldType(mysql.TypeVarchar)}, + {Name: model.NewCIStr("b"), FieldType: *types.NewFieldType(mysql.TypeInt24)}, + }, + TimeColumn: &model.ColumnInfo{ + Name: model.NewCIStr("time"), + FieldType: *types.NewFieldType(mysql.TypeDatetime), + }, + } + + cases := []struct { + tbl *ttl.PhysicalTable + expire time.Time + rows [][]types.Datum + sql string + }{ + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(1)}, + sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (1) AND `time` < '1970-01-01 00:00:00' LIMIT 1", + }, + { + tbl: t1, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(2), d(3), d(4)}, + sql: "DELETE LOW_PRIORITY FROM `test`.`t1` WHERE `id` IN (2, 3, 4) AND `time` < '1970-01-01 00:00:00' LIMIT 3", + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(1, "a")}, + sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a')) AND `time` < '1970-01-01 00:00:00' LIMIT 1", + }, + { + tbl: t2, + expire: time.UnixMilli(0).In(time.UTC), + rows: [][]types.Datum{d(1, "a"), d(2, "b")}, + sql: "DELETE LOW_PRIORITY FROM `test2`.`t2` WHERE (`a`, `b`) IN ((1, 'a'), (2, 'b')) AND `time` < '1970-01-01 00:00:00' LIMIT 2", + }, + } + + for _, c := range cases { + sql, err := ttl.BuildDeleteSQL(c.tbl, c.rows, c.expire) + require.NoError(t, err) + require.Equal(t, c.sql, sql) + } +} + +func d(vs ...interface{}) []types.Datum { + datums := make([]types.Datum, len(vs)) + for i, v := range vs { + switch val := v.(type) { + case string: + datums[i] = types.NewStringDatum(val) + case int: + datums[i] = types.NewIntDatum(int64(val)) + case []byte: + datums[i] = types.NewBytesDatum(val) + default: + panic(fmt.Sprintf("invalid value type: %T, value: %v", v, v)) + } + } + return datums +} diff --git a/ttl/states.go b/ttl/states.go new file mode 100644 index 0000000000000..842166265d205 --- /dev/null +++ b/ttl/states.go @@ -0,0 +1,76 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttl
+
+import (
+	"context"
+	"time"
+)
+
+// TableStatus is the TTL status of a table
+type TableStatus string
+
+// All the TableStatus values
+const (
+	TableStatusWaiting    TableStatus = "waiting"
+	TableStatusRunning    TableStatus = "running"
+	TableStatusCancelling TableStatus = "cancelling"
+	TableStatusCancelled  TableStatus = "cancelled"
+	TableStatusDone       TableStatus = "done"
+	TableStatusFailed     TableStatus = "failed"
+)
+
+// JobInfo identifies a TTL job for one table
+type JobInfo struct {
+	ID        string
+	TableID   int64
+	StartTime time.Time
+}
+
+// TableStateInfo is the TTL state of one table
+type TableStateInfo struct {
+	ID     int64
+	Status TableStatus
+}
+
+// TableStatesStore persists the TTL states of tables
+type TableStatesStore interface {
+	CreateNewJob(ctx context.Context, tblID int64, owner string) error
+}
+
+// NewTableStatesStore creates a TableStatesStore
+func NewTableStatesStore() TableStatesStore {
+	// TODO:
+	return nil
+}
+
+// TableStatesCache caches the TTL states of tables
+type TableStatesCache struct {
+	tables map[int64]*TableStateInfo
+}
+
+// NewTableStatesCache creates a TableStatesCache
+func NewTableStatesCache() *TableStatesCache {
+	return &TableStatesCache{
+		tables: make(map[int64]*TableStateInfo),
+	}
+}
+
+// Update reloads the states of all tables
+func (c *TableStatesCache) Update(ctx context.Context, se *Session) error {
+	// TODO:
+	return nil
+}
+
+// UpdateTable reloads the state of one table
+func (c *TableStatesCache) UpdateTable(ctx context.Context, se *Session, tblID int64) error {
+	// TODO:
+	return nil
+}
+
+// GetTableState returns the cached state of a table
+func (c *TableStatesCache) GetTableState(id int64) (info *TableStateInfo, ok bool) {
+	info, ok = c.tables[id]
+	return
+}
diff --git a/ttl/table.go b/ttl/table.go
new file mode 100644
index 0000000000000..453e0b01323ae
--- /dev/null
+++ b/ttl/table.go
@@ -0,0 +1,175 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttl
+
+import (
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/table/tables"
+	"github.com/pingcap/tidb/types"
+)
+
+// IsTTLTable returns whether tbl is a TTL table. For now it only checks for a column named "expire".
+func IsTTLTable(tbl *model.TableInfo) bool {
+	for _, col := range tbl.Cols() {
+		if col.Name.L == "expire" {
+			return true
+		}
+	}
+	return false
+}
+
+// PhysicalTable is used to provide some information for a physical table in TTL job
+type PhysicalTable struct {
+	ID     int64
+	Schema model.CIStr
+	*model.TableInfo
+	// PartitionDef is the partition definition
+	PartitionDef *model.PartitionDefinition
+	// KeyColumns is the cluster index key columns for the table
+	KeyColumns    []*model.ColumnInfo
+	KeyFieldTypes []*types.FieldType
+	// TimeColumn is the time column used for TTL
+	TimeColumn *model.ColumnInfo
+}
+
+// NewPhysicalTable creates a new PhysicalTable
+func NewPhysicalTable(schema model.CIStr, tbl *model.TableInfo, par *model.PartitionDefinition) (*PhysicalTable, error) {
+	t := &PhysicalTable{ID: tbl.ID, Schema: schema, TableInfo: tbl, PartitionDef: par}
+	if par != nil {
+		t.ID = par.ID
+	}
+
+	for _, col := range tbl.Cols() {
+		if col.Name.L == "expire" {
+			t.TimeColumn = col
+		}
+	}
+
+	if t.TimeColumn == nil {
+		return nil, errors.New("no time column")
+	}
+
+	if tbl.PKIsHandle {
+		for i, col := range tbl.Columns {
+			if mysql.HasPriKeyFlag(col.GetFlag()) {
+				t.KeyColumns = []*model.ColumnInfo{tbl.Columns[i]}
+				t.KeyFieldTypes = []*types.FieldType{&tbl.Columns[i].FieldType}
+			}
+		}
+	} else if tbl.IsCommonHandle {
+		idxInfo := tables.FindPrimaryIndex(tbl)
+		columns := make([]*model.ColumnInfo, len(idxInfo.Columns))
+		fieldTypes := make([]*types.FieldType, len(idxInfo.Columns))
+		for i, idxCol := range idxInfo.Columns {
+			columns[i] = tbl.Columns[idxCol.Offset]
+			fieldTypes[i] = &tbl.Columns[idxCol.Offset].FieldType
+		}
+		t.KeyColumns = columns
+		t.KeyFieldTypes = fieldTypes
+	} else {
+		t.KeyColumns = []*model.ColumnInfo{model.NewExtraHandleColInfo()}
+		t.KeyFieldTypes = []*types.FieldType{&t.KeyColumns[0].FieldType}
+	}
+	return t, nil
+}
+
+// ValidateKey validates a key
+func (t *PhysicalTable) ValidateKey(key []types.Datum) error {
+	if len(t.KeyColumns) != len(key) {
+		return errors.Errorf("invalid key length: %d, expected %d", len(key), len(t.KeyColumns))
+	}
+	return nil
+}
+
+// InfoSchemaTables caches the TTL tables in the information schema
+type InfoSchemaTables struct {
+	schemaVer int64
+	tables    map[int64]*PhysicalTable
+}
+
+// NewInfoSchemaTables creates an InfoSchemaTables
+func NewInfoSchemaTables() *InfoSchemaTables {
+	return &InfoSchemaTables{
+		tables: make(map[int64]*PhysicalTable, 64),
+	}
+}
+
+// Foreach iterates the cached tables until fn returns false
+func (t *InfoSchemaTables) Foreach(fn func(t *PhysicalTable) bool) {
+	for _, tbl := range t.tables {
+		if !fn(tbl) {
+			break
+		}
+	}
+}
+
+// GetByID returns the cached table with the given physical ID
+func (t *InfoSchemaTables) GetByID(id int64) (tbl *PhysicalTable, ok bool) {
+	tbl, ok = t.tables[id]
+	return
+}
+
+// Update refreshes the cache from the given information schema
+func (t *InfoSchemaTables) Update(is infoschema.InfoSchema) error {
+	if is == nil {
+		return errors.New("Cannot update tables with nil information schema")
+	}
+
+	if t.schemaVer == is.SchemaMetaVersion() {
+		return nil
+	}
+
+	newTables := make(map[int64]*PhysicalTable, len(t.tables))
+	for _, db := range is.AllSchemas() {
+		for _, tbl := range is.SchemaTables(db.Name) {
+			tblInfo := tbl.Meta()
+			if !IsTTLTable(tblInfo) {
+				continue
+			}
+
+			if tblInfo.Partition == nil {
+				ttlTable, err := t.newTable(db.Name, tblInfo, nil)
+				if err != nil {
+					return err
+				}
+				newTables[ttlTable.ID] = ttlTable
+				continue
+			}
+
+			for i := range tblInfo.Partition.Definitions {
+				// Take the address from the slice instead of a loop variable so that
+				// each PhysicalTable keeps a pointer to its own partition definition.
+				ttlTable, err := t.newTable(db.Name, tblInfo, &tblInfo.Partition.Definitions[i])
+				if
err != nil {
+					return err
+				}
+				newTables[ttlTable.ID] = ttlTable
+			}
+		}
+	}
+
+	t.schemaVer = is.SchemaMetaVersion()
+	t.tables = newTables
+	return nil
+}
+
+func (t *InfoSchemaTables) newTable(schema model.CIStr, tblInfo *model.TableInfo, par *model.PartitionDefinition) (*PhysicalTable, error) {
+	id := tblInfo.ID
+	if par != nil {
+		id = par.ID
+	}
+
+	ttlTable, ok := t.tables[id]
+	if ok && ttlTable.TableInfo == tblInfo {
+		return ttlTable, nil
+	}
+
+	return NewPhysicalTable(schema, tblInfo, par)
+}
diff --git a/ttl/ttlworker/del.go b/ttl/ttlworker/del.go
new file mode 100644
index 0000000000000..94bfa7605a8da
--- /dev/null
+++ b/ttl/ttlworker/del.go
@@ -0,0 +1,100 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import (
+	"time"
+
+	"github.com/pingcap/tidb/ttl"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+type delRetryInfo struct {
+	task         *delTask
+	retryBatches [][][]types.Datum
+}
+
+type delTask struct {
+	tbl        *ttl.PhysicalTable
+	expire     time.Time
+	keys       [][]types.Datum
+	statistics *taskStatistics
+}
+
+type delWorker struct {
+	baseWorker
+	sessPool   sessionPool
+	taskNotify <-chan *delTask
+}
+
+func newDelWorker(ch <-chan *delTask, sessPool sessionPool) (w *delWorker) {
+	w = &delWorker{}
+	w.init(w.delLoop)
+	w.sessPool = sessPool
+	w.taskNotify = ch
+	return
+}
+
+func (w *delWorker) delLoop() error {
+	se, err := getSession(w.sessPool)
+	if err != nil {
+		return err
+	}
+	defer se.Close()
+
+	for {
+		select {
+		case <-w.ctx.Done():
+			return nil
+		case task := <-w.taskNotify:
+			w.executeDelTask(se, task)
+		}
+	}
+}
+
+func (w *delWorker) executeDelTask(se *ttl.Session, task *delTask) {
+	totalRows := len(task.keys)
+	batchSize := 2
+	batch := make([][]types.Datum, 0, batchSize)
+	expire := task.expire.In(se.Sctx.GetSessionVars().TimeZone)
+	for i, row := range task.keys {
+		batch = append(batch, row)
+		if i == totalRows-1 || len(batch) == batchSize {
+			sql, err := ttl.BuildDeleteSQL(task.tbl, batch, expire)
+			if err != nil {
+				logutil.BgLogger().Error("failed to build TTL delete SQL", zap.Error(err))
+				return
+			}
+
+			batch = batch[:0]
+			w.doDelete(se, sql)
+		}
+	}
+}
+
+func (w *delWorker) doDelete(se *ttl.Session, sql string) {
+	maxRetry := 10
+	for i := 0; i <= maxRetry && w.Status() == workerStatusRunning; i++ {
+		logutil.BgLogger().Info("TTL delete query", zap.String("sql", sql))
+		if _, err := se.ExecuteSQL(w.ctx, sql); err != nil {
+			logutil.BgLogger().Error("failed to execute TTL delete query", zap.Error(err))
+			time.Sleep(time.Second)
+			continue
+		}
+		return
+	}
+}
diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go
new file mode 100644
index 0000000000000..ece28d0ea6138
--- /dev/null
+++ b/ttl/ttlworker/job.go
@@ -0,0 +1,347 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/parser/terror"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/ttl"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+// JobManager schedules TTL jobs and dispatches scan tasks to the workers
+type JobManager struct {
+	baseWorker
+
+	id          string
+	delCh       chan *delTask
+	scanWorkers []worker
+	delWorkers  []worker
+	sessPool    sessionPool
+}
+
+// NewJobManager creates a new JobManager
+func NewJobManager(id string, sessPool sessionPool) (manager *JobManager) {
+	manager = &JobManager{}
+	manager.init(manager.jobLoop)
+	manager.id = id
+	manager.delCh = make(chan *delTask)
+	manager.sessPool = sessPool
+	return
+}
+
+func (m *JobManager) jobLoop() error {
+	se, err := getSession(m.sessPool)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		m.resizeScanWorkers(0)
+		m.resizeDelWorkers(0)
+		se.Close()
+		close(m.delCh)
+	}()
+
+	m.resizeScanWorkers(4)
+	m.resizeDelWorkers(4)
+
+	tables := newTTLTables(m.id)
+	terror.Log(tables.FullUpdate(m.ctx, se))
+	ticker := time.NewTicker(10 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-m.ctx.Done():
+			return nil
+		case <-ticker.C:
+			if time.Since(tables.TablesTryUpdateTime) > time.Minute {
+				terror.Log(tables.UpdateSchemaTables(se))
+			}
+			if time.Since(tables.StatesTryUpdateTime) > time.Minute*10 {
+				terror.Log(tables.UpdateTableStates(m.ctx, se))
+			}
+			m.rescheduleJobs(se, tables)
+		}
+	}
+}
+
+func (m *JobManager) idleScanWorkers() []*scanWorker {
+	m.Lock()
+	defer m.Unlock()
+	workers := make([]*scanWorker, 0, len(m.scanWorkers))
+	for _, w := range m.scanWorkers {
+		if w.(*scanWorker).Idle() {
+			workers = append(workers, w.(*scanWorker))
+		}
+	}
+	return workers
+}
+
+func (m *JobManager) resizeScanWorkers(n int) {
+	m.Lock()
+	defer m.Unlock()
+	m.scanWorkers = m.resizeWorkers(m.scanWorkers, n, func() worker {
+		return newScanWorker(m.delCh, m.sessPool)
+	})
+}
+
+func (m *JobManager) resizeDelWorkers(n int) {
+	m.Lock()
+	defer m.Unlock()
+	m.delWorkers = m.resizeWorkers(m.delWorkers, n, func() worker {
+		return newDelWorker(m.delCh, m.sessPool)
+	})
+}
+
+func (m *JobManager) resizeWorkers(workers []worker, n int, factory func() worker) []worker {
+	currentCnt := len(workers)
+	switch {
+	case n > currentCnt:
+		for i := currentCnt; i < n; i++ {
+			w := factory()
+			w.Start()
+			workers = append(workers, w)
+		}
+	case n < currentCnt:
+		for i := n; i < currentCnt; i++ {
+			workers[i].Stop()
+		}
+		workers = workers[0:n]
+	}
+	return workers
+}
+
+func (m *JobManager) rescheduleJobs(se *ttl.Session, tables *ttlTables) {
+	defer func() {
+		for _, job := range tables.RunningJobs {
+			if job.Done() {
+				logutil.BgLogger().Info("JobFinished", zap.String("table", job.tbl.Name.O))
+				terror.Log(tables.FinishJob(m.ctx, se, job))
+			}
+		}
+	}()
+
+	idleScanWorkers := m.idleScanWorkers()
+	if len(idleScanWorkers) == 0 {
+		return
+	}
+
+	localJobs := tables.LocalJobs()
+	tbls := tables.ReadyForNewJobTables(se.Sctx.GetSessionVars())
+	jobInfos := tables.ReadyForResumeJobs(se.Sctx.GetSessionVars())
+	for len(idleScanWorkers) > 0 && (len(tbls) > 0 || len(localJobs) > 0 || len(jobInfos) > 0) {
+		
var job *ttlJob
+		var err error
+		switch {
+		case len(localJobs) > 0:
+			job = localJobs[0]
+			localJobs = localJobs[1:]
+		case len(tbls) > 0:
+			tbl := tbls[0]
+			tbls = tbls[1:]
+			job, err = tables.CreateNewJob(m.ctx, se, tbl.ID)
+		case len(jobInfos) > 0:
+			jobInfo := jobInfos[0]
+			jobInfos = jobInfos[1:]
+			job, err = tables.ResumeJob(m.ctx, se, jobInfo.TableID, jobInfo.ID)
+		}
+
+		if err != nil || job == nil {
+			terror.Log(err)
+			continue
+		}
+
+		for len(idleScanWorkers) > 0 && !job.Done() {
+			task, ok := job.PollNextScanTask()
+			if !ok {
+				break
+			}
+			idleWorker := idleScanWorkers[0]
+			idleScanWorkers = idleScanWorkers[1:]
+			logutil.BgLogger().Info("ScheduleTask", zap.String("table", task.tbl.Name.O))
+			idleWorker.ScheduleTask(task)
+		}
+	}
+}
+
+func (m *JobManager) ttlTableFromMeta(schema model.CIStr, tbl *model.TableInfo) []*ttl.PhysicalTable {
+	if tbl.Partition == nil {
+		ttlTbl, err := ttl.NewPhysicalTable(schema, tbl, nil)
+		if err != nil {
+			terror.Log(err)
+			return nil
+		}
+		return []*ttl.PhysicalTable{ttlTbl}
+	}
+	// TODO: partition
+	return nil
+}
+
+type ttlJob struct {
+	sync.Mutex
+	tbl      *ttl.PhysicalTable
+	tasks    []*scanTask
+	nextPoll int
+}
+
+func newTTLJob(t *ttl.PhysicalTable) *ttlJob {
+	return &ttlJob{
+		tbl: t,
+		tasks: []*scanTask{
+			{
+				tbl:    t,
+				expire: time.Now().Add(time.Minute),
+				// Initialize the statistics so the scan worker can update
+				// them without a nil check.
+				statistics: &taskStatistics{},
+			},
+		},
+	}
+}
+
+func (t *ttlJob) Done() bool {
+	t.Lock()
+	defer t.Unlock()
+	return t.nextPoll < 0 || t.nextPoll >= len(t.tasks)
+}
+
+func (t *ttlJob) PollNextScanTask() (*scanTask, bool) {
+	t.Lock()
+	defer t.Unlock()
+	if t.nextPoll < 0 || t.nextPoll >= len(t.tasks) {
+		return nil, false
+	}
+
+	task := t.tasks[t.nextPoll]
+	t.nextPoll++
+	return task, true
+}
+
+type ttlTables struct {
+	instanceID          string
+	RunningJobs         map[int64]*ttlJob
+	runningJobsList     []*ttlJob
+	Tables              *ttl.InfoSchemaTables
+	TablesTryUpdateTime time.Time
+	TablesUpdateTime    time.Time
+
+	States              *ttl.TableStatesCache
+	StatesTryUpdateTime time.Time
+	StatesUpdateTime    time.Time
+}
+
+func newTTLTables(instanceID string) *ttlTables {
+	return &ttlTables{
+		instanceID:          instanceID,
+		RunningJobs:         make(map[int64]*ttlJob),
+		runningJobsList:     make([]*ttlJob, 0),
+		Tables:              ttl.NewInfoSchemaTables(),
+		TablesTryUpdateTime: time.UnixMilli(0),
+		TablesUpdateTime:    time.UnixMilli(0),
+		States:              ttl.NewTableStatesCache(),
+		StatesTryUpdateTime: time.UnixMilli(0),
+		StatesUpdateTime:    time.UnixMilli(0),
+	}
+}
+
+func (ts *ttlTables) FullUpdate(ctx context.Context, se *ttl.Session) error {
+	err1 := ts.UpdateSchemaTables(se)
+	err2 := ts.UpdateTableStates(ctx, se)
+	if err1 != nil {
+		return err1
+	}
+	return err2
+}
+
+func (ts *ttlTables) UpdateSchemaTables(se *ttl.Session) error {
+	ts.TablesTryUpdateTime = time.Now()
+	is := se.GetDomainInfoSchema()
+	if err := ts.Tables.Update(is); err != nil {
+		return err
+	}
+	ts.TablesUpdateTime = time.Now()
+	return nil
+}
+
+func (ts *ttlTables) UpdateTableStates(ctx context.Context, se *ttl.Session) error {
+	ts.StatesTryUpdateTime = time.Now()
+	if err := ts.States.Update(ctx, se); err != nil {
+		return err
+	}
+	ts.StatesUpdateTime = time.Now()
+	return nil
+}
+
+func (ts *ttlTables) CreateNewJob(ctx context.Context, se *ttl.Session, tblID int64) (*ttlJob, error) {
+	if _, ok := ts.RunningJobs[tblID]; ok {
+		return nil, errors.New("job already exists")
+	}
+
+	tbl, ok := ts.Tables.GetByID(tblID)
+	if !ok {
+		return nil, errors.New("table not found")
+	}
+
+	job := newTTLJob(tbl)
+	ts.RunningJobs[tblID] = job
+	ts.runningJobsList = append(ts.runningJobsList, job)
+
+	return job, nil
+}
+
+func
(ts *ttlTables) ResumeJob(ctx context.Context, se *ttl.Session, tblID int64, jobID string) (*ttlJob, error) {
+	// TODO:
+	return nil, nil
+}
+
+func (ts *ttlTables) FinishJob(ctx context.Context, se *ttl.Session, job *ttlJob) error {
+	idx := -1
+	for i, j := range ts.runningJobsList {
+		if j.tbl.ID == job.tbl.ID {
+			idx = i
+			break
+		}
+	}
+
+	if idx >= 0 {
+		ts.runningJobsList = append(ts.runningJobsList[0:idx], ts.runningJobsList[idx+1:]...)
+		delete(ts.RunningJobs, job.tbl.ID)
+	}
+
+	return nil
+}
+
+func (ts *ttlTables) ReadyForNewJobTables(sessVars *variable.SessionVars) []*ttl.PhysicalTable {
+	tbls := make([]*ttl.PhysicalTable, 0)
+	ts.Tables.Foreach(func(t *ttl.PhysicalTable) bool {
+		if _, ok := ts.RunningJobs[t.ID]; !ok {
+			tbls = append(tbls, t)
+		}
+		return true
+	})
+	return tbls
+}
+
+func (ts *ttlTables) ReadyForResumeJobs(sessVars *variable.SessionVars) []*ttl.JobInfo {
+	// TODO:
+	return nil
+}
+
+func (ts *ttlTables) LocalJobs() []*ttlJob {
+	return ts.runningJobsList
+}
diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go
new file mode 100644
index 0000000000000..7d06a281fc178
--- /dev/null
+++ b/ttl/ttlworker/scan.go
@@ -0,0 +1,178 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import (
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/parser/terror"
+	"github.com/pingcap/tidb/ttl"
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+type taskStatistics struct {
+	TotalRows   atomic.Uint64
+	SuccessRows atomic.Uint64
+	ErrorRows   atomic.Uint64
+}
+
+type scanTask struct {
+	tbl        *ttl.PhysicalTable
+	expire     time.Time
+	rangeStart []types.Datum
+	rangeEnd   []types.Datum
+	statistics *taskStatistics
+}
+
+type scanWorker struct {
+	baseWorker
+	task        *scanTask
+	del         chan<- *delTask
+	sessionPool sessionPool
+}
+
+func newScanWorker(del chan<- *delTask, sessPool sessionPool) (w *scanWorker) {
+	w = &scanWorker{}
+	w.init(w.scanLoop)
+	w.del = del
+	w.sessionPool = sessPool
+	return
+}
+
+func (w *scanWorker) Idle() bool {
+	w.Lock()
+	defer w.Unlock()
+	return w.task == nil
+}
+
+func (w *scanWorker) ScheduleTask(task *scanTask) bool {
+	w.Lock()
+	defer w.Unlock()
+
+	if w.status != workerStatusRunning {
+		return false
+	}
+
+	if w.task != nil {
+		return false
+	}
+
+	select {
+	case w.Send() <- task:
+		w.task = task
+		return true
+	default:
+		return false
+	}
+}
+
+func (w *scanWorker) scanLoop() error {
+	se, err := getSession(w.sessionPool)
+	if err != nil {
+		return err
+	}
+	defer se.Close()
+
+	for {
+		select {
+		case <-w.ctx.Done():
+			return nil
+		case msg := <-w.ch:
+			switch task := msg.(type) {
+			case *scanTask:
+				if err = w.executeScanTask(se, task); err != nil {
+					terror.Log(err)
+				}
+			default:
+				terror.Log(errors.Errorf("Cannot handle msg with type: %T", msg))
+			}
+		}
+	}
+}
+
+func (w *scanWorker) executeScanTask(se *ttl.Session, task *scanTask) error {
+	defer func() {
+		w.Lock()
+		defer w.Unlock()
+		w.task = nil
+	}()
+
+	
+	expire := task.expire.In(se.Sctx.GetSessionVars().TimeZone)
+	generator, err := ttl.NewScanQueryGenerator(task.tbl, expire, task.rangeStart, task.rangeEnd)
+	if err != nil {
+		return err
+	}
+
+	limit := 5
+	lastResult := make([][]types.Datum, 0, limit)
+
+	sql := ""
+	retryTimes := 0
+	for w.Status() == workerStatusRunning {
+		// Abort the task if too large a fraction of the scanned rows failed to
+		// be deleted.
+		if totalRows := task.statics.TotalRows.Load(); totalRows > 1000 {
+			errRows := task.statics.ErrorRows.Load()
+			if float64(errRows)/float64(totalRows) > 0.6 {
+				return errors.Errorf("scan task aborted: %d error rows out of %d total rows exceeds the error threshold", errRows, totalRows)
+			}
+		}
+
+		// Refresh the session time zone before generating the next query, in
+		// case the global time zone has been changed.
+		if _, err = se.ExecuteSQL(w.ctx, "set @@time_zone = @@global.time_zone"); err != nil {
+			return err
+		}
+		generator.SetTimeZone(se.Sctx.GetSessionVars().TimeZone)
+
+		if sql == "" {
+			sql, err = generator.NextSQL(lastResult, limit)
+			// An empty SQL with no error means the whole range has been scanned.
+			if sql == "" || err != nil {
+				return err
+			}
+		}
+
+		logutil.BgLogger().Info("TTL scan query", zap.String("sql", sql))
+		rows, retryable, err := se.ExecuteSQLWithTTLCheck(w.ctx, task.tbl, sql)
+		if err != nil {
+			if !retryable || retryTimes >= 3 {
+				return err
+			}
+			retryTimes++
+			time.Sleep(time.Second * 10)
+			terror.Log(err)
+			continue
+		}
+
+		sql = ""
+		retryTimes = 0
+		task.statics.TotalRows.Add(uint64(len(rows)))
+		deleteTask := &delTask{
+			tbl:     task.tbl,
+			expire:  task.expire,
+			keys:    rows,
+			statics: task.statics,
+		}
+		lastResult = rows
+
+		select {
+		case <-w.ctx.Done():
+			return w.ctx.Err()
+		case w.del <- deleteTask:
+			continue
+		}
+	}
+	return nil
+}
diff --git a/ttl/ttlworker/session.go b/ttl/ttlworker/session.go
new file mode 100644
index 0000000000000..47333ded88367
--- /dev/null
+++ b/ttl/ttlworker/session.go
@@ -0,0 +1,64 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import (
+	"context"
+
+	"github.com/ngaut/pools"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/ttl"
+	"github.com/pingcap/tidb/util/sqlexec"
+)
+
+type sessionPool interface {
+	Get() (pools.Resource, error)
+	Put(pools.Resource)
+}
+
+func getSession(pool sessionPool) (*ttl.Session, error) {
+	resource, err := pool.Get()
+	if err != nil {
+		return nil, err
+	}
+
+	sctx, ok := resource.(sessionctx.Context)
+	if !ok {
+		pool.Put(resource)
+		return nil, errors.Errorf("%T cannot be cast to sessionctx.Context", resource)
+	}
+
+	exec, ok := resource.(sqlexec.SQLExecutor)
+	if !ok {
+		pool.Put(resource)
+		return nil, errors.Errorf("%T cannot be cast to sqlexec.SQLExecutor", resource)
+	}
+
+	se := &ttl.Session{
+		Sctx:     sctx,
+		Executor: exec,
+		CloseFn: func() {
+			pool.Put(resource)
+		},
+	}
+
+	// Commit to make sure the session is not in the middle of a transaction
+	// left over by its previous user.
+	if _, err = se.ExecuteSQL(context.Background(), "commit"); err != nil {
+		se.Close()
+		return nil, err
+	}
+
+	return se, nil
+}
diff --git a/ttl/ttlworker/worker.go b/ttl/ttlworker/worker.go
new file mode 100644
index 0000000000000..a2f59287badeb
--- /dev/null
+++ b/ttl/ttlworker/worker.go
@@ -0,0 +1,111 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ttlworker
+
+import (
+	"context"
+	"sync"
+)
+
+type workerStatus int
+
+const (
+	workerStatusCreated workerStatus = iota
+	workerStatusRunning
+	workerStatusStopping
+	workerStatusStopped
+)
+
+type worker interface {
+	Start()
+	Stop()
+	Status() workerStatus
+	Error() error
+	Send() chan<- interface{}
+}
+
+// baseWorker implements the common lifecycle of a worker: it runs loopFunc in
+// a goroutine, delivers messages to it through ch, and records the final error
+// when the loop exits.
+type baseWorker struct {
+	sync.Mutex
+	ctx      context.Context
+	cancel   func()
+	ch       chan interface{}
+	loopFunc func() error
+
+	err    error
+	status workerStatus
+}
+
+func (w *baseWorker) init(loop func() error) {
+	w.ctx, w.cancel = context.WithCancel(context.Background())
+	w.status = workerStatusCreated
+	w.loopFunc = loop
+	w.ch = make(chan interface{})
+}
+
+func (w *baseWorker) Start() {
+	w.Lock()
+	defer w.Unlock()
+	if w.status != workerStatusCreated {
+		return
+	}
+
+	go w.loop()
+	w.status = workerStatusRunning
+}
+
+func (w *baseWorker) Stop() {
+	w.Lock()
+	defer w.Unlock()
+	switch w.status {
+	case workerStatusCreated:
+		// The loop goroutine was never started, so stop directly.
+		w.cancel()
+		w.toStopped(nil)
+	case workerStatusRunning:
+		w.cancel()
+		w.status = workerStatusStopping
+	}
+}
+
+func (w *baseWorker) Status() workerStatus {
+	w.Lock()
+	defer w.Unlock()
+	return w.status
+}
+
+func (w *baseWorker) Error() error {
+	w.Lock()
+	defer w.Unlock()
+	return w.err
+}
+
+func (w *baseWorker) Send() chan<- interface{} {
+	return w.ch
+}
+
+func (w *baseWorker) loop() {
+	var err error
+	defer func() {
+		w.Lock()
+		defer w.Unlock()
+		w.toStopped(err)
+	}()
+	err = w.loopFunc()
+}
+
+// toStopped must be called with the lock held.
+func (w *baseWorker) toStopped(err error) {
+	w.status = workerStatusStopped
+	w.err = err
+	close(w.ch)
+}
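For illustration only (not part of the patch): a minimal sketch of how a concrete worker is expected to compose with baseWorker, mirroring the scanWorker pattern above. The echoWorker name and its message handling are hypothetical.

package ttlworker

// echoWorker is a hypothetical worker: embed baseWorker, hand the loop
// function to init, and drain w.ch until the context is cancelled.
type echoWorker struct {
	baseWorker
}

func newEchoWorker() *echoWorker {
	w := &echoWorker{}
	w.init(w.echoLoop)
	return w
}

func (w *echoWorker) echoLoop() error {
	for {
		select {
		case <-w.ctx.Done():
			// Stop() cancels the context; loop() then records the nil error.
			return nil
		case msg := <-w.ch:
			// A real worker would dispatch on the message type here,
			// as scanLoop does for *scanTask.
			_ = msg
		}
	}
}

// Usage, through the worker interface:
//
//	w := newEchoWorker()
//	w.Start()           // spawns echoLoop in a goroutine
//	w.Send() <- "hello" // delivered to echoLoop via w.ch
//	w.Stop()            // cancels the context; status moves to stopped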