Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encoder(ticdc): fix simple decoder set index column offset incorrect (#11222) #11228

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type TableInfo struct {
columnsOffset map[int64]int
indicesOffset map[int64]int

// Column name -> ColumnID
nameToColID map[string]int64

hasUniqueColumn bool

// It's a mapping from ColumnID to the offset of the columns in row changed events.
Expand Down Expand Up @@ -90,6 +93,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
hasUniqueColumn: false,
Version: version,
columnsOffset: make(map[int64]int, len(info.Columns)),
nameToColID: make(map[string]int64, len(info.Columns)),
indicesOffset: make(map[int64]int, len(info.Indices)),
RowColumnsOffset: make(map[int64]int, len(info.Columns)),
ColumnsFlag: make(map[int64]ColumnFlagType, len(info.Columns)),
Expand All @@ -105,6 +109,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
ti.columnsOffset[col.ID] = i
pkIsHandle := false
if IsColCDCVisible(col) {
ti.nameToColID[col.Name.O] = col.ID
ti.RowColumnsOffset[col.ID] = rowColumnsCurrentOffset
rowColumnsCurrentOffset++
pkIsHandle = (ti.PKIsHandle && mysql.HasPriKeyFlag(col.GetFlag())) || col.ID == model.ExtraHandleID
Expand Down Expand Up @@ -430,6 +435,16 @@ func (ti *TableInfo) ForceGetColumnName(colID int64) string {
return ti.ForceGetColumnInfo(colID).Name.O
}

// ForceGetColumnIDByName return column ID by column name
// Caller must ensure `colID` exists
func (ti *TableInfo) ForceGetColumnIDByName(name string) int64 {
colID, ok := ti.nameToColID[name]
if !ok {
log.Panic("invalid column name", zap.String("column", name))
}
return colID
}

// GetColumnDefaultValue returns the default definition of a column.
func GetColumnDefaultValue(col *model.ColumnInfo) interface{} {
defaultValue := col.GetDefaultValue()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func TestAvroEncode(t *testing.T) {
require.Fail(t, "key shall not include extension fields")
}
}
require.Equal(t, int32(1), res.(map[string]interface{})["id"])
require.Equal(t, int32(127), res.(map[string]interface{})["tu1"])

bin, err = encoder.encodeValue(ctx, topic, event)
require.NoError(t, err)
Expand Down
6 changes: 6 additions & 0 deletions pkg/sink/codec/simple/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func newTableSchemaMap(tableInfo *model.TableInfo) interface{} {
mysqlType["decimal"] = map[string]interface{}{
"int": col.GetDecimal(),
}
mysqlType["unsigned"] = map[string]interface{}{
"boolean": mysql.HasUnsignedFlag(col.GetFlag()),
}
mysqlType["zerofill"] = map[string]interface{}{
"boolean": mysql.HasZerofillFlag(col.GetFlag()),
}
default:
}

Expand Down
72 changes: 69 additions & 3 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,19 @@ func TestEncodeDMLEnableChecksum(t *testing.T) {
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

_, err = dec.NextDDLEvent()
decodedDDL, err := dec.NextDDLEvent()
require.NoError(t, err)

originFlags := createTableDDL.TableInfo.ColumnsFlag
obtainedFlags := decodedDDL.TableInfo.ColumnsFlag

for colID, expected := range originFlags {
name := createTableDDL.TableInfo.ForceGetColumnName(colID)
actualID := decodedDDL.TableInfo.ForceGetColumnIDByName(name)
actual := obtainedFlags[actualID]
require.Equal(t, expected, actual)
}

err = enc.AppendRowChangedEvent(ctx, "", updateEvent, func() {})
require.NoError(t, err)

Expand Down Expand Up @@ -905,6 +915,62 @@ func TestEncodeDDLEvent(t *testing.T) {
}
}

func TestColumnFlags(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

createTableDDL := `create table test.t(
a bigint(20) unsigned not null,
b bigint(20) default null,
c varbinary(767) default null,
d int(11) unsigned not null,
e int(11) default null,
primary key (a),
key idx_c(c),
key idx_b(b),
unique key idx_de(d, e))`
createTableDDLEvent := helper.DDL2Event(createTableDDL)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolSimple)
for _, format := range []common.EncodingFormatType{
common.EncodingFormatAvro,
common.EncodingFormatJSON,
} {
codecConfig.EncodingFormat = format
b, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)
enc := b.Build()

m, err := enc.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

dec, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

decodedDDLEvent, err := dec.NextDDLEvent()
require.NoError(t, err)

originFlags := createTableDDLEvent.TableInfo.ColumnsFlag
obtainedFlags := decodedDDLEvent.TableInfo.ColumnsFlag

for colID, expected := range originFlags {
name := createTableDDLEvent.TableInfo.ForceGetColumnName(colID)
actualID := decodedDDLEvent.TableInfo.ForceGetColumnIDByName(name)
actual := obtainedFlags[actualID]
require.Equal(t, expected, actual)
}
}
}

func TestEncodeIntegerTypes(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
Expand Down Expand Up @@ -1605,10 +1671,10 @@ func TestLargeMessageHandleKeyOnly(t *testing.T) {
mock.ExpectExec(query).WillReturnResult(driver.ResultNoRows)

names, values := utils.LargeColumnKeyValues()
mock.ExpectQuery("select * from test.t where t = 127").
mock.ExpectQuery("select * from test.t where tu1 = 127").
WillReturnRows(mock.NewRows(names).AddRow(values...))

mock.ExpectQuery("select * from test.t where t = 127").
mock.ExpectQuery("select * from test.t where tu1 = 127").
WillReturnRows(mock.NewRows(names).AddRow(values...))

}
Expand Down
69 changes: 34 additions & 35 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,16 @@ func newTiColumnInfo(
}

for _, index := range indexes {
if index.Primary {
for _, name := range index.Columns {
if name == column.Name {
for _, name := range index.Columns {
if name == column.Name {
if index.Primary {
col.AddFlag(mysql.PriKeyFlag)
break
} else if index.Unique {
col.AddFlag(mysql.UniqueKeyFlag)
} else {
col.AddFlag(mysql.MultipleKeyFlag)
}
}
break
}
}

Expand Down Expand Up @@ -242,16 +244,24 @@ func newIndexSchema(index *timodel.IndexInfo, columns []*timodel.ColumnInfo) *In
}

// newTiIndexInfo convert IndexSchema to a tidb index info.
func newTiIndexInfo(indexSchema *IndexSchema) *timodel.IndexInfo {
func newTiIndexInfo(indexSchema *IndexSchema, columns []*timodel.ColumnInfo, indexID int64) *timodel.IndexInfo {
indexColumns := make([]*timodel.IndexColumn, len(indexSchema.Columns))
for i, col := range indexSchema.Columns {
var offset int
for idx, column := range columns {
if column.Name.O == col {
offset = idx
break
}
}
indexColumns[i] = &timodel.IndexColumn{
Name: timodel.NewCIStr(col),
Offset: i,
Offset: offset,
}
}

return &timodel.IndexInfo{
ID: indexID,
Name: timodel.NewCIStr(indexSchema.Name),
Columns: indexColumns,
Unique: indexSchema.Unique,
Expand Down Expand Up @@ -319,42 +329,31 @@ func newTableSchema(tableInfo *model.TableInfo) *TableSchema {
func newTableInfo(m *TableSchema) *model.TableInfo {
var (
database string
table string
tableID int64
schemaVersion uint64
)

tidbTableInfo := &timodel.TableInfo{}
if m != nil {
database = m.Schema
table = m.Table
tableID = m.TableID
schemaVersion = m.Version
}
tidbTableInfo := &timodel.TableInfo{
ID: tableID,
Name: timodel.NewCIStr(table),
UpdateTS: schemaVersion,
}

if m == nil {
return &model.TableInfo{
TableName: model.TableName{
Schema: database,
Table: table,
TableID: tableID,
},
TableInfo: tidbTableInfo,
tidbTableInfo.ID = m.TableID
tidbTableInfo.Name = timodel.NewCIStr(m.Table)
tidbTableInfo.UpdateTS = m.Version

nextMockID := int64(100)
for _, col := range m.Columns {
tiCol := newTiColumnInfo(col, nextMockID, m.Indexes)
nextMockID += 100
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
}
}

nextMockID := int64(100)
for _, col := range m.Columns {
tiCol := newTiColumnInfo(col, nextMockID, m.Indexes)
nextMockID += 100
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
}
for _, idx := range m.Indexes {
index := newTiIndexInfo(idx)
tidbTableInfo.Indices = append(tidbTableInfo.Indices, index)
mockIndexID := int64(1)
for _, idx := range m.Indexes {
index := newTiIndexInfo(idx, tidbTableInfo.Columns, mockIndexID)
tidbTableInfo.Indices = append(tidbTableInfo.Indices, index)
mockIndexID += 1
}
}
return model.WrapTableInfo(100, database, schemaVersion, tidbTableInfo)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/codec/utils/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func NewLargeEvent4Test(
defer helper.Close()

sql := `create table test.t(
t tinyint primary key,
tu1 tinyint unsigned default 1,
t tinyint,
tu1 tinyint unsigned default 1 primary key,
tu2 tinyint unsigned default 2,
tu3 tinyint unsigned default 3,
tu4 tinyint unsigned default 4,
Expand Down
Loading