diff --git a/.github/workflows/cross-maven.yml b/.github/workflows/cross-maven.yml
index 88008b6..0049ebf 100644
--- a/.github/workflows/cross-maven.yml
+++ b/.github/workflows/cross-maven.yml
@@ -15,13 +15,13 @@ on:
pull_request:
branches:
- main
- - 1.*
- 2.*
+ - 3.*
paths-ignore:
- '*.md'
jobs:
- build-core:
+ build_core:
runs-on: ubuntu-latest
outputs:
cache-key: ${{ steps.cache-key-generator.outputs.cache-key }}
@@ -39,6 +39,7 @@ jobs:
echo "BRANCH_FOUND=true" >> $GITHUB_OUTPUT
fi
done < SORTED_PULLS.txt
+
- name: Checkout core repository with pull request branch
if: ${{ steps.branch.outputs.BRANCH_FOUND == 'true' }}
uses: actions/checkout@v4
@@ -46,6 +47,7 @@ jobs:
repository: ${{ github.event.pull_request.user.login }}/debezium
ref: ${{ github.head_ref }}
path: core
+
- name: Checkout core repository with default base branch
if: ${{ steps.branch.outputs.BRANCH_FOUND != 'true' }}
uses: actions/checkout@v4
@@ -53,62 +55,77 @@ jobs:
repository: debezium/debezium
ref: ${{ github.base_ref }}
path: core
- - name: Set up JDK
+
+ - name: Set up Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 17
- - name: Cache Maven packages
+
+ - name: Cache Maven Repository
uses: actions/cache@v4
with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2
- - name: Maven build core
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}
+
+ - name: Build Debezium (Core)
run: >
- ./core/mvnw clean install -B -f core/pom.xml
- -pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka -am
- -DskipTests=true
+ ./core/mvnw clean install -B -ntp -f core/pom.xml
+ -pl debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi
+ -am
+ -DskipTests=true
-DskipITs=true
-Dcheckstyle.skip=true
-Dformat.skip=true
-Drevapi.skip=true
+ -Dhttp.keepAlive=false
+ -Dmaven.wagon.http.pool=false
+ -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
- # This job builds and creates/restores the cache based on the hash of the POM files from the core
+ # This job builds and creates/restores the cache based on the hash of the POM files from the core
# repository; therefore, we need to output this so that the matrix job can reuse this cache.
- name: Generate Cache Key
id: cache-key-generator
- run: echo "cache-key=${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}" >> "$GITHUB_OUTPUT"
+ run: echo "cache-key=${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}" >> "$GITHUB_OUTPUT"
- build-ifx:
- needs: build-core
- runs-on: ubuntu-latest
+ build_informix:
strategy:
+ # Runs each combination concurrently
matrix:
- informix-plugin: [ "assembly,informix12", "assembly,informix14" ]
+ profile: [ "assembly,informix12", "assembly,informix14" ]
fail-fast: false
+ name: "Informix - ${{ matrix.profile }}"
+ needs: [ build_core ]
+ runs-on: ubuntu-latest
steps:
- - name: Checkout informix repository
+ - name: Checkout Action (Informix)
uses: actions/checkout@v4
with:
path: informix
- - name: Set up JDK
+
+ - name: Set up Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 17
- - name: Cache Maven packages
+
+ - name: Cache Maven Repository
uses: actions/cache@v4
with:
- path: ~/.m2
- key: ${{ needs.build-core.outputs.cache-key }}
- restore-keys: ${{ needs.build-core.outputs.cache-key }}
- - name: Maven build Informix (${{ matrix.informix-plugin }})
+ path: ~/.m2/repository
+ key: ${{ needs.build_core.outputs.cache-key }}
+ restore-keys: ${{ needs.build_core.outputs.cache-key }}
+
+ - name: Build Informix
run: >
- ./informix/mvnw clean install -B -f informix/pom.xml
- -P${{ matrix.informix-plugin }}
+ ./informix/mvnw clean install -B -ntp -f informix/pom.xml
+ -P${{ matrix.profile }}
-Dformat.formatter.goal=validate
-Dformat.imports.goal=check
+ -Dhttp.keepAlive=false
+ -Dmaven.wagon.http.pool=false
+ -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-Ddebezium.test.records.waittime=7
-Ddebezium.test.records.waittime.after.nulls=13
-DfailFlakyTests=false
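
The workflow change above shares one Maven cache across jobs by exporting the computed cache key as a job output that the dependent matrix job consumes. A minimal standalone sketch of the pattern, with hypothetical job names (not this repository's workflow):

    jobs:
      seed:
        runs-on: ubuntu-latest
        outputs:
          cache-key: ${{ steps.key.outputs.cache-key }}
        steps:
          - id: key
            # Same idea as the Generate Cache Key step above (hypothetical key body)
            run: echo "cache-key=${{ runner.os }}-m2-example" >> "$GITHUB_OUTPUT"
      reuse:
        needs: [ seed ]
        runs-on: ubuntu-latest
        steps:
          - uses: actions/cache@v4
            with:
              path: ~/.m2/repository
              key: ${{ needs.seed.outputs.cache-key }}
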
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index a69ade2..e555c9e 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -9,70 +9,94 @@ on:
push:
branches:
- main
- - 1.*
- 2.*
+ - 3.*
paths-ignore:
- - '*.md'
+ - '*.md'
jobs:
- build-core:
+ build_core:
runs-on: ubuntu-latest
+ outputs:
+ cache-key: ${{ steps.cache-key-generator.outputs.cache-key }}
steps:
- - name: Checkout core repository
+ - name: Checkout Action (Core)
uses: actions/checkout@v4
with:
repository: debezium/debezium
ref: ${{ github.ref }}
path: core
- - name: Set up JDK
+
+ - name: Set up Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 17
- - name: Cache Maven packages
+
+ - name: Cache Maven Repository
uses: actions/cache@v4
with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2
- - name: Maven build core
- run: >
- ./core/mvnw clean install -B -f core/pom.xml
- -pl debezium-bom,debezium-core,debezium-embedded,debezium-storage/debezium-storage-file,debezium-storage/debezium-storage-kafka -am
- -DskipTests=true
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}
+
+ - name: Build Debezium (Core)
+ run: >
+ ./core/mvnw clean install -B -ntp -f core/pom.xml
+ -pl debezium-assembly-descriptors,debezium-bom,debezium-core,debezium-embedded,:debezium-ide-configs,:debezium-checkstyle,:debezium-revapi
+ -am
+ -DskipTests=true
-DskipITs=true
-Dcheckstyle.skip=true
-Dformat.skip=true
-Drevapi.skip=true
- build-ifx:
- needs: build-core
- runs-on: ubuntu-latest
+ -Dhttp.keepAlive=false
+ -Dmaven.wagon.http.pool=false
+ -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+
+ # This job builds and creates/restores the cache based on the hash of the POM files from the core
+ # repository; therefore, we need to output this so that the matrix job can reuse this cache.
+ - name: Generate Cache Key
+ id: cache-key-generator
+ run: echo "cache-key=${{ runner.os }}-m2-${{ hashFiles('core/**/pom.xml') }}" >> "$GITHUB_OUTPUT"
+
+ build_informix:
strategy:
+ # Runs each combination concurrently
matrix:
- informix-plugin: [ "assembly,informix12", "assembly,informix14" ]
+ profile: [ "assembly,informix12", "assembly,informix14" ]
fail-fast: false
+ name: "Informix - ${{ matrix.profile }}"
+ needs: [ build_core ]
+ runs-on: ubuntu-latest
steps:
- - name: Checkout informix repository
+ - name: Checkout Action (Informix)
uses: actions/checkout@v4
with:
path: informix
- - name: Set up JDK
+
+ - name: Set up Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: 17
- - name: Cache Maven packages
+
+ - name: Cache Maven Repository
uses: actions/cache@v4
with:
- path: ~/.m2
- key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-m2
- - name: Maven build Informix (${{ matrix.informix-plugin }})
+ path: ~/.m2/repository
+ key: ${{ needs.build_core.outputs.cache-key }}
+ restore-keys: ${{ needs.build_core.outputs.cache-key }}
+
+ - name: Build Informix
run: >
- ./informix/mvnw clean install -B -f informix/pom.xml
- -P${{ matrix.informix-plugin }}
+ ./informix/mvnw clean install -B -ntp -f informix/pom.xml
+ -P${{ matrix.profile }}
-Dformat.formatter.goal=validate
-Dformat.imports.goal=check
+ -Dhttp.keepAlive=false
+ -Dmaven.wagon.http.pool=false
+ -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-Ddebezium.test.records.waittime=7
-Ddebezium.test.records.waittime.after.nulls=13
-DfailFlakyTests=false
diff --git a/pom.xml b/pom.xml
index 9db27e6..13f966e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -270,7 +270,7 @@
${docker.dbs}:${informix.version}
- small
+ medium
accept
1
diff --git a/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java b/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java
index 6d915e4..2702df4 100644
--- a/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java
+++ b/src/main/java/io/debezium/connector/informix/InformixChangeRecordEmitter.java
@@ -12,6 +12,7 @@
import com.informix.jdbc.IfmxReadableType;
+import io.debezium.DebeziumException;
import io.debezium.data.Envelope.Operation;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.TableSchema;
@@ -56,7 +57,8 @@ protected Object[] getNewColumnValues() {
@Override
    protected void emitTruncateRecord(Receiver<InformixPartition> receiver, TableSchema tableSchema) throws InterruptedException {
receiver.changeRecord(getPartition(), tableSchema, Operation.TRUNCATE, null,
- tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()), getOffset(), null);
+ tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()),
+ getOffset(), null);
}
/**
@@ -79,8 +81,11 @@ private static <X> X propagate(Callable<X> callable) {
try {
return callable.call();
}
- catch (Exception e) {
- throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
+ catch (RuntimeException rex) {
+ throw rex;
+ }
+ catch (Exception ex) {
+ throw new DebeziumException(ex);
}
}
}
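
The propagate helper above previously wrapped every exception, unchecked ones included, in a bare RuntimeException; the rewrite lets RuntimeException pass through unchanged and reserves wrapping for checked exceptions. A self-contained sketch of the pattern, substituting RuntimeException for DebeziumException so it compiles without Debezium on the classpath:

    import java.util.concurrent.Callable;

    final class Propagate {

        static <X> X propagate(Callable<X> callable) {
            try {
                return callable.call();
            }
            catch (RuntimeException rex) {
                // Unchecked exceptions keep their original type and stack trace
                throw rex;
            }
            catch (Exception ex) {
                // Checked exceptions get wrapped (DebeziumException in the connector)
                throw new RuntimeException(ex);
            }
        }

        public static void main(String[] args) {
            System.out.println(propagate(() -> 42)); // prints 42
        }
    }
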
diff --git a/src/main/java/io/debezium/connector/informix/InformixConnection.java b/src/main/java/io/debezium/connector/informix/InformixConnection.java
index 28a4f74..3dc6c95 100644
--- a/src/main/java/io/debezium/connector/informix/InformixConnection.java
+++ b/src/main/java/io/debezium/connector/informix/InformixConnection.java
@@ -7,8 +7,6 @@
import java.io.PrintWriter;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.time.Instant;
@@ -28,8 +26,6 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
-import io.debezium.relational.Table;
-import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
@@ -80,6 +76,19 @@ public InformixConnection(JdbcConfiguration config) {
realDatabaseName = retrieveRealDatabaseName().trim();
}
+ private String retrieveRealDatabaseName() {
+ try {
+ return queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name"));
+ }
+ catch (SQLException e) {
+ throw new RuntimeException("Couldn't obtain database name", e);
+ }
+ }
+
+ public String getRealDatabaseName() {
+ return realDatabaseName;
+ }
+
/**
* Calculates the highest available Log Sequence Number.
     * Technically, the _exact_ highest available LSN is not available in the JDBC session, but the current page of the active
@@ -100,16 +109,33 @@ public Lsn getMaxLsn() throws SQLException {
}, "Maximum LSN query must return exactly one value"));
}
- public String getRealDatabaseName() {
- return realDatabaseName;
+ /**
+ * Calculates the lowest available Log Sequence Number.
+ * @see InformixConnection#getMaxLsn()
+ * @return the current lowest log sequence number
+ */
+ private Lsn getMinLsn() throws SQLException {
+ return queryAndMap(GET_MIN_LSN, singleResultMapper(rs -> {
+ final Lsn lsn = Lsn.of(rs.getLong("uniqid"), rs.getLong("logpage") << 12);
+ LOGGER.trace("Current minimum lsn is {}", lsn.toLongString());
+ return lsn;
+ }, "Minimum LSN query must return exactly one value"));
}
- private String retrieveRealDatabaseName() {
+ public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
+ final TxLogPosition lastPosition = ((InformixOffsetContext) offset).getChangePosition();
+ final Lsn lastBeginLsn = lastPosition.getBeginLsn();
+ final Lsn restartLsn = lastBeginLsn.isAvailable() ? lastBeginLsn : lastPosition.getCommitLsn();
+ LOGGER.trace("Restart LSN is '{}'", restartLsn);
+
try {
- return queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name"));
+ final Lsn minLsn = getMinLsn();
+ LOGGER.trace("Lowest available LSN is '{}'", minLsn);
+
+ return restartLsn.isAvailable() && minLsn.isAvailable() && restartLsn.compareTo(minLsn) >= 0;
}
catch (SQLException e) {
- throw new RuntimeException("Couldn't obtain database name", e);
+ throw new DebeziumException("Couldn't obtain lowest available Log Sequence Number", e);
}
}
@@ -158,22 +184,6 @@ public String quotedColumnIdString(String columnName) {
return columnName;
}
- public Table getTableSchemaFromTableId(TableId tableId) throws SQLException {
- final DatabaseMetaData metadata = connection().getMetaData();
- try (ResultSet columns = metadata.getColumns(
- tableId.catalog(),
- tableId.schema(),
- tableId.table(),
- null)) {
- final TableEditor tableEditor = Table.editor().tableId(tableId);
- while (columns.next()) {
- readTableColumn(columns, tableId, null).ifPresent(columnEditor -> tableEditor.addColumns(columnEditor.create()));
- }
- tableEditor.setPrimaryKeyNames(readPrimaryKeyNames(metadata, tableId));
- return tableEditor.create();
- }
- }
-
public DataSource datasource() {
return new DataSource() {
@Override
@@ -228,31 +238,4 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
};
}
- public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
-
- final Lsn storedLsn = ((InformixOffsetContext) offset).getChangePosition().getCommitLsn();
-
- try {
- final Lsn oldestLsn = getOldestLsn();
-
- if (oldestLsn == null) {
- return false;
- }
-
- LOGGER.trace("Oldest SCN in logs is '{}'", oldestLsn);
- return storedLsn == null || oldestLsn.compareTo(storedLsn) < 0;
- }
- catch (SQLException e) {
- throw new DebeziumException("Unable to get last available log position", e);
- }
- }
-
- private Lsn getOldestLsn() throws SQLException {
- return queryAndMap(GET_MIN_LSN, singleResultMapper(rs -> {
- final Lsn lsn = Lsn.of(rs.getLong("uniqid"), rs.getLong("logpage") << 12);
- LOGGER.trace("Current minimum lsn is {}", lsn.toLongString());
- return lsn;
- }, "Minimum LSN query must return exactly one value"));
- }
-
}
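
The relocated validateLogPosition derives the restart point from the last begin LSN, falling back to the commit LSN, and lets streaming resume only if that point is still at or above the server's lowest retained LSN; getMinLsn builds that boundary from the log's uniqid plus its page number shifted into a byte offset (the << 12 suggests 4 KiB log pages, an inference from the code). A compilable sketch of just the comparison, with a hypothetical stand-in for the connector's Lsn class:

    final class LsnCheck {

        // Hypothetical stand-in for io.debezium.connector.informix.Lsn:
        // log unique id first, then a byte offset within that log.
        record SimpleLsn(long uniqid, long pageOffset) implements Comparable<SimpleLsn> {
            @Override
            public int compareTo(SimpleLsn o) {
                int byUniqid = Long.compare(uniqid, o.uniqid);
                return byUniqid != 0 ? byUniqid : Long.compare(pageOffset, o.pageOffset);
            }
        }

        // Mirrors the new check: the stored restart position must not have
        // aged out of the server's retained logical logs.
        static boolean canResume(SimpleLsn restartLsn, SimpleLsn minLsn) {
            return restartLsn.compareTo(minLsn) >= 0;
        }

        public static void main(String[] args) {
            SimpleLsn min = new SimpleLsn(7, 2048L << 12);   // oldest retained position
            SimpleLsn restart = new SimpleLsn(9, 16L << 12); // position saved in the offsets
            System.out.println(canResume(restart, min));     // true: log 9 postdates log 7
        }
    }
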
diff --git a/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java b/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java
index d536dbd..f9f091f 100644
--- a/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java
+++ b/src/main/java/io/debezium/connector/informix/InformixDatabaseSchema.java
@@ -12,6 +12,7 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
+import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.SchemaChangeEvent;
@@ -75,4 +76,14 @@ else if (schemaChange.getType() == SchemaChangeEventType.ALTER) {
protected DdlParser getDdlParser() {
return null;
}
+
+ @Override
+ public Tables tables() {
+ return super.tables();
+ }
+
+ @Override
+ public Tables.TableFilter getTableFilter() {
+ return super.getTableFilter();
+ }
}
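
The two overrides above delegate straight to super, which only makes sense as a visibility widening: presumably tables() and getTableFilter() are protected in the base schema class, and the streaming source later in this patch needs public access to pass them into readSchema. A minimal sketch of that Java idiom, with illustrative names:

    class Base {
        protected String tables() {
            return "all tables";
        }
    }

    class Widened extends Base {
        @Override
        public String tables() { // protected in Base, public here; body unchanged
            return super.tables();
        }

        public static void main(String[] args) {
            System.out.println(new Widened().tables()); // prints "all tables"
        }
    }
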
diff --git a/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java
index f59b3a4..b6e0037 100644
--- a/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java
+++ b/src/main/java/io/debezium/connector/informix/InformixSnapshotChangeEventSource.java
@@ -100,7 +100,6 @@ else if (connectorConfig.getSnapshotIsolationMode() == SnapshotIsolationMode.EXC
LOGGER.info("Locking table {}", tableId);
statement.execute(lockingStatement.get());
}
-
}
}
}
diff --git a/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java
index 1402b9e..14f5d8d 100644
--- a/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java
+++ b/src/main/java/io/debezium/connector/informix/InformixStreamingChangeEventSource.java
@@ -30,7 +30,6 @@
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
@@ -88,19 +87,25 @@ public void execute(ChangeEventSourceContext context, InformixPartition partitio
throws InterruptedException {
// Need to refresh schema before CDCEngine is started, to capture columns added in off-line schema evolution
- try {
- for (TableId tableId : schema.tableIds()) {
- final Table table = metadataConnection.getTableSchemaFromTableId(tableId);
- schema.refresh(table);
+ schema.tableIds().stream().map(TableId::schema).distinct().forEach(schemaName -> {
+ try {
+ metadataConnection.readSchema(
+ schema.tables(),
+ null,
+ schemaName,
+ schema.getTableFilter(),
+ null,
+ true);
}
- }
- catch (SQLException e) {
- LOGGER.error("Caught SQLException", e);
- errorHandler.setProducerThrowable(e);
- }
+ catch (SQLException e) {
+ LOGGER.error("Caught SQLException", e);
+ errorHandler.setProducerThrowable(e);
+ }
+ });
TxLogPosition lastPosition = offsetContext.getChangePosition();
Lsn lastCommitLsn = lastPosition.getCommitLsn();
+ Lsn lastChangeLsn = lastPosition.getChangeLsn();
Lsn lastBeginLsn = lastPosition.getBeginLsn();
Lsn beginLsn = lastBeginLsn.isAvailable() ? lastBeginLsn : lastCommitLsn;
@@ -136,9 +141,15 @@ public void execute(ChangeEventSourceContext context, InformixPartition partitio
transactionRecord.getTransactionId(), commitLsn, lastCommitLsn);
break;
}
+ if (commitLsn.equals(lastCommitLsn) && lastChangeLsn.equals(lastCommitLsn)) {
+ LOGGER.info("Skipping transaction with id: '{}' since commitLsn='{}' == lastCommitLsn='{}' and lastChangeLsn='{}' == lastCommitLsn",
+ transactionRecord.getTransactionId(), commitLsn, lastCommitLsn, lastChangeLsn);
+ break;
+ }
if (commitLsn.compareTo(lastCommitLsn) > 0) {
+ Lsn currentLsn = Lsn.of(transactionRecord.getBeginRecord().getSequenceId());
LOGGER.info("Recover finished: from lastBeginLsn='{}' to lastCommitLsn='{}', current Lsn='{}'",
- lastBeginLsn, lastCommitLsn, commitLsn);
+ lastBeginLsn, lastCommitLsn, currentLsn);
recovering = false;
}
handleTransaction(transactionEngine, partition, offsetContext, transactionRecord, recovering);
@@ -248,6 +259,8 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
throws InterruptedException, IfxStreamException {
long tStart = System.nanoTime();
+ long lastChangeSeq = offsetContext.getChangePosition().getChangeLsn().sequence();
+
int transactionId = transactionRecord.getTransactionId();
IfxCDCBeginTransactionRecord beginRecord = transactionRecord.getBeginRecord();
@@ -257,11 +270,11 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
long beginTs = beginRecord.getTime();
long beginSeq = beginRecord.getSequenceId();
- long lowestBeginSeq = engine.getLowestBeginSequence().orElse(beginSeq);
long endSeq = endRecord.getSequenceId();
+ long restartSeq = engine.getLowestBeginSequence().orElse(endSeq);
if (!recover) {
- updateChangePosition(offsetContext, endSeq, beginSeq, transactionId, lowestBeginSeq < beginSeq ? lowestBeginSeq : beginSeq);
+ updateChangePosition(offsetContext, endSeq, beginSeq, transactionId, Math.min(restartSeq, beginSeq));
dispatcher.dispatchTransactionStartedEvent(
partition,
String.valueOf(transactionId),
@@ -276,7 +289,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
if (IfmxStreamRecordType.COMMIT.equals(endRecord.getType())) {
IfxCDCCommitTransactionRecord commitRecord = (IfxCDCCommitTransactionRecord) endRecord;
- long commitSeq = commitRecord.getSequenceId();
+ long commitSeq = endSeq;
long commitTs = commitRecord.getTime();
 Map<String, IfmxReadableType> before = null;
@@ -287,7 +300,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
long changeSeq = streamRecord.getSequenceId();
- if (recover && Lsn.of(changeSeq).compareTo(offsetContext.getChangePosition().getChangeLsn()) <= 0) {
+ if (recover && changeSeq <= lastChangeSeq) {
LOGGER.info("Skipping already processed record {}", changeSeq);
continue;
}
@@ -370,7 +383,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
start = System.nanoTime();
- updateChangePosition(offsetContext, commitSeq, commitSeq, transactionId, lowestBeginSeq);
+ updateChangePosition(offsetContext, commitSeq, commitSeq, transactionId, restartSeq);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, Instant.ofEpochSecond(commitTs));
end = System.nanoTime();
@@ -384,7 +397,7 @@ private void handleTransaction(InformixCdcTransactionEngine engine, InformixPart
if (IfmxStreamRecordType.ROLLBACK.equals(endRecord.getType())) {
if (!recover) {
- updateChangePosition(offsetContext, endSeq, endSeq, transactionId, null);
+ updateChangePosition(offsetContext, endSeq, endSeq, transactionId, restartSeq);
offsetContext.getTransactionContext().endTransaction();
}
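
Two related bookkeeping changes above: already-processed records are now skipped by comparing raw sequence numbers against a lastChangeSeq captured once at the top of handleTransaction, and the recorded restart position becomes the lowest begin sequence among still-open transactions, falling back to the current end sequence rather than the begin sequence. A small sketch of the restart computation, under illustrative names:

    import java.util.OptionalLong;

    final class RestartSequence {

        // lowestOpenBeginSeq: lowest begin sequence among open transactions, if any
        static long restartSeq(OptionalLong lowestOpenBeginSeq, long endSeq, long beginSeq) {
            long restart = lowestOpenBeginSeq.orElse(endSeq);
            // Equivalent to the Math.min(restartSeq, beginSeq) used when
            // dispatching the transaction-started event.
            return Math.min(restart, beginSeq);
        }

        public static void main(String[] args) {
            // An older transaction (seq 100) is still open: restart from it.
            System.out.println(restartSeq(OptionalLong.of(100), 300, 250)); // 100
            // Nothing else open: restart from this transaction's begin.
            System.out.println(restartSeq(OptionalLong.empty(), 300, 250)); // 250
        }
    }
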
diff --git a/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java b/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java
index 8d8fbdc..839ccda 100644
--- a/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java
+++ b/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java
@@ -13,7 +13,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Field;
@@ -329,6 +328,8 @@ private void testDefaultValuesCreateTableAndSnapshot(List<ColumnDefinition> colu
connection.execute("INSERT INTO dv_test (id) values (1)");
+ waitForAvailableRecords();
+
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic(TestHelper.TEST_DATABASE + ".informix.dv_test");
assertThat(tableRecords).hasSize(1);
@@ -365,6 +366,8 @@ private void testDefaultValuesAlterTableModifyExisting(List<ColumnDefinition> co
connection.execute("INSERT INTO dv_test (id) values (2)");
+ waitForAvailableRecords();
+
SourceRecords records = consumeRecordsByTopic(1);
// Verify we got only 1 record for our test
@@ -449,7 +452,7 @@ private void testDefaultValuesAlterTableAdd(List<ColumnDefinition> columnDefinit
connection.execute("INSERT INTO dv_test (id) values (3)");
// TODO: ALTER TABLE ADD columns sometimes(!) result in 'ghost' inserts for all existing rows(?)
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
SourceRecords records = consumeRecordsByTopicUntil((integer, record) -> Objects.equals(3, ((Struct) record.key()).get("id")));
assertNoRecordsToConsume();
diff --git a/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java b/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java
index c47befe..7c45c01 100644
--- a/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java
+++ b/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java
@@ -21,7 +21,6 @@
import io.debezium.connector.informix.util.TestHelper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.ConditionalFail;
-import io.debezium.junit.Flaky;
import io.debezium.pipeline.AbstractBlockingSnapshotTest;
import io.debezium.relational.history.SchemaHistory;
@@ -158,10 +157,4 @@ protected int insertMaxSleep() {
public void readsSchemaOnlyForSignaledTables() throws Exception {
super.readsSchemaOnlyForSignaledTables();
}
-
- @Test
- @Flaky("DBZ-7543")
- public void executeBlockingSnapshotWhileStreaming() throws Exception {
- super.executeBlockingSnapshotWhileStreaming();
- }
}
diff --git a/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java b/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java
index 02d7069..2995c73 100644
--- a/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java
+++ b/src/test/java/io/debezium/connector/informix/IncrementalSnapshotIT.java
@@ -162,12 +162,6 @@ protected String server() {
return TestHelper.TEST_DATABASE;
}
- @Override
- protected void sendAdHocSnapshotSignal() throws SQLException {
- super.sendAdHocSnapshotSignal();
- TestHelper.waitForCDC();
- }
-
@Test
@Ignore("Informix does not support DDL operations on tables defined for replication")
@Override
diff --git a/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java b/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java
index d99dc7e..cad4b9e 100644
--- a/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java
+++ b/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java
@@ -14,7 +14,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -179,7 +178,7 @@ private void insertOneAndValidate(String tableName, Schema valueSchema, String i
String topicName = String.format("testdb.informix.%s", tableName);
connection.execute(String.format("insert into %s values(%s)", tableName, insertValue));
- waitForAvailableRecords(1, TimeUnit.MINUTES);
+ waitForAvailableRecords();
 List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(topicName);
assertThat(records).isNotNull().hasSize(1);
diff --git a/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java b/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java
index 9349ad0..a736207 100644
--- a/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java
+++ b/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java
@@ -8,20 +8,20 @@
import static io.debezium.connector.informix.util.TestHelper.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.connector.informix.util.TestHelper.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.connector.informix.util.TestHelper.TYPE_SCALE_PARAMETER_KEY;
-import static io.debezium.data.Envelope.FieldName.AFTER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import static org.junit.Assert.assertNull;
+import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
@@ -36,13 +36,15 @@
import io.debezium.connector.informix.util.TestHelper;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.converters.spi.CloudEventsMaker;
-import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
import io.debezium.data.SchemaAndValueField;
+import io.debezium.data.SourceRecordAssert;
+import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
+import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.junit.ConditionalFail;
-import io.debezium.junit.Flaky;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.history.MemorySchemaHistory;
@@ -123,13 +125,13 @@ public void deleteWithoutTombstone() throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
}
+ waitForAvailableRecords();
consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
@@ -151,7 +153,7 @@ public void deleteWithoutTombstone() throws Exception {
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct deleteValue = (Struct) deleteRecord.value();
- assertRecord((Struct) deleteValue.get("before"), expectedValueBefore);
+ assertRecord((Struct) deleteValue.get(FieldName.BEFORE), expectedValueBefore);
}
}
@@ -180,8 +182,7 @@ public void deleteWithTombstone() throws Exception {
connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
}
-
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
@@ -206,7 +207,7 @@ public void deleteWithTombstone() throws Exception {
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct deleteValue = (Struct) deleteRecord.value();
- assertRecord((Struct) deleteValue.get("before"), expectedValueBefore);
+ assertRecord((Struct) deleteValue.get(FieldName.BEFORE), expectedValueBefore);
}
}
}
@@ -225,12 +226,12 @@ public void testTruncateTable() throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute("INSERT INTO truncate_table VALUES(" + id + ", 'name')");
}
+ waitForAvailableRecords();
consumeRecordsByTopic(RECORDS_PER_TABLE);
@@ -262,14 +263,10 @@ public void updatePrimaryKey() throws Exception {
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
connection.execute("INSERT INTO tableb VALUES(1, 'b')");
- waitForAvailableRecords(1, TimeUnit.SECONDS);
consumeRecordsByTopic(1);
- connection.execute(
- "UPDATE tablea SET id=100 WHERE id=1",
- "UPDATE tableb SET id=100 WHERE id=1");
-
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ connection.execute("UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1");
+ waitForAvailableRecords();
final SourceRecords records = consumeRecordsByTopic(6);
final List tableA = records.recordsForTopic("testdb.informix.tablea");
@@ -295,9 +292,9 @@ public void updatePrimaryKey() throws Exception {
final Struct deleteKeyA = (Struct) deleteRecordA.key();
final Struct deleteValueA = (Struct) deleteRecordA.value();
- assertRecord(deleteValueA.getStruct("before"), expectedDeleteRowA);
+ assertRecord(deleteValueA.getStruct(FieldName.BEFORE), expectedDeleteRowA);
assertRecord(deleteKeyA, expectedDeleteKeyA);
- assertNull(deleteValueA.get("after"));
+ assertNull(deleteValueA.get(FieldName.AFTER));
final Struct tombstoneKeyA = (Struct) tombstoneRecordA.key();
final Struct tombstoneValueA = (Struct) tombstoneRecordA.value();
@@ -306,9 +303,9 @@ public void updatePrimaryKey() throws Exception {
final Struct insertKeyA = (Struct) insertRecordA.key();
final Struct insertValueA = (Struct) insertRecordA.value();
- assertRecord(insertValueA.getStruct("after"), expectedInsertRowA);
+ assertRecord(insertValueA.getStruct(FieldName.AFTER), expectedInsertRowA);
assertRecord(insertKeyA, expectedInsertKeyA);
- assertNull(insertValueA.get("before"));
+ assertNull(insertValueA.get(FieldName.BEFORE));
 final List<SchemaAndValueField> expectedDeleteRowB = List.of(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
@@ -327,9 +324,9 @@ public void updatePrimaryKey() throws Exception {
final Struct deletekeyB = (Struct) deleteRecordB.key();
final Struct deleteValueB = (Struct) deleteRecordB.value();
- assertRecord(deleteValueB.getStruct("before"), expectedDeleteRowB);
+ assertRecord(deleteValueB.getStruct(FieldName.BEFORE), expectedDeleteRowB);
assertRecord(deletekeyB, expectedDeleteKeyB);
- assertNull(deleteValueB.get("after"));
+ assertNull(deleteValueB.get(FieldName.AFTER));
final Struct tombstonekeyB = (Struct) tombstoneRecordB.key();
final Struct tombstoneValueB = (Struct) tombstoneRecordB.value();
@@ -338,9 +335,9 @@ public void updatePrimaryKey() throws Exception {
final Struct insertkeyB = (Struct) insertRecordB.key();
final Struct insertValueB = (Struct) insertRecordB.value();
- assertRecord(insertValueB.getStruct("after"), expectedInsertRowB);
+ assertRecord(insertValueB.getStruct(FieldName.AFTER), expectedInsertRowB);
assertRecord(insertkeyB, expectedInsertKeyB);
- assertNull(insertValueB.get("before"));
+ assertNull(insertValueB.get(FieldName.BEFORE));
}
@Test
@@ -364,11 +361,8 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
connection.execute("INSERT INTO tableb VALUES(1, 'b')");
consumeRecordsByTopic(1);
- connection.execute(
- "UPDATE tablea SET id=100 WHERE id=1",
- "UPDATE tableb SET id=100 WHERE id=1");
-
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ connection.execute("UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1");
+ waitForAvailableRecords();
final SourceRecords records1 = consumeRecordsByTopic(2);
@@ -382,7 +376,7 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
final SourceRecords records2 = consumeRecordsByTopic(4);
@@ -410,9 +404,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
final Struct deleteKeyA = (Struct) deleteRecordA.key();
final Struct deleteValueA = (Struct) deleteRecordA.value();
- assertRecord(deleteValueA.getStruct("before"), expectedDeleteRowA);
+ assertRecord(deleteValueA.getStruct(FieldName.BEFORE), expectedDeleteRowA);
assertRecord(deleteKeyA, expectedDeleteKeyA);
- assertNull(deleteValueA.get("after"));
+ assertNull(deleteValueA.get(FieldName.AFTER));
final Struct tombstoneKeyA = (Struct) tombstoneRecordA.key();
final Struct tombstoneValueA = (Struct) tombstoneRecordA.value();
@@ -421,9 +415,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
final Struct insertKeyA = (Struct) insertRecordA.key();
final Struct insertValueA = (Struct) insertRecordA.value();
- assertRecord(insertValueA.getStruct("after"), expectedInsertRowA);
+ assertRecord(insertValueA.getStruct(FieldName.AFTER), expectedInsertRowA);
assertRecord(insertKeyA, expectedInsertKeyA);
- assertNull(insertValueA.get("before"));
+ assertNull(insertValueA.get(FieldName.BEFORE));
 final List<SchemaAndValueField> expectedDeleteRowB = List.of(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
@@ -442,9 +436,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
final Struct deletekeyB = (Struct) deleteRecordB.key();
final Struct deleteValueB = (Struct) deleteRecordB.value();
- assertRecord(deleteValueB.getStruct("before"), expectedDeleteRowB);
+ assertRecord(deleteValueB.getStruct(FieldName.BEFORE), expectedDeleteRowB);
assertRecord(deletekeyB, expectedDeleteKeyB);
- assertNull(deleteValueB.get("after"));
+ assertNull(deleteValueB.get(FieldName.AFTER));
final Struct tombstonekeyB = (Struct) tombstoneRecordB.key();
final Struct tombstoneValueB = (Struct) tombstoneRecordB.value();
@@ -453,9 +447,9 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
final Struct insertkeyB = (Struct) insertRecordB.key();
final Struct insertValueB = (Struct) insertRecordB.value();
- assertRecord(insertValueB.getStruct("after"), expectedInsertRowB);
+ assertRecord(insertValueB.getStruct(FieldName.AFTER), expectedInsertRowB);
assertRecord(insertkeyB, expectedInsertKeyB);
- assertNull(insertValueB.get("before"));
+ assertNull(insertValueB.get(FieldName.BEFORE));
}
@Test
@@ -509,11 +503,12 @@ public void verifyOffsets(boolean withOnlineUpd) throws Exception {
if (withOnlineUpd) {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
connection.execute("UPDATE tablea SET cola = 'aa' WHERE id > 1");
connection.execute("UPDATE tableb SET colb = 'bb' WHERE id > 1");
+ waitForAvailableRecords();
+
final SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
final List tableA = sourceRecords.recordsForTopic("testdb.informix.tablea");
final List tableB = sourceRecords.recordsForTopic("testdb.informix.tableb");
@@ -538,7 +533,7 @@ public void verifyOffsets(boolean withOnlineUpd) throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(3, TimeUnit.MINUTES);
+ waitForAvailableRecords();
final SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
final List tableA = sourceRecords.recordsForTopic("testdb.informix.tablea");
@@ -559,12 +554,12 @@ public void verifyOffsets(boolean withOnlineUpd) throws Exception {
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct valueA = (Struct) recordA.value();
- assertRecord((Struct) valueA.get("after"), expectedRowA);
- assertNull(valueA.get("before"));
+ assertRecord((Struct) valueA.get(FieldName.AFTER), expectedRowA);
+ assertNull(valueA.get(FieldName.BEFORE));
final Struct valueB = (Struct) recordB.value();
- assertRecord((Struct) valueB.get("after"), expectedRowB);
- assertNull(valueB.get("before"));
+ assertRecord((Struct) valueB.get(FieldName.AFTER), expectedRowB);
+ assertNull(valueB.get(FieldName.BEFORE));
assertThat(recordA.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
assertThat(recordA.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
@@ -590,13 +585,13 @@ public void testTableIncludeList() throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
}
+ waitForAvailableRecords();
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE);
final List tableA = records.recordsForTopic("testdb.informix.tablea");
@@ -632,8 +627,7 @@ public void testTableExcludeList() throws Exception {
connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
}
-
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE);
final List tableA = records.recordsForTopic("testdb.informix.tablea");
@@ -643,6 +637,143 @@ public void testTableExcludeList() throws Exception {
assertNoRecordsToConsume();
}
+ @Test
+ @FixFor("DBZ-7813")
+ public void testColumnIncludeListWithInitialSnapshot() throws Exception {
+ testColumnIncludeList(SnapshotMode.INITIAL);
+ }
+
+ @Test
+ @FixFor("DBZ-7813")
+ public void testColumnIncludeListWithNoData() throws Exception {
+ testColumnIncludeList(SnapshotMode.NO_DATA);
+ }
+
+ public void testColumnIncludeList(SnapshotMode snapshotMode) throws Exception {
+ final Configuration config = TestHelper.defaultConfig()
+ .with(InformixConnectorConfig.SNAPSHOT_MODE, snapshotMode)
+ .with(InformixConnectorConfig.TABLE_INCLUDE_LIST, "testdb.informix.dt_table")
+ .with(InformixConnectorConfig.COLUMN_INCLUDE_LIST,
+ "informix.dt_table.id,informix.dt_table.c1,informix.dt_table.c2,informix.dt_table.c3a,informix.dt_table.c3b")
+ .build();
+
+ final int expectedRecords;
+ if (snapshotMode == SnapshotMode.INITIAL) {
+ expectedRecords = 2;
+ connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (0,123,456,789.01,'test',1.228,234.56)");
+ }
+ else {
+ expectedRecords = 1;
+ }
+
+ start(InformixConnector.class, config);
+ assertConnectorIsRunning();
+
+ // Wait for snapshot completion
+ waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
+
+ // Wait for streaming to start
+ waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
+
+ connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
+
+ waitForAvailableRecords();
+
+ final SourceRecords records = consumeRecordsByTopic(expectedRecords);
+ final List table = records.recordsForTopic("testdb.informix.dt_table");
+ assertThat(table).hasSize(expectedRecords);
+ Schema aSchema = SchemaBuilder.struct().optional()
+ .name("testdb.informix.dt_table.Value")
+ .field("id", Schema.INT32_SCHEMA)
+ .field("c1", Schema.OPTIONAL_INT32_SCHEMA)
+ .field("c2", Schema.OPTIONAL_INT32_SCHEMA)
+ .field("c3a", SpecialValueDecimal.builder(DecimalMode.PRECISE, 5, 2).optional().build())
+ .field("c3b", Schema.OPTIONAL_STRING_SCHEMA)
+ .build();
+ Struct aStruct = new Struct(aSchema)
+ .put("c1", 123)
+ .put("c2", 456)
+ .put("c3a", BigDecimal.valueOf(789.01))
+ .put("c3b", "test");
+ if (snapshotMode == SnapshotMode.INITIAL) {
+ SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 0));
+ SourceRecordAssert.assertThat(table.get(1)).valueAfterFieldIsEqualTo(aStruct.put("id", 1));
+ }
+ else {
+ SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 1));
+ }
+
+ assertNoRecordsToConsume();
+ }
+
+ @Test
+ @FixFor("DBZ-7813")
+ public void testColumnExcludeListWithInitialSnapshot() throws Exception {
+ testColumnExcludeList(SnapshotMode.INITIAL);
+ }
+
+ @Test
+ @FixFor("DBZ-7813")
+    public void testColumnExcludeListWithNoData() throws Exception {
+ testColumnExcludeList(SnapshotMode.NO_DATA);
+ }
+
+ public void testColumnExcludeList(SnapshotMode snapshotMode) throws Exception {
+ final Configuration config = TestHelper.defaultConfig()
+ .with(InformixConnectorConfig.SNAPSHOT_MODE, snapshotMode)
+ .with(InformixConnectorConfig.TABLE_EXCLUDE_LIST, "testdb.informix.tablea")
+ .with(InformixConnectorConfig.COLUMN_EXCLUDE_LIST, "informix.dt_table.f1,informix.dt_table.f2")
+ .build();
+
+ final int expectedRecords;
+ if (snapshotMode == SnapshotMode.INITIAL) {
+ expectedRecords = 2;
+ connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (0,123,456,789.01,'test',1.228,234.56)");
+ }
+ else {
+ expectedRecords = 1;
+ }
+
+ start(InformixConnector.class, config);
+ assertConnectorIsRunning();
+
+ // Wait for snapshot completion
+ waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
+
+ // Wait for streaming to start
+ waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
+
+ connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
+
+ waitForAvailableRecords();
+
+ final SourceRecords records = consumeRecordsByTopic(expectedRecords);
+ final List table = records.recordsForTopic("testdb.informix.dt_table");
+ assertThat(table).hasSize(expectedRecords);
+ Schema aSchema = SchemaBuilder.struct().optional()
+ .name("testdb.informix.dt_table.Value")
+ .field("id", Schema.INT32_SCHEMA)
+ .field("c1", Schema.OPTIONAL_INT32_SCHEMA)
+ .field("c2", Schema.OPTIONAL_INT32_SCHEMA)
+ .field("c3a", SpecialValueDecimal.builder(DecimalMode.PRECISE, 5, 2).optional().build())
+ .field("c3b", Schema.OPTIONAL_STRING_SCHEMA)
+ .build();
+ Struct aStruct = new Struct(aSchema)
+ .put("c1", 123)
+ .put("c2", 456)
+ .put("c3a", BigDecimal.valueOf(789.01))
+ .put("c3b", "test");
+ if (snapshotMode == SnapshotMode.INITIAL) {
+ SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 0));
+ SourceRecordAssert.assertThat(table.get(1)).valueAfterFieldIsEqualTo(aStruct.put("id", 1));
+ }
+ else {
+ SourceRecordAssert.assertThat(table.get(0)).valueAfterFieldIsEqualTo(aStruct.put("id", 1));
+ }
+
+ assertNoRecordsToConsume();
+ }
+
private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean afterStreaming) throws Exception {
final int RECORDS_PER_TABLE = 30;
final int TABLES = 2;
@@ -675,7 +806,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
return false;
}
final Struct envelope = (Struct) record.value();
- final Struct after = envelope.getStruct("after");
+ final Struct after = envelope.getStruct(FieldName.AFTER);
final Integer id = after.getInt32("id");
final String value = after.getString("cola");
return id != null && id == HALF_ID && "a".equals(value);
@@ -685,7 +816,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
// Wait for snapshot to be completed or a first streaming message delivered
if (restartJustAfterSnapshot) {
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(2, TimeUnit.MINUTES);
+ waitForAvailableRecords();
}
else {
waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
@@ -699,7 +830,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
 final List<SchemaAndValueField> expectedRow = List.of(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2),
new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a"));
- assertRecord(((Struct) records.allRecordsInOrder().get(0).value()).getStruct(Envelope.FieldName.AFTER), expectedRow);
+ assertRecord(((Struct) records.allRecordsInOrder().get(0).value()).getStruct(FieldName.AFTER), expectedRow);
}
connection.setAutoCommit(false);
@@ -710,7 +841,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
}
connection.commit();
- waitForAvailableRecords(1, TimeUnit.MINUTES);
+ waitForAvailableRecords();
 List<SourceRecord> records = consumeRecordsByTopic(RECORDS_PER_TABLE).allRecordsInOrder();
@@ -720,7 +851,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
 final List<SchemaAndValueField> expectedLastRow = List.of(
new SchemaAndValueField("id", Schema.INT32_SCHEMA, HALF_ID - 1),
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
- assertRecord((Struct) value.get("after"), expectedLastRow);
+ assertRecord((Struct) value.get(FieldName.AFTER), expectedLastRow);
waitForConnectorShutdown(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
stopConnector();
@@ -730,7 +861,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(1, TimeUnit.MINUTES);
+ waitForAvailableRecords();
SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE);
List tableA = sourceRecords.recordsForTopic("testdb.informix.tablea");
@@ -751,12 +882,12 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct valueA = (Struct) recordA.value();
- assertRecord((Struct) valueA.get("after"), expectedRowA);
- assertNull(valueA.get("before"));
+ assertRecord((Struct) valueA.get(FieldName.AFTER), expectedRowA);
+ assertNull(valueA.get(FieldName.BEFORE));
final Struct valueB = (Struct) recordB.value();
- assertRecord((Struct) valueB.get("after"), expectedRowB);
- assertNull(valueB.get("before"));
+ assertRecord((Struct) valueB.get(FieldName.AFTER), expectedRowB);
+ assertNull(valueB.get(FieldName.BEFORE));
}
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
@@ -765,8 +896,7 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')");
connection.commit();
}
-
- waitForAvailableRecords(1, TimeUnit.MINUTES);
+ waitForAvailableRecords();
sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
tableA = sourceRecords.recordsForTopic("testdb.informix.tablea");
@@ -787,12 +917,12 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af
new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
final Struct valueA = (Struct) recordA.value();
- assertRecord((Struct) valueA.get("after"), expectedRowA);
- assertNull(valueA.get("before"));
+ assertRecord((Struct) valueA.get(FieldName.AFTER), expectedRowA);
+ assertNull(valueA.get(FieldName.BEFORE));
final Struct valueB = (Struct) recordB.value();
- assertRecord((Struct) valueB.get("after"), expectedRowB);
- assertNull(valueB.get("before"));
+ assertRecord((Struct) valueB.get(FieldName.AFTER), expectedRowB);
+ assertNull(valueB.get(FieldName.BEFORE));
}
assertNoRecordsToConsume();
@@ -852,11 +982,12 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
connection.execute("INSERT INTO masked_hashed_column_table (id, name, name2, name3) VALUES (10, 'some_name', 'test', 'test')");
connection.execute("INSERT INTO truncated_column_table VALUES(11, 'some_name')");
+ waitForAvailableRecords();
+
final SourceRecords records = consumeRecordsByTopic(2);
final List tableA = records.recordsForTopic("testdb.informix.masked_hashed_column_table");
final List tableB = records.recordsForTopic("testdb.informix.truncated_column_table");
@@ -867,8 +998,8 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception
VerifyRecord.isValidInsert(record, "id", 10);
Struct value = (Struct) record.value();
- if (value.getStruct("after") != null) {
- Struct after = value.getStruct("after");
+ if (value.getStruct(FieldName.AFTER) != null) {
+ Struct after = value.getStruct(FieldName.AFTER);
assertThat(after.getString("name")).isEqualTo("************");
assertThat(after.getString("name2")).isEqualTo("8e68c68edbbac316dfe2f6ada6b0d2d3e2002b487a985d4b7c7c82dd83b0f4d7");
assertThat(after.getString("name3")).isEqualTo("8e68c68edbbac316dfe2");
@@ -879,8 +1010,8 @@ record = tableB.get(0);
VerifyRecord.isValidInsert(record, "id", 11);
value = (Struct) record.value();
- if (value.getStruct("after") != null) {
- assertThat(value.getStruct("after").getString("name")).isEqualTo("some");
+ if (value.getStruct(FieldName.AFTER) != null) {
+ assertThat(value.getStruct(FieldName.AFTER).getString("name")).isEqualTo("some");
}
}
@@ -897,10 +1028,11 @@ public void shouldRewriteIdentityKey() throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
connection.execute("INSERT INTO tablea (id, cola) values (100, 'hundred')");
+ waitForAvailableRecords();
+
List records = consumeRecordsByTopic(1).recordsForTopic("testdb.informix.tablea");
assertThat(records).isNotNull().isNotEmpty();
assertThat(records.get(0).key()).isNotNull();
@@ -923,17 +1055,18 @@ public void shouldPropagateSourceTypeByDatatype() throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
+ waitForAvailableRecords();
+
final SourceRecords records = consumeRecordsByTopic(1);
List recordsForTopic = records.recordsForTopic("testdb.informix.dt_table");
assertThat(recordsForTopic).hasSize(1);
assertNoRecordsToConsume();
- final Field before = recordsForTopic.get(0).valueSchema().field("before");
+ final Field before = recordsForTopic.get(0).valueSchema().field(FieldName.BEFORE);
assertThat(before.schema().field("id").schema().parameters()).isNull();
assertThat(before.schema().field("c1").schema().parameters()).isNull();
@@ -965,8 +1098,6 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
.with(InformixConnectorConfig.TABLE_INCLUDE_LIST, "testdb.informix.tablea")
.build();
- connection.execute("INSERT INTO tablea (id,cola) values (1001, 'DBZ3668')");
-
start(InformixConnector.class, config);
assertConnectorIsRunning();
@@ -987,9 +1118,9 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
// Wait for streaming to start
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- connection.execute("INSERT INTO tablea (id,cola) VALUES (1002, 'DBZ3668')");
+ connection.execute("INSERT INTO tablea (id,cola) VALUES (3668, 'DBZ3668')");
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
records = consumeRecordsByTopic(1);
@@ -1029,10 +1160,10 @@ public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception {
int expectedRecordCount = 2;
SourceRecords sourceRecords = consumeRecordsByTopic(expectedRecordCount);
assertThat(sourceRecords.recordsForTopic("testdb.informix.always_snapshot")).hasSize(expectedRecordCount);
- Struct struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(AFTER);
+ Struct struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(0).value()).get(FieldName.AFTER);
TestCase.assertEquals(1, struct.get("id"));
TestCase.assertEquals("Test1", struct.get("data"));
- struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(1).value()).get(AFTER);
+ struct = (Struct) ((Struct) sourceRecords.allRecordsInOrder().get(1).value()).get(FieldName.AFTER);
TestCase.assertEquals(2, struct.get("id"));
TestCase.assertEquals("Test2", struct.get("data"));
@@ -1054,16 +1185,16 @@ public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception {
// Check we get up-to-date data in the snapshot.
assertThat(sourceRecords.recordsForTopic("testdb.informix.always_snapshot")).hasSize(expectedRecordCount);
- struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(0).value()).get(AFTER);
+ struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(0).value()).get(FieldName.AFTER);
TestCase.assertEquals(3, struct.get("id"));
TestCase.assertEquals("Test3", struct.get("data"));
- struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(1).value()).get(AFTER);
+ struct = (Struct) ((Struct) sourceRecords.recordsForTopic("testdb.informix.always_snapshot").get(1).value()).get(FieldName.AFTER);
TestCase.assertEquals(2, struct.get("id"));
TestCase.assertEquals("Test2", struct.get("data"));
}
@Test
- @Flaky("7699")
+ @FixFor("DBZ-7699")
public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
Configuration.Builder builder = TestHelper.defaultConfig()
@@ -1097,7 +1228,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
connection.execute("INSERT INTO tablea VALUES (100,'100')");
connection.execute("INSERT INTO tablea VALUES (200,'200')");
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
recordCount = 2;
sourceRecords = consumeRecordsByTopic(recordCount);
@@ -1144,7 +1275,7 @@ public void shouldAllowForCustomSnapshot() throws InterruptedException, SQLExcep
connection.execute("INSERT INTO tablea VALUES (2, '1');");
connection.execute("INSERT INTO tableb VALUES (2, '1');");
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
actualRecords = consumeRecordsByTopic(2);
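
The new column include/exclude tests pin c3a to a precise-mode decimal schema via SpecialValueDecimal.builder(DecimalMode.PRECISE, 5, 2). As a rough sketch of what such a schema amounts to in plain Kafka Connect terms (an assumption about the precise-mode mapping, not Debezium's exact builder output):

    import java.math.BigDecimal;

    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;

    public class DecimalSchemaSketch {
        public static void main(String[] args) {
            // DECIMAL(5,2) as Connect's Decimal logical type: the scale is fixed
            // in the schema, the precision travels as a schema parameter
            // ("connect.decimal.precision" is the key Debezium uses).
            Schema c3a = Decimal.builder(2)
                    .parameter("connect.decimal.precision", "5")
                    .optional()
                    .build();
            System.out.println(c3a.name() + " " + c3a.parameters());
            System.out.println(BigDecimal.valueOf(789.01).scale()); // 2, matching the schema
        }
    }
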
diff --git a/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java b/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java
index 09472ca..ebde52b 100644
--- a/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java
+++ b/src/test/java/io/debezium/connector/informix/InformixOfflineDefaultValueIT.java
@@ -6,7 +6,6 @@
package io.debezium.connector.informix;
import java.sql.SQLException;
-import java.util.concurrent.TimeUnit;
import org.junit.Before;
@@ -52,7 +51,5 @@ protected void performSchemaChange(Configuration config, InformixConnection conn
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
assertConnectorIsRunning();
-
- waitForAvailableRecords(1, TimeUnit.SECONDS);
}
}
diff --git a/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java b/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java
index 8124251..3e17ea7 100644
--- a/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java
+++ b/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java
@@ -5,8 +5,6 @@
*/
package io.debezium.connector.informix;
-import java.util.concurrent.TimeUnit;
-
import org.junit.After;
import org.junit.Before;
@@ -107,12 +105,12 @@ protected void waitForStreamingStarted() throws InterruptedException {
}
protected SourceRecords consumeRecordsByTopicReselectWhenNullStreaming() throws InterruptedException {
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
return super.consumeRecordsByTopicReselectWhenNullStreaming();
}
protected SourceRecords consumeRecordsByTopicReselectWhenNotNullStreaming() throws InterruptedException {
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
return super.consumeRecordsByTopicReselectWhenNotNullStreaming();
}
}
diff --git a/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java b/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java
index 104a507..09854f8 100644
--- a/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java
+++ b/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java
@@ -11,7 +11,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
@@ -88,8 +87,6 @@ public void testColumnOrderWhileInsert() throws Exception {
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
-
// insert a record
Map<String, String> recordToBeInsert = new LinkedHashMap<>() {
{
@@ -104,7 +101,7 @@ public void testColumnOrderWhileInsert() throws Exception {
Strings.join(", ", recordToBeInsert.keySet()),
Strings.join("\", \"", recordToBeInsert.values())));
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
String topicName = String.format("%s.informix.%s", TestHelper.TEST_DATABASE, testTableName);
SourceRecords sourceRecords = consumeRecordsByTopic(1);
@@ -144,8 +141,6 @@ public void testColumnOrderWhileUpdate() throws Exception {
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
-
Map<String, String> recordAfterUpdate = new LinkedHashMap<>(recordToBeUpdate);
// new value
recordAfterUpdate.put("address", "00:00:00:00:00:00");
@@ -154,7 +149,7 @@ public void testColumnOrderWhileUpdate() throws Exception {
connection.execute(String.format("update %s set address = \"%s\" where id = \"%s\"",
testTableName, recordAfterUpdate.get("address"), recordToBeUpdate.get("id")));
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
String topicName = String.format("%s.informix.%s", TestHelper.TEST_DATABASE, testTableName);
SourceRecords sourceRecords = consumeRecordsByTopic(1);
@@ -197,11 +192,9 @@ public void testColumnOrderWhileDelete() throws Exception {
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
-
connection.execute(String.format("delete from %s where id = \"%s\"", testTableName, recordToBeDelete.get("id")));
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
String topicName = String.format("%s.informix.%s", TestHelper.TEST_DATABASE, testTableName);
SourceRecords sourceRecords = consumeRecordsByTopic(1);
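
Taken together, the InformixValidateColumnOrderIT hunks converge on a single pattern: the speculative wait before each DML statement goes away, and one untimed wait remains between executing the statement and consuming the expected records. A sketch of the resulting shape, reusing the delete case from the hunk above:

    // Pattern after this patch (sketch): execute, wait until the engine has
    // queued the change event, then consume exactly the expected records.
    connection.execute(String.format("delete from %s where id = \"%s\"", testTableName, recordToBeDelete.get("id")));
    waitForAvailableRecords();
    SourceRecords sourceRecords = consumeRecordsByTopic(1);
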
diff --git a/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java b/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java
index a7e185b..5236f0c 100644
--- a/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java
+++ b/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java
@@ -9,7 +9,6 @@
import java.sql.SQLException;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -94,7 +93,7 @@ public void snapshotSchemaChanges() throws Exception {
waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE);
- waitForAvailableRecords(10, TimeUnit.SECONDS);
+ waitForAvailableRecords();
// DDL for 3 tables
SourceRecords records = consumeRecordsByTopic(3);
diff --git a/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java b/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java
index afdd280..8ebee75 100644
--- a/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java
+++ b/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java
@@ -11,7 +11,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
@@ -98,7 +97,7 @@ public void transactionMetadata() throws Exception {
connection.setAutoCommit(true);
connection.execute("INSERT INTO tableb VALUES(1000, 'b')");
- waitForAvailableRecords(5, TimeUnit.SECONDS);
+ waitForAvailableRecords();
// BEGIN, data, END, BEGIN, data, END
final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 3);
diff --git a/src/test/java/io/debezium/connector/informix/util/TestHelper.java b/src/test/java/io/debezium/connector/informix/util/TestHelper.java
index 622ec52..61d058e 100644
--- a/src/test/java/io/debezium/connector/informix/util/TestHelper.java
+++ b/src/test/java/io/debezium/connector/informix/util/TestHelper.java
@@ -11,10 +11,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.List;
+import java.util.concurrent.TimeUnit;
-import org.apache.kafka.connect.data.Struct;
-import org.awaitility.Durations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,11 +20,9 @@
import io.debezium.config.Configuration;
import io.debezium.connector.informix.InformixConnection;
import io.debezium.connector.informix.InformixConnectorConfig;
-import io.debezium.data.SchemaAndValueField;
import io.debezium.jdbc.JdbcConfiguration;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.storage.file.history.FileSchemaHistory;
-import io.debezium.util.Clock;
-import io.debezium.util.Metronome;
import io.debezium.util.Testing;
public class TestHelper {
@@ -87,6 +83,7 @@ public static Configuration.Builder defaultConfig() {
return Configuration.copy(defaultJdbcConfig().map(key -> InformixConnectorConfig.DATABASE_CONFIG_PREFIX + key))
.with(CommonConnectorConfig.TOPIC_PREFIX, TEST_DATABASE)
+ .with(RelationalDatabaseConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, TimeUnit.SECONDS.toMillis(30))
.with(InformixConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class)
.with(FileSchemaHistory.FILE_PATH, SCHEMA_HISTORY_PATH)
.with(InformixConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
@@ -125,16 +122,4 @@ public static void assertCdcEnabled(InformixConnection conn) throws SQLException
assertThat(is_logging + is_buff_logging).isPositive();
}
- public static void assertRecord(Struct record, List<SchemaAndValueField> expected) {
- expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
- }
-
- public static void waitForCDC() {
- try {
- Metronome.parker(Durations.TWO_SECONDS, Clock.SYSTEM).pause();
- }
- catch (InterruptedException e) {
- // IGNORE
- }
- }
}
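
The TestHelper changes round this out: the default configuration gains an explicit 30-second snapshot lock timeout, while the unused `assertRecord` helper and the fixed two-second `waitForCDC()` pause (a `Metronome`-based sleep) are removed in favor of the condition-based waits above. For callers that still need to wait on an external condition rather than on available records, a sketch using Awaitility (already on the test classpath, per the removed `org.awaitility.Durations` import); the polled `isCdcReady()` check is hypothetical:

    // Sketch: replace a fixed pause with a condition-based wait (Awaitility).
    // isCdcReady() is a placeholder; substitute a real readiness check.
    Awaitility.await()
            .atMost(Durations.TEN_SECONDS)
            .pollInterval(Durations.ONE_HUNDRED_MILLISECONDS)
            .until(() -> isCdcReady(connection));
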