diff --git a/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java b/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java index 26a43cb..8d8fbdc 100644 --- a/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java +++ b/src/test/java/io/debezium/connector/informix/AbstractInformixDefaultValueIT.java @@ -12,6 +12,8 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.kafka.connect.data.Field; @@ -19,19 +21,15 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestRule; import io.debezium.config.Configuration; import io.debezium.connector.informix.util.TestHelper; import io.debezium.data.Envelope; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; -import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.jdbc.TemporalPrecisionMode; -import io.debezium.junit.ConditionalFail; -import io.debezium.junit.Flaky; /** * Abstract default value integration test. @@ -42,10 +40,7 @@ * * @author Lars M Johansson, Chris Cranford */ -public abstract class AbstractInformixDefaultValueIT extends AbstractConnectorTest { - - @Rule - public TestRule conditionalFail = new ConditionalFail(); +public abstract class AbstractInformixDefaultValueIT extends AbstractAsyncEngineConnectorTest { private InformixConnection connection; private Configuration config; @@ -79,7 +74,6 @@ public void after() throws SQLException { @Test @FixFor("DBZ-4990") - @Flaky("DBZ-7542") public void shouldHandleBooleanDefaultTypes() throws Exception { List columnDefinitions = List.of( new ColumnDefinition("val_boolean", "BOOLEAN", @@ -92,7 +86,6 @@ public void shouldHandleBooleanDefaultTypes() throws Exception { @Test @FixFor("DBZ-4990") - @Flaky("DBZ-7542") public void shouldHandleNumericDefaultTypes() throws Exception { // TODO: remove once https://github.com/Apicurio/apicurio-registry/issues/2990 is fixed if (VerifyRecord.isApucurioAvailable()) { @@ -126,7 +119,6 @@ public void shouldHandleNumericDefaultTypes() throws Exception { @Test @FixFor("DBZ-4990") - @Flaky("DBZ-7542") public void shouldHandleFloatPointDefaultTypes() throws Exception { // TODO: remove once https://github.com/Apicurio/apicurio-registry/issues/2980 is fixed if (VerifyRecord.isApucurioAvailable()) { @@ -160,7 +152,6 @@ public void shouldHandleFloatPointDefaultTypes() throws Exception { @Test @FixFor("DBZ-4990") - @Flaky("DBZ-7542") public void shouldHandleCharacterDefaultTypes() throws Exception { List columnDefinitions = Arrays.asList( new ColumnDefinition("val_char", "char(5)", @@ -189,7 +180,6 @@ public void shouldHandleCharacterDefaultTypes() throws Exception { @Test @FixFor("DBZ-4990") - @Flaky("DBZ-7542") public void shouldHandleDateTimeDefaultTypes() throws Exception { List columnDefinitions = Arrays.asList( new ColumnDefinition("val_date", "DATE", @@ -459,7 +449,8 @@ private void testDefaultValuesAlterTableAdd(List columnDefinit connection.execute("INSERT INTO dv_test (id) values (3)"); // TODO: ALTER TABLE ADD columns sometimes(!) result in 'ghost' inserts for all existing rows(?) - SourceRecords records = consumeRecordsByTopic(4); + waitForAvailableRecords(10, TimeUnit.SECONDS); + SourceRecords records = consumeRecordsByTopicUntil((integer, record) -> Objects.equals(3, ((Struct) record.key()).get("id"))); assertNoRecordsToConsume(); List tableRecords = records.recordsForTopic(TestHelper.TEST_DATABASE + ".informix.dv_test"); diff --git a/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java b/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java index c47befe..d9cc1d2 100644 --- a/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java +++ b/src/test/java/io/debezium/connector/informix/BlockingSnapshotIT.java @@ -12,24 +12,17 @@ import org.junit.After; import org.junit.Before; import org.junit.Ignore; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestRule; import io.debezium.config.Configuration.Builder; import io.debezium.connector.informix.InformixConnectorConfig.SnapshotMode; import io.debezium.connector.informix.util.TestHelper; import io.debezium.jdbc.JdbcConnection; -import io.debezium.junit.ConditionalFail; -import io.debezium.junit.Flaky; import io.debezium.pipeline.AbstractBlockingSnapshotTest; import io.debezium.relational.history.SchemaHistory; public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { - @Rule - public TestRule conditionalFail = new ConditionalFail(); - private InformixConnection connection; @Before @@ -160,7 +153,6 @@ public void readsSchemaOnlyForSignaledTables() throws Exception { } @Test - @Flaky("DBZ-7543") public void executeBlockingSnapshotWhileStreaming() throws Exception { super.executeBlockingSnapshotWhileStreaming(); } diff --git a/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java b/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java index 4b2b491..d99dc7e 100644 --- a/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixCdcTypesIT.java @@ -28,14 +28,13 @@ import io.debezium.connector.informix.InformixConnectorConfig.SnapshotMode; import io.debezium.connector.informix.util.TestHelper; import io.debezium.data.SourceRecordAssert; -import io.debezium.embedded.AbstractConnectorTest; -import io.debezium.junit.Flaky; +import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; import io.debezium.time.Date; import io.debezium.util.Testing; -public class InformixCdcTypesIT extends AbstractConnectorTest { +public class InformixCdcTypesIT extends AbstractAsyncEngineConnectorTest { private InformixConnection connection; @@ -86,7 +85,6 @@ public void after() throws SQLException { } @Test - @Flaky("DBZ-7531") public void testTypes() throws Exception { final Configuration config = TestHelper.defaultConfig() @@ -181,7 +179,7 @@ private void insertOneAndValidate(String tableName, Schema valueSchema, String i String topicName = String.format("testdb.informix.%s", tableName); connection.execute(String.format("insert into %s values(%s)", tableName, insertValue)); - waitForAvailableRecords(10, TimeUnit.SECONDS); + waitForAvailableRecords(1, TimeUnit.MINUTES); List records = consumeRecordsByTopic(1).recordsForTopic(topicName); assertThat(records).isNotNull().hasSize(1); diff --git a/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java b/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java index 57bbf93..a567934 100644 --- a/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixConnectorIT.java @@ -26,9 +26,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestRule; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; @@ -40,9 +38,7 @@ import io.debezium.data.SchemaAndValueField; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; -import io.debezium.embedded.AbstractConnectorTest; -import io.debezium.junit.ConditionalFail; -import io.debezium.junit.Flaky; +import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.junit.logging.LogInterceptor; import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.history.MemorySchemaHistory; @@ -55,10 +51,7 @@ * Integration test for the Debezium Informix connector. * */ -public class InformixConnectorIT extends AbstractConnectorTest { - - @Rule - public TestRule conditionalFail = new ConditionalFail(); +public class InformixConnectorIT extends AbstractAsyncEngineConnectorTest { private InformixConnection connection; @@ -458,14 +451,12 @@ public void updatePrimaryKeyWithRestartInMiddle() throws Exception { @Test @FixFor("DBZ-1069") - @Flaky("DBZ-7531") public void verifyOffsetsWithoutOnlineUpd() throws Exception { verifyOffsets(false); } @Test @FixFor("DBZ-7531") - @Flaky("DBZ-7531") public void verifyOffsetsWithOnlineUpd() throws Exception { verifyOffsets(true); } @@ -538,7 +529,7 @@ public void verifyOffsets(boolean withOnlineUpd) throws Exception { // Wait for streaming to start waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); - waitForAvailableRecords(2, TimeUnit.MINUTES); + waitForAvailableRecords(3, TimeUnit.MINUTES); final SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); final List tableA = sourceRecords.recordsForTopic("testdb.informix.tablea"); @@ -722,9 +713,9 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")); assertRecord((Struct) value.get("after"), expectedLastRow); + waitForConnectorShutdown(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); stopConnector(); assertConnectorNotRunning(); - waitForConnectorShutdown(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); start(InformixConnector.class, config); assertConnectorIsRunning(); @@ -800,21 +791,18 @@ private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean af @Test @FixFor("DBZ-1128") - @Flaky("DBZ-7539") public void restartInTheMiddleOfTxAfterSnapshot() throws Exception { restartInTheMiddleOfTx(true, false); } @Test @FixFor("DBZ-1128") - @Flaky("DBZ-7539") public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception { restartInTheMiddleOfTx(false, true); } @Test @FixFor("DBZ-1128") - @Flaky("DBZ-7539") public void restartInTheMiddleOfTx() throws Exception { restartInTheMiddleOfTx(false, false); } diff --git a/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java b/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java index 1b1ab54..104a507 100644 --- a/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixValidateColumnOrderIT.java @@ -25,10 +25,10 @@ import io.debezium.connector.informix.InformixConnectorConfig.SnapshotMode; import io.debezium.connector.informix.util.TestHelper; import io.debezium.data.VerifyRecord; -import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.util.Strings; -public class InformixValidateColumnOrderIT extends AbstractConnectorTest { +public class InformixValidateColumnOrderIT extends AbstractAsyncEngineConnectorTest { private static final String testTableName = "test_column_order"; private static final Map testTableColumns = new LinkedHashMap<>() { diff --git a/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java b/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java index 60c466f..a7e185b 100644 --- a/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java +++ b/src/test/java/io/debezium/connector/informix/SchemaHistoryTopicIT.java @@ -17,17 +17,13 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestRule; import io.debezium.config.Configuration; import io.debezium.connector.informix.InformixConnectorConfig.SnapshotMode; import io.debezium.connector.informix.util.TestHelper; import io.debezium.doc.FixFor; -import io.debezium.embedded.AbstractConnectorTest; -import io.debezium.junit.ConditionalFail; -import io.debezium.junit.Flaky; +import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; /** * Integration test for the user-facing history topic of the Debezium Informix Server connector. @@ -36,10 +32,7 @@ * the {@code ALTER} schema events from streaming * */ -public class SchemaHistoryTopicIT extends AbstractConnectorTest { - - @Rule - public TestRule conditionalFail = new ConditionalFail(); +public class SchemaHistoryTopicIT extends AbstractAsyncEngineConnectorTest { private InformixConnection connection; @@ -80,7 +73,6 @@ public void after() throws SQLException { @Test @FixFor("DBZ-1904") - @Flaky("DBZ-7556") public void snapshotSchemaChanges() throws Exception { final int RECORDS_PER_TABLE = 5; final int TABLES = 2; diff --git a/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java b/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java index 0b1cfd9..afdd280 100644 --- a/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java +++ b/src/test/java/io/debezium/connector/informix/TransactionMetadataIT.java @@ -18,26 +18,19 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestRule; import io.debezium.config.Configuration; import io.debezium.connector.informix.InformixConnectorConfig.SnapshotMode; import io.debezium.connector.informix.util.TestHelper; -import io.debezium.embedded.AbstractConnectorTest; -import io.debezium.junit.ConditionalFail; -import io.debezium.junit.Flaky; +import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.util.Collect; /** * Transaction metadata test for the Debezium Informix Server connector. * */ -public class TransactionMetadataIT extends AbstractConnectorTest { - - @Rule - public TestRule conditionalFail = new ConditionalFail(); +public class TransactionMetadataIT extends AbstractAsyncEngineConnectorTest { private InformixConnection connection; @@ -75,7 +68,6 @@ public void after() throws SQLException { } @Test - @Flaky("DBZ-7540") public void transactionMetadata() throws Exception { final int RECORDS_PER_TABLE = 5; final int ID_START = 10;