Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain the
* lenient column type changes.
*/
public class AlterColumnTypeEvent implements SchemaChangeEvent {
@PublicEvolving
public class AlterColumnTypeEvent implements SchemaChangeEventWithPreSchema, SchemaChangeEvent {

private static final long serialVersionUID = 1L;

Expand All @@ -35,9 +42,21 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent {
/** key => column name, value => column type after changing. */
private final Map<String, DataType> typeMapping;

private final Map<String, DataType> oldTypeMapping;

public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = new HashMap<>();
}

public AlterColumnTypeEvent(
TableId tableId,
Map<String, DataType> typeMapping,
Map<String, DataType> oldTypeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = oldTypeMapping;
}

/** Returns the type mapping. */
Expand All @@ -55,29 +74,74 @@ public boolean equals(Object o) {
}
AlterColumnTypeEvent that = (AlterColumnTypeEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(typeMapping, that.typeMapping);
&& Objects.equals(typeMapping, that.typeMapping)
&& Objects.equals(oldTypeMapping, that.oldTypeMapping);
}

@Override
public int hashCode() {
return Objects.hash(tableId, typeMapping);
return Objects.hash(tableId, typeMapping, oldTypeMapping);
}

@Override
public String toString() {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", nameMapping="
+ typeMapping
+ '}';
if (hasPreSchema()) {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ ", oldTypeMapping="
+ oldTypeMapping
+ '}';
} else {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ '}';
}
}

@Override
public TableId tableId() {
return tableId;
}

public Map<String, DataType> getOldTypeMapping() {
return oldTypeMapping;
}

@Override
public boolean hasPreSchema() {
return !oldTypeMapping.isEmpty();
}

@Override
public void fillPreSchema(Schema oldTypeSchema) {
oldTypeMapping.clear();
oldTypeMapping.putAll(
oldTypeSchema.getColumns().stream()
.filter(e -> typeMapping.containsKey(e.getName()) && e.getType() != null)
.collect(Collectors.toMap(Column::getName, Column::getType)));
}

@Override
public boolean trimRedundantChanges() {
if (hasPreSchema()) {
Set<String> redundantlyChangedColumns =
typeMapping.keySet().stream()
.filter(e -> Objects.equals(typeMapping.get(e), oldTypeMapping.get(e)))
.collect(Collectors.toSet());

// Remove redundant alter column type records that doesn't really change the type
typeMapping.keySet().removeAll(redundantlyChangedColumns);
oldTypeMapping.keySet().removeAll(redundantlyChangedColumns);
}
return !typeMapping.isEmpty();
}

@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_TYPE;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.source.DataSource;

import java.util.Objects;

/**
* A {@link SchemaChangeEvent} that represents an {@code DROP TABLE} DDL. this will be sent by
* {@link DataSource} before all {@link DataChangeEvent} with the same tableId.
*/
@PublicEvolving
public class DropTableEvent implements SchemaChangeEvent {

private static final long serialVersionUID = 1L;

private final TableId tableId;

public DropTableEvent(TableId tableId) {
this.tableId = tableId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DropTableEvent)) {
return false;
}
DropTableEvent that = (DropTableEvent) o;
return Objects.equals(tableId, that.tableId);
}

@Override
public int hashCode() {
return Objects.hash(tableId);
}

@Override
public String toString() {
return "DropTableEvent{" + "tableId=" + tableId + '}';
}

@Override
public TableId tableId() {
return tableId;
}

@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.DROP_TABLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.annotation.PublicEvolving;

import java.util.Map;
import java.util.Objects;

/**
* A {@link SchemaChangeEvent} that represents an {@code RENAME COLUMN} DDL, which may contain the
* lenient column type changes.
*/
@PublicEvolving
public class RenameColumnEvent implements SchemaChangeEvent {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ public enum SchemaChangeEventType {
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
RENAME_COLUMN;
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE;

public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
Expand All @@ -37,8 +39,12 @@ public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
return CREATE_TABLE;
} else if (event instanceof DropColumnEvent) {
return DROP_COLUMN;
} else if (event instanceof DropTableEvent) {
return DROP_TABLE;
} else if (event instanceof RenameColumnEvent) {
return RENAME_COLUMN;
} else if (event instanceof TruncateTableEvent) {
return TRUNCATE_TABLE;
} else {
throw new RuntimeException("Unknown schema change event type: " + event.getClass());
}
Expand All @@ -54,8 +60,12 @@ public static SchemaChangeEventType ofTag(String tag) {
return CREATE_TABLE;
case "drop.column":
return DROP_COLUMN;
case "drop.table":
return DROP_TABLE;
case "rename.column":
return RENAME_COLUMN;
case "truncate.table":
return TRUNCATE_TABLE;
default:
throw new RuntimeException("Unknown schema change event type: " + tag);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,45 @@

import org.apache.flink.cdc.common.annotation.PublicEvolving;

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;

/**
* An enumeration of schema change event families for clustering {@link SchemaChangeEvent}s into
* categories.
*/
@PublicEvolving
public class SchemaChangeEventTypeFamily {

public static final SchemaChangeEventType[] ADD = {SchemaChangeEventType.ADD_COLUMN};
public static final SchemaChangeEventType[] ADD = {ADD_COLUMN};

public static final SchemaChangeEventType[] ALTER = {SchemaChangeEventType.ALTER_COLUMN_TYPE};
public static final SchemaChangeEventType[] ALTER = {ALTER_COLUMN_TYPE};

public static final SchemaChangeEventType[] CREATE = {SchemaChangeEventType.CREATE_TABLE};
public static final SchemaChangeEventType[] CREATE = {CREATE_TABLE};

public static final SchemaChangeEventType[] DROP = {SchemaChangeEventType.DROP_COLUMN};
public static final SchemaChangeEventType[] DROP = {DROP_COLUMN, DROP_TABLE};

public static final SchemaChangeEventType[] RENAME = {SchemaChangeEventType.RENAME_COLUMN};
public static final SchemaChangeEventType[] RENAME = {RENAME_COLUMN};

public static final SchemaChangeEventType[] TABLE = {SchemaChangeEventType.CREATE_TABLE};
public static final SchemaChangeEventType[] TABLE = {CREATE_TABLE, DROP_TABLE, TRUNCATE_TABLE};

public static final SchemaChangeEventType[] COLUMN = {
SchemaChangeEventType.ADD_COLUMN,
SchemaChangeEventType.ALTER_COLUMN_TYPE,
SchemaChangeEventType.DROP_COLUMN,
SchemaChangeEventType.RENAME_COLUMN
ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, RENAME_COLUMN
};

public static final SchemaChangeEventType[] ALL = {
SchemaChangeEventType.ADD_COLUMN,
SchemaChangeEventType.CREATE_TABLE,
SchemaChangeEventType.ALTER_COLUMN_TYPE,
SchemaChangeEventType.DROP_COLUMN,
SchemaChangeEventType.RENAME_COLUMN
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE
};

public static final SchemaChangeEventType[] NONE = {};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.schema.Schema;

/** A {@link SchemaChangeEvent} that supports appending schema before change event. */
@PublicEvolving
public interface SchemaChangeEventWithPreSchema extends SchemaChangeEvent {

/** Describes if this event already has schema before change info. */
boolean hasPreSchema();

/** Append schema before change info to this event. */
void fillPreSchema(Schema oldSchema);

/** Check if this event contains redundant schema change request only. */
default boolean trimRedundantChanges() {
return false;
}
}
Loading