Skip to content

Commit

Permalink
[Improve][transform-v2] Support dynamic types for array function (#8139)
Browse files Browse the repository at this point in the history
  • Loading branch information
CosmosNi authored Dec 26, 2024
1 parent 9f02943 commit eae369b
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 34 deletions.
11 changes: 9 additions & 2 deletions docs/en/transform-v2/sql-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,15 @@ Generate an array.

Example:

select ARRAY('test1','test2','test3') as arrays

SELECT Array('c_1','c_2') as string_array,
Array(1.23,2.34) as double_array,
Array(1,2) as int_array,
Array(2147483648,2147483649) as long_array,
Array(1.23,2147483648) as double_array_1,
Array(1.23,2147483648,'c_1') as string_array_1
FROM fake

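Note: Currently, only the string, double, long, and int types are supported. When the arguments have different types, the element type is widened as shown in the example above: mixing double and long values yields a double array, and including a string value yields a string array.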

### LATERAL VIEW
#### EXPLODE
Expand Down
10 changes: 9 additions & 1 deletion docs/zh/transform-v2/sql-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,15 @@ select UUID() as seatunnel_uuid

示例:

select ARRAY('test1','test2','test3') as arrays
SELECT Array('c_1','c_2') as string_array,
Array(1.23,2.34) as double_array,
Array(1,2) as int_array,
Array(2147483648,2147483649) as long_array,
Array(1.23,2147483648) as double_array_1,
Array(1.23,2147483648,'c_1') as string_array_1
FROM fake

注意:目前仅支持string、double、long、int几种类型

### LATERAL VIEW
#### EXPLODE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ sink{
},
{
field_name = num
field_type = string
field_type = int
field_value = [{equals_to = 1}]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ sink{
},
{
field_name = num
field_type = "string"
field_type = "int"
field_value = [{equals_to = 1}]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
###### This config file is a demonstration of batch processing in seatunnel config
######


env {
job.mode = "BATCH"
parallelism = 1
Expand All @@ -30,6 +31,7 @@ source {
fields {
pk_id = string
name = string
id = int
}
primaryKey {
name = "pk_id"
Expand All @@ -39,22 +41,33 @@ source {
rows = [
{
kind = INSERT
fields = ["id001", "zhangsan,zhangsan"]
fields = ["id001", "zhangsan,zhangsan",123]
}
]
}
}

transform {
Sql {
plugin_input = "fake"
plugin_output = "fake1"
query = "SELECT *,Array('c_1','c_2') as c_array FROM dual "
plugin_output = "fake"
query = """SELECT
*,
Array(pk_id,id) as field_array_1,
Array(pk_id,'c_1') as field_array_2,
Array(id,123) as field_array_3,
Array('c_1','c_2') as string_array,
Array(1.23,2.34) as double_array,
Array(1,2) as int_array,
Array(2147483648,2147483649) as long_array,
Array(1.23,2147483648) as double_array_1,
Array(1.23,2147483648,'c_1') as string_array_1
FROM fake """
}
}

sink{
assert {
plugin_output = "fake"
rules =
{
row_rules = [
Expand All @@ -79,12 +92,56 @@ sink{
field_value = [{equals_to = "zhangsan,zhangsan"}]
},
{
field_name = c_array
field_type = array<string>
field_name = id
field_type = int
field_value = [{equals_to = 123}]
},
{
field_name = field_array_1
field_type = array<STRING>
field_value = [{equals_to = ["id001" ,"123"]}]
},
{
field_name = field_array_2
field_type = array<STRING>
field_value = [{equals_to = ["id001" ,"c_1"]}]
},
{
field_name = field_array_3
field_type = array<INT>
field_value = [{equals_to = [123 ,123]}]
},
{
field_name = string_array
field_type = array<STRING>
field_value = [{equals_to = ["c_1" ,"c_2"]}]
}
},
{
field_name = double_array
field_type = array<DOUBLE>
field_value = [{equals_to = [1.23,2.34]}]
},
{
field_name = int_array
field_type = array<INT>
field_value = [{equals_to = [1,2]}]
},
{
field_name = long_array
field_type = array<BIGINT>
field_value = [{equals_to = [2147483648,2147483649]}]
},
{
field_name = double_array_1
field_type = array<DOUBLE>
field_value = [{equals_to = [1.23,2147483648]}]
},
{
field_name = string_array_1
field_type = array<STRING>
field_value = [{equals_to = ["1.23","2147483648","c_1"]}]
}
]
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class ZetaSQLEngine implements SQLEngine {
private String inputTableName;
@Nullable private String catalogTableName;
private SeaTunnelRowType inputRowType;
private SeaTunnelRowType outRowType;

private String sql;
private PlainSelect selectBody;
Expand Down Expand Up @@ -216,10 +217,13 @@ public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
}
List<LateralView> lateralViews = selectBody.getLateralViews();
if (CollectionUtils.isEmpty(lateralViews)) {
return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
outRowType = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
} else {
outRowType =
zetaSQLFunction.lateralViewMapping(
fieldNames, seaTunnelDataTypes, lateralViews, inputColumnsMapping);
}
return zetaSQLFunction.lateralViewMapping(
fieldNames, seaTunnelDataTypes, lateralViews, inputColumnsMapping);
return outRowType;
}

private static String cleanEscape(String columnName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -29,6 +28,7 @@
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.zeta.functions.ArrayFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.DateTimeFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.NumericFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.StringFunction;
Expand Down Expand Up @@ -192,6 +192,7 @@ public class ZetaSQLFunction {
public static final String UUID = "UUID";

private final SeaTunnelRowType inputRowType;

private final ZetaSQLType zetaSQLType;
private final ZetaSQLFilter zetaSQLFilter;

Expand Down Expand Up @@ -552,7 +553,7 @@ public Object executeFunctionExpr(String functionName, List<Object> args) {
case NULLIF:
return SystemFunction.nullif(args);
case ARRAY:
return SystemFunction.array(args);
return ArrayFunction.array(args);
case UUID:
return randomUUID().toString();
default:
Expand Down Expand Up @@ -743,8 +744,7 @@ private List<SeaTunnelRow> explode(
next,
aliasFieldIndex,
row,
expression,
true);
expression);
}
seaTunnelRows = next;
} else if (expression instanceof Function) {
Expand All @@ -758,8 +758,7 @@ private List<SeaTunnelRow> explode(
next,
aliasFieldIndex,
row,
expression,
false);
expression);
}
seaTunnelRows = next;
}
Expand All @@ -774,8 +773,7 @@ private void transformExplodeValue(
List<SeaTunnelRow> next,
int aliasFieldIndex,
SeaTunnelRow row,
Expression expression,
boolean keepValueType) {
Expression expression) {
if (splitFieldValue == null) {
if (isUsingOuter) {
next.add(
Expand All @@ -798,13 +796,9 @@ private void transformExplodeValue(
if (!isUsingOuter && fieldValue == null) {
continue;
}
Object value =
fieldValue == null
? null
: (keepValueType ? fieldValue : String.valueOf(fieldValue));
next.add(
copySeaTunnelRowWithNewValue(
outRowType.getTotalFields(), row, aliasFieldIndex, value));
outRowType.getTotalFields(), row, aliasFieldIndex, fieldValue));
}
} else {
throw new SeaTunnelRuntimeException(
Expand Down Expand Up @@ -865,14 +859,13 @@ public SeaTunnelRowType lateralViewMapping(
seaTunnelDataTypes[columnIndex] = seaTunnelDataType;
}
} else {
// default string type
SeaTunnelDataType seaTunnelDataType =
PhysicalColumn.of(alias, BasicType.STRING_TYPE, 10L, true, "", "")
.getDataType();

ArrayType arrayType = (ArrayType) zetaSQLType.getExpressionType(expression);

if (aliasIndex == -1) {
fieldNames = ArrayUtils.add(fieldNames, alias);
seaTunnelDataTypes =
ArrayUtils.add(seaTunnelDataTypes, seaTunnelDataType);
ArrayUtils.add(seaTunnelDataTypes, arrayType.getElementType());
inputColumnsMapping.add(alias);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.zeta.functions.ArrayFunction;

import org.apache.commons.collections4.CollectionUtils;

Expand Down Expand Up @@ -448,6 +449,7 @@ private SeaTunnelDataType<?> getFunctionType(Function function) {
case ZetaSQLFunction.TRUNCATE:
return BasicType.DOUBLE_TYPE;
case ZetaSQLFunction.ARRAY:
return ArrayFunction.castArrayTypeMapping(function, inputRowType);
case ZetaSQLFunction.SPLIT:
return ArrayType.STRING_ARRAY_TYPE;
case ZetaSQLFunction.NOW:
Expand Down
Loading

0 comments on commit eae369b

Please sign in to comment.