Skip to content

Commit dfd7c7a

Browse files
garyrussellartembilan
authored andcommitted
GH-897 Docs: wiring Producer/Consumer interceptors
Resolves #897
1 parent 9dccd38 commit dfd7c7a

File tree

1 file changed

+146
-0
lines changed

1 file changed

+146
-0
lines changed

src/reference/asciidoc/kafka.adoc

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1917,6 +1917,152 @@ Note that `SimpleThreadScope` does not destroy beans that have a destruction int
19171917
IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
19181918
If you change the multicaster to use an async executor, thread cleanup is not effective.
19191919

1920+
[[interceptors]]
1921+
==== Wiring Spring Beans into Producer/Consumer Interceptors
1922+
1923+
Apache Kafka provides a mechanism to add interceptors to producers and consumers.
1924+
These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won't work for wiring in dependent Spring Beans.
1925+
However, you can manually wire in those dependencies using the interceptor `config()` method.
1926+
The following Spring Boot application shows how to do this by overriding boot's default factories to add some dependent bean into the configuration properties.
1927+
1928+
====
1929+
[source, java]
1930+
----
1931+
@SpringBootApplication
1932+
public class Application {
1933+
1934+
public static void main(String[] args) {
1935+
SpringApplication.run(Application.class, args);
1936+
}
1937+
1938+
@Bean
1939+
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties, SomeBean someBean) {
1940+
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
1941+
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
1942+
consumerProperties.put("some.bean", someBean);
1943+
return new DefaultKafkaConsumerFactory<>(consumerProperties);
1944+
}
1945+
1946+
@Bean
1947+
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties, SomeBean someBean) {
1948+
Map<String, Object> producerProperties = properties.buildProducerProperties();
1949+
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
1950+
producerProperties.put("some.bean", someBean);
1951+
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
1952+
String transactionIdPrefix = properties.getProducer()
1953+
.getTransactionIdPrefix();
1954+
if (transactionIdPrefix != null) {
1955+
factory.setTransactionIdPrefix(transactionIdPrefix);
1956+
}
1957+
return factory;
1958+
}
1959+
1960+
@Bean
1961+
public SomeBean someBean() {
1962+
return new SomeBean();
1963+
}
1964+
1965+
@KafkaListener(id = "kgk897", topics = "kgh897")
1966+
public void listen(String in) {
1967+
System.out.println("Received " + in);
1968+
}
1969+
1970+
@Bean
1971+
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
1972+
return args -> template.send("kgh897", "test");
1973+
}
1974+
1975+
@Bean
1976+
public NewTopic topic() {
1977+
return new NewTopic("kgh897", 1, (short) 1);
1978+
}
1979+
1980+
}
1981+
----
1982+
====
1983+
1984+
====
1985+
[source, java]
1986+
----
1987+
public class SomeBean {
1988+
1989+
public void someMethod(String what) {
1990+
System.out.println(what + " in my foo bean");
1991+
}
1992+
1993+
}
1994+
----
1995+
====
1996+
====
1997+
[source, java]
1998+
----
1999+
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
2000+
2001+
private SomeBean bean;
2002+
2003+
@Override
2004+
public void configure(Map<String, ?> configs) {
2005+
this.bean = (SomeBean) configs.get("some.bean");
2006+
}
2007+
2008+
@Override
2009+
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
2010+
this.bean.someMethod("producer interceptor");
2011+
return record;
2012+
}
2013+
2014+
@Override
2015+
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
2016+
}
2017+
2018+
@Override
2019+
public void close() {
2020+
}
2021+
2022+
}
2023+
----
2024+
====
2025+
====
2026+
[source, java]
2027+
----
2028+
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
2029+
2030+
private SomeBean bean;
2031+
2032+
@Override
2033+
public void configure(Map<String, ?> configs) {
2034+
this.bean = (SomeBean) configs.get("some.bean");
2035+
}
2036+
2037+
@Override
2038+
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
2039+
this.bean.someMethod("consumer interceptor");
2040+
return records;
2041+
}
2042+
2043+
@Override
2044+
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
2045+
}
2046+
2047+
@Override
2048+
public void close() {
2049+
}
2050+
2051+
}
2052+
----
2053+
====
2054+
2055+
Result:
2056+
2057+
====
2058+
[source]
2059+
----
2060+
producer interceptor in my foo bean
2061+
consumer interceptor in my foo bean
2062+
Received test
2063+
----
2064+
====
2065+
19202066
[[pause-resume]]
19212067
==== Pausing and Resuming Listener Containers
19222068

0 commit comments

Comments
 (0)