From c86af2027d6af4f5c4a552a35fe9f43454af92fa Mon Sep 17 00:00:00 2001 From: vinlee19 <1401597760@qq.com> Date: Mon, 23 Sep 2024 17:38:37 +0800 Subject: [PATCH] Add the conversion from Flink TIME type to Doris. --- .../connectors/doris/sink/DorisRowConverter.java | 3 +++ .../doris/sink/DorisMetadataApplierITCase.java | 15 +++++++++++---- .../doris/sink/DorisRowConverterTest.java | 10 ++++++++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java index 45576032fe..4624c56047 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverter.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.io.Serializable; import java.time.LocalDate; +import java.time.LocalTime; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Arrays; @@ -116,6 +117,8 @@ static SerializationConverter createExternalConverter(DataType type, ZoneId pipe case TIMESTAMP_WITH_TIME_ZONE: final int zonedP = ((ZonedTimestampType) type).getPrecision(); return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp(); + case TIME_WITHOUT_TIME_ZONE: + return (index, val) -> LocalTime.ofNanoOfDay(val.getLong(index) * 1_000_000L); case ARRAY: return (index, val) -> convertArrayData(val.getArray(index), type); case MAP: diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java index 438cc37815..e04fe9730f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java @@ -234,10 +234,14 @@ public void testDorisDataTypes() throws Exception { .column(new PhysicalColumn("string", DataTypes.STRING(), "String")) .column(new PhysicalColumn("decimal", DataTypes.DECIMAL(17, 7), "Decimal")) .column(new PhysicalColumn("date", DataTypes.DATE(), "Date")) - // Doris sink doesn't support TIME type yet. - // .column(new PhysicalColumn("time", DataTypes.TIME(), "Time")) - // .column(new PhysicalColumn("time_3", DataTypes.TIME(3), "Time With - // Precision")) + // Doris sink doesn't support TIME type ,thus convert TIME to STRING + .column(new PhysicalColumn("time", DataTypes.TIME(), "Time")) + .column( + new PhysicalColumn( + "time_3", DataTypes.TIME(3), "Time With Precision")) + .column( + new PhysicalColumn( + "time_6", DataTypes.TIME(6), "Time With Precision")) .column(new PhysicalColumn("timestamp", DataTypes.TIMESTAMP(), "Timestamp")) .column( new PhysicalColumn( @@ -296,6 +300,9 @@ public void testDorisDataTypes() throws Exception { "string | TEXT | Yes | false | null", "decimal | DECIMAL(17, 7) | Yes | false | null", "date | DATE | Yes | false | null", + "time | TEXT | Yes | false | null", + "time_3 | TEXT | Yes | false | null", + "time_6 | TEXT | Yes | false | null", "timestamp | DATETIME(6) | Yes | false | null", "timestamp_3 | DATETIME(3) | Yes | false | null", "timestamptz | DATETIME(6) | Yes | false | null", diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverterTest.java index 5f47fe5092..a1e8973b17 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverterTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisRowConverterTest.java @@ -62,7 +62,10 @@ public void testExternalConvert() { Column.physicalColumn("f19", DataTypes.TIMESTAMP()), Column.physicalColumn("f20", DataTypes.TIMESTAMP_LTZ()), Column.physicalColumn("f21", DataTypes.TIMESTAMP_LTZ()), - Column.physicalColumn("f22", DataTypes.TIMESTAMP_LTZ())); + Column.physicalColumn("f22", DataTypes.TIMESTAMP_LTZ()), + Column.physicalColumn("f23", DataTypes.TIME(0)), + Column.physicalColumn("f24", DataTypes.TIME(3)), + Column.physicalColumn("f24", DataTypes.TIME(6))); List dataTypes = columns.stream().map(v -> v.getType()).collect(Collectors.toList()); @@ -104,6 +107,9 @@ public void testExternalConvert() { LocalZonedTimestampData.fromInstant(f20), LocalZonedTimestampData.fromInstant(f21), LocalZonedTimestampData.fromInstant(f22), + 3661000, + 3661123, + 3661123 }); List row = new ArrayList(); for (int i = 0; i < recordData.getArity(); i++) { @@ -115,7 +121,7 @@ public void testExternalConvert() { Assert.assertEquals( "[true, 1.2, 1.2345, 1, 32, 64, 128, 2021-01-01 08:00:00.000000, 2021-01-01, a, doris, 2021-01-01 " + "08:01:11.000000, 2021-01-01 08:01:11.123000, 2021-01-01 08:01:11.123456, 2021-01-01 " - + "16:01:11.000000, 2021-01-01 16:01:11.123000, 2021-01-01 16:01:11.123456]", + + "16:01:11.000000, 2021-01-01 16:01:11.123000, 2021-01-01 16:01:11.123456, 01:01:01, 01:01:01.123, 01:01:01.123]", row.toString()); } }