Skip to content

Commit 9f33f66

Browse files
junmuzymuzammil
authored andcommitted
Adding Test case as well
1 parent 1578d52 commit 9f33f66

File tree

3 files changed

+182
-7
lines changed

3 files changed

+182
-7
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,9 @@ public EventSourceProvider getEventSourceProvider() {
6767
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
6868
false);
6969

70-
DebeziumChangelogMode changelogMode =
71-
sourceConfig.isScanReadChangelogAsAppendOnly()
72-
? DebeziumChangelogMode.UPSERT
73-
: DebeziumChangelogMode.ALL;
74-
7570
MySqlEventDeserializer deserializer =
7671
new MySqlEventDeserializer(
77-
changelogMode,
72+
DebeziumChangelogMode.ALL,
7873
sourceConfig.isIncludeSchemaChanges(),
7974
readableMetadataList,
8075
includeComments,

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
4444
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
4545
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
46+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED;
4647
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
4748
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
4849
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
@@ -269,14 +270,68 @@ public void testOptionalOption() {
269270
.contains(
270271
TREAT_TINYINT1_AS_BOOLEAN_ENABLED,
271272
PARSE_ONLINE_SCHEMA_CHANGES,
272-
SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
273+
SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED,
274+
SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
273275

274276
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
275277
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse();
276278
assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue();
277279
assertThat(dataSource.getSourceConfig().isAssignUnboundedChunkFirst()).isTrue();
278280
}
279281

282+
@Test
283+
void testScanReadChangelogAsAppendOnlyEnabledDefault() {
284+
inventoryDatabase.createAndInitialize();
285+
Map<String, String> options = new HashMap<>();
286+
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
287+
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
288+
options.put(USERNAME.key(), TEST_USER);
289+
options.put(PASSWORD.key(), TEST_PASSWORD);
290+
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
291+
Factory.Context context = new MockContext(Configuration.fromMap(options));
292+
293+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
294+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
295+
296+
assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isFalse();
297+
}
298+
299+
@Test
300+
void testScanReadChangelogAsAppendOnlyEnabledTrue() {
301+
inventoryDatabase.createAndInitialize();
302+
Map<String, String> options = new HashMap<>();
303+
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
304+
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
305+
options.put(USERNAME.key(), TEST_USER);
306+
options.put(PASSWORD.key(), TEST_PASSWORD);
307+
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
308+
options.put(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.key(), "true");
309+
Factory.Context context = new MockContext(Configuration.fromMap(options));
310+
311+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
312+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
313+
314+
assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isTrue();
315+
}
316+
317+
@Test
318+
void testScanReadChangelogAsAppendOnlyEnabledFalse() {
319+
inventoryDatabase.createAndInitialize();
320+
Map<String, String> options = new HashMap<>();
321+
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
322+
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
323+
options.put(USERNAME.key(), TEST_USER);
324+
options.put(PASSWORD.key(), TEST_PASSWORD);
325+
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
326+
options.put(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.key(), "false");
327+
Factory.Context context = new MockContext(Configuration.fromMap(options));
328+
329+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
330+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
331+
332+
assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isFalse();
333+
}
334+
280335
@Test
281336
void testPrefixRequireOption() {
282337
inventoryDatabase.createAndInitialize();

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,131 @@ void testSyncWholeDatabase() throws Exception {
186186
validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase);
187187
}
188188

189+
@Test
190+
public void testReadChangelogAsAppendOnlyWithPaimon() throws Exception {
191+
String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID();
192+
String database = inventoryDatabase.getDatabaseName();
193+
String pipelineJob =
194+
String.format(
195+
"source:\n"
196+
+ " type: mysql\n"
197+
+ " hostname: mysql\n"
198+
+ " port: 3306\n"
199+
+ " username: %s\n"
200+
+ " password: %s\n"
201+
+ " tables: %s.readChangelogAsAppendOnly\n"
202+
+ " server-id: 5400-5404\n"
203+
+ " server-time-zone: UTC\n"
204+
+ " scan.read-changelog-as-append-only.enabled: true\n"
205+
+ "\n"
206+
+ "sink:\n"
207+
+ " type: paimon\n"
208+
+ " catalog.properties.warehouse: %s\n"
209+
+ " catalog.properties.metastore: filesystem\n"
210+
+ " catalog.properties.cache-enabled: false\n"
211+
+ "\n"
212+
+ "pipeline:\n"
213+
+ " schema.change.behavior: evolve\n"
214+
+ " parallelism: 4",
215+
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, database, warehouse);
216+
217+
Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
218+
Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
219+
220+
String mysqlJdbcUrl =
221+
String.format(
222+
"jdbc:mysql://%s:%s/%s",
223+
MYSQL.getHost(), MYSQL.getDatabasePort(), database);
224+
225+
// Create source table and insert initial data
226+
try (Connection conn =
227+
DriverManager.getConnection(
228+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
229+
Statement stat = conn.createStatement()) {
230+
231+
stat.execute(
232+
"CREATE TABLE readChangelogAsAppendOnly (\n"
233+
+ " id INTEGER NOT NULL PRIMARY KEY,\n"
234+
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
235+
+ " description VARCHAR(512),\n"
236+
+ " weight FLOAT,\n"
237+
+ " enum_c enum('red', 'white') default 'red',\n"
238+
+ " json_c JSON,\n"
239+
+ " point_c POINT)");
240+
241+
stat.execute(
242+
"INSERT INTO readChangelogAsAppendOnly \n"
243+
+ "VALUES (1,\"One\", \"Alice\", 3.202, 'red', '{\"key1\": \"value1\"}', null),\n"
244+
+ " (2,\"Two\", \"Bob\", 1.703, 'white', '{\"key2\": \"value2\"}', null),\n"
245+
+ " (3,\"Three\", \"Cecily\", 4.105, 'red', '{\"key3\": \"value3\"}', null),\n"
246+
+ " (4,\"Four\", \"Derrida\", 1.857, 'white', '{\"key4\": \"value4\"}', null),\n"
247+
+ " (5,\"Five\", \"Evelyn\", 5.211, 'red', '{\"K\": \"V\", \"k\": \"v\"}', null)");
248+
} catch (SQLException e) {
249+
LOG.error("Create table for CDC failed.", e);
250+
throw e;
251+
}
252+
253+
submitPipelineJob(pipelineJob, paimonCdcConnector, hadoopJar);
254+
waitUntilJobRunning(Duration.ofSeconds(30));
255+
LOG.info("Pipeline job is running");
256+
257+
// Validate initial snapshot data
258+
validateSinkResult(
259+
warehouse,
260+
database,
261+
"readChangelogAsAppendOnly",
262+
Arrays.asList(
263+
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null",
264+
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null",
265+
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null",
266+
"4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null",
267+
"5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null"));
268+
269+
LOG.info("Begin incremental reading stage with append-only enabled.");
270+
271+
// Perform CDC operations: INSERT, UPDATE, DELETE
272+
try (Connection conn =
273+
DriverManager.getConnection(
274+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
275+
Statement stat = conn.createStatement()) {
276+
277+
// INSERT operations - these should appear as new records
278+
stat.execute(
279+
"INSERT INTO readChangelogAsAppendOnly VALUES (6,'Six','Ferris',9.813, null, null, null);");
280+
stat.execute(
281+
"INSERT INTO readChangelogAsAppendOnly VALUES (7,'Seven','Grace',2.117, null, null, null);");
282+
283+
// UPDATE operations - with append-only, both old and new versions should be preserved
284+
stat.execute("UPDATE readChangelogAsAppendOnly SET description='Alice Updated' WHERE id=1;");
285+
stat.execute("UPDATE readChangelogAsAppendOnly SET weight=2.0 WHERE id=2;");
286+
287+
// DELETE operations - with append-only, delete records should be preserved with row_kind metadata
288+
stat.execute("DELETE FROM readChangelogAsAppendOnly WHERE id=3;");
289+
290+
Thread.sleep(5000); // Wait for changes to be processed
291+
} catch (SQLException e) {
292+
LOG.error("Update table for CDC failed.", e);
293+
throw e;
294+
}
295+
296+
// For append-only mode, we expect to see all operations preserved as separate records
297+
// This includes the original records, plus insert records, plus update records (before and after), plus delete records
298+
List<String> expectedAppendOnlyRecords = Arrays.asList(
299+
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null", // Original
300+
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null", // Original
301+
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null", // Original (will also have delete record)
302+
"4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null", // Original
303+
"5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null", // Original
304+
"6, Six, Ferris, 9.813, null, null, null", // Insert
305+
"7, Seven, Grace, 2.117, null, null, null", // Insert
306+
"1, One, Alice Updated, 3.202, red, {\"key1\": \"value1\"}, null", // Update (new version)
307+
"2, Two, Bob, 2.0, white, {\"key2\": \"value2\"}, null" // Update (new version)
308+
// Note: Delete operation creates a delete record but the exact representation may vary
309+
);
310+
311+
validateSinkResult(warehouse, database, "readChangelogAsAppendOnly", expectedAppendOnlyRecords);
312+
}
313+
189314
@Test
190315
public void testSinkToAppendOnlyTable() throws Exception {
191316
String warehouse = sharedVolume.toString() + "/" + "paimon_" + UUID.randomUUID();

0 commit comments

Comments
 (0)