From d7149ef7113f8170e4a0f9c4bb9a252643de0956 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Tue, 12 Dec 2023 14:56:06 -0500 Subject: [PATCH] DBZ-4321 Fix test stability issues --- .../InformixReselectColumnsProcessorIT.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java b/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java index 69ce511..8124251 100644 --- a/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java +++ b/src/test/java/io/debezium/connector/informix/InformixReselectColumnsProcessorIT.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.informix; +import java.util.concurrent.TimeUnit; + import org.junit.After; import org.junit.Before; @@ -25,6 +27,8 @@ public class InformixReselectColumnsProcessorIT extends AbstractReselectProcesso @Before public void beforeEach() throws Exception { connection = TestHelper.testConnection(); + connection.setAutoCommit(false); + initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH); super.beforeEach(); @@ -78,6 +82,7 @@ protected String reselectColumnsList() { @Override protected void createTable() throws Exception { + connection.execute("DROP TABLE IF EXISTS DBZ4321"); connection.execute("CREATE TABLE DBZ4321 (id int not null, data varchar(50), data2 int, primary key(id))"); } @@ -97,7 +102,17 @@ protected String getInsertWithNullValue() { @Override protected void waitForStreamingStarted() throws InterruptedException { + waitForSnapshotToBeCompleted(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); waitForStreamingRunning(TestHelper.TEST_CONNECTOR, TestHelper.TEST_DATABASE); } + protected SourceRecords consumeRecordsByTopicReselectWhenNullStreaming() throws InterruptedException { + waitForAvailableRecords(10, TimeUnit.SECONDS); + return super.consumeRecordsByTopicReselectWhenNullStreaming(); + } + + protected SourceRecords consumeRecordsByTopicReselectWhenNotNullStreaming() throws InterruptedException { + waitForAvailableRecords(10, TimeUnit.SECONDS); + return super.consumeRecordsByTopicReselectWhenNotNullStreaming(); + } }