Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[pipeline-connector][mysql] Enable send schema change by default (apa…
Browse files Browse the repository at this point in the history
…che#2815)

This closes apache#2815.
leonardBang authored and zhangchaoming.zcm committed Jan 3, 2025
1 parent a8a10ac commit 7ba1f8e
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -227,7 +227,7 @@ public class MySqlDataSourceOptions {
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
ConfigOptions.key("schema-change.enabled")
.booleanType()
.defaultValue(false)
.defaultValue(true)
.withDescription(
"Whether send schema change events, by default is false. If set to false, the schema changes will not be sent.");
"Whether send schema change events, by default is true. If set to false, the schema changes will not be sent.");
}
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@
import java.util.List;
import java.util.stream.Stream;

import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults;
@@ -114,7 +115,7 @@ public void testInitialStartupMode() throws Exception {
.startupOptions(StartupOptions.initial())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.includeSchemaChanges(true);
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
@@ -242,7 +243,7 @@ public void testParseAlterStatement() throws Exception {
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
.includeSchemaChanges(true);
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();

0 comments on commit 7ba1f8e

Please sign in to comment.