@@ -18,6 +18,7 @@
package org.apache.flink.cdc.cli.parser;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -28,14 +29,17 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
@@ -51,6 +55,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Source / sink keys
private static final String TYPE_KEY = "type";
private static final String NAME_KEY = "name";
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";

// Route keys
private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
@@ -135,6 +141,23 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
}

private SinkDef toSinkDef(JsonNode sinkNode) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();

Optional.ofNullable(sinkNode.get(INCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> includedSETypes.add(tag.asText())));

Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText())));

Set<SchemaChangeEventType> declaredSETypes =
resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);

if (sinkNode instanceof ObjectNode) {
((ObjectNode) sinkNode).remove(INCLUDE_SCHEMA_EVOLUTION_TYPES);
((ObjectNode) sinkNode).remove(EXCLUDE_SCHEMA_EVOLUTION_TYPES);
}

Map<String, String> sinkMap =
mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});

@@ -148,7 +171,7 @@ private SinkDef toSinkDef(JsonNode sinkNode) {
// "name" field is optional
String name = sinkMap.remove(NAME_KEY);

return new SinkDef(type, name, Configuration.fromMap(sinkMap));
return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
}

private RouteDef toRouteDef(JsonNode routeNode) {
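The sink parser above reads the optional include.schema.changes / exclude.schema.changes lists, resolves them into a set of SchemaChangeEventType via resolveSchemaEvolutionOptions, strips the two keys from the sink node so they do not leak into the sink's Configuration, and hands the resolved set to SinkDef. A minimal sketch of that resolution step follows; the tag strings, the YAML fragment in the comment, and the wrapper class are illustrative assumptions, only resolveSchemaEvolutionOptions and the two YAML keys come from the diff.

```java
// Illustrative sketch only: tag strings and the example sink block are assumptions.
import org.apache.flink.cdc.common.event.SchemaChangeEventType;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;

public class IncludeExcludeResolutionSketch {
    public static void main(String[] args) {
        // Roughly corresponds to a sink definition such as:
        //   sink:
        //     type: values
        //     include.schema.changes: [ column ]
        //     exclude.schema.changes: [ alter.column.comment ]
        List<String> included = Arrays.asList("column");
        List<String> excluded = Collections.singletonList("alter.column.comment");

        // The parser collects the two lists, resolves them into concrete event
        // types, removes the keys from the sink node, and passes the set to SinkDef.
        Set<SchemaChangeEventType> declared = resolveSchemaEvolutionOptions(included, excluded);
        System.out.println(declared);
    }
}
```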
@@ -31,7 +31,7 @@
* lenient column type changes.
*/
@PublicEvolving
public final class AddColumnEvent implements SchemaChangeEvent {
public final class AddColumnEvent implements ColumnSchemaChangeEvent {

private static final long serialVersionUID = 1L;

@@ -173,4 +173,9 @@ public String toString() {
public TableId tableId() {
return tableId;
}

@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ADD_COLUMN;
}
}
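With the change above, AddColumnEvent (like the other events in this PR) reports its kind through getType(), so downstream code can check an event against the set of types the sink declared. A hypothetical helper, not part of the PR, assuming getType() is declared on the SchemaChangeEvent interface as the @Override annotations suggest:

```java
// Hypothetical helper: matches an event's getType() against the declared set
// carried by SinkDef. Not part of the PR.
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;

import java.util.Set;

public final class SchemaChangeFilterSketch {
    private SchemaChangeFilterSketch() {}

    /** Returns true if the sink declared that it accepts this kind of schema change. */
    public static boolean isAccepted(SchemaChangeEvent event, Set<SchemaChangeEventType> declared) {
        return declared.contains(event.getType());
    }
}
```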
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.schema.Schema;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain
* column comment changes.
*/
public class AlterColumnCommentEvent
implements ColumnSchemaChangeEvent, SchemaChangeEventWithPreSchema {

private static final long serialVersionUID = 1L;

private final TableId tableId;

/** key => column name, value => column comment after changing. */
private final Map<String, String> commentMapping;

private final Map<String, String> oldCommentMapping;

public AlterColumnCommentEvent(TableId tableId, Map<String, String> commentMapping) {
this.tableId = tableId;
this.commentMapping = commentMapping;
this.oldCommentMapping = new HashMap<>();
}

public AlterColumnCommentEvent(
TableId tableId,
Map<String, String> commentMapping,
Map<String, String> oldCommentMapping) {
this.tableId = tableId;
this.commentMapping = commentMapping;
this.oldCommentMapping = oldCommentMapping;
}

/** Returns the comment mapping. */
public Map<String, String> getCommentMapping() {
return commentMapping;
}

public Map<String, String> getOldCommentMapping() {
return oldCommentMapping;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof AlterColumnCommentEvent)) {
return false;
}
AlterColumnCommentEvent that = (AlterColumnCommentEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(commentMapping, that.commentMapping)
&& Objects.equals(oldCommentMapping, that.oldCommentMapping);
}

@Override
public int hashCode() {
return Objects.hash(tableId, commentMapping, oldCommentMapping);
}

@Override
public String toString() {
if (hasPreSchema()) {
return "AlterColumnCommentEvent{"
+ "tableId="
+ tableId
+ ", commentMapping="
+ commentMapping
+ ", oldCommentMapping="
+ oldCommentMapping
+ '}';
} else {
return "AlterColumnCommentEvent{"
+ "tableId="
+ tableId
+ ", commentMapping="
+ commentMapping
+ '}';
}
}

@Override
public TableId tableId() {
return tableId;
}

@Override
public boolean hasPreSchema() {
return !oldCommentMapping.isEmpty();
}

@Override
public void fillPreSchema(Schema oldTypeSchema) {
oldCommentMapping.clear();
oldTypeSchema.getColumns().stream()
.filter(e -> commentMapping.containsKey(e.getName()))
.forEach(e -> oldCommentMapping.put(e.getName(), e.getComment()));
}

@Override
public boolean trimRedundantChanges() {
if (hasPreSchema()) {
Set<String> redundantlyChangedColumns =
commentMapping.keySet().stream()
.filter(
e ->
Objects.equals(
commentMapping.get(e),
oldCommentMapping.get(e)))
.collect(Collectors.toSet());

// Remove redundant alter column comment records that don't really change the comment
commentMapping.keySet().removeAll(redundantlyChangedColumns);
oldCommentMapping.keySet().removeAll(redundantlyChangedColumns);
}
return !commentMapping.isEmpty();
}

@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_COMMENT;
}
}
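AlterColumnCommentEvent carries the new comments and, once fillPreSchema(...) has run against the table's current schema, the old comments of the touched columns, which lets trimRedundantChanges() drop no-op entries. A sketch of that flow, assuming the Schema.newBuilder()/physicalColumn(name, type, comment) builder methods and the TableId.tableId(...) factory exist as in other Flink CDC modules; the table and column values are made up:

```java
// Sketch under the assumptions named above: fill the pre-schema from the
// current table schema, then trim entries whose comment did not change.
import org.apache.flink.cdc.common.event.AlterColumnCommentEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.HashMap;
import java.util.Map;

public class AlterColumnCommentEventSketch {
    public static void main(String[] args) {
        Map<String, String> newComments = new HashMap<>();
        newComments.put("name", "customer display name"); // real change
        newComments.put("age", "age in years");           // same as before -> redundant

        AlterColumnCommentEvent event =
                new AlterColumnCommentEvent(TableId.tableId("db", "customers"), newComments);

        Schema currentSchema =
                Schema.newBuilder()
                        .physicalColumn("name", DataTypes.STRING(), "name")
                        .physicalColumn("age", DataTypes.INT(), "age in years")
                        .build();

        // Copies the old comments of the touched columns into the event ...
        event.fillPreSchema(currentSchema);
        // ... so entries whose comment did not actually change are dropped.
        boolean stillMeaningful = event.trimRedundantChanges();
        System.out.println(stillMeaningful + " -> " + event);
    }
}
```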
@@ -17,16 +17,22 @@

package org.apache.flink.cdc.common.event;

import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A {@link SchemaChangeEvent} that represents an {@code ALTER COLUMN} DDL, which may contain
* lenient column type changes.
*/
public class AlterColumnTypeEvent implements SchemaChangeEvent {
public class AlterColumnTypeEvent
implements ColumnSchemaChangeEvent, SchemaChangeEventWithPreSchema {

private static final long serialVersionUID = 1L;

@@ -35,9 +41,21 @@ public class AlterColumnTypeEvent implements SchemaChangeEvent {
/** key => column name, value => column type after changing. */
private final Map<String, DataType> typeMapping;

private final Map<String, DataType> oldTypeMapping;

public AlterColumnTypeEvent(TableId tableId, Map<String, DataType> typeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = new HashMap<>();
}

public AlterColumnTypeEvent(
TableId tableId,
Map<String, DataType> typeMapping,
Map<String, DataType> oldTypeMapping) {
this.tableId = tableId;
this.typeMapping = typeMapping;
this.oldTypeMapping = oldTypeMapping;
}

/** Returns the type mapping. */
@@ -55,26 +73,76 @@ public boolean equals(Object o) {
}
AlterColumnTypeEvent that = (AlterColumnTypeEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(typeMapping, that.typeMapping);
&& Objects.equals(typeMapping, that.typeMapping)
&& Objects.equals(oldTypeMapping, that.oldTypeMapping);
}

@Override
public int hashCode() {
return Objects.hash(tableId, typeMapping);
return Objects.hash(tableId, typeMapping, oldTypeMapping);
}

@Override
public String toString() {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", nameMapping="
+ typeMapping
+ '}';
if (hasPreSchema()) {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ ", oldTypeMapping="
+ oldTypeMapping
+ '}';
} else {
return "AlterColumnTypeEvent{"
+ "tableId="
+ tableId
+ ", typeMapping="
+ typeMapping
+ '}';
}
}

@Override
public TableId tableId() {
return tableId;
}

public Map<String, DataType> getOldTypeMapping() {
return oldTypeMapping;
}

@Override
public boolean hasPreSchema() {
return !oldTypeMapping.isEmpty();
}

@Override
public void fillPreSchema(Schema oldTypeSchema) {
oldTypeMapping.clear();
oldTypeMapping.putAll(
oldTypeSchema.getColumns().stream()
.filter(e -> typeMapping.containsKey(e.getName()) && e.getType() != null)
.collect(Collectors.toMap(Column::getName, Column::getType)));
}

@Override
public boolean trimRedundantChanges() {
if (hasPreSchema()) {
Set<String> redundantlyChangedColumns =
typeMapping.keySet().stream()
.filter(e -> Objects.equals(typeMapping.get(e), oldTypeMapping.get(e)))
.collect(Collectors.toSet());

// Remove redundant alter column type records that don't really change the type
typeMapping.keySet().removeAll(redundantlyChangedColumns);
oldTypeMapping.keySet().removeAll(redundantlyChangedColumns);
}
return !typeMapping.isEmpty();
}

@Override
public SchemaChangeEventType getType() {
return SchemaChangeEventType.ALTER_COLUMN_TYPE;
}
}
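AlterColumnTypeEvent follows the same pattern: when an old type mapping is present, trimRedundantChanges() removes entries whose "new" type equals the recorded old type and reports whether anything meaningful is left. A sketch using the three-argument constructor from the diff, assuming the TableId.tableId(...) factory and the DataTypes helpers exist as in other Flink CDC modules; the column types are made up:

```java
// Sketch under the assumptions named above: the "name" entry is redundant and
// gets trimmed, the "id" widening survives.
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

import java.util.HashMap;
import java.util.Map;

public class AlterColumnTypeEventSketch {
    public static void main(String[] args) {
        Map<String, DataType> newTypes = new HashMap<>();
        newTypes.put("id", DataTypes.BIGINT());   // genuine widening
        newTypes.put("name", DataTypes.STRING()); // unchanged -> redundant

        Map<String, DataType> oldTypes = new HashMap<>();
        oldTypes.put("id", DataTypes.INT());
        oldTypes.put("name", DataTypes.STRING());

        AlterColumnTypeEvent event =
                new AlterColumnTypeEvent(TableId.tableId("db", "orders"), newTypes, oldTypes);

        // Drops the "name" entry from both maps and reports whether any real
        // type change remains to be applied.
        boolean stillMeaningful = event.trimRedundantChanges();
        System.out.println(stillMeaningful + " -> " + event);
    }
}
```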