Skip to content

Commit

Permalink
ticdc/mq: accurately demonstrate txn_batch_size metric for MQ sink (#…
Browse files Browse the repository at this point in the history
…3609) (#3819)

* fix the txn_batch_size metric inaccuracy bug when the sink target is MQ

* address comments

* add comments for exported functions

* fix the compiling problem

* fix conflicts.

Co-authored-by: zhaoxinyu <zhaoxinyu512@gmail.com>
Co-authored-by: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Co-authored-by: 3AceShowHand <jinl1037@hotmail.com>
  • Loading branch information
4 people authored Jan 17, 2022
1 parent 9779df7 commit 6c434c2
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
5 changes: 4 additions & 1 deletion cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func (e *CraftEventBatchEncoder) flush() {
ts := headers.GetTs(0)
schema := headers.GetSchema(0)
table := headers.GetTable(0)
e.messageBuf = append(e.messageBuf, NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table))
rowsCnt := e.rowChangedBuffer.RowsCount()
mqMessage := NewMQMessage(ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table)
mqMessage.SetRowsCount(rowsCnt)
e.messageBuf = append(e.messageBuf, mqMessage)
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/codec/craft/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ func (b *RowChangedEventBuffer) Size() int {
return b.estimatedSize
}

// Number of rows batched in this buffer
func (b *RowChangedEventBuffer) RowsCount() int {
return b.eventsCount
}

// GetHeaders returns headers of buffer
func (b *RowChangedEventBuffer) GetHeaders() *Headers {
return b.headers
Expand Down
45 changes: 31 additions & 14 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ type EventBatchEncoder interface {

// MQMessage represents an MQ message to the mqSink
type MQMessage struct {
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol Protocol // protocol
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol Protocol // protocol
rowsCount int // rows in one MQ Message
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
Expand All @@ -83,6 +84,21 @@ func (m *MQMessage) PhysicalTime() time.Time {
return oracle.GetTimeFromTS(m.Ts)
}

// GetRowsCount returns the number of rows batched in one MQMessage
func (m *MQMessage) GetRowsCount() int {
return m.rowsCount
}

// SetRowsCount set the number of rows
func (m *MQMessage) SetRowsCount(cnt int) {
m.rowsCount = cnt
}

// IncRowsCount increase the number of rows
func (m *MQMessage) IncRowsCount() {
m.rowsCount++
}

func newDDLMQMessage(proto Protocol, key, value []byte, event *model.DDLEvent) *MQMessage {
return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table)
}
Expand All @@ -95,13 +111,14 @@ func newResolvedMQMessage(proto Protocol, key, value []byte, ts uint64) *MQMessa
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(proto Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage {
ret := &MQMessage{
Key: nil,
Value: nil,
Ts: ts,
Schema: schema,
Table: table,
Type: ty,
Protocol: proto,
Key: nil,
Value: nil,
Ts: ts,
Schema: schema,
Table: table,
Type: ty,
Protocol: proto,
rowsCount: 0,
}

if key != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

if message.Length() > d.maxMessageSize {
// `len(d.messageBuf) == 1` is implied
Expand Down
5 changes: 3 additions & 2 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
flushToProducer := func(op codec.EncoderResult) error {
return k.statistics.RecordBatchExecution(func() (int, error) {
messages := encoder.Build()
thisBatchSize := len(messages)
if thisBatchSize == 0 {
thisBatchSize := 0
if len(messages) == 0 {
return 0, nil
}

Expand All @@ -285,6 +285,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
if err != nil {
return 0, err
}
thisBatchSize += msg.GetRowsCount()
}

if op == codec.EncoderNeedSyncWrite {
Expand Down

0 comments on commit 6c434c2

Please sign in to comment.