Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Support flashback cluster with ddl history #40209

Merged
merged 12 commits into from
Dec 29, 2022
60 changes: 54 additions & 6 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -147,6 +148,17 @@ func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
return val, nil
}

// isFlashbackSupportedDDLAction reports whether a DDL action of the given type
// is compatible with flashback. TiFlash replica actions and placement related
// actions are rejected; every other action type is accepted.
func isFlashbackSupportedDDLAction(action model.ActionType) bool {
	switch action {
	case model.ActionSetTiFlashReplica,
		model.ActionUpdateTiFlashReplicaStatus:
		// TiFlash replica changes.
		return false
	case model.ActionAlterPlacementPolicy,
		model.ActionAlterTablePlacement,
		model.ActionAlterTablePartitionPlacement,
		model.ActionCreatePlacementPolicy,
		model.ActionDropPlacementPolicy,
		model.ActionModifySchemaDefaultPlacement:
		// Placement policy and placement rule changes.
		return false
	}
	return true
}

func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
Expand All @@ -170,19 +182,47 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
return errors.Trace(err)
}

flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
flashbackSnapshotMeta := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS)))
flashbackSchemaVersion, err := flashbackSnapshotMeta.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

// If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now).
flashbackTSString := oracle.GetTimeFromTS(flashbackTS).String()

// Check if there is an upgrade during [flashbackTS, now)
sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString)
rows, err := newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
xhebox marked this conversation as resolved.
Show resolved Hide resolved
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback")
}
sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0))
rows, err = newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
if err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.Errorf("Detected TiDB upgrade during [%s, now), can't do flashback", flashbackTSString)
}

// Check whether there was any DDL job at flashbackTS.
sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString)
rows, err = newSession(sess).execute(d.ctx, sql, "check_history_job")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history ddl jobs failed, can't do flashback")
}
if rows[0].GetInt64(0) != 0 {
return errors.Errorf("Detected another DDL job at %s, can't do flashback", flashbackTSString)
}

// If flashbackSchemaVersion is not the same as nowSchemaVersion, we should check all schema diffs during [flashbackTS, now).
for i := flashbackSchemaVersion + 1; i <= nowSchemaVersion; i++ {
diff, err := t.GetSchemaDiff(i)
if err != nil {
return errors.Trace(err)
}
if diff != nil && diff.Type != model.ActionFlashbackCluster {
return errors.Errorf("Detected schema change due to another DDL job during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS))
if diff != nil && !isFlashbackSupportedDDLAction(diff.Type) {
return errors.Errorf("Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", diff.Type.String(), flashbackTSString)
}
}

Expand Down Expand Up @@ -211,7 +251,7 @@ type flashbackID struct {

func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []flashbackID) []flashbackID {
var excluded bool
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") {
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") && tableName != "gc_delete_range" {
bb7133 marked this conversation as resolved.
Show resolved Hide resolved
excluded = true
}
flashbackIDs = append(flashbackIDs, flashbackID{
Expand Down Expand Up @@ -270,6 +310,14 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
})
}

// The meta data key ranges.
metaStartKey := tablecodec.EncodeMetaKey(meta.DBkey(0), meta.TableKey(0))
metaEndKey := tablecodec.EncodeMetaKey(meta.DBkey(math.MaxInt64), meta.TableKey(math.MaxInt64))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
})

return keyRanges, nil
}

Expand Down Expand Up @@ -633,7 +681,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
return updateSchemaVersion(d, t, job)
}
return ver, nil
}
Expand Down
14 changes: 8 additions & 6 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ func TestGetFlashbackKeyRanges(t *testing.T) {

kvRanges, err := ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
// The results are 6 key ranges
// 0: (stats_meta,stats_histograms,stats_buckets)
// The results are 8 key ranges
// 0: (stats_meta,stats_histograms,stats_buckets, gc_delete_range)
// 1: (stats_feedback)
// 2: (stats_top_n)
// 3: (stats_extended)
// 4: (stats_fm_sketch)
// 5: (stats_history, stats_meta_history)
// 6: (stats_table_locked)
require.Len(t, kvRanges, 7)
// 7: meta Ranges
require.Len(t, kvRanges, 8)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE employees (" +
Expand All @@ -64,7 +65,7 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
");")
tk.MustExec("truncate table mysql.analyze_jobs")

// truncate all `stats_` tables, make table ID consecutive.
// truncate all `stats_` and `gc_delete_range` tables, make table ID consecutive.
tk.MustExec("truncate table mysql.stats_meta")
tk.MustExec("truncate table mysql.stats_histograms")
tk.MustExec("truncate table mysql.stats_buckets")
Expand All @@ -75,14 +76,15 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
tk.MustExec("truncate table mysql.stats_history")
tk.MustExec("truncate table mysql.stats_meta_history")
tk.MustExec("truncate table mysql.stats_table_locked")
tk.MustExec("truncate table mysql.gc_delete_range")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 2)
require.Len(t, kvRanges, 3)

tk.MustExec("truncate table test.employees")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 1)
require.Len(t, kvRanges, 2)
}

func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,11 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
OldTableID: recoverTabsInfo[i].TableInfo.ID,
}
}
case model.ActionFlashbackCluster:
diff.TableID = -1
if job.SchemaState == model.StatePublic {
diff.RegenerateSchemaMap = true
}
default:
diff.TableID = job.TableID
}
Expand Down
1 change: 1 addition & 0 deletions ddl/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
Expand Down
39 changes: 39 additions & 0 deletions ddl/tiflashtest/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -438,6 +439,44 @@ func TestTiFlashDropPartition(t *testing.T) {
CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash")
}

