From 2189b9a22f9a9c2b63426018b402351a0db94e62 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 21 Jul 2022 15:23:17 +0800 Subject: [PATCH 1/3] sink(ticdc): precheck before split the update event --- cdc/entry/mounter.go | 4 ++++ cdc/processor/pipeline/sink.go | 7 +++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 2c5b8e6c6be..d6702b0bc28 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -279,12 +279,16 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill for _, colInfo := range tableInfo.Columns { colSize := 0 if !model.IsColCDCVisible(colInfo) { + log.Info("skip the column which is not visible", + zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O)) continue } colName := colInfo.Name.O colDatums, exist := datums[colInfo.ID] var colValue interface{} if !exist && !fillWithDefaultValue { + log.Info("column value is not found", + zap.String("table", tableInfo.Name.O), zap.String("column", colName)) continue } var err error diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index d308453d97d..30e3c0482f0 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -253,7 +253,9 @@ func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool { handleKeyCount := 0 equivalentHandleKeyCount := 0 for i := range updateEvent.Row.Columns { - if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() { + col := updateEvent.Row.Columns[i] + preCol := updateEvent.Row.PreColumns[i] + if col != nil && col.Flag.IsHandleKey() && preCol != nil && preCol.Flag.IsHandleKey() { handleKeyCount++ colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value) preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value) @@ -286,7 +288,8 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv deleteEvent.Row.Columns = nil for i := range deleteEvent.Row.PreColumns { // NOTICE: Only the handle key pre column is retained in the delete event. - if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { + if deleteEvent.Row.PreColumns[i] != nil && + !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() { deleteEvent.Row.PreColumns[i] = nil } } From 75b60388818235ae9a5d330beb44f35d2794906a Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 21 Jul 2022 15:39:56 +0800 Subject: [PATCH 2/3] sink(ticdc): add test --- cdc/processor/pipeline/sink_test.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 1804d4d1770..17cd5948a8f 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -616,6 +616,11 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { sink.Reset() // Update to the handle key column. columns = []*model.Column{ + { + Name: "col0", + Flag: model.BinaryFlag, + Value: "col1-value-updated", + }, { Name: "col1", Flag: model.BinaryFlag, @@ -628,6 +633,8 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { }, } preColumns = []*model.Column{ + // It is possible that the pre columns are nil. For example, when you do `add column` DDL. + nil, { Name: "col1", Flag: model.BinaryFlag, @@ -653,9 +660,11 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { deleteEventIndex := 0 require.Len(t, sink.received[deleteEventIndex].row.Columns, 0) - require.Len(t, sink.received[deleteEventIndex].row.PreColumns, 2) - nonHandleKeyColIndex := 0 - handleKeyColIndex := 1 + require.Len(t, sink.received[deleteEventIndex].row.PreColumns, 3) + nilColIndex := 0 + require.Nil(t, sink.received[deleteEventIndex].row.PreColumns[nilColIndex]) + nonHandleKeyColIndex := 1 + handleKeyColIndex := 2 // NOTICE: When old value disabled, we only keep the handle key pre cols. require.Nil(t, sink.received[deleteEventIndex].row.PreColumns[nonHandleKeyColIndex]) require.Equal(t, "col2", sink.received[deleteEventIndex].row.PreColumns[handleKeyColIndex].Name) @@ -664,7 +673,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) { ) insertEventIndex := 1 - require.Len(t, sink.received[insertEventIndex].row.Columns, 2) + require.Len(t, sink.received[insertEventIndex].row.Columns, 3) require.Len(t, sink.received[insertEventIndex].row.PreColumns, 0) } From b2f3cbe8d68ead20ed63aa0bffed65af714287a4 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 21 Jul 2022 15:49:33 +0800 Subject: [PATCH 3/3] sink(ticdc): better code --- cdc/processor/pipeline/sink.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 30e3c0482f0..449799da33b 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -257,8 +257,8 @@ func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool { preCol := updateEvent.Row.PreColumns[i] if col != nil && col.Flag.IsHandleKey() && preCol != nil && preCol.Flag.IsHandleKey() { handleKeyCount++ - colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value) - preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value) + colValueString := model.ColumnValueString(col.Value) + preColValueString := model.ColumnValueString(preCol.Value) if colValueString == preColValueString { equivalentHandleKeyCount++ }