Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#6403
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Rustin170506 authored and ti-chi-bot committed Jul 21, 2022
1 parent 47d3511 commit 37cce24
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
4 changes: 4 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,16 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
cols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
for _, colInfo := range tableInfo.Columns {
if !model.IsColCDCVisible(colInfo) {
log.Info("skip the column which is not visible",
zap.String("table", tableInfo.Name.O), zap.String("column", colInfo.Name.O))
continue
}
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
log.Info("column value is not found",
zap.String("table", tableInfo.Name.O), zap.String("column", colName))
continue
}
var err error
Expand Down
11 changes: 7 additions & 4 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,12 @@ func shouldSplitUpdateEvent(updateEvent *model.PolymorphicEvent) bool {
handleKeyCount := 0
equivalentHandleKeyCount := 0
for i := range updateEvent.Row.Columns {
if updateEvent.Row.Columns[i].Flag.IsHandleKey() && updateEvent.Row.PreColumns[i].Flag.IsHandleKey() {
col := updateEvent.Row.Columns[i]
preCol := updateEvent.Row.PreColumns[i]
if col != nil && col.Flag.IsHandleKey() && preCol != nil && preCol.Flag.IsHandleKey() {
handleKeyCount++
colValueString := model.ColumnValueString(updateEvent.Row.Columns[i].Value)
preColValueString := model.ColumnValueString(updateEvent.Row.PreColumns[i].Value)
colValueString := model.ColumnValueString(col.Value)
preColValueString := model.ColumnValueString(preCol.Value)
if colValueString == preColValueString {
equivalentHandleKeyCount++
}
Expand Down Expand Up @@ -252,7 +254,8 @@ func splitUpdateEvent(updateEvent *model.PolymorphicEvent) (*model.PolymorphicEv
deleteEvent.Row.Columns = nil
for i := range deleteEvent.Row.PreColumns {
// NOTICE: Only the handle key pre column is retained in the delete event.
if !deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
if deleteEvent.Row.PreColumns[i] != nil &&
!deleteEvent.Row.PreColumns[i].Flag.IsHandleKey() {
deleteEvent.Row.PreColumns[i] = nil
}
}
Expand Down
21 changes: 21 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,11 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
node.eventBuffer = []*model.PolymorphicEvent{}
// Update to the handle key column.
columns = []*model.Column{
{
Name: "col0",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
{
Name: "col1",
Flag: model.BinaryFlag,
Expand All @@ -517,6 +522,8 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
},
}
preColumns = []*model.Column{
// It is possible that the pre columns are nil. For example, when you do `add column` DDL.
nil,
{
Name: "col1",
Flag: model.BinaryFlag,
Expand All @@ -542,18 +549,32 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
c.Assert(node.eventBuffer, check.HasLen, 2)

deleteEventIndex := 0
<<<<<<< HEAD
c.Assert(node.eventBuffer[deleteEventIndex].Row.Columns, check.HasLen, 0)
c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns, check.HasLen, 2)
nonHandleKeyColIndex := 0
handleKeyColIndex := 1
=======
require.Len(t, sink.received[deleteEventIndex].row.Columns, 0)
require.Len(t, sink.received[deleteEventIndex].row.PreColumns, 3)
nilColIndex := 0
require.Nil(t, sink.received[deleteEventIndex].row.PreColumns[nilColIndex])
nonHandleKeyColIndex := 1
handleKeyColIndex := 2
>>>>>>> 5d3e4a2ca (sink(ticdc): precheck before split the update event (#6403))
// NOTICE: When old value disabled, we only keep the handle key pre cols.
c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[nonHandleKeyColIndex], check.IsNil)
c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Name, check.Equals, "col2")
c.Assert(node.eventBuffer[deleteEventIndex].Row.PreColumns[handleKeyColIndex].Flag.IsHandleKey(), check.IsTrue)

insertEventIndex := 1
<<<<<<< HEAD
c.Assert(node.eventBuffer[insertEventIndex].Row.Columns, check.HasLen, 2)
c.Assert(node.eventBuffer[insertEventIndex].Row.PreColumns, check.HasLen, 0)
=======
require.Len(t, sink.received[insertEventIndex].row.Columns, 3)
require.Len(t, sink.received[insertEventIndex].row.PreColumns, 0)
>>>>>>> 5d3e4a2ca (sink(ticdc): precheck before split the update event (#6403))
}

type flushFlowController struct {
Expand Down

0 comments on commit 37cce24

Please sign in to comment.