Skip to content

Commit 85fad6b

Browse files
garyrussellartembilan
authored andcommitted
Add missingTopicsFatal to container factory
See spring-projects/spring-boot#16727
1 parent 17e2e80 commit 85fad6b

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
9696

9797
private ReplyHeadersConfigurer replyHeadersConfigurer;
9898

99+
private Boolean missingTopicsFatal;
100+
99101
/**
100102
* Specify a {@link ConsumerFactory} to use.
101103
* @param consumerFactory The consumer factory.
@@ -258,6 +260,17 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu
258260
this.replyHeadersConfigurer = replyHeadersConfigurer;
259261
}
260262

263+
/**
264+
* Set to false to allow the container to start even if any of the configured topics
265+
* are not present on the broker. Does not apply when topic patterns are configured.
266+
* Default true;
267+
* @param missingTopicsFatal the missingTopicsFatal.
268+
* @since 2.3
269+
*/
270+
public void setMissingTopicsFatal(boolean missingTopicsFatal) {
271+
this.missingTopicsFatal = missingTopicsFatal;
272+
}
273+
261274
/**
262275
* Obtain the properties template for this factory - set properties as needed
263276
* and they will be copied to a final properties instance for the endpoint.
@@ -335,7 +348,8 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
335348
properties::setAckCount)
336349
.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
337350
properties::setAckTime)
338-
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler);
351+
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
352+
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
339353
if (endpoint.getAutoStartup() != null) {
340354
instance.setAutoStartup(endpoint.getAutoStartup());
341355
}

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.springframework.kafka.annotation.KafkaHandler;
6363
import org.springframework.kafka.annotation.KafkaListener;
6464
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
65+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
6566
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
6667
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
6768
import org.springframework.kafka.core.KafkaTemplate;
@@ -131,6 +132,9 @@ public class ReplyingKafkaTemplateTests {
131132
@Autowired
132133
private Config config;
133134

135+
@Autowired
136+
private KafkaListenerEndpointRegistry registry;
137+
134138
@Test
135139
public void testGood() throws Exception {
136140
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(A_REPLY);
@@ -147,6 +151,10 @@ public void testGood() throws Exception {
147151
new DefaultKafkaHeaderMapper().toHeaders(consumerRecord.headers(), receivedHeaders);
148152
assertThat(receivedHeaders).containsKey("baz");
149153
assertThat(receivedHeaders).hasSize(2);
154+
assertThat(KafkaTestUtils.getPropertyValue(
155+
this.registry.getListenerContainer(A_REQUEST), "containerProperties.missingTopicsFatal",
156+
Boolean.class))
157+
.isFalse();
150158
}
151159
finally {
152160
template.stop();
@@ -501,6 +509,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerCon
501509
factory.setConsumerFactory(cf());
502510
factory.setReplyTemplate(template());
503511
factory.setReplyHeadersConfigurer((k, v) -> k.equals("baz"));
512+
factory.setMissingTopicsFatal(false);
504513
return factory;
505514
}
506515

@@ -529,7 +538,7 @@ public Map<String, Object> additionalHeaders() {
529538
return factory;
530539
}
531540

532-
@KafkaListener(topics = A_REQUEST)
541+
@KafkaListener(id = A_REQUEST, topics = A_REQUEST)
533542
@SendTo // default REPLY_TOPIC header
534543
public String handleA(String in) {
535544
return in.toUpperCase();

0 commit comments

Comments
 (0)