Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yugabyte/yugabyte-db#18239] Add consistency configuration properties #249

Merged
merged 1 commit into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public SnapshotChangeEventSource<YBPartition, YugabyteDBOffsetContext> getSnapsh

@Override
public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStreamingChangeEventSource() {
LOGGER.info("Consistency mode is {}", configuration.consistencyMode().getValue());
if (configuration.consistencyMode() == YugabyteDBConnectorConfig.ConsistencyMode.DEFAULT) {
LOGGER.info("Transaction ordering enabled: {}", configuration.transactionOrdering());
if (configuration.transactionOrdering()) {
LOGGER.info("Instantiating Vanilla Streaming Source");
return new YugabyteDBStreamingChangeEventSource(
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,15 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"'skip' to skip / ignore TRUNCATE events (default), " +
"'include' to handle and include TRUNCATE events");

public static final Field TRANSACTION_ORDERING = Field.create("transaction.ordering")
.withDisplayName("Order transactions")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
.withImportance(Importance.HIGH)
.withDefault(false)
.withType(Type.BOOLEAN)
.withValidation(Field::isBoolean)
.withDescription("Specify whether the transactions need to be ordered");

public static final Field CONSISTENCY_MODE = Field.create("consistency.mode")
.withDisplayName("Transaction Consistency mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
Expand Down Expand Up @@ -1229,6 +1238,9 @@ public TruncateHandlingMode truncateHandlingMode() {
return truncateHandlingMode;
}

public boolean transactionOrdering() {
return getConfig().getBoolean(TRANSACTION_ORDERING);
}
public ConsistencyMode consistencyMode() {
return consistencyMode;
}
Expand Down Expand Up @@ -1330,7 +1342,8 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
INTERVAL_HANDLING_MODE,
SCHEMA_REFRESH_MODE,
TRUNCATE_HANDLING_MODE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
TRANSACTION_ORDERING)
.excluding(INCLUDE_SCHEMA_CHANGES)
.create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void throwExceptionIfExplicitCheckpointingNotConfiguredWithConsistency()
TestHelper.execute("CREATE TABLE dummy_table (id INT PRIMARY KEY);");
final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "dummy_table", false, false);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.dummy_table", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void consistencyWithColocatedTables() throws Exception {

private Configuration.Builder getConsistentConfigurationBuilder(String databaseName, String tableIncludeList, String dbStreamId) throws Exception {
Configuration.Builder configBuilder = TestHelper.getConfigBuilder(databaseName, tableIncludeList, dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void recordsShouldStreamInConsistentOrderOnly() throws Exception {

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -200,7 +200,7 @@ public void fiveTablesWithForeignKeys() throws Exception {

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department", false, true);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee,public.contract,public.address,public.locality", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -339,7 +339,7 @@ public void singleTableSingleTablet() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -412,7 +412,7 @@ public void singleTableTwoTablet() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -491,7 +491,7 @@ public void singleTableSingleTabletTwoRecord() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -566,7 +566,7 @@ public void twoTableWithSingleTabletEach() throws Exception {

final String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -642,7 +642,7 @@ public void fiveTablesSingleTabletEach() throws Exception {

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "department", false, true);
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.department,public.employee,public.contract,public.address,public.locality", dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down Expand Up @@ -1001,7 +1001,7 @@ private Configuration.Builder getConsistentConfigurationBuilder(String tableIncl

private Configuration.Builder getConsistentConfigurationBuilder(String databaseName, String tableIncludeList, String dbStreamId) throws Exception {
Configuration.Builder configBuilder = TestHelper.getConfigBuilder(databaseName, tableIncludeList, dbStreamId);
configBuilder.with(YugabyteDBConnectorConfig.CONSISTENCY_MODE, "global");
configBuilder.with(YugabyteDBConnectorConfig.TRANSACTION_ORDERING, true);
configBuilder.with("transforms", "Reroute");
configBuilder.with("transforms.Reroute.type", "io.debezium.transforms.ByLogicalTableRouter");
configBuilder.with("transforms.Reroute.topic.regex", "(.*)");
Expand Down