Skip to content

Commit

Permalink
Fix flaky tests and test infra failures due to stuck tests (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhav-yb authored Jun 21, 2023
1 parent 7da4c4b commit aa50e01
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 218 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/sanity-workflow-preview.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ jobs:
- name: Sanity Test that code compiles and tests pass.
env:
YB_DOCKER_IMAGE: ${{ secrets.YB_DOCKER_IMAGE_PREVIEW }}
run: mvn clean test -Dtest=!YugabyteDBColocatedTablesTest#shouldWorkWithMixOfColocatedAndNonColocatedTables,!YugabyteDBDatatypesTest#testEnumValue
- name: Flaky test i.e. YugabyteDBDatatypesTest#testEnumValue
env:
YB_DOCKER_IMAGE: ${{ secrets.YB_DOCKER_IMAGE_PREVIEW }}
run: mvn clean test -Dtest=YugabyteDBDatatypesTest#testEnumValue
run: mvn clean test -Dtest=!YugabyteDBColocatedTablesTest#shouldWorkWithMixOfColocatedAndNonColocatedTables
- name: Flaky test YugabyteDBColocatedTablesTest#shouldWorkWithMixOfColocatedAndNonColocatedTables
env:
YB_DOCKER_IMAGE: ${{ secrets.YB_DOCKER_IMAGE_PREVIEW }}
Expand Down
6 changes: 1 addition & 5 deletions .github/workflows/sanity-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ jobs:
- name: Sanity Test that code compiles and tests pass.
env:
YB_DOCKER_IMAGE: ${{ secrets.YB_DOCKER_IMAGE }}
run: mvn clean test -Dtest=!YugabyteDBColocatedTablesTest#shouldWorkWithMixOfColocatedAndNonColocatedTables,!YugabyteDBDatatypesTest#testEnumValue
- name: Flaky test i.e. YugabyteDBDatatypesTest#testEnumValue
env:
YB_DOCKER_IMAGE: ${{ secrets.YB_DOCKER_IMAGE }}
run: mvn clean test -Dtest=YugabyteDBDatatypesTest#testEnumValue
run: mvn clean test -Dtest=!YugabyteDBColocatedTablesTest#shouldWorkWithMixOfColocatedAndNonColocatedTables
- name: Flaky test YugabyteDBColocatedTablesTest#shouldWorkWithMixOfColocatedAndNonColocatedTables
env:
YB_DOCKER_IMAGE: ${{ secrets.YB_DOCKER_IMAGE }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static org.junit.jupiter.api.Assertions.*;

import io.debezium.config.Configuration;
import io.debezium.connector.yugabytedb.annotations.PreviewOnly;
import io.debezium.connector.yugabytedb.common.TestBaseClass;
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import io.debezium.connector.yugabytedb.common.YugabytedTestBase;
Expand All @@ -13,11 +12,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.*;

/**
Expand Down Expand Up @@ -314,32 +310,6 @@ private void verifyRecordCount(List<SourceRecord> records, long recordsCount) {
waitAndFailIfCannotConsume(records, recordsCount, 10 * 60 * 1000);
}

private void waitAndFailIfCannotConsume(List<SourceRecord> records, long recordsCount,
long milliSecondsToWait) {
AtomicLong totalConsumedRecords = new AtomicLong();
long seconds = milliSecondsToWait / 1000;
try {
Awaitility.await()
.atMost(Duration.ofSeconds(seconds))
.until(() -> {
int consumed = consumeAvailableRecords(record -> {
LOGGER.debug("The record being consumed is " + record);
records.add(record);
});
if (consumed > 0) {
totalConsumedRecords.addAndGet(consumed);
LOGGER.debug("Consumed " + totalConsumedRecords + " records");
}

return totalConsumedRecords.get() == recordsCount;
});
} catch (ConditionTimeoutException exception) {
fail("Failed to consume " + recordsCount + " records in " + seconds + " seconds, total consumed: " + totalConsumedRecords.get(), exception);
}

assertEquals(recordsCount, totalConsumedRecords.get());
}

protected static class Executor extends TestBaseClass implements Runnable {
private final List<String> tables;
private final int columnCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import io.debezium.connector.yugabytedb.common.YugabytedTestBase;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.*;
import org.yb.client.GetDBStreamInfoResponse;
import org.yb.client.YBClient;
Expand All @@ -41,7 +38,6 @@ public class YugabyteDBDatatypesTest extends YugabyteDBContainerTestBase {
"CREATE SCHEMA s2; " +
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));";
private static final String SETUP_TABLES_STMT = CREATE_TABLES_STMT + INSERT_STMT;

private void insertRecords(long numOfRowsToBeInserted) throws Exception {
String formatInsertString = "INSERT INTO t1 VALUES (%d, 'Vaibhav', 'Kushwaha', 30);";
Expand All @@ -55,19 +51,6 @@ private void insertRecords(long numOfRowsToBeInserted) throws Exception {
}).get();
}

// This function will one row each of the specified enum labels
private void insertEnumRecords() throws Exception {
String[] enumLabels = {"ZERO", "ONE", "TWO"};
String formatInsertString = "INSERT INTO test_enum VALUES (%d, '%s');";
CompletableFuture.runAsync(() -> {
for (int i = 0; i < enumLabels.length; i++) {
TestHelper.execute(String.format(formatInsertString, i, enumLabels[i]));
}
}).exceptionally(throwable -> {
throw new RuntimeException(throwable);
}).get();
}

private void updateRecords(long numOfRowsToBeUpdated) throws Exception {
String formatUpdateString = "UPDATE t1 SET hours = 10 WHERE id = %d";
CompletableFuture.runAsync(() -> {
Expand Down Expand Up @@ -119,43 +102,6 @@ private void verifyDeletedFieldPresentInValue(long recordsCount, YBExtractNewRec
}
}

/**
* Consume the records available and add them to a list for further assertion purposes.
* @param records list to which we need to add the records we consume, pass a
* {@code new ArrayList<>()} if you do not need assertions on the consumed values
* @param recordsCount total number of records which should be consumed
* @param milliSecondsToWait duration in milliseconds to wait for while consuming
*/
private void waitAndFailIfCannotConsume(List<SourceRecord> records, long recordsCount,
long milliSecondsToWait) {
AtomicLong totalConsumedRecords = new AtomicLong();
long seconds = milliSecondsToWait / 1000;
try {
Awaitility.await()
.atMost(Duration.ofSeconds(seconds))
.until(() -> {
int consumed = consumeAvailableRecords(record -> {
LOGGER.debug("The record being consumed is " + record);
records.add(record);
});
if (consumed > 0) {
totalConsumedRecords.addAndGet(consumed);
LOGGER.info("Consumed " + totalConsumedRecords + " records");
}

return totalConsumedRecords.get() >= recordsCount;
});
} catch (ConditionTimeoutException exception) {
fail("Failed to consume " + recordsCount + " in " + seconds + " seconds, consumed only " + totalConsumedRecords.get(), exception);
}

assertEquals(recordsCount, totalConsumedRecords.get());
}

private void waitAndFailIfCannotConsume(List<SourceRecord> records, long recordsCount) {
waitAndFailIfCannotConsume(records, recordsCount, 300 * 1000 /* 5 minutes */);
}

private void verifyPrimaryKeyOnly(long recordsCount) {
List<SourceRecord> records = new ArrayList<>();
waitAndFailIfCannotConsume(records, recordsCount);
Expand Down Expand Up @@ -183,23 +129,6 @@ private void verifyValue(long recordsCount) {
}
}

private void verifyEnumValue(long recordsCount) {
List<SourceRecord> records = new ArrayList<>();
waitAndFailIfCannotConsume(records, recordsCount);

String[] enum_val = {"ZERO", "ONE", "TWO"};

try {
for (int i = 0; i < records.size(); ++i) {
assertValueField(records.get(i), "after/id/value", i);
assertValueField(records.get(i), "after/enum_col/value", enum_val[i]);
}
}
catch (Exception e) {
LOGGER.error("Exception caught while parsing records: " + e);
fail();
}
}
@BeforeAll
public static void beforeClass() throws SQLException {
initializeYBContainer();
Expand Down Expand Up @@ -363,31 +292,6 @@ public void testVerifyValue() throws Exception {
}).get();
}

@Test
public void testEnumValue() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("yugabyte_create_tables.ddl");
Thread.sleep(1000);

String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_enum");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_enum", dbStreamId);
startEngine(configBuilder);
assertConnectorIsRunning();

