diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml index 6c73b8ebbf70..9499b39740dd 100644 --- a/paimon-flink/paimon-flink-cdc/pom.xml +++ b/paimon-flink/paimon-flink-cdc/pom.xml @@ -35,8 +35,8 @@ under the License. 1.20.1 - 3.1.1 - 3.1.1 + 3.5.0 + 3.5.0 1.11.4 2.2.0 2.9.0 diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java index 5107ba146bfb..9b6c0188367e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java @@ -186,7 +186,7 @@ public static JdbcIncrementalSource buildPostgresSource( customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); CdcDebeziumDeserializationSchema schema = new CdcDebeziumDeserializationSchema(true, customConverterConfigs); - return sourceBuilder.deserializer(schema).includeSchemaChanges(true).build(); + return sourceBuilder.deserializer(schema).build(); } public static void registerJdbcDriver() { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java new file mode 100644 index 000000000000..fa187b84f8e9 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.pipeline.cdc.schema; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument; +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +/** A {@code MetadataApplier} that applies schema changes to Paimon table. */ +public class PaimonMetadataApplier implements MetadataApplier { + + private static final Logger LOG = LoggerFactory.getLogger(PaimonMetadataApplier.class); + + // Catalog is unSerializable. + private transient Catalog catalog; + + // currently, we set table options for all tables using the same options. 
+ private final Map tableOptions; + + private final Options catalogOptions; + + private final Map> partitionMaps; + + private Set enabledSchemaEvolutionTypes; + + public PaimonMetadataApplier(Options catalogOptions) { + this.catalogOptions = catalogOptions; + this.tableOptions = new HashMap<>(); + this.partitionMaps = new HashMap<>(); + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + } + + public PaimonMetadataApplier( + Options catalogOptions, + Map tableOptions, + Map> partitionMaps) { + this.catalogOptions = catalogOptions; + this.tableOptions = tableOptions; + this.partitionMaps = partitionMaps; + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + } + + @Override + public MetadataApplier setAcceptedSchemaEvolutionTypes( + Set schemaEvolutionTypes) { + this.enabledSchemaEvolutionTypes = schemaEvolutionTypes; + return this; + } + + @Override + public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) { + return enabledSchemaEvolutionTypes.contains(schemaChangeEventType); + } + + @Override + public Set getSupportedSchemaEvolutionTypes() { + return Sets.newHashSet( + SchemaChangeEventType.CREATE_TABLE, + SchemaChangeEventType.ADD_COLUMN, + SchemaChangeEventType.DROP_COLUMN, + SchemaChangeEventType.RENAME_COLUMN, + SchemaChangeEventType.ALTER_COLUMN_TYPE); + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) + throws SchemaEvolveException { + if (catalog == null) { + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + } + SchemaChangeEventVisitor.visit( + schemaChangeEvent, + addColumnEvent -> { + applyAddColumn(addColumnEvent); + return null; + }, + alterColumnTypeEvent -> { + applyAlterColumnType(alterColumnTypeEvent); + return null; + }, + createTableEvent -> { + applyCreateTable(createTableEvent); + return null; + }, + dropColumnEvent -> { + applyDropColumn(dropColumnEvent); + return null; + }, + dropTableEvent -> { + applyDropTable(dropTableEvent); + return null; + }, + renameColumnEvent -> { + applyRenameColumn(renameColumnEvent); + return null; + }, + truncateTableEvent -> { + applyTruncateTable(truncateTableEvent); + return null; + }); + } + + @Override + public void close() throws Exception { + if (catalog != null) { + catalog.close(); + } + } + + private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException { + try { + if (!catalog.listDatabases().contains(event.tableId().getSchemaName())) { + catalog.createDatabase(event.tableId().getSchemaName(), true); + } + Schema schema = event.getSchema(); + org.apache.paimon.schema.Schema.Builder builder = + new org.apache.paimon.schema.Schema.Builder(); + schema.getColumns() + .forEach( + (column) -> + builder.column( + column.getName(), + LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(column.getType()) + .getLogicalType()), + column.getComment())); + List partitionKeys = new ArrayList<>(); + List primaryKeys = schema.primaryKeys(); + if (partitionMaps.containsKey(event.tableId())) { + partitionKeys.addAll(partitionMaps.get(event.tableId())); + } else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) { + partitionKeys.addAll(schema.partitionKeys()); + } + builder.primaryKey(primaryKeys) + .partitionKeys(partitionKeys) + .comment(schema.comment()) + .options(tableOptions) + .options(schema.options()); + catalog.createTable(tableIdToIdentifier(event), builder.build(), true); + } catch (Catalog.TableAlreadyExistException + | 
Catalog.DatabaseNotExistException + | Catalog.DatabaseAlreadyExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyAddColumn(AddColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = applyAddColumnEventWithPosition(event); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + if (e instanceof Catalog.ColumnAlreadyExistException) { + LOG.warn("{}, skip it.", e.getMessage()); + } else { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + } + + private List applyAddColumnEventWithPosition(AddColumnEvent event) + throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { + switch (columnWithPosition.getPosition()) { + case FIRST: + tableChangeList.addAll( + SchemaChangeProvider.add( + columnWithPosition, + SchemaChange.Move.first( + columnWithPosition.getAddColumn().getName()))); + break; + case LAST: + tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition)); + break; + case BEFORE: + tableChangeList.addAll( + applyAddColumnWithBeforePosition( + event.tableId().getSchemaName(), + event.tableId().getTableName(), + columnWithPosition)); + break; + case AFTER: + checkNotNull( + columnWithPosition.getExistedColumnName(), + "Existing column name must be provided for AFTER position"); + SchemaChange.Move after = + SchemaChange.Move.after( + columnWithPosition.getAddColumn().getName(), + columnWithPosition.getExistedColumnName()); + tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after)); + break; + default: + throw new SchemaEvolveException( + event, + "Unknown column position: " + columnWithPosition.getPosition()); + } + } + return tableChangeList; + } catch (Catalog.TableNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private List applyAddColumnWithBeforePosition( + String schemaName, + String tableName, + AddColumnEvent.ColumnWithPosition columnWithPosition) + throws Catalog.TableNotExistException { + String existedColumnName = columnWithPosition.getExistedColumnName(); + Table table = catalog.getTable(new Identifier(schemaName, tableName)); + List columnNames = table.rowType().getFieldNames(); + int index = checkColumnPosition(existedColumnName, columnNames); + String columnName = columnWithPosition.getAddColumn().getName(); + return SchemaChangeProvider.add( + columnWithPosition, + (index == 0) + ? 
SchemaChange.Move.first(columnName) + : SchemaChange.Move.after(columnName, columnNames.get(index - 1))); + } + + private int checkColumnPosition(String existedColumnName, List columnNames) { + if (existedColumnName == null) { + return 0; + } + int index = columnNames.indexOf(existedColumnName); + checkArgument(index != -1, "Column %s not found", existedColumnName); + return index; + } + + private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getDroppedColumnNames() + .forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column))); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException | Catalog.ColumnNotExistException e) { + LOG.warn("Failed to apply DropColumnEvent, skip it.", e); + } catch (Catalog.ColumnAlreadyExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException { + try { + Map options = + catalog.getTable( + new Identifier( + event.tableId().getSchemaName(), + event.tableId().getTableName())) + .options(); + List tableChangeList = new ArrayList<>(); + event.getNameMapping() + .forEach( + (oldName, newName) -> + tableChangeList.addAll( + SchemaChangeProvider.rename( + oldName, newName, options))); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getTypeMapping() + .forEach( + (oldName, newType) -> + tableChangeList.add( + SchemaChangeProvider.updateColumnType( + oldName, newType))); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException { + try { + Table table = catalog.getTable(tableIdToIdentifier(event)); + if (table.options().get("deletion-vectors.enabled").equals("true")) { + throw new UnsupportedSchemaChangeEventException( + event, "Unable to truncate a table with deletion vectors enabled.", null); + } + try (BatchTableCommit batchTableCommit = table.newBatchWriteBuilder().newCommit()) { + batchTableCommit.truncateTable(); + } + } catch (Exception e) { + throw new SchemaEvolveException(event, "Failed to apply truncate table event", e); + } + } + + private void applyDropTable(DropTableEvent event) throws SchemaEvolveException { + try { + catalog.dropTable(tableIdToIdentifier(event), true); + } catch (Catalog.TableNotExistException e) { + throw new SchemaEvolveException(event, "Failed to apply drop table event", e); + } + } + + private static Identifier tableIdToIdentifier(SchemaChangeEvent event) { + return new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/SchemaChangeProvider.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/SchemaChangeProvider.java new 
file mode 100644 index 000000000000..8c2a93e61727 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/SchemaChangeProvider.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.schema; + +import org.apache.paimon.flink.pipeline.cdc.util.FlinkCDCToPaimonTypeConverter; +import org.apache.paimon.schema.SchemaChange; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.types.DataType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * The SchemaChangeProvider class provides static methods to create SchemaChange objects that + * represent different types of schema modifications. + */ +public class SchemaChangeProvider { + + /** + * Creates a SchemaChange object for adding a column without specifying its position. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details and its + * intended position within the schema. + * @return A SchemaChange object representing the addition of a column. + */ + public static List add(AddColumnEvent.ColumnWithPosition columnWithPosition) { + List result = new ArrayList<>(); + result.add( + SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDataTypeToPaimonDataType( + columnWithPosition.getAddColumn().getType()), + columnWithPosition.getAddColumn().getComment())); + // if default value express exists, we need to set the default value to the table + // option + Column column = columnWithPosition.getAddColumn(); + Optional.ofNullable( + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDefaultValueToValidValue( + column.getDefaultValueExpression(), column.getType())) + .ifPresent( + value -> { + result.add( + SchemaChange.updateColumnDefaultValue( + new String[] {column.getName()}, value)); + }); + return result; + } + + /** + * Creates a SchemaChange object for adding a column with a specified position. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details and its + * intended position within the schema. + * @param move The move operation to indicate the column's new position. + * @return A SchemaChange object representing the addition of a column with position + * information. 
+ */ + public static List add( + AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) { + List result = new ArrayList<>(); + result.add( + SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDataTypeToPaimonDataType( + columnWithPosition.getAddColumn().getType()), + columnWithPosition.getAddColumn().getComment(), + move)); + // if default value express exists, we need to set the default value to the table + // option + Column column = columnWithPosition.getAddColumn(); + Optional.ofNullable( + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDefaultValueToValidValue( + column.getDefaultValueExpression(), column.getType())) + .ifPresent( + value -> { + result.add( + SchemaChange.updateColumnDefaultValue( + new String[] {column.getName()}, value)); + }); + return result; + } + + /** + * Creates a SchemaChange object to update the data type of a column. + * + * @param oldColumnName The name of the column whose data type is to be updated. + * @param newType The new DataType for the column. + * @return A SchemaChange object representing the update of the column's data type. + */ + public static SchemaChange updateColumnType(String oldColumnName, DataType newType) { + return SchemaChange.updateColumnType( + oldColumnName, + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDataTypeToPaimonDataType(newType)); + } + + /** + * Creates a SchemaChange object for renaming a column. + * + * @param oldColumnName The current name of the column to be renamed. + * @param newColumnName The new name for the column. + * @return A SchemaChange object representing the renaming of a column. + */ + public static List rename( + String oldColumnName, String newColumnName, Map options) { + List result = new ArrayList<>(); + result.add(SchemaChange.renameColumn(oldColumnName, newColumnName)); + return result; + } + + /** + * Creates a SchemaChange object for dropping a column. + * + * @param columnName The name of the column to be dropped. + * @return A SchemaChange object representing the deletion of a column. + */ + public static List drop(String columnName) { + List result = new ArrayList<>(); + result.add(SchemaChange.dropColumn(columnName)); + return result; + } + + /** + * Creates a SchemaChange object for setting an option. + * + * @param key The key of the option to be set. + * @param value The value of the option to be set. + * @return A SchemaChange object representing the setting of an option. + */ + public static SchemaChange setOption(String key, String value) { + return SchemaChange.setOption(key, value); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonDataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonDataConverter.java new file mode 100644 index 000000000000..8148541fd9ff --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonDataConverter.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.memory.MemorySegmentUtils; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.MapData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryArrayData; +import org.apache.flink.cdc.common.data.binary.BinaryMapData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.OperationType; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypeRoot; +import org.apache.flink.core.memory.MemorySegment; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** Utils for converting Flink CDC data change event to Paimon internal row. */ +public class FlinkCDCToPaimonDataConverter { + + /** Convert Flink CDC data change event to Paimon internal row. */ + public static InternalRow convertDataChangeEventToInternalRow( + DataChangeEvent event, List fieldGetters) { + RecordData recordData = + OperationType.DELETE.equals(event.op()) ? event.before() : event.after(); + RowKind rowKind = OperationType.DELETE.equals(event.op()) ? 
RowKind.DELETE : RowKind.INSERT; + Preconditions.checkArgument( + recordData.getArity() == fieldGetters.size(), + "Field arity not equal to field getters size of %s", + event.tableId()); + + GenericRow genericRow = new GenericRow(rowKind, recordData.getArity()); + for (int i = 0; i < recordData.getArity(); i++) { + genericRow.setField(i, fieldGetters.get(i).getFieldOrNull(recordData)); + } + return genericRow; + } + + public static List createFieldGetters(List fieldTypes) { + List fieldGetters = new ArrayList<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters.add(createFieldGetter(fieldTypes.get(i), i)); + } + return fieldGetters; + } + + public static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { + final RecordData.FieldGetter fieldGetter; + // Ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> { + DecimalData decimalData = + row.getDecimal(fieldPos, decimalPrecision, decimalScale); + return Decimal.fromBigDecimal( + decimalData.toBigDecimal(), decimalPrecision, decimalScale); + }; + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case BIGINT: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case INTEGER: + fieldGetter = row -> row.getInt(fieldPos); + break; + case DATE: + fieldGetter = row -> (int) row.getDate(fieldPos).toEpochDay(); + break; + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = row -> (int) row.getTime(fieldPos).toMillisOfDay(); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromSQLTimestamp( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toTimestamp()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromInstant( + row.getLocalZonedTimestampData( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toInstant()); + break; + case ROW: + final int rowFieldCount = DataTypeChecks.getFieldCount(fieldType); + fieldGetter = new BinaryFieldDataGetter(fieldPos, DataTypeRoot.ROW, rowFieldCount); + break; + case ARRAY: + case MAP: + fieldGetter = new BinaryFieldDataGetter(fieldPos, fieldType.getTypeRoot()); + break; + default: + throw new IllegalArgumentException( + "Don't support type of " + fieldType.getTypeRoot()); + } + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); + }; + } + + /** A helper class to create FieldGetter and GenericRow. 
*/ + public static class BinaryFieldDataGetter implements RecordData.FieldGetter { + private final int fieldPos; + private final DataTypeRoot dataTypeRoot; + private final int rowFieldCount; + + BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot) { + this(fieldPos, dataTypeRoot, -1); + } + + BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot, int rowFieldCount) { + this.fieldPos = fieldPos; + this.dataTypeRoot = dataTypeRoot; + this.rowFieldCount = rowFieldCount; + } + + @Override + public Object getFieldOrNull(RecordData row) { + switch (dataTypeRoot) { + case ARRAY: + return getArrayField(row); + case MAP: + return getMapField(row); + case ROW: + return getRecordField(row); + default: + throw new IllegalArgumentException("Unsupported field type: " + dataTypeRoot); + } + } + + private Object getArrayField(RecordData row) { + ArrayData arrayData = row.getArray(fieldPos); + if (!(arrayData instanceof BinaryArrayData)) { + throw new IllegalArgumentException( + "Expected BinaryArrayData but was " + arrayData.getClass().getSimpleName()); + } + BinaryArrayData binaryArrayData = (BinaryArrayData) arrayData; + return convertSegments( + binaryArrayData.getSegments(), + binaryArrayData.getOffset(), + binaryArrayData.getSizeInBytes(), + MemorySegmentUtils::readArrayData); + } + + private Object getMapField(RecordData row) { + MapData mapData = row.getMap(fieldPos); + if (!(mapData instanceof BinaryMapData)) { + throw new IllegalArgumentException( + "Expected BinaryMapData but was " + mapData.getClass().getSimpleName()); + } + BinaryMapData binaryMapData = (BinaryMapData) mapData; + return convertSegments( + binaryMapData.getSegments(), + binaryMapData.getOffset(), + binaryMapData.getSizeInBytes(), + MemorySegmentUtils::readMapData); + } + + private Object getRecordField(RecordData row) { + RecordData recordData = row.getRow(fieldPos, rowFieldCount); + if (!(recordData instanceof BinaryRecordData)) { + throw new IllegalArgumentException( + "Expected BinaryRecordData but was " + + recordData.getClass().getSimpleName()); + } + BinaryRecordData binaryRecordData = (BinaryRecordData) recordData; + return convertSegments( + binaryRecordData.getSegments(), + binaryRecordData.getOffset(), + binaryRecordData.getSizeInBytes(), + (segments, offset, sizeInBytes) -> + MemorySegmentUtils.readRowData( + segments, rowFieldCount, offset, sizeInBytes)); + } + + private T convertSegments( + MemorySegment[] segments, + int offset, + int sizeInBytes, + SegmentConverter converter) { + org.apache.paimon.memory.MemorySegment[] paimonMemorySegments = + new org.apache.paimon.memory.MemorySegment[segments.length]; + for (int i = 0; i < segments.length; i++) { + MemorySegment currMemorySegment = segments[i]; + ByteBuffer byteBuffer = currMemorySegment.wrap(0, currMemorySegment.size()); + + // Allocate a new byte array and copy the data from the ByteBuffer + byte[] bytes = new byte[currMemorySegment.size()]; + byteBuffer.get(bytes); + + paimonMemorySegments[i] = org.apache.paimon.memory.MemorySegment.wrap(bytes); + } + return converter.convert(paimonMemorySegments, offset, sizeInBytes); + } + + private interface SegmentConverter { + T convert( + org.apache.paimon.memory.MemorySegment[] segments, int offset, int sizeInBytes); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonTypeConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonTypeConverter.java new file mode 
100644 index 000000000000..7465a7420c8a --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonTypeConverter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataType; + +import org.apache.flink.cdc.common.types.LocalZonedTimestampType; +import org.apache.flink.cdc.common.types.TimestampType; +import org.apache.flink.cdc.common.types.ZonedTimestampType; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; + +/** Type converter from Flink CDC to Paimon. */ +public class FlinkCDCToPaimonTypeConverter { + + public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00"; + + public static final String INVALID_OR_MISSING_DATATIME = "0000-00-00 00:00:00"; + + /** Convert Flink CDC schema to Paimon schema. */ + public static Schema convertFlinkCDCSchemaToPaimonSchema( + org.apache.flink.cdc.common.schema.Schema schema) { + Schema.Builder builder = new Schema.Builder(); + schema.getColumns() + .forEach( + (column) -> + builder.column( + column.getName(), + convertFlinkCDCDataTypeToPaimonDataType(column.getType()), + column.getComment(), + convertFlinkCDCDefaultValueToValidValue( + column.getDefaultValueExpression(), + column.getType()))); + builder.primaryKey(schema.primaryKeys()) + .partitionKeys(schema.partitionKeys()) + .comment(schema.comment()) + .options(schema.options()); + return builder.build(); + } + + /** Convert Flink CDC data type to Paimon data type. */ + public static DataType convertFlinkCDCDataTypeToPaimonDataType( + org.apache.flink.cdc.common.types.DataType dataType) { + return LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(dataType).getLogicalType()); + } + + /** Convert Flink CDC default value to a valid value of Paimon. 
*/ + public static String convertFlinkCDCDefaultValueToValidValue( + String defaultValue, org.apache.flink.cdc.common.types.DataType dataType) { + if (defaultValue == null) { + return null; + } + + if (dataType instanceof LocalZonedTimestampType + || dataType instanceof TimestampType + || dataType instanceof ZonedTimestampType) { + + if (INVALID_OR_MISSING_DATATIME.equals(defaultValue)) { + return DEFAULT_DATETIME; + } + } + + return defaultValue; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCDataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCDataConverter.java new file mode 100644 index 000000000000..a1f7b85b13ef --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCDataConverter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.data.InternalRow; + +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** Converter from Paimon to Flink CDC data. */ +public class PaimonToFlinkCDCDataConverter { + + /** Convert Paimon row to Flink CDC data. 
*/ + public static DataChangeEvent convertRowToDataChangeEvent( + TableId tableId, + InternalRow row, + List fieldGetters, + BinaryRecordDataGenerator recordDataGenerator) { + Object[] objects = new Object[row.getFieldCount()]; + for (int i = 0; i < row.getFieldCount(); i++) { + objects[i] = fieldGetters.get(i).getFieldOrNull(row); + } + BinaryRecordData binaryRecordData = recordDataGenerator.generate(objects); + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + { + return DataChangeEvent.insertEvent(tableId, binaryRecordData, new HashMap<>()); + } + case DELETE: + case UPDATE_BEFORE: + { + return DataChangeEvent.deleteEvent(tableId, binaryRecordData, new HashMap<>()); + } + default: + throw new IllegalArgumentException("don't support type of " + row.getRowKind()); + } + } + + public static List createFieldGetters(List fieldTypes) { + List fieldGetters = new ArrayList<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters.add(createFieldGetter(fieldTypes.get(i), i)); + } + return fieldGetters; + } + + public static InternalRow.FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { + final InternalRow.FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = + row -> BinaryStringData.fromString(row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> + DecimalData.fromBigDecimal( + row.getDecimal(fieldPos, decimalPrecision, decimalScale) + .toBigDecimal(), + decimalPrecision, + decimalScale); + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case BIGINT: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case INTEGER: + fieldGetter = row -> row.getInt(fieldPos); + break; + case DATE: + fieldGetter = row -> row.getInt(fieldPos); + break; + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = row -> row.getInt(fieldPos); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + fieldGetter = + row -> + TimestampData.fromTimestamp( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toSQLTimestamp()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + fieldGetter = + row -> + LocalZonedTimestampData.fromInstant( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toInstant()); + break; + default: + throw new IllegalArgumentException( + "don't support type of " + fieldType.getTypeRoot()); + } + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); + }; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCTypeConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCTypeConverter.java new file mode 100644 index 000000000000..2c369ff9a059 --- /dev/null +++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCTypeConverter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataType; + +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.table.types.utils.TypeConversions; + +/** Type converter from Paimon to Flink CDC. */ +public class PaimonToFlinkCDCTypeConverter { + + /** Convert Paimon schema to Flink CDC schema. */ + public static org.apache.flink.cdc.common.schema.Schema convertPaimonSchemaToFlinkCDCSchema( + Schema schema) { + org.apache.flink.cdc.common.schema.Schema.Builder builder = + new org.apache.flink.cdc.common.schema.Schema.Builder(); + schema.fields() + .forEach( + (column) -> + builder.physicalColumn( + column.name(), + convertFlinkCDCDataTypeToPaimonDataType(column.type()), + column.description(), + column.defaultValue())); + builder.primaryKey(schema.primaryKeys()) + .partitionKey(schema.partitionKeys()) + .comment(schema.comment()) + .options(schema.options()); + return builder.build(); + } + + /** Convert Paimon data type to Flink CDC data type. 
*/ + public static org.apache.flink.cdc.common.types.DataType + convertFlinkCDCDataTypeToPaimonDataType(DataType dataType) { + return DataTypeUtils.fromFlinkDataType( + TypeConversions.fromLogicalToDataType( + LogicalTypeConversion.toLogicalType(dataType))); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java index 92c2a7243a7c..812df31e61cc 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java @@ -298,7 +298,7 @@ public void testNewlyAddedTablesOptionsChange() throws Exception { JobClient jobClient = runActionWithDefaultEnv(action1); waitingTables("t3"); - jobClient.cancel(); + jobClient.cancel().get(); tableConfig.put("sink.savepoint.auto-tag", "true"); tableConfig.put("tag.num-retained-max", "5"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java index 2d8489dc23df..d2834582a52d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java @@ -186,7 +186,7 @@ public void testOptionsChange() throws Exception { syncTableActionBuilder(mongodbConfig).withTableConfig(tableConfig).build(); JobClient jobClient = runActionWithDefaultEnv(action1); waitingTables(tableName); - jobClient.cancel(); + jobClient.cancel().get(); tableConfig.put("sink.savepoint.auto-tag", "true"); tableConfig.put("tag.num-retained-max", "5"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java index 010041268c8a..3912169817a1 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.java @@ -20,7 +20,7 @@ import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -40,14 +40,14 @@ public class MySqlActionITCaseBase extends CdcActionITCaseBase { private static final Logger LOG = LoggerFactory.getLogger(MySqlActionITCaseBase.class); - protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7); + protected final MySqlContainer mysqlContainer = createMySqlContainer(MySqlVersion.V5_7); private static final String USER = "paimonuser"; private static final String PASSWORD = "paimonpw"; - @AfterAll - public static void stopContainers() { + @AfterEach + public void stopContainers() { LOG.info("Stopping containers..."); - MYSQL_CONTAINER.stop(); + mysqlContainer.stop(); LOG.info("Containers are stopped."); } @@ 
-61,25 +61,25 @@ private static MySqlContainer createMySqlContainer(MySqlVersion version) { .withLogConsumer(new Slf4jLogConsumer(LOG)); } - protected static void start() { + protected void start() { LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); + Startables.deepStart(Stream.of(mysqlContainer)).join(); LOG.info("Containers are started."); } protected Statement getStatement() throws SQLException { Connection conn = DriverManager.getConnection( - MYSQL_CONTAINER.getJdbcUrl(), - MYSQL_CONTAINER.getUsername(), - MYSQL_CONTAINER.getPassword()); + mysqlContainer.getJdbcUrl(), + mysqlContainer.getUsername(), + mysqlContainer.getPassword()); return conn.createStatement(); } protected Map getBasicMySqlConfig() { Map config = new HashMap<>(); - config.put("hostname", MYSQL_CONTAINER.getHost()); - config.put("port", String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + config.put("hostname", mysqlContainer.getHost()); + config.put("port", String.valueOf(mysqlContainer.getDatabasePort())); config.put("username", USER); config.put("password", PASSWORD); // see mysql/my.cnf in test resources diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java index de48d7046861..2417ec105a81 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlCdcTypeMappingITCase.java @@ -25,7 +25,7 @@ import org.apache.paimon.types.RowType; import org.apache.flink.cdc.debezium.utils.JdbcUrlUtils; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -50,9 +50,9 @@ /** IT test for {@link TypeMapping} in MySQL CDC. 
*/ public class MySqlCdcTypeMappingITCase extends MySqlActionITCaseBase { - @BeforeAll - public static void startContainers() { - MYSQL_CONTAINER.withSetupSQL("mysql/type_mapping_test_setup.sql"); + @BeforeEach + public void startContainers() { + mysqlContainer.withSetupSQL("mysql/type_mapping_test_setup.sql"); start(); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java index 10ee548125bc..1f42344910ff 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; @@ -63,9 +63,9 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase { @TempDir java.nio.file.Path tempDir; - @BeforeAll - public static void startContainers() { - MYSQL_CONTAINER.withSetupSQL("mysql/sync_database_setup.sql"); + @BeforeEach + public void startContainers() { + mysqlContainer.withSetupSQL("mysql/sync_database_setup.sql"); start(); } @@ -1238,7 +1238,7 @@ public void testNewlyAddedTablesOptionsChange() throws Exception { JobClient jobClient = runActionWithDefaultEnv(action1); waitingTables("t1"); - jobClient.cancel(); + jobClient.cancel().get(); tableConfig.put("sink.savepoint.auto-tag", "true"); tableConfig.put("tag.num-retained-max", "5"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java index 21866a0a2fbc..318ce3c6dee0 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java @@ -24,7 +24,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -41,9 +41,9 @@ /** Test if the table list in {@link MySqlSyncDatabaseAction} is correct. 
*/ public class MySqlSyncDatabaseTableListITCase extends MySqlActionITCaseBase { - @BeforeAll - public static void startContainers() { - MYSQL_CONTAINER.withSetupSQL("mysql/tablelist_test_setup.sql"); + @BeforeEach + public void startContainers() { + mysqlContainer.withSetupSQL("mysql/tablelist_test_setup.sql"); start(); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 065b9262049b..9be323703689 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -36,7 +36,7 @@ import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -68,9 +68,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { private static final String DATABASE_NAME = "paimon_sync_table"; - @BeforeAll - public static void startContainers() { - MYSQL_CONTAINER.withSetupSQL("mysql/sync_table_setup.sql"); + @BeforeEach + public void startContainers() { + mysqlContainer.withSetupSQL("mysql/sync_table_setup.sql"); start(); } @@ -1271,7 +1271,7 @@ public void testOptionsChange() throws Exception { "INSERT INTO test_options_change VALUES (2, '2023-03-23', null, null)"); } waitingTables(tableName); - jobClient.cancel(); + jobClient.cancel().get(); tableConfig.put("sink.savepoint.auto-tag", "true"); tableConfig.put("tag.num-retained-max", "5"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java index 52e2fee0ab47..77dd7e3b5eed 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java @@ -22,7 +22,7 @@ import org.apache.flink.cdc.connectors.postgres.source.PostgresConnectionPoolFactory; import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -53,7 +53,7 @@ public class PostgresActionITCaseBase extends CdcActionITCaseBase { protected static final DockerImageName PG_IMAGE = DockerImageName.parse("postgres:13").asCompatibleSubstituteFor("postgres"); - protected static final PostgresContainer POSTGRES_CONTAINER = + protected final PostgresContainer postgresContainer = new PostgresContainer(PG_IMAGE) .withDatabaseName(DEFAULT_DB) .withUsername(USER) @@ -72,16 +72,16 @@ public class PostgresActionITCaseBase extends CdcActionITCaseBase { "-c", "max_wal_senders=20"); - protected static void start() { + protected void start() { LOG.info("Starting containers..."); - Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join(); 
+ Startables.deepStart(Stream.of(postgresContainer)).join(); LOG.info("Containers are started."); } - @AfterAll - public static void stopContainers() { + @AfterEach + public void stopContainers() { LOG.info("Stopping containers..."); - POSTGRES_CONTAINER.stop(); + postgresContainer.stop(); LOG.info("Containers are stopped."); } @@ -89,23 +89,21 @@ protected Statement getStatement(String databaseName) throws SQLException { String jdbcUrl = String.format( PostgresConnectionPoolFactory.JDBC_URL_PATTERN, - POSTGRES_CONTAINER.getHost(), - POSTGRES_CONTAINER.getDatabasePort(), + postgresContainer.getHost(), + postgresContainer.getDatabasePort(), databaseName); Connection conn = DriverManager.getConnection( - jdbcUrl, - POSTGRES_CONTAINER.getUsername(), - POSTGRES_CONTAINER.getPassword()); + jdbcUrl, postgresContainer.getUsername(), postgresContainer.getPassword()); return conn.createStatement(); } protected Map getBasicPostgresConfig() { Map config = new HashMap<>(); - config.put(PostgresSourceOptions.HOSTNAME.key(), POSTGRES_CONTAINER.getHost()); + config.put(PostgresSourceOptions.HOSTNAME.key(), postgresContainer.getHost()); config.put( PostgresSourceOptions.PG_PORT.key(), - String.valueOf(POSTGRES_CONTAINER.getDatabasePort())); + String.valueOf(postgresContainer.getDatabasePort())); config.put(PostgresSourceOptions.USERNAME.key(), USER); config.put(PostgresSourceOptions.PASSWORD.key(), PASSWORD); config.put(PostgresSourceOptions.SLOT_NAME.key(), getSlotName()); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java index 8e0efd110be9..fa8fa27be408 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java @@ -27,7 +27,7 @@ import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions; import org.apache.flink.core.execution.JobClient; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -49,9 +49,9 @@ public class PostgresSyncTableActionITCase extends PostgresActionITCaseBase { private static final String DATABASE_NAME = "paimon_sync_table"; private static final String SCHEMA_NAME = "public"; - @BeforeAll - public static void startContainers() { - POSTGRES_CONTAINER.withSetupSQL("postgres/sync_table_setup.sql"); + @BeforeEach + public void startContainers() { + postgresContainer.withSetupSQL("postgres/sync_table_setup.sql"); start(); } @@ -703,7 +703,7 @@ public void testOptionsChange() throws Exception { "INSERT INTO test_options_change VALUES (2, '2023-03-23', null)"); } waitingTables(tableName); - jobClient.cancel(); + jobClient.cancel().get(); tableConfig.put("sink.savepoint.auto-tag", "true"); tableConfig.put("tag.num-retained-max", "5"); diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplierTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplierTest.java new file mode 100644 index 000000000000..387c2055d130 --- /dev/null +++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplierTest.java @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.schema; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.FileSystemCatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.pipeline.cdc.util.FlinkCDCToPaimonTypeConverter; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.DataType; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** Tests for {@link PaimonMetadataApplier}. 
*/ +class PaimonMetadataApplierTest { + + @TempDir public static java.nio.file.Path temporaryFolder; + + private static Catalog catalog; + + private static Options catalogOptions; + + private static MetadataApplier metadataApplier; + + @BeforeAll + public static void initialize() { + catalogOptions = new Options(); + catalogOptions.setString( + CatalogOptions.METASTORE.key(), FileSystemCatalogFactory.IDENTIFIER); + catalogOptions.setString( + CatalogOptions.WAREHOUSE.key(), + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString()); + catalogOptions.set(CatalogOptions.CACHE_ENABLED.key(), "false"); + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + metadataApplier = new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + } + + @AfterAll + public static void close() throws Exception { + if (catalog != null) { + catalog.close(); + } + metadataApplier.close(); + } + + @Test + void testApplySchemaChange() throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + null, + "col3DefValue"))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField( + 2, "col3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + + Map nameMapping = new HashMap<>(); + nameMapping.put("col2", "newcol2"); + nameMapping.put("col3", "newcol3"); + RenameColumnEvent renameColumnEvent = new RenameColumnEvent(tableId, nameMapping); + metadataApplier.applySchemaChange(renameColumnEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "newcol2", DataTypes.INT()), + new DataField( + 2, "newcol3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + + Map typeMapping = new HashMap<>(); + typeMapping.put("newcol2", org.apache.flink.cdc.common.types.DataTypes.STRING()); + AlterColumnTypeEvent alterColumnTypeEvent = + new AlterColumnTypeEvent(TableId.parse(tableId.identifier()), typeMapping); + 
metadataApplier.applySchemaChange(alterColumnTypeEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "newcol2", DataTypes.STRING()), + new DataField( + 2, "newcol3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + + DropColumnEvent dropColumnEvent = + new DropColumnEvent(tableId, Collections.singletonList("newcol2")); + metadataApplier.applySchemaChange(dropColumnEvent); + // id of DataField should keep the same as before dropping column + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField( + 2, "newcol3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + + // Create table with partition column. + tableId = TableId.tableId(databaseName, "table_with_partition"); + createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "dt", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .primaryKey("col1", "dt") + .partitionKey("dt") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(2, "dt", DataTypes.INT().notNull()))); + Table tableWithPartition = catalog.getTable(Identifier.fromString(tableId.identifier())); + Assertions.assertThat(tableWithPartition.rowType()).isEqualTo(tableSchema); + Assertions.assertThat(tableWithPartition.primaryKeys()) + .isEqualTo(Arrays.asList("col1", "dt")); + // Create table with upper case. 
+ tableId = TableId.tableId(databaseName, "table_with_upper_case"); + catalogOptions.setString(CatalogOptions.CASE_SENSITIVE.key(), "true"); + PaimonMetadataApplier anotherMetadataApplier = new PaimonMetadataApplier(catalogOptions); + createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "COL1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("COL1") + .build()); + anotherMetadataApplier.applySchemaChange(createTableEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "COL1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + } + + @Test + public void testCreateTableWithoutPrimaryKey() + throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn( + "col4", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .build()); + metadataApplier.applySchemaChange(createTableEvent); + Table table = catalog.getTable(Identifier.fromString(tableId.identifier())); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.STRING()), + new DataField(2, "col3", DataTypes.STRING()), + new DataField(3, "col4", DataTypes.STRING()))); + Assertions.assertThat(table.rowType()).isEqualTo(tableSchema); + Assertions.assertThat(table.primaryKeys()).isEmpty(); + Assertions.assertThat(table.partitionKeys()).isEmpty(); + Assertions.assertThat(table.options()).containsEntry(CoreOptions.BUCKET.key(), "-1"); + } + + @Test + void testCreateTableWithOptions() throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + Map> partitionMaps = new HashMap<>(); + partitionMaps.put(tableId, Arrays.asList("col3", "col4")); + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + MetadataApplier anotherMetadataApplier = + new PaimonMetadataApplier(catalogOptions, tableOptions, partitionMaps); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn( + "col4", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("col1", "col3", "col4") + .build()); + anotherMetadataApplier.applySchemaChange(createTableEvent); + Table table = catalog.getTable(Identifier.fromString(tableId.identifier())); + RowType tableSchema = + new RowType( + 
Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.STRING()), + new DataField(2, "col3", DataTypes.STRING().notNull()), + new DataField(3, "col4", DataTypes.STRING().notNull()))); + Assertions.assertThat(table.rowType()).isEqualTo(tableSchema); + Assertions.assertThat(table.primaryKeys()).isEqualTo(Arrays.asList("col1", "col3", "col4")); + Assertions.assertThat(table.partitionKeys()).isEqualTo(Arrays.asList("col3", "col4")); + Assertions.assertThat(table.options()).containsEntry(CoreOptions.BUCKET.key(), "-1"); + } + + @Test + void testCreateTableWithAllDataTypes() + throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "boolean", + org.apache.flink.cdc.common.types.DataTypes.BOOLEAN()) + .physicalColumn( + "binary", + org.apache.flink.cdc.common.types.DataTypes.BINARY(3)) + .physicalColumn( + "varbinary", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(10)) + .physicalColumn( + "bytes", + org.apache.flink.cdc.common.types.DataTypes.BYTES()) + .physicalColumn( + "tinyint", + org.apache.flink.cdc.common.types.DataTypes.TINYINT()) + .physicalColumn( + "smallint", + org.apache.flink.cdc.common.types.DataTypes.SMALLINT()) + .physicalColumn( + "int", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "float", + org.apache.flink.cdc.common.types.DataTypes.FLOAT()) + .physicalColumn( + "double", + org.apache.flink.cdc.common.types.DataTypes.DOUBLE()) + .physicalColumn( + "decimal", + org.apache.flink.cdc.common.types.DataTypes.DECIMAL(6, 3)) + .physicalColumn( + "char", org.apache.flink.cdc.common.types.DataTypes.CHAR(5)) + .physicalColumn( + "varchar", + org.apache.flink.cdc.common.types.DataTypes.VARCHAR(10)) + .physicalColumn( + "string", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn( + "date", org.apache.flink.cdc.common.types.DataTypes.DATE()) + .physicalColumn( + "time", org.apache.flink.cdc.common.types.DataTypes.TIME()) + .physicalColumn( + "time_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIME(6)) + .physicalColumn( + "timestamp", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP()) + .physicalColumn( + "timestamp_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(3)) + .physicalColumn( + "timestamp_ltz", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ()) + .physicalColumn( + "timestamp_ltz_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ( + 3)) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "boolean", DataTypes.BOOLEAN()), + new DataField(2, "binary", DataTypes.BINARY(3)), + new DataField(3, "varbinary", DataTypes.VARBINARY(10)), + new DataField(4, "bytes", DataTypes.BYTES()), + new DataField(5, "tinyint", DataTypes.TINYINT()), + new DataField(6, "smallint", DataTypes.SMALLINT()), + new DataField(7, "int", DataTypes.INT()), + new DataField(8, "float", DataTypes.FLOAT()), + new DataField(9, "double", DataTypes.DOUBLE()), + new DataField(10, "decimal", 
DataTypes.DECIMAL(6, 3)), + new DataField(11, "char", DataTypes.CHAR(5)), + new DataField(12, "varchar", DataTypes.VARCHAR(10)), + new DataField(13, "string", DataTypes.STRING()), + new DataField(14, "date", DataTypes.DATE()), + new DataField(15, "time", DataTypes.TIME(0)), + new DataField(16, "time_with_precision", DataTypes.TIME(6)), + new DataField(17, "timestamp", DataTypes.TIMESTAMP(6)), + new DataField( + 18, "timestamp_with_precision", DataTypes.TIMESTAMP(3)), + new DataField( + 19, + "timestamp_ltz", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)), + new DataField( + 20, + "timestamp_ltz_with_precision", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + } + + @Test + void testAddColumnWithPosition() throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes + .STRING()))); // default last position. + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(2, "col3", DataTypes.STRING()))); + + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + + addedColumns.clear(); + + addedColumns.add( + AddColumnEvent.before( + Column.physicalColumn( + "col4_first_before", + org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col1")); + addedColumns.add( + AddColumnEvent.first( + Column.physicalColumn( + "col4_first", + org.apache.flink.cdc.common.types.DataTypes.STRING()))); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "col5_last", + org.apache.flink.cdc.common.types.DataTypes.STRING()))); + addedColumns.add( + AddColumnEvent.before( + Column.physicalColumn( + "col6_before", + org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col2")); + addedColumns.add( + AddColumnEvent.after( + Column.physicalColumn( + "col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col2")); + + addColumnEvent = new AddColumnEvent(tableId, addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + tableSchema = + new RowType( + Arrays.asList( + new DataField(4, "col4_first", DataTypes.STRING()), + new DataField(3, "col4_first_before", DataTypes.STRING()), + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(6, "col6_before", DataTypes.STRING()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(7, "col7_after", DataTypes.STRING()), + new DataField(2, "col3", DataTypes.STRING()), + new DataField(5, "col5_last", DataTypes.STRING()))); + + Assertions.assertThat( + 
catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + } + + @Test + public void testCreateTableWithComment() + throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "test.table_with_comment"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull(), + "comment of col1") + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + "comment of col2") + .physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + "comment of col3") + .physicalColumn( + "col4", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + "comment of col4") + .comment("comment of table_with_comment") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + Table table = catalog.getTable(Identifier.fromString(tableId.identifier())); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField( + 0, "col1", DataTypes.STRING().notNull(), "comment of col1"), + new DataField(1, "col2", DataTypes.STRING(), "comment of col2"), + new DataField(2, "col3", DataTypes.STRING(), "comment of col3"), + new DataField(3, "col4", DataTypes.STRING(), "comment of col4"))); + Assertions.assertThat(table.rowType()).isEqualTo(tableSchema); + Assertions.assertThat(table.primaryKeys()).isEmpty(); + Assertions.assertThat(table.partitionKeys()).isEmpty(); + Assertions.assertThat(table.options()).containsEntry("bucket", "-1"); + Assertions.assertThat(table.comment()).contains("comment of table_with_comment"); + } + + @Test + public void testMysqlDefaultTimestampValueConversionInAddColumn() + throws SchemaEvolveException, Catalog.TableNotExistException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "timestamp_test"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "name", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "created_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(), + null, + FlinkCDCToPaimonTypeConverter.INVALID_OR_MISSING_DATATIME))); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "updated_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(), + null, + FlinkCDCToPaimonTypeConverter.INVALID_OR_MISSING_DATATIME))); + + AddColumnEvent addColumnEvent = + new AddColumnEvent(TableId.parse(tableId.identifier()), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + Table table = catalog.getTable(Identifier.fromString(tableId.identifier())); + + Assertions.assertThat(table).isNotNull(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/DataConvertTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/DataConvertTest.java new file mode 100644 index 000000000000..8a5e4b7999c2 --- /dev/null +++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/DataConvertTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.flink.cdc.common.data.DateData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.TimeData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +/** + * Data convert test for {@link PaimonToFlinkCDCDataConverter} and {@link + * FlinkCDCToPaimonDataConverter}. 
+ */ +public class DataConvertTest { + + @Test + public void testFullTypesConverter() { + org.apache.flink.cdc.common.schema.Schema fullTypesSchema = + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "pk_string", + org.apache.flink.cdc.common.types.DataTypes.STRING().notNull()) + .physicalColumn( + "boolean", org.apache.flink.cdc.common.types.DataTypes.BOOLEAN()) + .physicalColumn( + "binary", org.apache.flink.cdc.common.types.DataTypes.BINARY(3)) + .physicalColumn( + "varbinary", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(10)) + .physicalColumn( + "bytes", org.apache.flink.cdc.common.types.DataTypes.BYTES()) + .physicalColumn( + "tinyint", org.apache.flink.cdc.common.types.DataTypes.TINYINT()) + .physicalColumn( + "smallint", org.apache.flink.cdc.common.types.DataTypes.SMALLINT()) + .physicalColumn("int", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "bigint", org.apache.flink.cdc.common.types.DataTypes.BIGINT()) + .physicalColumn( + "float", org.apache.flink.cdc.common.types.DataTypes.FLOAT()) + .physicalColumn( + "double", org.apache.flink.cdc.common.types.DataTypes.DOUBLE()) + .physicalColumn( + "decimal", + org.apache.flink.cdc.common.types.DataTypes.DECIMAL(6, 3)) + .physicalColumn("char", org.apache.flink.cdc.common.types.DataTypes.CHAR(5)) + .physicalColumn( + "varchar", org.apache.flink.cdc.common.types.DataTypes.VARCHAR(10)) + .physicalColumn( + "string", org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn("date", org.apache.flink.cdc.common.types.DataTypes.DATE()) + .physicalColumn("time", org.apache.flink.cdc.common.types.DataTypes.TIME()) + .physicalColumn( + "time_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIME(6)) + .physicalColumn( + "timestamp", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP()) + .physicalColumn( + "timestamp_with_precision_3", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(3)) + .physicalColumn( + "timestamp_with_precision_6", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(6)) + .physicalColumn( + "timestamp_with_precision_9", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(9)) + .physicalColumn( + "timestamp_ltz", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ()) + .physicalColumn( + "timestamp_ltz_with_precision_3", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(3)) + .physicalColumn( + "timestamp_ltz_with_precision_6", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(6)) + .physicalColumn( + "timestamp_ltz_with_precision_9", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(9)) + .primaryKey("pk_string") + .partitionKey("boolean") + .build(); + TableId tableId = TableId.tableId("testDatabase", "testTable"); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator( + fullTypesSchema.getColumnDataTypes().toArray(new DataType[0])); + Object[] testData = + new Object[] { + BinaryStringData.fromString("pk_string"), + true, + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6}, + new byte[] {7, 8, 9}, + (byte) 1, + (short) 2, + 3, + 4L, + 5.1f, + 6.2, + DecimalData.fromBigDecimal(new BigDecimal("7.123"), 6, 3), + BinaryStringData.fromString("test1"), + BinaryStringData.fromString("test2"), + BinaryStringData.fromString("test3"), + DateData.fromEpochDay(1000), + TimeData.fromMillisOfDay(200), + TimeData.fromMillisOfDay(300).toMillisOfDay(), + TimestampData.fromMillis(100, 1), + TimestampData.fromMillis(200, 2), + TimestampData.fromMillis(300, 3), + 
TimestampData.fromMillis(400, 4), + LocalZonedTimestampData.fromEpochMillis(300, 3), + LocalZonedTimestampData.fromEpochMillis(400, 4), + LocalZonedTimestampData.fromEpochMillis(500, 5), + LocalZonedTimestampData.fromEpochMillis(600, 6), + }; + org.apache.flink.cdc.common.event.DataChangeEvent dataChangeEvent = + DataChangeEvent.insertEvent(tableId, recordDataGenerator.generate(testData)); + + Assertions.assertEquals( + dataChangeEvent, + PaimonToFlinkCDCDataConverter.convertRowToDataChangeEvent( + tableId, + FlinkCDCToPaimonDataConverter.convertDataChangeEventToInternalRow( + dataChangeEvent, + FlinkCDCToPaimonDataConverter.createFieldGetters( + fullTypesSchema.getColumnDataTypes())), + PaimonToFlinkCDCDataConverter.createFieldGetters( + fullTypesSchema.getColumnDataTypes()), + recordDataGenerator)); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/TypeConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/TypeConverterTest.java new file mode 100644 index 000000000000..2115c1899147 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/TypeConverterTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Type converter test for {@link FlinkCDCToPaimonTypeConverter} and {@link + * PaimonToFlinkCDCTypeConverter}. 
+ */ +public class TypeConverterTest { + + @Test + public void testFullTypesConverter() { + org.apache.flink.cdc.common.schema.Schema fullTypesSchema = + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "pk_string", + org.apache.flink.cdc.common.types.DataTypes.STRING().notNull()) + .physicalColumn( + "boolean", org.apache.flink.cdc.common.types.DataTypes.BOOLEAN()) + .physicalColumn( + "binary", org.apache.flink.cdc.common.types.DataTypes.BINARY(3)) + .physicalColumn( + "varbinary", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(10)) + .physicalColumn( + "bytes", org.apache.flink.cdc.common.types.DataTypes.BYTES()) + .physicalColumn( + "tinyint", org.apache.flink.cdc.common.types.DataTypes.TINYINT()) + .physicalColumn( + "smallint", org.apache.flink.cdc.common.types.DataTypes.SMALLINT()) + .physicalColumn("int", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "bigint", org.apache.flink.cdc.common.types.DataTypes.BIGINT()) + .physicalColumn( + "float", org.apache.flink.cdc.common.types.DataTypes.FLOAT()) + .physicalColumn( + "double", org.apache.flink.cdc.common.types.DataTypes.DOUBLE()) + .physicalColumn( + "decimal", + org.apache.flink.cdc.common.types.DataTypes.DECIMAL(6, 3)) + .physicalColumn("char", org.apache.flink.cdc.common.types.DataTypes.CHAR(5)) + .physicalColumn( + "varchar", org.apache.flink.cdc.common.types.DataTypes.VARCHAR(10)) + .physicalColumn( + "string", org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn("date", org.apache.flink.cdc.common.types.DataTypes.DATE()) + .physicalColumn("time", org.apache.flink.cdc.common.types.DataTypes.TIME()) + .physicalColumn( + "time_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIME(6)) + .physicalColumn( + "timestamp", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP()) + .physicalColumn( + "timestamp_with_precision_3", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(3)) + .physicalColumn( + "timestamp_with_precision_6", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(6)) + .physicalColumn( + "timestamp_with_precision_9", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(9)) + .physicalColumn( + "timestamp_ltz", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ()) + .physicalColumn( + "timestamp_ltz_with_precision_3", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(3)) + .physicalColumn( + "timestamp_ltz_with_precision_6", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(6)) + .physicalColumn( + "timestamp_ltz_with_precision_9", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(9)) + .primaryKey("pk_string") + .partitionKey("boolean") + .build(); + + Assertions.assertEquals( + fullTypesSchema, + PaimonToFlinkCDCTypeConverter.convertPaimonSchemaToFlinkCDCSchema( + FlinkCDCToPaimonTypeConverter.convertFlinkCDCSchemaToPaimonSchema( + fullTypesSchema))); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/log4j2-test.properties b/paimon-flink/paimon-flink-cdc/src/test/resources/log4j2-test.properties index 1b3980d15104..e27922dad60b 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/log4j2-test.properties +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/log4j2-test.properties @@ -18,7 +18,7 @@ # Set root logger level to OFF to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level = OFF +rootLogger.level = INFO rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger
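
A minimal usage sketch distilled from PaimonMetadataApplierTest above, showing how the new PaimonMetadataApplier is driven by Flink CDC schema-change events. This is a sketch under assumptions, not part of the patch: the class name PaimonMetadataApplierExample, the warehouse path, and the database/table names are placeholders, and the explicit generic type parameters are reconstructed from context.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.FileSystemCatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.pipeline.cdc.schema.PaimonMetadataApplier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PaimonMetadataApplierExample {

    public static void main(String[] args) throws Exception {
        // Filesystem catalog backed by a local warehouse directory (placeholder path).
        Options catalogOptions = new Options();
        catalogOptions.setString(
                CatalogOptions.METASTORE.key(), FileSystemCatalogFactory.IDENTIFIER);
        catalogOptions.setString(CatalogOptions.WAREHOUSE.key(), "/tmp/paimon-warehouse");

        // Same three-argument constructor the tests use: catalog options, shared table
        // options, and a (here empty) per-table partition-key mapping.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
        PaimonMetadataApplier applier =
                new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());

        // Create a table from a Flink CDC schema, then evolve it with an ADD COLUMN event.
        TableId tableId = TableId.tableId("demo_db", "demo_table");
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT().notNull())
                        .physicalColumn("name", DataTypes.STRING())
                        .primaryKey("id")
                        .build();
        applier.applySchemaChange(new CreateTableEvent(tableId, schema));
        applier.applySchemaChange(
                new AddColumnEvent(
                        tableId,
                        Collections.singletonList(
                                AddColumnEvent.last(
                                        Column.physicalColumn("dt", DataTypes.STRING())))));

        // Read the evolved schema back through the Paimon catalog, as the tests do.
        try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions)) {
            System.out.println(
                    catalog.getTable(Identifier.fromString(tableId.identifier())).rowType());
        }
    }
}

The bucket option "-1" simply mirrors what the tests pass as a shared table option; any of the other PaimonMetadataApplier constructors shown in the patch could be substituted.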