Skip to content

Commit

Permalink
sink(ticdc): craft support callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Jul 12, 2022
1 parent 318b4e9 commit c22ae1c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 1 deletion.
16 changes: 15 additions & 1 deletion cdc/sink/mq/codec/craft_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type craftBatchEncoder struct {
rowChangedBuffer *craft.RowChangedEventBuffer
messageBuf []*MQMessage
callbackBuf []func()

// configs
maxMessageBytes int
Expand All @@ -45,9 +46,12 @@ func (e *craftBatchEncoder) AppendRowChangedEvent(
_ context.Context,
_ string,
ev *model.RowChangedEvent,
_ func(),
callback func(),
) error {
rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev)
if callback != nil {
e.callbackBuf = append(e.callbackBuf, callback)
}
if size > e.maxMessageBytes || rows >= e.maxBatchSize {
e.flush()
}
Expand Down Expand Up @@ -80,6 +84,15 @@ func (e *craftBatchEncoder) flush() {
mqMessage := newMsg(config.ProtocolCraft,
nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table)
mqMessage.SetRowsCount(rowsCnt)
if len(e.callbackBuf) != 0 && len(e.callbackBuf) == rowsCnt {
callbacks := e.callbackBuf
mqMessage.Callback = func() {
for _, cb := range callbacks {
cb()
}
}
e.callbackBuf = make([]func(), 0)
}
e.messageBuf = append(e.messageBuf, mqMessage)
}

Expand Down Expand Up @@ -112,6 +125,7 @@ func newCraftBatchEncoderWithAllocator(allocator *craft.SliceAllocator) EventBat
return &craftBatchEncoder{
allocator: allocator,
messageBuf: make([]*MQMessage, 0, 2),
callbackBuf: make([]func(), 0),
rowChangedBuffer: craft.NewRowChangedEventBuffer(allocator),
}
}
72 changes: 72 additions & 0 deletions cdc/sink/mq/codec/craft_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,75 @@ func TestDefaultCraftBatchCodec(t *testing.T) {
cfg.maxBatchSize = 64
testBatchCodec(t, newCraftBatchEncoderBuilder(cfg), newCraftBatchDecoder)
}

func TestCraftAppendRowChangedEventWithCallback(t *testing.T) {
t.Parallel()
cfg := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(10485760)
cfg.maxBatchSize = 2
encoder := newCraftBatchEncoderBuilder(cfg).Build()
require.NotNil(t, encoder)

count := 0

row := &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{Schema: "a", Table: "b"},
Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}},
}

tests := []struct {
row *model.RowChangedEvent
callback func()
}{
{
row: row,
callback: func() {
count += 1
},
},
{
row: row,
callback: func() {
count += 2
},
},
{
row: row,
callback: func() {
count += 3
},
},
{
row: row,
callback: func() {
count += 4
},
},
{
row: row,
callback: func() {
count += 5
},
},
}

// Empty build makes sure that the callback build logic not broken.
msgs := encoder.Build()
require.Len(t, msgs, 0, "no message should be built and no panic")

// Append the events.
for _, test := range tests {
err := encoder.AppendRowChangedEvent(context.Background(), "", test.row, test.callback)
require.Nil(t, err)
}
require.Equal(t, 0, count, "nothing should be called")

msgs = encoder.Build()
require.Len(t, msgs, 3, "expected 3 messages")
msgs[0].Callback()
require.Equal(t, 3, count, "expected 2 callbacks to be called")
msgs[1].Callback()
require.Equal(t, 10, count, "expected 2 callbacks to be called")
msgs[2].Callback()
require.Equal(t, 15, count, "expected one callback to be called")
}

0 comments on commit c22ae1c

Please sign in to comment.