Skip to content

Commit 4b6e79c

Browse files
committed
Merge pull request #9775 from garyrussell:kafkaProps
* pr/9775: Polish "Add support for arbitrary producer/consumer Kafka properties" Add support for arbitrary producer/consumer Kafka properties
2 parents 43e1df7 + 38ad582 commit 4b6e79c

File tree

5 files changed

+43
-149
lines changed

5 files changed

+43
-149
lines changed

spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ public class KafkaProperties {
6363
private String clientId;
6464

6565
/**
66-
* Additional properties used to configure the client.
66+
* Additional properties, common to producers and consumers, used to configure the
67+
* client.
6768
*/
68-
private Map<String, String> properties = new HashMap<>();
69+
private final Map<String, String> properties = new HashMap<>();
6970

7071
private final Consumer consumer = new Consumer();
7172

@@ -99,10 +100,6 @@ public Map<String, String> getProperties() {
99100
return this.properties;
100101
}
101102

102-
public void setProperties(Map<String, String> properties) {
103-
this.properties = properties;
104-
}
105-
106103
public Consumer getConsumer() {
107104
return this.consumer;
108105
}
@@ -268,6 +265,11 @@ public static class Consumer {
268265
*/
269266
private Integer maxPollRecords;
270267

268+
/**
269+
* Additional consumer-specific properties used to configure the client.
270+
*/
271+
private final Map<String, String> properties = new HashMap<>();
272+
271273
public Ssl getSsl() {
272274
return this.ssl;
273275
}
@@ -368,6 +370,10 @@ public void setMaxPollRecords(Integer maxPollRecords) {
368370
this.maxPollRecords = maxPollRecords;
369371
}
370372

373+
public Map<String, String> getProperties() {
374+
return this.properties;
375+
}
376+
371377
public Map<String, Object> buildProperties() {
372378
Map<String, Object> properties = new HashMap<>();
373379
if (this.autoCommitInterval != null) {
@@ -435,6 +441,7 @@ public Map<String, Object> buildProperties() {
435441
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
436442
this.maxPollRecords);
437443
}
444+
properties.putAll(this.properties);
438445
return properties;
439446
}
440447

@@ -492,6 +499,11 @@ public static class Producer {
492499
*/
493500
private Integer retries;
494501

502+
/**
503+
* Additional producer-specific properties used to configure the client.
504+
*/
505+
private final Map<String, String> properties = new HashMap<>();
506+
495507
public Ssl getSsl() {
496508
return this.ssl;
497509
}
@@ -568,6 +580,10 @@ public void setRetries(Integer retries) {
568580
this.retries = retries;
569581
}
570582

583+
public Map<String, String> getProperties() {
584+
return this.properties;
585+
}
586+
571587
public Map<String, Object> buildProperties() {
572588
Map<String, Object> properties = new HashMap<>();
573589
if (this.acks != null) {
@@ -621,6 +637,7 @@ public Map<String, Object> buildProperties() {
621637
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
622638
this.valueSerializer);
623639
}
640+
properties.putAll(this.properties);
624641
return properties;
625642
}
626643

spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,15 @@ public void consumerProperties() {
7878
"spring.kafka.consumer.client-id=ccid", // test override common
7979
"spring.kafka.consumer.enable-auto-commit=false",
8080
"spring.kafka.consumer.fetch-max-wait=456",
81+
"spring.kafka.consumer.properties.fiz.buz=fix.fox",
8182
"spring.kafka.consumer.fetch-min-size=789",
8283
"spring.kafka.consumer.group-id=bar",
8384
"spring.kafka.consumer.heartbeat-interval=234",
8485
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
8586
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
8687
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
8788
.getBean(DefaultKafkaConsumerFactory.class);
88-
@SuppressWarnings("unchecked")
89-
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
90-
consumerFactory).getPropertyValue("configs");
89+
Map<String, Object> configs = consumerFactory.getConfigurationProperties();
9190
// common
9291
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
9392
.isEqualTo(Collections.singletonList("foo:1234"));
@@ -120,17 +119,21 @@ public void consumerProperties() {
120119
assertThat(configs.get("foo")).isEqualTo("bar");
121120
assertThat(configs.get("baz")).isEqualTo("qux");
122121
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
122+
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
123123
}
124124

125125
@Test
126126
public void producerProperties() {
127-
load("spring.kafka.clientId=cid", "spring.kafka.producer.acks=all",
127+
load("spring.kafka.clientId=cid",
128+
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
129+
"spring.kafka.producer.acks=all",
128130
"spring.kafka.producer.batch-size=20",
129131
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override
130132
"spring.kafka.producer.buffer-memory=12345",
131133
"spring.kafka.producer.compression-type=gzip",
132134
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
133135
"spring.kafka.producer.retries=2",
136+
"spring.kafka.producer.properties.fiz.buz=fix.fox",
134137
"spring.kafka.producer.ssl.key-password=p4",
135138
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
136139
"spring.kafka.producer.ssl.keystore-password=p5",
@@ -139,9 +142,7 @@ public void producerProperties() {
139142
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
140143
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
141144
.getBean(DefaultKafkaProducerFactory.class);
142-
@SuppressWarnings("unchecked")
143-
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
144-
producerFactory).getPropertyValue("configs");
145+
Map<String, Object> configs = producerFactory.getConfigurationProperties();
145146
// common
146147
assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
147148
// producer
@@ -166,6 +167,8 @@ public void producerProperties() {
166167
.isEqualTo(IntegerSerializer.class);
167168
assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
168169
.isEmpty();
170+
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
171+
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
169172
}
170173

171174
@SuppressWarnings("unchecked")

spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,7 @@ content into your application; rather pick only the properties that you need.
968968
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
969969
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
970970
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
971+
spring.kafka.consumer.properties.*= # Additional consumer-specific properties used to configure the client.
971972
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
972973
spring.kafka.consumer.ssl.keystore-location= # Location of the key store file.
973974
spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
@@ -991,14 +992,15 @@ content into your application; rather pick only the properties that you need.
991992
spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging.
992993
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
993994
spring.kafka.producer.key-serializer= # Serializer class for keys.
995+
spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
994996
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
995997
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
996998
spring.kafka.producer.ssl.keystore-location= # Location of the key store file.
997999
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
9981000
spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
9991001
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
10001002
spring.kafka.producer.value-serializer= # Serializer class for values.
1001-
spring.kafka.properties.*= # Additional properties used to configure the client.
1003+
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
10021004
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
10031005
spring.kafka.ssl.keystore-location= # Location of the key store file.
10041006
spring.kafka.ssl.keystore-password= # Store password for the key store file.

spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

+7-11
Original file line numberDiff line numberDiff line change
@@ -5117,20 +5117,16 @@ are not directly supported, use the following:
51175117
[source,properties,indent=0]
51185118
----
51195119
spring.kafka.properties.foo.bar=baz
5120+
spring.kafka.consumer.properties.fiz.buz=qux
5121+
spring,kafka.producer.properties.baz.qux=fiz
51205122
----
51215123

5122-
This sets the common `foo.bar` Kafka property to `baz`.
5123-
5124-
These properties will be shared by both the consumer and producer factory beans.
5125-
If you wish to customize these components with different properties, such as to use a
5126-
different metrics reader for each, you can override the bean definitions, as follows:
5127-
5128-
[source,java,indent=0]
5129-
----
5130-
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
5131-
----
5132-
5124+
This sets the common `foo.bar` Kafka property to `baz` (applies to both producers and
5125+
consumers), the consumer `fiz.buz` property to `qux` and the `baz.qux` producer property
5126+
to `fiz`.
51335127

5128+
IMPORTANT: Properties set in this way will override any configuration item that Spring
5129+
Boot explicitly supports.
51345130

51355131
[[boot-features-resttemplate]]
51365132
== Calling REST services with '`RestTemplate`'

spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java

-124
This file was deleted.

0 commit comments

Comments
 (0)