Skip to content

Commit

Permalink
Merge branch 'master' into enable-new-sched-by-default
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Mar 11, 2022
2 parents 76fe383 + ce83af5 commit 687e248
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 9 deletions.
16 changes: 8 additions & 8 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strconv"
"time"

"github.com/linkedin/goavro/v2"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -227,11 +228,11 @@ func ColumnInfoToAvroSchema(name string, columnInfo []*model.Column) (string, er
}
field := make(map[string]interface{})
field["name"] = col.Name
if col.Flag.IsHandleKey() {
field["type"] = avroType
} else {
if col.Flag.IsNullable() {
field["type"] = []interface{}{"null", avroType}
field["default"] = nil
} else {
field["type"] = avroType
}

top.Fields = append(top.Fields, field)
Expand All @@ -256,13 +257,12 @@ func rowToAvroNativeData(cols []*model.Column, colInfos []rowcodec.ColInfo, tz *
return nil, err
}

if col.Flag.IsHandleKey() {
// https://pkg.go.dev/github.com/linkedin/goavro/v2#Union
if col.Flag.IsNullable() {
ret[col.Name] = goavro.Union(str, data)
} else {
ret[col.Name] = data
continue
}
union := make(map[string]interface{}, 1)
union[str] = data
ret[col.Name] = union
}
return ret, nil
}
Expand Down
77 changes: 76 additions & 1 deletion cdc/sink/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package codec

import (
"context"
"encoding/json"
"time"

"github.com/linkedin/goavro/v2"
Expand Down Expand Up @@ -157,6 +158,80 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
log.Info("TestAvroEncodeOnly", zap.ByteString("result", txt))
}

func (s *avroBatchEncoderSuite) TestAvroNull(c *check.C) {
defer testleak.AfterTest(c)()

table := model.TableName{
Schema: "testdb",
Table: "TestAvroNull",
}

cols := []*model.Column{
{Name: "id", Value: int64(1), Flag: model.HandleKeyFlag, Type: mysql.TypeLong},
{Name: "colNullable", Value: nil, Flag: model.NullableFlag, Type: mysql.TypeLong},
{Name: "colNotnull", Value: int64(0), Type: mysql.TypeLong},
{Name: "colNullable1", Value: int64(0), Flag: model.NullableFlag, Type: mysql.TypeLong},
}

colInfos := []rowcodec.ColInfo{
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{
ID: 3, IsPKHandle: false, VirtualGenCol: false,
Ft: setFlag(types.NewFieldType(mysql.TypeLong), uint(model.NullableFlag)),
},
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
}

schema, err := ColumnInfoToAvroSchema(table.Table, cols)
c.Assert(err, check.IsNil)
var schemaObj avroSchemaTop
err = json.Unmarshal([]byte(schema), &schemaObj)
c.Assert(err, check.IsNil)
for _, v := range schemaObj.Fields {
if v["name"] == "colNullable" {
c.Assert(v["type"], check.DeepEquals, []interface{}{"null", "int"})
}
if v["name"] == "colNotnull" {
c.Assert(v["type"], check.Equals, "int")
}
}

native, err := rowToAvroNativeData(cols, colInfos, time.Local)
c.Assert(err, check.IsNil)
for k, v := range native.(map[string]interface{}) {
if k == "colNullable" {
c.Check(v, check.IsNil)
}
if k == "colNotnull" {
c.Assert(v, check.Equals, int64(0))
}
if k == "colNullable1" {
c.Assert(v, check.DeepEquals, map[string]interface{}{"int": int64(0)})
}
}

avroCodec, err := goavro.NewCodec(schema)
c.Assert(err, check.IsNil)
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, colInfos, time.Local)
c.Assert(err, check.IsNil)

native, _, err = avroCodec.NativeFromBinary(r.data)
c.Check(err, check.IsNil)
c.Check(native, check.NotNil)
for k, v := range native.(map[string]interface{}) {
if k == "colNullable" {
c.Check(v, check.IsNil)
}
if k == "colNotnull" {
c.Assert(v.(int32), check.Equals, int32(0))
}
if k == "colNullable1" {
c.Assert(v, check.DeepEquals, map[string]interface{}{"int": int32(0)})
}
}
}

func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) {
defer testleak.AfterTest(c)()

Expand Down Expand Up @@ -198,7 +273,7 @@ func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) {
res, _, err := avroCodec.NativeFromBinary(r.data)
c.Check(err, check.IsNil)
c.Check(res, check.NotNil)
actual := (res.(map[string]interface{}))["ts"].(map[string]interface{})["long.timestamp-millis"].(time.Time)
actual := (res.(map[string]interface{}))["ts"].(time.Time)
c.Check(actual.Local().Sub(timestamp), check.LessEqual, time.Millisecond)
}

Expand Down

0 comments on commit 687e248

Please sign in to comment.