@@ -44,6 +44,24 @@ public AddColumnEvent(TableId tableId, List<ColumnWithPosition> addedColumns) {
this.addedColumns = addedColumns;
}

public static AddColumnEvent.ColumnWithPosition first(Column addColumn) {
return new ColumnWithPosition(addColumn, ColumnPosition.FIRST, null);
}

public static AddColumnEvent.ColumnWithPosition last(Column addColumn) {
return new ColumnWithPosition(addColumn, ColumnPosition.LAST, null);
}

public static AddColumnEvent.ColumnWithPosition before(
Column addColumn, String existedColumnName) {
return new ColumnWithPosition(addColumn, ColumnPosition.BEFORE, existedColumnName);
}

public static AddColumnEvent.ColumnWithPosition after(
Column addColumn, String existedColumnName) {
return new ColumnWithPosition(addColumn, ColumnPosition.AFTER, existedColumnName);
}

/** Returns the added columns. */
public List<ColumnWithPosition> getAddedColumns() {
return addedColumns;
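
The four factories above map one-to-one onto the ColumnPosition variants. A minimal usage sketch, assuming the existing Column.physicalColumn and DataTypes helpers from org.apache.flink.cdc.common; the table name, the new columns, and the pre-existing "price" column are illustrative only:

List<AddColumnEvent.ColumnWithPosition> added = new ArrayList<>();
// Appended at the end of the schema (LAST, which is also the default position).
added.add(AddColumnEvent.last(Column.physicalColumn("remark", DataTypes.STRING())));
// Placed as the first column.
added.add(AddColumnEvent.first(Column.physicalColumn("tenant_id", DataTypes.STRING())));
// Placed relative to a column that already exists in the table ("price" here).
added.add(AddColumnEvent.before(Column.physicalColumn("discount", DataTypes.DOUBLE()), "price"));
added.add(AddColumnEvent.after(Column.physicalColumn("tax", DataTypes.DOUBLE()), "price"));
AddColumnEvent addColumnEvent = new AddColumnEvent(TableId.parse("mydb.orders"), added);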
@@ -34,12 +34,16 @@
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

/**
* A {@code MetadataApplier} that applies metadata changes to Paimon. Supports primary key
* tables only.
@@ -129,35 +133,90 @@ private void applyCreateTable(CreateTableEvent event)
private void applyAddColumn(AddColumnEvent event)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getAddedColumns()
.forEach(
(column) -> {
SchemaChange tableChange =
SchemaChange.addColumn(
column.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(
column.getAddColumn().getType())
.getLogicalType()));
tableChangeList.add(tableChange);
});
List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
true);
}

private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
throws Catalog.TableNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
SchemaChange tableChange;
switch (columnWithPosition.getPosition()) {
case FIRST:
tableChange =
SchemaChangeProvider.add(
columnWithPosition,
SchemaChange.Move.first(
columnWithPosition.getAddColumn().getName()));
tableChangeList.add(tableChange);
break;
case LAST:
SchemaChange schemaChangeWithLastPosition =
SchemaChangeProvider.add(columnWithPosition);
tableChangeList.add(schemaChangeWithLastPosition);
break;
case BEFORE:
SchemaChange schemaChangeWithBeforePosition =
applyAddColumnWithBeforePosition(
event.tableId().getSchemaName(),
event.tableId().getTableName(),
columnWithPosition);
tableChangeList.add(schemaChangeWithBeforePosition);
break;
case AFTER:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for AFTER position");
SchemaChange.Move after =
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(),
columnWithPosition.getExistedColumnName());
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
tableChangeList.add(tableChange);
break;
default:
throw new IllegalArgumentException(
"Unknown column position: " + columnWithPosition.getPosition());
}
}
return tableChangeList;
}

private SchemaChange applyAddColumnWithBeforePosition(
String schemaName,
String tableName,
AddColumnEvent.ColumnWithPosition columnWithPosition)
throws Catalog.TableNotExistException {
String existedColumnName = columnWithPosition.getExistedColumnName();
Table table = catalog.getTable(new Identifier(schemaName, tableName));
List<String> columnNames = table.rowType().getFieldNames();
int index = checkColumnPosition(existedColumnName, columnNames);
SchemaChange.Move after =
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1));

return SchemaChangeProvider.add(columnWithPosition, after);
}

private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
if (existedColumnName == null) {
return 0;
}
int index = columnNames.indexOf(existedColumnName);
checkArgument(index != -1, "Column %s not found", existedColumnName);
return index;
}
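
A note on the BEFORE branch: the SchemaChange.Move API used here only exposes first(...) and after(...), so a BEFORE request is rewritten as an AFTER on the column that precedes the referenced one in the current Paimon schema. A rough sketch of that translation, with made-up column names:

// Current Paimon schema: (col1, col2, col3); request: add "extra" BEFORE "col3".
List<String> columnNames = Arrays.asList("col1", "col2", "col3");
int index = columnNames.indexOf("col3"); // 2
// Rewritten as: put "extra" AFTER the predecessor of "col3", i.e. "col2".
SchemaChange change =
        SchemaChange.addColumn(
                "extra",
                org.apache.paimon.types.DataTypes.STRING(),
                null, // no column comment
                SchemaChange.Move.after("extra", columnNames.get(index - 1)));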

private void applyDropColumn(DropColumnEvent event)
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
Catalog.ColumnNotExistException {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getDroppedColumnNames()
.forEach(
(column) -> {
SchemaChange tableChange = SchemaChange.dropColumn(column);
tableChangeList.add(tableChange);
});
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
@@ -170,10 +229,8 @@ private void applyRenameColumn(RenameColumnEvent event)
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getNameMapping()
.forEach(
(oldName, newName) -> {
SchemaChange tableChange = SchemaChange.renameColumn(oldName, newName);
tableChangeList.add(tableChange);
});
(oldName, newName) ->
tableChangeList.add(SchemaChangeProvider.rename(oldName, newName)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
@@ -186,15 +243,9 @@ private void applyAlterColumn(AlterColumnTypeEvent event)
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getTypeMapping()
.forEach(
(oldName, newType) -> {
SchemaChange tableChange =
SchemaChange.updateColumnType(
oldName,
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(newType)
.getLogicalType()));
tableChangeList.add(tableChange);
});
(oldName, newType) ->
tableChangeList.add(
SchemaChangeProvider.updateColumnType(oldName, newType)));
catalog.alterTable(
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
tableChangeList,
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.paimon.sink;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;

import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.schema.SchemaChange;

/**
* The SchemaChangeProvider class provides static methods to create SchemaChange objects that
* represent different types of schema modifications.
*/
public class SchemaChangeProvider {

/**
* Creates a SchemaChange object for adding a column without specifying its position.
*
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
* intended position within the schema.
* @return A SchemaChange object representing the addition of a column.
*/
public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment());
}

/**
* Creates a SchemaChange object for adding a column with a specified position.
*
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
* intended position within the schema.
* @param move The move operation to indicate the column's new position.
* @return A SchemaChange object representing the addition of a column with position
* information.
*/
public static SchemaChange add(
AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment(),
move);
}

/**
* Creates a SchemaChange object to update the data type of a column.
*
* @param oldColumnName The name of the column whose data type is to be updated.
* @param newType The new DataType for the column.
* @return A SchemaChange object representing the update of the column's data type.
*/
public static SchemaChange updateColumnType(String oldColumnName, DataType newType) {
return SchemaChange.updateColumnType(
oldColumnName,
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
}

/**
* Creates a SchemaChange object for renaming a column.
*
* @param oldColumnName The current name of the column to be renamed.
* @param newColumnName The new name for the column.
* @return A SchemaChange object representing the renaming of a column.
*/
public static SchemaChange rename(String oldColumnName, String newColumnName) {
return SchemaChange.renameColumn(oldColumnName, newColumnName);
}

/**
* Creates a SchemaChange object for dropping a column.
*
* @param columnName The name of the column to be dropped.
* @return A SchemaChange object representing the deletion of a column.
*/
public static SchemaChange drop(String columnName) {
return SchemaChange.dropColumn(columnName);
}
}
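
A short sketch of how these helpers are meant to be combined at a call site, assuming a Paimon Catalog instance named catalog and illustrative database, table, and column names:

List<SchemaChange> changes = new ArrayList<>();
changes.add(SchemaChangeProvider.rename("old_name", "new_name"));
changes.add(SchemaChangeProvider.drop("obsolete_col"));
changes.add(
        SchemaChangeProvider.updateColumnType(
                "amount", org.apache.flink.cdc.common.types.DataTypes.DOUBLE()));
// Same pattern as PaimonMetadataApplier: collect the changes, then apply them in one call.
catalog.alterTable(new Identifier("my_db", "my_table"), changes, true);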
@@ -333,4 +333,87 @@ public void testCreateTableWithAllDataTypes(String metastore)
Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testAddColumnWithPosition(String metastore)
throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
initialize(metastore);
MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);

CreateTableEvent createTableEvent =
new CreateTableEvent(
TableId.parse("test.table1"),
org.apache.flink.cdc.common.schema.Schema.newBuilder()
.physicalColumn(
"col1",
org.apache.flink.cdc.common.types.DataTypes.STRING()
.notNull())
.physicalColumn(
"col2", org.apache.flink.cdc.common.types.DataTypes.INT())
.primaryKey("col1")
.build());
metadataApplier.applySchemaChange(createTableEvent);

List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
addedColumns.add(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"col3",
org.apache.flink.cdc.common.types.DataTypes
.STRING()))); // default last position.
AddColumnEvent addColumnEvent =
new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
metadataApplier.applySchemaChange(addColumnEvent);
RowType tableSchema =
new RowType(
Arrays.asList(
new DataField(0, "col1", DataTypes.STRING().notNull()),
new DataField(1, "col2", DataTypes.INT()),
new DataField(2, "col3", DataTypes.STRING())));

Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());

addedColumns.clear();
addedColumns.add(
AddColumnEvent.first(
Column.physicalColumn(
"col4_first",
org.apache.flink.cdc.common.types.DataTypes.STRING())));
addedColumns.add(
AddColumnEvent.last(
Column.physicalColumn(
"col5_last",
org.apache.flink.cdc.common.types.DataTypes.STRING())));
addedColumns.add(
AddColumnEvent.before(
Column.physicalColumn(
"col6_before",
org.apache.flink.cdc.common.types.DataTypes.STRING()),
"col2"));
addedColumns.add(
AddColumnEvent.after(
Column.physicalColumn(
"col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()),
"col2"));

addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
metadataApplier.applySchemaChange(addColumnEvent);

tableSchema =
new RowType(
Arrays.asList(
new DataField(3, "col4_first", DataTypes.STRING()),
new DataField(0, "col1", DataTypes.STRING().notNull()),
new DataField(5, "col6_before", DataTypes.STRING()),
new DataField(1, "col2", DataTypes.INT()),
new DataField(6, "col7_after", DataTypes.STRING()),
new DataField(2, "col3", DataTypes.STRING()),
new DataField(4, "col5_last", DataTypes.STRING())));

Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
}
}