14 changes: 14 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -87,6 +87,20 @@ Pipeline Connector Options
<td>String</td>
<td>Sink 的名称。 </td>
</tr>
<tr>
<td>partition.strategy</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the strategy for distributing records to Kafka partitions. Available options are `all-to-zero` (send all records to partition 0) and `hash-by-key` (distribute records by the hash of their primary keys). The default is `all-to-zero`. </td>
</tr>
<tr>
<td>key.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the format used to serialize the key part of Kafka messages. Available options are `csv` and `json`. The default is `json`. </td>
</tr>
<tr>
<td>value.format</td>
<td>optional</td>
14 changes: 14 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -85,6 +85,20 @@ Pipeline Connector Options
<td>String</td>
<td>The name of the sink.</td>
</tr>
<tr>
<td>partition.strategy</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the strategy for distributing records to Kafka partitions. Available options are `all-to-zero` (send all records to partition 0) and `hash-by-key` (distribute records by the hash of their primary keys). The default is `all-to-zero`. </td>
</tr>
<tr>
<td>key.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the format used to encode the key part of Kafka messages. Available options are `csv` and `json`. The default is `json`. </td>
</tr>
<tr>
<td>value.format</td>
<td>optional</td>
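The two options documented above control how each record's Kafka message key is built (`key.format`) and which partition it is written to (`partition.strategy`). As a rough illustration of the `hash-by-key` strategy only, the sketch below maps a record's serialized primary-key bytes to a stable partition index; the class and method names are hypothetical and do not appear in the connector.

// Illustrative sketch only: how a hash-by-key strategy could pick a partition
// from the serialized primary-key bytes. Names here are hypothetical.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HashByKeyPartitionSketch {

    /** Maps a serialized key to a partition; the same key always lands on the same partition. */
    static int selectPartition(byte[] keyBytes, int numPartitions) {
        // Mask the sign bit instead of using Math.abs to stay safe for Integer.MIN_VALUE.
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "mydb.products,42".getBytes(StandardCharsets.UTF_8);
        System.out.println(selectPartition(key, 3)); // deterministic value in [0, 3)
    }
}

Because the mapping depends only on the key bytes, all changes for the same primary key land in the same partition, which preserves per-key ordering.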
@@ -45,6 +45,14 @@ limitations under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>


<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -25,6 +25,7 @@
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryStringData;

@@ -39,26 +40,59 @@
/** Maintains the {@link SerializationSchema} of a specific {@link TableId}. */
public class TableSchemaInfo {

private final TableId tableId;

private final Schema schema;

private final List<Integer> primaryKeyColumnIndexes;

private final List<RecordData.FieldGetter> fieldGetters;

private final SerializationSchema<RowData> serializationSchema;

    public TableSchemaInfo(
            TableId tableId,
            Schema schema,
            SerializationSchema<RowData> serializationSchema,
            ZoneId zoneId) {
this.tableId = tableId;
this.schema = schema;
this.serializationSchema = serializationSchema;
this.fieldGetters = createFieldGetters(schema, zoneId);
primaryKeyColumnIndexes = new ArrayList<>();
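        // Record the column index of each primary-key column, preserving the declared key order.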
for (int keyIndex = 0; keyIndex < schema.primaryKeys().size(); keyIndex++) {
for (int columnIndex = 0; columnIndex < schema.getColumnCount(); columnIndex++) {
if (schema.getColumns()
.get(columnIndex)
.getName()
.equals(schema.primaryKeys().get(keyIndex))) {
primaryKeyColumnIndexes.add(columnIndex);
break;
}
}
}
}

    /** Converts to {@link RowData}, which will be passed to the serializationSchema. */
    public RowData getRowDataFromRecordData(RecordData recordData, boolean primaryKeyOnly) {
if (primaryKeyOnly) {
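            // Key-only row: field 0 is the serialized TableId, followed by the primary-key columns.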
GenericRowData genericRowData = new GenericRowData(primaryKeyColumnIndexes.size() + 1);
genericRowData.setField(0, StringData.fromString(tableId.toString()));
for (int i = 0; i < primaryKeyColumnIndexes.size(); i++) {
genericRowData.setField(
i + 1,
fieldGetters
.get(primaryKeyColumnIndexes.get(i))
.getFieldOrNull(recordData));
}
return genericRowData;
} else {
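            // Full row: copy every column of the change record.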
GenericRowData genericRowData = new GenericRowData(recordData.getArity());
for (int i = 0; i < recordData.getArity(); i++) {
genericRowData.setField(i, fieldGetters.get(i).getFieldOrNull(recordData));
}
return genericRowData;
        }
    }

private static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
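For reference, a minimal sketch of the row layout that getRowDataFromRecordData produces when primaryKeyOnly is true: field 0 holds the serialized TableId and the remaining fields hold the primary-key values in schema order. The table name and key value below are made-up sample data, not taken from this change.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

public class KeyRowLayoutSketch {
    public static void main(String[] args) {
        // Field 0: the table identifier; fields 1..n: primary-key values in schema order.
        GenericRowData keyRow = new GenericRowData(2);
        keyRow.setField(0, StringData.fromString("mydb.products"));
        keyRow.setField(1, 42L); // hypothetical primary-key column `id`
        System.out.println(keyRow); // prints +I(mydb.products,42)
    }
}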
@@ -127,7 +127,8 @@ public byte[] serialize(Event event) {
}
jsonSerializers.put(
schemaChangeEvent.tableId(),
                    new TableSchemaInfo(
                            schemaChangeEvent.tableId(), schema, jsonSerializer, zoneId));
return null;
}

@@ -153,7 +154,8 @@ public byte[] serialize(Event event) {
new RowData[] {
jsonSerializers
.get(dataChangeEvent.tableId())
                                    .getRowDataFromRecordData(
                                            dataChangeEvent.after(), false)
}));
reuseGenericRowData.setField(2, OP_INSERT);
return jsonSerializers
@@ -168,7 +170,7 @@ public byte[] serialize(Event event) {
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(
                                            dataChangeEvent.before(), false)
}));
reuseGenericRowData.setField(1, null);
reuseGenericRowData.setField(2, OP_DELETE);
@@ -185,15 +187,16 @@ public byte[] serialize(Event event) {
jsonSerializers
.get(dataChangeEvent.tableId())
.getRowDataFromRecordData(
                                            dataChangeEvent.before(), false)
}));
reuseGenericRowData.setField(
1,
new GenericArrayData(
new RowData[] {
jsonSerializers
.get(dataChangeEvent.tableId())
                                    .getRowDataFromRecordData(
                                            dataChangeEvent.after(), false)
}));
reuseGenericRowData.setField(2, OP_UPDATE);
return jsonSerializers
@@ -126,7 +126,8 @@ public byte[] serialize(Event event) {
}
jsonSerializers.put(
schemaChangeEvent.tableId(),
                    new TableSchemaInfo(
                            schemaChangeEvent.tableId(), schema, jsonSerializer, zoneId));
return null;
}

@@ -144,7 +145,7 @@ public byte[] serialize(Event event) {
1,
jsonSerializers
.get(dataChangeEvent.tableId())
                            .getRowDataFromRecordData(dataChangeEvent.after(), false));
reuseGenericRowData.setField(2, OP_INSERT);
return jsonSerializers
.get(dataChangeEvent.tableId())
Expand All @@ -155,7 +156,7 @@ public byte[] serialize(Event event) {
0,
jsonSerializers
.get(dataChangeEvent.tableId())
                            .getRowDataFromRecordData(dataChangeEvent.before(), false));
reuseGenericRowData.setField(1, null);
reuseGenericRowData.setField(2, OP_DELETE);
return jsonSerializers
@@ -168,12 +169,12 @@ public byte[] serialize(Event event) {
0,
jsonSerializers
.get(dataChangeEvent.tableId())
                            .getRowDataFromRecordData(dataChangeEvent.before(), false));
reuseGenericRowData.setField(
1,
jsonSerializers
.get(dataChangeEvent.tableId())
                            .getRowDataFromRecordData(dataChangeEvent.after(), false));
reuseGenericRowData.setField(2, OP_UPDATE);
return jsonSerializers
.get(dataChangeEvent.tableId())
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.kafka.serialization;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
import org.apache.flink.formats.csv.CsvRowDataSerializationSchema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

/** A {@link SerializationSchema} to convert {@link Event} into bytes in CSV format. */
public class CsvSerializationSchema implements SerializationSchema<Event> {

private static final long serialVersionUID = 1L;

    /**
     * A map of {@link TableId} and its {@link SerializationSchema} used to serialize the
     * primary-key part of records into CSV.
     */
private final Map<TableId, TableSchemaInfo> csvSerializers;

private final ZoneId zoneId;

private InitializationContext context;

public CsvSerializationSchema(ZoneId zoneId) {
this.zoneId = zoneId;
csvSerializers = new HashMap<>();
}

@Override
public void open(InitializationContext context) {
this.context = context;
}

@Override
public byte[] serialize(Event event) {
if (event instanceof SchemaChangeEvent) {
Schema schema;
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
if (event instanceof CreateTableEvent) {
CreateTableEvent createTableEvent = (CreateTableEvent) event;
schema = createTableEvent.getSchema();
} else {
schema =
SchemaUtils.applySchemaChangeEvent(
csvSerializers.get(schemaChangeEvent.tableId()).getSchema(),
schemaChangeEvent);
}
CsvRowDataSerializationSchema csvSerializer = buildSerializationForPrimaryKey(schema);
try {
csvSerializer.open(context);
} catch (Exception e) {
throw new RuntimeException(e);
}
csvSerializers.put(
schemaChangeEvent.tableId(),
new TableSchemaInfo(
schemaChangeEvent.tableId(), schema, csvSerializer, zoneId));
return null;
}
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
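        // Deletes carry their key in the before image; inserts and updates in the after image.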
RecordData recordData =
dataChangeEvent.op().equals(OperationType.DELETE)
? dataChangeEvent.before()
: dataChangeEvent.after();
TableSchemaInfo tableSchemaInfo = csvSerializers.get(dataChangeEvent.tableId());
return tableSchemaInfo
.getSerializationSchema()
.serialize(tableSchemaInfo.getRowDataFromRecordData(recordData, true));
}

private CsvRowDataSerializationSchema buildSerializationForPrimaryKey(Schema schema) {
DataField[] fields = new DataField[schema.primaryKeys().size() + 1];
fields[0] = DataTypes.FIELD("TableId", DataTypes.STRING());
for (int i = 0; i < schema.primaryKeys().size(); i++) {
Column column = schema.getColumn(schema.primaryKeys().get(i)).get();
fields[i + 1] = DataTypes.FIELD(column.getName(), column.getType());
}
// the row should never be null
DataType dataType = DataTypes.ROW(fields).notNull();
LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
return new CsvRowDataSerializationSchema.Builder((RowType) rowType).build();
}
}
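To make the key layout concrete, here is a minimal sketch of the row type that a helper like buildSerializationForPrimaryKey derives, assuming a hypothetical table whose primary key is (order_id BIGINT, line_no INT); the TableId is always the first field, so the CSV key line for such a record would look roughly like mydb.orders,1001,3. The sketch uses Flink's Table API DataTypes directly instead of the CDC type system.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

public class KeyRowTypeSketch {
    public static void main(String[] args) {
        // TableId first, then the primary-key columns; the row itself is never null.
        DataType keyType =
                DataTypes.ROW(
                                DataTypes.FIELD("TableId", DataTypes.STRING()),
                                DataTypes.FIELD("order_id", DataTypes.BIGINT()),
                                DataTypes.FIELD("line_no", DataTypes.INT()))
                        .notNull();
        RowType rowType = (RowType) keyType.getLogicalType();
        // Prints something like: ROW<`TableId` STRING, `order_id` BIGINT, `line_no` INT> NOT NULL
        System.out.println(rowType.asSummaryString());
    }
}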