Skip to content

Commit

Permalink
cdc/codec: fix nil value in canal codec. (#4741) (#4745)
Browse files Browse the repository at this point in the history
close #4736
  • Loading branch information
ti-chi-bot authored Apr 27, 2022
1 parent f393119 commit 2f8ce64
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 15 deletions.
42 changes: 29 additions & 13 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"reflect"
"strconv"
"strings"

Expand Down Expand Up @@ -133,38 +134,50 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st
return h
}

func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) {
func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType, err error) {
javaType := mySQLType2JavaType(c.Type, c.Flag.IsBinary())

switch javaType {
case JavaSQLTypeBINARY, JavaSQLTypeVARBINARY, JavaSQLTypeLONGVARBINARY:
if strings.Contains(mysqlType, "text") {
return JavaSQLTypeCLOB
return JavaSQLTypeCLOB, nil
}
return JavaSQLTypeBLOB
return JavaSQLTypeBLOB, nil
}

// flag `isUnsigned` only for `numerical` and `bit`, `year` data type.
if !c.Flag.IsUnsigned() {
return javaType
return javaType, nil
}

// for year, to `int64`, others to `uint64`.
// no need to promote type for `year` and `bit`
if c.Type == mysql.TypeYear || c.Type == mysql.TypeBit {
return javaType
return javaType, nil
}

if c.Type == mysql.TypeFloat || c.Type == mysql.TypeDouble || c.Type == mysql.TypeNewDecimal {
return javaType
return javaType, nil
}

// for **unsigned** integral types, should have type in `uint64`. see reference:
// https://github.com/pingcap/ticdc/blob/f0a38a7aaf9f3b11a4d807da275b567642733f58/cdc/entry/mounter.go#L493
// for **unsigned** integral types, type would be `uint64` or `string`. see reference:
// https://github.com/pingcap/tiflow/blob/1e3dd155049417e3fd7bf9b0a0c7b08723b33791/cdc/entry/mounter.go#L501
// https://github.com/pingcap/tidb/blob/6495a5a116a016a3e077d181b8c8ad81f76ac31b/types/datum.go#L423-L455
number, ok := c.Value.(uint64)
if !ok {
log.Panic("unsigned value not in type uint64", zap.Any("column", c))
if c.Value == nil {
return javaType, nil
}
var number uint64
switch v := c.Value.(type) {
case uint64:
number = v
case string:
a, err := strconv.ParseUint(v, 10, 64)
if err != nil {
return javaType, err
}
number = a
default:
return javaType, errors.Errorf("unexpected type for unsigned value: %+v, column: %+v", reflect.TypeOf(v), c)
}

// Some special cases handled in canal
Expand Down Expand Up @@ -193,7 +206,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) {
}
}

return javaType
return javaType, nil
}

