diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java index da4d454ddee..0a3325f3774 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/DataChangeEvent.java @@ -40,11 +40,22 @@ private DataChangeEvent( RecordData after, OperationType op, Map meta) { + this(tableId, before, after, op, meta, null); + } + + private DataChangeEvent( + TableId tableId, + RecordData before, + RecordData after, + OperationType op, + Map meta, + String schema) { this.tableId = tableId; this.before = before; this.after = after; this.op = op; this.meta = meta; + this.schema = schema; } private final TableId tableId; @@ -61,6 +72,8 @@ private DataChangeEvent( /** Optional, describes the metadata of the change event. e.g. MySQL binlog file name, pos. */ private final Map meta; + private final String schema; + @Override public TableId tableId() { return tableId; @@ -82,6 +95,10 @@ public Map meta() { return meta; } + public String getSchema() { + return schema; + } + /** Creates a {@link DataChangeEvent} instance that describes the insert event. */ public static DataChangeEvent insertEvent(TableId tableId, RecordData after) { return new DataChangeEvent( @@ -96,6 +113,15 @@ public static DataChangeEvent insertEvent( return new DataChangeEvent(tableId, null, after, OperationType.INSERT, meta); } + /** + * Creates a {@link DataChangeEvent} instance that describes the insert event with meta info and + * schema info. + */ + public static DataChangeEvent insertEvent( + TableId tableId, RecordData after, Map meta, String schema) { + return new DataChangeEvent(tableId, null, after, OperationType.INSERT, meta, schema); + } + /** Creates a {@link DataChangeEvent} instance that describes the delete event. */ public static DataChangeEvent deleteEvent(TableId tableId, RecordData before) { return new DataChangeEvent( @@ -110,6 +136,15 @@ public static DataChangeEvent deleteEvent( return new DataChangeEvent(tableId, before, null, OperationType.DELETE, meta); } + /** + * Creates a {@link DataChangeEvent} instance that describes the delete event with meta info and + * schema info. + */ + public static DataChangeEvent deleteEvent( + TableId tableId, RecordData before, Map meta, String schema) { + return new DataChangeEvent(tableId, before, null, OperationType.DELETE, meta, schema); + } + /** Creates a {@link DataChangeEvent} instance that describes the update event. */ public static DataChangeEvent updateEvent( TableId tableId, RecordData before, RecordData after) { @@ -125,6 +160,19 @@ public static DataChangeEvent updateEvent( return new DataChangeEvent(tableId, before, after, OperationType.UPDATE, meta); } + /** + * Creates a {@link DataChangeEvent} instance that describes the update event with meta info and + * schema info. + */ + public static DataChangeEvent updateEvent( + TableId tableId, + RecordData before, + RecordData after, + Map meta, + String schema) { + return new DataChangeEvent(tableId, before, after, OperationType.UPDATE, meta, schema); + } + /** Creates a {@link DataChangeEvent} instance that describes the replace event. */ public static DataChangeEvent replaceEvent(TableId tableId, RecordData after) { return new DataChangeEvent( @@ -139,6 +187,14 @@ public static DataChangeEvent replaceEvent( return new DataChangeEvent(tableId, null, after, OperationType.REPLACE, meta); } + /** + * Creates a {@link DataChangeEvent} instance that describes the replace event with meta info. + */ + public static DataChangeEvent replaceEvent( + TableId tableId, RecordData after, Map meta, String columnType) { + return new DataChangeEvent(tableId, null, after, OperationType.REPLACE, meta, columnType); + } + /** * Updates the before of a {@link DataChangeEvent} instance that describes the event with meta * info. @@ -211,6 +267,8 @@ public String toString() { + op + ", meta=" + describeMeta() + + ", schema=" + + schema + '}'; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml index c8d41fe5eba..1ebaef2c45c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml @@ -115,6 +115,19 @@ limitations under the License. + + + org.apache.maven.plugins + maven-jar-plugin + + + test-jar + + test-jar + + + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java index cd2850316ee..0580c14c796 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java @@ -48,7 +48,10 @@ public class ChangeLogJsonFormatFactory { * @return The configured instance of {@link SerializationSchema}. */ public static SerializationSchema createSerializationSchema( - ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) { + ReadableConfig formatOptions, + JsonSerializationType type, + ZoneId zoneId, + boolean includeSchemaInfo) { TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions); JsonFormatOptions.MapNullKeyMode mapNullKeyMode = JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions); @@ -65,7 +68,8 @@ public static SerializationSchema createSerializationSchema( mapNullKeyMode, mapNullKeyLiteral, zoneId, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + includeSchemaInfo); } case CANAL_JSON: { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java new file mode 100644 index 00000000000..971be85b640 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.kafka.json.debezium; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonFormatOptions; +import org.apache.flink.formats.json.JsonParserRowDataDeserializationSchema; +import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; +import org.apache.flink.formats.json.RowDataToJsonConverters; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; + +import java.util.Objects; + +/** + * Serialization schema that serializes an object of Flink internal data structure into a JSON + * bytes. + * + *

Serializes the input Flink object into a JSON string and converts it into byte[]. + * + *

Result byte[] messages can be deserialized using {@link + * JsonRowDataDeserializationSchema} or {@link JsonParserRowDataDeserializationSchema}. + */ +public class DebeziumJsonRowDataSerializationSchema implements SerializationSchema { + private static final long serialVersionUID = 1L; + + /** RowType to generate the runtime converter. */ + private final RowType rowType; + + /** The converter that converts internal data formats to JsonNode. */ + private final RowDataToJsonConverters.RowDataToJsonConverter runtimeConverter; + + /** Object mapper that is used to create output JSON objects. */ + private transient ObjectMapper mapper; + + /** Reusable object node. */ + private transient ObjectNode node; + + /** Timestamp format specification which is used to parse timestamp. */ + private final TimestampFormat timestampFormat; + + /** The handling mode when serializing null keys for map data. */ + private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode; + + /** The string literal when handling mode for map null key LITERAL. */ + private final String mapNullKeyLiteral; + + /** Flag indicating whether to serialize all decimals as plain numbers. */ + private final boolean encodeDecimalAsPlainNumber; + + private final boolean includeSchemaInfo; + + public DebeziumJsonRowDataSerializationSchema( + RowType rowType, + TimestampFormat timestampFormat, + JsonFormatOptions.MapNullKeyMode mapNullKeyMode, + String mapNullKeyLiteral, + boolean encodeDecimalAsPlainNumber, + boolean includeSchemaInfo) { + this.rowType = rowType; + this.timestampFormat = timestampFormat; + this.mapNullKeyMode = mapNullKeyMode; + this.mapNullKeyLiteral = mapNullKeyLiteral; + this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber; + this.runtimeConverter = + new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral) + .createConverter(rowType); + this.includeSchemaInfo = includeSchemaInfo; + } + + @Override + public void open(InitializationContext context) throws Exception { + mapper = + JacksonMapperFactory.createObjectMapper() + .configure( + JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, + encodeDecimalAsPlainNumber); + } + + @Override + public byte[] serialize(RowData row) { + if (node == null) { + node = mapper.createObjectNode(); + } + + try { + runtimeConverter.convert(mapper, node, row); + if (includeSchemaInfo) { + // schema is a nested json string, asText() can return a pure string without other + // escape characters such as "\" + String schemaValue = node.get("schema").asText(); + JsonNode schemaNode = mapper.readTree(schemaValue); + node.set("schema", schemaNode); + } + return mapper.writeValueAsBytes(node); + } catch (Throwable t) { + throw new RuntimeException(String.format("Could not serialize row '%s'.", row), t); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DebeziumJsonRowDataSerializationSchema that = (DebeziumJsonRowDataSerializationSchema) o; + return rowType.equals(that.rowType) + && timestampFormat.equals(that.timestampFormat) + && mapNullKeyMode.equals(that.mapNullKeyMode) + && mapNullKeyLiteral.equals(that.mapNullKeyLiteral) + && encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber; + } + + @Override + public int hashCode() { + return Objects.hash( + rowType, + timestampFormat, + mapNullKeyMode, + mapNullKeyLiteral, + encodeDecimalAsPlainNumber); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java index 15cecbc4f8e..25a3298da28 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java @@ -29,7 +29,6 @@ import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonFormatOptions; -import org.apache.flink.formats.json.JsonRowDataSerializationSchema; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; @@ -40,8 +39,17 @@ import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import java.util.function.BiConsumer; import static java.lang.String.format; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.AFTER; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.BEFORE; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.OPERATION; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.SOURCE; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumSource.DATABASE; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumSource.TABLE; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumStruct.PAYLOAD; +import static org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumStruct.SCHEMA; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; /** @@ -64,6 +72,8 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema(); + this.includeSchemaInfo = includeSchemaInfo; } @Override public void open(InitializationContext context) { - reuseGenericRowData = new GenericRowData(4); + if (includeSchemaInfo) { + reuseGenericRowData = new GenericRowData(2); + payloadGenericRowData = new GenericRowData(4); + + reuseGenericRowData.setField(PAYLOAD.getPosition(), payloadGenericRowData); + } else { + reuseGenericRowData = new GenericRowData(4); + } this.context = context; } @@ -112,13 +133,14 @@ public byte[] serialize(Event event) { } LogicalType rowType = DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType(); - JsonRowDataSerializationSchema jsonSerializer = - new JsonRowDataSerializationSchema( - createJsonRowType(fromLogicalToDataType(rowType)), + DebeziumJsonRowDataSerializationSchema jsonSerializer = + new DebeziumJsonRowDataSerializationSchema( + createJsonRowType(fromLogicalToDataType(rowType), includeSchemaInfo), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + includeSchemaInfo); try { jsonSerializer.open(context); } catch (Exception e) { @@ -132,81 +154,124 @@ public byte[] serialize(Event event) { } DataChangeEvent dataChangeEvent = (DataChangeEvent) event; - reuseGenericRowData.setField( - 3, - GenericRowData.of( - StringData.fromString(dataChangeEvent.tableId().getSchemaName()), - StringData.fromString(dataChangeEvent.tableId().getTableName()))); + BiConsumer converter; try { switch (dataChangeEvent.op()) { case INSERT: - reuseGenericRowData.setField(0, null); - reuseGenericRowData.setField( - 1, - jsonSerializers - .get(dataChangeEvent.tableId()) - .getRowDataFromRecordData(dataChangeEvent.after(), false)); - reuseGenericRowData.setField(2, OP_INSERT); - return jsonSerializers - .get(dataChangeEvent.tableId()) - .getSerializationSchema() - .serialize(reuseGenericRowData); + converter = this::convertInsertEventToRowData; + break; case DELETE: - reuseGenericRowData.setField( - 0, - jsonSerializers - .get(dataChangeEvent.tableId()) - .getRowDataFromRecordData(dataChangeEvent.before(), false)); - reuseGenericRowData.setField(1, null); - reuseGenericRowData.setField(2, OP_DELETE); - return jsonSerializers - .get(dataChangeEvent.tableId()) - .getSerializationSchema() - .serialize(reuseGenericRowData); + converter = this::convertDeleteEventToRowData; + break; case UPDATE: case REPLACE: - reuseGenericRowData.setField( - 0, - jsonSerializers - .get(dataChangeEvent.tableId()) - .getRowDataFromRecordData(dataChangeEvent.before(), false)); - reuseGenericRowData.setField( - 1, - jsonSerializers - .get(dataChangeEvent.tableId()) - .getRowDataFromRecordData(dataChangeEvent.after(), false)); - reuseGenericRowData.setField(2, OP_UPDATE); - return jsonSerializers - .get(dataChangeEvent.tableId()) - .getSerializationSchema() - .serialize(reuseGenericRowData); + converter = this::convertUpdateEventToRowData; + break; default: throw new UnsupportedOperationException( format( "Unsupported operation '%s' for OperationType.", dataChangeEvent.op())); } + + GenericRowData genericRowData = + includeSchemaInfo ? payloadGenericRowData : reuseGenericRowData; + converter.accept(dataChangeEvent, genericRowData); + + if (includeSchemaInfo) { + reuseGenericRowData.setField( + SCHEMA.getPosition(), StringData.fromString(dataChangeEvent.getSchema())); + } + + return jsonSerializers + .get(dataChangeEvent.tableId()) + .getSerializationSchema() + .serialize(reuseGenericRowData); + } catch (Throwable t) { throw new RuntimeException(format("Could not serialize event '%s'.", event), t); } } + private void convertInsertEventToRowData( + DataChangeEvent dataChangeEvent, GenericRowData genericRowData) { + genericRowData.setField(BEFORE.getPosition(), null); + genericRowData.setField( + AFTER.getPosition(), + jsonSerializers + .get(dataChangeEvent.tableId()) + .getRowDataFromRecordData(dataChangeEvent.after(), false)); + genericRowData.setField(OPERATION.getPosition(), OP_INSERT); + genericRowData.setField( + SOURCE.getPosition(), + GenericRowData.of( + StringData.fromString(dataChangeEvent.tableId().getSchemaName()), + StringData.fromString(dataChangeEvent.tableId().getTableName()))); + } + + private void convertDeleteEventToRowData( + DataChangeEvent dataChangeEvent, GenericRowData genericRowData) { + genericRowData.setField( + BEFORE.getPosition(), + jsonSerializers + .get(dataChangeEvent.tableId()) + .getRowDataFromRecordData(dataChangeEvent.before(), false)); + genericRowData.setField(AFTER.getPosition(), null); + genericRowData.setField(OPERATION.getPosition(), OP_DELETE); + genericRowData.setField( + SOURCE.getPosition(), + GenericRowData.of( + StringData.fromString(dataChangeEvent.tableId().getSchemaName()), + StringData.fromString(dataChangeEvent.tableId().getTableName()))); + } + + private void convertUpdateEventToRowData( + DataChangeEvent dataChangeEvent, GenericRowData genericRowData) { + genericRowData.setField( + BEFORE.getPosition(), + jsonSerializers + .get(dataChangeEvent.tableId()) + .getRowDataFromRecordData(dataChangeEvent.before(), false)); + genericRowData.setField( + AFTER.getPosition(), + jsonSerializers + .get(dataChangeEvent.tableId()) + .getRowDataFromRecordData(dataChangeEvent.after(), false)); + genericRowData.setField(OPERATION.getPosition(), OP_UPDATE); + genericRowData.setField( + SOURCE.getPosition(), + GenericRowData.of( + StringData.fromString(dataChangeEvent.tableId().getSchemaName()), + StringData.fromString(dataChangeEvent.tableId().getTableName()))); + } + /** * Refer to Debezium * docs for more details. */ - private static RowType createJsonRowType(DataType databaseSchema) { - return (RowType) + private static RowType createJsonRowType(DataType databaseSchema, boolean includeSchemaInfo) { + DataType payloadRowType = DataTypes.ROW( - DataTypes.FIELD("before", databaseSchema), - DataTypes.FIELD("after", databaseSchema), - DataTypes.FIELD("op", DataTypes.STRING()), - DataTypes.FIELD( - "source", - DataTypes.ROW( - DataTypes.FIELD("db", DataTypes.STRING()), - DataTypes.FIELD("table", DataTypes.STRING())))) - .getLogicalType(); + DataTypes.FIELD(BEFORE.getFieldName(), databaseSchema), + DataTypes.FIELD(AFTER.getFieldName(), databaseSchema), + DataTypes.FIELD(OPERATION.getFieldName(), DataTypes.STRING()), + DataTypes.FIELD( + SOURCE.getFieldName(), + DataTypes.ROW( + DataTypes.FIELD( + DATABASE.getFieldName(), DataTypes.STRING()), + DataTypes.FIELD( + TABLE.getFieldName(), DataTypes.STRING())))); + + if (includeSchemaInfo) { + return (RowType) + DataTypes.ROW( + DataTypes.FIELD(SCHEMA.getFieldName(), DataTypes.STRING()), + DataTypes.FIELD(PAYLOAD.getFieldName(), payloadRowType)) + .getLogicalType(); + } else { + return (RowType) payloadRowType.getLogicalType(); + } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java new file mode 100644 index 00000000000..e1c314b9b5a --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.kafka.json.debezium; + +/** Debezium JSON struct. */ +public class DebeziumJsonStruct { + + enum DebeziumStruct { + SCHEMA(0, "schema"), + PAYLOAD(1, "payload"); + + private final int position; + private final String fieldName; + + DebeziumStruct(int position, String fieldName) { + this.position = position; + this.fieldName = fieldName; + } + + public int getPosition() { + return position; + } + + public String getFieldName() { + return fieldName; + } + } + + enum DebeziumPayload { + BEFORE(0, "before"), + AFTER(1, "after"), + OPERATION(2, "op"), + SOURCE(3, "source"); + + private final int position; + private final String fieldName; + + DebeziumPayload(int position, String fieldName) { + this.position = position; + this.fieldName = fieldName; + } + + public int getPosition() { + return position; + } + + public String getFieldName() { + return fieldName; + } + } + + enum DebeziumSource { + DATABASE(0, "db"), + TABLE(1, "table"); + + private final int position; + private final String fieldName; + + DebeziumSource(int position, String fieldName) { + this.position = position; + this.fieldName = fieldName; + } + + public int getPosition() { + return position; + } + + public String getFieldName() { + return fieldName; + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java index d8981291395..a262fc2b655 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java @@ -41,6 +41,7 @@ import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER; +import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_SCHEMA_INFO_ENABLED; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC; import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT; @@ -67,13 +68,14 @@ public DataSink createDataSink(Context context) { .get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE)); } KeyFormat keyFormat = context.getFactoryConfiguration().get(KEY_FORMAT); + Boolean includeSchemaInfo = context.getFactoryConfiguration().get(SINK_SCHEMA_INFO_ENABLED); SerializationSchema keySerialization = KeySerializationFactory.createSerializationSchema(configuration, keyFormat, zoneId); JsonSerializationType jsonSerializationType = context.getFactoryConfiguration().get(KafkaDataSinkOptions.VALUE_FORMAT); SerializationSchema valueSerialization = ChangeLogJsonFormatFactory.createSerializationSchema( - configuration, jsonSerializationType, zoneId); + configuration, jsonSerializationType, zoneId, includeSchemaInfo); final Properties kafkaProperties = new Properties(); Map allOptions = context.getFactoryConfiguration().toMap(); allOptions.keySet().stream() @@ -123,6 +125,7 @@ public Set> optionalOptions() { options.add(TOPIC); options.add(SINK_ADD_TABLEID_TO_HEADER_ENABLED); options.add(SINK_CUSTOM_HEADER); + options.add(SINK_SCHEMA_INFO_ENABLED); options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java index ca82f5c8071..ba20fdd7120 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java @@ -79,4 +79,11 @@ public class KafkaDataSinkOptions { .defaultValue("") .withDescription( "custom headers for each kafka record. Each header are separated by ',', separate key and value by ':'. For example, we can set headers like 'key1:value1,key2:value2'."); + + public static final ConfigOption SINK_SCHEMA_INFO_ENABLED = + key("sink.schema-info-enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether receive schema info, by default is false. If set to true and the 'schema-info.enabled' parameter in source is also true, the column type info will be sent and increase message size."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java index 362354c6ecc..edc8e04d8c4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java @@ -57,7 +57,8 @@ public void testSerialize() throws Exception { ChangeLogJsonFormatFactory.createSerializationSchema( new Configuration(), JsonSerializationType.CANAL_JSON, - ZoneId.systemDefault()); + ZoneId.systemDefault(), + false); serializationSchema.open(new MockInitializationContext()); // create table diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java index f2b36e4efe5..6a756c20f83 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java @@ -57,7 +57,8 @@ public void testSerialize() throws Exception { ChangeLogJsonFormatFactory.createSerializationSchema( new Configuration(), JsonSerializationType.DEBEZIUM_JSON, - ZoneId.systemDefault()); + ZoneId.systemDefault(), + false); serializationSchema.open(new MockInitializationContext()); // create table Schema schema = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 118e8cdb19f..03195241249 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -78,6 +78,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_INFO_ENABLE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; @@ -117,6 +118,7 @@ public DataSource createDataSource(Context context) { StartupOptions startupOptions = getStartupOptions(config); boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED); + boolean includeSchemaInfo = config.get(SCHEMA_INFO_ENABLE); int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); @@ -168,6 +170,7 @@ public DataSource createDataSource(Context context) { .connectionPoolSize(connectionPoolSize) .closeIdleReaders(closeIdleReaders) .includeSchemaChanges(includeSchemaChanges) + .includeSchemaInfo(includeSchemaInfo) .debeziumProperties(getDebeziumProperties(configMap)) .jdbcProperties(getJdbcProperties(configMap)) .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled); @@ -254,6 +257,7 @@ public Set> optionalOptions() { options.add(PORT); options.add(TABLES_EXCLUDE); options.add(SCHEMA_CHANGE_ENABLED); + options.add(SCHEMA_INFO_ENABLE); options.add(SERVER_ID); options.add(SERVER_TIME_ZONE); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java index d1dc487c04e..cb127ea53b1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java @@ -45,7 +45,9 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) { public EventSourceProvider getEventSourceProvider() { MySqlEventDeserializer deserializer = new MySqlEventDeserializer( - DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges()); + DebeziumChangelogMode.ALL, + sourceConfig.isIncludeSchemaChanges(), + sourceConfig.isIncludeSchemaInfo()); MySqlSource source = new MySqlSource<>( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 580d370b5aa..cdfffd3a36c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -250,6 +250,14 @@ public class MySqlDataSourceOptions { .withDescription( "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); + @Experimental + public static final ConfigOption SCHEMA_INFO_ENABLE = + ConfigOptions.key("schema-info.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether send schema info(such as column type info), by default is false. If set to true, the column type info will be sent and increase network transmission overhead."); + @Experimental public static final ConfigOption TABLES_EXCLUDE = ConfigOptions.key("tables.exclude") diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java index 548e603fa5a..cb446dc5757 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java @@ -64,8 +64,10 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema { private transient CustomMySqlAntlrDdlParser customParser; public MySqlEventDeserializer( - DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { - super(new MySqlSchemaDataTypeInference(), changelogMode); + DebeziumChangelogMode changelogMode, + boolean includeSchemaChanges, + boolean includeSchemaInfo) { + super(new MySqlSchemaDataTypeInference(), changelogMode, includeSchemaInfo); this.includeSchemaChanges = includeSchemaChanges; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java index a1c0ae24b1f..da575d1a76b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/factory/ValuesDataFactory.java @@ -58,8 +58,9 @@ public DataSink createDataSink(Context context) { context.getFactoryConfiguration().get(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY), context.getFactoryConfiguration().get(ValuesDataSinkOptions.PRINT_ENABLED), context.getFactoryConfiguration().get(ValuesDataSinkOptions.SINK_API), + context.getFactoryConfiguration().get(ValuesDataSinkOptions.ERROR_ON_SCHEMA_CHANGE), context.getFactoryConfiguration() - .get(ValuesDataSinkOptions.ERROR_ON_SCHEMA_CHANGE)); + .get(ValuesDataSinkOptions.SINK_SCHEMA_INFO_ENABLED)); } @Override @@ -81,6 +82,7 @@ public Set> optionalOptions() { options.add(ValuesDataSinkOptions.PRINT_ENABLED); options.add(ValuesDataSinkOptions.SINK_API); options.add(ValuesDataSinkOptions.ERROR_ON_SCHEMA_CHANGE); + options.add(ValuesDataSinkOptions.SINK_SCHEMA_INFO_ENABLED); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java index 7d4ee3dad8e..74305b3c863 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java @@ -54,24 +54,29 @@ public class ValuesDataSink implements DataSink, Serializable { private final boolean errorOnSchemaChange; + private final boolean includeSchemaInfo; + public ValuesDataSink( boolean materializedInMemory, boolean print, SinkApi sinkApi, - boolean errorOnSchemaChange) { + boolean errorOnSchemaChange, + boolean includeSchemaInfo) { this.materializedInMemory = materializedInMemory; this.print = print; this.sinkApi = sinkApi; this.errorOnSchemaChange = errorOnSchemaChange; + this.includeSchemaInfo = includeSchemaInfo; } @Override public EventSinkProvider getEventSinkProvider() { if (SinkApi.SINK_V2.equals(sinkApi)) { - return FlinkSinkProvider.of(new ValuesSink(materializedInMemory, print)); + return FlinkSinkProvider.of( + new ValuesSink(materializedInMemory, print, includeSchemaInfo)); } else { return FlinkSinkFunctionProvider.of( - new ValuesDataSinkFunction(materializedInMemory, print)); + new ValuesDataSinkFunction(materializedInMemory, print, includeSchemaInfo)); } } @@ -91,9 +96,12 @@ private static class ValuesSink implements Sink { private final boolean print; - public ValuesSink(boolean materializedInMemory, boolean print) { + private final boolean includeSchemaInfo; + + public ValuesSink(boolean materializedInMemory, boolean print, boolean includeSchemaInfo) { this.materializedInMemory = materializedInMemory; this.print = print; + this.includeSchemaInfo = includeSchemaInfo; } @Override @@ -102,7 +110,8 @@ public SinkWriter createWriter(InitContext context) { materializedInMemory, print, context.getSubtaskId(), - context.getNumberOfParallelSubtasks()); + context.getNumberOfParallelSubtasks(), + includeSchemaInfo); } } @@ -119,6 +128,8 @@ private static class ValuesSinkWriter implements SinkWriter { private final int numSubtasks; + private final boolean includeSchemaInfo; + /** * keep the relationship of TableId and Schema as write method may rely on the schema * information of DataChangeEvent. @@ -128,7 +139,11 @@ private static class ValuesSinkWriter implements SinkWriter { private final Map> fieldGetterMaps; public ValuesSinkWriter( - boolean materializedInMemory, boolean print, int subtaskIndex, int numSubtasks) { + boolean materializedInMemory, + boolean print, + int subtaskIndex, + int numSubtasks, + boolean includeSchemaInfo) { super(); this.materializedInMemory = materializedInMemory; this.print = print; @@ -136,6 +151,7 @@ public ValuesSinkWriter( this.numSubtasks = numSubtasks; schemaMaps = new HashMap<>(); fieldGetterMaps = new HashMap<>(); + this.includeSchemaInfo = includeSchemaInfo; } @Override @@ -167,7 +183,8 @@ public void write(Event event, Context context) { prefix + ValuesDataSinkHelper.convertEventToStr( event, - fieldGetterMaps.get(((ChangeEvent) event).tableId()))); + fieldGetterMaps.get(((ChangeEvent) event).tableId()), + includeSchemaInfo)); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkFunction.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkFunction.java index f4876d5e04a..9ba3e22f5c5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkFunction.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkFunction.java @@ -40,6 +40,8 @@ public class ValuesDataSinkFunction implements SinkFunction { private final boolean print; + private final boolean includeSchemaInfo; + /** * keep the relationship of TableId and Schema as write method may rely on the schema * information of DataChangeEvent. @@ -48,11 +50,13 @@ public class ValuesDataSinkFunction implements SinkFunction { private final Map> fieldGetterMaps; - public ValuesDataSinkFunction(boolean materializedInMemory, boolean print) { + public ValuesDataSinkFunction( + boolean materializedInMemory, boolean print, boolean includeSchemaInfo) { this.materializedInMemory = materializedInMemory; this.print = print; schemaMaps = new HashMap<>(); fieldGetterMaps = new HashMap<>(); + this.includeSchemaInfo = includeSchemaInfo; } @Override @@ -82,7 +86,9 @@ public void invoke(Event event, Context context) throws Exception { // print the detail message to console for verification. System.out.println( ValuesDataSinkHelper.convertEventToStr( - event, fieldGetterMaps.get(((ChangeEvent) event).tableId()))); + event, + fieldGetterMaps.get(((ChangeEvent) event).tableId()), + includeSchemaInfo)); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java index ba52c1fd87e..086510583a3 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelper.java @@ -29,25 +29,41 @@ public class ValuesDataSinkHelper { /** convert Event to String for {@link ValuesDataSink} to display detail message. */ - public static String convertEventToStr(Event event, List fieldGetters) { + public static String convertEventToStr( + Event event, List fieldGetters, boolean includeSchemaInfo) { if (event instanceof SchemaChangeEvent) { return event.toString(); } else if (event instanceof DataChangeEvent) { DataChangeEvent dataChangeEvent = (DataChangeEvent) event; - String eventStr = - "DataChangeEvent{" - + "tableId=" - + dataChangeEvent.tableId() - + ", before=" - + getFields(fieldGetters, dataChangeEvent.before()) - + ", after=" - + getFields(fieldGetters, dataChangeEvent.after()) - + ", op=" - + dataChangeEvent.op() - + ", meta=" - + dataChangeEvent.describeMeta() - + '}'; - return eventStr; + if (includeSchemaInfo) { + return "DataChangeEvent{" + + "tableId=" + + dataChangeEvent.tableId() + + ", before=" + + getFields(fieldGetters, dataChangeEvent.before()) + + ", after=" + + getFields(fieldGetters, dataChangeEvent.after()) + + ", op=" + + dataChangeEvent.op() + + ", meta=" + + dataChangeEvent.describeMeta() + + ", schema=" + + dataChangeEvent.getSchema() + + '}'; + } else { + return "DataChangeEvent{" + + "tableId=" + + dataChangeEvent.tableId() + + ", before=" + + getFields(fieldGetters, dataChangeEvent.before()) + + ", after=" + + getFields(fieldGetters, dataChangeEvent.after()) + + ", op=" + + dataChangeEvent.op() + + ", meta=" + + dataChangeEvent.describeMeta() + + '}'; + } } return "Event{}"; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java index 4830d102be0..af10ebbc2f0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java @@ -20,6 +20,8 @@ import org.apache.flink.cdc.common.configuration.ConfigOption; import org.apache.flink.cdc.common.configuration.ConfigOptions; +import static org.apache.flink.cdc.common.configuration.ConfigOptions.key; + /** Configurations for {@link ValuesDataSink}. */ public class ValuesDataSinkOptions { @@ -49,4 +51,11 @@ public class ValuesDataSinkOptions { .defaultValue(false) .withDescription( "True if a runtime error should be thrown when handling schema change events."); + + public static final ConfigOption SINK_SCHEMA_INFO_ENABLED = + key("sink.schema-info-enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether receive schema info, by default is false. If set to true and the 'schema-info.enabled' parameter in source is also true, the column type info will be sent and increase message size."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java index 238f0756456..f6d362d0714 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkHelperTest.java @@ -33,6 +33,8 @@ import java.util.List; +import static org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions.SINK_SCHEMA_INFO_ENABLED; + /** A test for the {@link org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkHelper}. */ public class ValuesDataSinkHelperTest { @@ -52,7 +54,9 @@ public void testConvertEventToStr() { Assert.assertEquals( "CreateTableEvent{tableId=default.default.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}", ValuesDataSinkHelper.convertEventToStr( - new CreateTableEvent(tableId, schema), fieldGetters)); + new CreateTableEvent(tableId, schema), + fieldGetters, + SINK_SCHEMA_INFO_ENABLED.defaultValue())); DataChangeEvent insertEvent = DataChangeEvent.insertEvent( @@ -64,7 +68,8 @@ public void testConvertEventToStr() { })); Assert.assertEquals( "DataChangeEvent{tableId=default.default.table1, before=[], after=[1, 1], op=INSERT, meta=()}", - ValuesDataSinkHelper.convertEventToStr(insertEvent, fieldGetters)); + ValuesDataSinkHelper.convertEventToStr( + insertEvent, fieldGetters, SINK_SCHEMA_INFO_ENABLED.defaultValue())); DataChangeEvent deleteEvent = DataChangeEvent.deleteEvent( tableId, @@ -75,7 +80,8 @@ public void testConvertEventToStr() { })); Assert.assertEquals( "DataChangeEvent{tableId=default.default.table1, before=[1, 1], after=[], op=DELETE, meta=()}", - ValuesDataSinkHelper.convertEventToStr(deleteEvent, fieldGetters)); + ValuesDataSinkHelper.convertEventToStr( + deleteEvent, fieldGetters, SINK_SCHEMA_INFO_ENABLED.defaultValue())); DataChangeEvent updateEvent = DataChangeEvent.updateEvent( tableId, @@ -91,6 +97,7 @@ public void testConvertEventToStr() { })); Assert.assertEquals( "DataChangeEvent{tableId=default.default.table1, before=[1, 1], after=[1, x], op=UPDATE, meta=()}", - ValuesDataSinkHelper.convertEventToStr(updateEvent, fieldGetters)); + ValuesDataSinkHelper.convertEventToStr( + updateEvent, fieldGetters, SINK_SCHEMA_INFO_ENABLED.defaultValue())); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java index f77d1aa184c..fac3a64f27e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java @@ -39,6 +39,10 @@ import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo; import org.apache.flink.util.Collector; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.debezium.data.Envelope; import io.debezium.data.SpecialValueDecimal; import io.debezium.data.VariableScaleDecimal; @@ -51,7 +55,10 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.ConverterType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +66,7 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -82,10 +90,17 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve /** Changelog Mode to use for encoding changes in Flink internal data structure. */ protected final DebeziumChangelogMode changelogMode; + private transient JsonConverter jsonConverter; + + private final boolean includeSchemaInfo; + public DebeziumEventDeserializationSchema( - SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) { + SchemaDataTypeInference schemaDataTypeInference, + DebeziumChangelogMode changelogMode, + boolean includeSchemaInfo) { this.schemaDataTypeInference = schemaDataTypeInference; this.changelogMode = changelogMode; + this.includeSchemaInfo = includeSchemaInfo; } @Override @@ -102,21 +117,70 @@ public List deserializeDataChangeRecord(SourceRecord record) th Schema valueSchema = record.valueSchema(); Map meta = getMetadata(record); + if (includeSchemaInfo) { + if (jsonConverter == null) { + initializeJsonConverter(); + } + } if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { RecordData after = extractAfterDataRecord(value, valueSchema); - return Collections.singletonList(DataChangeEvent.insertEvent(tableId, after, meta)); + List dataChangeEvent = + includeSchemaInfo + ? Collections.singletonList( + DataChangeEvent.insertEvent( + tableId, + after, + meta, + extractBeforeAndAfterSchema( + jsonConverter.asJsonSchema(valueSchema)))) + : Collections.singletonList( + DataChangeEvent.insertEvent(tableId, after, meta)); + return dataChangeEvent; } else if (op == Envelope.Operation.DELETE) { RecordData before = extractBeforeDataRecord(value, valueSchema); - return Collections.singletonList(DataChangeEvent.deleteEvent(tableId, before, meta)); + List dataChangeEvent = + includeSchemaInfo + ? Collections.singletonList( + DataChangeEvent.deleteEvent( + tableId, + before, + meta, + extractBeforeAndAfterSchema( + jsonConverter.asJsonSchema(valueSchema)))) + : Collections.singletonList( + DataChangeEvent.deleteEvent(tableId, before, meta)); + return dataChangeEvent; } else if (op == Envelope.Operation.UPDATE) { RecordData after = extractAfterDataRecord(value, valueSchema); if (changelogMode == DebeziumChangelogMode.ALL) { RecordData before = extractBeforeDataRecord(value, valueSchema); - return Collections.singletonList( - DataChangeEvent.updateEvent(tableId, before, after, meta)); + List dataChangeEvent = + includeSchemaInfo + ? Collections.singletonList( + DataChangeEvent.updateEvent( + tableId, + before, + after, + meta, + extractBeforeAndAfterSchema( + jsonConverter.asJsonSchema(valueSchema)))) + : Collections.singletonList( + DataChangeEvent.updateEvent(tableId, before, after, meta)); + return dataChangeEvent; } - return Collections.singletonList( - DataChangeEvent.updateEvent(tableId, null, after, meta)); + List dataChangeEvent = + includeSchemaInfo + ? Collections.singletonList( + DataChangeEvent.updateEvent( + tableId, + null, + after, + meta, + extractBeforeAndAfterSchema( + jsonConverter.asJsonSchema(valueSchema)))) + : Collections.singletonList( + DataChangeEvent.updateEvent(tableId, null, after, meta)); + return dataChangeEvent; } else { LOG.trace("Received {} operation, skip", op); return Collections.emptyList(); @@ -140,6 +204,24 @@ private RecordData extractAfterDataRecord(Struct value, Schema valueSchema) thro return extractDataRecord(afterValue, afterSchema); } + /** extract schema of before or after fields. */ + private String extractBeforeAndAfterSchema(ObjectNode valueSchema) { + ObjectMapper mapper = new ObjectMapper(); + ObjectNode copyNode = valueSchema.deepCopy(); + + ArrayNode fieldsArray = (ArrayNode) copyNode.get("fields"); + ArrayNode newFields = mapper.createArrayNode(); + for (JsonNode field : fieldsArray) { + String fieldName = field.get("field").asText(); + if (fieldName.equals(Envelope.FieldName.BEFORE) + || fieldName.equals(Envelope.FieldName.AFTER)) { + newFields.add(field); + } + } + copyNode.set("fields", newFields); + return copyNode.toString(); + } + private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception { DataType dataType = schemaDataTypeInference.infer(value, valueSchema); return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema); @@ -149,6 +231,13 @@ private DeserializationRuntimeConverter getOrCreateConverter(DataType type) { return CONVERTERS.computeIfAbsent(type, this::createConverter); } + private void initializeJsonConverter() { + jsonConverter = new JsonConverter(); + final HashMap configs = new HashMap<>(2); + configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); + jsonConverter.configure(configs); + } + // ------------------------------------------------------------------------------------- // Runtime Converters // ------------------------------------------------------------------------------------- diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index dd0ac789666..a9126d4abe9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -61,6 +61,7 @@ public class MySqlSourceConfig implements Serializable { private final double distributionFactorUpper; private final double distributionFactorLower; private final boolean includeSchemaChanges; + private final boolean includeSchemaInfo; private final boolean scanNewlyAddedTableEnabled; private final boolean closeIdleReaders; private final Properties jdbcProperties; @@ -94,6 +95,7 @@ public class MySqlSourceConfig implements Serializable { double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean includeSchemaInfo, boolean scanNewlyAddedTableEnabled, boolean closeIdleReaders, Properties dbzProperties, @@ -119,6 +121,7 @@ public class MySqlSourceConfig implements Serializable { this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; + this.includeSchemaInfo = includeSchemaInfo; this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; this.closeIdleReaders = closeIdleReaders; this.dbzProperties = checkNotNull(dbzProperties); @@ -202,6 +205,10 @@ public boolean isIncludeSchemaChanges() { return includeSchemaChanges; } + public boolean isIncludeSchemaInfo() { + return includeSchemaInfo; + } + public boolean isScanNewlyAddedTableEnabled() { return scanNewlyAddedTableEnabled; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 8b65055ca13..e87a93b90fb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -63,6 +63,7 @@ public class MySqlSourceConfigFactory implements Serializable { private double distributionFactorLower = MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; + private boolean includeSchemaInfo = false; private boolean scanNewlyAddedTableEnabled = false; private boolean closeIdleReaders = false; private Properties jdbcProperties; @@ -231,6 +232,12 @@ public MySqlSourceConfigFactory includeSchemaChanges(boolean includeSchemaChange return this; } + /** Whether the {@link MySqlSource} should output the schema info or not. */ + public MySqlSourceConfigFactory includeSchemaInfo(boolean includeSchemaInfo) { + this.includeSchemaInfo = includeSchemaInfo; + return this; + } + /** Whether the {@link MySqlSource} should scan the newly added tables or not. */ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAddedTableEnabled) { this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled; @@ -379,6 +386,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + includeSchemaInfo, scanNewlyAddedTableEnabled, closeIdleReaders, props, diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 31c4d2ec815..2ce98931741 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -98,6 +98,14 @@ limitations under the License. test-jar test + + org.apache.flink + flink-cdc-pipeline-connector-kafka + ${project.version} + test-jar + test + + org.apache.flink flink-connector-test-util @@ -126,6 +134,13 @@ limitations under the License. ${scala.version} test + + + org.testcontainers + kafka + ${testcontainers.version} + test + @@ -245,6 +260,16 @@ limitations under the License. + + org.apache.flink + flink-cdc-pipeline-connector-kafka + ${project.version} + kafka-cdc-pipeline-connector.jar + jar + ${project.build.directory}/dependencies + + + org.apache.flink flink-cdc-pipeline-udf-examples diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index a8c7a8c5f67..8290a27009c 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -334,6 +334,256 @@ public void testSchemaChangeEvents() throws Exception { "DropTableEvent{tableId=%s.products}"); } + @Test + public void testSchemaChangeEventsWithSchemaInfo() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + " schema-info.enabled: true\n" + + "\n" + + "sink:\n" + + " type: values\n" + + " sink.schema-info-enabled: true\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d\n" + + " schema.change.behavior: evolve", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + String customersSchemaInfo = + String.format( + "schema=" + + "{\"type\":\"struct\",\"fields\":[" + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"address\"},{\"type\":\"string\",\"optional\":true,\"field\":\"phone_number\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.customers.Value\",\"field\":\"before\"}," + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"address\"},{\"type\":\"string\",\"optional\":true,\"field\":\"phone_number\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.customers.Value\",\"field\":\"after\"}" + + "],\"optional\":false,\"name\":\"mysql_binlog_source.%s.customers.Envelope\"}", + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName()); + + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=(), " + + customersSchemaInfo + + "}", + mysqlInventoryDatabase.getDatabaseName())); + + String productsSchemaInfo = + String.format( + "schema={\"type\":\"struct\",\"fields\":[" + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"before\"}," + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"after\"}" + + "],\"optional\":false,\"name\":\"mysql_binlog_source.%s.products.Envelope\"}", + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName()); + + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=(), " + + customersSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=(), " + + customersSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=(), " + + customersSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.customers, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=(), " + + customersSchemaInfo + + "}", + "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=%s.products, before=[], after=[109, spare tire, 24 inch spare tire, 22.2, null, null, null], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[107, rocks, box of assorted rocks, 5.3, null, null, null], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[108, jacket, water resistent black wind breaker, 0.1, null, null, null], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=(), " + + productsSchemaInfo + + "}"); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // Perform DDL changes after the binlog is generated + waitUntilSpecificEvent( + String.format( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=(), " + + productsSchemaInfo + + "}", + mysqlInventoryDatabase.getDatabaseName())); + + LOG.info("Begin schema evolution stage."); + + // Test AddColumnEvent + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + + String productsSchemaInfoAddNewCol = + String.format( + "schema={\"type\":\"struct\",\"fields\":[" + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"new_col\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"before\"}," + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"new_col\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"after\"}" + + "],\"optional\":false,\"name\":\"mysql_binlog_source.%s.products.Envelope\"}", + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName()); + + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + + // Test AlterColumnTypeEvent + stat.execute("ALTER TABLE products MODIFY COLUMN new_col BIGINT;"); + + String productsSchemaInfoModifyNewCol = + String.format( + "schema={\"type\":\"struct\",\"fields\":[" + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"new_col\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"before\"}," + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"new_col\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"after\"}" + + "],\"optional\":false,\"name\":\"mysql_binlog_source.%s.products.Envelope\"}", + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName()); + + stat.execute( + "INSERT INTO products VALUES (default,'derrida','forever 21',2.1728, null, null, null, 2147483649);"); // 112 + + // Test RenameColumnEvent + stat.execute("ALTER TABLE products RENAME COLUMN new_col TO new_column;"); + + String productsSchemaInfoRenameNewColumn = + String.format( + "schema={\"type\":\"struct\",\"fields\":[" + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"new_column\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"before\"}," + + "{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"default\":\"flink\",\"field\":\"name\"},{\"type\":\"string\",\"optional\":true,\"field\":\"description\"},{\"type\":\"float\",\"optional\":true,\"field\":\"weight\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"red,white\"},\"default\":\"red\",\"field\":\"enum_c\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Json\",\"version\":1,\"field\":\"json_c\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"double\",\"optional\":false,\"field\":\"x\"},{\"type\":\"double\",\"optional\":false,\"field\":\"y\"},{\"type\":\"bytes\",\"optional\":true,\"field\":\"wkb\"},{\"type\":\"int32\",\"optional\":true,\"field\":\"srid\"}],\"optional\":true,\"name\":\"io.debezium.data.geometry.Point\",\"version\":1,\"doc\":\"Geometry (POINT)\",\"field\":\"point_c\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"new_column\"}],\"optional\":true,\"name\":\"mysql_binlog_source.%s.products.Value\",\"field\":\"after\"}" + + "],\"optional\":false,\"name\":\"mysql_binlog_source.%s.products.Envelope\"}", + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName(), + mysqlInventoryDatabase.getDatabaseName()); + + stat.execute( + "INSERT INTO products VALUES (default,'dynazenon','SSSS',2.1728, null, null, null, 2147483649);"); // 113 + + // Test DropColumnEvent + stat.execute("ALTER TABLE products DROP COLUMN new_column;"); + + stat.execute( + "INSERT INTO products VALUES (default,'evangelion','Eva',2.1728, null, null, null);"); // 114 + + // Test TruncateTableEvent + stat.execute("TRUNCATE TABLE products;"); + + // Test DropTableEvent. It's all over. + stat.execute("DROP TABLE products;"); + + waitUntilSpecificEvent( + String.format( + "DropTableEvent{tableId=%s.products}", + mysqlInventoryDatabase.getDatabaseName())); + + validateResult( + "DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=(), " + + productsSchemaInfo + + "}", + "DataChangeEvent{tableId=%s.products, before=[107, rocks, box of assorted rocks, 5.3, null, null, null], after=[107, rocks, box of assorted rocks, 5.1, null, null, null], op=UPDATE, meta=(), " + + productsSchemaInfo + + "}", + "AddColumnEvent{tableId=%s.products, addedColumns=[ColumnWithPosition{column=`new_col` INT, position=LAST, existedColumnName=null}]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], op=INSERT, meta=(), " + + productsSchemaInfoAddNewCol + + "}", + "DataChangeEvent{tableId=%s.products, before=[], after=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], op=INSERT, meta=(), " + + productsSchemaInfoAddNewCol + + "}", + "DataChangeEvent{tableId=%s.products, before=[110, jacket, water resistent white wind breaker, 0.2, null, null, null, 1], after=[110, jacket, new water resistent white wind breaker, 0.5, null, null, null, 1], op=UPDATE, meta=(), " + + productsSchemaInfoAddNewCol + + "}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.18, null, null, null, 1], after=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], op=UPDATE, meta=(), " + + productsSchemaInfoAddNewCol + + "}", + "DataChangeEvent{tableId=%s.products, before=[111, scooter, Big 2-wheel scooter , 5.17, null, null, null, 1], after=[], op=DELETE, meta=(), " + + productsSchemaInfoAddNewCol + + "}", + "AlterColumnTypeEvent{tableId=%s.products, typeMapping={new_col=BIGINT}, oldTypeMapping={new_col=INT}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[112, derrida, forever 21, 2.1728, null, null, null, 2147483649], op=INSERT, meta=(), " + + productsSchemaInfoModifyNewCol + + "}", + "RenameColumnEvent{tableId=%s.products, nameMapping={new_col=new_column}}", + "DataChangeEvent{tableId=%s.products, before=[], after=[113, dynazenon, SSSS, 2.1728, null, null, null, 2147483649], op=INSERT, meta=(), " + + productsSchemaInfoRenameNewColumn + + "}", + "DropColumnEvent{tableId=%s.products, droppedColumnNames=[new_column]}", + "DataChangeEvent{tableId=%s.products, before=[], after=[114, evangelion, Eva, 2.1728, null, null, null], op=INSERT, meta=(), " + + productsSchemaInfo + + "}", + "TruncateTableEvent{tableId=%s.products}", + "DropTableEvent{tableId=%s.products}"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + } + @Test public void testDanglingDropTableEventInBinlog() throws Exception { // Create a new table for later deletion diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java new file mode 100644 index 00000000000..29f0ea7551d --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.kafka.sink.KafkaUtil; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.DockerImageVersions.KAFKA; +import static org.assertj.core.api.Assertions.assertThat; + +/** End-to-end tests for mysql cdc to Kafka pipeline job. */ +@RunWith(Parameterized.class) +public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MysqlToKafkaE2eITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + protected static final long EVENT_WAITING_TIMEOUT = 60000L; + + private static AdminClient admin; + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final int ZK_TIMEOUT_MILLIS = 30000; + private static final short TOPIC_REPLICATION_FACTOR = 1; + private TableId table; + private String topic; + private KafkaConsumer consumer; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @ClassRule + public static final KafkaContainer KAFKA_CONTAINER = + KafkaUtil.createKafkaContainer(KAFKA, LOG) + .withNetworkAliases("kafka") + .withEmbeddedZookeeper() + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + protected final UniqueDatabase mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @BeforeClass + public static void initializeContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL)).join(); + Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); + Map properties = new HashMap<>(); + properties.put( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + KAFKA_CONTAINER.getBootstrapServers()); + admin = AdminClient.create(properties); + LOG.info("Containers are started."); + } + + @Before + public void before() throws Exception { + super.before(); + createTestTopic(1, TOPIC_REPLICATION_FACTOR); + Properties properties = getKafkaClientConfiguration(); + consumer = new KafkaConsumer<>(properties); + consumer.subscribe(Collections.singletonList(topic)); + mysqlInventoryDatabase.createAndInitialize(); + } + + @After + public void after() { + super.after(); + admin.deleteTopics(Collections.singletonList(topic)); + consumer.close(); + mysqlInventoryDatabase.dropDatabase(); + } + + @Test + public void testSyncWholeDatabaseWithDebeziumJsonHasSchema() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + " schema-info.enabled: true\n" + + "\n" + + "sink:\n" + + " type: kafka\n" + + " properties.bootstrap.servers: kafka:9092\n" + + " topic: %s\n" + + " sink.schema-info-enabled: true\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + topic, + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + List> collectedRecords = new ArrayList<>(); + int expectedEventCount = 13; + waitUntilSpecificEventCount(collectedRecords, expectedEventCount); + List expectedRecords = + getExpectedRecords("expectedEvents/mysqlToKafka/debezium-json-with-schema.txt"); + assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords)); + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // modify table schema + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + expectedEventCount = 20; + waitUntilSpecificEventCount(collectedRecords, expectedEventCount); + assertThat(expectedRecords) + .containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords)); + } + + private void waitUntilSpecificEventCount( + List> actualEvent, int expectedCount) throws Exception { + boolean result = false; + long endTimeout = System.currentTimeMillis() + MysqlToKafkaE2eITCase.EVENT_WAITING_TIMEOUT; + while (System.currentTimeMillis() < endTimeout) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + records.forEach(actualEvent::add); + if (actualEvent.size() == expectedCount) { + result = true; + break; + } + Thread.sleep(1000); + } + if (!result) { + throw new TimeoutException( + "failed to get specific event count: " + + expectedCount + + " from stdout: " + + actualEvent.size()); + } + } + + private static Properties getKafkaClientConfiguration() { + final Properties standardProps = new Properties(); + standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); + standardProps.put("group.id", UUID.randomUUID().toString()); + standardProps.put("enable.auto.commit", false); + standardProps.put("auto.offset.reset", "earliest"); + standardProps.put("max.partition.fetch.bytes", 256); + standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS); + standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS); + standardProps.put("key.deserializer", ByteArrayDeserializer.class.getName()); + standardProps.put("value.deserializer", ByteArrayDeserializer.class.getName()); + return standardProps; + } + + private void createTestTopic(int numPartitions, short replicationFactor) + throws ExecutionException, InterruptedException { + table = + TableId.tableId( + "default_namespace", "default_schema", UUID.randomUUID().toString()); + topic = table.toString(); + final CreateTopicsResult result = + admin.createTopics( + Collections.singletonList( + new NewTopic(topic, numPartitions, replicationFactor))); + result.all().get(); + } + + private List deserializeValues(List> records) + throws IOException { + List result = new ArrayList<>(); + for (ConsumerRecord record : records) { + result.add(new String(record.value(), "UTF-8")); + } + return result; + } + + protected List getExpectedRecords(String resourceDirFormat) throws Exception { + URL url = + MysqlToKafkaE2eITCase.class + .getClassLoader() + .getResource(String.format(resourceDirFormat)); + return Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isValidJsonRecord) + .map( + line -> + line.replace( + "$databaseName", mysqlInventoryDatabase.getDatabaseName())) + .collect(Collectors.toList()); + } + + protected boolean isValidJsonRecord(String line) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.readTree(line); + return !StringUtils.isEmpty(line); + } catch (JsonProcessingException e) { + return false; + } + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt new file mode 100644 index 00000000000..fbb1de27d04 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.customers.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"user_2","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.customers.Envelope"},"payload":{"before":null,"after":{"id":103,"name":"user_3","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.customers.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"user_1","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"name":"mysql_binlog_source.$databaseName.customers.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.customers.Envelope"},"payload":{"before":null,"after":{"id":104,"name":"user_4","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"enum_c":"white","json_c":"{\"key4\": \"value4\"}","point_c":"{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"enum_c":"red","json_c":"{\"key3\": \"value3\"}","point_c":"{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"enum_c":"red","json_c":"{\"k1\": \"v1\", \"k2\": \"v2\"}","point_c":"{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"enum_c":"white","json_c":"{\"key2\": \"value2\"}","point_c":"{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"enum_c":"red","json_c":"{\"key1\": \"value1\"}","point_c":"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}}} +{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"red,white"},"default":"red","field":"enum_c"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"json_c"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"point_c"},{"type":"int32","optional":true,"field":"new_col"}],"optional":true,"name":"mysql_binlog_source.$databaseName.products.Value","field":"after"}],"optional":false,"name":"mysql_binlog_source.$databaseName.products.Envelope"},"payload":{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":null,"op":"d","source":{"db":"$databaseName","table":"products"}}} \ No newline at end of file diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DataChangeEventSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DataChangeEventSerializer.java index 182650c3ad5..13f59ec3770 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DataChangeEventSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DataChangeEventSerializer.java @@ -51,6 +51,7 @@ public class DataChangeEventSerializer extends TypeSerializerSingleton opSerializer = new EnumSerializer<>(OperationType.class); private final RecordDataSerializer recordDataSerializer = RecordDataSerializer.INSTANCE; + private final StringSerializer stringSerializer = StringSerializer.INSTANCE; @Override public DataChangeEvent createInstance() { @@ -69,6 +70,8 @@ public void serialize(DataChangeEvent event, DataOutputView target) throws IOExc recordDataSerializer.serialize(event.after(), target); } metaSerializer.serialize(event.meta(), target); + + stringSerializer.serialize(event.getSchema(), target); } @Override @@ -81,23 +84,27 @@ public DataChangeEvent deserialize(DataInputView source) throws IOException { return DataChangeEvent.deleteEvent( tableId, recordDataSerializer.deserialize(source), - metaSerializer.deserialize(source)); + metaSerializer.deserialize(source), + stringSerializer.deserialize(source)); case INSERT: return DataChangeEvent.insertEvent( tableId, recordDataSerializer.deserialize(source), - metaSerializer.deserialize(source)); + metaSerializer.deserialize(source), + stringSerializer.deserialize(source)); case UPDATE: return DataChangeEvent.updateEvent( tableId, recordDataSerializer.deserialize(source), recordDataSerializer.deserialize(source), - metaSerializer.deserialize(source)); + metaSerializer.deserialize(source), + stringSerializer.deserialize(source)); case REPLACE: return DataChangeEvent.replaceEvent( tableId, recordDataSerializer.deserialize(source), - metaSerializer.deserialize(source)); + metaSerializer.deserialize(source), + stringSerializer.deserialize(source)); default: throw new IllegalArgumentException("Unsupported data change event: " + op); } @@ -117,23 +124,27 @@ public DataChangeEvent copy(DataChangeEvent from) { return DataChangeEvent.deleteEvent( tableIdSerializer.copy(from.tableId()), recordDataSerializer.copy(from.before()), - metaSerializer.copy(from.meta())); + metaSerializer.copy(from.meta()), + stringSerializer.copy(from.getSchema())); case INSERT: return DataChangeEvent.insertEvent( tableIdSerializer.copy(from.tableId()), recordDataSerializer.copy(from.after()), - metaSerializer.copy(from.meta())); + metaSerializer.copy(from.meta()), + stringSerializer.copy(from.getSchema())); case UPDATE: return DataChangeEvent.updateEvent( tableIdSerializer.copy(from.tableId()), recordDataSerializer.copy(from.before()), recordDataSerializer.copy(from.after()), - metaSerializer.copy(from.meta())); + metaSerializer.copy(from.meta()), + stringSerializer.copy(from.getSchema())); case REPLACE: return DataChangeEvent.replaceEvent( tableIdSerializer.copy(from.tableId()), recordDataSerializer.copy(from.after()), - metaSerializer.copy(from.meta())); + metaSerializer.copy(from.meta()), + stringSerializer.copy(from.getSchema())); default: throw new IllegalArgumentException("Unsupported data change event: " + op); }