// TestTiFlashFlashbackCluster checks that `FLASHBACK CLUSTER` is rejected with
// an "unsupported DDL job type" error when a TiFlash replica was set on a
// table between the flashback timestamp and now.
func TestTiFlashFlashbackCluster(t *testing.T) {
	s, teardown := createTiFlashContext(t)
	defer teardown()
	tk := testkit.NewTestKit(t, s.store)

	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int)")
	tk.MustExec("insert into t values (1), (2), (3)")

	// Take the flashback target before the TiFlash DDL runs, so that the DDL
	// falls inside [ts, now).
	ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
	require.NoError(t, err)
	flashbackTime := oracle.GetTimeFromTS(ts)

	tk.MustExec("alter table t set tiflash replica 1")
	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable)
	CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "t")

	// Restore the GC safe point when the test exits. This is registered before
	// the failpoint cleanup below so that, in LIFO order, the failpoints are
	// disabled first and the safe point is restored afterwards.
	defer func() {
		ChangeGCSafePoint(tk, time.Now(), "true", "10m0s")
	}()

	// Failpoints are process-wide. Disable every failpoint that was enabled
	// even when an assertion aborts the test early, so they cannot leak into
	// other tests. t.Errorf is used so one failure does not skip the rest.
	var enabledFailpoints []string
	defer func() {
		for i := len(enabledFailpoints) - 1; i >= 0; i-- {
			if err := failpoint.Disable(enabledFailpoints[i]); err != nil {
				t.Errorf("disable failpoint %s: %v", enabledFailpoints[i], err)
			}
		}
	}()
	enableFailpoint := func(name, term string) {
		require.NoError(t, failpoint.Enable(name, term))
		enabledFailpoints = append(enabledFailpoints, name)
	}

	injectSafeTS := oracle.GoTimeToTS(flashbackTime.Add(10 * time.Second))
	enableFailpoint("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)
	enableFailpoint("tikvclient/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))
	enableFailpoint("github.com/pingcap/tidb/expression/injectSafeTS", fmt.Sprintf("return(%v)", injectSafeTS))

	ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s")

	errorMsg := fmt.Sprintf("[ddl:-1]Detected unsupported DDL job type(%s) during [%s, now), can't do flashback",
		model.ActionSetTiFlashReplica.String(), flashbackTime.String())
	tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTime), errorMsg)
}

func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) {
tb, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table))
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 1. Not first time bootstrap loading, which needs a full load.
// 2. It is newer than the current one, so it will be "the current one" after this function call.
// 3. There are fewer than 100 diffs.
// 4. No schema diff carries the RegenerateSchemaMap flag.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
Expand Down Expand Up @@ -347,6 +348,9 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
if err != nil {
return nil, nil, err
}
if diff.RegenerateSchemaMap {
return nil, nil, errors.Errorf("Meets a schema diff with RegenerateSchemaMap flag")
}
if canSkipSchemaCheckerDDL(diff.Type) {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64
affected := false
for i, tblID := range item.relatedIDs {
for _, relatedTblID := range tableIDs {
if tblID == relatedTblID {
if tblID == relatedTblID || relatedTblID == -1 {
// if actionType >= 64, the value of left shift equals 0, and it will not impact amend txn
changedTblMap[tblID] |= 1 << item.relatedActions[i]
affected = true
Expand Down
10 changes: 6 additions & 4 deletions executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,12 @@ func TestRecoverClusterMeetError(t *testing.T) {
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), errno.ErrPrivilegeCheckFail)
tk.MustExec("drop user 'testflashback'@'localhost';")

// Flashback failed because of ddl history.
tk.MustExec("use test;")
tk.MustExec("create table t(a int);")
tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback")
// update tidb_server_version
nowTS, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
tk.MustExec("update mysql.tidb set VARIABLE_VALUE=VARIABLE_VALUE+1 where VARIABLE_NAME='tidb_server_version'")
errorMsg := fmt.Sprintf("[ddl:-1]Detected TiDB upgrade during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String())
tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(nowTS)), errorMsg)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
Expand Down
2 changes: 2 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,8 @@ type SchemaDiff struct {
OldTableID int64 `json:"old_table_id"`
// OldSchemaID is the schema ID before rename table, only used by rename table DDL.
OldSchemaID int64 `json:"old_schema_id"`
// RegenerateSchemaMap indicates whether the schema map must be rebuilt when applying this schema diff.
RegenerateSchemaMap bool `json:"regenerate_schema_map"`
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved

AffectedOpts []*AffectedOption `json:"affected_options"`
}
Expand Down
6 changes: 0 additions & 6 deletions tests/realtikvtest/brietest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,20 @@ go_test(
srcs = [
"backup_restore_test.go",
"binlog_test.go",
"flashback_test.go",
"main_test.go",
],
flaky = True,
race = "on",
deps = [
"//config",
"//ddl/util",
"//parser/model",
"//parser/mysql",
"//sessionctx/binloginfo",
"//store/mockstore/mockcopr",
"//testkit",
"//testkit/testsetup",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_goleak//:goleak",
],
Expand Down
Loading