// In the official canal-json implementation, value were extracted from binlog buffer.
Expand Down Expand Up @@ -277,7 +290,10 @@ func getMySQLType(c *model.Column) string {
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872
func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(c)
javaType := getJavaSQLType(c, mysqlType)
javaType, err := getJavaSQLType(c, mysqlType)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

value, err := b.formatValue(c.Value, javaType)
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ func (s *canalFlatSuite) TestNewCanalFlatMessage4DML(c *check.C) {
continue
}

// for `Column.Value` is nil, which mean's it is nullable, set the value to `""`
if obtainedValue == nil {
c.Assert(item.expectedValue, check.Equals, "")
continue
}

if bytes, ok := item.column.Value.([]byte); ok {
expectedValue, err := charmap.ISO8859_1.NewDecoder().Bytes(bytes)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -194,8 +200,12 @@ func (s *canalFlatSuite) TestNewCanalFlatEventBatchDecoder4RowMessage(c *check.C
for _, col := range consumed.Columns {
expected, ok := expectedDecodedValues[col.Name]
c.Assert(ok, check.IsTrue)
if col.Value == nil {
c.Assert(expected, check.Equals, "")
} else {
c.Assert(col.Value, check.Equals, expected)
}

c.Assert(col.Value, check.Equals, expected)
for _, item := range testCaseInsert.Columns {
if item.Name == col.Name {
c.Assert(col.Type, check.Equals, item.Type)
Expand Down
13 changes: 12 additions & 1 deletion cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,22 +395,32 @@ var testColumnsTable = []*testColumnTuple{
{&model.Column{Name: "tinyint", Type: mysql.TypeTiny, Value: int64(127)}, "tinyint", JavaSQLTypeTINYINT, "127"}, // TinyInt
{&model.Column{Name: "tinyint unsigned", Type: mysql.TypeTiny, Value: uint64(127), Flag: model.UnsignedFlag}, "tinyint unsigned", JavaSQLTypeTINYINT, "127"},
{&model.Column{Name: "tinyint unsigned 2", Type: mysql.TypeTiny, Value: uint64(128), Flag: model.UnsignedFlag}, "tinyint unsigned", JavaSQLTypeSMALLINT, "128"},
{&model.Column{Name: "tinyint unsigned 3", Type: mysql.TypeTiny, Value: "0", Flag: model.UnsignedFlag}, "tinyint unsigned", JavaSQLTypeTINYINT, "0"},
{&model.Column{Name: "tinyint unsigned 4", Type: mysql.TypeTiny, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "tinyint unsigned", JavaSQLTypeTINYINT, ""},

{&model.Column{Name: "smallint", Type: mysql.TypeShort, Value: int64(32767)}, "smallint", JavaSQLTypeSMALLINT, "32767"},
{&model.Column{Name: "smallint unsigned", Type: mysql.TypeShort, Value: uint64(32767), Flag: model.UnsignedFlag}, "smallint unsigned", JavaSQLTypeSMALLINT, "32767"},
{&model.Column{Name: "smallint unsigned 2", Type: mysql.TypeShort, Value: uint64(32768), Flag: model.UnsignedFlag}, "smallint unsigned", JavaSQLTypeINTEGER, "32768"},
{&model.Column{Name: "smallint unsigned 3", Type: mysql.TypeShort, Value: "0", Flag: model.UnsignedFlag}, "smallint unsigned", JavaSQLTypeSMALLINT, "0"},
{&model.Column{Name: "smallint unsigned 4", Type: mysql.TypeShort, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "smallint unsigned", JavaSQLTypeSMALLINT, ""},

{&model.Column{Name: "mediumint", Type: mysql.TypeInt24, Value: int64(8388607)}, "mediumint", JavaSQLTypeINTEGER, "8388607"},
{&model.Column{Name: "mediumint unsigned", Type: mysql.TypeInt24, Value: uint64(8388607), Flag: model.UnsignedFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, "8388607"},
{&model.Column{Name: "mediumint unsigned 2", Type: mysql.TypeInt24, Value: uint64(8388608), Flag: model.UnsignedFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, "8388608"},
{&model.Column{Name: "mediumint unsigned 3", Type: mysql.TypeInt24, Value: "0", Flag: model.UnsignedFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, "0"},
{&model.Column{Name: "mediumint unsigned 4", Type: mysql.TypeInt24, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "mediumint unsigned", JavaSQLTypeINTEGER, ""},

{&model.Column{Name: "int", Type: mysql.TypeLong, Value: int64(2147483647)}, "int", JavaSQLTypeINTEGER, "2147483647"},
{&model.Column{Name: "int unsigned", Type: mysql.TypeLong, Value: uint64(2147483647), Flag: model.UnsignedFlag}, "int unsigned", JavaSQLTypeINTEGER, "2147483647"},
{&model.Column{Name: "int unsigned 2", Type: mysql.TypeLong, Value: uint64(2147483648), Flag: model.UnsignedFlag}, "int unsigned", JavaSQLTypeBIGINT, "2147483648"},
{&model.Column{Name: "int unsigned 3", Type: mysql.TypeLong, Value: "0", Flag: model.UnsignedFlag}, "int unsigned", JavaSQLTypeINTEGER, "0"},
{&model.Column{Name: "int unsigned 4", Type: mysql.TypeLong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "int unsigned", JavaSQLTypeINTEGER, ""},

{&model.Column{Name: "bigint", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, "bigint", JavaSQLTypeBIGINT, "9223372036854775807"},
{&model.Column{Name: "bigint unsigned", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "9223372036854775807"},
{&model.Column{Name: "bigint unsigned 2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeDECIMAL, "9223372036854775808"},
{&model.Column{Name: "bigint unsigned 3", Type: mysql.TypeLonglong, Value: "0", Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "0"},
{&model.Column{Name: "bigint unsigned 4", Type: mysql.TypeLonglong, Value: nil, Flag: model.BinaryFlag | model.UnsignedFlag | model.NullableFlag}, "bigint unsigned", JavaSQLTypeBIGINT, ""},

{&model.Column{Name: "float", Type: mysql.TypeFloat, Value: 3.14}, "float", JavaSQLTypeREAL, "3.14"},
{&model.Column{Name: "double", Type: mysql.TypeDouble, Value: 2.71}, "double", JavaSQLTypeDOUBLE, "2.71"},
Expand Down Expand Up @@ -454,7 +464,8 @@ func (s *canalEntrySuite) TestGetMySQLTypeAndJavaSQLType(c *check.C) {
obtainedMySQLType := getMySQLType(item.column)
c.Assert(obtainedMySQLType, check.Equals, item.expectedMySQLType)

obtainedJavaSQLType := getJavaSQLType(item.column, obtainedMySQLType)
obtainedJavaSQLType, err := getJavaSQLType(item.column, obtainedMySQLType)
c.Assert(err, check.IsNil)
c.Assert(obtainedJavaSQLType, check.Equals, item.expectedJavaSQLType)
}
}

0 comments on commit 2f8ce64

Please sign in to comment.