Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
wk989898 committed Oct 22, 2024
1 parent 12a4955 commit 5f52734
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 117 deletions.
60 changes: 38 additions & 22 deletions pkg/sink/codec/debezium/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,11 @@ func (c *dbzCodec) writeDebeziumFieldValue(
ft *types.FieldType,
) error {
if col.Value == nil {
writer.WriteNullField(col.GetName())
return nil
if col.GetDefaultValue() == nil {
writer.WriteNullField(col.GetName())
return nil
}
col.Value = col.GetDefaultValue()
}
switch col.GetType() {
case mysql.TypeBit:
Expand Down Expand Up @@ -492,7 +495,6 @@ func (c *dbzCodec) writeDebeziumFieldValue(
writer.WriteStringField(col.GetName(), "")
return nil
}

writer.WriteStringField(col.GetName(), enumVar.Name)
return nil

Expand All @@ -511,7 +513,6 @@ func (c *dbzCodec) writeDebeziumFieldValue(
writer.WriteStringField(col.GetName(), "")
return nil
}

writer.WriteStringField(col.GetName(), setVar.Name)
return nil

Expand All @@ -530,7 +531,6 @@ func (c *dbzCodec) writeDebeziumFieldValue(
cerror.ErrDebeziumEncodeFailed,
err)
}

writer.WriteFloat64Field(col.GetName(), floatV)
return nil

Expand All @@ -555,7 +555,6 @@ func (c *dbzCodec) writeDebeziumFieldValue(
return nil
}
}

writer.WriteInt64Field(col.GetName(), t.Unix()/60/60/24)
return nil

Expand All @@ -573,25 +572,23 @@ func (c *dbzCodec) writeDebeziumFieldValue(
col.GetName())
}

t, err := time.Parse("2006-01-02 15:04:05.999999", v)
t, err := time.ParseInLocation("2006-01-02 15:04:05.999999", v, c.config.TimeZone)
if err != nil {
// For example, time may be 1000-00-00
if mysql.HasNotNullFlag(ft.GetFlag()) {
writer.WriteInt64Field(col.GetName(), 0)
return nil
} else {
writer.WriteNullField(col.GetName())
return nil
}
writer.WriteNullField(col.GetName())
return nil
}

if ft.GetDecimal() <= 3 {
writer.WriteInt64Field(col.GetName(), t.UnixMilli())
return nil
} else {
writer.WriteInt64Field(col.GetName(), t.UnixMicro())
return nil
}
writer.WriteInt64Field(col.GetName(), t.UnixMicro())
return nil

case mysql.TypeTimestamp:
// Debezium behavior from doc:
Expand All @@ -616,10 +613,9 @@ func (c *dbzCodec) writeDebeziumFieldValue(
// For example, time may be invalid like 1000-00-00
if mysql.HasNotNullFlag(ft.GetFlag()) {
t = time.Unix(0, 0)
} else {
writer.WriteNullField(col.GetName())
return nil
}
writer.WriteNullField(col.GetName())
return nil
}

str := t.UTC().Format("2006-01-02T15:04:05")
Expand All @@ -629,7 +625,6 @@ func (c *dbzCodec) writeDebeziumFieldValue(
str = str + tmp[:1+fsp]
}
str += "Z"

writer.WriteStringField(col.GetName(), str)
return nil

Expand All @@ -645,20 +640,29 @@ func (c *dbzCodec) writeDebeziumFieldValue(
col.GetName())
}

d, _, _, err := types.StrToDuration(types.DefaultStmtNoWarningContext, v, ft.GetDecimal())
t, err := time.ParseInLocation("15:04:05.999999", v, c.config.TimeZone)
if err != nil {
// For example, time may be invalid like 1000-00-00
if mysql.HasNotNullFlag(ft.GetFlag()) {
t = time.Unix(0, 0)
} else {
writer.WriteNullField(col.GetName())
return nil
}
}
str := t.AddDate(2006, 1, 2).UTC().Format("15:04:05.999999")
d, _, _, err := types.StrToDuration(types.DefaultStmtNoWarningContext, str, ft.GetDecimal())
if err != nil {
return cerror.WrapError(
cerror.ErrDebeziumEncodeFailed,
err)
}

writer.WriteInt64Field(col.GetName(), d.Microseconds())
return nil

case mysql.TypeLonglong:
case mysql.TypeLonglong, mysql.TypeLong:
// Note: Although Debezium's doc claims to use INT32 for INT, but it
// actually uses INT64. Debezium also uses INT32 for SMALLINT.
// So we only handle with TypeLonglong here.
if col.GetFlag().IsUnsigned() {
// Handle with BIGINT UNSIGNED.
// Debezium always produce INT64 instead of UINT64 for BIGINT.
Expand All @@ -669,11 +673,23 @@ func (c *dbzCodec) writeDebeziumFieldValue(
col.Value,
col.GetName())
}

writer.WriteInt64Field(col.GetName(), int64(v))
return nil
}

