diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java index 9a18023739e..d41dfcb3a84 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java @@ -44,6 +44,24 @@ public AddColumnEvent(TableId tableId, List addedColumns) { this.addedColumns = addedColumns; } + public static AddColumnEvent.ColumnWithPosition first(Column addColumn) { + return new ColumnWithPosition(addColumn, ColumnPosition.FIRST, null); + } + + public static AddColumnEvent.ColumnWithPosition last(Column addColumn) { + return new ColumnWithPosition(addColumn, ColumnPosition.LAST, null); + } + + public static AddColumnEvent.ColumnWithPosition before( + Column addColumn, String existedColumnName) { + return new ColumnWithPosition(addColumn, ColumnPosition.BEFORE, existedColumnName); + } + + public static AddColumnEvent.ColumnWithPosition after( + Column addColumn, String existedColumnName) { + return new ColumnWithPosition(addColumn, ColumnPosition.AFTER, existedColumnName); + } + /** Returns the added columns. */ public List getAddedColumns() { return addedColumns; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 9253108ca12..f71e9da1ab8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -34,12 +34,16 @@ import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.Table; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument; +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + /** * A {@code MetadataApplier} that applies metadata changes to Paimon. Support primary key table * only. @@ -129,35 +133,90 @@ private void applyCreateTable(CreateTableEvent event) private void applyAddColumn(AddColumnEvent event) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { - List tableChangeList = new ArrayList<>(); - event.getAddedColumns() - .forEach( - (column) -> { - SchemaChange tableChange = - SchemaChange.addColumn( - column.getAddColumn().getName(), - LogicalTypeConversion.toDataType( - DataTypeUtils.toFlinkDataType( - column.getAddColumn().getType()) - .getLogicalType())); - tableChangeList.add(tableChange); - }); + List tableChangeList = applyAddColumnEventWithPosition(event); catalog.alterTable( new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), tableChangeList, true); } + private List applyAddColumnEventWithPosition(AddColumnEvent event) + throws Catalog.TableNotExistException { + List tableChangeList = new ArrayList<>(); + for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { + SchemaChange tableChange; + switch (columnWithPosition.getPosition()) { + case FIRST: + tableChange = + SchemaChangeProvider.add( + columnWithPosition, + SchemaChange.Move.first( + columnWithPosition.getAddColumn().getName())); + tableChangeList.add(tableChange); + break; + case LAST: + SchemaChange schemaChangeWithLastPosition = + SchemaChangeProvider.add(columnWithPosition); + tableChangeList.add(schemaChangeWithLastPosition); + break; + case BEFORE: + SchemaChange schemaChangeWithBeforePosition = + applyAddColumnWithBeforePosition( + event.tableId().getSchemaName(), + event.tableId().getTableName(), + columnWithPosition); + tableChangeList.add(schemaChangeWithBeforePosition); + break; + case AFTER: + checkNotNull( + columnWithPosition.getExistedColumnName(), + "Existing column name must be provided for AFTER position"); + SchemaChange.Move after = + SchemaChange.Move.after( + columnWithPosition.getAddColumn().getName(), + columnWithPosition.getExistedColumnName()); + tableChange = SchemaChangeProvider.add(columnWithPosition, after); + tableChangeList.add(tableChange); + break; + default: + throw new IllegalArgumentException( + "Unknown column position: " + columnWithPosition.getPosition()); + } + } + return tableChangeList; + } + + private SchemaChange applyAddColumnWithBeforePosition( + String schemaName, + String tableName, + AddColumnEvent.ColumnWithPosition columnWithPosition) + throws Catalog.TableNotExistException { + String existedColumnName = columnWithPosition.getExistedColumnName(); + Table table = catalog.getTable(new Identifier(schemaName, tableName)); + List columnNames = table.rowType().getFieldNames(); + int index = checkColumnPosition(existedColumnName, columnNames); + SchemaChange.Move after = + SchemaChange.Move.after( + columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1)); + + return SchemaChangeProvider.add(columnWithPosition, after); + } + + private int checkColumnPosition(String existedColumnName, List columnNames) { + if (existedColumnName == null) { + return 0; + } + int index = columnNames.indexOf(existedColumnName); + checkArgument(index != -1, "Column %s not found", existedColumnName); + return index; + } + private void applyDropColumn(DropColumnEvent event) throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, Catalog.ColumnNotExistException { List tableChangeList = new ArrayList<>(); event.getDroppedColumnNames() - .forEach( - (column) -> { - SchemaChange tableChange = SchemaChange.dropColumn(column); - tableChangeList.add(tableChange); - }); + .forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column))); catalog.alterTable( new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), tableChangeList, @@ -170,10 +229,8 @@ private void applyRenameColumn(RenameColumnEvent event) List tableChangeList = new ArrayList<>(); event.getNameMapping() .forEach( - (oldName, newName) -> { - SchemaChange tableChange = SchemaChange.renameColumn(oldName, newName); - tableChangeList.add(tableChange); - }); + (oldName, newName) -> + tableChangeList.add(SchemaChangeProvider.rename(oldName, newName))); catalog.alterTable( new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), tableChangeList, @@ -186,15 +243,9 @@ private void applyAlterColumn(AlterColumnTypeEvent event) List tableChangeList = new ArrayList<>(); event.getTypeMapping() .forEach( - (oldName, newType) -> { - SchemaChange tableChange = - SchemaChange.updateColumnType( - oldName, - LogicalTypeConversion.toDataType( - DataTypeUtils.toFlinkDataType(newType) - .getLogicalType())); - tableChangeList.add(tableChange); - }); + (oldName, newType) -> + tableChangeList.add( + SchemaChangeProvider.updateColumnType(oldName, newType))); catalog.alterTable( new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), tableChangeList, diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java new file mode 100644 index 00000000000..0f966c0a994 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.schema.SchemaChange; + +/** + * The SchemaChangeProvider class provides static methods to create SchemaChange objects that + * represent different types of schema modifications. + */ +public class SchemaChangeProvider { + + /** + * Creates a SchemaChange object for adding a column without specifying its position. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details and its + * intended position within the schema. + * @return A SchemaChange object representing the addition of a column. + */ + public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) { + return SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType()) + .getLogicalType()), + columnWithPosition.getAddColumn().getComment()); + } + + /** + * Creates a SchemaChange object for adding a column with a specified position. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details and its + * intended position within the schema. + * @param move The move operation to indicate the column's new position. + * @return A SchemaChange object representing the addition of a column with position + * information. + */ + public static SchemaChange add( + AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) { + return SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType()) + .getLogicalType()), + columnWithPosition.getAddColumn().getComment(), + move); + } + + /** + * Creates a SchemaChange object to update the data type of a column. + * + * @param oldColumnName The name of the column whose data type is to be updated. + * @param newType The new DataType for the column. + * @return A SchemaChange object representing the update of the column's data type. + */ + public static SchemaChange updateColumnType(String oldColumnName, DataType newType) { + return SchemaChange.updateColumnType( + oldColumnName, + LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(newType).getLogicalType())); + } + + /** + * Creates a SchemaChange object for renaming a column. + * + * @param oldColumnName The current name of the column to be renamed. + * @param newColumnName The new name for the column. + * @return A SchemaChange object representing the renaming of a column. + */ + public static SchemaChange rename(String oldColumnName, String newColumnName) { + return SchemaChange.renameColumn(oldColumnName, newColumnName); + } + + /** + * Creates a SchemaChange object for dropping a column. + * + * @param columnName The name of the column to be dropped. + * @return A SchemaChange object representing the deletion of a column. + */ + public static SchemaChange drop(String columnName) { + return SchemaChange.dropColumn(columnName); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index dc24ff37ff6..fe29b467471 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -333,4 +333,87 @@ public void testCreateTableWithAllDataTypes(String metastore) Assertions.assertEquals( tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType()); } + + @ParameterizedTest + @ValueSource(strings = {"filesystem", "hive"}) + public void testAddColumnWithPosition(String metastore) + throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException, + Catalog.TableNotExistException { + initialize(metastore); + MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions); + + CreateTableEvent createTableEvent = + new CreateTableEvent( + TableId.parse("test.table1"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes + .STRING()))); // default last position. + AddColumnEvent addColumnEvent = + new AddColumnEvent(TableId.parse("test.table1"), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(2, "col3", DataTypes.STRING()))); + + Assertions.assertEquals( + tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType()); + + addedColumns.clear(); + addedColumns.add( + AddColumnEvent.first( + Column.physicalColumn( + "col4_first", + org.apache.flink.cdc.common.types.DataTypes.STRING()))); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "col5_last", + org.apache.flink.cdc.common.types.DataTypes.STRING()))); + addedColumns.add( + AddColumnEvent.before( + Column.physicalColumn( + "col6_before", + org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col2")); + addedColumns.add( + AddColumnEvent.after( + Column.physicalColumn( + "col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col2")); + + addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + tableSchema = + new RowType( + Arrays.asList( + new DataField(3, "col4_first", DataTypes.STRING()), + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(5, "col6_before", DataTypes.STRING()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(6, "col7_after", DataTypes.STRING()), + new DataField(2, "col3", DataTypes.STRING()), + new DataField(4, "col5_last", DataTypes.STRING()))); + + Assertions.assertEquals( + tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType()); + } }