From b9acb573b75055931ab5dc911aa05a04ea830c4f Mon Sep 17 00:00:00 2001 From: hailin0 Date: Tue, 30 Jul 2024 16:12:57 +0800 Subject: [PATCH] [Feature][Transforms] Support cast to bytes function of sql (#7284) --- docs/en/transform-v2/sql-functions.md | 2 +- docs/zh/transform-v2/sql-functions.md | 2 +- .../src/test/resources/sql_transform/func_system.conf | 11 ++++++++++- .../seatunnel/transform/sql/zeta/ZetaSQLType.java | 4 ++++ .../transform/sql/zeta/functions/SystemFunction.java | 3 +++ 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/docs/en/transform-v2/sql-functions.md b/docs/en/transform-v2/sql-functions.md index e1c541ef1c9..3438a24de9c 100644 --- a/docs/en/transform-v2/sql-functions.md +++ b/docs/en/transform-v2/sql-functions.md @@ -889,7 +889,7 @@ CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6') Converts a value to another data type. -Supported data types: STRING | VARCHAR, INT | INTEGER, LONG | BIGINT, BYTE, FLOAT, DOUBLE, DECIMAL(p,s), TIMESTAMP, DATE, TIME +Supported data types: STRING | VARCHAR, INT | INTEGER, LONG | BIGINT, BYTE, FLOAT, DOUBLE, DECIMAL(p,s), TIMESTAMP, DATE, TIME, BYTES Example: diff --git a/docs/zh/transform-v2/sql-functions.md b/docs/zh/transform-v2/sql-functions.md index cd90b948674..57c440a39b3 100644 --- a/docs/zh/transform-v2/sql-functions.md +++ b/docs/zh/transform-v2/sql-functions.md @@ -880,7 +880,7 @@ CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6') 将一个值转换为另一个数据类型。 -支持的数据类型有:STRING | VARCHAR,INT | INTEGER,LONG | BIGINT,BYTE,FLOAT,DOUBLE,DECIMAL(p,s),TIMESTAMP,DATE,TIME +支持的数据类型有:STRING | VARCHAR,INT | INTEGER,LONG | BIGINT,BYTE,FLOAT,DOUBLE,DECIMAL(p,s),TIMESTAMP,DATE,TIME,BYTES 示例: diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf index 558d0cceb38..14f41665e34 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf @@ -49,7 +49,7 @@ transform { Sql { source_table_name = "fake" result_table_name = "fake1" - query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6 from fake" + query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, cast(name as bytes) as c7 from fake" } } @@ -155,6 +155,15 @@ sink { field_value = [ {equals_to = "23:51:09"} ] + }, + { + field_name = "c7" + field_type = "bytes" + field_value = [ + { + rule_type = NOT_NULL + } + ] } ] } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java index 934cd883080..45b269bae67 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.table.type.SqlType; @@ -69,6 +70,7 @@ public class ZetaSQLType { public static final String BIGINT = "BIGINT"; public static final String LONG = "LONG"; public static final String BYTE = "BYTE"; + public static final String BYTES = "BYTES"; public static final String DOUBLE = "DOUBLE"; public static final String FLOAT = "FLOAT"; public static final String TIMESTAMP = "TIMESTAMP"; @@ -311,6 +313,8 @@ private SeaTunnelDataType getCastType(CastExpression castExpression) { return BasicType.LONG_TYPE; case BYTE: return BasicType.BYTE_TYPE; + case BYTES: + return PrimitiveByteArrayType.INSTANCE; case DOUBLE: return BasicType.DOUBLE_TYPE; case FLOAT: diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java index 0039f0cade9..0b616b0fbe8 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java @@ -24,6 +24,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -98,6 +99,8 @@ public static Object castAs(List args) { return Long.parseLong(v1.toString()); case "BYTE": return Byte.parseByte(v1.toString()); + case "BYTES": + return v1.toString().getBytes(StandardCharsets.UTF_8); case "DOUBLE": return Double.parseDouble(v1.toString()); case "FLOAT":