case mysql.TypeInt24, mysql.TypeShort, mysql.TypeTiny:
if col.GetFlag().IsUnsigned() {
v, ok := col.Value.(uint32)
if !ok {
return cerror.ErrDebeziumEncodeFailed.GenWithStack(
"unexpected column value type %T for unsigned bigint column %s",
col.Value,
col.GetName())
}
writer.WriteIntField(col.GetName(), int(v))
return nil
}

case mysql.TypeTiDBVectorFloat32:
v, ok := col.Value.(types.VectorFloat32)
if !ok {
Expand Down
18 changes: 12 additions & 6 deletions pkg/sink/codec/debezium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func getCharset(ft types.FieldType) string {
func getLen(ft types.FieldType) int {
decimal := ft.GetDecimal()
flen := ft.GetFlen()
if mysql.HasUnsignedFlag(ft.GetFlag()) {
return -1
}
switch ft.GetType() {
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDuration:
return decimal
Expand All @@ -140,12 +143,15 @@ func getLen(ft types.FieldType) int {
return flen
case mysql.TypeEnum:
return 1
// case mysql.TypeLonglong, mysql.TypeInt24:
// if mysql.HasNotNullFlag(ft.GetFlag()) {
// return -1
// }
// return flen
case mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24:
case mysql.TypeLonglong, mysql.TypeInt24:
if mysql.HasNotNullFlag(ft.GetFlag()) {
return -1
}
defaultFlen, _ := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType())
if flen != defaultFlen {
return flen
}
case mysql.TypeLong:
defaultFlen, _ := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType())
if flen != defaultFlen {
return flen
Expand Down
39 changes: 26 additions & 13 deletions tests/integration_tests/debezium/sql/debezium/default_value.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
CREATE TABLE UNSIGNED_TINYINT_TABLE (
id int PRIMARY KEY,
A TINYINT UNSIGNED NULL DEFAULT 0,
B TINYINT UNSIGNED NULL DEFAULT '10',
C TINYINT UNSIGNED NULL,
Expand All @@ -7,9 +8,10 @@ CREATE TABLE UNSIGNED_TINYINT_TABLE (
F TINYINT UNSIGNED NOT NULL DEFAULT '0',
G TINYINT UNSIGNED NULL DEFAULT '100'
);
INSERT INTO UNSIGNED_TINYINT_TABLE VALUES (DEFAULT, DEFAULT, 0, 1, DEFAULT, DEFAULT, NULL);
INSERT INTO UNSIGNED_TINYINT_TABLE VALUES (1, DEFAULT, DEFAULT, 0, 1, DEFAULT, DEFAULT, NULL);

CREATE TABLE UNSIGNED_SMALLINT_TABLE (
id int PRIMARY KEY,
A SMALLINT UNSIGNED NULL DEFAULT 0,
B SMALLINT UNSIGNED NULL DEFAULT '10',
C SMALLINT UNSIGNED NULL,
Expand All @@ -18,9 +20,10 @@ CREATE TABLE UNSIGNED_SMALLINT_TABLE (
F SMALLINT UNSIGNED NOT NULL DEFAULT '0',
G SMALLINT UNSIGNED NULL DEFAULT '100'
);
INSERT INTO UNSIGNED_SMALLINT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL);
INSERT INTO UNSIGNED_SMALLINT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL);

CREATE TABLE UNSIGNED_MEDIUMINT_TABLE (
id int PRIMARY KEY,
A MEDIUMINT UNSIGNED NULL DEFAULT 0,
B MEDIUMINT UNSIGNED NULL DEFAULT '10',
C MEDIUMINT UNSIGNED NULL,
Expand All @@ -29,9 +32,10 @@ CREATE TABLE UNSIGNED_MEDIUMINT_TABLE (
F MEDIUMINT UNSIGNED NOT NULL DEFAULT '0',
G MEDIUMINT UNSIGNED NULL DEFAULT '100'
);
INSERT INTO UNSIGNED_MEDIUMINT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL);
INSERT INTO UNSIGNED_MEDIUMINT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL);

CREATE TABLE UNSIGNED_INT_TABLE (
id int PRIMARY KEY,
A INT UNSIGNED NULL DEFAULT 0,
B INT UNSIGNED NULL DEFAULT '10',
C INT UNSIGNED NULL,
Expand All @@ -40,9 +44,10 @@ CREATE TABLE UNSIGNED_INT_TABLE (
F INT UNSIGNED NOT NULL DEFAULT '0',
G INT UNSIGNED NULL DEFAULT '100'
);
INSERT INTO UNSIGNED_INT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL);
INSERT INTO UNSIGNED_INT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL);

CREATE TABLE UNSIGNED_BIGINT_TABLE (
id int PRIMARY KEY,
A BIGINT UNSIGNED NULL DEFAULT 0,
B BIGINT UNSIGNED NULL DEFAULT '10',
C BIGINT UNSIGNED NULL,
Expand All @@ -51,9 +56,10 @@ CREATE TABLE UNSIGNED_BIGINT_TABLE (
F BIGINT UNSIGNED NOT NULL DEFAULT '0',
G BIGINT UNSIGNED NULL DEFAULT '100'
);
INSERT INTO UNSIGNED_BIGINT_TABLE VALUES (1, 1, 1, 0, 1, 1, NULL);
INSERT INTO UNSIGNED_BIGINT_TABLE VALUES (1, 1, 1, 1, 0, 1, 1, NULL);

CREATE TABLE STRING_TABLE (
id int PRIMARY KEY,
A CHAR(1) NULL DEFAULT 'A',
B CHAR(1) NULL DEFAULT 'b',
C VARCHAR(10) NULL DEFAULT 'CC',
Expand All @@ -65,9 +71,10 @@ CREATE TABLE STRING_TABLE (
I VARCHAR(10) NULL DEFAULT '100'
);
INSERT INTO STRING_TABLE
VALUES (DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL);
VALUES (1, DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL);

CREATE TABLE BIT_TABLE (
id int PRIMARY KEY,
A BIT(1) NULL DEFAULT NULL,
B BIT(1) DEFAULT 0,
C BIT(1) DEFAULT 1,
Expand All @@ -81,9 +88,10 @@ CREATE TABLE BIT_TABLE (
K BIT(25) DEFAULT b'10110000100001111'
);
INSERT INTO BIT_TABLE
VALUES (false ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, DEFAULT ,NULL ,DEFAULT, NULL);
VALUES (1, false ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, DEFAULT ,NULL ,DEFAULT, NULL);

CREATE TABLE BOOLEAN_TABLE (
id int PRIMARY KEY,
A BOOL NULL DEFAULT 0,
B BOOLEAN NOT NULL DEFAULT '1',
C BOOLEAN NOT NULL DEFAULT '1',
Expand All @@ -92,9 +100,10 @@ CREATE TABLE BOOLEAN_TABLE (
F BOOLEAN DEFAULT TRUE
);
INSERT INTO BOOLEAN_TABLE
VALUES (TRUE ,TRUE ,TRUE ,DEFAULT ,TRUE, NULL);
VALUES (1, TRUE ,TRUE ,TRUE ,DEFAULT ,TRUE, NULL);

CREATE TABLE NUMBER_TABLE (
id int PRIMARY KEY,
A TINYINT NULL DEFAULT 10,
B SMALLINT NOT NULL DEFAULT '5',
C INTEGER NOT NULL DEFAULT 0,
Expand All @@ -105,34 +114,38 @@ CREATE TABLE NUMBER_TABLE (
H INT(1) NOT NULL DEFAULT TRUE
);
INSERT INTO NUMBER_TABLE
VALUES (DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL, DEFAULT, DEFAULT);
VALUES (1, DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT ,DEFAULT, NULL, DEFAULT, DEFAULT);

CREATE TABLE FlOAT_DOUBLE_TABLE (
id int PRIMARY KEY,
F FLOAT NULL DEFAULT 0,
G DOUBLE NOT NULL DEFAULT 1.0,
H DOUBLE NULL DEFAULT 3.0
);
INSERT INTO FlOAT_DOUBLE_TABLE
VALUES (DEFAULT, DEFAULT, NULL);
VALUES (1, DEFAULT, DEFAULT, NULL);

CREATE TABLE REAL_TABLE (
id int PRIMARY KEY,
A REAL NOT NULL DEFAULT 1,
B REAL NULL DEFAULT NULL,
C REAL NULL DEFAULT 3
);
INSERT INTO REAL_TABLE
VALUES (DEFAULT ,DEFAULT, NULL);
VALUES (1, DEFAULT ,DEFAULT, NULL);

CREATE TABLE NUMERIC_DECIMAL_TABLE (
id int PRIMARY KEY,
A NUMERIC(3, 2) NOT NULL DEFAULT 1.23,
B DECIMAL(4, 3) NOT NULL DEFAULT 2.321,
C NUMERIC(7, 5) NULL DEFAULT '12.678',
D NUMERIC(7, 5) NULL DEFAULT '15.28'
);
INSERT INTO NUMERIC_DECIMAL_TABLE
VALUES (1.33 ,2.111 , 3.444, NULL);
VALUES (1, 1.33 ,2.111 , 3.444, NULL);

CREATE TABLE DATE_TIME_TABLE (
id int PRIMARY KEY,
A DATE NOT NULL DEFAULT '1976-08-23',
B TIMESTAMP DEFAULT '1970-01-01 00:00:01',
C DATETIME DEFAULT '2018-01-03 00:00:10',
Expand All @@ -148,7 +161,7 @@ CREATE TABLE DATE_TIME_TABLE (
M TIME(6) DEFAULT '123:00:00.123456'
);
INSERT INTO DATE_TIME_TABLE
VALUES (DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, NULL, DEFAULT, DEFAULT);
VALUES (1, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, DEFAULT, NULL, DEFAULT, DEFAULT);

CREATE TABLE DBZ_771_CUSTOMERS (
id INTEGER NOT NULL PRIMARY KEY,
Expand Down
Loading

0 comments on commit 5f52734

Please sign in to comment.