Skip to content

Commit

Permalink
session: use mDDLTableVersion key to control backfill tables (#40984)
Browse files Browse the repository at this point in the history
close #40980
  • Loading branch information
Defined2014 authored Feb 3, 2023
1 parent 3412b5d commit b91e2d9
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 54 deletions.
59 changes: 35 additions & 24 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,25 @@ var (
ErrInvalidString = dbterror.ClassMeta.NewStd(errno.ErrInvalidCharacterString)
)

// DDLTableVersion is to display ddl related table versions
type DDLTableVersion int

const (
// InitDDLTableVersion is the original version.
InitDDLTableVersion DDLTableVersion = 0
// BaseDDLTableVersion is for support concurrent DDL, it added tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history.
BaseDDLTableVersion DDLTableVersion = 1
// MDLTableVersion is for support MDL tables.
MDLTableVersion DDLTableVersion = 2
// BackfillTableVersion is for support distributed reorg stage, it added tidb_ddl_backfill, tidb_ddl_backfill_history.
BackfillTableVersion DDLTableVersion = 3
)

// Bytes returns the byte slice.
func (ver DDLTableVersion) Bytes() []byte {
return []byte(strconv.Itoa(int(ver)))
}

// Meta is for handling meta information in a transaction.
type Meta struct {
txn *structure.TxStructure
Expand Down Expand Up @@ -619,15 +638,25 @@ func (m *Meta) CreateTableOrView(dbID int64, tableInfo *model.TableInfo) error {
}

// SetDDLTables write a key into storage.
func (m *Meta) SetDDLTables() error {
err := m.txn.Set(mDDLTableVersion, []byte("1"))
func (m *Meta) SetDDLTables(ddlTableVersion DDLTableVersion) error {
err := m.txn.Set(mDDLTableVersion, ddlTableVersion.Bytes())
return errors.Trace(err)
}

// SetMDLTables write a key into storage.
func (m *Meta) SetMDLTables() error {
err := m.txn.Set(mDDLTableVersion, []byte("2"))
return errors.Trace(err)
// CheckDDLTableVersion check if the tables related to concurrent DDL exists.
func (m *Meta) CheckDDLTableVersion() (DDLTableVersion, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return -1, errors.Trace(err)
}
if string(v) == "" {
return InitDDLTableVersion, nil
}
ver, err := strconv.Atoi(string(v))
if err != nil {
return -1, errors.Trace(err)
}
return DDLTableVersion(ver), nil
}

// CreateMySQLDatabaseIfNotExists creates mysql schema and return its DB ID.
Expand Down Expand Up @@ -666,24 +695,6 @@ func (m *Meta) GetSystemDBID() (int64, error) {
return 0, nil
}

// CheckDDLTableExists check if the tables related to concurrent DDL exists.
func (m *Meta) CheckDDLTableExists() (bool, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return false, errors.Trace(err)
}
return len(v) != 0, nil
}

// CheckMDLTableExists check if the tables related to concurrent DDL exists.
func (m *Meta) CheckMDLTableExists() (bool, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return false, errors.Trace(err)
}
return bytes.Equal(v, []byte("2")), nil
}

// SetMetadataLock sets the metadata lock.
func (m *Meta) SetMetadataLock(b bool) error {
var data []byte
Expand Down
37 changes: 36 additions & 1 deletion session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,12 @@ func TestBootstrapWithError(t *testing.T) {
se.txn.init()
se.mu.values = make(map[fmt.Stringer]interface{})
se.SetValue(sessionctx.Initing, true)
err := InitDDLJobTables(store)
err := InitDDLJobTables(store, meta.BaseDDLTableVersion)
require.NoError(t, err)
err = InitMDLTable(store)
require.NoError(t, err)
err = InitDDLJobTables(store, meta.BackfillTableVersion)
require.NoError(t, err)
dom, err := domap.Get(store)
require.NoError(t, err)
domain.BindDomain(se, dom)
Expand Down Expand Up @@ -215,10 +217,43 @@ func TestBootstrapWithError(t *testing.T) {
require.Equal(t, []byte("True"), row.GetBytes(0))
require.NoError(t, r.Close())

mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill")
mustExec(t, se, "SELECT * from mysql.tidb_ddl_backfill_history")

// Check tidb_ttl_table_status table
mustExec(t, se, "SELECT * from mysql.tidb_ttl_table_status")
}

func TestDDLTableCreateBackfillTable(t *testing.T) {
store, dom := createStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()
se := createSessionAndSetID(t, store)

txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMeta(txn)
ver, err := m.CheckDDLTableVersion()
require.NoError(t, err)
require.GreaterOrEqual(t, ver, meta.BackfillTableVersion)

// downgrade `mDDLTableVersion`
m.SetDDLTables(meta.MDLTableVersion)
mustExec(t, se, "drop table mysql.tidb_ddl_backfill")
mustExec(t, se, "drop table mysql.tidb_ddl_backfill_history")
err = txn.Commit(context.Background())
require.NoError(t, err)

// to upgrade session for create ddl related tables
dom.Close()
dom, err = BootstrapSession(store)
require.NoError(t, err)

se = createSessionAndSetID(t, store)
mustExec(t, se, "select * from mysql.tidb_ddl_backfill")
mustExec(t, se, "select * from mysql.tidb_ddl_backfill_history")
dom.Close()
}

// TestUpgrade tests upgrading
func TestUpgrade(t *testing.T) {
ctx := context.Background()
Expand Down
44 changes: 18 additions & 26 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3103,41 +3103,29 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) {
}
}

// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg, tidb_ddl_history, tidb_ddl_backfill and tidb_ddl_backfill_history.
func InitDDLJobTables(store kv.Storage) error {
// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_ddl_backfill and tidb_ddl_backfill_history.
func InitDDLJobTables(store kv.Storage, targetVer meta.DDLTableVersion) error {
targetTables := DDLJobTables
if targetVer == meta.BackfillTableVersion {
targetTables = BackfillTables
}
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
exists, err := t.CheckDDLTableExists()
if err != nil {
tableVer, err := t.CheckDDLTableVersion()
if err != nil || tableVer >= targetVer {
return errors.Trace(err)
}
dbID, err := t.CreateMySQLDatabaseIfNotExists()
if err != nil {
return err
}
if exists {
return initBackfillJobTables(store, t, dbID)
}

if err = createAndSplitTables(store, t, dbID, DDLJobTables); err != nil {
return err
}
if err = initBackfillJobTables(store, t, dbID); err != nil {
if err = createAndSplitTables(store, t, dbID, targetTables); err != nil {
return err
}
return t.SetDDLTables()
return t.SetDDLTables(targetVer)
})
}

// initBackfillJobTables is to create tidb_ddl_backfill and tidb_ddl_backfill_history.
func initBackfillJobTables(store kv.Storage, t *meta.Meta, dbID int64) error {
tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
if err != nil || tblExist {
return errors.Trace(err)
}
return createAndSplitTables(store, t, dbID, BackfillTables)
}

func createAndSplitTables(store kv.Storage, t *meta.Meta, dbID int64, tables []tableBasicInfo) error {
tableIDs := make([]int64, 0, len(tables))
for _, tbl := range tables {
Expand Down Expand Up @@ -3169,8 +3157,8 @@ func createAndSplitTables(store kv.Storage, t *meta.Meta, dbID int64, tables []t
func InitMDLTable(store kv.Storage) error {
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
exists, err := t.CheckMDLTableExists()
if err != nil || exists {
ver, err := t.CheckDDLTableVersion()
if err != nil || ver >= meta.MDLTableVersion {
return errors.Trace(err)
}
dbID, err := t.CreateMySQLDatabaseIfNotExists()
Expand All @@ -3195,7 +3183,7 @@ func InitMDLTable(store kv.Storage) error {
return errors.Trace(err)
}

return t.SetMDLTables()
return t.SetDDLTables(meta.MDLTableVersion)
})
}

Expand Down Expand Up @@ -3272,14 +3260,18 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}
}
err := InitDDLJobTables(store)
err := InitDDLJobTables(store, meta.BaseDDLTableVersion)
if err != nil {
return nil, err
}
err = InitMDLTable(store)
if err != nil {
return nil, err
}
err = InitDDLJobTables(store, meta.BackfillTableVersion)
if err != nil {
return nil, err
}
ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand Down
12 changes: 9 additions & 3 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ func TestInitMetaTable(t *testing.T) {
tk.MustExec(sql.SQL)
}

for _, sql := range session.BackfillTables {
tk.MustExec(sql.SQL)
}

tbls := map[string]struct{}{
"tidb_ddl_job": {},
"tidb_ddl_reorg": {},
"tidb_ddl_history": {},
"tidb_ddl_job": {},
"tidb_ddl_reorg": {},
"tidb_ddl_history": {},
"tidb_ddl_backfill": {},
"tidb_ddl_backfill_history": {},
}

for tbl := range tbls {
Expand Down

0 comments on commit b91e2d9

Please sign in to comment.