Skip to content

Commit

Permalink
[INLONG-10130][SDK] Transform SQL support string concat function (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
luchunliang authored and herywang committed May 9, 2024
1 parent fc98df6 commit cacbd4d
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.util.ArrayList;
import java.util.List;

/**
* ConcatFunction
*
*/
public class ConcatFunction implements ValueParser {

private List<ValueParser> nodeList;

/**
* Constructor
* @param expr
*/
public ConcatFunction(Function expr) {
if (expr.getParameters() == null) {
this.nodeList = new ArrayList<>();
} else {
List<Expression> params = expr.getParameters().getExpressions();
nodeList = new ArrayList<>(params.size());
for (Expression param : params) {
ValueParser node = OperatorTools.buildParser(param);
nodeList.add(node);
}
}
}

/**
* parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
public Object parse(SourceData sourceData, int rowIndex) {
StringBuilder builder = new StringBuilder();
for (ValueParser node : nodeList) {
builder.append(node.parse(sourceData, rowIndex));
}
return builder.toString();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Function;

/**
* NowFunction
*
*/
public class NowFunction implements ValueParser {

/**
* Constructor
* @param expr
*/
public NowFunction(Function expr) {
}

/**
* parse
* @param sourceData
* @param rowIndex
* @return
*/
@Override
public Object parse(SourceData sourceData, int rowIndex) {
return String.valueOf(System.currentTimeMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.sdk.transform.process.operator;

import org.apache.inlong.sdk.transform.process.function.ConcatFunction;
import org.apache.inlong.sdk.transform.process.function.NowFunction;
import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
Expand Down Expand Up @@ -108,6 +110,15 @@ public static ValueParser buildParser(Expression expr) {
return new ColumnParser((Function) expr);
} else {
// TODO
Function func = (Function) expr;
switch (func.getName()) {
case "concat":
return new ConcatFunction(func);
case "now":
return new NowFunction(func);
default:
return new ColumnParser(func);
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,45 @@ public void testPb2CsvForAdd() {
e.printStackTrace();
}
}

@Test
public void testPb2CsvForConcat() {
try {
List<FieldInfo> fields = this.getTestFieldList();
String transformBase64 = this.getPbTestDescription();
SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select $root.sid,$root.packageID,$child.msgTime,"
+ "concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg from source";
TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
// case1
TransformProcessor processor = new TransformProcessor(config);
byte[] srcBytes = this.getPbTestData();
List<String> output = processor.transform(srcBytes, new HashMap<>());
Assert.assertTrue(output.size() == 2);
Assert.assertEquals(output.get(0), "sid|1|1713243918000|sid11713243918000msgValue4");
Assert.assertEquals(output.get(1), "sid|1|1713243918002|sid11713243918002msgValue42");
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void testPb2CsvForNow() {
try {
List<FieldInfo> fields = this.getTestFieldList();
String transformBase64 = this.getPbTestDescription();
SourceInfo pbSource = new PbSourceInfo("UTF-8", transformBase64, "SdkDataRequest", "msgs");
SinkInfo csvSink = new CsvSinkInfo("UTF-8", "|", "\\", fields);
String transformSql = "select now() from source";
TransformConfig config = new TransformConfig(pbSource, csvSink, transformSql);
// case1
TransformProcessor processor = new TransformProcessor(config);
byte[] srcBytes = this.getPbTestData();
List<String> output = processor.transform(srcBytes, new HashMap<>());
Assert.assertTrue(output.size() == 2);
} catch (Exception e) {
e.printStackTrace();
}
}
}

0 comments on commit cacbd4d

Please sign in to comment.