Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#6403
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Rustin170506 authored and ti-chi-bot committed Jul 21, 2022
1 parent 2f363f6 commit 3a24f0d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
4 changes: 4 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,16 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
for _, colInfo := range tableInfo.Columns {
colSize := 0
if !model.IsColCDCVisible(colInfo) {
log.Info("skip the column which is not visible",
zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O))
continue
}
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
log.Info("column value is not found",
zap.String("table", tableInfo.Name.O), zap.String("column", colName))
continue
}
var err error
Expand Down
11 changes: 7 additions & 4 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,12 @@ func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool {
handleKeyCount := 0
equivalentHandleKeyCount := 0
for i := range updateEvent.Row.Columns {
if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() {
col := updateEvent.Row.Columns[i]
preCol := updateEvent.Row.PreColumns[i]
if col != nil && col.Flag.IsHandleKey() && preCol != nil && preCol.Flag.IsHandleKey() {
handleKeyCount++
colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value)
preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value)
colValueString := model.ColumnValueString(col.Value)
preColValueString := model.ColumnValueString(preCol.Value)
if colValueString == preColValueString {
equivalentHandleKeyCount++
}
Expand Down Expand Up @@ -265,7 +267,8 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
if deleteEvent.Row.PreColumns[i] != nil &&
!deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}
Expand Down
21 changes: 21 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
node.rowBuffer = []*model.RowChangedEvent{}
// Update to the handle key column.
columns = []*model.Column{
{
Name: "col0",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col1",
Flag: model.BinaryFlag,
Expand All @@ -504,6 +509,8 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
},
}
preColumns = []*model.Column{
// It is possible that the pre columns are nil. For example, when you do `add column` DDL.
nil,
{
Name: "col1",
Flag: model.BinaryFlag,
Expand All @@ -527,18 +534,32 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
require.Equal(t, 2, len(node.rowBuffer))

deleteEventIndex := 0
<<<<<<< HEAD
require.Equal(t, 0, len(node.rowBuffer[deleteEventIndex].Columns))
require.Equal(t, 2, len(node.rowBuffer[deleteEventIndex].PreColumns))
nonHandleKeyColIndex := 0
handleKeyColIndex := 1
=======
require.Len(t, sink.received[deleteEventIndex].row.Columns, 0)
require.Len(t, sink.received[deleteEventIndex].row.PreColumns, 3)
nilColIndex := 0
require.Nil(t, sink.received[deleteEventIndex].row.PreColumns[nilColIndex])
nonHandleKeyColIndex := 1
handleKeyColIndex := 2
>>>>>>> 5d3e4a2ca (sink(ticdc): precheck before split the update event (#6403))
// NOTICE: When old value disabled, we only keep the handle key pre cols.
require.Nil(t, node.rowBuffer[deleteEventIndex].PreColumns[nonHandleKeyColIndex])
require.Equal(t, "col2", node.rowBuffer[deleteEventIndex].PreColumns[handleKeyColIndex].Name)
require.True(t, node.rowBuffer[deleteEventIndex].PreColumns[handleKeyColIndex].Flag.IsHandleKey())

insertEventIndex := 1
<<<<<<< HEAD
require.Equal(t, 2, len(node.rowBuffer[insertEventIndex].Columns))
require.Equal(t, 0, len(node.rowBuffer[insertEventIndex].PreColumns))
=======
require.Len(t, sink.received[insertEventIndex].row.Columns, 3)
require.Len(t, sink.received[insertEventIndex].row.PreColumns, 0)
>>>>>>> 5d3e4a2ca (sink(ticdc): precheck before split the update event (#6403))
}

type flushFlowController struct {
Expand Down

0 comments on commit 3a24f0d

Please sign in to comment.