diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 51fdb46e4df..966f0b6d5bb 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -37,6 +37,7 @@ import org.apache.flink.cdc.common.types.TimestampType; import org.apache.flink.cdc.common.types.ZonedTimestampType; +import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import java.util.ArrayList; @@ -56,6 +57,7 @@ public class SchemaUtils { * create a list of {@link RecordData.FieldGetter} from given {@link Schema} to get Object from * RecordData. */ + @CheckReturnValue public static List createFieldGetters(Schema schema) { return createFieldGetters(schema.getColumns()); } @@ -64,6 +66,7 @@ public static List createFieldGetters(Schema schema) { * create a list of {@link RecordData.FieldGetter} from given {@link Column} to get Object from * RecordData. */ + @CheckReturnValue public static List createFieldGetters(List columns) { List fieldGetters = new ArrayList<>(columns.size()); for (int i = 0; i < columns.size(); i++) { @@ -73,6 +76,7 @@ public static List createFieldGetters(List colum } /** Restore original data fields from RecordData structure. */ + @CheckReturnValue public static List restoreOriginalData( @Nullable RecordData recordData, List fieldGetters) { if (recordData == null) { @@ -86,6 +90,7 @@ public static List restoreOriginalData( } /** apply SchemaChangeEvent to the old schema and return the schema after changing. */ + @CheckReturnValue public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) { return SchemaChangeEventVisitor.visit( event, @@ -210,6 +215,7 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche * position indicators. This is necessary since extra calculated columns might be added, and * `FIRST` / `LAST` position might differ. */ + @CheckReturnValue public static Optional transformSchemaChangeEvent( boolean hasAsterisk, List referencedColumns, SchemaChangeEvent event) { Optional evolvedSchemaChangeEvent = Optional.empty(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java index 445387b0294..de0ef182dd8 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java @@ -311,7 +311,6 @@ public Optional coerceDataRecord( List upstreamSchemaReader = upstreamRecordGetterCache.getUnchecked(upstreamSchema); - SchemaUtils.createFieldGetters(upstreamSchema); BinaryRecordDataGenerator evolvedSchemaWriter = evolvedRecordWriterCache.getUnchecked(evolvedSchema);