From 6d5af0f15cf8ade15824007e65a885df665e74a1 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 23 May 2023 18:19:39 +0800 Subject: [PATCH] mounter(ticdc): fix extend column info order to match the columns info (#8960) (#8968) close pingcap/tiflow#8958 --- cdc/entry/mounter.go | 35 ++++++++++++++++++++------------ cdc/entry/mounter_test.go | 2 +- cdc/model/schema_storage.go | 6 ++++-- scripts/check-diff-line-width.sh | 4 ++-- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 6bcdc9f788b..a5d2c4f4732 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tiflow/cdc/model" cerror "github.com/pingcap/tiflow/pkg/errors" pfilter "github.com/pingcap/tiflow/pkg/filter" @@ -269,11 +270,18 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) { return job, nil } -func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) ([]*model.Column, []types.Datum, error) { +func datum2Column( + tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool, +) ([]*model.Column, []types.Datum, []rowcodec.ColInfo, error) { cols := make([]*model.Column, len(tableInfo.RowColumnsOffset)) rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset)) - for _, colInfo := range tableInfo.Columns { - colSize := 0 + + // columnInfos and rowColumnInfos hold different column metadata, + // they should have the same length and order. + rowColumnInfos := make([]rowcodec.ColInfo, len(tableInfo.RowColumnsOffset)) + _, _, extendColumnInfos := tableInfo.GetRowColInfos() + + for idx, colInfo := range tableInfo.Columns { if !model.IsColCDCVisible(colInfo) { log.Debug("skip the column which is not visible", zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O)) @@ -296,15 +304,15 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo) } if err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } if warn != "" { log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) } defaultValue := getDDLDefaultDefinition(colInfo) - colSize += size - rawCols[tableInfo.RowColumnsOffset[colInfo.ID]] = colDatums - cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{ + offset := tableInfo.RowColumnsOffset[colInfo.ID] + rawCols[offset] = colDatums + cols[offset] = &model.Column{ Name: colName, Type: colInfo.GetType(), Charset: colInfo.GetCharset(), @@ -312,10 +320,11 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill Default: defaultValue, Flag: tableInfo.ColumnsFlag[colInfo.ID], // ApproximateBytes = column data size + column struct size - ApproximateBytes: colSize + sizeOfEmptyColumn, + ApproximateBytes: size + sizeOfEmptyColumn, } + rowColumnInfos[offset] = extendColumnInfos[idx] } - return cols, rawCols, nil + return cols, rawCols, rowColumnInfos, nil } func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) { @@ -324,6 +333,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d var preCols []*model.Column var preRawCols []types.Datum var rawRow model.RowChangedDatums + var extendColumnInfos []rowcodec.ColInfo // Since we now always use old value internally, // we need to control the output(sink will use the PreColumns field to determine whether to output old value). // Normally old value is output when only enableOldValue is on, @@ -332,7 +342,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d if row.PreRowExist { // FIXME(leoppro): using pre table info to mounter pre column datum // the pre column and current column in one event may using different table info - preCols, preRawCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) + preCols, preRawCols, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -352,7 +362,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d var cols []*model.Column var rawCols []types.Datum if row.RowExist { - cols, rawCols, err = datum2Column(tableInfo, row.Row, true) + cols, rawCols, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true) if err != nil { return nil, rawRow, errors.Trace(err) } @@ -365,7 +375,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d intRowID = row.RecordID.IntValue() } - _, _, colInfos := tableInfo.GetRowColInfos() rawRow.PreRowDatums = preRawCols rawRow.RowDatums = rawCols return &model.RowChangedEvent{ @@ -378,7 +387,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d TableID: row.PhysicalTableID, IsPartition: tableInfo.GetPartitionInfo() != nil, }, - ColInfos: colInfos, + ColInfos: extendColumnInfos, TableInfo: tableInfo, Columns: cols, PreColumns: preCols, diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index bed9a2941f3..281cd866539 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -1196,7 +1196,7 @@ func TestBuildTableInfo(t *testing.T) { originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt)) require.NoError(t, err) cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI) - cols, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) + cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true) require.NoError(t, err) recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset) handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI) diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 7735fd70fb9..dad989ad89f 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -66,8 +66,10 @@ type TableInfo struct { HandleIndexID int64 IndexColumnsOffset [][]int - rowColInfos []rowcodec.ColInfo - rowColFieldTps map[int64]*types.FieldType + // rowColInfos extend the model.ColumnInfo with some extra information + // it's the same length and order with the model.TableInfo.Columns + rowColInfos []rowcodec.ColInfo + rowColFieldTps map[int64]*types.FieldType } // WrapTableInfo creates a TableInfo from a timodel.TableInfo diff --git a/scripts/check-diff-line-width.sh b/scripts/check-diff-line-width.sh index caab5fb466f..7b0ff7c9f13 100755 --- a/scripts/check-diff-line-width.sh +++ b/scripts/check-diff-line-width.sh @@ -20,8 +20,8 @@ set -e # the pattern `\(#[0-9]+\)$`. It's usually a master branch commit. BASE_HASH=$(git --no-pager log -E --grep='\(#[0-9]+\)$' -n 1 --format=format:%H) # Please contact TiFlow maintainers before changing following settings. -WARN_THRESHOLD=80 -ERROR_THRESHOLD=100 +WARN_THRESHOLD=100 +ERROR_THRESHOLD=140 git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \ -- ':(exclude)*_gen.go' \