Skip to content

Commit

Permalink
DBZ-7015 Make slot validation check opt-in
Browse files Browse the repository at this point in the history
  • Loading branch information
prburgu authored and jpechane committed Oct 16, 2023
1 parent 7fd68a5 commit f9b07e6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,16 @@ public static SchemaRefreshMode parse(String value) {
"Whether or not to drop the logical replication slot when the connector finishes orderly. " +
"By default the replication is kept so that on restart progress can resume from the last recorded location");

public static final Field SLOT_SEEK_TO_KNOWN_OFFSET = Field.createInternal("slot.seek.to.known.offset.on.start")
.withDisplayName("Seek to last known offset on the replication slot")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 3))
.withDefault(false)
.withImportance(Importance.HIGH)
.withDescription(
"Whether or not to seek to the last known offset on the replication slot." +
"Enabling this option results in startup failure if the slot is re-created instead of data loss.");

public static final Field PUBLICATION_NAME = Field.create("publication.name")
.withDisplayName("Publication")
.withType(Type.STRING)
Expand Down Expand Up @@ -952,6 +962,10 @@ protected boolean dropSlotOnStop() {
return getConfig().getBoolean(DROP_SLOT_ON_STOP);
}

public boolean slotSeekToKnownOffsetOnStart() {
return getConfig().getBoolean(SLOT_SEEK_TO_KNOWN_OFFSET);
}

public String publicationName() {
return getConfig().getString(PUBLICATION_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,9 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi
int tryCount = 0;
while (true) {
try {
validateSlotIsInExpectedState(walPosition);
if (connectorConfig.slotSeekToKnownOffsetOnStart()) {
validateSlotIsInExpectedState(walPosition);
}
return createReplicationStream(lsn, walPosition);
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3317,7 +3317,8 @@ public void shouldStopConnectorOnSlotRecreation() throws InterruptedException {

configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE);

start(PostgresConnector.class, configBuilder.build());
Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS)
Expand Down Expand Up @@ -3349,7 +3350,8 @@ public void shouldSeekToCorrectOffset() throws InterruptedException {

configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE);

start(PostgresConnector.class, configBuilder.build());
consumeRecordsByTopic(1);
Expand Down

0 comments on commit f9b07e6

Please sign in to comment.