Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4074
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Dec 26, 2021
1 parent 87f287e commit 1bc3559
Show file tree
Hide file tree
Showing 8 changed files with 774 additions and 13 deletions.
282 changes: 282 additions & 0 deletions cdc/sink/codec/craft.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License")
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.orglicensesLICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"context"
"math"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/craft"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
)

// CraftEventBatchEncoder encodes the events into the byte of a batch into craft binary format.
type CraftEventBatchEncoder struct {
rowChangedBuffer *craft.RowChangedEventBuffer
messageBuf []*MQMessage

// configs
maxMessageBytes int
maxBatchSize int

allocator *craft.SliceAllocator
}

// EncodeCheckpointEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
return newResolvedMQMessage(config.ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil
}

func (e *CraftEventBatchEncoder) flush() {
headers := e.rowChangedBuffer.GetHeaders()
ts := headers.GetTs(0)
schema := headers.GetSchema(0)
table := headers.GetTable(0)
rowsCnt := e.rowChangedBuffer.RowsCount()
mqMessage := NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MqMessageTypeRow, &schema, &table)
mqMessage.SetRowsCount(rowsCnt)
e.messageBuf = append(e.messageBuf, mqMessage)
}

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) AppendRowChangedEvent(ev *model.RowChangedEvent) (EncoderResult, error) {
rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev)
if size > e.maxMessageBytes || rows >= e.maxBatchSize {
e.flush()
}
return EncoderNoOperation, nil
}

// AppendResolvedEvent is no-op
func (e *CraftEventBatchEncoder) AppendResolvedEvent(ts uint64) (EncoderResult, error) {
return EncoderNoOperation, nil
}

// EncodeDDLEvent implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) {
return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil
}

// Build implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Build() []*MQMessage {
if e.rowChangedBuffer.Size() > 0 {
// flush buffered data to message buffer
e.flush()
}
ret := e.messageBuf
e.messageBuf = make([]*MQMessage, 0, 2)
return ret
}

// MixedBuild implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) MixedBuild(withVersion bool) []byte {
panic("Only JsonEncoder supports mixed build")
}

// Size implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Size() int {
return e.rowChangedBuffer.Size()
}

// Reset implements the EventBatchEncoder interface
func (e *CraftEventBatchEncoder) Reset() {
e.rowChangedBuffer.Reset()
}

// SetParams reads relevant parameters for craft protocol
func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error {
var err error
maxMessageBytes, ok := params["max-message-bytes"]
if !ok {
return cerror.ErrSinkInvalidConfig.GenWithStack("max-message-bytes not found")
}

e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes)
if err != nil {
return cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
}
if e.maxMessageBytes <= 0 || e.maxMessageBytes > math.MaxInt32 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageBytes))
}

e.maxBatchSize = DefaultMaxBatchSize
if maxBatchSize, ok := params["max-batch-size"]; ok {
e.maxBatchSize, err = strconv.Atoi(maxBatchSize)
if err != nil {
return cerror.ErrSinkInvalidConfig.Wrap(err)
}
}
if e.maxBatchSize <= 0 || e.maxBatchSize > math.MaxUint16 {
return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-batch-size %d", e.maxBatchSize))
}

return nil
}

// NewCraftEventBatchEncoder creates a new CraftEventBatchEncoder.
func NewCraftEventBatchEncoder() EventBatchEncoder {
// 64 is a magic number that come up with these assumptions and manual benchmark.
// 1. Most table will not have more than 64 columns
// 2. It only worth allocating slices in batch for slices that's small enough
return NewCraftEventBatchEncoderWithAllocator(craft.NewSliceAllocator(64))
}

type craftEventBatchEncoderBuilder struct {
opts map[string]string
}

