[FLINK-36611][pipeline-connector][kafka] Add schema info to output of Kafka sink #3791

Draft · wants to merge 7 commits into master
Changes from 3 commits
DataChangeEvent.java
@@ -40,11 +40,22 @@ private DataChangeEvent(
RecordData after,
OperationType op,
Map<String, String> meta) {
this(tableId, before, after, op, meta, null);
}

private DataChangeEvent(
TableId tableId,
RecordData before,
RecordData after,
OperationType op,
Map<String, String> meta,
String schema) {
this.tableId = tableId;
this.before = before;
this.after = after;
this.op = op;
this.meta = meta;
this.schema = schema;
}

private final TableId tableId;
@@ -61,6 +72,8 @@ private DataChangeEvent(
/** Optional, describes the metadata of the change event. e.g. MySQL binlog file name, pos. */
private final Map<String, String> meta;

/** Optional, describes the schema of the change event. */
private final String schema;

@Override
public TableId tableId() {
return tableId;
@@ -82,6 +95,10 @@ public Map<String, String> meta() {
return meta;
}

public String getSchema() {
return schema;
}

/** Creates a {@link DataChangeEvent} instance that describes the insert event. */
public static DataChangeEvent insertEvent(TableId tableId, RecordData after) {
return new DataChangeEvent(
@@ -96,6 +113,15 @@ public static DataChangeEvent insertEvent(
return new DataChangeEvent(tableId, null, after, OperationType.INSERT, meta);
}

/**
* Creates a {@link DataChangeEvent} instance that describes the insert event with meta info and
* schema info.
*/
public static DataChangeEvent insertEvent(
TableId tableId, RecordData after, Map<String, String> meta, String schema) {
return new DataChangeEvent(tableId, null, after, OperationType.INSERT, meta, schema);
}

/** Creates a {@link DataChangeEvent} instance that describes the delete event. */
public static DataChangeEvent deleteEvent(TableId tableId, RecordData before) {
return new DataChangeEvent(
@@ -110,6 +136,15 @@ public static DataChangeEvent deleteEvent(
return new DataChangeEvent(tableId, before, null, OperationType.DELETE, meta);
}

/**
* Creates a {@link DataChangeEvent} instance that describes the delete event with meta info and
* schema info.
*/
public static DataChangeEvent deleteEvent(
TableId tableId, RecordData before, Map<String, String> meta, String schema) {
return new DataChangeEvent(tableId, before, null, OperationType.DELETE, meta, schema);
}

/** Creates a {@link DataChangeEvent} instance that describes the update event. */
public static DataChangeEvent updateEvent(
TableId tableId, RecordData before, RecordData after) {
@@ -125,6 +160,19 @@ public static DataChangeEvent updateEvent(
return new DataChangeEvent(tableId, before, after, OperationType.UPDATE, meta);
}

/**
* Creates a {@link DataChangeEvent} instance that describes the update event with meta info and
* schema info.
*/
public static DataChangeEvent updateEvent(
TableId tableId,
RecordData before,
RecordData after,
Map<String, String> meta,
String schema) {
return new DataChangeEvent(tableId, before, after, OperationType.UPDATE, meta, schema);
}

/** Creates a {@link DataChangeEvent} instance that describes the replace event. */
public static DataChangeEvent replaceEvent(TableId tableId, RecordData after) {
return new DataChangeEvent(
@@ -139,6 +187,14 @@ public static DataChangeEvent replaceEvent(
return new DataChangeEvent(tableId, null, after, OperationType.REPLACE, meta);
}

/**
* Creates a {@link DataChangeEvent} instance that describes the replace event with meta info
* and schema info.
*/
public static DataChangeEvent replaceEvent(
TableId tableId, RecordData after, Map<String, String> meta, String schema) {
return new DataChangeEvent(tableId, null, after, OperationType.REPLACE, meta, schema);
}

/**
* Updates the before of a {@link DataChangeEvent} instance that describes the event with meta
* info.
@@ -211,6 +267,8 @@ public String toString() {
+ op
+ ", meta="
+ describeMeta()
+ ", schema="
+ schema
+ '}';
}
}
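
For context, a minimal sketch of how the new schema-aware factory methods could be called. The schema string and the TableId/RecordData values are illustrative assumptions, not output produced by the connector:

import java.util.HashMap;

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;

// Hypothetical serialized schema; in the pipeline this string is produced upstream.
String schema = "{\"columns\":[{\"name\":\"id\",\"type\":\"INT\"}]}";

DataChangeEvent event =
        DataChangeEvent.insertEvent(
                TableId.tableId("mydb", "orders"), // assumes the two-arg TableId factory
                after,                             // a RecordData payload built elsewhere
                new HashMap<>(),                   // meta info, e.g. binlog file name/pos
                schema);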
ChangeLogJsonFormatFactory.java
@@ -48,7 +48,10 @@ public class ChangeLogJsonFormatFactory {
* @param includeSchemaInfo whether to include schema info in the serialized output
* @return The configured instance of {@link SerializationSchema}.
*/
public static SerializationSchema<Event> createSerializationSchema(
ReadableConfig formatOptions, JsonSerializationType type, ZoneId zoneId) {
ReadableConfig formatOptions,
JsonSerializationType type,
ZoneId zoneId,
boolean includeSchemaInfo) {
TimestampFormat timestampFormat = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
Expand All @@ -65,7 +68,8 @@ public static SerializationSchema<Event> createSerializationSchema(
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
includeSchemaInfo);
}
case CANAL_JSON:
{
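
A hedged sketch of a call site with the new flag; the enum constant and option values here are assumptions for illustration, not taken from the PR:

SerializationSchema<Event> serializer =
        ChangeLogJsonFormatFactory.createSerializationSchema(
                formatOptions,                        // ReadableConfig carrying format options
                JsonSerializationType.DEBEZIUM_JSON,  // assumed enum constant
                ZoneId.systemDefault(),
                true);                                // includeSchemaInfo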
DebeziumJsonRowDataSerializationSchema.java (new file)
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.kafka.json.debezium;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonParserRowDataDeserializationSchema;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Objects;

/**
* Serialization schema that serializes an object of the Flink internal data structure into JSON
* bytes.
*
* <p>Serializes the input Flink object into a JSON string and converts it into <code>byte[]</code>.
*
* <p>Result <code>byte[]</code> messages can be deserialized using {@link
* JsonRowDataDeserializationSchema} or {@link JsonParserRowDataDeserializationSchema}.
*/
public class DebeziumJsonRowDataSerializationSchema implements SerializationSchema<RowData> {
private static final long serialVersionUID = 1L;

/** RowType to generate the runtime converter. */
private final RowType rowType;

/** The converter that converts internal data formats to JsonNode. */
private final RowDataToJsonConverters.RowDataToJsonConverter runtimeConverter;

/** Object mapper that is used to create output JSON objects. */
private transient ObjectMapper mapper;

/** Reusable object node. */
private transient ObjectNode node;

/** Timestamp format specification which is used to parse timestamp. */
private final TimestampFormat timestampFormat;

/** The handling mode when serializing null keys for map data. */
private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;

/** The string literal when handling mode for map null key LITERAL. */
private final String mapNullKeyLiteral;

/** Flag indicating whether to serialize all decimals as plain numbers. */
private final boolean encodeDecimalAsPlainNumber;

/** Flag indicating whether to include schema info in the serialized JSON output. */
private final boolean includeSchemaInfo;

public DebeziumJsonRowDataSerializationSchema(
RowType rowType,
TimestampFormat timestampFormat,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
boolean encodeDecimalAsPlainNumber,
boolean includeSchemaInfo) {
this.rowType = rowType;
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.runtimeConverter =
new RowDataToJsonConverters(timestampFormat, mapNullKeyMode, mapNullKeyLiteral)
.createConverter(rowType);
this.includeSchemaInfo = includeSchemaInfo;
}

@Override
public void open(InitializationContext context) throws Exception {
mapper =
JacksonMapperFactory.createObjectMapper()
.configure(
JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
encodeDecimalAsPlainNumber);
}

@Override
public byte[] serialize(RowData row) {
if (node == null) {
node = mapper.createObjectNode();
}

try {
runtimeConverter.convert(mapper, node, row);
if (includeSchemaInfo) {
// The "schema" field holds a nested JSON string; asText() returns the raw string
// without escape characters such as \", so it can be re-parsed into a JsonNode.
String schemaValue = node.get("schema").asText();
JsonNode schemaNode = mapper.readTree(schemaValue);
node.set("schema", schemaNode);
Contributor Author:

The schema is passed downstream as a string that itself contains nested JSON. If that raw string were put into a JsonNode directly, the output would contain escaped quotes (\"). JsonNode.asText() returns the plain string, which avoids the problem.
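
A minimal sketch of the difference, using plain Jackson for illustration (the production code uses the flink-shaded Jackson packages):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

ObjectMapper mapper = new ObjectMapper();
ObjectNode node = mapper.createObjectNode();

// Storing the string as a text node keeps it escaped in the output:
node.put("schema", "{\"type\":\"struct\"}");
// -> {"schema":"{\"type\":\"struct\"}"}

// Reading it back with asText() and re-parsing embeds it as real JSON:
node.set("schema", mapper.readTree(node.get("schema").asText()));
// -> {"schema":{"type":"struct"}}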

}
return mapper.writeValueAsBytes(node);
} catch (Throwable t) {
throw new RuntimeException(String.format("Could not serialize row '%s'.", row), t);
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DebeziumJsonRowDataSerializationSchema that = (DebeziumJsonRowDataSerializationSchema) o;
return rowType.equals(that.rowType)
&& timestampFormat.equals(that.timestampFormat)
&& mapNullKeyMode.equals(that.mapNullKeyMode)
&& mapNullKeyLiteral.equals(that.mapNullKeyLiteral)
&& encodeDecimalAsPlainNumber == that.encodeDecimalAsPlainNumber
&& includeSchemaInfo == that.includeSchemaInfo;
}

@Override
public int hashCode() {
return Objects.hash(
rowType,
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber,
includeSchemaInfo);
}
}
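
For reference, a minimal construction sketch under stated assumptions: the rowType must contain a top-level "schema" string field, since serialize() reads node.get("schema") when includeSchemaInfo is true, and the InitializationContext comes from the sink setup:

DebeziumJsonRowDataSerializationSchema serializer =
        new DebeziumJsonRowDataSerializationSchema(
                rowType,                               // RowType with a "schema" string field
                TimestampFormat.ISO_8601,
                JsonFormatOptions.MapNullKeyMode.FAIL,
                "null",                                // mapNullKeyLiteral (unused with FAIL)
                false,                                 // encodeDecimalAsPlainNumber
                true);                                 // includeSchemaInfo
serializer.open(context);
byte[] json = serializer.serialize(rowData);           // schema embedded as nested JSON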