Skip to content

Commit

Permalink
mounter(ticdc): fix extend column info order to match the columns info (
Browse files Browse the repository at this point in the history
#8960) (#8968)

close #8958
  • Loading branch information
ti-chi-bot committed May 23, 2023
1 parent 1881877 commit 6d5af0f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
35 changes: 22 additions & 13 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
pfilter "github.com/pingcap/tiflow/pkg/filter"
Expand Down Expand Up @@ -269,11 +270,18 @@ func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
return job, nil
}

func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool) ([]*model.Column, []types.Datum, error) {
func datum2Column(
tableInfo *model.TableInfo, datums map[int64]types.Datum, fillWithDefaultValue bool,
) ([]*model.Column, []types.Datum, []rowcodec.ColInfo, error) {
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
rawCols := make([]types.Datum, len(tableInfo.RowColumnsOffset))
for _, colInfo := range tableInfo.Columns {
colSize := 0

// columnInfos and rowColumnInfos hold different column metadata,
// they should have the same length and order.
rowColumnInfos := make([]rowcodec.ColInfo, len(tableInfo.RowColumnsOffset))
_, _, extendColumnInfos := tableInfo.GetRowColInfos()

for idx, colInfo := range tableInfo.Columns {
if !model.IsColCDCVisible(colInfo) {
log.Debug("skip the column which is not visible",
zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O))
Expand All @@ -296,26 +304,27 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colDatums, colValue, size, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
defaultValue := getDDLDefaultDefinition(colInfo)
colSize += size
rawCols[tableInfo.RowColumnsOffset[colInfo.ID]] = colDatums
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
offset := tableInfo.RowColumnsOffset[colInfo.ID]
rawCols[offset] = colDatums
cols[offset] = &model.Column{
Name: colName,
Type: colInfo.GetType(),
Charset: colInfo.GetCharset(),
Value: colValue,
Default: defaultValue,
Flag: tableInfo.ColumnsFlag[colInfo.ID],
// ApproximateBytes = column data size + column struct size
ApproximateBytes: colSize + sizeOfEmptyColumn,
ApproximateBytes: size + sizeOfEmptyColumn,
}
rowColumnInfos[offset] = extendColumnInfos[idx]
}
return cols, rawCols, nil
return cols, rawCols, rowColumnInfos, nil
}

func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, model.RowChangedDatums, error) {
Expand All @@ -324,6 +333,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
var preCols []*model.Column
var preRawCols []types.Datum
var rawRow model.RowChangedDatums
var extendColumnInfos []rowcodec.ColInfo
// Since we now always use old value internally,
// we need to control the output(sink will use the PreColumns field to determine whether to output old value).
// Normally old value is output when only enableOldValue is on,
Expand All @@ -332,7 +342,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
preCols, preRawCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
preCols, preRawCols, extendColumnInfos, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -352,7 +362,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
var cols []*model.Column
var rawCols []types.Datum
if row.RowExist {
cols, rawCols, err = datum2Column(tableInfo, row.Row, true)
cols, rawCols, extendColumnInfos, err = datum2Column(tableInfo, row.Row, true)
if err != nil {
return nil, rawRow, errors.Trace(err)
}
Expand All @@ -365,7 +375,6 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
intRowID = row.RecordID.IntValue()
}

_, _, colInfos := tableInfo.GetRowColInfos()
rawRow.PreRowDatums = preRawCols
rawRow.RowDatums = rawCols
return &model.RowChangedEvent{
Expand All @@ -378,7 +387,7 @@ func (m *mounter) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, d
TableID: row.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
ColInfos: colInfos,
ColInfos: extendColumnInfos,
TableInfo: tableInfo,
Columns: cols,
PreColumns: preCols,
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,7 @@ func TestBuildTableInfo(t *testing.T) {
originTI, err := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
require.NoError(t, err)
cdcTableInfo := model.WrapTableInfo(0, "test", 0, originTI)
cols, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
cols, _, _, err := datum2Column(cdcTableInfo, map[int64]types.Datum{}, true)
require.NoError(t, err)
recoveredTI := model.BuildTiDBTableInfo(cols, cdcTableInfo.IndexColumnsOffset)
handle := sqlmodel.GetWhereHandle(recoveredTI, recoveredTI)
Expand Down
6 changes: 4 additions & 2 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ type TableInfo struct {
HandleIndexID int64

IndexColumnsOffset [][]int
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType
// rowColInfos extend the model.ColumnInfo with some extra information
// it's the same length and order with the model.TableInfo.Columns
rowColInfos []rowcodec.ColInfo
rowColFieldTps map[int64]*types.FieldType
}

// WrapTableInfo creates a TableInfo from a timodel.TableInfo
Expand Down
4 changes: 2 additions & 2 deletions scripts/check-diff-line-width.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ set -e
# the pattern `\(#[0-9]+\)$`. It's usually a master branch commit.
BASE_HASH=$(git --no-pager log -E --grep='\(#[0-9]+\)$' -n 1 --format=format:%H)
# Please contact TiFlow maintainers before changing following settings.
WARN_THRESHOLD=80
ERROR_THRESHOLD=100
WARN_THRESHOLD=100
ERROR_THRESHOLD=140

git --no-pager diff $BASE_HASH -U0 -- cdc pkg cmd \
-- ':(exclude)*_gen.go' \
Expand Down

0 comments on commit 6d5af0f

Please sign in to comment.