// 3 because there are 3 enum values in the enum type
final long recordsCount = 3;

awaitUntilConnectorIsReady();

// 3 records will be inserted in the table test_enum
insertEnumRecords();

CompletableFuture.runAsync(() -> verifyEnumValue(recordsCount))
.exceptionally(throwable -> {
throw new RuntimeException(throwable);
}).get();
}

@Test
public void testNonPublicSchema() throws Exception {
TestHelper.dropAllSchemas();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.debezium.connector.yugabytedb;

import static org.junit.jupiter.api.Assertions.fail;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;

/**
* Unit tests to verify the streaming of ENUM values.
*
* @author Vaibhav Kushwaha (vkushwaha@yugabyte.com)
*/
public class YugabyteDBEnumValuesTest extends YugabyteDBContainerTestBase {

@BeforeAll
public static void beforeClass() throws SQLException {
initializeYBContainer();
TestHelper.dropAllSchemas();
}

@BeforeEach
public void before() throws Exception {
initializeConnectorTestFramework();
TestHelper.dropAllSchemas();
TestHelper.executeDDL("yugabyte_create_tables.ddl");
}

@AfterEach
public void after() throws Exception {
stopConnector();
TestHelper.executeDDL("drop_tables_and_databases.ddl");
}

@AfterAll
public static void afterClass() {
shutdownYBContainer();
}

@Test
public void testEnumValue() throws Exception {
String dbStreamId = TestHelper.getNewDbStreamId("yugabyte", "test_enum");
Configuration.Builder configBuilder = TestHelper.getConfigBuilder("public.test_enum", dbStreamId);
startEngine(configBuilder);

// 3 because there are 3 enum values in the enum type
final long recordsCount = 3;

awaitUntilConnectorIsReady();

// 3 records will be inserted in the table test_enum
insertEnumRecords();

verifyEnumValue(recordsCount);
}

// This function will one row each of the specified enum labels
private void insertEnumRecords() throws Exception {
String[] enumLabels = {"ZERO", "ONE", "TWO"};
String formatInsertString = "INSERT INTO test_enum VALUES (%d, '%s');";
for (int i = 0; i < enumLabels.length; i++) {
TestHelper.execute(String.format(formatInsertString, i, enumLabels[i]));
}
}

private void verifyEnumValue(long recordsCount) {
List<SourceRecord> records = new ArrayList<>();
waitAndFailIfCannotConsume(records, recordsCount, 2 * 60 * 1000 /* 2 minutes */);

String[] enum_val = {"ZERO", "ONE", "TWO"};

try {
for (int i = 0; i < records.size(); ++i) {
assertValueField(records.get(i), "after/id/value", i);
assertValueField(records.get(i), "after/enum_col/value", enum_val[i]);
}
} catch (Exception e) {
LOGGER.error("Exception caught while parsing records: " + e);
fail();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package io.debezium.connector.yugabytedb;

import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import io.debezium.connector.yugabytedb.common.YugabyteDBContainerTestBase;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.*;

import io.debezium.config.Configuration;
Expand Down Expand Up @@ -248,32 +244,6 @@ private void verifyRecordCount(List<SourceRecord> records, long recordsCount) {
waitAndFailIfCannotConsume(records, recordsCount, 10 * 60 * 1000);
}

private void waitAndFailIfCannotConsume(List<SourceRecord> records, long recordsCount,
long milliSecondsToWait) {
AtomicLong totalConsumedRecords = new AtomicLong();
long seconds = milliSecondsToWait / 1000;
try {
Awaitility.await()
.atMost(Duration.ofSeconds(seconds))
.until(() -> {
int consumed = consumeAvailableRecords(record -> {
LOGGER.debug("The record being consumed is " + record);
records.add(record);
});
if (consumed > 0) {
totalConsumedRecords.addAndGet(consumed);
LOGGER.debug("Consumed " + totalConsumedRecords + " records");
}

return totalConsumedRecords.get() == recordsCount;
});
} catch (ConditionTimeoutException exception) {
fail("Failed to consume " + recordsCount + " records in " + seconds + " seconds, total consumed: " + totalConsumedRecords.get(), exception);
}

assertEquals(recordsCount, totalConsumedRecords.get());
}

protected class Executor implements Runnable {
private final String generateSeries = "INSERT INTO t1 VALUES (generate_series(%d, %d));";
private final int columnCount;
Expand Down
Loading

0 comments on commit aa50e01

Please sign in to comment.