Skip to content

Commit

Permalink
ddlPuller (ticdc): fix rename table ddl (pingcap#9521)
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored and 3AceShowHand committed Aug 29, 2023
1 parent f382dfd commit 2e2485a
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
11 changes: 11 additions & 0 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error {
// DDLs on multiple schema or tables, ignore them.
return nil
}
if job.Type == timodel.ActionRenameTable && job.SchemaName != "" {
// DDL on single table with schema name, ignore it.
return nil
}

if job.Type == timodel.ActionCreateSchema ||
job.Type == timodel.ActionDropSchema {
job.SchemaName = job.BinlogInfo.DBInfo.Name.O
Expand Down Expand Up @@ -429,6 +434,12 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error {
if err != nil {
return errors.Trace(err)
}
// If it a rename table job and the schema does not exist,
// there is no need to create the table, since this table
// will not be replicated in the future.
if _, ok := s.inner.schemaByID(job.SchemaID); !ok {
return nil
}
// create table
err = s.inner.createTable(getWrapTableInfo(job), job.BinlogInfo.FinishedTS)
if err != nil {
Expand Down
40 changes: 30 additions & 10 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (p *ddlJobPullerImpl) handleRawKVEntry(ctx context.Context, ddlRawKV *model
log.Info("handle ddl job",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("query", job.Query),
zap.Stringer("job", job), zap.Bool("skip", skip))
if skip {
return nil
Expand Down Expand Up @@ -354,15 +355,29 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
if p.schemaStorage == nil {
return false, nil
}

defer func() {
if skip && err == nil {
log.Info("ddl job schema or table does not match, discard it",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.String("job", job.String()))
}
}()

snap := p.schemaStorage.GetLastSnapshot()
// Do this first to fill the schema name to its origin schema name.
if err := snap.FillSchemaName(job); err != nil {
log.Info("failed to fill schema name for ddl job", zap.Error(err))
// If we can't find a job's schema, check if it's been filtered.
if p.filter.ShouldIgnoreTable(job.SchemaName, job.TableName) ||
p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
return true, nil
}
return true, errors.Trace(err)

}

if job.BinlogInfo.FinishedTS <= p.getResolvedTs() ||
Expand All @@ -387,6 +402,13 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
return true, errors.Trace(err)
}
case timodel.ActionRenameTable:
log.Info("rename table ddl job",
zap.Int64("newSchemaID", job.SchemaID),
zap.String("newSchemaName", job.SchemaName),
zap.Int64("tableID", job.TableID),
zap.String("oldTableName", job.BinlogInfo.TableInfo.Name.O),
zap.String("newTableName", job.TableName),
)
oldTable, ok := snap.PhysicalTableByID(job.TableID)
if !ok {
// 1. If we can not find the old table, and the new table name is in filter rule, return error.
Expand All @@ -395,10 +417,15 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
}
skip = true
} else {
log.Info("rename table ddl job",
zap.String("oldTableName", oldTable.TableName.Table),
zap.String("oldSchemaName", oldTable.TableName.Schema))
// since we can find the old table, we must can find the old schema.
// 2. If we can find the preTableInfo, we filter it by the old table name.
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, oldTable.Name.O)
skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table)
skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
// 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
if skip && !p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) {
if skipByOldTableName && !skipByNewTableName {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
}
Expand All @@ -411,13 +438,6 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
}

if skip {
log.Info("ddl job schema or table does not match, discard it",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.String("schema", job.SchemaName),
zap.String("table", job.TableName),
zap.String("query", job.Query),
zap.String("job", job.String()))
return true, nil
}

Expand Down
33 changes: 33 additions & 0 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func TestHandleRenameTable(t *testing.T) {
"test1.t66",
"test1.t99",
"test1.t100",
"test1.t20230808",
"test1.t202308081",
"test1.t202308082",

"test2.t4",

Expand Down Expand Up @@ -320,6 +323,20 @@ func TestHandleRenameTable(t *testing.T) {
mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1)
waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1)

job = helper.DDL2Job("create table test1.t20230808 (id int)")
mockPuller.appendDDL(job)
mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1)
waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1)

job = helper.DDL2Job("create table test1.t202308081 (id int)")
mockPuller.appendDDL(job)
mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1)
waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1)

job = helper.DDL2Job("create table test1.t202308082 (id int)")
mockPuller.appendDDL(job)
mockPuller.appendResolvedTs(job.BinlogInfo.FinishedTS + 1)
waitResolvedTs(t, ddlJobPuller, job.BinlogInfo.FinishedTS+1)
// since test1.99 in filter rule, we replicate it
job = helper.DDL2Job("rename table test1.t99 to test1.t999")
skip, err := ddlJobPullerImpl.handleJob(job)
Expand All @@ -340,6 +357,22 @@ func TestHandleRenameTable(t *testing.T) {
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)

// since test1.t20230808 is in filter rule, replicate it
// ref: https://github.com/pingcap/tiflow/issues/9488
job = helper.DDL2Job("rename table test1.t20230808 to ignore1.ignore")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.False(t, skip)

// FIXME(dongmen): since test1.t202308081 and test1.t202308082 are in filter rule, it should be replicated
// but now it will throw an error since schema ignore1 are not in schemaStorage
// ref: https://github.com/pingcap/tiflow/issues/9488
job = helper.DDL2Job("rename table test1.t202308081 to ignore1.ignore1, test1.t202308082 to ignore1.dongmen")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NotNil(t, err)
require.True(t, skip)
require.Contains(t, err.Error(), "ErrSnapshotSchemaNotFound")
}
}

Expand Down

0 comments on commit 2e2485a

Please sign in to comment.