[FLINK-36114][cdc-runtime] Make SchemaRegistryRequestHandler thread safe by blocking subsequent schemaChangeEvent

This closes apache#3563.

Co-authored-by: Hongshun Wang <loserwang1024@gmail.com>
2 people authored and qiaozongmi committed Sep 23, 2024
1 parent 8250082 commit f04cf4c
Showing 22 changed files with 540 additions and 416 deletions.
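
Only a few of the 22 changed files are expanded below; the SchemaRegistryRequestHandler change named in the commit title is among those not shown. The idea the title describes, holding back each subsequent schemaChangeEvent until the one in flight has been fully applied, could look roughly like this minimal sketch (hypothetical names, not the actual handler code):

import java.util.ArrayDeque;
import java.util.Deque;

/** Minimal sketch with hypothetical names, not the actual handler code. */
class SchemaChangeSerializerSketch {
    private final Deque<Runnable> pendingChanges = new ArrayDeque<>();
    private boolean applying = false; // true while a schema change is in flight

    /** Apply the change now if idle; otherwise block it behind the in-flight one. */
    synchronized void submit(Runnable applySchemaChange) {
        pendingChanges.addLast(applySchemaChange);
        drain();
    }

    /** Invoked once the in-flight schema change has been fully applied. */
    synchronized void onChangeFinished() {
        applying = false;
        drain();
    }

    private void drain() {
        if (!applying && !pendingChanges.isEmpty()) {
            applying = true;
            pendingChanges.pollFirst().run(); // later events stay queued until finished
        }
    }
}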
@@ -28,6 +28,7 @@
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
@@ -68,7 +69,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {

// Used when startup mode is not initial
private boolean alreadySendCreateTableForBinlogSplit = false;
private final List<CreateTableEvent> createTableEventCache;
private List<CreateTableEvent> createTableEventCache;

public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@@ -80,42 +81,41 @@ public MySqlPipelineRecordEmitter(
sourceConfig.isIncludeSchemaChanges());
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
this.createTableEventCache = new ArrayList<>();

if (!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
List<TableId> capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
createTableEventCache.add(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.catalog(), tableId.table()),
schema));
}
} catch (SQLException e) {
throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
}
}
this.createTableEventCache = generateCreateTableEvent(sourceConfig);
}

@Override
protected void processElement(
SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState)
throws Exception {
if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
// In Snapshot phase of INITIAL startup mode, we lazily send CreateTableEvent to
// downstream to avoid checkpoint timeout.
TableId tableId = splitState.asSnapshotSplitState().toMySqlSplit().getTableId();
if (!alreadySendCreateTableTables.contains(tableId)) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
sendCreateTableEvent(jdbc, tableId, output);
alreadySendCreateTableTables.add(tableId);
}
}
} else if (splitState.isBinlogSplitState()
&& !alreadySendCreateTableForBinlogSplit
&& !sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
createTableEventCache.forEach(output::collect);
} else if (splitState.isBinlogSplitState() && !alreadySendCreateTableForBinlogSplit) {
alreadySendCreateTableForBinlogSplit = true;
if (sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) {
// In the Snapshot -> Binlog transition of INITIAL startup mode, ensure all table
// schemas have been sent to downstream. We use the previously cached schema instead
// of re-requesting the latest schema, because there might be pending schema change
// events in the queue, and re-requesting may accidentally emit an evolved schema
// before its corresponding schema change events.
createTableEventCache.stream()
.filter(
event ->
!alreadySendCreateTableTables.contains(
MySqlSchemaUtils.toDbzTableId(event.tableId())))
.forEach(output::collect);
} else {
// In binlog-only mode, we simply emit all schemas at once.
createTableEventCache.forEach(output::collect);
}
}
super.processElement(element, output, splitState);
}
@@ -233,4 +233,22 @@ private synchronized MySqlAntlrDdlParser getParser() {
}
return mySqlAntlrDdlParser;
}

private List<CreateTableEvent> generateCreateTableEvent(MySqlSourceConfig sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
List<CreateTableEvent> createTableEventCache = new ArrayList<>();
List<TableId> capturedTableIds = listTables(jdbc, sourceConfig.getTableFilters());
for (TableId tableId : capturedTableIds) {
Schema schema = getSchema(jdbc, tableId);
createTableEventCache.add(
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.catalog(), tableId.table()),
schema));
}
return createTableEventCache;
} catch (SQLException e) {
throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
}
}
}
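
Taken together, the hunks above move schema caching into the constructor (generateCreateTableEvent builds one CreateTableEvent per captured table up front) and let processElement decide when to flush the cache: lazily per table at each snapshot split's low watermark, and once on the snapshot-to-binlog transition for any tables not yet covered. A condensed sketch of that bookkeeping, using simplified stand-in types rather than the connector's real classes:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified stand-in for the emitter's CreateTableEvent bookkeeping. */
class CreateTableEmitSketch<E> {
    private final Set<String> alreadySent = new HashSet<>();
    private boolean binlogCacheFlushed = false;

    /** Snapshot phase: emit each table's CreateTableEvent at most once, lazily. */
    void onSnapshotLowWatermark(String tableId, E createTableEvent, Consumer<E> out) {
        if (alreadySent.add(tableId)) { // add() returns false if already present
            out.accept(createTableEvent);
        }
    }

    /** Binlog phase: flush cached events for tables not already sent during snapshot. */
    void onBinlogSplitStart(List<E> cache, Function<E, String> tableIdOf, Consumer<E> out) {
        if (binlogCacheFlushed) {
            return;
        }
        binlogCacheFlushed = true;
        cache.stream()
                .filter(event -> !alreadySent.contains(tableIdOf.apply(event)))
                .forEach(out);
    }
}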
@@ -295,8 +295,9 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
.executeAndCollect();

// skip CreateTableEvent
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();

Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType))
.isEqualTo(expectedSnapshot);
@@ -306,7 +307,8 @@ public void testMysqlPrecisionTypes(UniqueDatabase database) throws Throwable {
statement.execute("UPDATE precision_types SET time_6_c = null WHERE id = 1;");
}

List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType))
.isEqualTo(expectedStreamRecord);
@@ -397,8 +399,9 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
};

// skip CreateTableEvent
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, COMMON_TYPES))
.isEqualTo(expectedSnapshot);

@@ -412,7 +415,8 @@ private void testCommonDataTypes(UniqueDatabase database) throws Exception {
expectedSnapshot[44] = BinaryStringData.fromString("{\"key1\":\"value1\"}");
Object[] expectedStreamRecord = expectedSnapshot;

List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, COMMON_TYPES))
.isEqualTo(expectedStreamRecord);
@@ -437,9 +441,10 @@ private void testTimeDataTypes(
"Event-Source")
.executeAndCollect();

// skip CreateTableEvent
List<Event> snapshotResults = MySqSourceTestUtils.fetchResults(iterator, 2);
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(1)).after();
// skip CreateTableEvents
List<Event> snapshotResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData snapshotRecord = ((DataChangeEvent) snapshotResults.get(0)).after();

Assertions.assertThat(RecordDataTestUtils.recordFields(snapshotRecord, recordType))
.isEqualTo(expectedSnapshot);
@@ -450,7 +455,8 @@ private void testTimeDataTypes(
"UPDATE time_types SET time_6_c = null, timestamp_def_c = default WHERE id = 1;");
}

List<Event> streamResults = MySqSourceTestUtils.fetchResults(iterator, 1);
List<Event> streamResults =
MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1).f0;
RecordData streamRecord = ((DataChangeEvent) streamResults.get(0)).after();
Assertions.assertThat(RecordDataTestUtils.recordFields(streamRecord, recordType))
.isEqualTo(expectedStreamRecord);
@@ -62,6 +62,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -226,12 +227,33 @@ public void testInitialStartupMode() throws Exception {
BinaryStringData.fromString("c-21")
})));
}
// In this configuration, several subtasks might emit their corresponding CreateTableEvent
// to downstream. Since it is not possible to predict how many CreateTableEvents we should
// expect, we simply filter them out of the expected sets and assert there's at least one.
List<Event> actual =
fetchResults(events, 1 + expectedSnapshot.size() + expectedBinlog.size());
assertThat(actual.get(0)).isEqualTo(createTableEvent);
assertThat(actual.subList(1, 10))
fetchResultsExcept(
events, expectedSnapshot.size() + expectedBinlog.size(), createTableEvent);
assertThat(actual.subList(0, expectedSnapshot.size()))
.containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
assertThat(actual.subList(10, actual.size())).isEqualTo(expectedBinlog);
assertThat(actual.subList(expectedSnapshot.size(), actual.size()))
.isEqualTo(expectedBinlog);
}

private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
List<T> result = new ArrayList<>(size);
List<T> sideResults = new ArrayList<>();
while (size > 0 && iter.hasNext()) {
T event = iter.next();
if (!event.equals(sideEvent)) {
result.add(event);
size--;
} else {
sideResults.add(sideEvent);
}
}
// Also ensure we've received at least one side event.
assertThat(sideResults).isNotEmpty();
return result;
}

@Test
@@ -17,6 +17,9 @@

package org.apache.flink.cdc.connectors.mysql.testutils;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.CreateTableEvent;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -38,6 +41,22 @@ public static <T> List<T> fetchResults(Iterator<T> iter, int size) {
return result;
}

public static <T> Tuple2<List<T>, List<CreateTableEvent>> fetchResultsAndCreateTableEvent(
Iterator<T> iter, int size) {
List<T> result = new ArrayList<>(size);
List<CreateTableEvent> createTableEvents = new ArrayList<>();
while (size > 0 && iter.hasNext()) {
T event = iter.next();
if (event instanceof CreateTableEvent) {
createTableEvents.add((CreateTableEvent) event);
} else {
result.add(event);
size--;
}
}
return Tuple2.of(result, createTableEvents);
}

public static String getServerId(int parallelism) {
final Random random = new Random();
int serverId = random.nextInt(100) + 5400;
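
A hypothetical call site for the new helper, mirroring the ITCase changes above (it assumes the caller can see MySqSourceTestUtils, e.g. sits in the same package): f0 carries exactly size non-schema events, while f1 collects whatever CreateTableEvents arrived along the way.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;

import java.util.Iterator;
import java.util.List;

class FetchUsageSketch {
    /** Hypothetical call site: one data-change event, schema events split off. */
    static List<Event> fetchOneDataChange(Iterator<Event> iterator) {
        Tuple2<List<Event>, List<CreateTableEvent>> fetched =
                MySqSourceTestUtils.fetchResultsAndCreateTableEvent(iterator, 1);
        // fetched.f1 holds zero or more CreateTableEvents seen along the way
        return fetched.f0; // exactly one non-CreateTableEvent record
    }
}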
@@ -194,12 +194,13 @@ public void testSyncWholeDatabase() throws Exception {
+ " table.create.properties.replication_num: 1\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName(),
DORIS.getUsername(),
DORIS.getPassword());
DORIS.getPassword(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -324,13 +325,14 @@ public void testComplexDataTypes() throws Exception {
+ " projection: \\*, 'fine' AS FINE\n"
+ " filter: id <> 3 AND id <> 4\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
complexDataTypesDatabase.getDatabaseName(),
DORIS.getUsername(),
DORIS.getPassword(),
complexDataTypesDatabase.getDatabaseName());
complexDataTypesDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path dorisCdcConnector = TestUtils.getResource("doris-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -102,11 +102,12 @@ public void testSyncWholeDatabase() throws Exception {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName());
mysqlInventoryDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
@@ -209,11 +210,12 @@ public void testSchemaChangeEvents() throws Exception {
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " parallelism: 1",
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
mysqlInventoryDatabase.getDatabaseName());
mysqlInventoryDatabase.getDatabaseName(),
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
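
The E2E job definitions above swap the hard-coded parallelism: 1 for a %d placeholder that String.format fills from the test's parallelism setting, so the same YAML works for multi-parallelism runs. A trimmed-down illustration of the pattern (hypothetical values; the real tests pass many more format arguments):

public class ParallelismTemplateSketch {
    public static void main(String[] args) {
        int parallelism = 4; // hypothetical; the ITCases derive this from the test setup
        String pipelineYaml =
                String.format(
                        "pipeline:\n"
                                + "  parallelism: %d",
                        parallelism);
        System.out.println(pipelineYaml); // prints "pipeline:" then "  parallelism: 4"
    }
}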