diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index bc837517c26..e72264e1191 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -39,6 +39,8 @@ type AvroEventBatchEncoder struct { keySchemaManager *AvroSchemaManager valueSchemaManager *AvroSchemaManager resultBuf []*MQMessage + + tz *time.Location } type avroEncodeResult struct { @@ -75,13 +77,19 @@ func (a *AvroEventBatchEncoder) GetKeySchemaManager() *AvroSchemaManager { return a.keySchemaManager } +// SetTimeZone sets the time-zone that is used to serialize Avro date-time types +func (a *AvroEventBatchEncoder) SetTimeZone(tz *time.Location) { + log.Debug("Setting Avro serializer timezone", zap.String("tz", tz.String())) + a.tz = tz +} + // AppendRowChangedEvent appends a row change event to the encoder // NOTE: the encoder can only store one RowChangedEvent! func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) (EncoderResult, error) { mqMessage := NewMQMessage(ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table) if !e.IsDelete() { - res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns) + res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz) if err != nil { log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String())) return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro") @@ -100,7 +108,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) pkeyCols := e.HandleKeyColumns() - res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols) + res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, a.tz) if err != nil { log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String())) return EncoderNoOperation, errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro") @@ -169,7 +177,7 @@ func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error { return nil } -func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column) (*avroEncodeResult, error) { +func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, tz *time.Location) (*avroEncodeResult, error) { schemaGen := func() (string, error) { schema, err := ColumnInfoToAvroSchema(table.Table, cols) if err != nil { @@ -184,7 +192,7 @@ func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion return nil, errors.Annotate(err, "AvroEventBatchEncoder: get-or-register failed") } - native, err := rowToAvroNativeData(cols) + native, err := rowToAvroNativeData(cols, tz) if err != nil { return nil, errors.Annotate(err, "AvroEventBatchEncoder: converting to native failed") } @@ -255,13 +263,13 @@ func ColumnInfoToAvroSchema(name string, columnInfo []*model.Column) (string, er return string(str), nil } -func rowToAvroNativeData(cols []*model.Column) (interface{}, error) { +func rowToAvroNativeData(cols []*model.Column, tz *time.Location) (interface{}, error) { ret := make(map[string]interface{}, len(cols)) for _, col := range cols { if col == nil { continue } - data, str, err := columnToAvroNativeData(col) + data, str, err := columnToAvroNativeData(col, tz) if err != nil { return nil, err } @@ -361,7 +369,12 @@ func getAvroDataTypeFromColumn(col *model.Column) (interface{}, error) { } } -func columnToAvroNativeData(col *model.Column) (interface{}, string, error) { +var ( + zeroTimeStr = types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, 0).String() + zeroDateStr = types.NewTime(types.ZeroCoreTime, mysql.TypeDate, 0).String() +) + +func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{}, string, error) { if col.Value == nil { return nil, "null", nil } @@ -379,19 +392,35 @@ func columnToAvroNativeData(col *model.Column) (interface{}, string, error) { switch col.Type { case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp: - str := col.Value.(string) - t, err := time.Parse(types.DateFormat, str) + // Refer to `unflatten` in cdc/entry/codec.go for why this piece of code is like this. const fullType = "long." + timestampMillis + str := col.Value.(string) + + if (col.Type == mysql.TypeDate && str == zeroDateStr) || + (col.Type != mysql.TypeDate && str == zeroTimeStr) { + + return time.Time{}, string(fullType), nil + } + + var actualTz *time.Location + if col.Type != mysql.TypeTimestamp { + actualTz = time.UTC + } else { + actualTz = tz + } + + t, err := time.ParseInLocation(types.DateFormat, str, actualTz) + if err == nil { return t, string(fullType), nil } - t, err = time.Parse(types.TimeFormat, str) + t, err = time.ParseInLocation(types.TimeFormat, str, actualTz) if err == nil { return t, string(fullType), nil } - t, err = time.Parse(types.TimeFSPFormat, str) + t, err = time.ParseInLocation(types.TimeFSPFormat, str, actualTz) if err != nil { return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) } diff --git a/cdc/sink/codec/avro_test.go b/cdc/sink/codec/avro_test.go index 0f982c1a7f2..66100cbd9f5 100644 --- a/cdc/sink/codec/avro_test.go +++ b/cdc/sink/codec/avro_test.go @@ -88,7 +88,7 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) { {Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat}, {Name: "mybytes", Value: []byte("Hello World"), Type: mysql.TypeBlob}, {Name: "ts", Value: time.Now().Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp}, - }) + }, time.Local) c.Assert(err, check.IsNil) res, _, err := avroCodec.NativeFromBinary(r.data) @@ -100,6 +100,50 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) { log.Info("TestAvroEncodeOnly", zap.ByteString("result", txt)) } +func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) { + defer testleak.AfterTest(c)() + avroCodec, err := goavro.NewCodec(` + { + "type": "record", + "name": "test1", + "fields" : [ + {"name": "id", "type": ["null", "int"], "default": null}, + {"name": "myint", "type": ["null", "int"], "default": null}, + {"name": "mybool", "type": ["null", "int"], "default": null}, + {"name": "myfloat", "type": ["null", "float"], "default": null}, + {"name": "mybytes", "type": ["null", "bytes"], "default": null}, + {"name": "ts", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null} + ] + }`) + + c.Assert(err, check.IsNil) + + table := model.TableName{ + Schema: "testdb", + Table: "test1", + } + + location, err := time.LoadLocation("UTC") + c.Check(err, check.IsNil) + + timestamp := time.Now() + r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, []*model.Column{ + {Name: "id", Value: int64(1), Type: mysql.TypeLong}, + {Name: "myint", Value: int64(2), Type: mysql.TypeLong}, + {Name: "mybool", Value: int64(1), Type: mysql.TypeTiny}, + {Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat}, + {Name: "mybytes", Value: []byte("Hello World"), Type: mysql.TypeBlob}, + {Name: "ts", Value: timestamp.In(location).Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp}, + }, location) + c.Assert(err, check.IsNil) + + res, _, err := avroCodec.NativeFromBinary(r.data) + c.Check(err, check.IsNil) + c.Check(res, check.NotNil) + actual := (res.(map[string]interface{}))["ts"].(map[string]interface{})["long.timestamp-millis"].(time.Time) + c.Check(actual.Local().Sub(timestamp), check.LessEqual, time.Millisecond) +} + func (s *avroBatchEncoderSuite) TestAvroEnvelope(c *check.C) { defer testleak.AfterTest(c)() avroCodec, err := goavro.NewCodec(` diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 7377198d4a1..07fd0cb9803 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/notify" "github.com/pingcap/ticdc/pkg/security" + "github.com/pingcap/ticdc/pkg/util" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -104,6 +105,7 @@ func newMqSink( avroEncoder := newEncoder1().(*codec.AvroEventBatchEncoder) avroEncoder.SetKeySchemaManager(keySchemaManager) avroEncoder.SetValueSchemaManager(valueSchemaManager) + avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx)) return avroEncoder } } else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue { diff --git a/docker-compose-avro.yml b/docker-compose-avro.yml index 0462232efcf..74bbb45d453 100644 --- a/docker-compose-avro.yml +++ b/docker-compose-avro.yml @@ -43,6 +43,7 @@ services: - --log-file=/logs/capturer0.log - --log-level=debug - --advertise-addr=capturer0:8300 + - --tz=${CDC_TIME_ZONE:-SYSTEM} depends_on: - "upstream-tidb" - "downstream-tidb" @@ -64,6 +65,7 @@ services: - --log-file=/logs/capturer1.log - --log-level=debug - --advertise-addr=capturer1:8300 + - --tz=${CDC_TIME_ZONE:-SYSTEM} depends_on: - "upstream-tidb" - "downstream-tidb" @@ -85,6 +87,7 @@ services: - --log-file=/logs/capturer2.log - --log-level=debug - --advertise-addr=capturer2:8300 + - --tz=${CDC_TIME_ZONE:-SYSTEM} depends_on: - "upstream-tidb" - "downstream-tidb" diff --git a/integration/integration.go b/integration/integration.go index f7677c19cbc..11e6a13cb0a 100644 --- a/integration/integration.go +++ b/integration/integration.go @@ -33,8 +33,10 @@ var ( func testAvro() { env := avro.NewKafkaDockerEnv(*dockerComposeFile) + env.DockerComposeOperator.ExecEnv = []string{"CDC_TIME_ZONE=America/Los_Angeles"} task := &avro.SingleTableTask{TableName: "test"} testCases := []framework.Task{ + tests.NewDateTimeCase(task), tests.NewSimpleCase(task), tests.NewDeleteCase(task), tests.NewManyTypesCase(task), diff --git a/integration/tests/case_date_time.go b/integration/tests/case_date_time.go new file mode 100644 index 00000000000..8d5f622a398 --- /dev/null +++ b/integration/tests/case_date_time.go @@ -0,0 +1,110 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tests + +import ( + "errors" + "log" + "time" + + "github.com/pingcap/ticdc/integration/framework" + "github.com/pingcap/ticdc/integration/framework/avro" + "github.com/pingcap/ticdc/integration/framework/canal" + "github.com/pingcap/ticdc/integration/framework/mysql" +) + +// DateTimeCase is base impl of test case for different types data +type DateTimeCase struct { + framework.Task +} + +// NewDateTimeCase create a test case which has many types +func NewDateTimeCase(task framework.Task) *DateTimeCase { + return &DateTimeCase{ + Task: task, + } +} + +// Name impl framework.Task interface +func (s *DateTimeCase) Name() string { + return "Date Time" +} + +// Run impl framework.Task interface +func (s *DateTimeCase) Run(ctx *framework.TaskContext) error { + var createDBQuery string + switch s.Task.(type) { + case *avro.SingleTableTask: + createDBQuery = `create table test ( + id INT, + t_date DATE, + t_datetime DATETIME, + t_timestamp TIMESTAMP NULL, + PRIMARY KEY (id) + )` + case *canal.SingleTableTask, *mysql.SingleTableTask: + log.Panic("DateTimeCase does not support downstreams other than Avro") + default: + return errors.New("unknown test case type") + } + + _, err := ctx.Upstream.ExecContext(ctx.Ctx, createDBQuery) + if err != nil { + return err + } + if _, ok := s.Task.(*avro.SingleTableTask); ok { + _, err = ctx.Downstream.ExecContext(ctx.Ctx, "drop table if exists test") + if err != nil { + return err + } + + _, err = ctx.Downstream.ExecContext(ctx.Ctx, createDBQuery) + if err != nil { + return err + } + } + + // Get a handle of an existing table + table := ctx.SQLHelper().GetTable("test") + + // Zero value case + zeroValue := time.Unix(0, 0) + data := map[string]interface{}{ + "id": 0, + "t_date": zeroValue, + "t_datetime": zeroValue, + "t_timestamp": zeroValue.Add(time.Second), + } + err = table.Insert(data).Send().Wait().Check() + if err != nil { + return err + } + + // Ancient date case. We DO NOT support it. + // TODO investigate why and find out a solution + /* ancientTime := time.Date(960, 1, 1, 15, 33, 0, 0, time.UTC) + data = map[string]interface{}{ + "id": 1, + "t_date": ancientTime, + "t_datetime": ancientTime, + "t_timestamp": zeroValue.Add(time.Second), // Timestamp does not support the Zero value of `time.Time`, so we test the Unix epoch instead + } + err = table.Insert(data).Send().Wait().Check() + if err != nil { + return err + } + */ + + return nil +}