diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index e7f3f0df3c1..d5373cd1b02 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -311,12 +311,16 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill for _, colInfo := range tableInfo.Columns { colSize := 0 if !model.IsColCDCVisible(colInfo) { + log.Info("skip the column which is not visible", + zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O)) continue } colName := colInfo.Name.O colDatums, exist := datums[colInfo.ID] var colValue interface{} if !exist && !fillWithDefaultValue { + log.Info("column value is not found", + zap.String("table", tableInfo.Name.O), zap.String("column", colName)) continue } var err error diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 88cce848c80..d3121c62bf0 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -232,10 +232,12 @@ func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool { handleKeyCount := 0 equivalentHandleKeyCount := 0 for i := range updateEvent.Row.Columns { - if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() { + col := updateEvent.Row.Columns[i] + preCol := updateEvent.Row.PreColumns[i] + if col != nil && col.Flag.IsHandleKey() && preCol != nil && preCol.Flag.IsHandleKey() { handleKeyCount++ - colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value) - preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value) + colValueString := model.ColumnValueString(col.Value) + preColValueString := model.ColumnValueString(preCol.Value) if colValueString == preColValueString { equivalentHandleKeyCount++ } @@ -265,7 +267,8 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv deleteEvent.Row.Columns = nil for i := range deleteEvent.Row.PreColumns { // NOTICE: Only the handle key pre column is retained in the delete event. - if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { + if deleteEvent.Row.PreColumns[i] != nil && + !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { deleteEvent.Row.PreColumns[i] = nil } } diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index b8864d29b6b..19a67898492 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -492,6 +492,11 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { node.rowBuffer = []*model.RowChangedEvent{} // Update to the handle key column. columns = []*model.Column{ + { + Name: "col0", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, { Name: "col1", Flag: model.BinaryFlag, @@ -504,6 +509,8 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { }, } preColumns = []*model.Column{ + // It is possible that the pre columns are nil. For example, when you do `add column` DDL. + nil, { Name: "col1", Flag: model.BinaryFlag, @@ -527,18 +534,32 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { require.Equal(t, 2, len(node.rowBuffer)) deleteEventIndex := 0 +<<<<<<< HEAD require.Equal(t, 0, len(node.rowBuffer[deleteEventIndex].Columns)) require.Equal(t, 2, len(node.rowBuffer[deleteEventIndex].PreColumns)) nonHandleKeyColIndex := 0 handleKeyColIndex := 1 +======= + require.Len(t, sink.received[deleteEventIndex].row.Columns, 0) + require.Len(t, sink.received[deleteEventIndex].row.PreColumns, 3) + nilColIndex := 0 + require.Nil(t, sink.received[deleteEventIndex].row.PreColumns[nilColIndex]) + nonHandleKeyColIndex := 1 + handleKeyColIndex := 2 +>>>>>>> 5d3e4a2ca (sink(ticdc): precheck before split the update event (#6403)) // NOTICE: When old value disabled, we only keep the handle key pre cols. require.Nil(t, node.rowBuffer[deleteEventIndex].PreColumns[nonHandleKeyColIndex]) require.Equal(t, "col2", node.rowBuffer[deleteEventIndex].PreColumns[handleKeyColIndex].Name) require.True(t, node.rowBuffer[deleteEventIndex].PreColumns[handleKeyColIndex].Flag.IsHandleKey()) insertEventIndex := 1 +<<<<<<< HEAD require.Equal(t, 2, len(node.rowBuffer[insertEventIndex].Columns)) require.Equal(t, 0, len(node.rowBuffer[insertEventIndex].PreColumns)) +======= + require.Len(t, sink.received[insertEventIndex].row.Columns, 3) + require.Len(t, sink.received[insertEventIndex].row.PreColumns, 0) +>>>>>>> 5d3e4a2ca (sink(ticdc): precheck before split the update event (#6403)) } type flushFlowController struct {