Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.mysql.table;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
Expand All @@ -35,6 +36,9 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
Expand All @@ -46,16 +50,27 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.debezium.config.CommonConnectorConfig.TOMBSTONES_ON_DELETE;
import static io.debezium.connector.mysql.MySqlConnectorConfig.SNAPSHOT_MODE;
import static io.debezium.engine.DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
* description.
*/
public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
private static final Logger LOG = LoggerFactory.getLogger(MySqlTableSource.class);
private final Set<String> exceptDbzProperties =
Stream.of(
SNAPSHOT_MODE.name(),
OFFSET_FLUSH_INTERVAL_MS_PROP,
TOMBSTONES_ON_DELETE.name())
.collect(Collectors.toSet());

private final ResolvedSchema physicalSchema;
private final int port;
Expand Down Expand Up @@ -196,7 +211,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.connectTimeout(connectTimeout)
.connectMaxRetries(connectMaxRetries)
.connectionPoolSize(connectionPoolSize)
.debeziumProperties(dbzProperties)
.debeziumProperties(getParallelDbzProperties(dbzProperties))
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
Expand Down Expand Up @@ -372,4 +387,22 @@ public int hashCode() {
public String asSummaryString() {
return "MySQL-CDC";
}

@VisibleForTesting
Properties getDbzProperties() {
return dbzProperties;
}

@VisibleForTesting
Properties getParallelDbzProperties(Properties dbzProperties) {
Properties newDbzProperties = new Properties(dbzProperties);
for (String key : dbzProperties.stringPropertyNames()) {
if (exceptDbzProperties.contains(key)) {
LOG.warn("Cannot override debezium option {}.", key);
} else {
newDbzProperties.put(key, dbzProperties.get(key));
}
}
return newDbzProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ public void testOptionalProperties() {
options.put("server-time-zone", "Asia/Shanghai");
options.put("scan.newly-added-table.enabled", "true");
options.put("debezium.snapshot.mode", "never");
options.put("debezium.offset.flush.interval.ms", "3000");
options.put("debezium.tombstones.on.delete", "true");
options.put("debezium.test", "test");
options.put("jdbc.properties.useSSL", "false");
options.put("heartbeat.interval", "15213ms");
options.put("scan.incremental.snapshot.chunk.key-column", "testCol");
Expand All @@ -276,6 +279,9 @@ public void testOptionalProperties() {
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
dbzProperties.put("snapshot.mode", "never");
dbzProperties.put("offset.flush.interval.ms", "3000");
dbzProperties.put("tombstones.on.delete", "true");
dbzProperties.put("test", "test");
Properties jdbcProperties = new Properties();
jdbcProperties.setProperty("useSSL", "false");
MySqlTableSource expectedSource =
Expand Down Expand Up @@ -307,6 +313,14 @@ public void testOptionalProperties() {
"testCol",
true);
assertEquals(expectedSource, actualSource);
assertTrue(actualSource instanceof MySqlTableSource);
MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource;
Properties parellelProperties = new Properties();
parellelProperties.put("test", "test");
assertEquals(
parellelProperties,
actualMySqlTableSource.getParallelDbzProperties(
actualMySqlTableSource.getDbzProperties()));
}

@Test
Expand Down