From 3bf9370c86fd38eb4f80433da15fd5a534f8a5e3 Mon Sep 17 00:00:00 2001
From: joyCurry30
Date: Thu, 16 May 2024 11:11:23 +0800
Subject: [PATCH 1/2] [FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table.

---
 .../cdc/common/event/AddColumnEvent.java      |  18 +++
 .../paimon/sink/PaimonMetadataApplier.java    | 118 ++++++++++++++++--
 .../sink/PaimonMetadataApplierTest.java       |  83 ++++++++++++
 3 files changed, 206 insertions(+), 13 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java
index 9a18023739e..d41dfcb3a84 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java
@@ -44,6 +44,24 @@ public AddColumnEvent(TableId tableId, List<ColumnWithPosition> addedColumns) {
         this.addedColumns = addedColumns;
     }
 
+    public static AddColumnEvent.ColumnWithPosition first(Column addColumn) {
+        return new ColumnWithPosition(addColumn, ColumnPosition.FIRST, null);
+    }
+
+    public static AddColumnEvent.ColumnWithPosition last(Column addColumn) {
+        return new ColumnWithPosition(addColumn, ColumnPosition.LAST, null);
+    }
+
+    public static AddColumnEvent.ColumnWithPosition before(
+            Column addColumn, String existedColumnName) {
+        return new ColumnWithPosition(addColumn, ColumnPosition.BEFORE, existedColumnName);
+    }
+
+    public static AddColumnEvent.ColumnWithPosition after(
+            Column addColumn, String existedColumnName) {
+        return new ColumnWithPosition(addColumn, ColumnPosition.AFTER, existedColumnName);
+    }
+
     /** Returns the added columns. */
     public List<ColumnWithPosition> getAddedColumns() {
         return addedColumns;
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 9253108ca12..57645492490 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -34,12 +34,16 @@
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
+import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
+
 /**
  * A {@code MetadataApplier} that applies metadata changes to Paimon. Support primary key table
  * only.
@@ -129,25 +133,113 @@ private void applyCreateTable(CreateTableEvent event)
     private void applyAddColumn(AddColumnEvent event)
             throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
                     Catalog.ColumnNotExistException {
-        List<SchemaChange> tableChangeList = new ArrayList<>();
-        event.getAddedColumns()
-                .forEach(
-                        (column) -> {
-                            SchemaChange tableChange =
-                                    SchemaChange.addColumn(
-                                            column.getAddColumn().getName(),
-                                            LogicalTypeConversion.toDataType(
-                                                    DataTypeUtils.toFlinkDataType(
-                                                                    column.getAddColumn().getType())
-                                                            .getLogicalType()));
-                            tableChangeList.add(tableChange);
-                        });
+        List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
         catalog.alterTable(
                 new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
                 tableChangeList,
                 true);
     }
 
+    private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
+            throws Catalog.TableNotExistException {
+        List<SchemaChange> tableChangeList = new ArrayList<>();
+        for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
+            SchemaChange tableChange;
+            switch (columnWithPosition.getPosition()) {
+                case FIRST:
+                    tableChange =
+                            SchemaChange.addColumn(
+                                    columnWithPosition.getAddColumn().getName(),
+                                    LogicalTypeConversion.toDataType(
+                                            DataTypeUtils.toFlinkDataType(
+                                                            columnWithPosition
+                                                                    .getAddColumn()
+                                                                    .getType())
+                                                    .getLogicalType()),
+                                    columnWithPosition.getAddColumn().getComment(),
+                                    SchemaChange.Move.first(
+                                            columnWithPosition.getAddColumn().getName()));
+                    tableChangeList.add(tableChange);
+                    break;
+                case LAST:
+                    SchemaChange schemaChangeWithLastPosition =
+                            applyAddColumnWithLastPosition(columnWithPosition);
+                    tableChangeList.add(schemaChangeWithLastPosition);
+                    break;
+                case BEFORE:
+                    SchemaChange schemaChangeWithBeforePosition =
+                            applyAddColumnWithBeforePosition(
+                                    event.tableId().getSchemaName(),
+                                    event.tableId().getTableName(),
+                                    columnWithPosition);
+                    tableChangeList.add(schemaChangeWithBeforePosition);
+                    break;
+                case AFTER:
+                    checkNotNull(
+                            columnWithPosition.getExistedColumnName(),
+                            "Existing column name must be provided for AFTER position");
+                    tableChange =
+                            SchemaChange.addColumn(
+                                    columnWithPosition.getAddColumn().getName(),
+                                    LogicalTypeConversion.toDataType(
+                                            DataTypeUtils.toFlinkDataType(
+                                                            columnWithPosition
+                                                                    .getAddColumn()
+                                                                    .getType())
+                                                    .getLogicalType()),
+                                    columnWithPosition.getAddColumn().getComment(),
+                                    SchemaChange.Move.after(
+                                            columnWithPosition.getAddColumn().getName(),
+                                            columnWithPosition.getExistedColumnName()));
+                    tableChangeList.add(tableChange);
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            "Unknown column position: " + columnWithPosition.getPosition());
+            }
+        }
+        return tableChangeList;
+    }
+
+    private SchemaChange applyAddColumnWithLastPosition(
+            AddColumnEvent.ColumnWithPosition columnWithPosition) {
+        return SchemaChange.addColumn(
+                columnWithPosition.getAddColumn().getName(),
+                LogicalTypeConversion.toDataType(
+                        DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
+                                .getLogicalType()),
+                columnWithPosition.getAddColumn().getComment());
+    }
+
+    private SchemaChange applyAddColumnWithBeforePosition(
+            String schemaName,
+            String tableName,
+            AddColumnEvent.ColumnWithPosition columnWithPosition)
+            throws Catalog.TableNotExistException {
+        String existedColumnName = columnWithPosition.getExistedColumnName();
+        Table table = catalog.getTable(new Identifier(schemaName, tableName));
+        List<String> columnNames = table.rowType().getFieldNames();
+        int index = checkColumnPosition(existedColumnName, columnNames);
+
+        return SchemaChange.addColumn(
+                columnWithPosition.getAddColumn().getName(),
+                LogicalTypeConversion.toDataType(
+                        DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
+                                .getLogicalType()),
+                columnWithPosition.getAddColumn().getComment(),
+                SchemaChange.Move.after(
+                        columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1)));
+    }
+
+    private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
+        if (existedColumnName == null) {
+            return 0;
+        }
+        int index = columnNames.indexOf(existedColumnName);
+        checkArgument(index == -1, "Column %s not found", existedColumnName);
+        return index;
+    }
+
     private void applyDropColumn(DropColumnEvent event)
             throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
                     Catalog.ColumnNotExistException {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index dc24ff37ff6..fe29b467471 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -333,4 +333,87 @@ public void testCreateTableWithAllDataTypes(String metastore)
         Assertions.assertEquals(
                 tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"filesystem", "hive"})
+    public void testAddColumnWithPosition(String metastore)
+            throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException,
+                    Catalog.TableNotExistException {
+        initialize(metastore);
+        MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
+
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table1"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "col1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
+                                .primaryKey("col1")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+
+        List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
+        addedColumns.add(
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn(
+                                "col3",
+                                org.apache.flink.cdc.common.types.DataTypes
+                                        .STRING()))); // default last position.
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
+        metadataApplier.applySchemaChange(addColumnEvent);
+        RowType tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "col1", DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.INT()),
+                                new DataField(2, "col3", DataTypes.STRING())));
+
+        Assertions.assertEquals(
+                tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+
+        addedColumns.clear();
+        addedColumns.add(
+                AddColumnEvent.first(
+                        Column.physicalColumn(
+                                "col4_first",
+                                org.apache.flink.cdc.common.types.DataTypes.STRING())));
+        addedColumns.add(
+                AddColumnEvent.last(
+                        Column.physicalColumn(
+                                "col5_last",
+                                org.apache.flink.cdc.common.types.DataTypes.STRING())));
+        addedColumns.add(
+                AddColumnEvent.before(
+                        Column.physicalColumn(
+                                "col6_before",
+                                org.apache.flink.cdc.common.types.DataTypes.STRING()),
+                        "col2"));
+        addedColumns.add(
+                AddColumnEvent.after(
+                        Column.physicalColumn(
+                                "col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()),
+                        "col2"));
+
+        addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
+        metadataApplier.applySchemaChange(addColumnEvent);
+
+        tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(3, "col4_first", DataTypes.STRING()),
+                                new DataField(0, "col1", DataTypes.STRING().notNull()),
+                                new DataField(5, "col6_before", DataTypes.STRING()),
+                                new DataField(1, "col2", DataTypes.INT()),
+                                new DataField(6, "col7_after", DataTypes.STRING()),
+                                new DataField(2, "col3", DataTypes.STRING()),
+                                new DataField(4, "col5_last", DataTypes.STRING())));
+
+        Assertions.assertEquals(
+                tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
+    }
 }

From a0d802e466a2ec888b7ee984da6c2439570c75ac Mon Sep 17 00:00:00 2001
From: joyCurry30
Date: Tue, 4 Jun 2024 10:54:32 +0800
Subject: [PATCH 2/2] [FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table.

---
 .../paimon/sink/PaimonMetadataApplier.java    |  83 +++++---------
 .../paimon/sink/SchemaChangeProvider.java     | 106 ++++++++++++++++++
 2 files changed, 129 insertions(+), 60 deletions(-)
 create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java

Review notes: a BEFORE anchor resolved to index 0 is now added with Move.first instead of
reading columnNames.get(-1); SchemaChangeProvider gains the ASF license header and a private
constructor. The post-image blob ids on the "index" lines predate these edits.

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 57645492490..f71e9da1ab8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -148,22 +148,15 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
             switch (columnWithPosition.getPosition()) {
                 case FIRST:
                     tableChange =
-                            SchemaChange.addColumn(
-                                    columnWithPosition.getAddColumn().getName(),
-                                    LogicalTypeConversion.toDataType(
-                                            DataTypeUtils.toFlinkDataType(
-                                                            columnWithPosition
-                                                                    .getAddColumn()
-                                                                    .getType())
-                                                    .getLogicalType()),
-                                    columnWithPosition.getAddColumn().getComment(),
+                            SchemaChangeProvider.add(
+                                    columnWithPosition,
                                     SchemaChange.Move.first(
                                             columnWithPosition.getAddColumn().getName()));
                     tableChangeList.add(tableChange);
                     break;
                 case LAST:
                     SchemaChange schemaChangeWithLastPosition =
-                            applyAddColumnWithLastPosition(columnWithPosition);
+                            SchemaChangeProvider.add(columnWithPosition);
                     tableChangeList.add(schemaChangeWithLastPosition);
                     break;
                 case BEFORE:
@@ -178,19 +171,11 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
                     checkNotNull(
                             columnWithPosition.getExistedColumnName(),
                             "Existing column name must be provided for AFTER position");
-                    tableChange =
-                            SchemaChange.addColumn(
+                    SchemaChange.Move after =
+                            SchemaChange.Move.after(
                                     columnWithPosition.getAddColumn().getName(),
-                                    LogicalTypeConversion.toDataType(
-                                            DataTypeUtils.toFlinkDataType(
-                                                            columnWithPosition
-                                                                    .getAddColumn()
-                                                                    .getType())
-                                                    .getLogicalType()),
-                                    columnWithPosition.getAddColumn().getComment(),
-                                    SchemaChange.Move.after(
-                                            columnWithPosition.getAddColumn().getName(),
-                                            columnWithPosition.getExistedColumnName()));
+                                    columnWithPosition.getExistedColumnName());
+                    tableChange = SchemaChangeProvider.add(columnWithPosition, after);
                     tableChangeList.add(tableChange);
                     break;
                 default:
@@ -201,16 +186,6 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
         return tableChangeList;
     }
 
-    private SchemaChange applyAddColumnWithLastPosition(
-            AddColumnEvent.ColumnWithPosition columnWithPosition) {
-        return SchemaChange.addColumn(
-                columnWithPosition.getAddColumn().getName(),
-                LogicalTypeConversion.toDataType(
-                        DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
-                                .getLogicalType()),
-                columnWithPosition.getAddColumn().getComment());
-    }
-
     private SchemaChange applyAddColumnWithBeforePosition(
             String schemaName,
             String tableName,
@@ -220,15 +195,15 @@ private SchemaChange applyAddColumnWithBeforePosition(
         Table table = catalog.getTable(new Identifier(schemaName, tableName));
         List<String> columnNames = table.rowType().getFieldNames();
         int index = checkColumnPosition(existedColumnName, columnNames);
-
-        return SchemaChange.addColumn(
-                columnWithPosition.getAddColumn().getName(),
-                LogicalTypeConversion.toDataType(
-                        DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
-                                .getLogicalType()),
-                columnWithPosition.getAddColumn().getComment(),
-                SchemaChange.Move.after(
-                        columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1)));
+        String newColumnName = columnWithPosition.getAddColumn().getName();
+        // Index 0 means the anchor is the first column (or no anchor was given), so there is
+        // no preceding column to place the new one after: add it at the head of the table.
+        SchemaChange.Move move =
+                index == 0
+                        ? SchemaChange.Move.first(newColumnName)
+                        : SchemaChange.Move.after(newColumnName, columnNames.get(index - 1));
+
+        return SchemaChangeProvider.add(columnWithPosition, move);
     }
 
     private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
@@ -236,7 +211,7 @@ private int checkColumnPosition(String existedColumnName, List<String> columnNam
             return 0;
         }
         int index = columnNames.indexOf(existedColumnName);
-        checkArgument(index == -1, "Column %s not found", existedColumnName);
+        checkArgument(index != -1, "Column %s not found", existedColumnName);
         return index;
     }
 
@@ -245,11 +220,7 @@ private void applyDropColumn(DropColumnEvent event)
                     Catalog.ColumnNotExistException {
         List<SchemaChange> tableChangeList = new ArrayList<>();
         event.getDroppedColumnNames()
-                .forEach(
-                        (column) -> {
-                            SchemaChange tableChange = SchemaChange.dropColumn(column);
-                            tableChangeList.add(tableChange);
-                        });
+                .forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
         catalog.alterTable(
                 new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
                 tableChangeList,
@@ -262,10 +233,8 @@ private void applyRenameColumn(RenameColumnEvent event)
         List<SchemaChange> tableChangeList = new ArrayList<>();
         event.getNameMapping()
                 .forEach(
-                        (oldName, newName) -> {
-                            SchemaChange tableChange = SchemaChange.renameColumn(oldName, newName);
-                            tableChangeList.add(tableChange);
-                        });
+                        (oldName, newName) ->
+                                tableChangeList.add(SchemaChangeProvider.rename(oldName, newName)));
         catalog.alterTable(
                 new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
                 tableChangeList,
@@ -278,15 +247,9 @@ private void applyAlterColumn(AlterColumnTypeEvent event)
         List<SchemaChange> tableChangeList = new ArrayList<>();
         event.getTypeMapping()
                 .forEach(
-                        (oldName, newType) -> {
-                            SchemaChange tableChange =
-                                    SchemaChange.updateColumnType(
-                                            oldName,
-                                            LogicalTypeConversion.toDataType(
-                                                    DataTypeUtils.toFlinkDataType(newType)
-                                                            .getLogicalType()));
-                            tableChangeList.add(tableChange);
-                        });
+                        (oldName, newType) ->
+                                tableChangeList.add(
+                                        SchemaChangeProvider.updateColumnType(oldName, newType)));
         catalog.alterTable(
                 new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
                 tableChangeList,
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
new file mode 100644
index 00000000000..d6972fbc643
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.schema.SchemaChange;
+
+/**
+ * The SchemaChangeProvider class provides static methods to create SchemaChange objects that
+ * represent different types of schema modifications.
+ */
+public final class SchemaChangeProvider {
+
+    /** Static factory holder; not meant to be instantiated. */
+    private SchemaChangeProvider() {}
+
+    /**
+     * Creates a SchemaChange object for adding a column without specifying its position.
+     *
+     * @param columnWithPosition The ColumnWithPosition object containing the column details and its
+     *     intended position within the schema.
+     * @return A SchemaChange object representing the addition of a column.
+     */
+    public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
+        return SchemaChange.addColumn(
+                columnWithPosition.getAddColumn().getName(),
+                LogicalTypeConversion.toDataType(
+                        DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
+                                .getLogicalType()),
+                columnWithPosition.getAddColumn().getComment());
+    }
+
+    /**
+     * Creates a SchemaChange object for adding a column with a specified position.
+     *
+     * @param columnWithPosition The ColumnWithPosition object containing the column details and its
+     *     intended position within the schema.
+     * @param move The move operation to indicate the column's new position.
+     * @return A SchemaChange object representing the addition of a column with position
+     *     information.
+     */
+    public static SchemaChange add(
+            AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) {
+        return SchemaChange.addColumn(
+                columnWithPosition.getAddColumn().getName(),
+                LogicalTypeConversion.toDataType(
+                        DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
+                                .getLogicalType()),
+                columnWithPosition.getAddColumn().getComment(),
+                move);
+    }
+
+    /**
+     * Creates a SchemaChange object to update the data type of a column.
+     *
+     * @param oldColumnName The name of the column whose data type is to be updated.
+     * @param newType The new DataType for the column.
+     * @return A SchemaChange object representing the update of the column's data type.
+     */
+    public static SchemaChange updateColumnType(String oldColumnName, DataType newType) {
+        return SchemaChange.updateColumnType(
+                oldColumnName,
+                LogicalTypeConversion.toDataType(
+                        DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
+    }
+
+    /**
+     * Creates a SchemaChange object for renaming a column.
+     *
+     * @param oldColumnName The current name of the column to be renamed.
+     * @param newColumnName The new name for the column.
+     * @return A SchemaChange object representing the renaming of a column.
+     */
+    public static SchemaChange rename(String oldColumnName, String newColumnName) {
+        return SchemaChange.renameColumn(oldColumnName, newColumnName);
+    }
+
+    /**
+     * Creates a SchemaChange object for dropping a column.
+     *
+     * @param columnName The name of the column to be dropped.
+     * @return A SchemaChange object representing the deletion of a column.
+     */
+    public static SchemaChange drop(String columnName) {
+        return SchemaChange.dropColumn(columnName);
+    }
+}