Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ pipeline:
<td>String</td>
<td> 是否通过FE重定向写入,直连BE写入 </td>
</tr>
<tr>
<td>charset-encoding</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td> Doris Http客户端字符集编码,默认UTF-8 </td>
</tr>
<tr>
<td>sink.enable.batch-mode</td>
<td>optional</td>
Expand Down
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,13 @@ pipeline:
这是一项实验性功能。
</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。</td>
</tr>
</tbody>
</table>
</div>
Expand Down
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ pipeline:
<td>String</td>
<td> Whether to write through FE redirection and directly connect to BE to write </td>
</tr>
<tr>
<td>charset-encoding</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td> Charset encoding for doris http client, default UTF-8 </td>
</tr>
<tr>
<td>sink.enable.batch-mode</td>
<td>optional</td>
Expand Down
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,14 @@ pipeline:
This is an experimental feature, and subject to change in the future.
</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether enable include table and column comments, by default is false, if set to true, the table and column comments will be sent.<br>
Note: Enable this option will bring the implications on memory usage.</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -239,6 +240,9 @@ public String toString() {
if (!partitionKeys.isEmpty()) {
sb.append(", partitionKeys=").append(String.join(";", partitionKeys));
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
sb.append(", comment=").append(comment);
}
sb.append(", options=").append(describeOptions());

return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ limitations under the License.

<properties>
<doris.connector.version>24.0.1</doris.connector.version>
<mysql.connector.version>8.0.26</mysql.connector.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -84,13 +85,13 @@ limitations under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>1.18.3</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<version>${mysql.connector.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
Expand Down Expand Up @@ -143,6 +144,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(JDBC_URL);
options.add(PASSWORD);
options.add(AUTO_REDIRECT);
options.add(CHARSET_ENCODING);

options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public class DorisDataSinkOptions {
.withDescription(
"Use automatic redirection of fe without explicitly obtaining the be list");

public static final ConfigOption<String> CHARSET_ENCODING =
ConfigOptions.key("charset-encoding")
.stringType()
.defaultValue("UTF-8")
.withDescription("Charset encoding for doris http client, default UTF-8.");

// Streaming Sink options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
ConfigOptions.key("sink.enable-2pc")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;

/** Supports {@link DorisDataSink} to schema evolution. */
Expand All @@ -76,7 +77,8 @@ public class DorisMetadataApplier implements MetadataApplier {

public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
this.dorisOptions = dorisOptions;
this.schemaChangeManager = new DorisSchemaChangeManager(dorisOptions);
this.schemaChangeManager =
new DorisSchemaChangeManager(dorisOptions, config.get(CHARSET_ENCODING));
this.config = config;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
Expand Down Expand Up @@ -147,6 +149,7 @@ private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveEx
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
tableSchema.setTableComment(schema.comment());

if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

/** An enriched version of Doris' {@link SchemaChangeManager}. */
public class DorisSchemaChangeManager extends SchemaChangeManager {
public DorisSchemaChangeManager(DorisOptions dorisOptions) {
super(dorisOptions);
public DorisSchemaChangeManager(DorisOptions dorisOptions, String charsetEncoding) {
super(dorisOptions, charsetEncoding);
}

public boolean truncateTable(String databaseName, String tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -66,6 +68,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.METADATA_LIST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
Expand Down Expand Up @@ -132,6 +135,7 @@ public DataSource createDataSource(Context context) {
double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);

boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);

Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
Expand All @@ -152,6 +156,13 @@ public DataSource createDataSource(Context context) {

Map<String, String> configMap = config.toMap();
OptionUtils.printOptions(IDENTIFIER, config.toMap());
if (includeComments) {
// set debezium config 'include.schema.comments' to true
configMap.put(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
"true");
}

MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
Expand Down Expand Up @@ -310,6 +321,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(METADATA_LIST);
options.add(INCLUDE_COMMENTS_ENABLED);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.cdc.connectors.mysql.table.MySqlReadableMetadata;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;

import io.debezium.relational.RelationalDatabaseConnectorConfig;

import java.util.ArrayList;
import java.util.List;

Expand All @@ -57,11 +59,19 @@ public MySqlDataSource(

@Override
public EventSourceProvider getEventSourceProvider() {
boolean includeComments =
sourceConfig
.getDbzConfiguration()
.getBoolean(
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
false);

MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
readableMetadataList);
readableMetadataList,
includeComments);

MySqlSource<Event> source =
new MySqlSource<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,13 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to parse schema change events generated by gh-ost/pt-osc utilities. Defaults to false.");

@Experimental
public static final ConfigOption<Boolean> INCLUDE_COMMENTS_ENABLED =
ConfigOptions.key("include-comments.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. "
+ "Note: Enable this option will bring the implications on memory usage.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final boolean includeSchemaChanges;
private final boolean includeComments;

private transient Tables tables;
private transient CustomMySqlAntlrDdlParser customParser;
Expand All @@ -70,23 +71,25 @@ public class MySqlEventDeserializer extends DebeziumEventDeserializationSchema {

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) {
this(changelogMode, includeSchemaChanges, new ArrayList<>());
this(changelogMode, includeSchemaChanges, new ArrayList<>(), false);
}

public MySqlEventDeserializer(
DebeziumChangelogMode changelogMode,
boolean includeSchemaChanges,
List<MySqlReadableMetadata> readableMetadataList) {
List<MySqlReadableMetadata> readableMetadataList,
boolean includeComments) {
super(new MySqlSchemaDataTypeInference(), changelogMode);
this.includeSchemaChanges = includeSchemaChanges;
this.readableMetadataList = readableMetadataList;
this.includeComments = includeComments;
}

@Override
protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
if (includeSchemaChanges) {
if (customParser == null) {
customParser = new CustomMySqlAntlrDdlParser();
customParser = new CustomMySqlAntlrDdlParser(includeComments);
tables = new Tables();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.relational.Tables;

import java.sql.Types;
import java.util.ArrayList;
Expand All @@ -35,8 +36,8 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {

private final LinkedList<SchemaChangeEvent> parsedEvents;

public CustomMySqlAntlrDdlParser() {
super();
public CustomMySqlAntlrDdlParser(boolean includeComments) {
super(true, false, includeComments, null, Tables.TableFilter.includeAll());
this.parsedEvents = new LinkedList<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
Expand Down Expand Up @@ -211,6 +212,7 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
column.comment(),
column.defaultValueExpression().orElse(null));
}
tableBuilder.comment(table.comment());

List<String> primaryKey = table.primaryKeyColumnNames();
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
Expand All @@ -229,7 +231,16 @@ private synchronized Table parseDdl(String ddlStatement, TableId tableId) {

private synchronized MySqlAntlrDdlParser getParser() {
if (mySqlAntlrDdlParser == null) {
mySqlAntlrDdlParser = new MySqlAntlrDdlParser();
boolean includeComments =
sourceConfig
.getDbzConfiguration()
.getBoolean(
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS
.name(),
false);
mySqlAntlrDdlParser =
new MySqlAntlrDdlParser(
true, false, includeComments, null, Tables.TableFilter.includeAll());
}
return mySqlAntlrDdlParser;
}
Expand Down
Loading
Loading