Skip to content

Commit

Permalink
sink/mq(ticdc): split json.go and add callback (#6176)
Browse files Browse the repository at this point in the history
ref #5928
  • Loading branch information
Rustin170506 authored Jul 6, 2022
1 parent 13264dd commit b813e1b
Show file tree
Hide file tree
Showing 26 changed files with 1,141 additions and 935 deletions.
5 changes: 3 additions & 2 deletions cdc/sink/mq/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(
ctx context.Context,
topic string,
e *model.RowChangedEvent,
_ func(),
) error {
log.Debug("AppendRowChangedEvent", zap.Any("rowChangedEvent", e))
mqMessage := NewMQMessage(
mqMessage := newMsg(
config.ProtocolAvro,
nil,
nil,
Expand Down Expand Up @@ -840,7 +841,7 @@ func (b *avroEventBatchEncoderBuilder) Build() EventBatchEncoder {
encoder.keySchemaManager = b.keySchemaManager
encoder.valueSchemaManager = b.valueSchemaManager
encoder.resultBuf = make([]*MQMessage, 0, 4096)
encoder.maxMessageBytes = b.config.MaxMessageBytes()
encoder.maxMessageBytes = b.config.maxMessageBytes
encoder.enableTiDBExtension = b.config.enableTiDBExtension
encoder.decimalHandlingMode = b.config.avroDecimalHandlingMode
encoder.bigintUnsignedHandlingMode = b.config.avroBigintUnsignedHandlingMode
Expand Down
9 changes: 5 additions & 4 deletions cdc/sink/mq/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,10 @@ func (d *CanalEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, e

// AppendRowChangedEvent implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) AppendRowChangedEvent(
ctx context.Context,
topic string,
_ context.Context,
_ string,
e *model.RowChangedEvent,
_ func(),
) error {
entry, err := d.entryBuilder.FromRowEvent(e)
if err != nil {
Expand Down Expand Up @@ -467,7 +468,7 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

return newDDLMQMessage(config.ProtocolCanal, nil, b, e), nil
return newDDLMsg(config.ProtocolCanal, nil, b, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -486,7 +487,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if err != nil {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MessageTypeRow, nil, nil)
ret := newMsg(config.ProtocolCanal, nil, value, 0, model.MessageTypeRow, nil, nil)
ret.SetRowsCount(rowCount)
d.messages.Reset()
d.resetPacket()
Expand Down
13 changes: 7 additions & 6 deletions cdc/sink/mq/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,15 @@ func (c *CanalFlatEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessag
if err != nil {
return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err)
}
return newResolvedMQMessage(config.ProtocolCanalJSON, nil, value, ts), nil
return newResolvedMsg(config.ProtocolCanalJSON, nil, value, ts), nil
}

// AppendRowChangedEvent implements the interface EventBatchEncoder
func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent(
ctx context.Context,
topic string,
_ context.Context,
_ string,
e *model.RowChangedEvent,
_ func(),
) error {
message, err := c.newFlatMessageForDML(e)
if err != nil {
Expand All @@ -360,7 +361,7 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa
if err != nil {
return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err)
}
return newDDLMQMessage(config.ProtocolCanalJSON, nil, value, e), nil
return newDDLMsg(config.ProtocolCanalJSON, nil, value, e), nil
}

// Build implements the EventBatchEncoder interface
Expand All @@ -375,7 +376,7 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MessageTypeRow, msg.getSchema(), msg.getTable())
m := newMsg(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MessageTypeRow, msg.getSchema(), msg.getTable())
m.IncRowsCount()
ret[i] = m
}
Expand Down Expand Up @@ -529,7 +530,7 @@ func canalFlatJSONColumnMap2SinkColumns(cols map[string]interface{}, mysqlType m
}
mysqlTypeStr = trimUnsignedFromMySQLType(mysqlTypeStr)
mysqlType := types.StrToType(mysqlTypeStr)
col := newColumn(value, mysqlType).decodeCanalJSONColumn(name, JavaSQLType(javaType))
col := newColumn(value, mysqlType).toCanalJSONFormatColumn(name, JavaSQLType(javaType))
result = append(result, col)
}
if len(result) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mq/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestNewCanalFlatEventBatchDecoder4RowMessage(t *testing.T) {
encoder := &CanalFlatEventBatchEncoder{builder: NewCanalEntryBuilder(), enableTiDBExtension: encodeEnable}
require.NotNil(t, encoder)

err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert)
err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, nil)
require.Nil(t, err)

mqMessages := encoder.Build()
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestBatching(t *testing.T) {
for i := 1; i <= 1000; i++ {
ts := uint64(i)
updateCase.CommitTs = ts
err := encoder.AppendRowChangedEvent(context.Background(), "", &updateCase)
err := encoder.AppendRowChangedEvent(context.Background(), "", &updateCase, nil)
require.Nil(t, err)

if i%100 == 0 {
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestCanalEventBatchEncoder(t *testing.T) {
for _, cs := range s.rowCases {
encoder := NewCanalEventBatchEncoder()
for _, row := range cs {
err := encoder.AppendRowChangedEvent(context.Background(), "", row)
err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil)
require.Nil(t, err)
}
res := encoder.Build()
Expand Down
22 changes: 11 additions & 11 deletions cdc/sink/mq/codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ func TestJsonVsCraftVsPB(t *testing.T) {
craftEncoder.(*CraftEventBatchEncoder).maxBatchSize = 64
craftMessages := encodeRowCase(t, craftEncoder, cs)

jsonEncoder := NewJSONEventBatchEncoder()
jsonEncoder.(*JSONEventBatchEncoder).maxMessageBytes = 8192
jsonEncoder.(*JSONEventBatchEncoder).maxBatchSize = 64
jsonEncoder := newOpenProtocolBatchEncoder()
jsonEncoder.(*OpenProtocolBatchEncoder).maxMessageBytes = 8192
jsonEncoder.(*OpenProtocolBatchEncoder).maxBatchSize = 64
jsonMessages := encodeRowCase(t, jsonEncoder, cs)

protobuf1Messages := codecEncodeRowChangedPB1ToMessage(cs)
Expand Down Expand Up @@ -352,7 +352,7 @@ func codecEncodeRowChangedPB2(events []*model.RowChangedEvent) []byte {

func codecEncodeRowCase(encoder EventBatchEncoder, events []*model.RowChangedEvent) ([]*MQMessage, error) {
for _, event := range events {
err := encoder.AppendRowChangedEvent(context.Background(), "", event)
err := encoder.AppendRowChangedEvent(context.Background(), "", event, nil)
if err != nil {
return nil, err
}
Expand All @@ -373,9 +373,9 @@ func init() {
panic(err)
}

encoder = NewJSONEventBatchEncoder()
encoder.(*JSONEventBatchEncoder).maxMessageBytes = 8192
encoder.(*JSONEventBatchEncoder).maxBatchSize = 64
encoder = newOpenProtocolBatchEncoder()
encoder.(*OpenProtocolBatchEncoder).maxMessageBytes = 8192
encoder.(*OpenProtocolBatchEncoder).maxBatchSize = 64
if codecJSONEncodedRowChanges, err = codecEncodeRowCase(encoder, codecBenchmarkRowChanges); err != nil {
panic(err)
}
Expand All @@ -394,9 +394,9 @@ func BenchmarkCraftEncoding(b *testing.B) {
}

func BenchmarkJsonEncoding(b *testing.B) {
encoder := NewJSONEventBatchEncoder()
encoder.(*JSONEventBatchEncoder).maxMessageBytes = 8192
encoder.(*JSONEventBatchEncoder).maxBatchSize = 64
encoder := newOpenProtocolBatchEncoder()
encoder.(*OpenProtocolBatchEncoder).maxMessageBytes = 8192
encoder.(*OpenProtocolBatchEncoder).maxBatchSize = 64
for i := 0; i < b.N; i++ {
_, _ = codecEncodeRowCase(encoder, codecBenchmarkRowChanges)
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func BenchmarkCraftDecoding(b *testing.B) {
func BenchmarkJsonDecoding(b *testing.B) {
for i := 0; i < b.N; i++ {
for _, message := range codecJSONEncodedRowChanges {
if decoder, err := NewJSONEventBatchDecoder(message.Key, message.Value); err != nil {
if decoder, err := NewOpenProtocolBatchDecoder(message.Key, message.Value); err != nil {
panic(err)
} else {
for {
Expand Down
188 changes: 188 additions & 0 deletions cdc/sink/mq/codec/column.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package codec

import (
"encoding/base64"
"encoding/json"
"strconv"

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
"golang.org/x/text/encoding/charmap"
)

// column is the JSON-serializable form of a single column value. The struct
// tags below define the short keys used in the encoded output, and the field
// order determines the order in which encoding/json emits them.
type column struct {
// Type is the column type byte (compared against mysql.Type* constants
// elsewhere in this file); serialized under the key "t".
Type byte `json:"t"`
// Deprecated: please use Flag instead.
WhereHandle *bool `json:"h,omitempty"`
// Flag carries the column flags; serialized under the key "f".
Flag model.ColumnFlagType `json:"f"`
// Value is the column value; serialized under the key "v".
Value any `json:"v"`
}

// newColumn allocates a column holding the given value and type byte.
// Flag and WhereHandle are left at their zero values.
func newColumn(value any, tp byte) *column {
	c := new(column)
	c.Type = tp
	c.Value = value
	return c
}

// fromRowChangeColumn fills c from a row-change column.
//
// Type and Flag are copied verbatim, and WhereHandle is set to a pointer to
// true when the flag marks a handle key. For string-like MySQL types
// (TypeString, TypeVarString, TypeVarchar) the value must be a []byte or a
// string and is stored as a string; when the binary flag is set the string is
// additionally escaped with strconv.Quote (surrounding quotes stripped).
// Every other type keeps its value unchanged. A string-like column holding
// any other Go type triggers log.Panic.
func (c *column) fromRowChangeColumn(col *model.Column) {
	c.Type, c.Flag = col.Type, col.Flag
	if c.Flag.IsHandleKey() {
		c.WhereHandle = new(bool)
		*c.WhereHandle = true
	}

	if col.Value == nil {
		c.Value = nil
		return
	}

	stringLike := col.Type == mysql.TypeString ||
		col.Type == mysql.TypeVarString ||
		col.Type == mysql.TypeVarchar
	if !stringLike {
		c.Value = col.Value
		return
	}

	var text string
	switch v := col.Value.(type) {
	case []byte:
		text = string(v)
	case string:
		text = v
	default:
		log.Panic("invalid column value, please report a bug", zap.Any("col", col))
	}

	if c.Flag.IsBinary() {
		// Quote escapes non-printable bytes; drop the quotes it adds at both ends.
		quoted := strconv.Quote(text)
		text = quoted[1 : len(quoted)-1]
	}
	c.Value = text
}

// toRowChangeColumn converts c into a model.Column with the given name.
//
// Type, Flag and Value are copied. For string-like MySQL types (TypeString,
// TypeVarString, TypeVarchar) with a non-nil value, the value is asserted to
// be a string and stored as []byte; when the binary flag is set, the escaping
// applied by fromRowChangeColumn is undone with strconv.Unquote first, and a
// failure to unquote triggers log.Panic.
func (c *column) toRowChangeColumn(name string) *model.Column {
	col := &model.Column{
		Type:  c.Type,
		Flag:  c.Flag,
		Name:  name,
		Value: c.Value,
	}
	if c.Value == nil {
		return col
	}

	switch col.Type {
	case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
		text := col.Value.(string)
		if c.Flag.IsBinary() {
			// Re-add the surrounding quotes that were stripped on encode.
			unquoted, err := strconv.Unquote("\"" + text + "\"")
			if err != nil {
				log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err))
			}
			text = unquoted
		}
		col.Value = []byte(text)
	}
	return col
}

// toCanalJSONFormatColumn converts c into a model.Column with the given name,
// interpreting the value according to javaType.
//
// A nil value is passed through. Any other value must be a string, otherwise
// log.Panic is called. JavaSQLTypeBIT values are parsed as base-10 uint64;
// JavaSQLTypeBLOB values are run through an ISO-8859-1 encoder; every other
// type keeps the string as is.
func (c *column) toCanalJSONFormatColumn(name string, javaType JavaSQLType) *model.Column {
	col := &model.Column{
		Type:  c.Type,
		Flag:  c.Flag,
		Name:  name,
		Value: c.Value,
	}
	if c.Value == nil {
		return col
	}

	text, isString := col.Value.(string)
	if !isString {
		log.Panic("canal-json encoded message should have type in `string`")
	}

	switch javaType {
	case JavaSQLTypeBIT:
		bits, err := strconv.ParseUint(text, 10, 64)
		if err != nil {
			log.Panic("invalid column value for bit", zap.Any("col", c), zap.Error(err))
		}
		col.Value = bits
	case JavaSQLTypeBLOB:
		// The encoding side ran blob values through the ISO8859_1 decoder;
		// apply the matching encoder here to reverse that step.
		restored, err := charmap.ISO8859_1.NewEncoder().String(text)
		if err != nil {
			log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err))
		}
		col.Value = restored
	default:
		col.Value = text
	}
	return col
}

// formatColumn returns a copy of c whose Value has been converted to the Go
// type matching c.Type. It handles the loosely typed representations a value
// may have (string, json.Number, float64), presumably after JSON decoding:
//
//   - blob types: a string value is base64-decoded into []byte;
//   - float/double: a json.Number is converted to float64;
//   - integer types and year: a json.Number or float64 is converted to uint64
//     when the unsigned flag is set, int64 otherwise;
//   - bit: a json.Number is converted to uint64.
//
// Values that are already of another Go type, and columns of any other MySQL
// type, are returned unchanged. Any conversion failure triggers log.Panic.
func formatColumn(c column) column {
	switch c.Type {
	case mysql.TypeTinyBlob, mysql.TypeMediumBlob,
		mysql.TypeLongBlob, mysql.TypeBlob:
		if s, ok := c.Value.(string); ok {
			var err error
			c.Value, err = base64.StdEncoding.DecodeString(s)
			if err != nil {
				log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err))
			}
		}
	case mysql.TypeFloat, mysql.TypeDouble:
		if s, ok := c.Value.(json.Number); ok {
			f64, err := s.Float64()
			if err != nil {
				log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err))
			}
			c.Value = f64
		}
	case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
		if s, ok := c.Value.(json.Number); ok {
			var err error
			if c.Flag.IsUnsigned() {
				c.Value, err = strconv.ParseUint(s.String(), 10, 64)
			} else {
				c.Value, err = strconv.ParseInt(s.String(), 10, 64)
			}
			if err != nil {
				log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err))
			}
		} else if f, ok := c.Value.(float64); ok {
			if c.Flag.IsUnsigned() {
				c.Value = uint64(f)
			} else {
				c.Value = int64(f)
			}
		}
	case mysql.TypeBit:
		if s, ok := c.Value.(json.Number); ok {
			// Bit values are unsigned and may use all 64 bits, so parse as
			// uint64 first: a signed parse rejects anything above MaxInt64.
			bits, err := strconv.ParseUint(s.String(), 10, 64)
			if err != nil {
				// Fall back to the signed parse so every input accepted
				// before this change is still accepted with the same result.
				var signed int64
				signed, err = s.Int64()
				bits = uint64(signed)
			}
			if err != nil {
				log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err))
			}
			c.Value = bits
		}
	}
	return c
}
Loading

0 comments on commit b813e1b

Please sign in to comment.