diff --git a/cdc/sink/mq/codec/avro.go b/cdc/sink/mq/codec/avro.go index ed8faa80e05..daf3e5c6c82 100644 --- a/cdc/sink/mq/codec/avro.go +++ b/cdc/sink/mq/codec/avro.go @@ -60,9 +60,10 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent( ctx context.Context, topic string, e *model.RowChangedEvent, + _ func(), ) error { log.Debug("AppendRowChangedEvent", zap.Any("rowChangedEvent", e)) - mqMessage := NewMQMessage( + mqMessage := newMsg( config.ProtocolAvro, nil, nil, @@ -840,7 +841,7 @@ func (b *avroEventBatchEncoderBuilder) Build() EventBatchEncoder { encoder.keySchemaManager = b.keySchemaManager encoder.valueSchemaManager = b.valueSchemaManager encoder.resultBuf = make([]*MQMessage, 0, 4096) - encoder.maxMessageBytes = b.config.MaxMessageBytes() + encoder.maxMessageBytes = b.config.maxMessageBytes encoder.enableTiDBExtension = b.config.enableTiDBExtension encoder.decimalHandlingMode = b.config.avroDecimalHandlingMode encoder.bigintUnsignedHandlingMode = b.config.avroBigintUnsignedHandlingMode diff --git a/cdc/sink/mq/codec/canal.go b/cdc/sink/mq/codec/canal.go index eb86a5e300c..77c59647f1f 100644 --- a/cdc/sink/mq/codec/canal.go +++ b/cdc/sink/mq/codec/canal.go @@ -421,9 +421,10 @@ func (d *CanalEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, e // AppendRowChangedEvent implements the EventBatchEncoder interface func (d *CanalEventBatchEncoder) AppendRowChangedEvent( - ctx context.Context, - topic string, + _ context.Context, + _ string, e *model.RowChangedEvent, + _ func(), ) error { entry, err := d.entryBuilder.FromRowEvent(e) if err != nil { @@ -467,7 +468,7 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err) } - return newDDLMQMessage(config.ProtocolCanal, nil, b, e), nil + return newDDLMsg(config.ProtocolCanal, nil, b, e), nil } // Build implements the EventBatchEncoder interface @@ -486,7 +487,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage { if err != nil { log.Panic("Error when serializing Canal packet", zap.Error(err)) } - ret := NewMQMessage(config.ProtocolCanal, nil, value, 0, model.MessageTypeRow, nil, nil) + ret := newMsg(config.ProtocolCanal, nil, value, 0, model.MessageTypeRow, nil, nil) ret.SetRowsCount(rowCount) d.messages.Reset() d.resetPacket() diff --git a/cdc/sink/mq/codec/canal_flat.go b/cdc/sink/mq/codec/canal_flat.go index e41b9d904be..17e21b8d921 100644 --- a/cdc/sink/mq/codec/canal_flat.go +++ b/cdc/sink/mq/codec/canal_flat.go @@ -336,14 +336,15 @@ func (c *CanalFlatEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessag if err != nil { return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err) } - return newResolvedMQMessage(config.ProtocolCanalJSON, nil, value, ts), nil + return newResolvedMsg(config.ProtocolCanalJSON, nil, value, ts), nil } // AppendRowChangedEvent implements the interface EventBatchEncoder func (c *CanalFlatEventBatchEncoder) AppendRowChangedEvent( - ctx context.Context, - topic string, + _ context.Context, + _ string, e *model.RowChangedEvent, + _ func(), ) error { message, err := c.newFlatMessageForDML(e) if err != nil { @@ -360,7 +361,7 @@ func (c *CanalFlatEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessa if err != nil { return nil, cerrors.WrapError(cerrors.ErrCanalEncodeFailed, err) } - return newDDLMQMessage(config.ProtocolCanalJSON, nil, value, e), nil + return newDDLMsg(config.ProtocolCanalJSON, nil, value, e), nil } // Build implements the EventBatchEncoder interface @@ -375,7 +376,7 
@@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage { log.Panic("CanalFlatEventBatchEncoder", zap.Error(err)) return nil } - m := NewMQMessage(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MessageTypeRow, msg.getSchema(), msg.getTable()) + m := newMsg(config.ProtocolCanalJSON, nil, value, msg.getTikvTs(), model.MessageTypeRow, msg.getSchema(), msg.getTable()) m.IncRowsCount() ret[i] = m } @@ -529,7 +530,7 @@ func canalFlatJSONColumnMap2SinkColumns(cols map[string]interface{}, mysqlType m } mysqlTypeStr = trimUnsignedFromMySQLType(mysqlTypeStr) mysqlType := types.StrToType(mysqlTypeStr) - col := newColumn(value, mysqlType).decodeCanalJSONColumn(name, JavaSQLType(javaType)) + col := newColumn(value, mysqlType).toCanalJSONFormatColumn(name, JavaSQLType(javaType)) result = append(result, col) } if len(result) == 0 { diff --git a/cdc/sink/mq/codec/canal_flat_test.go b/cdc/sink/mq/codec/canal_flat_test.go index a9b655cb96c..93fa7f747a8 100644 --- a/cdc/sink/mq/codec/canal_flat_test.go +++ b/cdc/sink/mq/codec/canal_flat_test.go @@ -165,7 +165,7 @@ func TestNewCanalFlatEventBatchDecoder4RowMessage(t *testing.T) { encoder := &CanalFlatEventBatchEncoder{builder: NewCanalEntryBuilder(), enableTiDBExtension: encodeEnable} require.NotNil(t, encoder) - err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert) + err := encoder.AppendRowChangedEvent(context.Background(), "", testCaseInsert, nil) require.Nil(t, err) mqMessages := encoder.Build() @@ -294,7 +294,7 @@ func TestBatching(t *testing.T) { for i := 1; i <= 1000; i++ { ts := uint64(i) updateCase.CommitTs = ts - err := encoder.AppendRowChangedEvent(context.Background(), "", &updateCase) + err := encoder.AppendRowChangedEvent(context.Background(), "", &updateCase, nil) require.Nil(t, err) if i%100 == 0 { diff --git a/cdc/sink/mq/codec/canal_test.go b/cdc/sink/mq/codec/canal_test.go index d1b13083c78..edb6afb53f0 100644 --- a/cdc/sink/mq/codec/canal_test.go +++ b/cdc/sink/mq/codec/canal_test.go @@ -86,7 +86,7 @@ func TestCanalEventBatchEncoder(t *testing.T) { for _, cs := range s.rowCases { encoder := NewCanalEventBatchEncoder() for _, row := range cs { - err := encoder.AppendRowChangedEvent(context.Background(), "", row) + err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) require.Nil(t, err) } res := encoder.Build() diff --git a/cdc/sink/mq/codec/codec_test.go b/cdc/sink/mq/codec/codec_test.go index 83e8ff93226..fe49d4ad358 100644 --- a/cdc/sink/mq/codec/codec_test.go +++ b/cdc/sink/mq/codec/codec_test.go @@ -220,9 +220,9 @@ func TestJsonVsCraftVsPB(t *testing.T) { craftEncoder.(*CraftEventBatchEncoder).maxBatchSize = 64 craftMessages := encodeRowCase(t, craftEncoder, cs) - jsonEncoder := NewJSONEventBatchEncoder() - jsonEncoder.(*JSONEventBatchEncoder).maxMessageBytes = 8192 - jsonEncoder.(*JSONEventBatchEncoder).maxBatchSize = 64 + jsonEncoder := newOpenProtocolBatchEncoder() + jsonEncoder.(*OpenProtocolBatchEncoder).maxMessageBytes = 8192 + jsonEncoder.(*OpenProtocolBatchEncoder).maxBatchSize = 64 jsonMessages := encodeRowCase(t, jsonEncoder, cs) protobuf1Messages := codecEncodeRowChangedPB1ToMessage(cs) @@ -352,7 +352,7 @@ func codecEncodeRowChangedPB2(events []*model.RowChangedEvent) []byte { func codecEncodeRowCase(encoder EventBatchEncoder, events []*model.RowChangedEvent) ([]*MQMessage, error) { for _, event := range events { - err := encoder.AppendRowChangedEvent(context.Background(), "", event) + err := encoder.AppendRowChangedEvent(context.Background(), "", event, nil) if 
err != nil { return nil, err } @@ -373,9 +373,9 @@ func init() { panic(err) } - encoder = NewJSONEventBatchEncoder() - encoder.(*JSONEventBatchEncoder).maxMessageBytes = 8192 - encoder.(*JSONEventBatchEncoder).maxBatchSize = 64 + encoder = newOpenProtocolBatchEncoder() + encoder.(*OpenProtocolBatchEncoder).maxMessageBytes = 8192 + encoder.(*OpenProtocolBatchEncoder).maxBatchSize = 64 if codecJSONEncodedRowChanges, err = codecEncodeRowCase(encoder, codecBenchmarkRowChanges); err != nil { panic(err) } @@ -394,9 +394,9 @@ func BenchmarkCraftEncoding(b *testing.B) { } func BenchmarkJsonEncoding(b *testing.B) { - encoder := NewJSONEventBatchEncoder() - encoder.(*JSONEventBatchEncoder).maxMessageBytes = 8192 - encoder.(*JSONEventBatchEncoder).maxBatchSize = 64 + encoder := newOpenProtocolBatchEncoder() + encoder.(*OpenProtocolBatchEncoder).maxMessageBytes = 8192 + encoder.(*OpenProtocolBatchEncoder).maxBatchSize = 64 for i := 0; i < b.N; i++ { _, _ = codecEncodeRowCase(encoder, codecBenchmarkRowChanges) } @@ -438,7 +438,7 @@ func BenchmarkCraftDecoding(b *testing.B) { func BenchmarkJsonDecoding(b *testing.B) { for i := 0; i < b.N; i++ { for _, message := range codecJSONEncodedRowChanges { - if decoder, err := NewJSONEventBatchDecoder(message.Key, message.Value); err != nil { + if decoder, err := NewOpenProtocolBatchDecoder(message.Key, message.Value); err != nil { panic(err) } else { for { diff --git a/cdc/sink/mq/codec/column.go b/cdc/sink/mq/codec/column.go new file mode 100644 index 00000000000..72fa26f6f9b --- /dev/null +++ b/cdc/sink/mq/codec/column.go @@ -0,0 +1,188 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "encoding/base64" + "encoding/json" + "strconv" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/model" + "go.uber.org/zap" + "golang.org/x/text/encoding/charmap" +) + +type column struct { + Type byte `json:"t"` + // Deprecated: please use Flag instead. 
+ WhereHandle *bool `json:"h,omitempty"` + Flag model.ColumnFlagType `json:"f"` + Value any `json:"v"` +} + +func newColumn(value any, tp byte) *column { + return &column{ + Value: value, + Type: tp, + } +} + +func (c *column) fromRowChangeColumn(col *model.Column) { + c.Type = col.Type + c.Flag = col.Flag + if c.Flag.IsHandleKey() { + whereHandle := true + c.WhereHandle = &whereHandle + } + if col.Value == nil { + c.Value = nil + return + } + switch col.Type { + case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: + var str string + switch col.Value.(type) { + case []byte: + str = string(col.Value.([]byte)) + case string: + str = col.Value.(string) + default: + log.Panic("invalid column value, please report a bug", zap.Any("col", col)) + } + if c.Flag.IsBinary() { + str = strconv.Quote(str) + str = str[1 : len(str)-1] + } + c.Value = str + default: + c.Value = col.Value + } +} + +func (c *column) toRowChangeColumn(name string) *model.Column { + col := new(model.Column) + col.Type = c.Type + col.Flag = c.Flag + col.Name = name + col.Value = c.Value + if c.Value == nil { + return col + } + switch col.Type { + case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: + str := col.Value.(string) + var err error + if c.Flag.IsBinary() { + str, err = strconv.Unquote("\"" + str + "\"") + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) + } + } + col.Value = []byte(str) + default: + col.Value = c.Value + } + return col +} + +func (c *column) toCanalJSONFormatColumn(name string, javaType JavaSQLType) *model.Column { + col := new(model.Column) + col.Type = c.Type + col.Flag = c.Flag + col.Name = name + col.Value = c.Value + if c.Value == nil { + return col + } + + value, ok := col.Value.(string) + if !ok { + log.Panic("canal-json encoded message should have type in `string`") + } + + if javaType == JavaSQLTypeBIT { + val, err := strconv.ParseUint(value, 10, 64) + if err != nil { + log.Panic("invalid column value for bit", zap.Any("col", c), zap.Error(err)) + } + col.Value = val + return col + } + + if javaType != JavaSQLTypeBLOB { + col.Value = value + return col + } + + // when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back. 
+ encoder := charmap.ISO8859_1.NewEncoder() + value, err := encoder.String(value) + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) + } + + col.Value = value + return col +} + +func formatColumn(c column) column { + switch c.Type { + case mysql.TypeTinyBlob, mysql.TypeMediumBlob, + mysql.TypeLongBlob, mysql.TypeBlob: + if s, ok := c.Value.(string); ok { + var err error + c.Value, err = base64.StdEncoding.DecodeString(s) + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) + } + } + case mysql.TypeFloat, mysql.TypeDouble: + if s, ok := c.Value.(json.Number); ok { + f64, err := s.Float64() + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) + } + c.Value = f64 + } + case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear: + if s, ok := c.Value.(json.Number); ok { + var err error + if c.Flag.IsUnsigned() { + c.Value, err = strconv.ParseUint(s.String(), 10, 64) + } else { + c.Value, err = strconv.ParseInt(s.String(), 10, 64) + } + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) + } + } else if f, ok := c.Value.(float64); ok { + if c.Flag.IsUnsigned() { + c.Value = uint64(f) + } else { + c.Value = int64(f) + } + } + case mysql.TypeBit: + if s, ok := c.Value.(json.Number); ok { + intNum, err := s.Int64() + if err != nil { + log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) + } + c.Value = uint64(intNum) + } + } + return c +} diff --git a/cdc/sink/mq/codec/column_test.go b/cdc/sink/mq/codec/column_test.go new file mode 100644 index 00000000000..6479db61e11 --- /dev/null +++ b/cdc/sink/mq/codec/column_test.go @@ -0,0 +1,91 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package codec + +import ( + "testing" + + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tiflow/cdc/model" + "github.com/stretchr/testify/require" +) + +func TestFormatCol(t *testing.T) { + t.Parallel() + row := &mqMessageRow{Update: map[string]column{"test": { + Type: mysql.TypeString, + Value: "测", + }}} + rowEncode, err := row.encode() + require.Nil(t, err) + row2 := new(mqMessageRow) + err = row2.decode(rowEncode) + require.Nil(t, err) + require.Equal(t, row, row2) + + row = &mqMessageRow{Update: map[string]column{"test": { + Type: mysql.TypeBlob, + Value: []byte("测"), + }}} + rowEncode, err = row.encode() + require.Nil(t, err) + row2 = new(mqMessageRow) + err = row2.decode(rowEncode) + require.Nil(t, err) + require.Equal(t, row, row2) +} + +func TestNonBinaryStringCol(t *testing.T) { + t.Parallel() + col := &model.Column{ + Name: "test", + Type: mysql.TypeString, + Value: "value", + } + mqCol := column{} + mqCol.fromRowChangeColumn(col) + row := &mqMessageRow{Update: map[string]column{"test": mqCol}} + rowEncode, err := row.encode() + require.Nil(t, err) + row2 := new(mqMessageRow) + err = row2.decode(rowEncode) + require.Nil(t, err) + require.Equal(t, row, row2) + mqCol2 := row2.Update["test"] + col2 := mqCol2.toRowChangeColumn("test") + col2.Value = string(col2.Value.([]byte)) + require.Equal(t, col, col2) +} + +func TestVarBinaryCol(t *testing.T) { + t.Parallel() + col := &model.Column{ + Name: "test", + Type: mysql.TypeString, + Flag: model.BinaryFlag, + Value: []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A}, + } + mqCol := column{} + mqCol.fromRowChangeColumn(col) + row := &mqMessageRow{Update: map[string]column{"test": mqCol}} + rowEncode, err := row.encode() + require.Nil(t, err) + row2 := new(mqMessageRow) + err = row2.decode(rowEncode) + require.Nil(t, err) + require.Equal(t, row, row2) + mqCol2 := row2.Update["test"] + col2 := mqCol2.toRowChangeColumn("test") + require.Equal(t, col, col2) +} diff --git a/cdc/sink/mq/codec/craft.go b/cdc/sink/mq/codec/craft.go index 46fe93c106f..6ac68afacd3 100644 --- a/cdc/sink/mq/codec/craft.go +++ b/cdc/sink/mq/codec/craft.go @@ -37,7 +37,7 @@ type CraftEventBatchEncoder struct { // EncodeCheckpointEvent implements the EventBatchEncoder interface func (e *CraftEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { - return newResolvedMQMessage(config.ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil + return newResolvedMsg(config.ProtocolCraft, nil, craft.NewResolvedEventEncoder(e.allocator, ts).Encode(), ts), nil } func (e *CraftEventBatchEncoder) flush() { @@ -46,16 +46,17 @@ func (e *CraftEventBatchEncoder) flush() { schema := headers.GetSchema(0) table := headers.GetTable(0) rowsCnt := e.rowChangedBuffer.RowsCount() - mqMessage := NewMQMessage(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table) + mqMessage := newMsg(config.ProtocolCraft, nil, e.rowChangedBuffer.Encode(), ts, model.MessageTypeRow, &schema, &table) mqMessage.SetRowsCount(rowsCnt) e.messageBuf = append(e.messageBuf, mqMessage) } // AppendRowChangedEvent implements the EventBatchEncoder interface func (e *CraftEventBatchEncoder) AppendRowChangedEvent( - ctx context.Context, - topic string, + _ context.Context, + _ string, ev *model.RowChangedEvent, + _ func(), ) error { rows, size := e.rowChangedBuffer.AppendRowChangedEvent(ev) if size > e.maxMessageBytes || rows >= e.maxBatchSize { @@ -66,7 +67,7 @@ func (e *CraftEventBatchEncoder) 
AppendRowChangedEvent( // EncodeDDLEvent implements the EventBatchEncoder interface func (e *CraftEventBatchEncoder) EncodeDDLEvent(ev *model.DDLEvent) (*MQMessage, error) { - return newDDLMQMessage(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil + return newDDLMsg(config.ProtocolCraft, nil, craft.NewDDLEventEncoder(e.allocator, ev).Encode(), ev), nil } // Build implements the EventBatchEncoder interface diff --git a/cdc/sink/mq/codec/craft_test.go b/cdc/sink/mq/codec/craft_test.go index 10aefc2abbc..571365ea4e2 100644 --- a/cdc/sink/mq/codec/craft_test.go +++ b/cdc/sink/mq/codec/craft_test.go @@ -75,12 +75,12 @@ func testBatchCodec( } encoder := encoderBuilder.Build() - s := NewDefaultBatchTester() + s := newDefaultBatchTester() for _, cs := range s.rowCases { events := 0 for _, row := range cs { - err := encoder.AppendRowChangedEvent(context.Background(), "", row) + err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) events++ require.Nil(t, err) } @@ -131,7 +131,7 @@ func TestCraftMaxMessageBytes(t *testing.T) { } for i := 0; i < 10000; i++ { - err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent) + err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil) require.Nil(t, err) } @@ -154,7 +154,7 @@ func TestCraftMaxBatchSize(t *testing.T) { } for i := 0; i < 10000; i++ { - err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent) + err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil) require.Nil(t, err) } diff --git a/cdc/sink/mq/codec/encoder.go b/cdc/sink/mq/codec/encoder.go index 44dfbc2e6e6..05e88354bcc 100644 --- a/cdc/sink/mq/codec/encoder.go +++ b/cdc/sink/mq/codec/encoder.go @@ -21,6 +21,11 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ) +const ( + // BatchVersion1 represents the version of batch format + BatchVersion1 uint64 = 1 +) + // EventBatchEncoder is an abstraction for events encoder type EventBatchEncoder interface { // EncodeCheckpointEvent appends a checkpoint event into the batch. @@ -28,7 +33,7 @@ type EventBatchEncoder interface { EncodeCheckpointEvent(ts uint64) (*MQMessage, error) // AppendRowChangedEvent appends the calling context, a row changed event and the dispatch // topic into the batch - AppendRowChangedEvent(context.Context, string, *model.RowChangedEvent) error + AppendRowChangedEvent(context.Context, string, *model.RowChangedEvent, func()) error // EncodeDDLEvent appends a DDL event into the batch EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) // Build builds the batch and returns the bytes of key and value. @@ -44,7 +49,7 @@ type EncoderBuilder interface { func NewEventBatchEncoderBuilder(ctx context.Context, c *Config) (EncoderBuilder, error) { switch c.protocol { case config.ProtocolDefault, config.ProtocolOpen: - return newJSONEventBatchEncoderBuilder(c), nil + return newOpenProtocolBatchEncoderBuilder(c), nil case config.ProtocolCanal: return newCanalEventBatchEncoderBuilder(), nil case config.ProtocolAvro: diff --git a/cdc/sink/mq/codec/json.go b/cdc/sink/mq/codec/json.go deleted file mode 100644 index c1e86bc956a..00000000000 --- a/cdc/sink/mq/codec/json.go +++ /dev/null @@ -1,766 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package codec - -import ( - "bytes" - "context" - "encoding/base64" - "encoding/binary" - "encoding/json" - "sort" - "strconv" - "strings" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - timodel "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" - cerror "github.com/pingcap/tiflow/pkg/errors" - "go.uber.org/zap" - "golang.org/x/text/encoding/charmap" -) - -const ( - // BatchVersion1 represents the version of batch format - BatchVersion1 uint64 = 1 -) - -type column struct { - Type byte `json:"t"` - - // WhereHandle is deprecation - // WhereHandle is replaced by HandleKey in Flag - WhereHandle *bool `json:"h,omitempty"` - Flag model.ColumnFlagType `json:"f"` - Value interface{} `json:"v"` -} - -func newColumn(value interface{}, tp byte) *column { - return &column{ - Value: value, - Type: tp, - } -} - -func (c *column) FromSinkColumn(col *model.Column) { - c.Type = col.Type - c.Flag = col.Flag - if c.Flag.IsHandleKey() { - whereHandle := true - c.WhereHandle = &whereHandle - } - if col.Value == nil { - c.Value = nil - return - } - switch col.Type { - case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: - var str string - switch col.Value.(type) { - case []byte: - str = string(col.Value.([]byte)) - case string: - str = col.Value.(string) - default: - log.Panic("invalid column value, please report a bug", zap.Any("col", col)) - } - if c.Flag.IsBinary() { - str = strconv.Quote(str) - str = str[1 : len(str)-1] - } - c.Value = str - default: - c.Value = col.Value - } -} - -func (c *column) decodeCanalJSONColumn(name string, javaType JavaSQLType) *model.Column { - col := new(model.Column) - col.Type = c.Type - col.Flag = c.Flag - col.Name = name - col.Value = c.Value - if c.Value == nil { - return col - } - - value, ok := col.Value.(string) - if !ok { - log.Panic("canal-json encoded message should have type in `string`") - } - - if javaType == JavaSQLTypeBIT { - val, err := strconv.ParseUint(value, 10, 64) - if err != nil { - log.Panic("invalid column value for bit", zap.Any("col", c), zap.Error(err)) - } - col.Value = val - return col - } - - if javaType != JavaSQLTypeBLOB { - col.Value = value - return col - } - - // when encoding the `JavaSQLTypeBLOB`, use `ISO8859_1` decoder, now reverse it back. 
- encoder := charmap.ISO8859_1.NewEncoder() - value, err := encoder.String(value) - if err != nil { - log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - - col.Value = value - return col -} - -func (c *column) ToSinkColumn(name string) *model.Column { - col := new(model.Column) - col.Type = c.Type - col.Flag = c.Flag - col.Name = name - col.Value = c.Value - if c.Value == nil { - return col - } - switch col.Type { - case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar: - str := col.Value.(string) - var err error - if c.Flag.IsBinary() { - str, err = strconv.Unquote("\"" + str + "\"") - if err != nil { - log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - } - col.Value = []byte(str) - default: - col.Value = c.Value - } - return col -} - -func formatColumnVal(c column) column { - switch c.Type { - case mysql.TypeTinyBlob, mysql.TypeMediumBlob, - mysql.TypeLongBlob, mysql.TypeBlob: - if s, ok := c.Value.(string); ok { - var err error - c.Value, err = base64.StdEncoding.DecodeString(s) - if err != nil { - log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - } - case mysql.TypeFloat, mysql.TypeDouble: - if s, ok := c.Value.(json.Number); ok { - f64, err := s.Float64() - if err != nil { - log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - c.Value = f64 - } - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear: - if s, ok := c.Value.(json.Number); ok { - var err error - if c.Flag.IsUnsigned() { - c.Value, err = strconv.ParseUint(s.String(), 10, 64) - } else { - c.Value, err = strconv.ParseInt(s.String(), 10, 64) - } - if err != nil { - log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - } else if f, ok := c.Value.(float64); ok { - if c.Flag.IsUnsigned() { - c.Value = uint64(f) - } else { - c.Value = int64(f) - } - } - case mysql.TypeBit: - if s, ok := c.Value.(json.Number); ok { - intNum, err := s.Int64() - if err != nil { - log.Panic("invalid column value, please report a bug", zap.Any("col", c), zap.Error(err)) - } - c.Value = uint64(intNum) - } - } - return c -} - -type mqMessageKey struct { - // TODO: should we rename it to CRTs - Ts uint64 `json:"ts"` - Schema string `json:"scm,omitempty"` - Table string `json:"tbl,omitempty"` - RowID int64 `json:"rid,omitempty"` - Partition *int64 `json:"ptn,omitempty"` - Type model.MessageType `json:"t"` -} - -func (m *mqMessageKey) Encode() ([]byte, error) { - data, err := json.Marshal(m) - return data, cerror.WrapError(cerror.ErrMarshalFailed, err) -} - -func (m *mqMessageKey) Decode(data []byte) error { - return cerror.WrapError(cerror.ErrUnmarshalFailed, json.Unmarshal(data, m)) -} - -type mqMessageRow struct { - Update map[string]column `json:"u,omitempty"` - PreColumns map[string]column `json:"p,omitempty"` - Delete map[string]column `json:"d,omitempty"` -} - -func (m *mqMessageRow) Encode() ([]byte, error) { - data, err := json.Marshal(m) - return data, cerror.WrapError(cerror.ErrMarshalFailed, err) -} - -func (m *mqMessageRow) Decode(data []byte) error { - decoder := json.NewDecoder(bytes.NewReader(data)) - decoder.UseNumber() - err := decoder.Decode(m) - if err != nil { - return cerror.WrapError(cerror.ErrUnmarshalFailed, err) - } - for colName, column := range m.Update { - m.Update[colName] = formatColumnVal(column) - } - for colName, column := range m.Delete { - 
m.Delete[colName] = formatColumnVal(column) - } - for colName, column := range m.PreColumns { - m.PreColumns[colName] = formatColumnVal(column) - } - return nil -} - -type mqMessageDDL struct { - Query string `json:"q"` - Type timodel.ActionType `json:"t"` -} - -func (m *mqMessageDDL) Encode() ([]byte, error) { - data, err := json.Marshal(m) - return data, cerror.WrapError(cerror.ErrMarshalFailed, err) -} - -func (m *mqMessageDDL) Decode(data []byte) error { - return cerror.WrapError(cerror.ErrUnmarshalFailed, json.Unmarshal(data, m)) -} - -func newResolvedMessage(ts uint64) *mqMessageKey { - return &mqMessageKey{ - Ts: ts, - Type: model.MessageTypeResolved, - } -} - -func rowEventToMqMessage(e *model.RowChangedEvent) (*mqMessageKey, *mqMessageRow) { - var partition *int64 - if e.Table.IsPartition { - partition = &e.Table.TableID - } - key := &mqMessageKey{ - Ts: e.CommitTs, - Schema: e.Table.Schema, - Table: e.Table.Table, - RowID: e.RowID, - Partition: partition, - Type: model.MessageTypeRow, - } - value := &mqMessageRow{} - if e.IsDelete() { - value.Delete = sinkColumns2JsonColumns(e.PreColumns) - } else { - value.Update = sinkColumns2JsonColumns(e.Columns) - value.PreColumns = sinkColumns2JsonColumns(e.PreColumns) - } - return key, value -} - -func sinkColumns2JsonColumns(cols []*model.Column) map[string]column { - jsonCols := make(map[string]column, len(cols)) - for _, col := range cols { - if col == nil { - continue - } - c := column{} - c.FromSinkColumn(col) - jsonCols[col.Name] = c - } - if len(jsonCols) == 0 { - return nil - } - return jsonCols -} - -func jsonColumns2SinkColumns(cols map[string]column) []*model.Column { - sinkCols := make([]*model.Column, 0, len(cols)) - for name, col := range cols { - c := col.ToSinkColumn(name) - sinkCols = append(sinkCols, c) - } - if len(sinkCols) == 0 { - return nil - } - sort.Slice(sinkCols, func(i, j int) bool { - return strings.Compare(sinkCols[i].Name, sinkCols[j].Name) > 0 - }) - return sinkCols -} - -func mqMessageToRowEvent(key *mqMessageKey, value *mqMessageRow) *model.RowChangedEvent { - e := new(model.RowChangedEvent) - // TODO: we lost the startTs from kafka message - // startTs-based txn filter is out of work - e.CommitTs = key.Ts - e.Table = &model.TableName{ - Schema: key.Schema, - Table: key.Table, - } - // TODO: we lost the tableID from kafka message - if key.Partition != nil { - e.Table.TableID = *key.Partition - e.Table.IsPartition = true - } - - if len(value.Delete) != 0 { - e.PreColumns = jsonColumns2SinkColumns(value.Delete) - } else { - e.Columns = jsonColumns2SinkColumns(value.Update) - e.PreColumns = jsonColumns2SinkColumns(value.PreColumns) - } - return e -} - -func ddlEventtoMqMessage(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) { - key := &mqMessageKey{ - Ts: e.CommitTs, - Schema: e.TableInfo.Schema, - Table: e.TableInfo.Table, - Type: model.MessageTypeDDL, - } - value := &mqMessageDDL{ - Query: e.Query, - Type: e.Type, - } - return key, value -} - -func mqMessageToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent { - e := new(model.DDLEvent) - e.TableInfo = new(model.SimpleTableInfo) - // TODO: we lost the startTs from kafka message - // startTs-based txn filter is out of work - e.CommitTs = key.Ts - e.TableInfo.Table = key.Table - e.TableInfo.Schema = key.Schema - e.Type = value.Type - e.Query = value.Query - return e -} - -// JSONEventBatchEncoder encodes the events into the byte of a batch into. 
-type JSONEventBatchEncoder struct { - messageBuf []*MQMessage - curBatchSize int - // configs - maxMessageBytes int - maxBatchSize int -} - -// GetMaxMessageBytes is only for unit testing. -func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int { - return d.maxMessageBytes -} - -// GetMaxBatchSize is only for unit testing. -func (d *JSONEventBatchEncoder) GetMaxBatchSize() int { - return d.maxBatchSize -} - -// EncodeCheckpointEvent implements the EventBatchEncoder interface -func (d *JSONEventBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) { - keyMsg := newResolvedMessage(ts) - key, err := keyMsg.Encode() - if err != nil { - return nil, errors.Trace(err) - } - - var keyLenByte [8]byte - binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) - var valueLenByte [8]byte - binary.BigEndian.PutUint64(valueLenByte[:], 0) - - keyBuf := new(bytes.Buffer) - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - keyBuf.Write(versionByte[:]) - keyBuf.Write(keyLenByte[:]) - keyBuf.Write(key) - - valueBuf := new(bytes.Buffer) - valueBuf.Write(valueLenByte[:]) - - ret := newResolvedMQMessage(config.ProtocolOpen, keyBuf.Bytes(), valueBuf.Bytes(), ts) - return ret, nil -} - -// AppendRowChangedEvent implements the EventBatchEncoder interface -func (d *JSONEventBatchEncoder) AppendRowChangedEvent( - ctx context.Context, - topic string, - e *model.RowChangedEvent, -) error { - keyMsg, valueMsg := rowEventToMqMessage(e) - key, err := keyMsg.Encode() - if err != nil { - return errors.Trace(err) - } - value, err := valueMsg.Encode() - if err != nil { - return errors.Trace(err) - } - - var keyLenByte [8]byte - binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) - var valueLenByte [8]byte - binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - - // for single message that longer than max-message-size, do not send it. - // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead` - length := len(key) + len(value) + maximumRecordOverhead + 16 + 8 - if length > d.maxMessageBytes { - log.Warn("Single message too large", - zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table)) - return cerror.ErrJSONCodecRowTooLarge.GenWithStackByArgs() - } - - if len(d.messageBuf) == 0 || - d.curBatchSize >= d.maxBatchSize || - d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes { - - versionHead := make([]byte, 8) - binary.BigEndian.PutUint64(versionHead, BatchVersion1) - - d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MessageTypeRow, nil, nil)) - d.curBatchSize = 0 - } - - message := d.messageBuf[len(d.messageBuf)-1] - message.Key = append(message.Key, keyLenByte[:]...) - message.Key = append(message.Key, key...) - message.Value = append(message.Value, valueLenByte[:]...) - message.Value = append(message.Value, value...) - message.Ts = e.CommitTs - message.Schema = &e.Table.Schema - message.Table = &e.Table.Table - message.IncRowsCount() - - if message.Length() > d.maxMessageBytes { - // `len(d.messageBuf) == 1` is implied - log.Debug("Event does not fit into max-message-bytes. 
Adjust relevant configurations to avoid service interruptions.", - zap.Int("eventLen", message.Length()), zap.Int("max-message-bytes", d.maxMessageBytes)) - } - d.curBatchSize++ - return nil -} - -// EncodeDDLEvent implements the EventBatchEncoder interface -func (d *JSONEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) { - keyMsg, valueMsg := ddlEventtoMqMessage(e) - key, err := keyMsg.Encode() - if err != nil { - return nil, errors.Trace(err) - } - value, err := valueMsg.Encode() - if err != nil { - return nil, errors.Trace(err) - } - - var keyLenByte [8]byte - binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key))) - var valueLenByte [8]byte - binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value))) - - keyBuf := new(bytes.Buffer) - var versionByte [8]byte - binary.BigEndian.PutUint64(versionByte[:], BatchVersion1) - keyBuf.Write(versionByte[:]) - keyBuf.Write(keyLenByte[:]) - keyBuf.Write(key) - - valueBuf := new(bytes.Buffer) - valueBuf.Write(valueLenByte[:]) - valueBuf.Write(value) - - ret := newDDLMQMessage(config.ProtocolOpen, keyBuf.Bytes(), valueBuf.Bytes(), e) - return ret, nil -} - -// Build implements the EventBatchEncoder interface -func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) { - ret := d.messageBuf - d.messageBuf = make([]*MQMessage, 0) - return ret -} - -type jsonEventBatchEncoderBuilder struct { - config *Config -} - -// Build a JSONEventBatchEncoder -func (b *jsonEventBatchEncoderBuilder) Build() EventBatchEncoder { - encoder := NewJSONEventBatchEncoder() - encoder.(*JSONEventBatchEncoder).maxMessageBytes = b.config.maxMessageBytes - encoder.(*JSONEventBatchEncoder).maxBatchSize = b.config.maxBatchSize - - return encoder -} - -func newJSONEventBatchEncoderBuilder(config *Config) EncoderBuilder { - return &jsonEventBatchEncoderBuilder{config: config} -} - -// NewJSONEventBatchEncoder creates a new JSONEventBatchEncoder. -func NewJSONEventBatchEncoder() EventBatchEncoder { - batch := &JSONEventBatchEncoder{} - return batch -} - -// JSONEventBatchMixedDecoder decodes the byte of a batch into the original messages. 
-type JSONEventBatchMixedDecoder struct { - mixedBytes []byte - nextKey *mqMessageKey - nextKeyLen uint64 -} - -// HasNext implements the EventBatchDecoder interface -func (b *JSONEventBatchMixedDecoder) HasNext() (model.MessageType, bool, error) { - if !b.hasNext() { - return 0, false, nil - } - if err := b.decodeNextKey(); err != nil { - return 0, false, err - } - return b.nextKey.Type, true, nil -} - -// NextResolvedEvent implements the EventBatchDecoder interface -func (b *JSONEventBatchMixedDecoder) NextResolvedEvent() (uint64, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return 0, err - } - } - b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeResolved { - return 0, cerror.ErrJSONCodecInvalidData.GenWithStack("not found resolved event message") - } - valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - b.mixedBytes = b.mixedBytes[valueLen+8:] - resolvedTs := b.nextKey.Ts - b.nextKey = nil - return resolvedTs, nil -} - -// NextRowChangedEvent implements the EventBatchDecoder interface -func (b *JSONEventBatchMixedDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeRow { - return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found row event message") - } - valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - value := b.mixedBytes[8 : valueLen+8] - b.mixedBytes = b.mixedBytes[valueLen+8:] - rowMsg := new(mqMessageRow) - if err := rowMsg.Decode(value); err != nil { - return nil, errors.Trace(err) - } - rowEvent := mqMessageToRowEvent(b.nextKey, rowMsg) - b.nextKey = nil - return rowEvent, nil -} - -// NextDDLEvent implements the EventBatchDecoder interface -func (b *JSONEventBatchMixedDecoder) NextDDLEvent() (*model.DDLEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeDDL { - return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found ddl event message") - } - valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - value := b.mixedBytes[8 : valueLen+8] - b.mixedBytes = b.mixedBytes[valueLen+8:] - ddlMsg := new(mqMessageDDL) - if err := ddlMsg.Decode(value); err != nil { - return nil, errors.Trace(err) - } - ddlEvent := mqMessageToDDLEvent(b.nextKey, ddlMsg) - b.nextKey = nil - return ddlEvent, nil -} - -func (b *JSONEventBatchMixedDecoder) hasNext() bool { - return len(b.mixedBytes) > 0 -} - -func (b *JSONEventBatchMixedDecoder) decodeNextKey() error { - keyLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) - key := b.mixedBytes[8 : keyLen+8] - // drop value bytes - msgKey := new(mqMessageKey) - err := msgKey.Decode(key) - if err != nil { - return errors.Trace(err) - } - b.nextKey = msgKey - b.nextKeyLen = keyLen - return nil -} - -// JSONEventBatchDecoder decodes the byte of a batch into the original messages. 
-type JSONEventBatchDecoder struct { - keyBytes []byte - valueBytes []byte - nextKey *mqMessageKey - nextKeyLen uint64 -} - -// HasNext implements the EventBatchDecoder interface -func (b *JSONEventBatchDecoder) HasNext() (model.MessageType, bool, error) { - if !b.hasNext() { - return 0, false, nil - } - if err := b.decodeNextKey(); err != nil { - return 0, false, err - } - return b.nextKey.Type, true, nil -} - -// NextResolvedEvent implements the EventBatchDecoder interface -func (b *JSONEventBatchDecoder) NextResolvedEvent() (uint64, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return 0, err - } - } - b.keyBytes = b.keyBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeResolved { - return 0, cerror.ErrJSONCodecInvalidData.GenWithStack("not found resolved event message") - } - valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) - b.valueBytes = b.valueBytes[valueLen+8:] - resolvedTs := b.nextKey.Ts - b.nextKey = nil - return resolvedTs, nil -} - -// NextRowChangedEvent implements the EventBatchDecoder interface -func (b *JSONEventBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.keyBytes = b.keyBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeRow { - return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found row event message") - } - valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) - value := b.valueBytes[8 : valueLen+8] - b.valueBytes = b.valueBytes[valueLen+8:] - rowMsg := new(mqMessageRow) - if err := rowMsg.Decode(value); err != nil { - return nil, errors.Trace(err) - } - rowEvent := mqMessageToRowEvent(b.nextKey, rowMsg) - b.nextKey = nil - return rowEvent, nil -} - -// NextDDLEvent implements the EventBatchDecoder interface -func (b *JSONEventBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { - if b.nextKey == nil { - if err := b.decodeNextKey(); err != nil { - return nil, err - } - } - b.keyBytes = b.keyBytes[b.nextKeyLen+8:] - if b.nextKey.Type != model.MessageTypeDDL { - return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("not found ddl event message") - } - valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) - value := b.valueBytes[8 : valueLen+8] - b.valueBytes = b.valueBytes[valueLen+8:] - ddlMsg := new(mqMessageDDL) - if err := ddlMsg.Decode(value); err != nil { - return nil, errors.Trace(err) - } - ddlEvent := mqMessageToDDLEvent(b.nextKey, ddlMsg) - b.nextKey = nil - return ddlEvent, nil -} - -func (b *JSONEventBatchDecoder) hasNext() bool { - return len(b.keyBytes) > 0 && len(b.valueBytes) > 0 -} - -func (b *JSONEventBatchDecoder) decodeNextKey() error { - keyLen := binary.BigEndian.Uint64(b.keyBytes[:8]) - key := b.keyBytes[8 : keyLen+8] - msgKey := new(mqMessageKey) - err := msgKey.Decode(key) - if err != nil { - return errors.Trace(err) - } - b.nextKey = msgKey - b.nextKeyLen = keyLen - return nil -} - -// NewJSONEventBatchDecoder creates a new JSONEventBatchDecoder. 
-func NewJSONEventBatchDecoder(key []byte, value []byte) (EventBatchDecoder, error) { - version := binary.BigEndian.Uint64(key[:8]) - key = key[8:] - if version != BatchVersion1 { - return nil, cerror.ErrJSONCodecInvalidData.GenWithStack("unexpected key format version") - } - // if only decode one byte slice, we choose MixedDecoder - if len(key) > 0 && len(value) == 0 { - return &JSONEventBatchMixedDecoder{ - mixedBytes: key, - }, nil - } - return &JSONEventBatchDecoder{ - keyBytes: key, - valueBytes: value, - }, nil -} diff --git a/cdc/sink/mq/codec/maxwell.go b/cdc/sink/mq/codec/maxwell.go index c6efc18e72a..6d0305d0a7d 100644 --- a/cdc/sink/mq/codec/maxwell.go +++ b/cdc/sink/mq/codec/maxwell.go @@ -165,9 +165,10 @@ func rowEventToMaxwellMessage(e *model.RowChangedEvent) (*mqMessageKey, *maxwell // AppendRowChangedEvent implements the EventBatchEncoder interface func (d *MaxwellEventBatchEncoder) AppendRowChangedEvent( - ctx context.Context, - topic string, + _ context.Context, + _ string, e *model.RowChangedEvent, + _ func(), ) error { _, valueMsg := rowEventToMaxwellMessage(e) value, err := valueMsg.Encode() @@ -266,7 +267,7 @@ func ddlEventtoMaxwellMessage(e *model.DDLEvent) (*mqMessageKey, *DdlMaxwellMess // DDL message unresolved tso func (d *MaxwellEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) { keyMsg, valueMsg := ddlEventtoMaxwellMessage(e) - key, err := keyMsg.Encode() + key, err := keyMsg.encode() if err != nil { return nil, errors.Trace(err) } @@ -275,7 +276,7 @@ func (d *MaxwellEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage return nil, errors.Trace(err) } - return newDDLMQMessage(config.ProtocolMaxwell, key, value, e), nil + return newDDLMsg(config.ProtocolMaxwell, key, value, e), nil } // Build implements the EventBatchEncoder interface @@ -284,7 +285,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage { return nil } - ret := NewMQMessage(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil) + ret := newMsg(config.ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil) ret.SetRowsCount(d.batchSize) d.Reset() return []*MQMessage{ret} diff --git a/cdc/sink/mq/codec/maxwell_test.go b/cdc/sink/mq/codec/maxwell_test.go index 672913cc605..6a1bb493e7a 100644 --- a/cdc/sink/mq/codec/maxwell_test.go +++ b/cdc/sink/mq/codec/maxwell_test.go @@ -33,7 +33,7 @@ func TestMaxwellEventBatchCodec(t *testing.T) { for _, cs := range rowCases { encoder := newEncoder() for _, row := range cs { - err := encoder.AppendRowChangedEvent(context.Background(), "", row) + err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil) require.Nil(t, err) } messages := encoder.Build() diff --git a/cdc/sink/mq/codec/message.go b/cdc/sink/mq/codec/message.go index a257e6f3702..f6d5106c4ed 100644 --- a/cdc/sink/mq/codec/message.go +++ b/cdc/sink/mq/codec/message.go @@ -22,6 +22,13 @@ import ( "github.com/tikv/client-go/v2/oracle" ) +// MaxRecordOverhead is used to calculate message size by sarama kafka client. +// reference: https://github.com/Shopify/sarama/blob/ +// 66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 +// For TiCDC, minimum supported kafka version is `0.11.0.2`, +// which will be treated as `version = 2` by sarama producer. 
+const MaxRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 + // MQMessage represents an MQ message to the mqSink type MQMessage struct { Key []byte @@ -32,18 +39,14 @@ type MQMessage struct { Type model.MessageType // type Protocol config.Protocol // protocol rowsCount int // rows in one MQ Message + Callback func() // Callback function will be called when the message is sent to the mqSink. } -// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client. -// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233 -// for TiCDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer. -const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 - // Length returns the expected size of the Kafka message // We didn't append any `Headers` when send the message, so ignore the calculations related to it. // If `ProducerMessage` Headers fields used, this method should also adjust. func (m *MQMessage) Length() int { - return len(m.Key) + len(m.Value) + maximumRecordOverhead + return len(m.Key) + len(m.Value) + MaxRecordOverhead } // PhysicalTime returns physical time part of Ts in time.Time @@ -66,8 +69,8 @@ func (m *MQMessage) IncRowsCount() { m.rowsCount++ } -func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLEvent) *MQMessage { - return NewMQMessage( +func newDDLMsg(proto config.Protocol, key, value []byte, event *model.DDLEvent) *MQMessage { + return newMsg( proto, key, value, @@ -78,13 +81,13 @@ func newDDLMQMessage(proto config.Protocol, key, value []byte, event *model.DDLE ) } -func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *MQMessage { - return NewMQMessage(proto, key, value, ts, model.MessageTypeResolved, nil, nil) +func newResolvedMsg(proto config.Protocol, key, value []byte, ts uint64) *MQMessage { + return newMsg(proto, key, value, ts, model.MessageTypeResolved, nil, nil) } -// NewMQMessage should be used when creating a MQMessage struct. +// newMsg should be used when creating a MQMessage struct. // It copies the input byte slices to avoid any surprises in asynchronous MQ writes. -func NewMQMessage( +func newMsg( proto config.Protocol, key []byte, value []byte, diff --git a/cdc/sink/mq/codec/message_key.go b/cdc/sink/mq/codec/message_key.go new file mode 100644 index 00000000000..64076260a17 --- /dev/null +++ b/cdc/sink/mq/codec/message_key.go @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package codec + +import ( + "encoding/json" + + "github.com/pingcap/tiflow/cdc/model" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +type mqMessageKey struct { + Ts uint64 `json:"ts"` + Schema string `json:"scm,omitempty"` + Table string `json:"tbl,omitempty"` + RowID int64 `json:"rid,omitempty"` + Partition *int64 `json:"ptn,omitempty"` + Type model.MessageType `json:"t"` +} + +func (m *mqMessageKey) encode() ([]byte, error) { + data, err := json.Marshal(m) + return data, cerror.WrapError(cerror.ErrMarshalFailed, err) +} + +func (m *mqMessageKey) decode(data []byte) error { + return cerror.WrapError(cerror.ErrUnmarshalFailed, json.Unmarshal(data, m)) +} diff --git a/cdc/sink/mq/codec/message_test.go b/cdc/sink/mq/codec/message_test.go index 3b05fba738b..57ad55a976b 100644 --- a/cdc/sink/mq/codec/message_test.go +++ b/cdc/sink/mq/codec/message_test.go @@ -46,7 +46,7 @@ func TestCreate(t *testing.T) { CommitTs: 5678, } - msg := NewMQMessage(config.ProtocolOpen, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table) + msg := newMsg(config.ProtocolOpen, []byte("key1"), []byte("value1"), rowEvent.CommitTs, model.MessageTypeRow, &rowEvent.Table.Schema, &rowEvent.Table.Table) require.Equal(t, []byte("key1"), msg.Key) require.Equal(t, []byte("value1"), msg.Value) @@ -94,7 +94,7 @@ func TestCreate(t *testing.T) { ddlEvent := &model.DDLEvent{} ddlEvent.FromJob(job, preTableInfo) - msg = newDDLMQMessage(config.ProtocolMaxwell, nil, []byte("value1"), ddlEvent) + msg = newDDLMsg(config.ProtocolMaxwell, nil, []byte("value1"), ddlEvent) require.Nil(t, msg.Key) require.Equal(t, []byte("value1"), msg.Value) require.Equal(t, ddlEvent.CommitTs, msg.Ts) @@ -103,7 +103,7 @@ func TestCreate(t *testing.T) { require.Equal(t, ddlEvent.TableInfo.Table, *msg.Table) require.Equal(t, config.ProtocolMaxwell, msg.Protocol) - msg = newResolvedMQMessage(config.ProtocolCanal, []byte("key1"), nil, 1234) + msg = newResolvedMsg(config.ProtocolCanal, []byte("key1"), nil, 1234) require.Equal(t, []byte("key1"), msg.Key) require.Nil(t, msg.Value) require.Equal(t, uint64(1234), msg.Ts) diff --git a/cdc/sink/mq/codec/open_protocol_decoder.go b/cdc/sink/mq/codec/open_protocol_decoder.go new file mode 100644 index 00000000000..d70cba9ba39 --- /dev/null +++ b/cdc/sink/mq/codec/open_protocol_decoder.go @@ -0,0 +1,241 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "encoding/binary" + + "github.com/pingcap/errors" + "github.com/pingcap/tiflow/cdc/model" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +// OpenProtocolBatchMixedDecoder decodes the byte of a batch into the original messages. 
+type OpenProtocolBatchMixedDecoder struct { + mixedBytes []byte + nextKey *mqMessageKey + nextKeyLen uint64 +} + +// HasNext implements the EventBatchDecoder interface +func (b *OpenProtocolBatchMixedDecoder) HasNext() (model.MessageType, bool, error) { + if !b.hasNext() { + return 0, false, nil + } + if err := b.decodeNextKey(); err != nil { + return 0, false, err + } + return b.nextKey.Type, true, nil +} + +// NextResolvedEvent implements the EventBatchDecoder interface +func (b *OpenProtocolBatchMixedDecoder) NextResolvedEvent() (uint64, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return 0, err + } + } + b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MessageTypeResolved { + return 0, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found resolved event message") + } + valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) + b.mixedBytes = b.mixedBytes[valueLen+8:] + resolvedTs := b.nextKey.Ts + b.nextKey = nil + return resolvedTs, nil +} + +// NextRowChangedEvent implements the EventBatchDecoder interface +func (b *OpenProtocolBatchMixedDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return nil, err + } + } + b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MessageTypeRow { + return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") + } + valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) + value := b.mixedBytes[8 : valueLen+8] + b.mixedBytes = b.mixedBytes[valueLen+8:] + rowMsg := new(mqMessageRow) + if err := rowMsg.decode(value); err != nil { + return nil, errors.Trace(err) + } + rowEvent := msgToRowChange(b.nextKey, rowMsg) + b.nextKey = nil + return rowEvent, nil +} + +// NextDDLEvent implements the EventBatchDecoder interface +func (b *OpenProtocolBatchMixedDecoder) NextDDLEvent() (*model.DDLEvent, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return nil, err + } + } + b.mixedBytes = b.mixedBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MessageTypeDDL { + return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found ddl event message") + } + valueLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) + value := b.mixedBytes[8 : valueLen+8] + b.mixedBytes = b.mixedBytes[valueLen+8:] + ddlMsg := new(mqMessageDDL) + if err := ddlMsg.decode(value); err != nil { + return nil, errors.Trace(err) + } + ddlEvent := msgToDDLEvent(b.nextKey, ddlMsg) + b.nextKey = nil + return ddlEvent, nil +} + +func (b *OpenProtocolBatchMixedDecoder) hasNext() bool { + return len(b.mixedBytes) > 0 +} + +func (b *OpenProtocolBatchMixedDecoder) decodeNextKey() error { + keyLen := binary.BigEndian.Uint64(b.mixedBytes[:8]) + key := b.mixedBytes[8 : keyLen+8] + // drop value bytes + msgKey := new(mqMessageKey) + err := msgKey.decode(key) + if err != nil { + return errors.Trace(err) + } + b.nextKey = msgKey + b.nextKeyLen = keyLen + return nil +} + +// OpenProtocolBatchDecoder decodes the byte of a batch into the original messages. 
+type OpenProtocolBatchDecoder struct { + keyBytes []byte + valueBytes []byte + nextKey *mqMessageKey + nextKeyLen uint64 +} + +// HasNext implements the EventBatchDecoder interface +func (b *OpenProtocolBatchDecoder) HasNext() (model.MessageType, bool, error) { + if !b.hasNext() { + return 0, false, nil + } + if err := b.decodeNextKey(); err != nil { + return 0, false, err + } + return b.nextKey.Type, true, nil +} + +// NextResolvedEvent implements the EventBatchDecoder interface +func (b *OpenProtocolBatchDecoder) NextResolvedEvent() (uint64, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return 0, err + } + } + b.keyBytes = b.keyBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MessageTypeResolved { + return 0, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found resolved event message") + } + valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) + b.valueBytes = b.valueBytes[valueLen+8:] + resolvedTs := b.nextKey.Ts + b.nextKey = nil + return resolvedTs, nil +} + +// NextRowChangedEvent implements the EventBatchDecoder interface +func (b *OpenProtocolBatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return nil, err + } + } + b.keyBytes = b.keyBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MessageTypeRow { + return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message") + } + valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) + value := b.valueBytes[8 : valueLen+8] + b.valueBytes = b.valueBytes[valueLen+8:] + rowMsg := new(mqMessageRow) + if err := rowMsg.decode(value); err != nil { + return nil, errors.Trace(err) + } + rowEvent := msgToRowChange(b.nextKey, rowMsg) + b.nextKey = nil + return rowEvent, nil +} + +// NextDDLEvent implements the EventBatchDecoder interface +func (b *OpenProtocolBatchDecoder) NextDDLEvent() (*model.DDLEvent, error) { + if b.nextKey == nil { + if err := b.decodeNextKey(); err != nil { + return nil, err + } + } + b.keyBytes = b.keyBytes[b.nextKeyLen+8:] + if b.nextKey.Type != model.MessageTypeDDL { + return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found ddl event message") + } + valueLen := binary.BigEndian.Uint64(b.valueBytes[:8]) + value := b.valueBytes[8 : valueLen+8] + b.valueBytes = b.valueBytes[valueLen+8:] + ddlMsg := new(mqMessageDDL) + if err := ddlMsg.decode(value); err != nil { + return nil, errors.Trace(err) + } + ddlEvent := msgToDDLEvent(b.nextKey, ddlMsg) + b.nextKey = nil + return ddlEvent, nil +} + +func (b *OpenProtocolBatchDecoder) hasNext() bool { + return len(b.keyBytes) > 0 && len(b.valueBytes) > 0 +} + +func (b *OpenProtocolBatchDecoder) decodeNextKey() error { + keyLen := binary.BigEndian.Uint64(b.keyBytes[:8]) + key := b.keyBytes[8 : keyLen+8] + msgKey := new(mqMessageKey) + err := msgKey.decode(key) + if err != nil { + return errors.Trace(err) + } + b.nextKey = msgKey + b.nextKeyLen = keyLen + return nil +} + +// NewOpenProtocolBatchDecoder creates a new OpenProtocolBatchDecoder. 
+func NewOpenProtocolBatchDecoder(key []byte, value []byte) (EventBatchDecoder, error) {
+    version := binary.BigEndian.Uint64(key[:8])
+    key = key[8:]
+    if version != BatchVersion1 {
+        return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("unexpected key format version")
+    }
+    // if there is only one byte slice to decode, use the mixed decoder
+    if len(key) > 0 && len(value) == 0 {
+        return &OpenProtocolBatchMixedDecoder{
+            mixedBytes: key,
+        }, nil
+    }
+    return &OpenProtocolBatchDecoder{
+        keyBytes:   key,
+        valueBytes: value,
+    }, nil
+}
diff --git a/cdc/sink/mq/codec/open_protocol_encoder.go b/cdc/sink/mq/codec/open_protocol_encoder.go
new file mode 100644
index 00000000000..6e9cbf1bea2
--- /dev/null
+++ b/cdc/sink/mq/codec/open_protocol_encoder.go
@@ -0,0 +1,213 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package codec
+
+import (
+    "bytes"
+    "context"
+    "encoding/binary"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/log"
+    "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/pkg/config"
+    cerror "github.com/pingcap/tiflow/pkg/errors"
+    "go.uber.org/zap"
+)
+
+// OpenProtocolBatchEncoder encodes events into batches of bytes.
+type OpenProtocolBatchEncoder struct {
+    messageBuf   []*MQMessage
+    callbackBuff []func()
+    curBatchSize int
+
+    // configs
+    maxMessageBytes int
+    maxBatchSize    int
+}
+
+// GetMaxMessageBytes is only for unit testing.
+func (d *OpenProtocolBatchEncoder) GetMaxMessageBytes() int {
+    return d.maxMessageBytes
+}
+
+// GetMaxBatchSize is only for unit testing.
+func (d *OpenProtocolBatchEncoder) GetMaxBatchSize() int {
+    return d.maxBatchSize
+}
+
+// AppendRowChangedEvent implements the EventBatchEncoder interface
+func (d *OpenProtocolBatchEncoder) AppendRowChangedEvent(
+    _ context.Context,
+    _ string,
+    e *model.RowChangedEvent,
+    callback func(),
+) error {
+    keyMsg, valueMsg := rowChangeToMsg(e)
+    key, err := keyMsg.encode()
+    if err != nil {
+        return errors.Trace(err)
+    }
+    value, err := valueMsg.encode()
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    var keyLenByte [8]byte
+    binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
+    var valueLenByte [8]byte
+    binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))
+
+    // a single message longer than max-message-size must not be sent.
+    // 16 is the length of `keyLenByte` and `valueLenByte`, 8 is the length of `versionHead`
+    length := len(key) + len(value) + MaxRecordOverhead + 16 + 8
+    if length > d.maxMessageBytes {
+        log.Warn("Single message too large",
+            zap.Int("max-message-size", d.maxMessageBytes), zap.Int("length", length), zap.Any("table", e.Table))
+        return cerror.ErrOpenProtocolCodecRowTooLarge.GenWithStackByArgs()
+    }
+
+    if len(d.messageBuf) == 0 ||
+        d.curBatchSize >= d.maxBatchSize ||
+        d.messageBuf[len(d.messageBuf)-1].Length()+len(key)+len(value)+16 > d.maxMessageBytes {
+        // Before we create a new message, we should handle the previous callbacks.
+        d.tryBuildCallback()
+        versionHead := make([]byte, 8)
+        binary.BigEndian.PutUint64(versionHead, BatchVersion1)
+        msg := newMsg(config.ProtocolOpen, versionHead, nil, 0, model.MessageTypeRow, nil, nil)
+        d.messageBuf = append(d.messageBuf, msg)
+        d.curBatchSize = 0
+    }
+
+    message := d.messageBuf[len(d.messageBuf)-1]
+    message.Key = append(message.Key, keyLenByte[:]...)
+    message.Key = append(message.Key, key...)
+    message.Value = append(message.Value, valueLenByte[:]...)
+    message.Value = append(message.Value, value...)
+    message.Ts = e.CommitTs
+    message.Schema = &e.Table.Schema
+    message.Table = &e.Table.Table
+    message.IncRowsCount()
+
+    if callback != nil {
+        d.callbackBuff = append(d.callbackBuff, callback)
+    }
+
+    d.curBatchSize++
+    return nil
+}
+
+// EncodeDDLEvent implements the EventBatchEncoder interface
+func (d *OpenProtocolBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage, error) {
+    keyMsg, valueMsg := ddlEventToMsg(e)
+    key, err := keyMsg.encode()
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+    value, err := valueMsg.encode()
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+
+    var keyLenByte [8]byte
+    binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
+    var valueLenByte [8]byte
+    binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))
+
+    keyBuf := new(bytes.Buffer)
+    var versionByte [8]byte
+    binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
+    keyBuf.Write(versionByte[:])
+    keyBuf.Write(keyLenByte[:])
+    keyBuf.Write(key)
+
+    valueBuf := new(bytes.Buffer)
+    valueBuf.Write(valueLenByte[:])
+    valueBuf.Write(value)
+
+    ret := newDDLMsg(config.ProtocolOpen, keyBuf.Bytes(), valueBuf.Bytes(), e)
+    return ret, nil
+}
+
+// EncodeCheckpointEvent implements the EventBatchEncoder interface
+func (d *OpenProtocolBatchEncoder) EncodeCheckpointEvent(ts uint64) (*MQMessage, error) {
+    keyMsg := newResolvedMessage(ts)
+    key, err := keyMsg.encode()
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+
+    var keyLenByte [8]byte
+    binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
+    var valueLenByte [8]byte
+    binary.BigEndian.PutUint64(valueLenByte[:], 0)
+
+    keyBuf := new(bytes.Buffer)
+    var versionByte [8]byte
+    binary.BigEndian.PutUint64(versionByte[:], BatchVersion1)
+    keyBuf.Write(versionByte[:])
+    keyBuf.Write(keyLenByte[:])
+    keyBuf.Write(key)
+
+    valueBuf := new(bytes.Buffer)
+    valueBuf.Write(valueLenByte[:])
+
+    ret := newResolvedMsg(config.ProtocolOpen, keyBuf.Bytes(), valueBuf.Bytes(), ts)
+    return ret, nil
+}
+
+// Build implements the EventBatchEncoder interface
+func (d *OpenProtocolBatchEncoder) Build() (mqMessages []*MQMessage) {
+    d.tryBuildCallback()
+    ret := d.messageBuf
+    d.messageBuf = make([]*MQMessage, 0)
+    return ret
+}
+
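Because a checkpoint message carries the same version header and length prefixes as a row batch, it round-trips through the ordinary decoder. A sketch using only identifiers from this patch (illustrative, not part of the change; the timestamp is arbitrary):

    // checkpointRoundTrip shows the encode/decode pairing: the message built
    // by EncodeCheckpointEvent feeds straight into NewOpenProtocolBatchDecoder.
    func checkpointRoundTrip() (uint64, error) {
        encoder := newOpenProtocolBatchEncoder()
        msg, err := encoder.EncodeCheckpointEvent(419)
        if err != nil {
            return 0, err
        }
        decoder, err := NewOpenProtocolBatchDecoder(msg.Key, msg.Value)
        if err != nil {
            return 0, err
        }
        if _, hasNext, err := decoder.HasNext(); err != nil || !hasNext {
            return 0, err // no resolved event available
        }
        return decoder.NextResolvedEvent() // yields 419
    }

+// tryBuildCallback collects all pending callbacks into the last message's callback.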
+func (d *OpenProtocolBatchEncoder) tryBuildCallback() {
+    if len(d.messageBuf) != 0 && len(d.callbackBuff) != 0 {
+        lastMsg := d.messageBuf[len(d.messageBuf)-1]
+        callbacks := d.callbackBuff
+        lastMsg.Callback = func() {
+            for _, cb := range callbacks {
+                cb()
+            }
+        }
+        d.callbackBuff = make([]func(), 0)
+    }
+}
+
+type openProtocolBatchEncoderBuilder struct {
+    config *Config
+}
+
+// Build an OpenProtocolBatchEncoder
+func (b *openProtocolBatchEncoderBuilder) Build() EventBatchEncoder {
+    encoder := newOpenProtocolBatchEncoder()
+    encoder.(*OpenProtocolBatchEncoder).maxMessageBytes = b.config.maxMessageBytes
+    encoder.(*OpenProtocolBatchEncoder).maxBatchSize = b.config.maxBatchSize
+
+    return encoder
+}
+
+func newOpenProtocolBatchEncoderBuilder(config *Config) EncoderBuilder {
+    return &openProtocolBatchEncoderBuilder{config: config}
+}
+
+// newOpenProtocolBatchEncoder creates a new OpenProtocolBatchEncoder.
+func newOpenProtocolBatchEncoder() EventBatchEncoder {
+    batch := &OpenProtocolBatchEncoder{}
+    return batch
+}
diff --git a/cdc/sink/mq/codec/json_test.go b/cdc/sink/mq/codec/open_protocol_encoder_test.go
similarity index 65%
rename from cdc/sink/mq/codec/json_test.go
rename to cdc/sink/mq/codec/open_protocol_encoder_test.go
index eda03317ca0..29485b9b796 100644
--- a/cdc/sink/mq/codec/json_test.go
+++ b/cdc/sink/mq/codec/open_protocol_encoder_test.go
@@ -1,4 +1,4 @@
-// Copyright 2020 PingCAP, Inc.
+// Copyright 2022 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@ import (
     "sort"
     "testing"
 
-    "github.com/pingcap/tidb/parser/mysql"
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/pkg/config"
     "github.com/stretchr/testify/require"
@@ -38,7 +37,7 @@ func (a columnsArray) Swap(i, j int) {
     a[i], a[j] = a[j], a[i]
 }
 
-func sortColumnsArrays(arrays ...[]*model.Column) {
+func sortColumnArrays(arrays ...[]*model.Column) {
     for _, array := range arrays {
         if array != nil {
             sort.Sort(columnsArray(array))
@@ -52,7 +51,7 @@ type batchTester struct {
     resolvedTsCases [][]uint64
 }
 
-func NewDefaultBatchTester() *batchTester {
+func newDefaultBatchTester() *batchTester {
     return &batchTester{
         rowCases:        codecRowCases,
         ddlCases:        codecDDLCases,
@@ -76,7 +75,7 @@ func (s *batchTester) testBatchCodec(
             require.Equal(t, model.MessageTypeRow, tp)
             row, err := decoder.NextRowChangedEvent()
             require.Nil(t, err)
-            sortColumnsArrays(row.Columns, row.PreColumns, cs[index].Columns, cs[index].PreColumns)
+            sortColumnArrays(row.Columns, row.PreColumns, cs[index].Columns, cs[index].PreColumns)
             require.Equal(t, cs[index], row)
             index++
         }
@@ -116,7 +115,7 @@ func (s *batchTester) testBatchCodec(
         encoder := encoderBuilder.Build()
 
         for _, row := range cs {
-            err := encoder.AppendRowChangedEvent(context.Background(), "", row)
+            err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil)
             require.Nil(t, err)
         }
 
@@ -155,11 +154,11 @@ func (s *batchTester) testBatchCodec(
     }
 }
 
-func TestBuildJSONEventBatchEncoder(t *testing.T) {
+func TestBuildOpenProtocolBatchEncoder(t *testing.T) {
     t.Parallel()
     config := NewConfig(config.ProtocolOpen)
-    builder := &jsonEventBatchEncoderBuilder{config: config}
-    encoder, ok := builder.Build().(*JSONEventBatchEncoder)
+    builder := &openProtocolBatchEncoderBuilder{config: config}
+    encoder, ok := builder.Build().(*OpenProtocolBatchEncoder)
     require.True(t, ok)
     require.Equal(t, config.maxBatchSize, encoder.maxBatchSize)
     require.Equal(t, config.maxMessageBytes, encoder.maxMessageBytes)
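As the renamed tests below exercise, call sites inside this package obtain the encoder through the builder. A typical wiring, sketched with identifiers from this patch:

    cfg := NewConfig(config.ProtocolOpen).WithMaxMessageBytes(1024 * 1024)
    encoder := newOpenProtocolBatchEncoderBuilder(cfg).Build()
    // append events with AppendRowChangedEvent, then drain them with Build()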
@@ -176,24 +175,24 @@ func TestMaxMessageBytes(t *testing.T) {
     ctx := context.Background()
     topic := ""
-    // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it.
+    // for a single message, the overhead is 36 (maxRecordOverhead) + 8 (versionHead) = 44, which it can just hold.
     a := 87 + 44
     config := NewConfig(config.ProtocolOpen).WithMaxMessageBytes(a)
-    encoder := newJSONEventBatchEncoderBuilder(config).Build()
-    err := encoder.AppendRowChangedEvent(ctx, topic, testEvent)
+    encoder := newOpenProtocolBatchEncoderBuilder(config).Build()
+    err := encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil)
     require.Nil(t, err)
 
     // cannot hold a single message
     config = config.WithMaxMessageBytes(a - 1)
-    encoder = newJSONEventBatchEncoderBuilder(config).Build()
-    err = encoder.AppendRowChangedEvent(ctx, topic, testEvent)
+    encoder = newOpenProtocolBatchEncoderBuilder(config).Build()
+    err = encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil)
     require.NotNil(t, err)
 
     // make sure each batch's `Length` not greater than `max-message-bytes`
     config = config.WithMaxMessageBytes(256)
-    encoder = newJSONEventBatchEncoderBuilder(config).Build()
+    encoder = newOpenProtocolBatchEncoderBuilder(config).Build()
 
     for i := 0; i < 10000; i++ {
-        err := encoder.AppendRowChangedEvent(ctx, topic, testEvent)
+        err := encoder.AppendRowChangedEvent(ctx, topic, testEvent, nil)
         require.Nil(t, err)
     }
 
@@ -207,7 +206,7 @@ func TestMaxBatchSize(t *testing.T) {
     t.Parallel()
     config := NewConfig(config.ProtocolOpen).WithMaxMessageBytes(1048576)
     config.maxBatchSize = 64
-    encoder := newJSONEventBatchEncoderBuilder(config).Build()
+    encoder := newOpenProtocolBatchEncoderBuilder(config).Build()
 
     testEvent := &model.RowChangedEvent{
         CommitTs: 1,
@@ -216,14 +215,14 @@
     }
 
     for i := 0; i < 10000; i++ {
-        err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent)
+        err := encoder.AppendRowChangedEvent(context.Background(), "", testEvent, nil)
         require.Nil(t, err)
     }
 
     messages := encoder.Build()
     sum := 0
     for _, msg := range messages {
-        decoder, err := NewJSONEventBatchDecoder(msg.Key, msg.Value)
+        decoder, err := NewOpenProtocolBatchDecoder(msg.Key, msg.Value)
         require.Nil(t, err)
         count := 0
         for {
@@ -244,78 +243,85 @@ func TestMaxBatchSize(t *testing.T) {
     require.Equal(t, 10000, sum)
 }
 
-func TestDefaultEventBatchCodec(t *testing.T) {
-    config := NewConfig(config.ProtocolOpen).WithMaxMessageBytes(8192)
-    config.maxBatchSize = 64
-    tester := NewDefaultBatchTester()
-    tester.testBatchCodec(t, newJSONEventBatchEncoderBuilder(config), NewJSONEventBatchDecoder)
-}
-
-func TestFormatCol(t *testing.T) {
+func TestOpenProtocolAppendRowChangedEventWithCallback(t *testing.T) {
     t.Parallel()
-    row := &mqMessageRow{Update: map[string]column{"test": {
-        Type:  mysql.TypeString,
-        Value: "测",
-    }}}
-    rowEncode, err := row.Encode()
-    require.Nil(t, err)
-    row2 := new(mqMessageRow)
-    err = row2.Decode(rowEncode)
-    require.Nil(t, err)
-    require.Equal(t, row, row2)
-    row = &mqMessageRow{Update: map[string]column{"test": {
-        Type:  mysql.TypeBlob,
-        Value: []byte("测"),
-    }}}
-    rowEncode, err = row.Encode()
-    require.Nil(t, err)
-    row2 = new(mqMessageRow)
-    err = row2.Decode(rowEncode)
-    require.Nil(t, err)
-    require.Equal(t, row, row2)
-}
+    cfg := NewConfig(config.ProtocolOpen)
+    // Set the max batch size to 2, so that we can test the callback.
+    cfg.maxBatchSize = 2
+    builder := &openProtocolBatchEncoderBuilder{config: cfg}
+    encoder, ok := builder.Build().(*OpenProtocolBatchEncoder)
+    require.True(t, ok)
+    require.Equal(t, cfg.maxBatchSize, encoder.maxBatchSize)
 
-func TestNonBinaryStringCol(t *testing.T) {
-    t.Parallel()
-    col := &model.Column{
-        Name:  "test",
-        Type:  mysql.TypeString,
-        Value: "value",
+    count := 0
+
+    row := &model.RowChangedEvent{
+        CommitTs: 1,
+        Table:    &model.TableName{Schema: "a", Table: "b"},
+        Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
     }
-    jsonCol := column{}
-    jsonCol.FromSinkColumn(col)
-    row := &mqMessageRow{Update: map[string]column{"test": jsonCol}}
-    rowEncode, err := row.Encode()
-    require.Nil(t, err)
-    row2 := new(mqMessageRow)
-    err = row2.Decode(rowEncode)
-    require.Nil(t, err)
-    require.Equal(t, row, row2)
-    jsonCol2 := row2.Update["test"]
-    col2 := jsonCol2.ToSinkColumn("test")
-    col2.Value = string(col2.Value.([]byte))
-    require.Equal(t, col, col2)
-}
-
-func TestVarBinaryCol(t *testing.T) {
-    t.Parallel()
-    col := &model.Column{
-        Name:  "test",
-        Type:  mysql.TypeString,
-        Flag:  model.BinaryFlag,
-        Value: []byte{0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A},
+    tests := []struct {
+        row      *model.RowChangedEvent
+        callback func()
+    }{
+        {
+            row: row,
+            callback: func() {
+                count += 1
+            },
+        },
+        {
+            row: row,
+            callback: func() {
+                count += 2
+            },
+        },
+        {
+            row: row,
+            callback: func() {
+                count += 3
+            },
+        },
+        {
+            row: row,
+            callback: func() {
+                count += 4
+            },
+        },
+        {
+            row: row,
+            callback: func() {
+                count += 5
+            },
+        },
     }
-    jsonCol := column{}
-    jsonCol.FromSinkColumn(col)
-    row := &mqMessageRow{Update: map[string]column{"test": jsonCol}}
-    rowEncode, err := row.Encode()
-    require.Nil(t, err)
-    row2 := new(mqMessageRow)
-    err = row2.Decode(rowEncode)
-    require.Nil(t, err)
-    require.Equal(t, row, row2)
-    jsonCol2 := row2.Update["test"]
-    col2 := jsonCol2.ToSinkColumn("test")
-    require.Equal(t, col, col2)
+
+    // An empty build makes sure that the callback logic is not broken.
+    msgs := encoder.Build()
+    require.Len(t, msgs, 0, "no message should be built and no panic")
+
+    // Append the events.
+    for _, test := range tests {
+        err := encoder.AppendRowChangedEvent(context.Background(), "", test.row, test.callback)
+        require.Nil(t, err)
+    }
+    require.Equal(t, 0, count, "nothing should be called")
+
+    msgs = encoder.Build()
+    require.Len(t, msgs, 3, "expected 3 messages")
+    msgs[0].Callback()
+    require.Equal(t, 3, count, "expected 2 callbacks to be called")
+    msgs[1].Callback()
+    require.Equal(t, 10, count, "expected 2 callbacks to be called")
+    msgs[2].Callback()
+    require.Equal(t, 15, count, "expected 1 callback to be called")
+}
+
+func TestOpenProtocolBatchCodec(t *testing.T) {
+    config := NewConfig(config.ProtocolOpen).WithMaxMessageBytes(8192)
+    config.maxBatchSize = 64
+    tester := newDefaultBatchTester()
+    tester.testBatchCodec(t, newOpenProtocolBatchEncoderBuilder(config), NewOpenProtocolBatchDecoder)
 }
diff --git a/cdc/sink/mq/codec/open_protocol_message.go b/cdc/sink/mq/codec/open_protocol_message.go
new file mode 100644
index 00000000000..d4bd1d82fa7
--- /dev/null
+++ b/cdc/sink/mq/codec/open_protocol_message.go
@@ -0,0 +1,181 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package codec
+
+import (
+    "bytes"
+    "encoding/json"
+    "sort"
+    "strings"
+
+    timodel "github.com/pingcap/tidb/parser/model"
+    "github.com/pingcap/tiflow/cdc/model"
+    cerror "github.com/pingcap/tiflow/pkg/errors"
+)
+
+type mqMessageRow struct {
+    Update     map[string]column `json:"u,omitempty"`
+    PreColumns map[string]column `json:"p,omitempty"`
+    Delete     map[string]column `json:"d,omitempty"`
+}
+
+func (m *mqMessageRow) encode() ([]byte, error) {
+    data, err := json.Marshal(m)
+    return data, cerror.WrapError(cerror.ErrMarshalFailed, err)
+}
+
+func (m *mqMessageRow) decode(data []byte) error {
+    decoder := json.NewDecoder(bytes.NewReader(data))
+    decoder.UseNumber()
+    err := decoder.Decode(m)
+    if err != nil {
+        return cerror.WrapError(cerror.ErrUnmarshalFailed, err)
+    }
+    for colName, column := range m.Update {
+        m.Update[colName] = formatColumn(column)
+    }
+    for colName, column := range m.Delete {
+        m.Delete[colName] = formatColumn(column)
+    }
+    for colName, column := range m.PreColumns {
+        m.PreColumns[colName] = formatColumn(column)
+    }
+    return nil
+}
+
+type mqMessageDDL struct {
+    Query string             `json:"q"`
+    Type  timodel.ActionType `json:"t"`
+}
+
+func (m *mqMessageDDL) encode() ([]byte, error) {
+    data, err := json.Marshal(m)
+    return data, cerror.WrapError(cerror.ErrMarshalFailed, err)
+}
+
+func (m *mqMessageDDL) decode(data []byte) error {
+    return cerror.WrapError(cerror.ErrUnmarshalFailed, json.Unmarshal(data, m))
+}
+
+func newResolvedMessage(ts uint64) *mqMessageKey {
+    return &mqMessageKey{
+        Ts:   ts,
+        Type: model.MessageTypeResolved,
+    }
+}
+
+func rowChangeToMsg(e *model.RowChangedEvent) (*mqMessageKey, *mqMessageRow) {
+    var partition *int64
+    if e.Table.IsPartition {
+        partition = &e.Table.TableID
+    }
+    key := &mqMessageKey{
+        Ts:        e.CommitTs,
+        Schema:    e.Table.Schema,
+        Table:     e.Table.Table,
+        RowID:     e.RowID,
+        Partition: partition,
+        Type:      model.MessageTypeRow,
+    }
+    value := &mqMessageRow{}
+    if e.IsDelete() {
+        value.Delete = rowChangeColumns2MQColumns(e.PreColumns)
+    } else {
+        value.Update = rowChangeColumns2MQColumns(e.Columns)
+        value.PreColumns = rowChangeColumns2MQColumns(e.PreColumns)
+    }
+    return key, value
+}
+
+func msgToRowChange(key *mqMessageKey, value *mqMessageRow) *model.RowChangedEvent {
+    e := new(model.RowChangedEvent)
+    // TODO: the startTs is lost in the kafka message,
+    // so the startTs-based txn filter cannot work
+    e.CommitTs = key.Ts
+    e.Table = &model.TableName{
+        Schema: key.Schema,
+        Table:  key.Table,
+    }
+    // TODO: the tableID is also lost in the kafka message
+    if key.Partition != nil {
+        e.Table.TableID = *key.Partition
+        e.Table.IsPartition = true
+    }
+
+    if len(value.Delete) != 0 {
+        e.PreColumns = mqColumns2RowChangeColumns(value.Delete)
+    } else {
+        e.Columns = mqColumns2RowChangeColumns(value.Update)
+        e.PreColumns = mqColumns2RowChangeColumns(value.PreColumns)
+    }
+    return e
+}
+
+func rowChangeColumns2MQColumns(cols []*model.Column) map[string]column {
+    jsonCols := make(map[string]column, len(cols))
+    for _, col := range cols {
+        if col == nil {
+            continue
+        }
+        c := column{}
+        c.fromRowChangeColumn(col)
+        jsonCols[col.Name] = c
+    }
+    if len(jsonCols) == 0 {
+        return nil
+    }
+    return jsonCols
+}
+
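To make the mapping above concrete: the value side of an update event is just mqMessageRow marshaled to JSON. A sketch, assuming the column type defined elsewhere in this package exposes Type and Value fields tagged t and v (the exact field set may differ):

    // Marshals to roughly {"u":{"a":{"t":3,"v":1}}} for an update touching column a.
    r := mqMessageRow{Update: map[string]column{"a": {Type: 3, Value: 1}}}
    data, err := r.encode()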
+func mqColumns2RowChangeColumns(cols map[string]column) []*model.Column {
+    sinkCols := make([]*model.Column, 0, len(cols))
+    for name, col := range cols {
+        c := col.toRowChangeColumn(name)
+        sinkCols = append(sinkCols, c)
+    }
+    if len(sinkCols) == 0 {
+        return nil
+    }
+    sort.Slice(sinkCols, func(i, j int) bool {
+        return strings.Compare(sinkCols[i].Name, sinkCols[j].Name) > 0
+    })
+    return sinkCols
+}
+
+func ddlEventToMsg(e *model.DDLEvent) (*mqMessageKey, *mqMessageDDL) {
+    key := &mqMessageKey{
+        Ts:     e.CommitTs,
+        Schema: e.TableInfo.Schema,
+        Table:  e.TableInfo.Table,
+        Type:   model.MessageTypeDDL,
+    }
+    value := &mqMessageDDL{
+        Query: e.Query,
+        Type:  e.Type,
+    }
+    return key, value
+}
+
+func msgToDDLEvent(key *mqMessageKey, value *mqMessageDDL) *model.DDLEvent {
+    e := new(model.DDLEvent)
+    e.TableInfo = new(model.SimpleTableInfo)
+    // TODO: the startTs is lost in the kafka message,
+    // so the startTs-based txn filter cannot work
+    e.CommitTs = key.Ts
+    e.TableInfo.Table = key.Table
+    e.TableInfo.Schema = key.Schema
+    e.Type = value.Type
+    e.Query = value.Query
+    return e
+}
diff --git a/cdc/sink/mq/mq_flush_worker.go b/cdc/sink/mq/mq_flush_worker.go
index 5db379e2950..29f826b5074 100644
--- a/cdc/sink/mq/mq_flush_worker.go
+++ b/cdc/sink/mq/mq_flush_worker.go
@@ -162,7 +162,7 @@ func (w *flushWorker) asyncSend(
 ) error {
     for key, events := range partitionedRows {
         for _, event := range events {
-            err := w.encoder.AppendRowChangedEvent(ctx, key.topic, event)
+            err := w.encoder.AppendRowChangedEvent(ctx, key.topic, event, nil)
             if err != nil {
                 return err
             }
diff --git a/cdc/sink/mq/mq_test.go b/cdc/sink/mq/mq_test.go
index 53f14a5e267..9be9aee39c3 100644
--- a/cdc/sink/mq/mq_test.go
+++ b/cdc/sink/mq/mq_test.go
@@ -92,9 +92,9 @@ func TestKafkaSink(t *testing.T) {
 
     encoder := sink.encoderBuilder.Build()
 
-    require.IsType(t, &codec.JSONEventBatchEncoder{}, encoder)
-    require.Equal(t, 1, encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize())
-    require.Equal(t, 1048576, encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes())
+    require.IsType(t, &codec.OpenProtocolBatchEncoder{}, encoder)
+    require.Equal(t, 1, encoder.(*codec.OpenProtocolBatchEncoder).GetMaxBatchSize())
+    require.Equal(t, 1048576, encoder.(*codec.OpenProtocolBatchEncoder).GetMaxMessageBytes())
 
     // mock kafka broker processes 1 row changed event
     tableID := model.TableID(1)
@@ -179,9 +179,9 @@ func TestPulsarSinkEncoderConfig(t *testing.T) {
     require.Nil(t, err)
 
     encoder := sink.encoderBuilder.Build()
-    require.IsType(t, &codec.JSONEventBatchEncoder{}, encoder)
-    require.Equal(t, 1, encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize())
-    require.Equal(t, 4194304, encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes())
+    require.IsType(t, &codec.OpenProtocolBatchEncoder{}, encoder)
+    require.Equal(t, 1, encoder.(*codec.OpenProtocolBatchEncoder).GetMaxBatchSize())
+    require.Equal(t, 4194304, encoder.(*codec.OpenProtocolBatchEncoder).GetMaxMessageBytes())
 
     // FIXME: mock pulsar client doesn't support close,
     // so we can't call sink.Close() to close it.
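The flush worker above passes a nil callback. A caller that needs per-event flush acknowledgement would pass one instead; a hypothetical variant of the call inside asyncSend (the atomic counter is illustrative, not part of this patch):

    // Hypothetical: count events whose batched message has been flushed.
    var acked int64 // import "sync/atomic" for the counter
    err := w.encoder.AppendRowChangedEvent(ctx, key.topic, event, func() {
        atomic.AddInt64(&acked, 1)
    })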
diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go
index b440244cb6e..e2813e023b7 100644
--- a/cmd/kafka-consumer/main.go
+++ b/cmd/kafka-consumer/main.go
@@ -504,7 +504,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
     )
     switch c.protocol {
     case config.ProtocolOpen, config.ProtocolDefault:
-        decoder, err = codec.NewJSONEventBatchDecoder(message.Key, message.Value)
+        decoder, err = codec.NewOpenProtocolBatchDecoder(message.Key, message.Value)
     case config.ProtocolCanalJSON:
         decoder = codec.NewCanalFlatEventBatchDecoder(message.Value, c.enableTiDBExtension)
     default:
diff --git a/errors.toml b/errors.toml
index 23efca79c86..024035693b0 100755
--- a/errors.toml
+++ b/errors.toml
@@ -391,16 +391,6 @@ error = '''
 invalid task key: %s
 '''
 
-["CDC:ErrJSONCodecInvalidData"]
-error = '''
-json codec invalid data
-'''
-
-["CDC:ErrJSONCodecRowTooLarge"]
-error = '''
-json codec single row too large
-'''
-
 ["CDC:ErrKVStorageBackoffFailed"]
 error = '''
 backoff failed
@@ -611,6 +601,16 @@ error = '''
 old value is not enabled
 '''
 
+["CDC:ErrOpenProtocolCodecInvalidData"]
+error = '''
+open-protocol codec invalid data
+'''
+
+["CDC:ErrOpenProtocolCodecRowTooLarge"]
+error = '''
+open-protocol codec single row too large
+'''
+
 ["CDC:ErrOperateOnClosedNotifier"]
 error = '''
 operate on a closed notifier
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index 3ffeed229eb..d9b941c4b27 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -371,13 +371,13 @@ var (
         "maxwell invalid data",
         errors.RFCCodeText("CDC:ErrMaxwellInvalidData"),
     )
-    ErrJSONCodecInvalidData = errors.Normalize(
-        "json codec invalid data",
-        errors.RFCCodeText("CDC:ErrJSONCodecInvalidData"),
+    ErrOpenProtocolCodecInvalidData = errors.Normalize(
+        "open-protocol codec invalid data",
+        errors.RFCCodeText("CDC:ErrOpenProtocolCodecInvalidData"),
     )
-    ErrJSONCodecRowTooLarge = errors.Normalize(
-        "json codec single row too large",
-        errors.RFCCodeText("CDC:ErrJSONCodecRowTooLarge"),
+    ErrOpenProtocolCodecRowTooLarge = errors.Normalize(
+        "open-protocol codec single row too large",
+        errors.RFCCodeText("CDC:ErrOpenProtocolCodecRowTooLarge"),
     )
     ErrCanalDecodeFailed = errors.Normalize(
         "canal decode failed",