Skip to content

Commit

Permalink
simple (ticdc): fix simple rename table ddl handle error (#10687)
Browse files Browse the repository at this point in the history
ref #9898
  • Loading branch information
asddongmen committed Mar 6, 2024
1 parent 71f4f7f commit 06fcdc5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
10 changes: 6 additions & 4 deletions pkg/sink/codec/bootstraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (b *bootstrapWorker) addEvent(
) error {
table, ok := b.activeTables.Load(row.PhysicalTableID)
if !ok {
tb := newTableStatus(key, row)
tb := newTableStatistic(key, row)
b.activeTables.Store(tb.id, tb)
// Send bootstrap message immediately when a new table is added
err := b.sendBootstrapMsg(ctx, tb)
Expand Down Expand Up @@ -218,7 +218,7 @@ type tableStatistic struct {
tableInfo atomic.Value
}

func newTableStatus(key model.TopicPartitionKey, row *model.RowChangedEvent) *tableStatistic {
func newTableStatistic(key model.TopicPartitionKey, row *model.RowChangedEvent) *tableStatistic {
res := &tableStatistic{
id: row.PhysicalTableID,
topic: key.Topic,
Expand All @@ -245,11 +245,13 @@ func (t *tableStatistic) update(row *model.RowChangedEvent, totalPartition int32
t.counter.Add(1)
t.lastMsgReceivedTime.Store(time.Now())

if t.version.Load() != row.TableInfo.UpdateTS {
// Note(dongmen): Rename Table DDL is a special case,
// the TableInfo.Name is changed but the TableInfo.UpdateTs is not changed.
if t.version.Load() != row.TableInfo.UpdateTS ||
t.tableInfo.Load().(*model.TableInfo).Name != row.TableInfo.Name {
t.version.Store(row.TableInfo.UpdateTS)
t.tableInfo.Store(row.TableInfo)
}

if t.totalPartition.Load() != totalPartition {
t.totalPartition.Store(totalPartition)
}
Expand Down
47 changes: 46 additions & 1 deletion pkg/sink/codec/bootstraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -50,7 +51,7 @@ func getMockTableStatus(tableName string,
PhysicalTableID: tableID,
TableInfo: tableInfo,
}
tb := newTableStatus(key, row)
tb := newTableStatistic(key, row)
return key, row, tb
}

Expand Down Expand Up @@ -176,3 +177,47 @@ l2:
// The bootstrap events are sent to all partition
require.Equal(t, key2.TotalPartition, msgCount)
}

func TestUpdateTableStatistic(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

sql := `create table test.t1(
id int primary key,
name varchar(64) not null,
age int,
email varchar(255) not null,
unique index idx_name(name),
index idx_age_email(age,email)
);`
tableInfo1 := helper.DDL2Event(sql).TableInfo
row1 := &model.RowChangedEvent{
PhysicalTableID: tableInfo1.ID,
TableInfo: tableInfo1,
}
tableStatistic := newTableStatistic(model.TopicPartitionKey{}, row1)

// case 1: The tableStatistic should not be updated if the tableInfo is the same
tableStatistic.update(row1, 1)
require.Equal(t, tableInfo1, tableStatistic.tableInfo.Load().(*model.TableInfo))

// case 2: The tableStatistic should be updated if the tableInfo is different
sql = `alter table test.t1 add column address varchar(255) not null;`
tableInfo2 := helper.DDL2Event(sql).TableInfo
row2 := &model.RowChangedEvent{
PhysicalTableID: tableInfo2.ID,
TableInfo: tableInfo2,
}
tableStatistic.update(row2, 1)
require.Equal(t, tableInfo2, tableStatistic.tableInfo.Load().(*model.TableInfo))

// case 3: The tableStatistic should be updated when rename table
sql = `alter table test.t1 rename to test.t2;`
tableInfo3 := helper.DDL2Event(sql).TableInfo
row3 := &model.RowChangedEvent{
PhysicalTableID: tableInfo3.ID,
TableInfo: tableInfo3,
}
tableStatistic.update(row3, 1)
require.Equal(t, tableInfo3, tableStatistic.tableInfo.Load().(*model.TableInfo))
}

0 comments on commit 06fcdc5

Please sign in to comment.