Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ticdc/mq: accurately demonstrate txn_batch_size metric for MQ sink #3609

Merged
merged 16 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ func (e *CraftEventBatchEncoder) flush() {
ts := headers.GetTs(0)
schema := headers.GetSchema(0)
table := headers.GetTable(0)
e.messageBuf = append(e.messageBuf, NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table))
rowsCnt := e.rowChangedBuffer.RowsCount()
mqMessage := NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table)
mqMessage.SetRowsCount(rowsCnt)
e.messageBuf = append(e.messageBuf, mqMessage)
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/codec/craft/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ func (b *RowChangedEventBuffer) Size() int {
return b.estimatedSize
}

// Number of rows batched in this buffer
func (b *RowChangedEventBuffer) RowsCount() int {
return b.eventsCount
}

// GetHeaders returns headers of buffer
func (b *RowChangedEventBuffer) GetHeaders() *Headers {
return b.headers
Expand Down
45 changes: 31 additions & 14 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ type EventBatchEncoder interface {

// MQMessage represents an MQ message to the mqSink
type MQMessage struct {
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol config.Protocol // protocol
Key []byte
Value []byte
Ts uint64 // reserved for possible output sorting
Schema *string // schema
Table *string // table
Type model.MqMessageType // type
Protocol config.Protocol // protocol
rowsCount int // rows in one MQ Message
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
Expand All @@ -83,6 +84,21 @@ func (m *MQMessage) PhysicalTime() time.Time {
return oracle.GetTimeFromTS(m.Ts)
}

// GetRowsCount returns the number of rows batched in one MQMessage
func (m *MQMessage) GetRowsCount() int {
return m.rowsCount
}

// SetRowsCount set the number of rows
func (m *MQMessage) SetRowsCount(cnt int) {
m.rowsCount = cnt
}

// IncRowsCount increase the number of rows
func (m *MQMessage) IncRowsCount() {
m.rowsCount++
}

func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLEvent) *MQMessage {
return NewMQMessage(proto, key, value, event.CommitTs, model.MqMessageTypeDDL, &event.TableInfo.Schema, &event.TableInfo.Table)
}
Expand All @@ -95,13 +111,14 @@ func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType, schema, table *string) *MQMessage {
ret := &MQMessage{
Key: nil,
Value: nil,
Ts: ts,
Schema: schema,
Table: table,
Type: ty,
Protocol: proto,
Key: nil,
Value: nil,
Ts: ts,
Schema: schema,
Table: table,
Type: ty,
Protocol: proto,
rowsCount: 0,
}

if key != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

if message.Length() > d.maxMessageSize {
// `len(d.messageBuf) == 1` is implied
Expand Down
5 changes: 3 additions & 2 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
flushToProducer := func(op codec.EncoderResult) error {
return k.statistics.RecordBatchExecution(func() (int, error) {
messages := encoder.Build()
thisBatchSize := len(messages)
if thisBatchSize == 0 {
thisBatchSize := 0
if len(messages) == 0 {
return 0, nil
}

Expand All @@ -291,6 +291,7 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
if err != nil {
return 0, err
}
thisBatchSize += msg.GetRowsCount()
}

if op == codec.EncoderNeedSyncWrite {
Expand Down