Fix tests and reduce the overall test execution time #939

Merged (2 commits, Jun 12, 2023)
@@ -33,7 +33,7 @@


class FileProcessor implements Runnable {
-private static final Logger LOG = LoggerFactory.getLogger(FileConnector.class);
+private static final Logger LOG = LoggerFactory.getLogger(FileProcessor.class);

private static final int PARTITION = 0;
private static final int POLL_WAIT_MS = 100;
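Aside on this first fix: the FileProcessor logger was constructed with FileConnector.class, so every log line the processor emitted was attributed to the connector class, which defeats per-class log filtering. Tagging the logger with its own class restores correct attribution.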
@@ -136,6 +136,7 @@ abstract public class AbstractKafkaBasedConnectorTask implements Runnable, Consu
protected final GroupIdConstructor _groupIdConstructor;

protected final KafkaTopicPartitionTracker _kafkaTopicPartitionTracker;
+private long _commitRetryTimeoutMillis;

protected AbstractKafkaBasedConnectorTask(KafkaBasedConnectorConfig config, DatastreamTask task, Logger logger,
String metricsPrefix, GroupIdConstructor groupIdConstructor) {
@@ -200,6 +201,7 @@ protected AbstractKafkaBasedConnectorTask(KafkaBasedConnectorConfig config, Data
_pollTimeoutMillis = config.getPollTimeoutMillis();
_retrySleepDuration = config.getRetrySleepDuration();
_commitTimeout = config.getCommitTimeout();
+_commitRetryTimeoutMillis = COMMIT_RETRY_TIMEOUT_MILLIS;
_consumerMetrics = createKafkaBasedConnectorTaskMetrics(metricsPrefix, _datastreamName, _logger,
_enableAdditionalMetrics);

@@ -686,7 +688,7 @@ protected void commitWithRetries(Consumer<?, ?> consumer, Optional<Map<TopicPart
}

return true;
-}, COMMIT_RETRY_INTERVAL_MILLIS, COMMIT_RETRY_TIMEOUT_MILLIS);
+}, COMMIT_RETRY_INTERVAL_MILLIS, getCommitRetryTimeoutMillis());

if (!result) {
String msg = "Commit failed after several retries, Giving up.";
@@ -695,6 +697,15 @@
}
}

+@VisibleForTesting
+public void setCommitRetryTimeoutMillis(long retryTimeoutMillis) {
+_commitRetryTimeoutMillis = retryTimeoutMillis;
+}
+
+private long getCommitRetryTimeoutMillis() {
+return _commitRetryTimeoutMillis;
+}

private void commitSync(Consumer<?, ?> consumer, Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
if (offsets.isPresent()) {
consumer.commitSync(offsets.get(), _commitTimeout);
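A note on the pattern introduced above: the commit retry timeout moves from the COMMIT_RETRY_TIMEOUT_MILLIS constant into an instance field with a @VisibleForTesting setter, so tests can shrink the retry window (the mirror-maker test utils below set it to 1 second) while production keeps the constant's default. A minimal self-contained sketch of the same test seam, with illustrative class, constant, and default values rather than the actual Brooklin code:

```java
import com.google.common.annotations.VisibleForTesting;

public class RetryingCommitter {
  // Production default; hypothetical value for illustration only.
  private static final long DEFAULT_COMMIT_RETRY_TIMEOUT_MILLIS = 32_000;

  private long _commitRetryTimeoutMillis = DEFAULT_COMMIT_RETRY_TIMEOUT_MILLIS;

  // Test-only seam: lets a test shorten the retry window so a
  // commit-failure path completes in milliseconds, not tens of seconds.
  @VisibleForTesting
  public void setCommitRetryTimeoutMillis(long retryTimeoutMillis) {
    _commitRetryTimeoutMillis = retryTimeoutMillis;
  }

  // Retries the commit until it succeeds or the deadline passes.
  public boolean commitWithRetries(Runnable commit, long retryIntervalMillis) {
    long deadline = System.currentTimeMillis() + _commitRetryTimeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      try {
        commit.run();
        return true; // committed successfully
      } catch (RuntimeException e) {
        try {
          Thread.sleep(retryIntervalMillis); // back off, then retry
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false; // caller decides how to handle exhausted retries
  }
}
```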
@@ -714,7 +725,7 @@ protected void seekToLastCheckpoint(Set<TopicPartition> topicPartitions) {
Set<TopicPartition> tpWithNoCommits = new HashSet<>();
// construct last checkpoint
topicPartitions.forEach(tp -> getLastCheckpointToSeekTo(lastCheckpoint, tpWithNoCommits, tp));
_logger.info("Seeking to previous checkpoints {}", lastCheckpoint);
_logger.info("Seeking to previous checkpoints {} ", lastCheckpoint);
[Review comment, Collaborator] Nit: redundant change

// reset consumer to last checkpoint, by default we will rewind the checkpoint
lastCheckpoint.forEach((tp, offsetAndMetadata) -> _consumer.seek(tp, offsetAndMetadata.offset()));
if (!tpWithNoCommits.isEmpty()) {
@@ -117,8 +117,10 @@ static KafkaMirrorMakerConnectorTask createKafkaMirrorMakerConnectorTask(Datastr

static KafkaMirrorMakerConnectorTask createKafkaMirrorMakerConnectorTask(DatastreamTaskImpl task,
KafkaBasedConnectorConfig connectorConfig, String connectorName) {
-return new KafkaMirrorMakerConnectorTask(connectorConfig, task, connectorName, false,
+KafkaMirrorMakerConnectorTask kafkaMirrorMakerConnectorTask = new KafkaMirrorMakerConnectorTask(connectorConfig, task, connectorName, false,
new KafkaMirrorMakerGroupIdConstructor(false, "testCluster"));
+kafkaMirrorMakerConnectorTask.setCommitRetryTimeoutMillis(1000);
+return kafkaMirrorMakerConnectorTask;
}

static KafkaMirrorMakerConnectorTask createFlushlessKafkaMirrorMakerConnectorTask(DatastreamTaskImpl task,
@@ -91,6 +91,7 @@
/**
* Tests for {@link KafkaMirrorMakerConnectorTask}
*/
+@Test(singleThreaded = false)
[Review comment, Collaborator] Should we be doing this in all our test classes? I thought by default each test was executed in its own thread and run in parallel. Is that not the case?

public class TestKafkaMirrorMakerConnectorTask extends BaseKafkaZkTest {

private static final long CONNECTOR_AWAIT_STOP_TIMEOUT_MS = 30000;
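On the review question about singleThreaded: TestNG parallelism is opt-in at the suite level; with no parallel setting in the suite, methods run sequentially on one thread. The class-level singleThreaded flag only matters when the suite does run methods in parallel, in which case singleThreaded = true pins all of a class's methods to a single thread. A sketch of what the annotation permits, assuming a suite configured with parallel="methods" (class and method names are illustrative):

```java
import org.testng.annotations.Test;

// Under a suite running methods in parallel, singleThreaded = false (the
// default) lets caseA and caseB run concurrently on different threads;
// singleThreaded = true would serialize them onto one thread.
@Test(singleThreaded = false)
public class ExampleParallelTest {

  @Test
  public void caseA() throws InterruptedException {
    Thread.sleep(500); // independent of caseB, so the two can overlap
  }

  @Test
  public void caseB() throws InterruptedException {
    Thread.sleep(500);
  }
}
```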
@@ -553,7 +554,7 @@ public void testPartitionManagedLockReleaseOnInterruptException() throws Interru

KafkaBasedConnectorConfig connectorConfig = new KafkaBasedConnectorConfigBuilder()
.setConsumerFactory(new LiKafkaConsumerFactory())
-.setCommitIntervalMillis(10000)
+.setCommitIntervalMillis(100)
.setEnablePartitionManaged(true)
.build();

@@ -577,7 +578,7 @@ public void testPartitionManagedLockReleaseOnThreadInterrupt() throws Interrupte

KafkaBasedConnectorConfig connectorConfig = new KafkaBasedConnectorConfigBuilder()
.setConsumerFactory(new LiKafkaConsumerFactory())
-.setCommitIntervalMillis(10000)
+.setCommitIntervalMillis(100)
.setEnablePartitionManaged(true)
.build();

@@ -114,6 +114,7 @@ public void startup() {
properties.setProperty("log.flush.interval.messages", String.valueOf(1));
properties.setProperty("log.cleaner.enable", Boolean.FALSE.toString()); //to save memory
properties.setProperty("offsets.topic.num.partitions", "1");
properties.setProperty("offsets.topic.replication.factor", "1");

KafkaServerStartable broker = startBroker(properties);

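The added offsets.topic.replication.factor override matters on a one-broker embedded cluster: Kafka's internal __consumer_offsets topic defaults to a replication factor of 3, and a single broker cannot satisfy that, so consumer offset commits fail or stall. Pinning it to 1, alongside the existing single-partition setting, keeps group offset commits working. A sketch of the relevant overrides (the property keys are standard Kafka broker configs; the wrapper class is illustrative):

```java
import java.util.Properties;

final class EmbeddedBrokerConfig {
  // Settings a single-broker test cluster needs for consumer groups to work.
  static Properties singleBrokerOverrides() {
    Properties properties = new Properties();
    // __consumer_offsets defaults to 50 partitions with replication factor 3;
    // one embedded broker cannot satisfy the default replication factor.
    properties.setProperty("offsets.topic.num.partitions", "1");
    properties.setProperty("offsets.topic.replication.factor", "1");
    return properties;
  }
}
```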
@@ -104,7 +104,7 @@ public static void waitForTopicCreation(AdminClient adminClient, String topic, S
consumer.subscribe(Collections.singleton(topic));
while (Instant.now().isBefore(expiration)) {
try {
-consumer.poll(Duration.ofSeconds(1).toMillis());
+consumer.poll(Duration.ofSeconds(1));
return;
} catch (Exception ignored) {
// Exception should occur when we are waiting for the broker to be assigned this topic
@@ -189,7 +189,7 @@ public static void readTopic(String topic, Integer partition, String brokerList,
boolean keepGoing = true;
long now = System.currentTimeMillis();
do {
-ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
+ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<byte[], byte[]> record : records.records(topic)) {
if (!callback.onMessage(record.key(), record.value())) {
keepGoing = false;
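The two poll changes above move from KafkaConsumer.poll(long), deprecated since Kafka 2.0 (KIP-266), to poll(Duration). The Duration overload bounds the entire call, including the initial metadata fetch, so it cannot block indefinitely the way poll(long) could before the consumer had metadata. A minimal sketch of the preferred usage (broker address, group id, and topic are illustrative):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class PollDurationExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative
    props.put("group.id", "example-group");           // illustrative
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singleton("example-topic"));
      // poll(Duration) bounds the whole call; poll(long) only bounded the
      // wait for records once metadata was already available.
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
      }
    }
  }
}
```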
@@ -516,12 +516,15 @@ public ActionResult<Void> stop(@PathKeysParam PathKeys pathKeys,
}
}

+long t1 = System.currentTimeMillis();
// polls until the leader transitions the state of the datastream to STOPPED state
-PollUtils.poll(() -> datastreamsToStop.stream()
-.allMatch(ds -> _store.getDatastream(ds.getName()).getStatus().equals(DatastreamStatus.STOPPED)),
-allStopped -> allStopped, _stopTransitionRetryPeriodMs.toMillis(), _stopTransitionTimeoutMs.toMillis())
+PollUtils.poll(() ->
+datastreamsToStop.stream()
+.allMatch(ds ->
+_store.getDatastream(ds.getName()).getStatus().equals(DatastreamStatus.STOPPED)),
+allStopped -> allStopped, _stopTransitionRetryPeriodMs.toMillis(), _stopTransitionTimeoutMs.toMillis())
.orElseThrow(() -> new RestLiServiceException(HttpStatus.S_408_REQUEST_TIMEOUT,
-String.format("Stop request timed out for datastream: %s", datastreamName)));
+String.format("Stop request timed out for datastream: %s %d", datastreamName, System.currentTimeMillis() - t1)));

LOG.info("Completed request for stopping datastream {}", _store.getDatastream(datastream.getName()));

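For readers unfamiliar with PollUtils.poll as used above and throughout these tests: it repeatedly evaluates a supplier until a predicate accepts the value or a timeout elapses, returning Optional.empty() on timeout, which is why the stop path can chain orElseThrow. A hedged sketch of that contract (the real Brooklin utility may differ in details):

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class PollUtilsSketch {
  // Re-evaluates supplier every periodMs until predicate passes or
  // timeoutMs elapses; an empty result signals a timeout to the caller.
  public static <T> Optional<T> poll(Supplier<T> supplier, Predicate<T> predicate,
      long periodMs, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      T value = supplier.get();
      if (predicate.test(value)) {
        return Optional.of(value);
      }
      try {
        Thread.sleep(periodMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return Optional.empty();
      }
    }
    return Optional.empty();
  }
}
```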
@@ -121,7 +121,7 @@
public class TestCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(TestCoordinator.class);
private static final long WAIT_DURATION_FOR_ZK = Duration.ofMinutes(1).toMillis();
-private static final int WAIT_TIMEOUT_MS = 60000;
+private static final int WAIT_TIMEOUT_MS = 2000;

EmbeddedZookeeper _embeddedZookeeper;
String _zkConnectionString;
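Shrinking WAIT_TIMEOUT_MS from 60 s to 2 s is the main runtime lever in this class: these waits normally complete well under the cap once the coordinator settles, so the cap mostly determines how long a genuinely broken test hangs before failing. The one slow path, the elastic-task partition assignment check further down, compensates with WAIT_TIMEOUT_MS * 5.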
@@ -523,7 +523,7 @@ public void testCoordinationWithBroadcastStrategy() throws Exception {

@Test
public void testHandleAssignmentChangeTransientFailure() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeTransientFailure";
String testConnectorType = "testConnectorType";
String datastreamName1 = "datastream1";
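This rename, and the similar ones below, gives each test its own cluster name. Tests that shared "testCoordinationSmoke" also shared the same ZooKeeper subtree, so coordinator state left behind by one test could leak into another; unique names isolate them, which matters once tests run concurrently.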

@@ -550,7 +550,7 @@ public void testHandleAssignmentChangeTransientFailure() throws Exception {

@Test
public void testHandleAssignmentChangeFailure() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeChangeFailure";
String testConnectorType = "testConnectorType";
String datastreamName1 = "datastream1";

@@ -575,7 +575,7 @@ public void testHandleAssignmentChangeFailure() throws Exception {

@Test
public void testStopAndResumeDatastream() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeResumeDatastream";
String testConnectorType = "testConnectorType";
String datastreamName1 = "datastream1";

@@ -627,7 +627,7 @@ public void testStopAndResumeDatastream() throws Exception {
*/
@Test
public void testCoordinationWithStickyMulticastStrategy() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeWithSticky";
String testConnectorType = "testConnectorType";
Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);

@@ -708,8 +708,8 @@ public void testCoordinationWithStickyMulticastStrategy() throws Exception {
instance4.start();

connectors.add(connector4);
-// verify connector4 get at least 5 task assignment
-waitTillAssignmentIsComplete(5, WAIT_TIMEOUT_MS, connector4);
+// verify connector4 gets at least 4 task assignments
+waitTillAssignmentIsComplete(4, WAIT_TIMEOUT_MS, connector4);

instance2.stop();
instance3.stop();
@@ -845,7 +845,7 @@ public void testCoordinationWithStickyMulticastStrategyAndMaxTaskLimit() throws

@Test
public void testCoordinationWithPartitionAssignment() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeWithPartitionAssignment";
String testConnectorType = "testConnectorType";
Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);
ZkClient zkClient = new ZkClient(_zkConnectionString);
@@ -1051,7 +1051,7 @@ public void testCoordinationWithElasticTaskAssignmentPartitionAssignment() throw
// Verify all the partitions are assigned
Map<String, List<String>> assignment2 = collectDatastreamPartitions(connectors);
return assignment2.get("datastream42").size() == partitions1.size() && assignment2.get("datastream43").size() == partitions2.size();
-}, interval, WAIT_TIMEOUT_MS));
+}, interval, WAIT_TIMEOUT_MS * 5));

instance1.stop();
instance2.stop();
@@ -1077,7 +1077,7 @@ private StickyPartitionAssignmentStrategy createStrategy(String testCluster, ZkC
*/
@Test
public void testBYOTDatastreamWithUsedDestination() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeWithUsedDestination";
String testConnectorType = "testConnectorType";

Coordinator coordinator = createCoordinator(_zkConnectionString, testCluster);
@@ -1229,7 +1229,7 @@ public void testInvokePostDataStreamStateChangeAction() throws Exception {
*/
@Test
public void testDatastreamWithConnectorManagedDestination() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeWithConnectorManagedDestination";
String testConnectorType = "testConnectorType";

DummyTransportProviderAdminFactory transportProviderAdminFactory = new DummyTransportProviderAdminFactory();
@@ -1272,7 +1272,7 @@ public void testDatastreamWithConnectorManagedDestination() throws Exception {
*/
@Test
public void testDatastreamWithoutConnectorManagedDestination() throws Exception {
-String testCluster = "testCoordinationSmoke";
+String testCluster = "testCoordinationSmokeWithoutConnectorManagedDestination";
String testConnectorType = "testConnectorType";

DummyTransportProviderAdminFactory transportProviderAdminFactory = new DummyTransportProviderAdminFactory();
@@ -1297,6 +1297,11 @@ public void testDatastreamWithoutConnectorManagedDestination() throws Exception
Assert.assertEquals(transportProviderAdminFactory._createDestinationCount, 1,
"Create destination count should have been 1, since Datastream does not have connector-managed destination");

+// wait for datastream to be READY
+Assert.assertTrue(PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, datastreamName)
+.getStatus()
+.equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS));

resource.delete(datastreamName);
String path = KeyBuilder.datastream(testCluster, datastreamName);
Assert.assertTrue(PollUtils.poll(() -> !zkClient.exists(path), 200, WAIT_TIMEOUT_MS));
@@ -1444,9 +1449,9 @@ public void testCoordinatorHandleUpdateDatastream() throws Exception {
LOG.info("Created datastream: {}", datastream);

// wait for datastream to be READY
-PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, "datastream1")
+Assert.assertTrue(PollUtils.poll(() -> DatastreamTestUtils.getDatastream(zkClient, testCluster, "datastream1")
.getStatus()
-.equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS);
+.equals(DatastreamStatus.READY), 1000, WAIT_TIMEOUT_MS));
datastream = DatastreamTestUtils.getDatastream(zkClient, testCluster, datastream.getName());
assertConnectorAssignment(connector1, WAIT_TIMEOUT_MS, datastream.getName());
assertConnectorAssignment(connector2, WAIT_TIMEOUT_MS, datastream.getName());
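The Assert.assertTrue wrapper here is a behavioral fix, not a cosmetic one: PollUtils.poll reports whether the condition was met before the timeout, and the old code discarded that result, so a datastream that never reached READY would slip through and fail later with a less useful error. The READY wait added to testDatastreamWithoutConnectorManagedDestination above applies the same pattern.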
@@ -2559,12 +2564,6 @@ public void testDatastreamDeleteUponTTLExpire() throws Exception {
Datastream[] streams = DatastreamTestUtils.createDatastreams(DummyConnector.CONNECTOR_TYPE, streamNames);
streams[0].getSource().setConnectionString(DummyConnector.VALID_DUMMY_SOURCE);
streams[1].getSource().setConnectionString(DummyConnector.VALID_DUMMY_SOURCE);
-streams[0].getDestination()
-.setConnectionString(new KafkaDestination(setup._datastreamKafkaCluster.getKafkaCluster().getZkConnection(),
-"TestDatastreamTopic1", false).getDestinationURI());
-streams[1].getDestination()
-.setConnectionString(new KafkaDestination(setup._datastreamKafkaCluster.getKafkaCluster().getZkConnection(),
-"TestDatastreamTopic2", false).getDestinationURI());

// stream1 expires after 500ms and should get deleted when stream2 is created
long threeDaysAgo = Instant.now().minus(Duration.ofDays(3)).toEpochMilli();
@@ -2610,13 +2609,17 @@ public void testMultipleDatastreamDeleteUponTTLExpire() throws Exception {

// stream1 and stream2 expire in 1 minute from now and should get deleted when stream3 is created
long createTime = Instant.now().toEpochMilli();
-long expireTTL = Duration.ofMinutes(1).toMillis();
+long expireTTL = Duration.ofSeconds(5).toMillis();

streams[0].getMetadata().put(CREATION_MS, String.valueOf(createTime));
streams[0].getMetadata().put(TTL_MS, String.valueOf(expireTTL));
streams[1].getMetadata().put(CREATION_MS, String.valueOf(createTime));
streams[1].getMetadata().put(TTL_MS, String.valueOf(expireTTL));

+streams[0].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+streams[1].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+streams[2].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());

// Creation should go through as TTL is not considered for freshly created streams (INITIALIZING)
CreateResponse createResponse = setup._resource.create(streams[0]);
Assert.assertNull(createResponse.getError());
@@ -2625,35 +2628,39 @@
Assert.assertNull(createResponse.getError());
Assert.assertEquals(createResponse.getStatus(), HttpStatus.S_201_CREATED);

-// Sleep for 1 minute to wait for stream1 and stream2 to expire.
-Thread.sleep(Duration.ofMinutes(1).toMillis());
+// Sleep for 5 sec to wait for stream1 and stream2 to expire.
+Thread.sleep(Duration.ofSeconds(5).toMillis());

// Creating a stream3 which should trigger stream1 to be deleted
createResponse = setup._resource.create(streams[2]);
Assert.assertNull(createResponse.getError());
Assert.assertEquals(createResponse.getStatus(), HttpStatus.S_201_CREATED);

-// Poll up to 30s for stream1 to get deleted
-PollUtils.poll(() -> {
+// Poll up to 10s for stream1 to get deleted
+boolean status = PollUtils.poll(() -> {
try {
setup._resource.get(streams[0].getName());
+setup._coordinator.getDatastreamCache().getZkclient().writeData(KeyBuilder.datastreams(setup._coordinator.getClusterName()), "1234");
return false;
} catch (RestLiServiceException e) {
Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND);
return true;
}
-}, 200, Duration.ofSeconds(30).toMillis());
+}, 200, Duration.ofSeconds(10).toMillis());
+Assert.assertTrue(status);

-// Poll up to 30s for stream2 to get deleted
-PollUtils.poll(() -> {
+// Poll up to 10s for stream2 to get deleted
+status = PollUtils.poll(() -> {
try {
setup._resource.get(streams[1].getName());
+setup._coordinator.getDatastreamCache().getZkclient().writeData(KeyBuilder.datastreams(setup._coordinator.getClusterName()), "5678");
return false;
} catch (RestLiServiceException e) {
Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND);
return true;
}
-}, 200, Duration.ofSeconds(30).toMillis());
+}, 200, Duration.ofSeconds(10).toMillis());
+Assert.assertTrue(status);
}

@Test
@@ -2671,6 +2678,9 @@ public void testDoNotAssignExpiredStreams() throws Exception {
.setConnectionString(new KafkaDestination(setup._datastreamKafkaCluster.getKafkaCluster().getZkConnection(),
"TestDatastreamTopic2", false).getDestinationURI());

+streams[0].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());
+streams[1].getMetadata().put(DatastreamMetadataConstants.IS_CONNECTOR_MANAGED_DESTINATION_KEY, Boolean.TRUE.toString());

// stream2 expires after 500ms and should not get assigned
long threeDaysAgo = Instant.now().minus(Duration.ofDays(3)).toEpochMilli();
streams[1].getMetadata().put(CREATION_MS, String.valueOf(threeDaysAgo));
@@ -3156,7 +3166,7 @@ public void testOnSessionExpiredHandleNewSession() throws Exception {
}

void testOnSessionExpired(boolean handleNewSession) throws DatastreamException, InterruptedException {
String testCluster = "testCoordinationSmoke3";
String testCluster = "testOnSessionExpired";
String testConnectorType = "testConnectorType";
String datastreamName = "datastreamNameSessionExpired";

@@ -518,7 +518,13 @@ public void testStopRequestTimeoutWithBusyLeader() throws DatastreamException {
Mockito.doReturn(mockDatastreamStore).when(mockDatastreamServer).getDatastreamStore();
Mockito.doReturn(mockDatastreamServer).when(mockDatastreamCluster).getPrimaryDatastreamServer();

-DatastreamResources resource1 = new DatastreamResources(mockDatastreamCluster.getPrimaryDatastreamServer());
+// Configuring small timeouts to mock timeout scenario
+Properties testProperties = new Properties();
+testProperties.setProperty(CONFIG_STOP_TRANSITION_TIMEOUT_MS, "1000");
+testProperties.setProperty(CONFIG_STOP_TRANSITION_RETRY_PERIOD_MS, "5");
+
+DatastreamResources resource1 = new DatastreamResources(mockDatastreamCluster.getPrimaryDatastreamServer().getDatastreamStore(),
+mockDatastreamCluster.getPrimaryDatastreamServer().getCoordinator(), testProperties);

// Create a Datastream.
Datastream datastreamToCreate = generateDatastream(0);
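On this last change: instead of building DatastreamResources from the server alone, the test now passes the store, the coordinator, and a Properties bag, letting it inject a 1-second stop-transition timeout with a 5 ms retry period. The timeout path under test then completes in about a second rather than waiting out the production default.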