Skip to content

Commit 133514b

Browse files
Bastien Boucletgaryrussell
authored andcommitted
GH-1111 Propagate the CMLC paused state on start
The paused state from ConcurrentMessageListenerContainer was not propagated to the newly instanciated KafkaMessageListenerContainers when calling start().
1 parent 369ffa6 commit 133514b

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ protected void doStart() {
167167
});
168168
publishContainerStoppedEvent();
169169
});
170+
if (isPaused()) {
171+
container.pause();
172+
}
170173
container.start();
171174
this.containers.add(container);
172175
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,11 @@ public class ConcurrentMessageListenerContainerTests {
9696

9797
private static String topic11 = "testTopic11";
9898

99+
private static String topic12 = "testTopic12";
100+
99101
@ClassRule
100102
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic1, topic2, topic4, topic5,
101-
topic6, topic7, topic8, topic9, topic10, topic11);
103+
topic6, topic7, topic8, topic9, topic10, topic11, topic12);
102104

103105
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
104106

@@ -451,6 +453,44 @@ public void testManualCommitSyncExisting() throws Exception {
451453
this.logger.info("Stop MANUAL_IMMEDIATE with Existing");
452454
}
453455

456+
@Test
457+
public void testPausedStart() throws Exception {
458+
this.logger.info("Start paused start");
459+
Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "false", embeddedKafka);
460+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
461+
ContainerProperties containerProps = new ContainerProperties(topic12);
462+
463+
final CountDownLatch latch = new CountDownLatch(2);
464+
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
465+
ConcurrentMessageListenerContainerTests.this.logger.info("paused start: " + message);
466+
latch.countDown();
467+
});
468+
469+
ConcurrentMessageListenerContainer<Integer, String> container =
470+
new ConcurrentMessageListenerContainer<>(cf, containerProps);
471+
container.setConcurrency(2);
472+
container.setBeanName("testBatch");
473+
container.pause();
474+
container.start();
475+
476+
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
477+
478+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
479+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
480+
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
481+
template.setDefaultTopic(topic12);
482+
template.sendDefault(0, "foo");
483+
template.sendDefault(2, "bar");
484+
template.flush();
485+
assertThat(latch.await(100, TimeUnit.MILLISECONDS)).isFalse();
486+
487+
container.resume();
488+
489+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
490+
container.stop();
491+
this.logger.info("Stop paused start");
492+
}
493+
454494
@Test
455495
@SuppressWarnings("unchecked")
456496
public void testConcurrencyWithPartitions() {

0 commit comments

Comments
 (0)