@@ -30,6 +30,7 @@
import java.util.concurrent.Future;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -38,6 +39,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -426,6 +428,14 @@ public void afterSingletonsInstantiated() {
.getIfUnique(() -> this.observationRegistry);
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
if (this.kafkaAdmin != null) {
Object producerServers = this.producerFactory.getConfigurationProperties()
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
String adminServers = this.kafkaAdmin.getBootstrapServers();
if (!producerServers.equals(adminServers)) {
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
this.kafkaAdmin = new KafkaAdmin(props);
}
this.clusterId = this.kafkaAdmin.clusterId();
}
}
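Note (not part of the diff): the change above makes KafkaTemplate stop assuming that the unique KafkaAdmin bean talks to the same cluster as its producer factory. When the bootstrap.servers values differ, the admin's configuration is copied with the producer's servers before clusterId() is resolved for the Micrometer observation. A minimal sketch of the scenario this covers follows; the bean names and broker addresses ("clusterA:9092", "clusterB:9092") are illustrative assumptions, not taken from the PR.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
class ClusterIdConfig {

	// The unique KafkaAdmin bean points at "cluster A" (address hypothetical).
	@Bean
	KafkaAdmin admin() {
		return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "clusterA:9092"));
	}

	// The producer factory points at "cluster B"; with this change the template copies
	// the admin config with these servers, so clusterId() is resolved against cluster B.
	@Bean
	ProducerFactory<Integer, String> producerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "clusterB:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(props);
	}

	@Bean
	KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
		// The cluster id is only looked up when observation is enabled.
		template.setObservationEnabled(true);
		return template;
	}
}

With such a configuration, the cluster id recorded on send observations comes from cluster B, the cluster the producer actually sends to.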
@@ -46,6 +46,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -784,6 +785,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
@Nullable
private final KafkaAdmin kafkaAdmin;

private final Object bootstrapServers;

private String clusterId;

private Map<TopicPartition, OffsetMetadata> definedPartitions;
@@ -848,6 +851,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
this.bootstrapServers = determineBootstrapServers(consumerProperties);

this.clientId = determineClientId();
this.transactionTemplate = determineTransactionTemplate();
@@ -921,12 +925,30 @@ else if (listener instanceof MessageListener) {
this.kafkaAdmin = obtainAdmin();
}

@Nullable
private Object determineBootstrapServers(Properties consumerProperties) {
Object servers = consumerProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (servers == null) {
servers = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties()
.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}
return servers;
}

@Nullable
private KafkaAdmin obtainAdmin() {
if (this.containerProperties.isObservationEnabled()) {
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
return applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
KafkaAdmin admin = applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
if (admin != null) {
Map<String, Object> props = new HashMap<>(admin.getConfigurationProperties());
if (!props.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG).equals(this.bootstrapServers)) {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
admin = new KafkaAdmin(props);
}
}
return admin;
}
}
return null;
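Note (not part of the diff): the listener container side mirrors the template. Each ListenerConsumer now captures its effective bootstrap.servers (a per-listener Properties override wins over the consumer factory's configuration, per determineBootstrapServers above) and, when observation is enabled and that value differs from the unique KafkaAdmin bean's, wraps a dedicated KafkaAdmin around the consumer's servers before resolving the cluster id. A hypothetical usage sketch of the per-listener override follows; the listener id, topic, and "otherCluster:9092" address are assumptions (the test's obs2 listener below does the same thing with a SpEL expression).

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
class OtherClusterListener {

	// Override bootstrap.servers just for this listener; with the change above, its
	// container derives a KafkaAdmin for otherCluster:9092 instead of reusing the
	// shared admin bean, so the observed cluster id matches the cluster being polled.
	@KafkaListener(id = "otherClusterListener", topics = "some.topic",
			properties = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":otherCluster:9092")
	void listen(ConsumerRecord<?, ?> in) {
		// handle the record
	}
}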
@@ -28,7 +28,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.Test;

@@ -42,6 +45,7 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
@@ -84,7 +88,8 @@ public class ObservationTests {
@Test
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@Autowired MeterRegistry meterRegistry)
@Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker,
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin)
throws InterruptedException, ExecutionException, TimeoutException {

template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
@@ -171,21 +176,51 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
.hasTimerWithNameAndTags("spring.kafka.listener",
KeyValues.of("spring.kafka.listener.id", "obs1-0", "baz", "qux"))
.hasTimerWithNameAndTags("spring.kafka.listener", KeyValues.of("spring.kafka.listener.id", "obs2-0"));
assertThat(admin.getConfigurationProperties())
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
// producer factory broker different to admin
assertThat(
KafkaTestUtils.getPropertyValue(template, "kafkaAdmin", KafkaAdmin.class).getConfigurationProperties())
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
broker.getBrokersAsString() + "," + broker.getBrokersAsString());
Object container = KafkaTestUtils
.getPropertyValue(endpointRegistry.getListenerContainer("obs1"), "containers", List.class).get(0);
// consumer factory broker different to admin
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class)
.getConfigurationProperties())
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
broker.getBrokersAsString() + "," + broker.getBrokersAsString() + ","
+ broker.getBrokersAsString());
// broker override in annotation
container = KafkaTestUtils
.getPropertyValue(endpointRegistry.getListenerContainer("obs2"), "containers", List.class).get(0);
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.kafkaAdmin", KafkaAdmin.class)
.getConfigurationProperties())
.containsEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
}

@Configuration
@EnableKafka
public static class Config {

@Bean
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
return new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()));
}

@Bean
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
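// Repeat the embedded broker address so the producer's bootstrap.servers string differs
// from the admin bean's while still pointing at the same broker; the endToEnd assertions
// above verify the template derives its own KafkaAdmin for this doubled value.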
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
+ broker.getBrokersAsString());
return new DefaultKafkaProducerFactory<>(producerProps);
}

@Bean
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("obs", "false", broker);
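// Triple the embedded broker address so the consumer's bootstrap.servers differ from both
// the admin bean's and the producer factory's config strings.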
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString() + ","
+ broker.getBrokersAsString() + "," + broker.getBrokersAsString());
return new DefaultKafkaConsumerFactory<>(consumerProps);
}

@@ -288,7 +323,8 @@ void listen1(ConsumerRecord<Integer, String> in) {
this.template.send("observation.testT2", in.value());
}

@KafkaListener(id = "obs2", topics = "observation.testT2")
@KafkaListener(id = "obs2", topics = "observation.testT2",
properties = ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + ":" + "#{@embeddedKafka.brokersAsString}")
void listen2(ConsumerRecord<?, ?> in) {
this.record = in;
this.latch1.countDown();