From 5f527341a153a5801fb42feacb9b3a8416178719 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Tue, 22 Oct 2024 18:37:56 +0800 Subject: [PATCH] update --- pkg/sink/codec/debezium/codec.go | 60 +++++---- pkg/sink/codec/debezium/helper.go | 18 ++- .../debezium/sql/debezium/default_value.sql | 39 ++++-- .../debezium/src/test_cases.go | 123 +++++++----------- 4 files changed, 123 insertions(+), 117 deletions(-) diff --git a/pkg/sink/codec/debezium/codec.go b/pkg/sink/codec/debezium/codec.go index edcac091c3d..996217e33d6 100644 --- a/pkg/sink/codec/debezium/codec.go +++ b/pkg/sink/codec/debezium/codec.go @@ -427,8 +427,11 @@ func (c *dbzCodec) writeDebeziumFieldValue( ft *types.FieldType, ) error { if col.Value == nil { - writer.WriteNullField(col.GetName()) - return nil + if col.GetDefaultValue() == nil { + writer.WriteNullField(col.GetName()) + return nil + } + col.Value = col.GetDefaultValue() } switch col.GetType() { case mysql.TypeBit: @@ -492,7 +495,6 @@ func (c *dbzCodec) writeDebeziumFieldValue( writer.WriteStringField(col.GetName(), "") return nil } - writer.WriteStringField(col.GetName(), enumVar.Name) return nil @@ -511,7 +513,6 @@ func (c *dbzCodec) writeDebeziumFieldValue( writer.WriteStringField(col.GetName(), "") return nil } - writer.WriteStringField(col.GetName(), setVar.Name) return nil @@ -530,7 +531,6 @@ func (c *dbzCodec) writeDebeziumFieldValue( cerror.ErrDebeziumEncodeFailed, err) } - writer.WriteFloat64Field(col.GetName(), floatV) return nil @@ -555,7 +555,6 @@ func (c *dbzCodec) writeDebeziumFieldValue( return nil } } - writer.WriteInt64Field(col.GetName(), t.Unix()/60/60/24) return nil @@ -573,25 +572,23 @@ func (c *dbzCodec) writeDebeziumFieldValue( col.GetName()) } - t, err := time.Parse("2006-01-02 15:04:05.999999", v) + t, err := time.ParseInLocation("2006-01-02 15:04:05.999999", v, c.config.TimeZone) if err != nil { // For example, time may be 1000-00-00 if mysql.HasNotNullFlag(ft.GetFlag()) { writer.WriteInt64Field(col.GetName(), 0) return nil - } else { - writer.WriteNullField(col.GetName()) - return nil } + writer.WriteNullField(col.GetName()) + return nil } if ft.GetDecimal() <= 3 { writer.WriteInt64Field(col.GetName(), t.UnixMilli()) return nil - } else { - writer.WriteInt64Field(col.GetName(), t.UnixMicro()) - return nil } + writer.WriteInt64Field(col.GetName(), t.UnixMicro()) + return nil case mysql.TypeTimestamp: // Debezium behavior from doc: @@ -616,10 +613,9 @@ func (c *dbzCodec) writeDebeziumFieldValue( // For example, time may be invalid like 1000-00-00 if mysql.HasNotNullFlag(ft.GetFlag()) { t = time.Unix(0, 0) - } else { - writer.WriteNullField(col.GetName()) - return nil } + writer.WriteNullField(col.GetName()) + return nil } str := t.UTC().Format("2006-01-02T15:04:05") @@ -629,7 +625,6 @@ func (c *dbzCodec) writeDebeziumFieldValue( str = str + tmp[:1+fsp] } str += "Z" - writer.WriteStringField(col.GetName(), str) return nil @@ -645,20 +640,29 @@ func (c *dbzCodec) writeDebeziumFieldValue( col.GetName()) } - d, _, _, err := types.StrToDuration(types.DefaultStmtNoWarningContext, v, ft.GetDecimal()) + t, err := time.ParseInLocation("15:04:05.999999", v, c.config.TimeZone) + if err != nil { + // For example, time may be invalid like 1000-00-00 + if mysql.HasNotNullFlag(ft.GetFlag()) { + t = time.Unix(0, 0) + } else { + writer.WriteNullField(col.GetName()) + return nil + } + } + str := t.AddDate(2006, 1, 2).UTC().Format("15:04:05.999999") + d, _, _, err := types.StrToDuration(types.DefaultStmtNoWarningContext, str, ft.GetDecimal()) if err != nil { return cerror.WrapError( cerror.ErrDebeziumEncodeFailed, err) } - writer.WriteInt64Field(col.GetName(), d.Microseconds()) return nil - case mysql.TypeLonglong: + case mysql.TypeLonglong, mysql.TypeLong: // Note: Although Debezium's doc claims to use INT32 for INT, but it // actually uses INT64. Debezium also uses INT32 for SMALLINT. - // So we only handle with TypeLonglong here. if col.GetFlag().IsUnsigned() { // Handle with BIGINT UNSIGNED. // Debezium always produce INT64 instead of UINT64 for BIGINT. @@ -669,11 +673,23 @@ func (c *dbzCodec) writeDebeziumFieldValue( col.Value, col.GetName()) } - writer.WriteInt64Field(col.GetName(), int64(v)) return nil } + case mysql.TypeInt24, mysql.TypeShort, mysql.TypeTiny: + if col.GetFlag().IsUnsigned() { + v, ok := col.Value.(uint32) + if !ok { + return cerror.ErrDebeziumEncodeFailed.GenWithStack( + "unexpected column value type %T for unsigned bigint column %s", + col.Value, + col.GetName()) + } + writer.WriteIntField(col.GetName(), int(v)) + return nil + } + case mysql.TypeTiDBVectorFloat32: v, ok := col.Value.(types.VectorFloat32) if !ok { diff --git a/pkg/sink/codec/debezium/helper.go b/pkg/sink/codec/debezium/helper.go index 26c38826220..da1f5ab99ac 100644 --- a/pkg/sink/codec/debezium/helper.go +++ b/pkg/sink/codec/debezium/helper.go @@ -132,6 +132,9 @@ func getCharset(ft types.FieldType) string { func getLen(ft types.FieldType) int { decimal := ft.GetDecimal() flen := ft.GetFlen() + if mysql.HasUnsignedFlag(ft.GetFlag()) { + return -1 + } switch ft.GetType() { case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration: return decimal @@ -140,12 +143,15 @@ func getLen(ft types.FieldType) int { return flen case mysql.TypeEnum: return 1 - // case mysql.TypeLonglong, mysql.TypeInt24: - // if mysql.HasNotNullFlag(ft.GetFlag()) { - // return -1 - // } - // return flen - case mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24: + case mysql.TypeLonglong, mysql.TypeInt24: + if mysql.HasNotNullFlag(ft.GetFlag()) { + return -1 + } + defaultFlen, _ := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType()) + if flen != defaultFlen { + return flen + } + case mysql.TypeLong: defaultFlen, _ := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType()) if flen != defaultFlen { return flen diff --git a/tests/integration_tests/debezium/sql/debezium/default_value.sql b/tests/integration_tests/debezium/sql/debezium/default_value.sql index 5a916e44157..09c10380ed9 100644 --- a/tests/integration_tests/debezium/sql/debezium/default_value.sql +++ b/tests/integration_tests/debezium/sql/debezium/default_value.sql @@ -1,4 +1,5 @@ CREATE TABLE UNSIGNED_TINYINT_TABLE ( + id int PRIMARY KEY, A TINYINT UNSIGNED NULL DEFAULT 0, B TINYINT UNSIGNED NULL DEFAULT '10', C TINYINT UNSIGNED NULL, @@ -7,9 +8,10 @@ CREATE TABLE UNSIGNED_TINYINT_TABLE ( F TINYINT UNSIGNED NOT NULL DEFAULT '0', G TINYINT UNSIGNED NULL DEFAULT '100' ); -INSERT INTO UNSIGNED_TINYINT_TABLE VALUES (DEFAULT, DEFAULT, 0, 1, DEFAULT, DEFAULT, NULL); +INSERT INTO UNSIGNED_TINYINT_TABLE VALUES (1, DEFAULT, DEFAULT, 0, 1, DEFAULT, DEFAULT, NULL); CREATE TABLE UNSIGNED_SMALLINT_TABLE ( + id int PRIMARY KEY, A SMALLINT UNSIGNED NULL DEFAULT 0, B SMALLINT UNSIGNED NULL DEFAULT '10', C SMALLINT UNSIGNED NULL, @@ -18,9 +20,10 @@ CREATE TABLE UNSIGNED_SMALLINT_TABLE ( F SMALLINT UNSIGNED NOT NULL DEFAULT '0', G SMALLINT UNSIGNED NULL DEFAULT '100' ); -INSERT INTO UNSIGNED_SMALLINT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL); +INSERT INTO UNSIGNED_SMALLINT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL); CREATE TABLE UNSIGNED_MEDIUMINT_TABLE ( + id int PRIMARY KEY, A MEDIUMINT UNSIGNED NULL DEFAULT 0, B MEDIUMINT UNSIGNED NULL DEFAULT '10', C MEDIUMINT UNSIGNED NULL, @@ -29,9 +32,10 @@ CREATE TABLE UNSIGNED_MEDIUMINT_TABLE ( F MEDIUMINT UNSIGNED NOT NULL DEFAULT '0', G MEDIUMINT UNSIGNED NULL DEFAULT '100' ); -INSERT INTO UNSIGNED_MEDIUMINT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL); +INSERT INTO UNSIGNED_MEDIUMINT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL); CREATE TABLE UNSIGNED_INT_TABLE ( + id int PRIMARY KEY, A INT UNSIGNED NULL DEFAULT 0, B INT UNSIGNED NULL DEFAULT '10', C INT UNSIGNED NULL, @@ -40,9 +44,10 @@ CREATE TABLE UNSIGNED_INT_TABLE ( F INT UNSIGNED NOT NULL DEFAULT '0', G INT UNSIGNED NULL DEFAULT '100' ); -INSERT INTO UNSIGNED_INT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL); +INSERT INTO UNSIGNED_INT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL); CREATE TABLE UNSIGNED_BIGINT_TABLE ( + id int PRIMARY KEY, A BIGINT UNSIGNED NULL DEFAULT 0, B BIGINT UNSIGNED NULL DEFAULT '10', C BIGINT UNSIGNED NULL, @@ -51,9 +56,10 @@ CREATE TABLE UNSIGNED_BIGINT_TABLE ( F BIGINT UNSIGNED NOT NULL DEFAULT '0', G BIGINT UNSIGNED NULL DEFAULT '100' ); -INSERT INTO UNSIGNED_BIGINT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL); +INSERT INTO UNSIGNED_BIGINT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL); CREATE TABLE STRING_TABLE ( + id int PRIMARY KEY, A CHAR(1) NULL DEFAULT 'A', B CHAR(1) NULL DEFAULT 'b', C VARCHAR(10) NULL DEFAULT 'CC', @@ -65,9 +71,10 @@ CREATE TABLE STRING_TABLE ( I VARCHAR(10) NULL DEFAULT '100' ); INSERT INTO STRING_TABLE -VALUES (DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL); +VALUES (1, DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL); CREATE TABLE BIT_TABLE ( + id int PRIMARY KEY, A BIT(1) NULL DEFAULT NULL, B BIT(1) DEFAULT 0, C BIT(1) DEFAULT 1, @@ -81,9 +88,10 @@ CREATE TABLE BIT_TABLE ( K BIT(25) DEFAULT b'10110000100001111' ); INSERT INTO BIT_TABLE -VALUES (false ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, DEFAULT ,NULL ,DEFAULT, NULL); +VALUES (1, false ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, DEFAULT ,NULL ,DEFAULT, NULL); CREATE TABLE BOOLEAN_TABLE ( + id int PRIMARY KEY, A BOOL NULL DEFAULT 0, B BOOLEAN NOT NULL DEFAULT '1', C BOOLEAN NOT NULL DEFAULT '1', @@ -92,9 +100,10 @@ CREATE TABLE BOOLEAN_TABLE ( F BOOLEAN DEFAULT TRUE ); INSERT INTO BOOLEAN_TABLE -VALUES (TRUE ,TRUE ,TRUE ,DEFAULT ,TRUE, NULL); +VALUES (1, TRUE ,TRUE ,TRUE ,DEFAULT ,TRUE, NULL); CREATE TABLE NUMBER_TABLE ( + id int PRIMARY KEY, A TINYINT NULL DEFAULT 10, B SMALLINT NOT NULL DEFAULT '5', C INTEGER NOT NULL DEFAULT 0, @@ -105,34 +114,38 @@ CREATE TABLE NUMBER_TABLE ( H INT(1) NOT NULL DEFAULT TRUE ); INSERT INTO NUMBER_TABLE -VALUES (DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL, DEFAULT, DEFAULT); +VALUES (1, DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL, DEFAULT, DEFAULT); CREATE TABLE FlOAT_DOUBLE_TABLE ( + id int PRIMARY KEY, F FLOAT NULL DEFAULT 0, G DOUBLE NOT NULL DEFAULT 1.0, H DOUBLE NULL DEFAULT 3.0 ); INSERT INTO FlOAT_DOUBLE_TABLE -VALUES (DEFAULT, DEFAULT, NULL); +VALUES (1, DEFAULT, DEFAULT, NULL); CREATE TABLE REAL_TABLE ( + id int PRIMARY KEY, A REAL NOT NULL DEFAULT 1, B REAL NULL DEFAULT NULL, C REAL NULL DEFAULT 3 ); INSERT INTO REAL_TABLE -VALUES (DEFAULT ,DEFAULT, NULL); +VALUES (1, DEFAULT ,DEFAULT, NULL); CREATE TABLE NUMERIC_DECIMAL_TABLE ( + id int PRIMARY KEY, A NUMERIC(3, 2) NOT NULL DEFAULT 1.23, B DECIMAL(4, 3) NOT NULL DEFAULT 2.321, C NUMERIC(7, 5) NULL DEFAULT '12.678', D NUMERIC(7, 5) NULL DEFAULT '15.28' ); INSERT INTO NUMERIC_DECIMAL_TABLE -VALUES (1.33 ,2.111 , 3.444, NULL); +VALUES (1, 1.33 ,2.111 , 3.444, NULL); CREATE TABLE DATE_TIME_TABLE ( + id int PRIMARY KEY, A DATE NOT NULL DEFAULT '1976-08-23', B TIMESTAMP DEFAULT '1970-01-01 00:00:01', C DATETIME DEFAULT '2018-01-03 00:00:10', @@ -148,7 +161,7 @@ CREATE TABLE DATE_TIME_TABLE ( M TIME(6) DEFAULT '123:00:00.123456' ); INSERT INTO DATE_TIME_TABLE -VALUES (DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, NULL, DEFAULT, DEFAULT); +VALUES (1, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, NULL, DEFAULT, DEFAULT); CREATE TABLE DBZ_771_CUSTOMERS ( id INTEGER NOT NULL PRIMARY KEY, diff --git a/tests/integration_tests/debezium/src/test_cases.go b/tests/integration_tests/debezium/src/test_cases.go index 7580942b069..de0f0f9f915 100644 --- a/tests/integration_tests/debezium/src/test_cases.go +++ b/tests/integration_tests/debezium/src/test_cases.go @@ -35,6 +35,8 @@ import ( "go.uber.org/zap" ) +var timeOut = time.Second * 30 + var ( nFailed = 0 nPassed = 0 @@ -89,7 +91,6 @@ func runAllTestCases(dir string) bool { for _, path := range files { logger.Info("Run", zap.String("case", path)) - runTestCase(path) } if nFailed > 0 { @@ -116,6 +117,20 @@ func resetDB(db *DBHelper) { func runTestCase(testCasePath string) bool { resetDB(dbMySQL) resetDB(dbTiDB) + // consume reset DB events + for i := 0; i < 2; i++ { + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + fetchNextCDCRecord(readerDebezium, KindMySQL, timeOut) + wg.Done() + }() + go func() { + fetchNextCDCRecord(readerTiCDC, KindTiDB, timeOut) + wg.Done() + }() + wg.Wait() + } statementKindsToWaitCDCRecord := map[string]bool{ "Delete": true, @@ -146,7 +161,6 @@ func runTestCase(testCasePath string) bool { hasError = true } } - return hasError } @@ -251,35 +265,8 @@ func fetchNextCDCRecord(reader *kafka.Reader, kind Kind, timeout time.Duration) } } -func replaceString(s any, key any, val any) string { - return strings.Replace(s.(string), key.(string), val.(string), 1) -} - -func fetchAllCDCRecords(reader *kafka.Reader, kind Kind) ([]map[string]any, []map[string]any) { - var records []map[string]any - var keyMaps []map[string]any - waitTimeout := 30 * time.Second - for { - keyMap, obj, isRow, err := fetchNextCDCRecord(reader, kind, waitTimeout) - if err != nil { - logger.Error( - "Received error when fetching CDC record", - zap.Error(err), - zap.String("kind", string(kind))) - break - } - if obj != nil { - records = append(records, obj) - keyMaps = append(keyMaps, keyMap) - } - if obj == nil || isRow { - // Row record must be one by one - break - } - waitTimeout = time.Millisecond * 1000 - } - - return keyMaps, records +func replaceString(s any, old any, new any) string { + return strings.Replace(s.(string), old.(string), new.(string), 1) } var ignoredRecordPaths = map[string]bool{ @@ -321,8 +308,18 @@ func runSingleQuery(query string, waitCDCRows bool) bool { }() wg.Wait() } - if !waitCDCRows { + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + fetchNextCDCRecord(readerDebezium, KindMySQL, timeOut) + wg.Done() + }() + go func() { + fetchNextCDCRecord(readerTiCDC, KindTiDB, timeOut) + wg.Done() + }() + wg.Wait() return true } @@ -336,51 +333,23 @@ func runSingleQuery(query string, waitCDCRows bool) bool { testCasePassed = false } - var keyMapsDebezium []map[string]any - var objsDebezium []map[string]any - var keyMapsTiCDC []map[string]any - var objsTiCDC []map[string]any + var keyMapsDebezium map[string]any + var objsDebezium map[string]any + var keyMapsTiCDC map[string]any + var objsTiCDC map[string]any { wg := &sync.WaitGroup{} wg.Add(2) go func() { - keyMapsDebezium, objsDebezium = fetchAllCDCRecords(readerDebezium, KindMySQL) + keyMapsDebezium, objsDebezium, _, _ = fetchNextCDCRecord(readerDebezium, KindMySQL, timeOut) wg.Done() }() go func() { - keyMapsTiCDC, objsTiCDC = fetchAllCDCRecords(readerTiCDC, KindTiDB) + keyMapsTiCDC, objsTiCDC, _, _ = fetchNextCDCRecord(readerTiCDC, KindTiDB, timeOut) wg.Done() }() wg.Wait() } - - diff(keyMapsDebezium, keyMapsTiCDC, onError, msgKey) - diff(objsDebezium, objsTiCDC, onError, msgValue) - - return testCasePassed -} - -func diff(recordsDebezium, recordsTiCDC []map[string]any, onError func(error), msgType string) { - if len(recordsDebezium) != len(recordsTiCDC) { - onError(fmt.Errorf( - "Mismatch CDC %s: Got %d record from Debezium and %d record from TiCDC", - msgType, - len(recordsDebezium), - len(recordsTiCDC))) - headingColor.Print("\nDebezium output:\n\n") - for _, record := range recordsDebezium { - printRecord(record) - } - headingColor.Print("\nTiCDC output:\n\n") - for _, record := range recordsTiCDC { - printRecord(record) - } - return - } - if len(recordsDebezium) == 0 { - onError(fmt.Errorf( - "Mismatch CDC %s: Got 0 record from Debezium and TiCDC", msgType)) - } cmpOption := cmp.FilterPath( func(p cmp.Path) bool { path := p.GoString() @@ -390,15 +359,17 @@ func diff(recordsDebezium, recordsTiCDC []map[string]any, onError func(error), m cmp.Ignore(), ) - for i := 0; i < len(recordsDebezium); i++ { - recordDebezium := recordsDebezium[i] - recordTiCDC := recordsTiCDC[i] - if diff := cmp.Diff(recordDebezium, recordTiCDC, cmpOption); diff != "" { - onError(fmt.Errorf("Found mismatch CDC record (output record #%d)", i+1)) - headingColor.Print("\nCDC Result Diff (-debezium +ticdc):\n\n") - quick.Highlight(os.Stdout, diff, "diff", "terminal16m", "murphy") - fmt.Println() - continue - } + if diff := cmp.Diff(keyMapsDebezium, keyMapsTiCDC, cmpOption); diff != "" { + onError(fmt.Errorf("Found mismatch CDC record (msg type %s)", msgKey)) + headingColor.Print("\nCDC Result Diff (-debezium +ticdc):\n\n") + quick.Highlight(os.Stdout, diff, "diff", "terminal16m", "murphy") + fmt.Println() } + if diff := cmp.Diff(objsDebezium, objsTiCDC, cmpOption); diff != "" { + onError(fmt.Errorf("Found mismatch CDC record (msg type %s)", msgValue)) + headingColor.Print("\nCDC Result Diff (-debezium +ticdc):\n\n") + quick.Highlight(os.Stdout, diff, "diff", "terminal16m", "murphy") + fmt.Println() + } + return testCasePassed }