Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -510,4 +511,86 @@ public static Object coalesce(Object... objects) {
}
return null;
}

public static String castToString(Object object) {
if (object == null) {
return null;
}
return object.toString();
Copy link
Contributor

@lvyanquan lvyanquan May 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if we pass a NULL Object to cast function? Do we allow this happen?

}

public static Byte castToByte(Object object) {
if (object == null) {
return null;
}
return Byte.valueOf(castObjectIntoString(object));
}

public static Boolean castToBoolean(Object object) {
if (object == null) {
return null;
}
if (object instanceof Byte
|| object instanceof Short
|| object instanceof Integer
|| object instanceof Long
|| object instanceof Float
|| object instanceof Double
|| object instanceof BigDecimal) {
return !object.equals(0);
}
return Boolean.valueOf(castToString(object));
}

public static Short castToShort(Object object) {
if (object == null) {
return null;
}
return Short.valueOf(castObjectIntoString(object));
}

public static Integer castToInteger(Object object) {
if (object == null) {
return null;
}
return Integer.valueOf(castObjectIntoString(object));
}

public static Long castToLong(Object object) {
if (object == null) {
return null;
}
return Long.valueOf(castObjectIntoString(object));
}

public static Float castToFloat(Object object) {
if (object == null) {
return null;
}
return Float.valueOf(castObjectIntoString(object));
}

public static Double castToDouble(Object object) {
if (object == null) {
return null;
}
return Double.valueOf(castObjectIntoString(object));
}

public static BigDecimal castToBigDecimal(Object object, int precision, int scale) {
if (object == null) {
return null;
}
BigDecimal bigDecimal =
new BigDecimal(castObjectIntoString(object), new MathContext(precision));
bigDecimal = bigDecimal.setScale(scale, BigDecimal.ROUND_HALF_UP);
return bigDecimal;
}

private static String castObjectIntoString(Object object) {
if (object instanceof Boolean) {
return Boolean.valueOf(castToString(object)) ? "1" : "0";
}
return String.valueOf(object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.flink.cdc.common.utils.StringUtils;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -228,6 +230,8 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue(
case LESS_THAN_OR_EQUAL:
case GREATER_THAN_OR_EQUAL:
return generateBinaryOperation(sqlBasicCall, atoms, sqlBasicCall.getKind().sql);
case CAST:
return generateCastOperation(sqlBasicCall, atoms);
case OTHER:
return generateOtherOperation(sqlBasicCall, atoms);
default:
Expand Down Expand Up @@ -256,6 +260,16 @@ private static Java.Rvalue generateEqualsOperation(
Location.NOWHERE, null, StringUtils.convertToCamelCase("VALUE_EQUALS"), atoms);
}

private static Java.Rvalue generateCastOperation(
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 1) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
List<SqlNode> operandList = sqlBasicCall.getOperandList();
SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1);
return generateTypeConvertMethod(sqlDataTypeSpec, atoms);
}

private static Java.Rvalue generateOtherOperation(
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (sqlBasicCall.getOperator().getName().equals("||")) {
Expand Down Expand Up @@ -298,4 +312,56 @@ private static Java.Rvalue generateNoOperandTimestampFunctionOperation(String op
StringUtils.convertToCamelCase(operationName),
timestampFunctionParam.toArray(new Java.Rvalue[0]));
}

private static Java.Rvalue generateTypeConvertMethod(
SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
case "BOOLEAN":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToBoolean", atoms);
case "TINYINT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToByte", atoms);
case "SMALLINT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToShort", atoms);
case "INTEGER":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToInteger", atoms);
case "BIGINT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToLong", atoms);
case "FLOAT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToFloat", atoms);
case "DOUBLE":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToDouble", atoms);
case "DECIMAL":
int precision = 10;
int scale = 0;
if (sqlDataTypeSpec.getTypeNameSpec() instanceof SqlBasicTypeNameSpec) {
SqlBasicTypeNameSpec typeNameSpec =
(SqlBasicTypeNameSpec) sqlDataTypeSpec.getTypeNameSpec();
if (typeNameSpec.getPrecision() > -1) {
precision = typeNameSpec.getPrecision();
}
if (typeNameSpec.getScale() > -1) {
scale = typeNameSpec.getScale();
}
}
List<Java.Rvalue> newAtoms = new ArrayList<>(Arrays.asList(atoms));
newAtoms.add(
new Java.AmbiguousName(
Location.NOWHERE, new String[] {String.valueOf(precision)}));
newAtoms.add(
new Java.AmbiguousName(
Location.NOWHERE, new String[] {String.valueOf(scale)}));
return new Java.MethodInvocation(
Location.NOWHERE,
null,
"castToBigDecimal",
newAtoms.toArray(new Java.Rvalue[0]));
case "CHAR":
case "VARCHAR":
case "STRING":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToString", atoms);
default:
throw new ParseException(
"Unsupported data type cast: " + sqlDataTypeSpec.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void writeBinary(int pos, byte[] bytes) {

@Override
public void writeDecimal(int pos, DecimalData value, int precision) {
assert value == null || (value.precision() == precision);
assert value == null || (value.precision() <= precision);

if (DecimalData.isCompact(precision)) {
assert value != null;
Expand Down
Loading