Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1779,6 +1779,104 @@ void testNumericCastingsWithTruncation() throws Exception {
"DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
}

@Test
void testTransformWithLargeLiterals() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();

sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
List<Event> events = generateSchemaEvolutionEvents(tableId);

ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
"\\.*.\\.*.\\.*",
"*, 2147483647 AS int_max, "
+ "2147483648 AS greater_than_int_max, "
+ "-2147483648 AS int_min, "
+ "-2147483649 AS less_than_int_min, "
+ "CAST(1234567890123456789 AS DECIMAL(20, 0)) AS really_big_decimal",
"CAST(id AS BIGINT) + 2147483648 > 2147483649", // equivalent to id > 1
null,
null,
null,
null)),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");

assertThat(outputEvents)
.containsExactly(
// Initial stage
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT,`int_max` INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[3, Colin, 24, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",

// Add Column
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[2nd, 5, Eva, 20, 2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",

// Alter column type
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[5th, 8, Harry, 18.0, -3, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",

// Rename column
"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[8th, 11, Kella, 18.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}",

// Drop column
"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[11th, 14, Nein, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops, 2147483647, 2147483648, -2147483648, -2147483649, 1234567890123456789], after=[], op=DELETE, meta=()}");
}

private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
List<Event> events = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlNumericLiteral;
import org.apache.calcite.sql.fun.SqlCase;
import org.apache.calcite.sql.type.SqlTypeName;
import org.codehaus.commons.compiler.CompileException;
Expand Down Expand Up @@ -138,6 +139,13 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
if (sqlLiteral instanceof SqlCharStringLiteral) {
// Double quotation marks represent strings in Janino.
value = "\"" + value.substring(1, value.length() - 1) + "\"";
} else if (sqlLiteral instanceof SqlNumericLiteral) {
if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
long longValue = sqlLiteral.longValue(true);
if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {
value += "L";
}
}
}
if (SQL_TYPE_NAME_IGNORE.contains(sqlLiteral.getTypeName())) {
value = "\"" + value + "\"";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.cdc.runtime.parser;

import org.apache.flink.api.java.tuple.Tuple2;

import org.codehaus.commons.compiler.CompileException;
import org.codehaus.commons.compiler.Location;
import org.codehaus.janino.ExpressionEvaluator;
Expand All @@ -34,6 +36,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for the {@link JaninoCompiler}. */
public class JaninoCompilerTest {
Expand Down Expand Up @@ -120,4 +125,81 @@ public void testBuildInFunction() throws InvocationTargetException {
Object evaluate = expressionEvaluator.evaluate(params.toArray());
Assert.assertEquals(3.0, evaluate);
}

@Test
public void testLargeNumericLiterals() {
// Test parsing integer literals
Stream.of(
Tuple2.of("0", 0),
Tuple2.of("1", 1),
Tuple2.of("1", 1),
Tuple2.of("2147483647", 2147483647),
Tuple2.of("-2147483648", -2147483648))
.forEach(
t -> {
String expression = t.f0;
List<String> columnNames = new ArrayList<>();
List<Class<?>> paramTypes = new ArrayList<>();
ExpressionEvaluator expressionEvaluator =
JaninoCompiler.compileExpression(
JaninoCompiler.loadSystemFunction(expression),
columnNames,
paramTypes,
Integer.class);
try {
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
});

// Test parsing double literals
Stream.of(
Tuple2.of("3.1415926", 3.1415926),
Tuple2.of("0.0", 0.0),
Tuple2.of("17.0", 17.0),
Tuple2.of("123456789.123456789", 123456789.123456789),
Tuple2.of("-987654321.987654321", -987654321.987654321))
.forEach(
t -> {
String expression = t.f0;
List<String> columnNames = new ArrayList<>();
List<Class<?>> paramTypes = new ArrayList<>();
ExpressionEvaluator expressionEvaluator =
JaninoCompiler.compileExpression(
JaninoCompiler.loadSystemFunction(expression),
columnNames,
paramTypes,
Double.class);
try {
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
});

// Test parsing long literals
Stream.of(
Tuple2.of("2147483648L", 2147483648L),
Tuple2.of("-2147483649L", -2147483649L),
Tuple2.of("9223372036854775807L", 9223372036854775807L),
Tuple2.of("-9223372036854775808L", -9223372036854775808L))
.forEach(
t -> {
String expression = t.f0;
List<String> columnNames = new ArrayList<>();
List<Class<?>> paramTypes = new ArrayList<>();
ExpressionEvaluator expressionEvaluator =
JaninoCompiler.compileExpression(
JaninoCompiler.loadSystemFunction(expression),
columnNames,
paramTypes,
Long.class);
try {
assertThat(expressionEvaluator.evaluate()).isEqualTo(t.f1);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
Expand Down Expand Up @@ -461,6 +462,34 @@ public void testTranslateUdfFilterToJaninoExpression() {
"__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)) > 4 || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && !valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", \"a\", \"z\", \"lie\"), \"\")");
}

@Test
void testLargeNumericalLiterals() {
// For literals within [-2147483648, 2147483647] range, plain Integers are OK
testFilterExpression("id > 2147483647", "id > 2147483647");
testFilterExpression("id < -2147483648", "id < -2147483648");

// For out-of-range literals, an extra `L` suffix is required
testFilterExpression("id > 2147483648", "id > 2147483648L");
testFilterExpression("id > -2147483649", "id > -2147483649L");
testFilterExpression("id < 9223372036854775807", "id < 9223372036854775807L");
testFilterExpression("id > -9223372036854775808", "id > -9223372036854775808L");

// But there's still a limit
Assertions.assertThatThrownBy(
() ->
TransformParser.translateFilterExpressionToJaninoExpression(
"id > 9223372036854775808", Collections.emptyList()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '9223372036854775808' out of range");

Assertions.assertThatThrownBy(
() ->
TransformParser.translateFilterExpressionToJaninoExpression(
"id < -9223372036854775809", Collections.emptyList()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '-9223372036854775809' out of range");
}

private void testFilterExpression(String expression, String expressionExpect) {
String janinoExpression =
TransformParser.translateFilterExpressionToJaninoExpression(
Expand Down