|
28 | 28 | import java.util.concurrent.TimeUnit; |
29 | 29 | import java.util.concurrent.TimeoutException; |
30 | 30 |
|
| 31 | +import org.apache.kafka.clients.admin.AdminClientConfig; |
| 32 | +import org.apache.kafka.clients.consumer.ConsumerConfig; |
31 | 33 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| 34 | +import org.apache.kafka.clients.producer.ProducerConfig; |
32 | 35 | import org.apache.kafka.common.header.Headers; |
33 | 36 | import org.junit.jupiter.api.Test; |
34 | 37 |
|
|
42 | 45 | import org.springframework.kafka.core.ConsumerFactory; |
43 | 46 | import org.springframework.kafka.core.DefaultKafkaConsumerFactory; |
44 | 47 | import org.springframework.kafka.core.DefaultKafkaProducerFactory; |
| 48 | +import org.springframework.kafka.core.KafkaAdmin; |
45 | 49 | import org.springframework.kafka.core.KafkaTemplate; |
46 | 50 | import org.springframework.kafka.core.ProducerFactory; |
47 | 51 | import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; |
@@ -84,7 +88,8 @@ public class ObservationTests { |
84 | 88 | @Test |
85 | 89 | void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template, |
86 | 90 | @Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler, |
87 | | - @Autowired MeterRegistry meterRegistry) |
| 91 | + @Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker, |
| 92 | + @Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin) |
88 | 93 | throws InterruptedException, ExecutionException, TimeoutException { |
89 | 94 |
|
90 | 95 | template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS); |
@@ -171,21 +176,51 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) |
171 | 176 | .hasTimerWithNameAndTags("spring.kafka.listener", |
172 | 177 | KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux")) |
173 | 178 | .hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0")); |
| 179 | + assertThat(admin.getConfigurationProperties()) |
| 180 | + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); |
| 181 | + // producer factory broker different to admin |
| 182 | + assertThat( |
| 183 | + KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class).getConfigurationProperties()) |
| 184 | + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, |
| 185 | + broker.getBrokersAsString() + "," + broker.getBrokersAsString()); |
| 186 | + Object container = KafkaTestUtils |
| 187 | + .getPropertyValue(endpointRegistry.getListenerContainer("obs1"), "containers", List.class).get(0); |
| 188 | + // consumer factory broker different to admin |
| 189 | + assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class) |
| 190 | + .getConfigurationProperties()) |
| 191 | + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, |
| 192 | + broker.getBrokersAsString() + "," + broker.getBrokersAsString() + "," |
| 193 | + + broker.getBrokersAsString()); |
| 194 | + // broker override in annotation |
| 195 | + container = KafkaTestUtils |
| 196 | + .getPropertyValue(endpointRegistry.getListenerContainer("obs2"), "containers", List.class).get(0); |
| 197 | + assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class) |
| 198 | + .getConfigurationProperties()) |
| 199 | + .containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); |
174 | 200 | } |
175 | 201 |
|
176 | 202 | @Configuration |
177 | 203 | @EnableKafka |
178 | 204 | public static class Config { |
179 | 205 |
|
| 206 | + @Bean |
| 207 | + KafkaAdmin admin(EmbeddedKafkaBroker broker) { |
| 208 | + return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString())); |
| 209 | + } |
| 210 | + |
180 | 211 | @Bean |
181 | 212 | ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) { |
182 | 213 | Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker); |
| 214 | + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," |
| 215 | + + broker.getBrokersAsString()); |
183 | 216 | return new DefaultKafkaProducerFactory<>(producerProps); |
184 | 217 | } |
185 | 218 |
|
186 | 219 | @Bean |
187 | 220 | ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) { |
188 | 221 | Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker); |
| 222 | + consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + "," |
| 223 | + + broker.getBrokersAsString() + "," + broker.getBrokersAsString()); |
189 | 224 | return new DefaultKafkaConsumerFactory<>(consumerProps); |
190 | 225 | } |
191 | 226 |
|
@@ -288,7 +323,8 @@ void listen1(ConsumerRecord<Integer, String> in) { |
288 | 323 | this.template.send("observation.testT2", in.value()); |
289 | 324 | } |
290 | 325 |
|
291 | | - @KafkaListener(id = "obs2", topics = "observation.testT2") |
| 326 | + @KafkaListener(id = "obs2", topics = "observation.testT2", |
| 327 | + properties = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":" + "#{@embeddedKafka.brokersAsString}") |
292 | 328 | void listen2(ConsumerRecord<?, ?> in) { |
293 | 329 | this.record = in; |
294 | 330 | this.latch1.countDown(); |
|
0 commit comments