Skip to content

Commit

Permalink
[INLONG-10764][SDK] Transform SQL support temporal functions(Includin…
Browse files Browse the repository at this point in the history
…g year, quarter, month, week, dayofyear and dayofmonth) (apache#10766)
  • Loading branch information
yfsn666 authored and emptyOVO committed Aug 15, 2024
1 parent ecee16c commit 3d7f08a
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.function;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;

import java.sql.Date;
import java.time.LocalDate;
import java.time.temporal.TemporalField;
import java.time.temporal.WeekFields;
import java.util.List;
import java.util.Locale;

/**
 * DateExtractFunction
 * description: extracts a single calendar field from a SQL date
 * - year(date)--returns the year of the date
 * - quarter(date)--returns the quarter of the year (an integer between 1 and 4)
 * - month(date)--returns the month of the year (an integer between 1 and 12)
 * - week(date)--returns the ISO-8601 week of the year (an integer between 1 and 53)
 * - dayofyear(date)--returns the day of the year (an integer between 1 and 366)
 * - dayofmonth(date)--returns the day of the month (an integer between 1 and 31)
 */
public class DateExtractFunction implements ValueParser {

    /**
     * ISO-8601 week numbering (weeks start on Monday, week 1 holds at least four days).
     * A fixed definition is used instead of the JVM default locale so that week(date)
     * yields the same result on every machine.
     */
    private static final TemporalField WEEK_OF_YEAR_FIELD = WeekFields.ISO.weekOfWeekBasedYear();

    private final DateExtractFunctionType type;
    private final ValueParser dateParser;

    public enum DateExtractFunctionType {
        YEAR, QUARTER, MONTH, WEEK, DAY_OF_YEAR, DAY_OF_MONTH
    }

    /**
     * @param type which calendar field this instance extracts
     * @param expr the SQL function call; its first parameter is the date expression
     */
    public DateExtractFunction(DateExtractFunctionType type, Function expr) {
        this.type = type;
        List<Expression> expressions = expr.getParameters().getExpressions();
        this.dateParser = OperatorTools.buildParser(expressions.get(0));
    }

    /**
     * Evaluates the date argument for the given row and extracts the configured field.
     *
     * @return the extracted field as an Integer, or null when the date argument evaluates
     *         to null or the function type is not handled
     */
    @Override
    public Object parse(SourceData sourceData, int rowIndex, Context context) {
        Object dateObj = dateParser.parse(sourceData, rowIndex, context);
        // A NULL argument yields NULL instead of failing while parsing the text "null".
        if (dateObj == null) {
            return null;
        }
        Date date = OperatorTools.parseDate(dateObj);
        LocalDate localDate = date.toLocalDate();
        switch (type) {
            // year
            case YEAR:
                return localDate.getYear();
            // quarter(between 1 and 4): months 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4
            case QUARTER:
                return (localDate.getMonthValue() - 1) / 3 + 1;
            // month(between 1 and 12)
            case MONTH:
                return localDate.getMonthValue();
            // week(between 1 and 53)
            case WEEK:
                return localDate.get(WEEK_OF_YEAR_FIELD);
            // dayofyear(between 1 and 366)
            case DAY_OF_YEAR:
                return localDate.getDayOfYear();
            // dayofmonth(between 1 and 31)
            case DAY_OF_MONTH:
                return localDate.getDayOfMonth();
            default:
                return null;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.inlong.sdk.transform.process.function.AbsFunction;
import org.apache.inlong.sdk.transform.process.function.CeilFunction;
import org.apache.inlong.sdk.transform.process.function.ConcatFunction;
import org.apache.inlong.sdk.transform.process.function.DateExtractFunction;
import org.apache.inlong.sdk.transform.process.function.DateExtractFunction.DateExtractFunctionType;
import org.apache.inlong.sdk.transform.process.function.DateFormatFunction;
import org.apache.inlong.sdk.transform.process.function.ExpFunction;
import org.apache.inlong.sdk.transform.process.function.FloorFunction;
Expand All @@ -37,6 +39,7 @@
import org.apache.inlong.sdk.transform.process.function.ToDateFunction;
import org.apache.inlong.sdk.transform.process.parser.AdditionParser;
import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
import org.apache.inlong.sdk.transform.process.parser.DateParser;
import org.apache.inlong.sdk.transform.process.parser.DivisionParser;
import org.apache.inlong.sdk.transform.process.parser.LongParser;
import org.apache.inlong.sdk.transform.process.parser.MultiplicationParser;
Expand All @@ -45,6 +48,7 @@
import org.apache.inlong.sdk.transform.process.parser.SubtractionParser;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;

import net.sf.jsqlparser.expression.DateValue;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
import net.sf.jsqlparser.expression.LongValue;
Expand All @@ -67,6 +71,7 @@
import org.apache.commons.lang.ObjectUtils;

import java.math.BigDecimal;
import java.sql.Date;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -101,6 +106,12 @@ public class OperatorTools {
functionMap.put("floor", FloorFunction::new);
functionMap.put("sin", SinFunction::new);
functionMap.put("sinh", SinhFunction::new);
functionMap.put("year", func -> new DateExtractFunction(DateExtractFunctionType.YEAR, func));
functionMap.put("quarter", func -> new DateExtractFunction(DateExtractFunctionType.QUARTER, func));
functionMap.put("month", func -> new DateExtractFunction(DateExtractFunctionType.MONTH, func));
functionMap.put("week", func -> new DateExtractFunction(DateExtractFunctionType.WEEK, func));
functionMap.put("dayofyear", func -> new DateExtractFunction(DateExtractFunctionType.DAY_OF_YEAR, func));
functionMap.put("dayofmonth", func -> new DateExtractFunction(DateExtractFunctionType.DAY_OF_MONTH, func));
}

public static ExpressionOperator buildOperator(Expression expr) {
Expand Down Expand Up @@ -145,6 +156,8 @@ public static ValueParser buildParser(Expression expr) {
return new MultiplicationParser((Multiplication) expr);
} else if (expr instanceof Division) {
return new DivisionParser((Division) expr);
} else if (expr instanceof DateValue) {
return new DateParser((DateValue) expr);
} else if (expr instanceof Function) {
String exprString = expr.toString();
if (exprString.startsWith(ROOT_KEY) || exprString.startsWith(CHILD_KEY)) {
Expand Down Expand Up @@ -181,6 +194,14 @@ public static String parseString(Object value) {
return value.toString();
}

/**
 * parseDate
 * Converts an evaluated value into a java.sql.Date. A java.sql.Date is returned as is;
 * any other value is converted through its string form, which must be a date in
 * yyyy-[m]m-[d]d format (surrounding whitespace is ignored).
 * @param value the value to convert
 * @return the corresponding java.sql.Date
 * @throws IllegalArgumentException if value is null or its text is not a valid date
 */
public static Date parseDate(Object value) {
    if (value instanceof Date) {
        return (Date) value;
    }
    if (value == null) {
        // Fail with a clear message instead of trying to parse the literal text "null".
        throw new IllegalArgumentException("Cannot parse a date from a null value");
    }
    // Delimited text fields often carry padding that Date.valueOf would reject.
    return Date.valueOf(String.valueOf(value).trim());
}

/**
* compareValue
* @param left
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.transform.process.parser;

import org.apache.inlong.sdk.transform.decode.SourceData;
import org.apache.inlong.sdk.transform.process.Context;

import net.sf.jsqlparser.expression.DateValue;

import java.sql.Date;

/**
 * DateParser
 * description: holds the java.sql.Date of a SQL date literal and returns it for every row
 */
public class DateParser implements ValueParser {

    private final Date literalDate;

    /**
     * Captures the literal's date once, at construction time.
     *
     * @param expr the parsed SQL date literal
     */
    public DateParser(DateValue expr) {
        Date parsedLiteral = expr.getValue();
        // Rebuild the date from its LocalDate form so this parser owns its own Date instance.
        this.literalDate = Date.valueOf(parsedLiteral.toLocalDate());
    }

    /**
     * Returns the stored literal; the source data, row index and context are not consulted.
     */
    @Override
    public Object parse(SourceData sourceData, int rowIndex, Context context) {
        return this.literalDate;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,72 @@ public void testDateFormatFunction() throws Exception {
List<String> output3 = processor1.transform("yyyyMMddHHmmss|apple|cloud|1722524216|1|3", new HashMap<>());
Assert.assertEquals(1, output3.size());
Assert.assertEquals(output3.get(0), "result=20240801225656");
// case1: date_format(1722524216, 'yyyy/MM/dd HH:mm:ss')
// case4: date_format(1722524216, 'yyyy/MM/dd HH:mm:ss')
List<String> output4 = processor1.transform("yyyy/MM/dd HH:mm:ss|apple|cloud|1722524216|1|3", new HashMap<>());
Assert.assertEquals(1, output4.size());
Assert.assertEquals(output4.get(0), "result=2024/08/01 22:56:56");
}

/**
 * Verifies each date-extraction SQL function (year, quarter, month, week, dayofyear,
 * dayofmonth) against a single CSV row holding one date string.
 */
@Test
public void testDateExtractFunction() throws Exception {
    // case1: year(2024-08-08)
    assertDateExtractResult("year", "2024-08-08", "result=2024");
    // case2: quarter(2024-08-08)
    assertDateExtractResult("quarter", "2024-08-08", "result=3");
    // case3: month(2024-08-08)
    assertDateExtractResult("month", "2024-08-08", "result=8");
    // case4: week(2024-02-29)
    assertDateExtractResult("week", "2024-02-29", "result=9");
    // case5: dayofyear(2024-02-29)
    assertDateExtractResult("dayofyear", "2024-02-29", "result=60");
    // case6: dayofmonth(2024-02-29)
    assertDateExtractResult("dayofmonth", "2024-02-29", "result=29");
}

/**
 * Builds a processor for "select functionName(string1) from source", transforms one input
 * row, and asserts that exactly one output row equal to the expected text is produced.
 *
 * @param functionName the SQL function under test
 * @param input the raw CSV row fed to the processor
 * @param expected the expected encoded output row
 */
private void assertDateExtractResult(String functionName, String input, String expected) throws Exception {
    String transformSql = "select " + functionName + "(string1) from source";
    TransformConfig config = new TransformConfig(transformSql);
    TransformProcessor<String, String> processor = TransformProcessor
            .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
                    SinkEncoderFactory.createKvEncoder(kvSink));
    List<String> output = processor.transform(input, new HashMap<>());
    Assert.assertEquals(1, output.size());
    // JUnit expects (expected, actual); this order keeps failure messages accurate.
    Assert.assertEquals(expected, output.get(0));
}
}

0 comments on commit 3d7f08a

Please sign in to comment.