Skip to content

Commit 191752d

Browse files
garyrussellsnicoll
authored andcommitted
Add support for arbitrary producer/consumer Kafka properties
PR #7672 Added support for arbitrary common properties. However, Kafka emits a warning if a producer configuration contains properties intended only for consumers, and vice versa. The documentation showed a sample of how to write code to configure arbitrary properties but this is inconvenient. Add arbitrary properties to the consumer and procucer configs. See gh-9775
1 parent 43e1df7 commit 191752d

File tree

5 files changed

+47
-144
lines changed

5 files changed

+47
-144
lines changed

Diff for: spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public class KafkaProperties {
6363
private String clientId;
6464

6565
/**
66-
* Additional properties used to configure the client.
66+
* Additional properties, common to producers and consumers, used to configure the
67+
* client.
6768
*/
6869
private Map<String, String> properties = new HashMap<>();
6970

@@ -268,6 +269,11 @@ public static class Consumer {
268269
*/
269270
private Integer maxPollRecords;
270271

272+
/**
273+
* Additional properties used to configure the client.
274+
*/
275+
private Map<String, String> properties = new HashMap<>();
276+
271277
public Ssl getSsl() {
272278
return this.ssl;
273279
}
@@ -368,6 +374,14 @@ public void setMaxPollRecords(Integer maxPollRecords) {
368374
this.maxPollRecords = maxPollRecords;
369375
}
370376

377+
public Map<String, String> getProperties() {
378+
return this.properties;
379+
}
380+
381+
public void setProperties(Map<String, String> properties) {
382+
this.properties = properties;
383+
}
384+
371385
public Map<String, Object> buildProperties() {
372386
Map<String, Object> properties = new HashMap<>();
373387
if (this.autoCommitInterval != null) {
@@ -435,6 +449,7 @@ public Map<String, Object> buildProperties() {
435449
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
436450
this.maxPollRecords);
437451
}
452+
properties.putAll(this.properties);
438453
return properties;
439454
}
440455

@@ -492,6 +507,11 @@ public static class Producer {
492507
*/
493508
private Integer retries;
494509

510+
/**
511+
* Additional properties used to configure the client.
512+
*/
513+
private Map<String, String> properties = new HashMap<>();
514+
495515
public Ssl getSsl() {
496516
return this.ssl;
497517
}
@@ -568,6 +588,14 @@ public void setRetries(Integer retries) {
568588
this.retries = retries;
569589
}
570590

591+
public Map<String, String> getProperties() {
592+
return this.properties;
593+
}
594+
595+
public void setProperties(Map<String, String> properties) {
596+
this.properties = properties;
597+
}
598+
571599
public Map<String, Object> buildProperties() {
572600
Map<String, Object> properties = new HashMap<>();
573601
if (this.acks != null) {
@@ -621,6 +649,7 @@ public Map<String, Object> buildProperties() {
621649
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
622650
this.valueSerializer);
623651
}
652+
properties.putAll(this.properties);
624653
return properties;
625654
}
626655

Diff for: spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,15 @@ public void consumerProperties() {
7878
"spring.kafka.consumer.client-id=ccid", // test override common
7979
"spring.kafka.consumer.enable-auto-commit=false",
8080
"spring.kafka.consumer.fetch-max-wait=456",
81+
"spring.kafka.consumer.properties.fiz.buz=fix.fox",
8182
"spring.kafka.consumer.fetch-min-size=789",
8283
"spring.kafka.consumer.group-id=bar",
8384
"spring.kafka.consumer.heartbeat-interval=234",
8485
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
8586
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer");
8687
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
8788
.getBean(DefaultKafkaConsumerFactory.class);
88-
@SuppressWarnings("unchecked")
89-
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
90-
consumerFactory).getPropertyValue("configs");
89+
Map<String, Object> configs = consumerFactory.getConfigurationProperties();
9190
// common
9291
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
9392
.isEqualTo(Collections.singletonList("foo:1234"));
@@ -120,17 +119,21 @@ public void consumerProperties() {
120119
assertThat(configs.get("foo")).isEqualTo("bar");
121120
assertThat(configs.get("baz")).isEqualTo("qux");
122121
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
122+
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
123123
}
124124

125125
@Test
126126
public void producerProperties() {
127-
load("spring.kafka.clientId=cid", "spring.kafka.producer.acks=all",
127+
load("spring.kafka.clientId=cid",
128+
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
129+
"spring.kafka.producer.acks=all",
128130
"spring.kafka.producer.batch-size=20",
129131
"spring.kafka.producer.bootstrap-servers=bar:1234", // test override
130132
"spring.kafka.producer.buffer-memory=12345",
131133
"spring.kafka.producer.compression-type=gzip",
132134
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
133135
"spring.kafka.producer.retries=2",
136+
"spring.kafka.producer.properties.fiz.buz=fix.fox",
134137
"spring.kafka.producer.ssl.key-password=p4",
135138
"spring.kafka.producer.ssl.keystore-location=classpath:ksLocP",
136139
"spring.kafka.producer.ssl.keystore-password=p5",
@@ -139,9 +142,7 @@ public void producerProperties() {
139142
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer");
140143
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
141144
.getBean(DefaultKafkaProducerFactory.class);
142-
@SuppressWarnings("unchecked")
143-
Map<String, Object> configs = (Map<String, Object>) new DirectFieldAccessor(
144-
producerFactory).getPropertyValue("configs");
145+
Map<String, Object> configs = producerFactory.getConfigurationProperties();
145146
// common
146147
assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid");
147148
// producer
@@ -166,6 +167,8 @@ public void producerProperties() {
166167
.isEqualTo(IntegerSerializer.class);
167168
assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
168169
.isEmpty();
170+
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
171+
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
169172
}
170173

171174
@SuppressWarnings("unchecked")

Diff for: spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,7 @@ content into your application; rather pick only the properties that you need.
968968
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
969969
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
970970
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
971+
spring.kafka.consumer.properties.*= # Additional properties used to configure the client.
971972
spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file.
972973
spring.kafka.consumer.ssl.keystore-location= # Location of the key store file.
973974
spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file.
@@ -991,14 +992,15 @@ content into your application; rather pick only the properties that you need.
991992
spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging.
992993
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
993994
spring.kafka.producer.key-serializer= # Serializer class for keys.
995+
spring.kafka.producer.properties.*= # Additional properties used to configure the client.
994996
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
995997
spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file.
996998
spring.kafka.producer.ssl.keystore-location= # Location of the key store file.
997999
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
9981000
spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
9991001
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
10001002
spring.kafka.producer.value-serializer= # Serializer class for values.
1001-
spring.kafka.properties.*= # Additional properties used to configure the client.
1003+
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
10021004
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
10031005
spring.kafka.ssl.keystore-location= # Location of the key store file.
10041006
spring.kafka.ssl.keystore-password= # Store password for the key store file.

Diff for: spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

+4-11
Original file line numberDiff line numberDiff line change
@@ -5117,20 +5117,13 @@ are not directly supported, use the following:
51175117
[source,properties,indent=0]
51185118
----
51195119
spring.kafka.properties.foo.bar=baz
5120+
spring.kafka.consumer.properties.fiz.buz=qux
5121+
spring,kafka.producer.properties.baz.qux=fiz
51205122
----
51215123

5122-
This sets the common `foo.bar` Kafka property to `baz`.
5123-
5124-
These properties will be shared by both the consumer and producer factory beans.
5125-
If you wish to customize these components with different properties, such as to use a
5126-
different metrics reader for each, you can override the bean definitions, as follows:
5127-
5128-
[source,java,indent=0]
5129-
----
5130-
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
5131-
----
5132-
5124+
This sets the common `foo.bar` Kafka property to `baz` (applies to both producers and consumers), the consumer `fiz.buz` property to `qux` and the `baz.qux` producer property to `fiz`.
51335125

5126+
IMPORTANT: Properties set in this way will override properties that are in the subset that boot explicitly supports.
51345127

51355128
[[boot-features-resttemplate]]
51365129
== Calling REST services with '`RestTemplate`'

Diff for: spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java

-124
This file was deleted.

0 commit comments

Comments
 (0)