ddl: reimplement get flashback cluster related key ranges (#40460)
close #40318, close #40477
Defined2014 committed Jan 13, 2023
1 parent b1ecabb commit 338fd30
Showing 6 changed files with 233 additions and 119 deletions.
164 changes: 101 additions & 63 deletions ddl/cluster.go
@@ -19,7 +19,6 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

@@ -69,6 +68,7 @@ const (
startTSOffset
commitTSOffset
ttlJobEnableOffSet
+	keyRangesOffset
)

func closePDSchedule() error {
@@ -172,6 +172,20 @@ func isFlashbackSupportedDDLAction(action model.ActionType) bool {
}
}

+func checkSystemSchemaID(t *meta.Meta, schemaID int64, flashbackTSString string) error {
+	if schemaID <= 0 {
+		return nil
+	}
+	DBInfo, err := t.GetDatabase(schemaID)
+	if err != nil || DBInfo == nil {
+		return errors.Trace(err)
+	}
+	if filter.IsSystemSchema(DBInfo.Name.L) {
+		return errors.Errorf("Detected modified system table during [%s, now), can't do flashback", flashbackTSString)
+	}
+	return nil
+}

func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
@@ -237,9 +251,16 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err != nil {
return errors.Trace(err)
}
if diff != nil && !isFlashbackSupportedDDLAction(diff.Type) {
if diff == nil {
continue
}
if !isFlashbackSupportedDDLAction(diff.Type) {
return errors.Errorf("Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", diff.Type.String(), flashbackTSString)
}
err = checkSystemSchemaID(flashbackSnapshotMeta, diff.SchemaID, flashbackTSString)
if err != nil {
return errors.Trace(err)
}
}

jobs, err := GetAllDDLJobs(sess, t)
@@ -260,81 +281,100 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
return nil
}

-type flashbackID struct {
-	id       int64
-	excluded bool
-}
-
-func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []flashbackID) []flashbackID {
-	var excluded bool
-	if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") && tableName != "gc_delete_range" {
-		excluded = true
-	}
-	flashbackIDs = append(flashbackIDs, flashbackID{
-		id:       tableID,
-		excluded: excluded,
-	})
-	return flashbackIDs
-}
+func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []int64) []int64 {
+	if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") && tableName != "gc_delete_range" {
+		flashbackIDs = append(flashbackIDs, tableID)
+	}
+	return flashbackIDs
+}
+
+// GetTableDataKeyRanges gets keyRanges by `nonFlashbackTableIDs`.
+// It returns all flashback table data key ranges.
+func GetTableDataKeyRanges(nonFlashbackTableIDs []int64) []kv.KeyRange {
+	var keyRanges []kv.KeyRange
+
+	nonFlashbackTableIDs = append(nonFlashbackTableIDs, -1)
+
+	slices.SortFunc(nonFlashbackTableIDs, func(a, b int64) bool {
+		return a < b
+	})
+
+	for i := 1; i < len(nonFlashbackTableIDs); i++ {
+		keyRanges = append(keyRanges, kv.KeyRange{
+			StartKey: tablecodec.EncodeTablePrefix(nonFlashbackTableIDs[i-1] + 1),
+			EndKey:   tablecodec.EncodeTablePrefix(nonFlashbackTableIDs[i]),
+		})
+	}
+
+	// Add all other key ranges.
+	keyRanges = append(keyRanges, kv.KeyRange{
+		StartKey: tablecodec.EncodeTablePrefix(nonFlashbackTableIDs[len(nonFlashbackTableIDs)-1] + 1),
+		EndKey:   tablecodec.EncodeTablePrefix(meta.MaxGlobalID),
+	})
+
+	return keyRanges
+}

-// GetFlashbackKeyRanges make keyRanges efficiently for flashback cluster when many tables in cluster,
+// GetFlashbackKeyRanges gets keyRanges for flashback cluster.
+// It contains all non-system table key ranges and meta data key ranges.
 // The time complexity is O(nlogn).
-func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
+func GetFlashbackKeyRanges(sess sessionctx.Context, flashbackTS uint64) ([]kv.KeyRange, error) {
	schemas := sess.GetDomainInfoSchema().(infoschema.InfoSchema).AllSchemas()

-	// The semantic of keyRanges(output).
-	var keyRanges []kv.KeyRange
+	keyRanges := make([]kv.KeyRange, 0)

-	var flashbackIDs []flashbackID
-	for _, db := range schemas {
-		for _, table := range db.Tables {
-			if !table.IsBaseTable() || table.ID > meta.MaxGlobalID {
-				continue
-			}
-			flashbackIDs = addToSlice(db.Name.L, table.Name.L, table.ID, flashbackIDs)
-			if table.Partition != nil {
-				for _, partition := range table.Partition.Definitions {
-					flashbackIDs = addToSlice(db.Name.L, table.Name.L, partition.ID, flashbackIDs)
-				}
-			}
-		}
-	}
-
-	slices.SortFunc(flashbackIDs, func(a, b flashbackID) bool {
-		return a.id < b.id
-	})
-
-	lastExcludeIdx := -1
-	for i, id := range flashbackIDs {
-		if id.excluded {
-			// Found a range [lastExcludeIdx, i) needs to be added.
-			if i > lastExcludeIdx+1 {
-				keyRanges = append(keyRanges, kv.KeyRange{
-					StartKey: tablecodec.EncodeTablePrefix(flashbackIDs[lastExcludeIdx+1].id),
-					EndKey:   tablecodec.EncodeTablePrefix(flashbackIDs[i-1].id + 1),
-				})
-			}
-			lastExcludeIdx = i
-		}
-	}
-
-	// The last part needs to be added.
-	if lastExcludeIdx < len(flashbackIDs)-1 {
-		keyRanges = append(keyRanges, kv.KeyRange{
-			StartKey: tablecodec.EncodeTablePrefix(flashbackIDs[lastExcludeIdx+1].id),
-			EndKey:   tablecodec.EncodeTablePrefix(flashbackIDs[len(flashbackIDs)-1].id + 1),
-		})
-	}
-
-	// The meta data key ranges.
-	metaStartKey := tablecodec.EncodeMetaKey(meta.DBkey(0), meta.TableKey(0))
-	metaEndKey := tablecodec.EncodeMetaKey(meta.DBkey(math.MaxInt64), meta.TableKey(math.MaxInt64))
-	keyRanges = append(keyRanges, kv.KeyRange{
-		StartKey: metaStartKey,
-		EndKey:   metaEndKey,
-	})
-
-	return keyRanges, nil
+	// get snapshot schema IDs.
+	flashbackSnapshotMeta := meta.NewSnapshotMeta(sess.GetStore().GetSnapshot(kv.NewVersion(flashbackTS)))
+	snapshotSchemas, err := flashbackSnapshotMeta.ListDatabases()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+
+	schemaIDs := make(map[int64]struct{})
+	for _, schema := range schemas {
+		if !filter.IsSystemSchema(schema.Name.L) {
+			schemaIDs[schema.ID] = struct{}{}
+		}
+	}
+	for _, schema := range snapshotSchemas {
+		if !filter.IsSystemSchema(schema.Name.L) {
+			schemaIDs[schema.ID] = struct{}{}
+		}
+	}
+
+	// The meta data key ranges.
+	for schemaID := range schemaIDs {
+		metaStartKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID))
+		metaEndKey := tablecodec.EncodeMetaKeyPrefix(meta.DBkey(schemaID + 1))
+		keyRanges = append(keyRanges, kv.KeyRange{
+			StartKey: metaStartKey,
+			EndKey:   metaEndKey,
+		})
+	}
+
+	startKey := tablecodec.EncodeMetaKeyPrefix([]byte("DBs"))
+	keyRanges = append(keyRanges, kv.KeyRange{
+		StartKey: startKey,
+		EndKey:   startKey.PrefixNext(),
+	})
+
+	var nonFlashbackTableIDs []int64
+	for _, db := range schemas {
+		for _, table := range db.Tables {
+			if !table.IsBaseTable() || table.ID > meta.MaxGlobalID {
+				continue
+			}
+			nonFlashbackTableIDs = addToSlice(db.Name.L, table.Name.L, table.ID, nonFlashbackTableIDs)
+			if table.Partition != nil {
+				for _, partition := range table.Partition.Definitions {
+					nonFlashbackTableIDs = addToSlice(db.Name.L, table.Name.L, partition.ID, nonFlashbackTableIDs)
+				}
+			}
+		}
+	}
+
+	return append(keyRanges, GetTableDataKeyRanges(nonFlashbackTableIDs)...), nil
 }
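The new GetTableDataKeyRanges is the heart of the reimplementation: instead of sorting every table that should be flashed back (the old flashbackID/excluded bookkeeping above), it sorts only the excluded system-table IDs and emits the complementary gaps between them. A minimal, self-contained sketch of that interval-complement idea, where toyPrefix stands in for tablecodec.EncodeTablePrefix (nothing here is the TiDB API):

package main

import (
	"fmt"
	"sort"
)

// toyPrefix renders a table-data key prefix for a table ID ("t3" for ID 3);
// the real code uses tablecodec.EncodeTablePrefix instead.
func toyPrefix(id int64) string { return fmt.Sprintf("t%d", id) }

// complementRanges mirrors GetTableDataKeyRanges: given the table IDs that
// must NOT be flashed back, return the key ranges between them.
func complementRanges(excluded []int64, maxID int64) [][2]string {
	excluded = append(excluded, -1) // sentinel: first range starts at table ID 0
	sort.Slice(excluded, func(i, j int) bool { return excluded[i] < excluded[j] })

	var ranges [][2]string
	for i := 1; i < len(excluded); i++ {
		ranges = append(ranges, [2]string{toyPrefix(excluded[i-1] + 1), toyPrefix(excluded[i])})
	}
	// Tail range: everything above the largest excluded ID, up to maxID.
	return append(ranges, [2]string{toyPrefix(excluded[len(excluded)-1] + 1), toyPrefix(maxID)})
}

func main() {
	// Excluding 3, 5 and 9 yields [t0,t3) [t4,t5) [t6,t9) [t10,t9999).
	fmt.Println(complementRanges([]int64{3, 5, 9}, 9999))
}

Presumably this is also why the tail range runs all the way to meta.MaxGlobalID: tables created after flashbackTS have IDs above any snapshot ID, so their data falls inside the last range and is rolled back as well.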

// SendPrepareFlashbackToVersionRPC prepares regions for flashback; the purpose is to put regions into a flashback state in which they stop serving writes
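The body of SendPrepareFlashbackToVersionRPC is collapsed in this view, but the comment describes the pattern: for every flashback key range, find the overlapping regions and send each one a prepare request so that it blocks writes for the duration of the flashback. A schematic outline under simplified, assumed types (regionsOverlapping and prepareFlashback are hypothetical stand-ins, not the client-go API):

package main

import "fmt"

// keyRange and region are simplified stand-ins for kv.KeyRange and TiKV
// region metadata; none of this is the real client-go API.
type keyRange struct{ start, end string }
type region struct{ start, end string }

// regionsOverlapping is a hypothetical resolver; the real code asks the
// region cache for every region intersecting [start, end).
func regionsOverlapping(r keyRange) []region {
	return []region{{r.start, r.end}} // pretend a single region covers it
}

// prepareFlashback is a hypothetical stand-in for the prepare RPC that puts
// one region into the flashback state, fencing off writes.
func prepareFlashback(rg region) error {
	fmt.Printf("region [%s, %s) now rejects writes until flashback finishes\n", rg.start, rg.end)
	return nil
}

func main() {
	ranges := []keyRange{{"t100", "t200"}, {"mDBs", "mDBt"}}
	for _, r := range ranges {
		for _, rg := range regionsOverlapping(r) {
			if err := prepareFlashback(rg); err != nil {
				return // the real loop retries on region errors
			}
		}
	}
}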
@@ -571,7 +611,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
var pdScheduleValue map[string]interface{}
var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabledValue bool
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
var keyRanges []kv.KeyRange
if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue, &keyRanges); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
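The job's arguments travel as a positional list: the offset constants near the top of this file (startTSOffset, commitTSOffset, and the new keyRangesOffset) name positions in job.Args, and DecodeArgs fills typed holders in the same order. A self-contained sketch of that positional round trip, mimicking the JSON encoding in the spirit of what model.Job does (toy types; not the model package):

package main

import (
	"encoding/json"
	"fmt"
)

// Positional offsets into the argument list, in the spirit of the
// startTSOffset / commitTSOffset / keyRangesOffset constants above.
const (
	flashbackTSOffset = iota
	startTSOffset
	keyRangesOffset
)

type keyRange struct{ Start, End string }

func main() {
	// Encode side: arguments are appended positionally.
	args := []interface{}{uint64(440000000), uint64(440000001), []keyRange{{"t1", "t5"}}}
	raw, _ := json.Marshal(args)

	// Decode side: unmarshal each raw element into a typed holder,
	// in exactly the same order.
	var (
		flashbackTS, startTS uint64
		ranges               []keyRange
	)
	var rawArgs []json.RawMessage
	_ = json.Unmarshal(raw, &rawArgs)
	for i, holder := range []interface{}{&flashbackTS, &startTS, &ranges} {
		_ = json.Unmarshal(rawArgs[i], holder)
	}
	fmt.Println(flashbackTS, startTS, ranges) // 440000000 440000001 [{t1 t5}]
}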
@@ -632,6 +673,11 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
return ver, errors.Trace(err)
}
job.Args[startTSOffset] = startTS
+		keyRanges, err = GetFlashbackKeyRanges(sess, flashbackTS)
+		if err != nil {
+			return ver, errors.Trace(err)
+		}
+		job.Args[keyRangesOffset] = keyRanges
job.SchemaState = model.StateWriteOnly
return ver, nil
// Stage 3, get key ranges and get locks.
@@ -641,10 +687,6 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.SchemaState = model.StateWriteReorganization
return updateSchemaVersion(d, t, job)
}
-		keyRanges, err := GetFlashbackKeyRanges(sess)
-		if err != nil {
-			return ver, errors.Trace(err)
-		}
		// Split regions by keyRanges, making sure no unrelated key ranges are locked.
splitRegionsByKeyRanges(d, keyRanges)
totalRegions.Store(0)
@@ -678,10 +720,6 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.SchemaState = model.StatePublic
return ver, nil
}
-		keyRanges, err := GetFlashbackKeyRanges(sess)
-		if err != nil {
-			return ver, errors.Trace(err)
-		}

for _, r := range keyRanges {
if err = flashbackToVersion(d.ctx, d,
80 changes: 28 additions & 52 deletions ddl/cluster_test.go
@@ -24,67 +24,43 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/dbterror"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

-func TestGetFlashbackKeyRanges(t *testing.T) {
-	store := testkit.CreateMockStore(t)
-	tk := testkit.NewTestKit(t, store)
-	se, err := session.CreateSession4Test(store)
-	require.NoError(t, err)
-
-	kvRanges, err := ddl.GetFlashbackKeyRanges(se)
-	require.NoError(t, err)
-	// The results are 8 key ranges
-	// 0: (stats_meta,stats_histograms,stats_buckets, gc_delete_range)
-	// 1: (stats_feedback)
-	// 2: (stats_top_n)
-	// 3: (stats_extended)
-	// 4: (stats_fm_sketch)
-	// 5: (stats_history, stats_meta_history)
-	// 6: (stats_table_locked)
-	// 7: meta Ranges
-	require.Len(t, kvRanges, 8)
-
-	tk.MustExec("use test")
-	tk.MustExec("CREATE TABLE employees (" +
-		" id INT NOT NULL," +
-		" store_id INT NOT NULL" +
-		") PARTITION BY RANGE (store_id) (" +
-		" PARTITION p0 VALUES LESS THAN (6)," +
-		" PARTITION p1 VALUES LESS THAN (11)," +
-		" PARTITION p2 VALUES LESS THAN (16)," +
-		" PARTITION p3 VALUES LESS THAN (21)" +
-		");")
-	tk.MustExec("truncate table mysql.analyze_jobs")
-
-	// truncate all `stats_` and `gc_delete_range` tables, make table ID consecutive.
-	tk.MustExec("truncate table mysql.stats_meta")
-	tk.MustExec("truncate table mysql.stats_histograms")
-	tk.MustExec("truncate table mysql.stats_buckets")
-	tk.MustExec("truncate table mysql.stats_feedback")
-	tk.MustExec("truncate table mysql.stats_top_n")
-	tk.MustExec("truncate table mysql.stats_extended")
-	tk.MustExec("truncate table mysql.stats_fm_sketch")
-	tk.MustExec("truncate table mysql.stats_history")
-	tk.MustExec("truncate table mysql.stats_meta_history")
-	tk.MustExec("truncate table mysql.stats_table_locked")
-	tk.MustExec("truncate table mysql.gc_delete_range")
-	kvRanges, err = ddl.GetFlashbackKeyRanges(se)
-	require.NoError(t, err)
-	require.Len(t, kvRanges, 3)
-
-	tk.MustExec("truncate table test.employees")
-	kvRanges, err = ddl.GetFlashbackKeyRanges(se)
-	require.NoError(t, err)
-	require.Len(t, kvRanges, 2)
+func TestGetTableDataKeyRanges(t *testing.T) {
+	// case 1, empty nonFlashbackTableIDs
+	keyRanges := ddl.GetTableDataKeyRanges([]int64{})
+	require.Len(t, keyRanges, 1)
+	require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
+	require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
+
+	// case 2, insert an excluded table ID
+	keyRanges = ddl.GetTableDataKeyRanges([]int64{3})
+	require.Len(t, keyRanges, 2)
+	require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
+	require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
+	require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
+	require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
+
+	// case 3, insert some excluded table IDs
+	keyRanges = ddl.GetTableDataKeyRanges([]int64{3, 5, 9})
+	require.Len(t, keyRanges, 4)
+	require.Equal(t, keyRanges[0].StartKey, tablecodec.EncodeTablePrefix(0))
+	require.Equal(t, keyRanges[0].EndKey, tablecodec.EncodeTablePrefix(3))
+	require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
+	require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(5))
+	require.Equal(t, keyRanges[2].StartKey, tablecodec.EncodeTablePrefix(6))
+	require.Equal(t, keyRanges[2].EndKey, tablecodec.EncodeTablePrefix(9))
+	require.Equal(t, keyRanges[3].StartKey, tablecodec.EncodeTablePrefix(10))
+	require.Equal(t, keyRanges[3].EndKey, tablecodec.EncodeTablePrefix(meta.MaxGlobalID))
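
	// case 4, adjacent excluded IDs (a sketch in the same style, not part
	// of this commit): 3 and 4 leave the empty middle range [t4, t4),
	// which scans nothing and is harmless.
	keyRanges = ddl.GetTableDataKeyRanges([]int64{3, 4})
	require.Len(t, keyRanges, 3)
	require.Equal(t, keyRanges[1].StartKey, tablecodec.EncodeTablePrefix(4))
	require.Equal(t, keyRanges[1].EndKey, tablecodec.EncodeTablePrefix(4))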
}

func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
3 changes: 2 additions & 1 deletion ddl/ddl_api.go
@@ -2753,7 +2753,8 @@ func (d *ddl) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
0, /* totalRegions */
0, /* startTS */
0, /* commitTS */
-		variable.On /* tidb_ttl_job_enable */},
+		variable.On, /* tidb_ttl_job_enable */
+		[]kv.KeyRange{} /* flashback key_ranges */},
}
err = d.DoDDLJob(ctx, job)
err = d.callHookOnChanged(job, err)
11 changes: 9 additions & 2 deletions executor/recover_test.go
@@ -332,11 +332,18 @@ func TestRecoverClusterMeetError(t *testing.T) {
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), errno.ErrPrivilegeCheckFail)
tk.MustExec("drop user 'testflashback'@'localhost';")

-	// update tidb_server_version
+	// detect modified system table
	nowTS, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
	require.NoError(t, err)
+	tk.MustExec("truncate table mysql.stats_meta")
+	errorMsg := fmt.Sprintf("[ddl:-1]Detected modified system table during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String())
+	tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(nowTS)), errorMsg)

+	// update tidb_server_version
+	nowTS, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
+	require.NoError(t, err)
	tk.MustExec("update mysql.tidb set VARIABLE_VALUE=VARIABLE_VALUE+1 where VARIABLE_NAME='tidb_server_version'")
-	errorMsg := fmt.Sprintf("[ddl:-1]Detected TiDB upgrade during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String())
+	errorMsg = fmt.Sprintf("[ddl:-1]Detected TiDB upgrade during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String())
tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(nowTS)), errorMsg)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
9 changes: 9 additions & 0 deletions tablecodec/tablecodec.go
@@ -209,6 +209,15 @@ func EncodeMetaKey(key []byte, field []byte) kv.Key {
return ek
}

+// EncodeMetaKeyPrefix encodes the key prefix into a meta key
+func EncodeMetaKeyPrefix(key []byte) kv.Key {
+	ek := make([]byte, 0, len(metaPrefix)+codec.EncodedBytesLength(len(key))+8)
+	ek = append(ek, metaPrefix...)
+	ek = codec.EncodeBytes(ek, key)
+	ek = codec.EncodeUint(ek, uint64(structure.HashData))
+	return ek
+}
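EncodeMetaKeyPrefix stops right after the HashData type flag, so its output is a byte prefix of every EncodeMetaKey(key, field) built from the same key. That is what makes the new meta ranges in GetFlashbackKeyRanges work: [EncodeMetaKeyPrefix(DBkey(id)), EncodeMetaKeyPrefix(DBkey(id+1))) covers one database's meta entries, and the "DBs" range uses PrefixNext() as its upper bound. A toy sketch of the prefix-next idea (prefixNext mirrors the shape of kv.Key.PrefixNext; it is not the TiDB implementation):

package main

import "fmt"

// prefixNext computes an exclusive upper bound for all keys that start with
// p: increment the last byte, carrying through any 0xff bytes.
func prefixNext(p []byte) []byte {
	q := append([]byte{}, p...)
	for i := len(q) - 1; i >= 0; i-- {
		q[i]++
		if q[i] != 0 {
			return q
		}
	}
	// All bytes were 0xff: restore the original and append a zero byte.
	copy(q, p)
	return append(q, 0)
}

func main() {
	// [start, prefixNext(start)) covers exactly the keys under the prefix,
	// e.g. the "DBs" meta range built in GetFlashbackKeyRanges.
	fmt.Printf("%q\n", prefixNext([]byte("mDBs"))) // "mDBt"
}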

// DecodeMetaKey decodes the key and get the meta key and meta field.
func DecodeMetaKey(ek kv.Key) (key []byte, field []byte, err error) {
var tp uint64
