Skip to content

Commit

Permalink
ddl: wait schema change before rename table job is done (pingcap#43341)…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Jun 29, 2023
1 parent e4773fd commit 04ad9d4
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 4 deletions.
61 changes: 58 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,9 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

if job.SchemaState == model.StatePublic {
return finishJobRenameTable(d, t, job)
}
newSchemaID := job.SchemaID
err := checkTableNotExists(d, t, newSchemaID, tableName.L)
if err != nil {
Expand All @@ -904,7 +907,7 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

ver, tblInfo, err := checkAndRenameTables(t, job, oldSchemaID, job.SchemaID, &oldSchemaName, &tableName)
ver, _, err = checkAndRenameTables(t, job, oldSchemaID, job.SchemaID, &oldSchemaName, &tableName)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -913,7 +916,7 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
job.SchemaState = model.StatePublic
return ver, nil
}

Expand All @@ -929,6 +932,10 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
return ver, errors.Trace(err)
}

if job.SchemaState == model.StatePublic {
return finishJobRenameTables(d, t, job, tableNames, tableIDs, newSchemaIDs)
}

var tblInfos = make([]*model.TableInfo, 0, len(tableNames))
var err error
for i, oldSchemaID := range oldSchemaIDs {
Expand All @@ -945,7 +952,7 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
if err != nil {
return ver, errors.Trace(err)
}
job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos)
job.SchemaState = model.StatePublic
return ver, nil
}

Expand Down Expand Up @@ -1005,6 +1012,54 @@ func checkAndRenameTables(t *meta.Meta, job *model.Job, oldSchemaID, newSchemaID
return ver, tblInfo, nil
}

// We split the renaming table job into two steps:
// 1. rename table and update the schema version.
// 2. update the job state to JobStateDone.
// This is the requirement from TiCDC because
// - it uses the job state to check whether the DDL is finished.
// - there is a gap between schema reloading and job state updating:
// when the job state is updated to JobStateDone, before the new schema reloaded,
// there may be DMLs that use the old schema.
// - TiCDC cannot handle the DMLs that use the old schema, because
// the commit TS of the DMLs are greater than the job state updating TS.
func finishJobRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
tblInfo, err := getTableInfo(t, job.TableID, job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
ver, err := updateSchemaVersion(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

func finishJobRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job,
tableNames []*model.CIStr, tableIDs, newSchemaIDs []int64) (int64, error) {
tblSchemaIDs := make(map[int64]int64, len(tableIDs))
for i := range tableIDs {
tblSchemaIDs[tableIDs[i]] = newSchemaIDs[i]
}
tblInfos := make([]*model.TableInfo, 0, len(tableNames))
for i := range tableIDs {
tblID := tableIDs[i]
tblInfo, err := getTableInfo(t, tblID, tblSchemaIDs[tblID])
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
tblInfos = append(tblInfos, tblInfo)
}
ver, err := updateSchemaVersion(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos)
return ver, nil
}

func onModifyTableComment(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var comment string
if err := job.DecodeArgs(&comment); err != nil {
Expand Down
55 changes: 55 additions & 0 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -365,3 +366,57 @@ func TestCreateTables(t *testing.T) {
testGetTable(t, domain, genIDs[1])
testGetTable(t, domain, genIDs[2])
}

func TestRenameTableIntermediateState(t *testing.T) {
ddl.RunInGoTest = true
defer func() {
ddl.RunInGoTest = false
}()
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
originHook := dom.DDL().GetHook()
tk.MustExec("create database db1;")
tk.MustExec("create database db2;")
tk.MustExec("create table db1.t(a int);")

testCases := []struct {
renameSQL string
insertSQL string
errMsg string
finalDB string
}{
{"rename table db1.t to db1.t1;", "insert into db1.t values(1);", "[schema:1146]Table 'db1.t' doesn't exist", "db1.t1"},
{"rename table db1.t1 to db1.t;", "insert into db1.t values(1);", "", "db1.t"},
{"rename table db1.t to db2.t;", "insert into db1.t values(1);", "[schema:1146]Table 'db1.t' doesn't exist", "db2.t"},
{"rename table db2.t to db1.t;", "insert into db1.t values(1);", "", "db1.t"},
}

for _, tc := range testCases {
hook := &ddl.TestDDLCallback{Do: dom}
runInsert := false
fn := func(job *model.Job) {
if job.SchemaState == model.StatePublic && !runInsert && !t.Failed() {
_, err := tk2.Exec(tc.insertSQL)
if len(tc.errMsg) > 0 {
assert.Equal(t, tc.errMsg, err.Error())
} else {
assert.NoError(t, err)
}
runInsert = true
}
}
hook.OnJobUpdatedExported = fn
dom.DDL().SetHook(hook)
tk.MustExec(tc.renameSQL)
result := tk.MustQuery(fmt.Sprintf("select * from %s;", tc.finalDB))
if len(tc.errMsg) > 0 {
result.Check(testkit.Rows())
} else {
result.Check(testkit.Rows("1"))
}
tk.MustExec(fmt.Sprintf("delete from %s;", tc.finalDB))
}
dom.DDL().SetHook(originHook)
}
2 changes: 1 addition & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (job *Job) IsRollbackable() bool {
ActionDropForeignKey, ActionDropTablePartition:
return job.SchemaState == StatePublic
case ActionDropColumns, ActionRebaseAutoID, ActionShardRowID,
ActionTruncateTable, ActionAddForeignKey, ActionRenameTable,
ActionTruncateTable, ActionAddForeignKey, ActionRenameTable, ActionRenameTables,
ActionModifyTableCharsetAndCollate, ActionTruncateTablePartition,
ActionModifySchemaCharsetAndCollate, ActionRepairTable,
ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement:
Expand Down

0 comments on commit 04ad9d4

Please sign in to comment.