From 02b0dfb43a53aecd8b0151525d0559469d7f4e9f Mon Sep 17 00:00:00 2001 From: Xiang Zhang Date: Wed, 18 May 2022 10:46:37 +0800 Subject: [PATCH] sink/codec(cdc): add message size check and support schema default (#5451) ref pingcap/tiflow#5338 --- cdc/entry/mounter.go | 11 ++ cdc/entry/mounter_test.go | 149 ++++++++++++------ cdc/model/sink.go | 1 + cdc/sink/codec/avro.go | 121 +++++++++++++- cdc/sink/codec/avro_test.go | 4 +- cdc/sink/codec/schema_registry.go | 3 +- .../integration_tests/multi_changefeed/run.sh | 2 +- 7 files changed, 233 insertions(+), 58 deletions(-) diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 4d91a2dd699..b50f18a72b7 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -270,12 +270,14 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill if warn != "" { log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String())) } + defaultValue := getDDLDefaultDefinition(colInfo) colSize += size cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{ Name: colName, Type: colInfo.Tp, Charset: colInfo.Charset, Value: colValue, + Default: defaultValue, Flag: tableInfo.ColumnsFlag[colInfo.ID], // ApproximateBytes = column data size + column struct size ApproximateBytes: colSize + sizeOfEmptyColumn, @@ -489,6 +491,15 @@ func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, int, string, e return formatColVal(d, col) } +func getDDLDefaultDefinition(col *timodel.ColumnInfo) interface{} { + defaultValue := col.GetDefaultValue() + if defaultValue == nil { + defaultValue = col.GetOriginDefaultValue() + } + defaultDatum := types.NewDatum(defaultValue) + return defaultDatum.GetValue() +} + // DecodeTableID decodes the raw key to a table ID func DecodeTableID(key []byte) (model.TableID, error) { _, physicalTableID, err := decodeTableID(key) diff --git a/cdc/entry/mounter_test.go b/cdc/entry/mounter_test.go index bbf5219f3ee..d1da363729a 100644 --- a/cdc/entry/mounter_test.go +++ b/cdc/entry/mounter_test.go @@ -444,6 +444,7 @@ func TestGetDefaultZeroValue(t *testing.T) { Name string ColInfo timodel.ColumnInfo Res interface{} + Default interface{} }{ // mysql flag null { @@ -453,7 +454,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: uint(0), }, }, - Res: nil, + Res: nil, + Default: nil, }, // mysql.TypeTiny + notnull + nodefault { @@ -464,7 +466,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: int64(0), + Res: int64(0), + Default: nil, }, // mysql.TypeTiny + notnull + default { @@ -476,22 +479,24 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: int64(-1314), + Res: int64(-1314), + Default: int64(-1314), }, - // mysql.TypeTiny + notnull + default + unsigned + // mysql.TypeTiny + notnull + unsigned { - Name: "mysql.TypeTiny + notnull + default + unsigned", + Name: "mysql.TypeTiny + notnull + unsigned", ColInfo: timodel.ColumnInfo{ FieldType: types.FieldType{ Tp: mysql.TypeTiny, Flag: mysql.NotNullFlag | mysql.UnsignedFlag, }, }, - Res: uint64(0), + Res: uint64(0), + Default: nil, }, - // mysql.TypeTiny + notnull + unsigned + // mysql.TypeTiny + notnull + default + unsigned { - Name: "mysql.TypeTiny + notnull + unsigned", + Name: "mysql.TypeTiny + notnull + default + unsigned", ColInfo: timodel.ColumnInfo{ OriginDefaultValue: uint64(1314), FieldType: types.FieldType{ @@ -499,7 +504,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag | mysql.UnsignedFlag, }, }, - Res: uint64(1314), + Res: uint64(1314), + Default: uint64(1314), }, // mysql.TypeTiny + null + default { @@ -511,7 +517,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: uint(0), }, }, - Res: int64(-1314), + Res: int64(-1314), + Default: int64(-1314), }, // mysql.TypeTiny + null + nodefault { @@ -522,7 +529,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: uint(0), }, }, - Res: nil, + Res: nil, + Default: nil, }, // mysql.TypeShort, others testCases same as tiny { @@ -533,7 +541,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: int64(0), + Res: int64(0), + Default: nil, }, // mysql.TypeLong, others testCases same as tiny { @@ -544,7 +553,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: int64(0), + Res: int64(0), + Default: nil, }, // mysql.TypeLonglong, others testCases same as tiny { @@ -555,7 +565,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: int64(0), + Res: int64(0), + Default: nil, }, // mysql.TypeInt24, others testCases same as tiny { @@ -566,7 +577,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: int64(0), + Res: int64(0), + Default: nil, }, // mysql.TypeFloat + notnull + nodefault { @@ -577,7 +589,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: float64(0), + Res: float64(0), + Default: nil, }, // mysql.TypeFloat + notnull + default { @@ -589,7 +602,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: float64(-3.1415), + Res: float64(-3.1415), + Default: float64(-3.1415), }, // mysql.TypeFloat + notnull + default + unsigned { @@ -601,7 +615,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag | mysql.UnsignedFlag, }, }, - Res: float64(3.1415), + Res: float64(3.1415), + Default: float64(3.1415), }, // mysql.TypeFloat + notnull + unsigned { @@ -612,7 +627,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag | mysql.UnsignedFlag, }, }, - Res: float64(0), + Res: float64(0), + Default: nil, }, // mysql.TypeFloat + null + default { @@ -624,7 +640,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: uint(0), }, }, - Res: float64(-3.1415), + Res: float64(-3.1415), + Default: float64(-3.1415), }, // mysql.TypeFloat + null + nodefault { @@ -635,7 +652,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: uint(0), }, }, - Res: nil, + Res: nil, + Default: nil, }, // mysql.TypeDouble, other testCases same as float { @@ -646,7 +664,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: float64(0), + Res: float64(0), + Default: nil, }, // mysql.TypeNewDecimal + notnull + nodefault { @@ -659,7 +678,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Decimal: 2, }, }, - Res: "0", // related with Flen and Decimal + Res: "0", // related with Flen and Decimal + Default: nil, }, // mysql.TypeNewDecimal + null + nodefault { @@ -672,7 +692,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Decimal: 2, }, }, - Res: nil, + Res: nil, + Default: nil, }, // mysql.TypeNewDecimal + null + default { @@ -686,7 +707,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Decimal: 2, }, }, - Res: "-3.14", + Res: "-3.14", + Default: "-3.14", }, // mysql.TypeNull { @@ -696,7 +718,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Tp: mysql.TypeNull, }, }, - Res: nil, + Res: nil, + Default: nil, }, // mysql.TypeTimestamp + notnull + nodefault { @@ -707,7 +730,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: "0000-00-00 00:00:00", + Res: "0000-00-00 00:00:00", + Default: nil, }, // mysql.TypeTimestamp + notnull + default { @@ -719,7 +743,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: "2020-11-19 12:12:12", + Res: "2020-11-19 12:12:12", + Default: "2020-11-19 12:12:12", }, // mysql.TypeTimestamp + null + default { @@ -731,7 +756,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: "2020-11-19 12:12:12", + Res: "2020-11-19 12:12:12", + Default: "2020-11-19 12:12:12", }, // mysql.TypeDate, other testCases same as TypeTimestamp { @@ -742,7 +768,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: "0000-00-00", + Res: "0000-00-00", + Default: nil, }, // mysql.TypeDuration, other testCases same as TypeTimestamp { @@ -753,7 +780,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: "00:00:00", + Res: "00:00:00", + Default: nil, }, // mysql.TypeDatetime, other testCases same as TypeTimestamp { @@ -764,7 +792,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: "0000-00-00 00:00:00", + Res: "0000-00-00 00:00:00", + Default: nil, }, // mysql.TypeYear + notnull + nodefault { @@ -775,7 +804,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: int64(0), + Res: int64(0), + Default: nil, }, // mysql.TypeYear + notnull + default { @@ -788,7 +818,8 @@ func TestGetDefaultZeroValue(t *testing.T) { }, }, // TypeYear default value will be a string and then translate to []byte - Res: "2021", + Res: "2021", + Default: "2021", }, // mysql.TypeNewDate { @@ -799,7 +830,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: nil, // [TODO] seems not support by TiDB, need check + Res: nil, // [TODO] seems not support by TiDB, need check + Default: nil, }, // mysql.TypeVarchar + notnull + nodefault { @@ -810,7 +842,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: []byte{}, + Res: []byte{}, + Default: nil, }, // mysql.TypeVarchar + notnull + default { @@ -823,7 +856,8 @@ func TestGetDefaultZeroValue(t *testing.T) { }, }, // TypeVarchar default value will be a string and then translate to []byte - Res: "e0", + Res: "e0", + Default: "e0", }, // mysql.TypeTinyBlob { @@ -834,7 +868,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: []byte{}, + Res: []byte{}, + Default: nil, }, // mysql.TypeMediumBlob { @@ -845,7 +880,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: []byte{}, + Res: []byte{}, + Default: nil, }, // mysql.TypeLongBlob { @@ -856,7 +892,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: []byte{}, + Res: []byte{}, + Default: nil, }, // mysql.TypeBlob { @@ -867,7 +904,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: []byte{}, + Res: []byte{}, + Default: nil, }, // mysql.TypeVarString { @@ -878,7 +916,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: []byte{}, + Res: []byte{}, + Default: nil, }, // mysql.TypeString { @@ -889,7 +928,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: []byte{}, + Res: []byte{}, + Default: nil, }, // mysql.TypeBit { @@ -900,7 +940,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Tp: mysql.TypeBit, }, }, - Res: uint64(0), + Res: uint64(0), + Default: nil, }, // BLOB, TEXT, GEOMETRY or JSON column can't have a default value // mysql.TypeJSON @@ -912,7 +953,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: "null", + Res: "null", + Default: nil, }, // mysql.TypeEnum + notnull + nodefault { @@ -926,7 +968,8 @@ func TestGetDefaultZeroValue(t *testing.T) { }, // TypeEnum value will be a string and then translate to []byte // NotNull && no default will choose first element - Res: uint64(0), + Res: uint64(0), + Default: nil, }, // mysql.TypeEnum + notnull + default { @@ -940,7 +983,8 @@ func TestGetDefaultZeroValue(t *testing.T) { }, }, // TypeEnum default value will be a string and then translate to []byte - Res: "e1", + Res: "e1", + Default: "e1", }, // mysql.TypeSet + notnull { @@ -951,7 +995,8 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: uint64(0), + Res: uint64(0), + Default: nil, }, // mysql.TypeSet + notnull + default { @@ -964,7 +1009,8 @@ func TestGetDefaultZeroValue(t *testing.T) { }, }, // TypeSet default value will be a string and then translate to []byte - Res: "1,e", + Res: "1,e", + Default: "1,e", }, // mysql.TypeGeometry { @@ -975,12 +1021,15 @@ func TestGetDefaultZeroValue(t *testing.T) { Flag: mysql.NotNullFlag, }, }, - Res: nil, // not support yet + Res: nil, // not support yet + Default: nil, }, } for _, tc := range testCases { val, _, _, _ := getDefaultOrZeroValue(&tc.ColInfo) require.Equal(t, tc.Res, val, tc.Name) + val = getDDLDefaultDefinition(&tc.ColInfo) + require.Equal(t, tc.Default, val, tc.Name) } } diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 9666d941a64..9b4f9620315 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -407,6 +407,7 @@ type Column struct { Charset string `json:"charset" msg:"charset"` Flag ColumnFlagType `json:"flag" msg:"-"` Value interface{} `json:"value" msg:"value"` + Default interface{} `json:"default" msg:"-"` // ApproximateBytes is approximate bytes consumed by the column. ApproximateBytes int `json:"-"` diff --git a/cdc/sink/codec/avro.go b/cdc/sink/codec/avro.go index 116847c0714..4bdbfb377d0 100644 --- a/cdc/sink/codec/avro.go +++ b/cdc/sink/codec/avro.go @@ -42,6 +42,7 @@ type AvroEventBatchEncoder struct { keySchemaManager *AvroSchemaManager valueSchemaManager *AvroSchemaManager resultBuf []*MQMessage + maxMessageBytes int enableTiDBExtension bool decimalHandlingMode string @@ -107,6 +108,20 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent( mqMessage.Key = nil } mqMessage.IncRowsCount() + + if mqMessage.Length() > a.maxMessageBytes { + log.Error( + "Single message too large", + zap.Int( + "maxMessageBytes", + a.maxMessageBytes, + ), + zap.Int("length", mqMessage.Length()), + zap.Any("table", e.Table), + ) + return cerror.ErrAvroEncodeFailed.GenWithStackByArgs() + } + a.resultBuf = append(a.resultBuf, mqMessage) return nil @@ -388,11 +403,43 @@ func rowToAvroSchema( } field := make(map[string]interface{}) field["name"] = sanitizeName(col.Name) - if col.Flag.IsNullable() { - field["type"] = []interface{}{"null", avroType} - field["default"] = nil + + copy := *col + copy.Value = copy.Default + defaultValue, _, err := columnToAvroData( + ©, + colInfos[i].Ft, + decimalHandlingMode, + bigintUnsignedHandlingMode, + ) + if err != nil { + log.Error("fail to get default value for avro schema") + return "", errors.Trace(err) + } + // goavro doesn't support set default value for logical type + // https://github.com/linkedin/goavro/issues/202 + if _, ok := avroType.(avroLogicalTypeSchema); ok { + if col.Flag.IsNullable() { + field["type"] = []interface{}{"null", avroType} + field["default"] = nil + } else { + field["type"] = avroType + } } else { - field["type"] = avroType + if col.Flag.IsNullable() { + // https://stackoverflow.com/questions/22938124/avro-field-default-values + if defaultValue == nil { + field["type"] = []interface{}{"null", avroType} + } else { + field["type"] = []interface{}{avroType, "null"} + } + field["default"] = defaultValue + } else { + field["type"] = avroType + if defaultValue != nil { + field["default"] = defaultValue + } + } } top.Fields = append(top.Fields, field) @@ -609,16 +656,50 @@ func columnToAvroData( switch col.Type { case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24: + if v, ok := col.Value.(string); ok { + n, err := strconv.ParseInt(v, 10, 32) + if err != nil { + return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) + } + return int32(n), "int", nil + } if col.Flag.IsUnsigned() { return int32(col.Value.(uint64)), "int", nil } return int32(col.Value.(int64)), "int", nil case mysql.TypeLong: + if v, ok := col.Value.(string); ok { + n, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) + } + if col.Flag.IsUnsigned() { + return n, "long", nil + } + return int32(n), "int", nil + } if col.Flag.IsUnsigned() { return int64(col.Value.(uint64)), "long", nil } return int32(col.Value.(int64)), "int", nil case mysql.TypeLonglong: + if v, ok := col.Value.(string); ok { + if col.Flag.IsUnsigned() { + if bigintUnsignedHandlingMode == bigintUnsignedHandlingModeString { + return v, "string", nil + } + n, err := strconv.ParseUint(v, 10, 64) + if err != nil { + return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) + } + return int64(n), "long", nil + } + n, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) + } + return n, "long", nil + } if col.Flag.IsUnsigned() { if bigintUnsignedHandlingMode == bigintUnsignedHandlingModeLong { return int64(col.Value.(uint64)), "long", nil @@ -628,8 +709,18 @@ func columnToAvroData( } return col.Value.(int64), "long", nil case mysql.TypeFloat, mysql.TypeDouble: + if v, ok := col.Value.(string); ok { + n, err := strconv.ParseFloat(v, 64) + if err != nil { + return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) + } + return n, "double", nil + } return col.Value.(float64), "double", nil case mysql.TypeBit: + if v, ok := col.Value.(string); ok { + return []byte(v), "bytes", nil + } return []byte(types.NewBinaryLiteralFromUint(col.Value.(uint64), -1)), "bytes", nil case mysql.TypeNewDecimal: if decimalHandlingMode == decimalHandlingModePrecise { @@ -651,16 +742,28 @@ func columnToAvroData( mysql.TypeMediumBlob, mysql.TypeLongBlob: if col.Flag.IsBinary() { + if v, ok := col.Value.(string); ok { + return []byte(v), "bytes", nil + } return col.Value, "bytes", nil } + if v, ok := col.Value.(string); ok { + return v, "string", nil + } return string(col.Value.([]byte)), "string", nil case mysql.TypeEnum: + if v, ok := col.Value.(string); ok { + return v, "string", nil + } enumVar, err := types.ParseEnumValue(ft.Elems, col.Value.(uint64)) if err != nil { return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) } return enumVar.Name, "string", nil case mysql.TypeSet: + if v, ok := col.Value.(string); ok { + return v, "string", nil + } setVar, err := types.ParseSetValue(ft.Elems, col.Value.(uint64)) if err != nil { return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) @@ -671,7 +774,14 @@ func columnToAvroData( case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp, mysql.TypeDuration: return col.Value.(string), "string", nil case mysql.TypeYear: - return col.Value.(int64), "int", nil + if v, ok := col.Value.(string); ok { + n, err := strconv.ParseInt(v, 10, 32) + if err != nil { + return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) + } + return int32(n), "int", nil + } + return int32(col.Value.(int64)), "int", nil default: log.Error("unknown mysql type", zap.Any("mysqlType", col.Type)) return nil, "", cerror.ErrAvroEncodeFailed.GenWithStack("unknown mysql type") @@ -743,6 +853,7 @@ func (b *avroEventBatchEncoderBuilder) Build() EventBatchEncoder { encoder.keySchemaManager = b.keySchemaManager encoder.valueSchemaManager = b.valueSchemaManager encoder.resultBuf = make([]*MQMessage, 0, 4096) + encoder.maxMessageBytes = b.config.MaxMessageBytes() encoder.enableTiDBExtension = b.config.enableTiDBExtension encoder.decimalHandlingMode = b.config.avroDecimalHandlingMode encoder.bigintUnsignedHandlingMode = b.config.avroBigintUnsignedHandlingMode diff --git a/cdc/sink/codec/avro_test.go b/cdc/sink/codec/avro_test.go index 22265273f13..87e99de4ec1 100644 --- a/cdc/sink/codec/avro_test.go +++ b/cdc/sink/codec/avro_test.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "encoding/json" + "math" "math/big" "testing" @@ -60,6 +61,7 @@ func setupEncoderAndSchemaRegistry( valueSchemaManager: valueManager, keySchemaManager: keyManager, resultBuf: make([]*MQMessage, 0, 4096), + maxMessageBytes: math.MaxInt, enableTiDBExtension: enableTiDBExtension, decimalHandlingMode: decimalHandlingMode, bigintUnsignedHandlingMode: bigintUnsignedHandlingMode, @@ -565,7 +567,7 @@ var avroTestColumns = []*avroTestColumnTuple{ Ft: types.NewFieldType(mysql.TypeYear), }, avroSchema{Type: "int", Parameters: map[string]string{"tidb_type": "YEAR"}}, - int64(1970), "int", + int32(1970), "int", }, } diff --git a/cdc/sink/codec/schema_registry.go b/cdc/sink/codec/schema_registry.go index c1baed6826b..f7e683a6383 100644 --- a/cdc/sink/codec/schema_registry.go +++ b/cdc/sink/codec/schema_registry.go @@ -132,7 +132,7 @@ func (m *AvroSchemaManager) Register( } uri := m.registryURL + "/subjects/" + url.QueryEscape( m.topicNameToSchemaSubject(topicName), - ) + "/versions?normalize=true" + ) + "/versions" log.Info("Registering schema", zap.String("uri", uri), zap.ByteString("payload", payload)) req, err := http.NewRequestWithContext(ctx, "POST", uri, bytes.NewReader(payload)) @@ -145,6 +145,7 @@ func (m *AvroSchemaManager) Register( "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, "+ "application/json", ) + req.Header.Add("Content-Type", "application/vnd.schemaregistry.v1+json") resp, err := httpRetry(ctx, m.credential, req) if err != nil { return 0, err diff --git a/tests/integration_tests/multi_changefeed/run.sh b/tests/integration_tests/multi_changefeed/run.sh index 3cca6363425..09b73fa962a 100755 --- a/tests/integration_tests/multi_changefeed/run.sh +++ b/tests/integration_tests/multi_changefeed/run.sh @@ -46,7 +46,7 @@ function check_old_value_enabled() { # check if exist a delete row without a complete `pre-column` # When old value is turned off, the pre-column in our delete will only include the handle columns. # So here we only have 1 (id). - delete_without_old_value_count=$(grep "EmitRowChangedEvents" "$1/cdc.log" | grep 'pre\-columns\\\":\[' | grep 'columns\\\":null' | grep -c 'value\\\":1},null') + delete_without_old_value_count=$(grep "EmitRowChangedEvents" "$1/cdc.log" | grep 'pre\-columns\\\":\[' | grep 'columns\\\":null' | grep -c 'value\\\":1,\\\"default\\\":null},null') if [[ "$delete_without_old_value_count" -ne 1 ]]; then echo "can't found delete row without old value" exit 1