From 88d5546be277afb41c9a96a8e45cb3ef589222ca Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 18 Oct 2024 11:48:37 +0800 Subject: [PATCH 1/2] [FLINK-36565] Route module allows merging Decimals with various precisions Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../flink/cdc/common/utils/SchemaUtils.java | 40 +++-- .../cdc/common/utils/SchemaUtilsTest.java | 17 ++ .../flink/FlinkPipelineComposerITCase.java | 161 ++++++++++++++++++ .../operators/schema/SchemaOperator.java | 33 +++- 4 files changed, 238 insertions(+), 13 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index c90329a8dc0..5b023222237 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -215,25 +215,41 @@ public static DataType inferWiderType(DataType lType, DataType rType) { lhsDecimal.getPrecision() - lhsDecimal.getScale(), rhsDecimal.getPrecision() - rhsDecimal.getScale()); int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale()); + Preconditions.checkArgument( + resultIntDigits + resultScale <= DecimalType.MAX_PRECISION, + String.format( + "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available", + lType, + rType, + resultIntDigits + resultScale, + DecimalType.MAX_PRECISION)); mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale); } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) { // Merge decimal and int DecimalType lhsDecimal = (DecimalType) lType; - mergedType = - DataTypes.DECIMAL( - Math.max( - lhsDecimal.getPrecision(), - lhsDecimal.getScale() + getNumericPrecision(rType)), - lhsDecimal.getScale()); + int resultPrecision = + Math.max( + lhsDecimal.getPrecision(), + lhsDecimal.getScale() + getNumericPrecision(rType)); + Preconditions.checkArgument( + resultPrecision <= DecimalType.MAX_PRECISION, + String.format( + "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available", + lType, rType, resultPrecision, DecimalType.MAX_PRECISION)); + mergedType = DataTypes.DECIMAL(resultPrecision, lhsDecimal.getScale()); } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) { // Merge decimal and int DecimalType rhsDecimal = (DecimalType) rType; - mergedType = - DataTypes.DECIMAL( - Math.max( - rhsDecimal.getPrecision(), - rhsDecimal.getScale() + getNumericPrecision(lType)), - rhsDecimal.getScale()); + int resultPrecision = + Math.max( + rhsDecimal.getPrecision(), + rhsDecimal.getScale() + getNumericPrecision(lType)); + Preconditions.checkArgument( + resultPrecision <= DecimalType.MAX_PRECISION, + String.format( + "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available", + lType, rType, resultPrecision, DecimalType.MAX_PRECISION)); + mergedType = DataTypes.DECIMAL(resultPrecision, rhsDecimal.getScale()); } else { throw new IllegalStateException( String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType)); diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java index b33815017bf..6ff686aecb7 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java @@ -273,6 +273,23 @@ public void testInferWiderType() { DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2))) .isEqualTo(DataTypes.DECIMAL(12, 4)); + // Test overflow decimal conversions + Assertions.assertThatThrownBy( + () -> + SchemaUtils.inferWiderType( + DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0))) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available"); + + Assertions.assertThatThrownBy( + () -> + SchemaUtils.inferWiderType( + DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5))) + .isExactlyInstanceOf(IllegalArgumentException.class) + .hasMessage( + "Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available"); + // Test merging with nullability Assertions.assertThat( SchemaUtils.inferWiderType( diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 74afa8c4db9..679ff9cc62b 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.ZonedTimestampData; @@ -61,6 +62,7 @@ import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -1216,6 +1218,88 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkA assertThat(outputEvents).containsExactlyInAnyOrder(expected); } + @ParameterizedTest + @EnumSource + void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + List events = generateDecimalColumnEvents("default_table_"); + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.singletonList( + new RouteDef( + "default_namespace.default_schema.default_table_\\.*", + "default_namespace.default_schema.default_everything_merged", + null, + "Merge all decimal columns with different precision")), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + + String[] expected = + Stream.of( + "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=TINYINT}}", + "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}}", + "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}}", + "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}}", + "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}, before=[], after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}") + .map( + s -> + s.replace( + "{}", + "default_namespace.default_schema.default_everything_merged")) + .toArray(String[]::new); + + assertThat(outputEvents).containsExactlyInAnyOrder(expected); + } + private List generateTemporalColumnEvents(String tableNamePrefix) { List events = new ArrayList<>(); @@ -1286,6 +1370,83 @@ private List generateTemporalColumnEvents(String tableNamePrefix) { return events; } + private List generateDecimalColumnEvents(String tableNamePrefix) { + List events = new ArrayList<>(); + + // Initialize schemas + List names = + Arrays.asList( + "tiny", + "small", + "vanilla", + "big", + "dec_15_0", + "decimal_10_10", + "decimal_16_2", + "decimal_29_19"); + + List types = + Arrays.asList( + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.DECIMAL(15, 0), + DataTypes.DECIMAL(10, 5), + DataTypes.DECIMAL(16, 2), + DataTypes.DECIMAL(29, 19)); + + List values = + Arrays.asList( + (byte) 1, + (short) 22, + 3333, + (long) 44444444, + DecimalData.fromBigDecimal(new BigDecimal("555555555555555"), 15, 0), + DecimalData.fromBigDecimal(new BigDecimal("66666.66666"), 10, 5), + DecimalData.fromBigDecimal(new BigDecimal("77777777.17"), 16, 2), + DecimalData.fromBigDecimal( + new BigDecimal("888888888.8888888888888888888"), 29, 19)); + + List schemas = + types.stream() + .map( + temporalColumnType -> + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("fav_num", temporalColumnType) + .primaryKey("id") + .build()) + .collect(Collectors.toList()); + + for (int i = 0; i < names.size(); i++) { + TableId generatedTableId = + TableId.tableId( + "default_namespace", "default_schema", tableNamePrefix + names.get(i)); + Schema generatedSchema = schemas.get(i); + events.add(new CreateTableEvent(generatedTableId, generatedSchema)); + events.add( + DataChangeEvent.insertEvent( + generatedTableId, + generate(generatedSchema, 1 + i, "Alice", 17, values.get(i)))); + } + + for (int i = 0; i < names.size(); i++) { + TableId generatedTableId = + TableId.tableId( + "default_namespace", "default_schema", tableNamePrefix + names.get(i)); + Schema generatedSchema = schemas.get(i); + events.add( + DataChangeEvent.insertEvent( + generatedTableId, + generate(generatedSchema, 101 + i, "Zen", 19, values.get(i)))); + } + + return events; + } + BinaryRecordData generate(Schema schema, Object... fields) { return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) .generate( diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index e04416db74b..6ada032d56f 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.StringData; @@ -40,6 +41,7 @@ import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypeFamily; import org.apache.flink.cdc.common.types.DataTypeRoot; +import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.utils.ChangeEventUtils; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; @@ -74,6 +76,7 @@ import javax.annotation.Nullable; import java.io.Serializable; +import java.math.BigDecimal; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; @@ -611,14 +614,42 @@ public Object getFieldOrNull(RecordData recordData) { } else if (originalField instanceof Integer) { // INT return ((Integer) originalField).longValue(); + } else if (originalField instanceof Long) { + // BIGINT + return originalField; } else { return fail( new IllegalArgumentException( String.format( "Cannot fit type \"%s\" into a BIGINT column. " - + "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column", + + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column", + originalField.getClass()))); + } + } else if (destinationType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) destinationType; + BigDecimal decimalValue; + if (originalField instanceof Byte) { + decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0); + } else if (originalField instanceof Short) { + decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0); + } else if (originalField instanceof Integer) { + decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0); + } else if (originalField instanceof Long) { + decimalValue = BigDecimal.valueOf((Long) originalField, 0); + } else if (originalField instanceof DecimalData) { + decimalValue = ((DecimalData) originalField).toBigDecimal(); + } else { + return fail( + new IllegalArgumentException( + String.format( + "Cannot fit type \"%s\" into a DECIMAL column. " + + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column", originalField.getClass()))); } + return decimalValue != null + ? DecimalData.fromBigDecimal( + decimalValue, decimalType.getPrecision(), decimalType.getScale()) + : null; } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) { if (originalField instanceof Float) { // FLOAT From e410f1f78729b34a9d73e90291ddb41c46e3f018 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:24:06 +0800 Subject: [PATCH 2/2] minor: merge duplicated code fragments Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../flink/cdc/common/utils/SchemaUtils.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 5b023222237..99cd9735698 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -226,30 +226,10 @@ public static DataType inferWiderType(DataType lType, DataType rType) { mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale); } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) { // Merge decimal and int - DecimalType lhsDecimal = (DecimalType) lType; - int resultPrecision = - Math.max( - lhsDecimal.getPrecision(), - lhsDecimal.getScale() + getNumericPrecision(rType)); - Preconditions.checkArgument( - resultPrecision <= DecimalType.MAX_PRECISION, - String.format( - "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available", - lType, rType, resultPrecision, DecimalType.MAX_PRECISION)); - mergedType = DataTypes.DECIMAL(resultPrecision, lhsDecimal.getScale()); + mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType); } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) { // Merge decimal and int - DecimalType rhsDecimal = (DecimalType) rType; - int resultPrecision = - Math.max( - rhsDecimal.getPrecision(), - rhsDecimal.getScale() + getNumericPrecision(lType)); - Preconditions.checkArgument( - resultPrecision <= DecimalType.MAX_PRECISION, - String.format( - "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available", - lType, rType, resultPrecision, DecimalType.MAX_PRECISION)); - mergedType = DataTypes.DECIMAL(resultPrecision, rhsDecimal.getScale()); + mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType); } else { throw new IllegalStateException( String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType)); @@ -262,6 +242,20 @@ public static DataType inferWiderType(DataType lType, DataType rType) { } } + private static DataType mergeExactNumericsIntoDecimal( + DecimalType decimalType, DataType otherType) { + int resultPrecision = + Math.max( + decimalType.getPrecision(), + decimalType.getScale() + getNumericPrecision(otherType)); + Preconditions.checkArgument( + resultPrecision <= DecimalType.MAX_PRECISION, + String.format( + "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available", + decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION)); + return DataTypes.DECIMAL(resultPrecision, decimalType.getScale()); + } + @VisibleForTesting public static int getNumericPrecision(DataType dataType) { if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {