From fc7f8442118fac11df0e9b5aa64138c2c79a672c Mon Sep 17 00:00:00 2001
From: emptyOVO
Date: Wed, 2 Oct 2024 23:47:41 +0800
Subject: [PATCH] [INLONG-11242][SDK] Transform support MAP() function

---
 .../process/function/MapFunction.java | 73 +++++++++++++++
 .../function/string/TestMapFunction.java | 89 +++++++++++++++++++
 2 files changed, 162 insertions(+)
 create mode 100644 inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFunction.java
 create mode 100644 inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFunction.java

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFunction.java
new file mode 100644
index 00000000000..637b283c809
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFunction.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MapFunction
+ * description: MAP(ANY1, ANY2, ANY3, ANY4, ...)--Returns a map created from a list of
+ *              key-value pairs ((value1, value2), (value3, value4), ...).
+ *              Pairs keep their argument order. An odd number of arguments cannot form pairs and
+ *              makes {@link #parse} throw an IllegalArgumentException.
+ * for example: Map('he',7,'xxd')--rejected, the arguments are not key-value pairs
+ *              Map('he',1,'xxd','cloud')--return {he=1, xxd=cloud}
+ *              Map('xxd','cloud',map(1,2),map(3,'apple'))--return {xxd=cloud, {1=2}={3=apple}}
+ *
+ * NOTE(review): TestMapFunction expects an empty output field for the odd-argument case, so a
+ * caller presumably handles the exception -- confirm where that happens.
+ */
+@TransformFunction(names = {"map"})
+public class MapFunction implements ValueParser {
+
+    /** One parser per MAP() argument, in call order: key1, value1, key2, value2, ... */
+    private final List<ValueParser> parserList;
+
+    /**
+     * Builds a value parser for every argument of the MAP() call.
+     *
+     * @param expr the MAP(...) function expression; a call without parameters produces an
+     *             empty parser list
+     */
+    public MapFunction(Function expr) {
+        List<ValueParser> parsers = new ArrayList<>();
+        if (expr.getParameters() != null) {
+            for (Expression param : expr.getParameters().getExpressions()) {
+                parsers.add(OperatorTools.buildParser(param));
+            }
+        }
+        this.parserList = parsers;
+    }
+
+    /**
+     * Evaluates the arguments pairwise and collects them into a map.
+     *
+     * @param sourceData passed unchanged to every argument parser
+     * @param rowIndex   passed unchanged to every argument parser
+     * @param context    passed unchanged to every argument parser
+     * @return a LinkedHashMap holding the pairs in argument order (empty when MAP() has no
+     *         arguments); when a key repeats, the later value replaces the earlier one
+     * @throws IllegalArgumentException if the number of arguments is odd
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        if (parserList.size() % 2 != 0) {
+            throw new IllegalArgumentException("Input values must be in key-value pairs.");
+        }
+        // LinkedHashMap so the rendered map follows the order of the arguments.
+        Map<Object, Object> res = new LinkedHashMap<>();
+        for (int i = 0; i < parserList.size(); i += 2) {
+            Object key = parserList.get(i).parse(sourceData, rowIndex, context);
+            Object value = parserList.get(i + 1).parse(sourceData, rowIndex, context);
+            res.put(key, value);
+        }
+        return res;
+    }
+
+}
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFunction.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFunction.java
new file mode 100644
index 00000000000..4d9336b09fc
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests the MAP() transform function by running SQL against a CSV-decoded source row and
+ * checking the KV-encoded output.
+ */
+public class TestMapFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testMapFunction() throws Exception {
+        List<String> output;
+
+        // case1: Map('he',7,'xxd') -- odd argument count, the result field is expected to be empty
+        output = runTransform("select map(string1,numeric1,string2) from source",
+                "he|xxd|cloud|7|3|3");
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=", output.get(0));
+
+        // case2: Map('he',1,'xxd','cloud')
+        output = runTransform("select map(string1,numeric1,string2,string3) from source",
+                "he|xxd|cloud|1|3|3");
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={he=1, xxd=cloud}", output.get(0));
+
+        // case3: Map('xxd','cloud',1,2) -- pairs are rendered in argument order
+        output = runTransform("select map(numeric1,numeric2,string1,string2) from source",
+                "1|2|3|xxd|cloud|3");
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={xxd=cloud, 1=2}", output.get(0));
+
+        // case4: Map('xxd','cloud',map(1,2),map(3,'apple')) -- nested maps as key and value
+        output = runTransform(
+                "select map(numeric1,numeric2,map(string1,string2),map(string3,numeric3)) from source",
+                "1|2|3|xxd|cloud|apple");
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={xxd=cloud, {1=2}={3=apple}}", output.get(0));
+    }
+
+    /**
+     * Creates a fresh processor for the given SQL (CSV decoder in, KV encoder out) and
+     * transforms a single input row with no extra parameters.
+     *
+     * @param transformSql the transform SQL under test
+     * @param data         one '|'-separated source row
+     * @return the encoded output rows
+     */
+    private List<String> runTransform(String transformSql, String data) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        return TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink))
+                .transform(data, new HashMap<>());
+    }
+}