Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -60,6 +62,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
Expand Down Expand Up @@ -184,6 +187,35 @@ public DataSource createDataSource(Context context) {
}
configFactory.tableList(capturedTables.toArray(new String[0]));

String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
if (chunkKeyColumns != null) {
Map<ObjectPath, String> chunkKeyColumnMap = new HashMap<>();
List<TableId> tableIds =
MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
String[] splits = chunkKeyColumn.split(":");
if (splits.length == 2) {
Selectors chunkKeySelector =
new Selectors.SelectorsBuilder().includeTables(splits[0]).build();
List<ObjectPath> tableList =
getChunkKeyColumnTableList(tableIds, chunkKeySelector);
for (ObjectPath table : tableList) {
chunkKeyColumnMap.put(table, splits[1]);
}
} else {
throw new IllegalArgumentException(
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key()
+ " = "
+ chunkKeyColumns
+ " failed to be parsed in this part '"
+ chunkKeyColumn
+ "'.");
}
}
LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}

return new MySqlDataSource(configFactory);
}

Expand Down Expand Up @@ -219,6 +251,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
Expand Down Expand Up @@ -246,6 +279,14 @@ private static List<String> getTableList(MySqlSourceConfig sourceConfig, Selecto
.collect(Collectors.toList());
}

private static List<ObjectPath> getChunkKeyColumnTableList(
List<TableId> tableIds, Selectors selectors) {
return tableIds.stream()
.filter(selectors::isMatch)
.map(tableId -> new ObjectPath(tableId.getSchemaName(), tableId.getTableName()))
.collect(Collectors.toList());
}

private static StartupOptions getStartupOptions(Configuration config) {
String modeString = config.get(SCAN_STARTUP_MODE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ public class MySqlDataSourceOptions {
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

@Experimental
public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
.stringType()
.noDefaultValue()
.withDescription(
"The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+ "By default, the chunk key is the first column of the primary key."
+ "eg. db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2;");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import org.junit.Test;

Expand All @@ -38,6 +39,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
Expand Down Expand Up @@ -107,6 +109,7 @@ public void testExcludeTable() {
.isEqualTo(
Arrays.asList(
inventoryDatabase.getDatabaseName() + ".customers",
inventoryDatabase.getDatabaseName() + ".multi_max_table",
inventoryDatabase.getDatabaseName() + ".products"));
}

Expand Down Expand Up @@ -239,6 +242,40 @@ public void testPrefixRequireOption() {
.isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".products"));
}

@Test
public void testAddChunkKeyColumns() {
inventoryDatabase.createAndInitialize();
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*");
options.put(
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key(),
inventoryDatabase.getDatabaseName()
+ ".multi_max_\\.*:order_id;"
+ inventoryDatabase.getDatabaseName()
+ ".products:id;");
Factory.Context context = new MockContext(Configuration.fromMap(options));

MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
ObjectPath multiMaxTable =
new ObjectPath(inventoryDatabase.getDatabaseName(), "multi_max_table");
ObjectPath productsTable = new ObjectPath(inventoryDatabase.getDatabaseName(), "products");

assertThat(dataSource.getSourceConfig().getChunkKeyColumns())
.isNotEmpty()
.isEqualTo(
new HashMap<ObjectPath, String>() {
{
put(multiMaxTable, "order_id");
put(productsTable, "id");
}
});
}

class MockContext implements Factory.Context {

Configuration factoryConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,24 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '2016-02-19', 1002, 2, 106),
(default, '16-02-21', 1003, 1, 107);

-- Fixture table with a composite primary key (`order_id`, `index`); the test
-- suite assigns `order_id` (a PK column) as the incremental-snapshot chunk key
-- via scan.incremental.snapshot.chunk.key-column.
CREATE TABLE `multi_max_table`
(
    `order_id` varchar(128) NOT NULL,
    `index` int(11) NOT NULL,
    `desc` varchar(512) NOT NULL,
    PRIMARY KEY (`order_id`, `index`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Rows deliberately duplicate chunk-key boundary values: several rows share
-- order_id '' (the minimum) and 'E'/'e' (presumably collation-equal maxima
-- under utf8's default case-insensitive collation — hence "multi_max"), so
-- chunk splitting is exercised on repeated boundary keys.
INSERT INTO multi_max_table
VALUES ('', 0, 'flink'),
       ('', 1, 'flink'),
       ('', 2, 'flink'),
       ('a', 0, 'flink'),
       ('b', 0, 'flink'),
       ('c', 0, 'flink'),
       ('d', 0, 'flink'),
       ('E', 0, 'flink'),
       ('E', 1, 'flink'),
       ('E', 2, 'flink'),
       ('e', 4, 'flink'),
       ('E', 3, 'flink');