From c22ae1cfb33d0200b6ad6d1076fc066b4de82757 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Tue, 12 Jul 2022 16:36:46 +0800 Subject: [PATCH] sink(ticdc): craft support callback --- cdc/sink/mq/codec/craft_encoder.go | 16 +++++- cdc/sink/mq/codec/craft_encoder_test.go | 72 +++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/cdc/sink/mq/codec/craft_encoder.go b/cdc/sink/mq/codec/craft_encoder.go index d6d6b290465..89d4d12833f 100644 --- a/cdc/sink/mq/codec/craft_encoder.go +++ b/cdc/sink/mq/codec/craft_encoder.go @@ -25,6 +25,7 @@ import ( type craftBatchEncoder struct { rowChangedBuffer *craft.RowChangedEventBuffer messageBuf []*MQMessage + callbackBuf []func() // configs maxMessageBytes int @@ -45,9 +46,12 @@ func (e *craftBatchEncoder) AppendRowChangedEvent( _ context.Context, _ string, ev *model.RowChangedEvent, - _ func(), + callback func(), ) error { rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev) + if callback != nil { + e.callbackBuf = append(e.callbackBuf, callback) + } if size > e.maxMessageBytes || rows >= e.maxBatchSize { e.flush() } @@ -80,6 +84,15 @@ func (e *craftBatchEncoder) flush() { mqMessage := newMsg(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table) mqMessage.SetRowsCount(rowsCnt) + if len(e.callbackBuf) != 0 && len(e.callbackBuf) == rowsCnt { + callbacks := e.callbackBuf + mqMessage.Callback = func() { + for _, cb := range callbacks { + cb() + } + } + e.callbackBuf = make([]func(), 0) + } e.messageBuf = append(e.messageBuf, mqMessage) } @@ -112,6 +125,7 @@ func newCraftBatchEncoderWithAllocator(allocator *craft.SliceAllocator) EventBat return &craftBatchEncoder{ allocator: allocator, messageBuf: make([]*MQMessage, 0, 2), + callbackBuf: make([]func(), 0), rowChangedBuffer: craft.NewRowChangedEventBuffer(allocator), } } diff --git a/cdc/sink/mq/codec/craft_encoder_test.go b/cdc/sink/mq/codec/craft_encoder_test.go index e193c6d3616..72d400bf8b2 100644 --- a/cdc/sink/mq/codec/craft_encoder_test.go +++ b/cdc/sink/mq/codec/craft_encoder_test.go @@ -198,3 +198,75 @@ func TestDefaultCraftBatchCodec(t *testing.T) { cfg.maxBatchSize = 64 testBatchCodec(t, newCraftBatchEncoderBuilder(cfg), newCraftBatchDecoder) } + +func TestCraftAppendRowChangedEventWithCallback(t *testing.T) { + t.Parallel() + cfg := NewConfig(config.ProtocolCraft).WithMaxMessageBytes(10485760) + cfg.maxBatchSize = 2 + encoder := newCraftBatchEncoderBuilder(cfg).Build() + require.NotNil(t, encoder) + + count := 0 + + row := &model.RowChangedEvent{ + CommitTs: 1, + Table: &model.TableName{Schema: "a", Table: "b"}, + Columns: []*model.Column{{Name: "col1", Type: mysql.TypeVarchar, Value: []byte("aa")}}, + } + + tests := []struct { + row *model.RowChangedEvent + callback func() + }{ + { + row: row, + callback: func() { + count += 1 + }, + }, + { + row: row, + callback: func() { + count += 2 + }, + }, + { + row: row, + callback: func() { + count += 3 + }, + }, + { + row: row, + callback: func() { + count += 4 + }, + }, + { + row: row, + callback: func() { + count += 5 + }, + }, + } + + // Empty build makes sure that the callback build logic not broken. + msgs := encoder.Build() + require.Len(t, msgs, 0, "no message should be built and no panic") + + // Append the events. + for _, test := range tests { + err := encoder.AppendRowChangedEvent(context.Background(), "", test.row, test.callback) + require.Nil(t, err) + } + require.Equal(t, 0, count, "nothing should be called") + + msgs = encoder.Build() + require.Len(t, msgs, 3, "expected 3 messages") + msgs[0].Callback() + require.Equal(t, 3, count, "expected 2 callbacks to be called") + msgs[1].Callback() + require.Equal(t, 10, count, "expected 2 callbacks to be called") + msgs[2].Callback() + require.Equal(t, 15, count, "expected one callback to be called") +}