Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Speed up tests and fix flakiness (#628)
Browse files Browse the repository at this point in the history
### Motivation

The KoP CI tests take much more time than CI tests of branch-2.7.2. 

The main reason is the `cleanup()` phase takes a long time, each time a test is cleaned up, it will take over 10 seconds to complete. This behavior was introduced from apache/pulsar#10199, which made broker shutdown gracefully by default but it would take longer to shutdown.

The other reason is caused by rebalance time. According to my observes, when a Kafka consumer subscribes a topic in KoP, it will take at least 3 seconds. Finally I found it's caused by the GroupInitialRebalanceDelayMs config, which has the same semantics with Kafka's [group.initial.rebalance.delay.ms](https://kafka.apache.org/documentation/#brokerconfigs_group.initial.rebalance.delay.ms). It makes Kafka server wait longer for `JOIN_GROUP` request for more consumers to join so that the rebalance count can reduce. However, it should be set zero  in tests.

After fixing these problems, sometimes the following error may happen and cause flakiness.

```
TwoPhaseCompactor$MockitoMock$1432102834 cannot be returned by getConfiguration()
getConfiguration() should return ServiceConfiguration
***
If you're unsure why you're getting above error read on.
Due to the nature of the syntax above problem might occur because:
1. This exception *might* occur in wrongly written multi-threaded tests.
   Please refer to Mockito FAQ on limitations of concurrency testing.
2. A spy is stubbed using when(spy.foo()).then() syntax. It is safer to stub spies - 
   - with doReturn|Throw() family of methods. More in javadocs for Mockito.spy() method.
```

It's because `PulsarService#newCompactor` is not mocked well, see apache/pulsar#7102 for detail.

### Modifications

- Configure `GroupInitialRebalanceDelayMs` and `BrokerShutdownTimeoutMs` for each mocked `BrokerService`.
- Fix the flakiness caused by mocking compactor.

After the changes, the tests time has a significant improvement.

For example, `GroupCoordinatorTest` takes only 3 minutes now but it could take 9 minutes before. Because the `cleanup()` method is marked as `@AfterMethod` and would be called each time a single test finished.

Another example is that `BasicEndToEndKafkaTest` takes only 37 seconds now but it could take 56 seconds before. The `cleanup()` is marked as `@AfterClass` and only happens once, but many consumers will be created during the whole tests and each time a subscribe call can take 3 seconds.

* Speed up tests and fix flakiness

* Speed up tests that creat their own configs

* Ignore golang-sarama tests
  • Loading branch information
BewareMyPower authored Jul 29, 2021
1 parent d13432b commit eee30f0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int
kConfig.setAllowAutoTopicCreation(true);
kConfig.setAllowAutoTopicCreationType("partitioned");
kConfig.setBrokerDeleteInactiveTopicsEnabled(false);
kConfig.setGroupInitialRebalanceDelayMs(0);
kConfig.setBrokerShutdownTimeoutMs(0);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int
kConfig.setBrokerDeleteInactiveTopicsEnabled(false);
kConfig.setSystemTopicEnabled(true);
kConfig.setTopicLevelPoliciesEnabled(true);
kConfig.setGroupInitialRebalanceDelayMs(0);
kConfig.setBrokerShutdownTimeoutMs(0);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ public KafkaIntegrationTest(final String entryFormat) {
@DataProvider
public static Object[][] integrations() {
return new Object[][]{
{"golang-sarama", Optional.empty(), true, true},
{"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true},
// TODO: golang-sarama works well but it's very likely to complete before testcontainers catch the logs
// so that GenericContainer failed to start. Ignore these two cases first.
// See https://github.com/streamnative/kop/issues/629.
//{"golang-sarama", Optional.empty(), true, true},
//{"golang-sarama", Optional.of("persistent://public/default/my-sarama-topic-full-name"), true, true},
{"golang-confluent-kafka", Optional.empty(), true, true},
// TODO: rustlang-rdkafka is failing on Github Actions and works locally, we need to investigate
// {"rustlang-rdkafka", Optional.empty(), true, true},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

Expand Down Expand Up @@ -177,6 +178,9 @@ protected void resetConfig() {
+ SSL_PREFIX + "localhost:" + kafkaBrokerPortTls);
kafkaConfig.setEntryFormat(entryFormat);

// Speed up tests for reducing rebalance time
kafkaConfig.setGroupInitialRebalanceDelayMs(0);

// set protocol related config
URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar");
Path handlerPath;
Expand Down Expand Up @@ -313,6 +317,8 @@ protected void restartBroker() throws Exception {
}

protected void stopBroker() throws Exception {
// set shutdown timeout to 0 for forceful shutdown
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsar.close();
}

Expand Down Expand Up @@ -344,6 +350,7 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();

doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor();
doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor();
}

public static MockZooKeeper createMockZooKeeper(String clusterName, String brokerUrl, String brokerUrlTls,
Expand Down

0 comments on commit eee30f0

Please sign in to comment.