Skip to content

Commit

Permalink
Addressing PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
sobychacko committed Feb 8, 2024
1 parent 7bb9dec commit 6fc7c87
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
@Nullable
private KafkaAdmin kafkaAdmin;

protected AtomicBoolean enforceRebalanceRequested = new AtomicBoolean();
protected final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean();

/**
* Construct an instance with the provided factory and properties.
Expand Down Expand Up @@ -628,10 +628,6 @@ public void enforceRebalance() {
this.enforceRebalanceRequested.set(true);
}

protected boolean isEnforceRebalanceRequested() {
return this.enforceRebalanceRequested.get();
}

@Override
public void pause() {
this.paused = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ && getContainerProperties().isRestartAfterAuthExceptions()
public void enforceRebalance() {
this.lifecycleLock.lock();
try {
// Since enforceRebalance only needs to be applied against a single container, we randomly pick the first one.
// Since the rebalance is for the whole consumer group, there is no need to
// initiate this operation for every single container in the group.
final KafkaMessageListenerContainer<K, V> listenerContainer = this.containers.get(0);
listenerContainer.enforceRebalance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public void enforceRebalance() {
consumer.wakeIfNecessary();
}
}

@Override
public void pause() {
super.pause();
Expand Down Expand Up @@ -1429,9 +1430,9 @@ protected void pollAndInvoke() {
if (!this.seeks.isEmpty()) {
processSeeks();
}
enforceRebalanceIfNecessary();
pauseConsumerIfNecessary();
pausePartitionsIfNecessary();
enforceRebalanceIfNecessary();
this.lastPoll = System.currentTimeMillis();
if (!isRunning()) {
return;
Expand Down Expand Up @@ -1749,14 +1750,16 @@ private void sleepFor(Duration duration) {
}

private void enforceRebalanceIfNecessary() {
final AtomicBoolean enforceRebalanceRequested = KafkaMessageListenerContainer.super.enforceRebalanceRequested;
try {
if (enforceRebalanceRequested.get()) {
this.consumer.enforceRebalance("Enforce rebalance requested.");
if (KafkaMessageListenerContainer.this.enforceRebalanceRequested.get()) {
final String enforcedRebalanceReason = String.format("Enforced rebalance requested for container: %s",
KafkaMessageListenerContainer.this.getListenerId());
KafkaMessageListenerContainer.this.logger.info(enforcedRebalanceReason);
this.consumer.enforceRebalance(enforcedRebalanceReason);
}
}
finally {
enforceRebalanceRequested.set(false);
KafkaMessageListenerContainer.this.enforceRebalanceRequested.set(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -48,6 +50,7 @@

/**
* @author Soby Chacko
* @since 3.1.2
*/
@SpringJUnitConfig
@DirtiesContext
Expand All @@ -68,6 +71,19 @@ void enforceRebalance(@Autowired Config config, @Autowired KafkaTemplate<Integer
final List<? extends KafkaMessageListenerContainer<?, ?>> containers =
((ConcurrentMessageListenerContainer<?, ?>) listenerContainer).getContainers();
assertThat(containers.get(0).enforceRebalanceRequested).isFalse();
listenerContainer.pause();
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isPauseRequested()).isTrue());
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isContainerPaused()).isTrue());
// resetting the latches
config.partitionRevokedLatch = new CountDownLatch(1);
config.partitionAssignedLatch = new CountDownLatch(1);
listenerContainer.enforceRebalance();
assertThat(config.partitionRevokedLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(config.partitionAssignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
// Although the rebalance causes the consumer to resume again, since the container is paused,
// it will pause the rebalanced consumers again.
assertThat(listenerContainer.isPauseRequested()).isTrue();
assertThat(listenerContainer.isContainerPaused()).isTrue();
}

@Configuration
Expand All @@ -78,6 +94,7 @@ public static class Config {
EmbeddedKafkaBroker broker;

CountDownLatch partitionRevokedLatch = new CountDownLatch(1);

CountDownLatch partitionAssignedLatch = new CountDownLatch(2);

CountDownLatch listenerLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2511,7 +2511,6 @@ void enforceRabalanceOnTheConsumer() throws Exception {

container.start();
container.enforceRebalance();
assertThat(container.isEnforceRebalanceRequested()).isTrue();
assertThat(enforceRebalanceLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

Expand Down

0 comments on commit 6fc7c87

Please sign in to comment.