// Build a CraftEventBatchEncoder
func (b *craftEventBatchEncoderBuilder) Build(ctx context.Context) (EventBatchEncoder, error) {
encoder := NewCraftEventBatchEncoder()
if err := encoder.SetParams(b.opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

return encoder, nil
}

func newCraftEventBatchEncoderBuilder(opts map[string]string) EncoderBuilder {
return &craftEventBatchEncoderBuilder{opts: opts}
}

// NewCraftEventBatchEncoderWithAllocator creates a new CraftEventBatchEncoder with given allocator.
func NewCraftEventBatchEncoderWithAllocator(allocator *craft.SliceAllocator) EventBatchEncoder {
return &CraftEventBatchEncoder{
allocator: allocator,
messageBuf: make([]*MQMessage, 0, 2),
rowChangedBuffer: craft.NewRowChangedEventBuffer(allocator),
}
}

// CraftEventBatchDecoder decodes the byte of a batch into the original messages.
type CraftEventBatchDecoder struct {
headers *craft.Headers
decoder *craft.MessageDecoder
index int

allocator *craft.SliceAllocator
}

// HasNext implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) HasNext() (model.MqMessageType, bool, error) {
if b.index >= b.headers.Count() {
return model.MqMessageTypeUnknown, false, nil
}
return b.headers.GetType(b.index), true, nil
}

// NextResolvedEvent implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) NextResolvedEvent() (uint64, error) {
ty, hasNext, err := b.HasNext()
if err != nil {
return 0, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeResolved {
return 0, cerror.ErrCraftCodecInvalidData.GenWithStack("not found resolved event message")
}
ts := b.headers.GetTs(b.index)
b.index++
return ts, nil
}

// NextRowChangedEvent implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
ty, hasNext, err := b.HasNext()
if err != nil {
return nil, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeRow {
return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found row changed event message")
}
oldValue, newValue, err := b.decoder.RowChangedEvent(b.index)
if err != nil {
return nil, errors.Trace(err)
}
ev := &model.RowChangedEvent{}
if oldValue != nil {
if ev.PreColumns, err = oldValue.ToModel(); err != nil {
return nil, errors.Trace(err)
}
}
if newValue != nil {
if ev.Columns, err = newValue.ToModel(); err != nil {
return nil, errors.Trace(err)
}
}
ev.CommitTs = b.headers.GetTs(b.index)
ev.Table = &model.TableName{
Schema: b.headers.GetSchema(b.index),
Table: b.headers.GetTable(b.index),
}
partition := b.headers.GetPartition(b.index)
if partition >= 0 {
ev.Table.TableID = partition
ev.Table.IsPartition = true
}
b.index++
return ev, nil
}

// NextDDLEvent implements the EventBatchDecoder interface
func (b *CraftEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
ty, hasNext, err := b.HasNext()
if err != nil {
return nil, errors.Trace(err)
}
if !hasNext || ty != model.MqMessageTypeDDL {
return nil, cerror.ErrCraftCodecInvalidData.GenWithStack("not found ddl event message")
}
ddlType, query, err := b.decoder.DDLEvent(b.index)
if err != nil {
return nil, errors.Trace(err)
}
event := &model.DDLEvent{
CommitTs: b.headers.GetTs(b.index),
Query: query,
Type: ddlType,
TableInfo: &model.SimpleTableInfo{
Schema: b.headers.GetSchema(b.index),
Table: b.headers.GetTable(b.index),
},
}
b.index++
return event, nil
}

// NewCraftEventBatchDecoder creates a new CraftEventBatchDecoder.
func NewCraftEventBatchDecoder(bits []byte) (EventBatchDecoder, error) {
return NewCraftEventBatchDecoderWithAllocator(bits, craft.NewSliceAllocator(64))
}

// NewCraftEventBatchDecoderWithAllocator creates a new CraftEventBatchDecoder with given allocator.
func NewCraftEventBatchDecoderWithAllocator(bits []byte, allocator *craft.SliceAllocator) (EventBatchDecoder, error) {
decoder, err := craft.NewMessageDecoder(bits, allocator)
if err != nil {
return nil, errors.Trace(err)
}
headers, err := decoder.Headers()
if err != nil {
return nil, errors.Trace(err)
}

return &CraftEventBatchDecoder{
headers: headers,
decoder: decoder,
allocator: allocator,
}, nil
}
Loading

0 comments on commit 1bc3559

Please sign in to comment.