Skip to content

Commit

Permalink
[cdc-common] add PublicEvolving annotation to SchemaUtils.
Browse files Browse the repository at this point in the history
  • Loading branch information
lvyanquan committed Nov 27, 2023
1 parent 6a92546 commit b9b2deb
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ public boolean equals(Object o) {
return Objects.equals(columns, schema.columns)
&& Objects.equals(primaryKeys, schema.primaryKeys)
&& Objects.equals(options, schema.options)
&& Objects.equals(comment, schema.comment)
&& Objects.equals(nameToColumns, schema.nameToColumns);
&& Objects.equals(comment, schema.comment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.common.utils;

import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.event.AddColumnEvent;
import com.ververica.cdc.common.event.AlterColumnTypeEvent;
import com.ververica.cdc.common.event.DropColumnEvent;
Expand All @@ -32,6 +33,7 @@
import java.util.stream.Collectors;

/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
public class SchemaUtils {

private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import com.ververica.cdc.common.event.RenameColumnEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.common.schema.PhysicalColumn;
import com.ververica.cdc.common.schema.Schema;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.utils.SchemaUtils;
import com.ververica.cdc.runtime.serializer.TableIdSerializer;
import com.ververica.cdc.runtime.serializer.schema.SchemaSerializer;
import org.slf4j.Logger;
Expand All @@ -43,18 +41,14 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static com.ververica.cdc.common.utils.Preconditions.checkArgument;
import static com.ververica.cdc.common.utils.Preconditions.checkNotNull;

/**
* Schema manager handles handles schema changes for tables, and manages historical schema versions
Expand Down Expand Up @@ -173,27 +167,8 @@ private void handleAlterColumnTypeEvent(AlterColumnTypeEvent event) {
event.tableId());

LOG.info("Handling schema change event: {}", event);
Map<String, DataType> typeMapping = event.getTypeMapping();
Schema oldSchema = optionalSchema.get();
Schema.Builder schemaBuilder = Schema.newBuilder();

// Rebuild physical columns
for (Column column : oldSchema.getColumns()) {
if (typeMapping.containsKey(column.getName())) {
// The column type is being changed
Column newColumn = column.copy(typeMapping.get(column.getName()));
schemaBuilder.column(newColumn);
} else {
schemaBuilder.column(column);
}
}

// Dump the rest of information
schemaBuilder.primaryKey(oldSchema.primaryKeys());
schemaBuilder.comment(oldSchema.comment());
schemaBuilder.options(oldSchema.options());
Schema newSchema = schemaBuilder.build();
registerNewSchema(event.tableId(), newSchema);
registerNewSchema(
event.tableId(), SchemaUtils.applySchemaChangeEvent(optionalSchema.get(), event));
}

private void handleRenameColumnEvent(RenameColumnEvent event) {
Expand All @@ -204,30 +179,8 @@ private void handleRenameColumnEvent(RenameColumnEvent event) {
event.tableId());

LOG.info("Handling schema change event: {}", event);
Map<String, String> nameMapping = event.getNameMapping();
Schema oldSchema = optionalSchema.get();
Schema.Builder schemaBuilder = Schema.newBuilder();
for (Column column : oldSchema.getColumns()) {
if (nameMapping.containsKey(column.getName())) {
// The column is being renamed
if (column instanceof PhysicalColumn) {
schemaBuilder.physicalColumn(
nameMapping.get(column.getName()), column.getType());
} else {
schemaBuilder.metadataColumn(
nameMapping.get(column.getName()), column.getType());
}
} else {
schemaBuilder.column(column);
}
}

// Dump the rest of information
schemaBuilder.primaryKey(oldSchema.primaryKeys());
schemaBuilder.comment(oldSchema.comment());
schemaBuilder.options(oldSchema.options());
Schema newSchema = schemaBuilder.build();
registerNewSchema(event.tableId(), newSchema);
registerNewSchema(
event.tableId(), SchemaUtils.applySchemaChangeEvent(optionalSchema.get(), event));
}

private void handleDropColumnEvent(DropColumnEvent event) {
Expand All @@ -237,24 +190,8 @@ private void handleDropColumnEvent(DropColumnEvent event) {
"Unable to apply DropColumnEvent for table \"%s\" without existing schema",
event.tableId());
LOG.info("Handling schema change event: {}", event);
List<String> droppedColumns =
event.getDroppedColumns().stream()
.map(Column::getName)
.collect(Collectors.toList());
Schema oldSchema = optionalSchema.get();
Schema.Builder schemaBuilder = Schema.newBuilder();
for (Column column : oldSchema.getColumns()) {
if (!droppedColumns.contains(column.getName())) {
schemaBuilder.column(column);
}
}

// Dump the rest of information
schemaBuilder.primaryKey(oldSchema.primaryKeys());
schemaBuilder.comment(oldSchema.comment());
schemaBuilder.options(oldSchema.options());
Schema newSchema = schemaBuilder.build();
registerNewSchema(event.tableId(), newSchema);
registerNewSchema(
event.tableId(), SchemaUtils.applySchemaChangeEvent(optionalSchema.get(), event));
}

private void handleAddColumnEvent(AddColumnEvent event) {
Expand All @@ -264,64 +201,8 @@ private void handleAddColumnEvent(AddColumnEvent event) {
"Unable to apply AddColumnEvent for table \"%s\" without existing schema",
event.tableId());
LOG.info("Handling schema change event: {}", event);
List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
Schema oldSchema = optionalSchema.get();
LinkedList<Column> columns = new LinkedList<>(oldSchema.getColumns());
for (AddColumnEvent.ColumnWithPosition addedColumn : addedColumns) {
Column existingColumn = addedColumn.getExistingColumn();
switch (addedColumn.getPosition()) {
case BEFORE:
columns.add(
searchColumnIndex(
columns,
checkNotNull(
existingColumn,
"Existing column should not be null for position BEFORE")
.getName()),
addedColumn.getAddColumn());
break;
case AFTER:
columns.add(
searchColumnIndex(
columns,
checkNotNull(
existingColumn,
"Existing column should not be null for position AFTER")
.getName())
+ 1,
addedColumn.getAddColumn());
break;
case FIRST:
columns.add(0, addedColumn.getAddColumn());
break;
case LAST:
columns.add(columns.size(), addedColumn.getAddColumn());
break;
}
}
Schema.Builder schemaBuilder = Schema.newBuilder();
for (Column column : columns) {
schemaBuilder.column(column);
}

// Dump the rest of information
schemaBuilder.primaryKey(oldSchema.primaryKeys());
schemaBuilder.comment(oldSchema.comment());
schemaBuilder.options(oldSchema.options());
Schema newSchema = schemaBuilder.build();
registerNewSchema(event.tableId(), newSchema);
}

private int searchColumnIndex(List<Column> columns, String columnNameToSearch) {
int i = 0;
while (i < columns.size()) {
if (columns.get(i).getName().equals(columnNameToSearch)) {
return i;
}
++i;
}
throw new IllegalStateException(
String.format("Unable to find column with name \"%s\"", columnNameToSearch));
registerNewSchema(
event.tableId(), SchemaUtils.applySchemaChangeEvent(optionalSchema.get(), event));
}

private void registerNewSchema(TableId tableId, Schema newSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void testHandlingAddColumnEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("append_before_phone", DataTypes.BIGINT()),
AddColumnEvent.ColumnPosition.BEFORE,
Column.physicalColumn("phone", DataTypes.STRING())));
Column.physicalColumn("phone", DataTypes.BIGINT())));

schemaManager.applySchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA));
schemaManager.applySchemaChange(new AddColumnEvent(CUSTOMERS, newColumns));
Expand Down

0 comments on commit b9b2deb

Please sign in to comment.