Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flashback: make br backup full works during flashback cluster #40950

Merged
merged 24 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ go_library(
"//distsql",
"//kv",
"//meta",
"//meta/autoid",
"//parser/model",
"//statistics/handle",
"//util",
Expand Down
16 changes: 6 additions & 10 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -561,24 +560,21 @@ func BuildBackupRangeAndSchema(
zap.String("table", tableInfo.Name.O),
)

tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)
autoIDAccess := m.GetAutoIDAccessors(dbInfo.ID, tableInfo.ID)

var globalAutoID int64
switch {
case tableInfo.IsSequence():
globalAutoID, err = seqAlloc.NextGlobalAutoID()
globalAutoID, err = autoIDAccess.SequenceCycle().Get()
case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
// no auto ID for views or table without either rowID nor auto_increment ID.
default:
globalAutoID, err = idAlloc.NextGlobalAutoID()
globalAutoID, err = autoIDAccess.RowID().Get()
}
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoIncID = globalAutoID
tableInfo.AutoIncID = globalAutoID + 1
if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
Expand All @@ -591,11 +587,11 @@ func BuildBackupRangeAndSchema(
if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = randAlloc.NextGlobalAutoID()
globalAutoRandID, err = autoIDAccess.RandomID().Get()
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoRandID = globalAutoRandID
tableInfo.AutoRandID = globalAutoRandID + 1
logger.Debug("change table AutoRandID",
zap.Int64("AutoRandID", globalAutoRandID))
}
Expand Down
12 changes: 6 additions & 6 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,8 @@ func splitRegionsByKeyRanges(d *ddlCtx, keyRanges []kv.KeyRange) {

// A Flashback has 4 different stages.
// 1. before lock flashbackClusterJobID, check clusterJobID and lock it.
// 2. before flashback start, check timestamp, disable GC and close PD schedule.
// 3. phase 1, get key ranges, lock all regions.
// 2. before flashback start, check timestamp, disable GC and close PD schedule, get flashback key ranges.
// 3. phase 1, lock flashback key ranges.
// 4. phase 2, send flashback RPC, do flashback jobs.
func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
inFlashbackTest := false
Expand Down Expand Up @@ -692,7 +692,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
job.SchemaState = model.StateDeleteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
// Stage 2, check flashbackTS, close GC and PD schedule, get flashback key ranges.
case model.StateDeleteOnly:
if err = checkAndSetFlashbackClusterInfo(sess, d, t, job, flashbackTS); err != nil {
job.State = model.JobStateCancelled
Expand All @@ -711,8 +711,8 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
job.Args[keyRangesOffset] = keyRanges
job.SchemaState = model.StateWriteOnly
return ver, nil
// Stage 3, get key ranges and get locks.
return updateSchemaVersion(d, t, job)
// Stage 3, lock related key ranges.
case model.StateWriteOnly:
// TODO: Support flashback in unistore.
if inFlashbackTest {
Expand Down Expand Up @@ -742,7 +742,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
job.Args[commitTSOffset] = commitTS
job.SchemaState = model.StateWriteReorganization
return updateSchemaVersion(d, t, job)
return ver, nil
// Stage 4, get key ranges and send flashback RPC.
case model.StateWriteReorganization:
// TODO: Support flashback in unistore.
Expand Down
3 changes: 2 additions & 1 deletion ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func TestAddDDLDuringFlashback(t *testing.T) {
hook.OnJobRunBeforeExported = func(job *model.Job) {
assert.Equal(t, model.ActionFlashbackCluster, job.Type)
if job.SchemaState == model.StateWriteOnly {
_, err := tk.Exec("alter table t add column b int")
tk1 := testkit.NewTestKit(t, store)
_, err := tk1.Exec("alter table test.t add column b int")
assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job")
}
}
Expand Down
24 changes: 22 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,18 @@ func (do *Domain) GetScope(status string) variable.ScopeFlag {
return variable.DefaultStatusVarScopeFlag
}

func getFlashbackStartTSFromErrorMsg(err error) uint64 {
slices := strings.Split(err.Error(), "is in flashback progress, FlashbackStartTS is ")
if len(slices) != 2 {
return 0
}
version, err := strconv.ParseUint(slices[1], 10, 0)
if err != nil {
return 0
}
return version
}

// Reload reloads InfoSchema.
// It's public in order to do the test.
func (do *Domain) Reload() error {
Expand All @@ -490,7 +502,15 @@ func (do *Domain) Reload() error {
return err
}

is, hitCache, oldSchemaVersion, changes, err := do.loadInfoSchema(ver.Ver)
version := ver.Ver
is, hitCache, oldSchemaVersion, changes, err := do.loadInfoSchema(version)
if err != nil {
if version = getFlashbackStartTSFromErrorMsg(err); version != 0 {
// use the lastest available version to create domain
version -= 1
is, hitCache, oldSchemaVersion, changes, err = do.loadInfoSchema(version)
}
}
xhebox marked this conversation as resolved.
Show resolved Hide resolved
metrics.LoadSchemaDuration.Observe(time.Since(startTime).Seconds())
if err != nil {
metrics.LoadSchemaCounter.WithLabelValues("failed").Inc()
Expand Down Expand Up @@ -519,7 +539,7 @@ func (do *Domain) Reload() error {
}

// lease renew, so it must be executed despite it is cache or not
do.SchemaValidator.Update(ver.Ver, oldSchemaVersion, is.SchemaMetaVersion(), changes)
do.SchemaValidator.Update(version, oldSchemaVersion, is.SchemaMetaVersion(), changes)
lease := do.DDL().GetLease()
sub := time.Since(startTime)
// Reload interval is lease / 2, if load schema time elapses more than this interval,
Expand Down
4 changes: 3 additions & 1 deletion domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha
s.do.Store().GetMemCache().Delete(tblIDs[idx])
}
if ac == uint64(model.ActionFlashbackCluster) {
s.do.InfoSyncer().GetSessionManager().KillNonFlashbackClusterConn()
if s.do != nil && s.do.InfoSyncer() != nil && s.do.InfoSyncer().GetSessionManager() != nil {
s.do.InfoSyncer().GetSessionManager().KillNonFlashbackClusterConn()
xhebox marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
logutil.BgLogger().Debug("update schema validator", zap.Int64("oldVer", oldVer),
Expand Down
4 changes: 4 additions & 0 deletions tests/realtikvtest/flashbacktest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ go_test(
flaky = True,
race = "on",
deps = [
"//ddl",
"//ddl/util",
"//domain",
"//errno",
"//meta",
"//parser/model",
"//testkit",
"//testkit/testsetup",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
Expand Down
73 changes: 73 additions & 0 deletions tests/realtikvtest/flashbacktest/flashback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ package flashbacktest
import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
tikvutil "github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -524,3 +529,71 @@ func TestFlashbackTmpTable(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
}

func TestFlashbackInProcessErrorMsg(t *testing.T) {
if *realtikvtest.WithRealTiKV {
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)

originHook := dom.DDL().GetHook()

tk := testkit.NewTestKit(t, store)
timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()

tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")

time.Sleep(1 * time.Second)
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

// do some ddl and dml
tk.MustExec("alter table t add index k(a)")
tk.MustExec("insert into t values (1), (2), (3)")

injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/injectSafeTS",
fmt.Sprintf("return(%v)", injectSafeTS)))

hook := newTestCallBack(t, dom)
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionFlashbackCluster && job.SchemaState == model.StateWriteReorganization {
txn, err := store.Begin()
assert.NoError(t, err)
_, err = meta.NewMeta(txn).ListDatabases()
errorMsg := err.Error()
assert.Contains(t, errorMsg, "is in flashback progress, FlashbackStartTS is ")
slices := strings.Split(errorMsg, "is in flashback progress, FlashbackStartTS is ")
assert.Equal(t, len(slices), 2)
assert.NotEqual(t, slices[1], "0")
txn.Rollback()
}
}
dom.DDL().SetHook(hook)
tk.Exec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
dom.DDL().SetHook(originHook)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/injectSafeTS"))
}
}

type testCallback struct {
ddl.Callback
OnJobRunBeforeExported func(job *model.Job)
}

func newTestCallBack(t *testing.T, dom *domain.Domain) *testCallback {
defHookFactory, err := ddl.GetCustomizedHook("default_hook")
require.NoError(t, err)
return &testCallback{
Callback: defHookFactory(dom),
}
}

func (c *testCallback) OnJobRunBefore(job *model.Job) {
if c.OnJobRunBeforeExported != nil {
c.OnJobRunBeforeExported(job)
}
}