diff --git a/pkg/sink/codec/bootstraper.go b/pkg/sink/codec/bootstraper.go index b2a371c777e..1ec99ef0536 100644 --- a/pkg/sink/codec/bootstraper.go +++ b/pkg/sink/codec/bootstraper.go @@ -109,7 +109,7 @@ func (b *bootstrapWorker) addEvent( ) error { table, ok := b.activeTables.Load(row.PhysicalTableID) if !ok { - tb := newTableStatus(key, row) + tb := newTableStatistic(key, row) b.activeTables.Store(tb.id, tb) // Send bootstrap message immediately when a new table is added err := b.sendBootstrapMsg(ctx, tb) @@ -218,7 +218,7 @@ type tableStatistic struct { tableInfo atomic.Value } -func newTableStatus(key model.TopicPartitionKey, row *model.RowChangedEvent) *tableStatistic { +func newTableStatistic(key model.TopicPartitionKey, row *model.RowChangedEvent) *tableStatistic { res := &tableStatistic{ id: row.PhysicalTableID, topic: key.Topic, @@ -245,11 +245,13 @@ func (t *tableStatistic) update(row *model.RowChangedEvent, totalPartition int32 t.counter.Add(1) t.lastMsgReceivedTime.Store(time.Now()) - if t.version.Load() != row.TableInfo.UpdateTS { + // Note(dongmen): Rename Table DDL is a special case, + // the TableInfo.Name is changed but the TableInfo.UpdateTs is not changed. + if t.version.Load() != row.TableInfo.UpdateTS || + t.tableInfo.Load().(*model.TableInfo).Name != row.TableInfo.Name { t.version.Store(row.TableInfo.UpdateTS) t.tableInfo.Store(row.TableInfo) } - if t.totalPartition.Load() != totalPartition { t.totalPartition.Store(totalPartition) } diff --git a/pkg/sink/codec/bootstraper_test.go b/pkg/sink/codec/bootstraper_test.go index 5a1b2eb8a6c..1888cffb27e 100644 --- a/pkg/sink/codec/bootstraper_test.go +++ b/pkg/sink/codec/bootstraper_test.go @@ -20,6 +20,7 @@ import ( "time" timodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" "github.com/stretchr/testify/require" @@ -50,7 +51,7 @@ func getMockTableStatus(tableName string, PhysicalTableID: tableID, TableInfo: tableInfo, } - tb := newTableStatus(key, row) + tb := newTableStatistic(key, row) return key, row, tb } @@ -176,3 +177,47 @@ l2: // The bootstrap events are sent to all partition require.Equal(t, key2.TotalPartition, msgCount) } + +func TestUpdateTableStatistic(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + + sql := `create table test.t1( + id int primary key, + name varchar(64) not null, + age int, + email varchar(255) not null, + unique index idx_name(name), + index idx_age_email(age,email) + );` + tableInfo1 := helper.DDL2Event(sql).TableInfo + row1 := &model.RowChangedEvent{ + PhysicalTableID: tableInfo1.ID, + TableInfo: tableInfo1, + } + tableStatistic := newTableStatistic(model.TopicPartitionKey{}, row1) + + // case 1: The tableStatistic should not be updated if the tableInfo is the same + tableStatistic.update(row1, 1) + require.Equal(t, tableInfo1, tableStatistic.tableInfo.Load().(*model.TableInfo)) + + // case 2: The tableStatistic should be updated if the tableInfo is different + sql = `alter table test.t1 add column address varchar(255) not null;` + tableInfo2 := helper.DDL2Event(sql).TableInfo + row2 := &model.RowChangedEvent{ + PhysicalTableID: tableInfo2.ID, + TableInfo: tableInfo2, + } + tableStatistic.update(row2, 1) + require.Equal(t, tableInfo2, tableStatistic.tableInfo.Load().(*model.TableInfo)) + + // case 3: The tableStatistic should be updated when rename table + sql = `alter table test.t1 rename to test.t2;` + tableInfo3 := helper.DDL2Event(sql).TableInfo + row3 := &model.RowChangedEvent{ + PhysicalTableID: tableInfo3.ID, + TableInfo: tableInfo3, + } + tableStatistic.update(row3, 1) + require.Equal(t, tableInfo3, tableStatistic.tableInfo.Load().(*model.TableInfo)) +}