diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index 8ff5c328aab..1fd5ee725e7 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -49,8 +50,10 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -248,7 +251,11 @@ public Iterator pollSplitRecords() throws InterruptedException { boolean reachBinlogEnd = false; SourceRecord lowWatermark = null; SourceRecord highWatermark = null; - Map snapshotRecords = new LinkedHashMap<>(); + + SnapshotRecords snapshotRecords = + containsPrimaryKey() + ? new PrimaryKeySnapshotRecords() + : new NoPrimaryKeySnapshotRecords(); while (!reachBinlogEnd) { checkReadException(); List batch = queue.poll(); @@ -273,14 +280,7 @@ public Iterator pollSplitRecords() throws InterruptedException { break; } - if (!reachBinlogStart) { - snapshotRecords.put((Struct) record.key(), record); - } else { - if (isRequiredBinlogRecord(record)) { - // upsert binlog events through the record key - upsertBinlog(snapshotRecords, record); - } - } + snapshotRecords.collect(record, reachBinlogStart); } } // snapshot split return its data once @@ -288,7 +288,7 @@ public Iterator pollSplitRecords() throws InterruptedException { final List normalizedRecords = new ArrayList<>(); normalizedRecords.add(lowWatermark); - normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values())); + normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.getRecords())); normalizedRecords.add(highWatermark); final List sourceRecordsSet = new ArrayList<>(); @@ -403,4 +403,58 @@ public boolean isRunning() { return currentTaskRunning; } } + + private boolean containsPrimaryKey() { + return !CollectionUtil.isNullOrEmpty(currentSnapshotSplit.getSplitKeyType().getFields()); + } + + /** Collect records need to be sent, except low/high watermark. */ + interface SnapshotRecords { + void collect(SourceRecord record, boolean reachBinlogStart); + + Collection getRecords(); + } + + /** Collect records with primary key. May upsert binlog events. */ + class PrimaryKeySnapshotRecords implements SnapshotRecords { + private final Map snapshotRecords = new LinkedHashMap<>(); + + @Override + public void collect(SourceRecord record, boolean reachBinlogStart) { + if (!reachBinlogStart) { + snapshotRecords.put((Struct) record.key(), record); + } else { + if (isRequiredBinlogRecord(record)) { + // upsert binlog events through the record key + upsertBinlog(snapshotRecords, record); + } + } + } + + @Override + public Collection getRecords() { + return snapshotRecords.values(); + } + } + + /** Collect records without primary key. There are no binlog events. */ + static class NoPrimaryKeySnapshotRecords implements SnapshotRecords { + private final List snapshotRecords = new LinkedList<>(); + + @Override + public void collect(SourceRecord record, boolean reachBinlogStart) { + if (!reachBinlogStart) { + snapshotRecords.add(record); + } else { + throw new IllegalStateException( + "a table without primary key can not have binlog splits" + + " in initialization stage"); + } + } + + @Override + public Collection getRecords() { + return snapshotRecords; + } + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index cdad275eb1a..258c5ae2b3f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -148,7 +148,12 @@ private void analyzeTable(TableId tableId) { ChunkUtils.getChunkKeyColumn( currentSplittingTable, sourceConfig.getChunkKeyColumn()); splitType = ChunkUtils.getChunkKeyColumnType(splitColumn); - minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn.name()); + // split column would be null if table has no primary key. + if (splitColumn != null) { + minMaxOfSplitColumn = queryMinMax(jdbcConnection, tableId, splitColumn.name()); + } else { + minMaxOfSplitColumn = new Object[2]; + } approximateRowCnt = queryApproximateRowCnt(jdbcConnection, tableId); } catch (Exception e) { throw new RuntimeException("Fail to analyze table in chunk splitter.", e); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java index 381ca763f38..fddb778cd62 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -42,20 +42,20 @@ public static RowType getChunkKeyColumnType(Table table, @Nullable String chunkK return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumn)); } - public static RowType getChunkKeyColumnType(Column chunkKeyColumn) { + public static RowType getChunkKeyColumnType(@Nullable Column chunkKeyColumn) { + if (chunkKeyColumn == null) { + return (RowType) ROW().getLogicalType(); + } return (RowType) ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn))) .getLogicalType(); } + @Nullable public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) { List primaryKeys = table.primaryKeyColumns(); if (primaryKeys.isEmpty()) { - throw new ValidationException( - String.format( - "Incremental snapshot for tables requires primary key," - + " but table %s doesn't have primary key.", - table.id())); + return null; } if (chunkKeyColumn != null) { diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java index 496a82d1bd9..ee9f881ab14 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -44,7 +44,6 @@ import org.apache.flink.types.RowUtils; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Collector; -import org.apache.flink.util.ExceptionUtils; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory; @@ -57,15 +56,19 @@ import io.debezium.jdbc.JdbcConnection; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.sql.SQLException; import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -89,11 +92,14 @@ import static org.junit.Assert.assertTrue; /** IT tests for {@link MySqlSource}. */ +@RunWith(Parameterized.class) public class MySqlSourceITCase extends MySqlSourceTestBase { @Rule public final Timeout timeoutPerTest = Timeout.seconds(300); private static final String DEFAULT_SCAN_STARTUP_MODE = "initial"; + private static final String TABLE_WITH_PRIMARY_KEY = "customers"; + private static final String TABLE_WITHOUT_PRIMARY_KEY = "customers_no_pk"; private final UniqueDatabase customDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); @@ -143,119 +149,122 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { "+I[2003, user_24, Shanghai, 123567891234]", "+U[1010, user_11, Hangzhou, 123567891234]"); + @Parameterized.Parameter public String tableName; + + @Parameterized.Parameters(name = "tableName: {0}") + public static Collection parameters() { + return Arrays.asList(TABLE_WITH_PRIMARY_KEY, TABLE_WITHOUT_PRIMARY_KEY); + } + @Test public void testReadSingleTableWithSingleParallelism() throws Exception { testMySqlParallelSource( - 1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"customers"}); + 1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {tableName}); } @Test public void testReadSingleTableWithMultipleParallelism() throws Exception { testMySqlParallelSource( - 4, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"customers"}); + 4, FailoverType.NONE, FailoverPhase.NEVER, new String[] {tableName}); } @Test public void testReadMultipleTableWithSingleParallelism() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( 1, FailoverType.NONE, FailoverPhase.NEVER, - new String[] {"customers", "customers_1"}); + new String[] {"customers", "customers_no_pk"}); } @Test public void testReadMultipleTableWithMultipleParallelism() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( 4, FailoverType.NONE, FailoverPhase.NEVER, - new String[] {"customers", "customers_1"}); + new String[] {"customers", "customers_no_pk"}); } // Failover tests @Test public void testTaskManagerFailoverInSnapshotPhase() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( - FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {"customers", "customers_1"}); + FailoverType.TM, + FailoverPhase.SNAPSHOT, + new String[] {"customers", "customers_no_pk"}); } @Test public void testTaskManagerFailoverInBinlogPhase() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( - FailoverType.TM, FailoverPhase.BINLOG, new String[] {"customers", "customers_1"}); + FailoverType.TM, + FailoverPhase.BINLOG, + new String[] {"customers", "customers_no_pk"}); } @Test public void testTaskManagerFailoverFromLatestOffset() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( DEFAULT_PARALLELISM, "latest-offset", FailoverType.TM, FailoverPhase.BINLOG, - new String[] {"customers", "customers_1"}, + new String[] {"customers", "customers_no_pk"}, RestartStrategies.fixedDelayRestart(1, 0)); } @Test public void testJobManagerFailoverInSnapshotPhase() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( - FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"customers", "customers_1"}); + FailoverType.JM, + FailoverPhase.SNAPSHOT, + new String[] {"customers", "customers_no_pk"}); } @Test public void testJobManagerFailoverInBinlogPhase() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( - FailoverType.JM, FailoverPhase.BINLOG, new String[] {"customers", "customers_1"}); + FailoverType.JM, + FailoverPhase.BINLOG, + new String[] {"customers", "customers_no_pk"}); } @Test public void testJobManagerFailoverFromLatestOffset() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testMySqlParallelSource( DEFAULT_PARALLELISM, "latest-offset", FailoverType.JM, FailoverPhase.BINLOG, - new String[] {"customers", "customers_1"}, + new String[] {"customers", "customers_no_pk"}, RestartStrategies.fixedDelayRestart(1, 0)); } @Test public void testTaskManagerFailoverSingleParallelism() throws Exception { testMySqlParallelSource( - 1, FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {"customers"}); + 1, FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {tableName}); } @Test public void testJobManagerFailoverSingleParallelism() throws Exception { testMySqlParallelSource( - 1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"customers"}); - } - - @Test - public void testConsumingTableWithoutPrimaryKey() { - try { - testMySqlParallelSource( - 1, - DEFAULT_SCAN_STARTUP_MODE, - FailoverType.NONE, - FailoverPhase.NEVER, - new String[] {"customers_no_pk"}, - RestartStrategies.noRestart()); - } catch (Exception e) { - assertTrue( - ExceptionUtils.findThrowableWithMessage( - e, - String.format( - "Incremental snapshot for tables requires primary key, but table %s doesn't have primary key", - customDatabase.getDatabaseName() + ".customers_no_pk")) - .isPresent()); - } + 1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {tableName}); } @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); customDatabase.createAndInitialize(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(DEFAULT_PARALLELISM); @@ -329,6 +338,7 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { @Test public void testStartFromEarliestOffset() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); List expected = new ArrayList<>(); expected.addAll(initialChanges); expected.addAll(firstPartBinlogEvents); @@ -337,6 +347,7 @@ public void testStartFromEarliestOffset() throws Exception { @Test public void testStartFromLatestOffset() throws Exception { + Assume.assumeTrue(TABLE_WITH_PRIMARY_KEY.equals(tableName)); testStartingOffset(StartupOptions.latest(), Collections.emptyList()); }