Skip to content

Commit

Permalink
encoder(ticdc): fix simple decoder set index column offset incorrect (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 5, 2024
1 parent 3625884 commit 5eae0bd
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 42 deletions.
18 changes: 16 additions & 2 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type TableInfo struct {
columnsOffset map[int64]int
indicesOffset map[int64]int

// Column name -> ColumnID
nameToColID map[string]int64

hasUniqueColumn bool

// It's a mapping from ColumnID to the offset of the columns in row changed events.
Expand Down Expand Up @@ -90,6 +93,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
hasUniqueColumn: false,
Version: version,
columnsOffset: make(map[int64]int, len(info.Columns)),
nameToColID: make(map[string]int64, len(info.Columns)),
indicesOffset: make(map[int64]int, len(info.Indices)),
RowColumnsOffset: make(map[int64]int, len(info.Columns)),
ColumnsFlag: make(map[int64]ColumnFlagType, len(info.Columns)),
Expand All @@ -105,6 +109,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
ti.columnsOffset[col.ID] = i
pkIsHandle := false
if IsColCDCVisible(col) {
ti.nameToColID[col.Name.O] = col.ID
ti.RowColumnsOffset[col.ID] = rowColumnsCurrentOffset
rowColumnsCurrentOffset++
pkIsHandle = (ti.PKIsHandle && mysql.HasPriKeyFlag(col.GetFlag())) || col.ID == model.ExtraHandleID
Expand Down Expand Up @@ -230,8 +235,7 @@ func (ti *TableInfo) initColumnsFlag() {
flag := ti.ColumnsFlag[colInfo.ID]
if idxInfo.Primary {
flag.SetIsPrimaryKey()
}
if idxInfo.Unique {
} else if idxInfo.Unique {
flag.SetIsUniqueKey()
}
if len(idxInfo.Columns) > 1 {
Expand Down Expand Up @@ -430,6 +434,16 @@ func (ti *TableInfo) ForceGetColumnName(colID int64) string {
return ti.ForceGetColumnInfo(colID).Name.O
}

// ForceGetColumnIDByName return column ID by column name
// Caller must ensure `colID` exists
func (ti *TableInfo) ForceGetColumnIDByName(name string) int64 {
colID, ok := ti.nameToColID[name]
if !ok {
log.Panic("invalid column name", zap.String("column", name))
}
return colID
}

// GetColumnDefaultValue returns the default definition of a column.
func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
defaultValue := col.GetDefaultValue()
Expand Down
6 changes: 6 additions & 0 deletions pkg/sink/codec/simple/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func newTableSchemaMap(tableInfo *model.TableInfo) interface{} {
mysqlType["decimal"] = map[string]interface{}{
"int": col.GetDecimal(),
}
mysqlType["unsigned"] = map[string]interface{}{
"boolean": mysql.HasUnsignedFlag(col.GetFlag()),
}
mysqlType["zerofill"] = map[string]interface{}{
"boolean": mysql.HasZerofillFlag(col.GetFlag()),
}
default:
}

Expand Down
72 changes: 69 additions & 3 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,19 @@ func TestEncodeDMLEnableChecksum(t *testing.T) {
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

_, err = dec.NextDDLEvent()
decodedDDL, err := dec.NextDDLEvent()
require.NoError(t, err)

originFlags := createTableDDL.TableInfo.ColumnsFlag
obtainedFlags := decodedDDL.TableInfo.ColumnsFlag

for colID, expected := range originFlags {
name := createTableDDL.TableInfo.ForceGetColumnName(colID)
actualID := decodedDDL.TableInfo.ForceGetColumnIDByName(name)
actual := obtainedFlags[actualID]
require.Equal(t, expected, actual)
}

err = enc.AppendRowChangedEvent(ctx, "", updateEvent, func() {})
require.NoError(t, err)

Expand Down Expand Up @@ -905,6 +915,62 @@ func TestEncodeDDLEvent(t *testing.T) {
}
}

func TestColumnFlags(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

createTableDDL := `create table test.t(
a bigint(20) unsigned not null,
b bigint(20) default null,
c varbinary(767) default null,
d int(11) unsigned not null,
e int(11) default null,
primary key (a),
key idx_c(c),
key idx_b(b),
unique key idx_de(d, e))`
createTableDDLEvent := helper.DDL2Event(createTableDDL)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolSimple)
for _, format := range []common.EncodingFormatType{
common.EncodingFormatAvro,
common.EncodingFormatJSON,
} {
codecConfig.EncodingFormat = format
b, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)
enc := b.Build()

m, err := enc.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

dec, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

decodedDDLEvent, err := dec.NextDDLEvent()
require.NoError(t, err)

originFlags := createTableDDLEvent.TableInfo.ColumnsFlag
obtainedFlags := decodedDDLEvent.TableInfo.ColumnsFlag

for colID, expected := range originFlags {
name := createTableDDLEvent.TableInfo.ForceGetColumnName(colID)
actualID := decodedDDLEvent.TableInfo.ForceGetColumnIDByName(name)
actual := obtainedFlags[actualID]
require.Equal(t, expected, actual)
}
}
}

func TestEncodeIntegerTypes(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
Expand Down Expand Up @@ -1605,10 +1671,10 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) {
mock.ExpectExec(query).WillReturnResult(driver.ResultNoRows)

names, values := utils.LargeColumnKeyValues()
mock.ExpectQuery("select * from test.t where t = 127").
mock.ExpectQuery("select * from test.t where tu1 = 127").
WillReturnRows(mock.NewRows(names).AddRow(values...))

mock.ExpectQuery("select * from test.t where t = 127").
mock.ExpectQuery("select * from test.t where tu1 = 127").
WillReturnRows(mock.NewRows(names).AddRow(values...))

}
Expand Down
69 changes: 34 additions & 35 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,16 @@ func newTiColumnInfo(
}

for _, index := range indexes {
if index.Primary {
for _, name := range index.Columns {
if name == column.Name {
for _, name := range index.Columns {
if name == column.Name {
if index.Primary {
col.AddFlag(mysql.PriKeyFlag)
break
} else if index.Unique {
col.AddFlag(mysql.UniqueKeyFlag)
} else {
col.AddFlag(mysql.MultipleKeyFlag)
}
}
break
}
}

Expand Down Expand Up @@ -242,16 +244,24 @@ func newIndexSchema(index *timodel.IndexInfo, columns []*timodel.ColumnInfo) *In
}

// newTiIndexInfo convert IndexSchema to a tidb index info.
func newTiIndexInfo(indexSchema *IndexSchema) *timodel.IndexInfo {
func newTiIndexInfo(indexSchema *IndexSchema, columns []*timodel.ColumnInfo, indexID int64) *timodel.IndexInfo {
indexColumns := make([]*timodel.IndexColumn, len(indexSchema.Columns))
for i, col := range indexSchema.Columns {
var offset int
for idx, column := range columns {
if column.Name.O == col {
offset = idx
break
}
}
indexColumns[i] = &timodel.IndexColumn{
Name: timodel.NewCIStr(col),
Offset: i,
Offset: offset,
}
}

return &timodel.IndexInfo{
ID: indexID,
Name: timodel.NewCIStr(indexSchema.Name),
Columns: indexColumns,
Unique: indexSchema.Unique,
Expand Down Expand Up @@ -319,42 +329,31 @@ func newTableSchema(tableInfo *model.TableInfo) *TableSchema {
func newTableInfo(m *TableSchema) *model.TableInfo {
var (
database string
table string
tableID int64
schemaVersion uint64
)

tidbTableInfo := &timodel.TableInfo{}
if m != nil {
database = m.Schema
table = m.Table
tableID = m.TableID
schemaVersion = m.Version
}
tidbTableInfo := &timodel.TableInfo{
ID: tableID,
Name: timodel.NewCIStr(table),
UpdateTS: schemaVersion,
}

if m == nil {
return &model.TableInfo{
TableName: model.TableName{
Schema: database,
Table: table,
TableID: tableID,
},
TableInfo: tidbTableInfo,
tidbTableInfo.ID = m.TableID
tidbTableInfo.Name = timodel.NewCIStr(m.Table)
tidbTableInfo.UpdateTS = m.Version

nextMockID := int64(100)
for _, col := range m.Columns {
tiCol := newTiColumnInfo(col, nextMockID, m.Indexes)
nextMockID += 100
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
}
}

nextMockID := int64(100)
for _, col := range m.Columns {
tiCol := newTiColumnInfo(col, nextMockID, m.Indexes)
nextMockID += 100
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
}
for _, idx := range m.Indexes {
index := newTiIndexInfo(idx)
tidbTableInfo.Indices = append(tidbTableInfo.Indices, index)
mockIndexID := int64(1)
for _, idx := range m.Indexes {
index := newTiIndexInfo(idx, tidbTableInfo.Columns, mockIndexID)
tidbTableInfo.Indices = append(tidbTableInfo.Indices, index)
mockIndexID += 1
}
}
return model.WrapTableInfo(100, database, schemaVersion, tidbTableInfo)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/utils/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func NewLargeEvent4Test(
defer helper.Close()

sql := `create table test.t(
t tinyint primary key,
tu1 tinyint unsigned default 1,
t tinyint,
tu1 tinyint unsigned default 1 primary key,
tu2 tinyint unsigned default 2,
tu3 tinyint unsigned default 3,
tu4 tinyint unsigned default 4,
Expand Down

0 comments on commit 5eae0bd

Please sign in to comment.