Skip to content

Commit 9003aa5

Browse files
author
Adrian Chlebosz
committed
GH-2068: Add lightweight converting adapter
1 parent ca9d100 commit 9003aa5

File tree

2 files changed

+253
-0
lines changed

2 files changed

+253
-0
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import java.util.Arrays;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
import java.util.stream.Collectors;
23+
24+
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.apache.kafka.common.header.Header;
27+
28+
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
29+
import org.springframework.kafka.listener.AcknowledgingMessageListener;
30+
import org.springframework.kafka.listener.ConsumerAwareMessageListener;
31+
import org.springframework.kafka.listener.MessageListener;
32+
import org.springframework.kafka.support.Acknowledgment;
33+
import org.springframework.messaging.Message;
34+
import org.springframework.messaging.converter.MessageConversionException;
35+
import org.springframework.messaging.converter.MessageConverter;
36+
import org.springframework.messaging.converter.SimpleMessageConverter;
37+
import org.springframework.messaging.support.GenericMessage;
38+
39+
/**
40+
* A {@link AcknowledgingConsumerAwareMessageListener} adapter that implements
41+
* converting received {@link ConsumerRecord} using specified {@link MessageConverter}
42+
* and then passes result to specified {@link MessageListener}.
43+
*
44+
* @param <T> the key type.
45+
* @param <U> the value type.
46+
* @param <V> the desired value type after conversion.
47+
*
48+
* @author Adrian Chlebosz
49+
* @since 3.0
50+
* @see AcknowledgingConsumerAwareMessageListener
51+
*/
52+
public class ConvertingAndDelegatingMessageListenerAdapter<T, U, V> implements AcknowledgingConsumerAwareMessageListener<T, U> {
53+
54+
private final Object delegate;
55+
private final MessageConverter messageConverter;
56+
private final Class<V> desiredValueType;
57+
58+
/**
59+
* Construct an instance with the provided {@link MessageListener} and {@link Class}
60+
* as a desired type of {@link ConsumerRecord}'s value after conversion. Default value of
61+
* {@link MessageConverter} is used, which is {@link SimpleMessageConverter}.
62+
*
63+
* @param delegate the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
64+
* @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
65+
*/
66+
public ConvertingAndDelegatingMessageListenerAdapter(Object delegate, Class<V> desiredValueType) {
67+
validateMessageListener(delegate);
68+
Objects.requireNonNull(desiredValueType);
69+
70+
this.delegate = delegate;
71+
this.desiredValueType = desiredValueType;
72+
73+
this.messageConverter = new SimpleMessageConverter();
74+
}
75+
76+
/**
77+
* Construct an instance with the provided {@link MessageListener}, {@link MessageConverter} and {@link Class}
78+
* as a desired type of {@link ConsumerRecord}'s value after conversion.
79+
*
80+
* @param delegate the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
81+
* @param messageConverter the {@link MessageConverter} to use for conversion.
82+
* @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
83+
*/
84+
public ConvertingAndDelegatingMessageListenerAdapter(Object delegate, MessageConverter messageConverter, Class<V> desiredValueType) {
85+
validateMessageListener(delegate);
86+
Objects.requireNonNull(messageConverter);
87+
Objects.requireNonNull(desiredValueType);
88+
89+
this.delegate = delegate;
90+
this.messageConverter = messageConverter;
91+
this.desiredValueType = desiredValueType;
92+
}
93+
94+
private void validateMessageListener(Object messageListener) {
95+
Objects.requireNonNull(messageListener);
96+
if (!(messageListener instanceof MessageListener)) {
97+
throw new IllegalArgumentException("Passed message listener must be of MessageListener type");
98+
}
99+
}
100+
101+
@Override
102+
public void onMessage(ConsumerRecord<T, U> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { // NOSONAR
103+
ConsumerRecord<T, V> convertedConsumerRecord = convertConsumerRecord(data);
104+
if (this.delegate instanceof AcknowledgingConsumerAwareMessageListener) {
105+
((AcknowledgingConsumerAwareMessageListener<T, V>) this.delegate).onMessage(convertedConsumerRecord, acknowledgment, consumer);
106+
}
107+
else if (this.delegate instanceof ConsumerAwareMessageListener) {
108+
((ConsumerAwareMessageListener<T, V>) this.delegate).onMessage(convertedConsumerRecord, consumer);
109+
}
110+
else if (this.delegate instanceof AcknowledgingMessageListener) {
111+
((AcknowledgingMessageListener<T, V>) this.delegate).onMessage(convertedConsumerRecord, acknowledgment);
112+
}
113+
else if (this.delegate instanceof MessageListener) {
114+
((MessageListener<T, V>) this.delegate).onMessage(convertedConsumerRecord);
115+
}
116+
}
117+
118+
private ConsumerRecord<T, V> convertConsumerRecord(ConsumerRecord<T, U> data) { // NOSONAR
119+
Header[] headerArray = data.headers().toArray();
120+
Map<String, Object> headerMap = Arrays.stream(headerArray)
121+
.collect(Collectors.toMap(Header::key, Header::value));
122+
123+
Message<U> message = new GenericMessage<>(data.value(), headerMap);
124+
V converted = (V) this.messageConverter.fromMessage(message, this.desiredValueType);
125+
126+
if (converted == null) {
127+
throw new MessageConversionException(message, "Message cannot be converted by used MessageConverter");
128+
}
129+
130+
return rebuildConsumerRecord(data, converted);
131+
}
132+
133+
private ConsumerRecord<T, V> rebuildConsumerRecord(ConsumerRecord<T, U> data, V converted) {
134+
return new ConsumerRecord<>(
135+
data.topic(),
136+
data.partition(),
137+
data.offset(),
138+
data.key(),
139+
converted
140+
);
141+
}
142+
143+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.junit.jupiter.api.Test;
24+
25+
import org.springframework.kafka.listener.MessageListener;
26+
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
27+
import org.springframework.messaging.converter.MessageConversionException;
28+
29+
import com.fasterxml.jackson.core.JsonProcessingException;
30+
import com.fasterxml.jackson.databind.ObjectMapper;
31+
32+
/**
33+
* @author Adrian Chlebosz
34+
* @since 3.0.0
35+
*
36+
*/
37+
class ConvertingAndDelegatingMessageListenerAdapterTest {
38+
39+
private final ObjectMapper mapper = new ObjectMapper();
40+
41+
@Test
42+
public void testMessageListenerIsInvokedWithConvertedSimpleRecord() {
43+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", 0);
44+
45+
var messageListener = (MessageListener<String, Integer>) (data) -> assertThat(data.value()).isNotNull();
46+
var adapter = new ConvertingAndDelegatingMessageListenerAdapter<String, Integer, Integer>(
47+
messageListener,
48+
Integer.class
49+
);
50+
51+
adapter.onMessage(consumerRecord, null, null);
52+
}
53+
54+
@Test
55+
public void testMessageListenerIsInvokedWithRecordConvertedByCustomConverter() throws JsonProcessingException {
56+
var toBeConverted = new ToBeConverted("foo");
57+
var toBeConvertedJson = mapper.writeValueAsString(toBeConverted);
58+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", toBeConvertedJson);
59+
60+
var messageListener = (MessageListener<String, ToBeConverted>) (data) -> {
61+
assertThat(data.value()).isNotNull();
62+
assertThat(data.value().getA()).isEqualTo("foo");
63+
};
64+
var adapter = new ConvertingAndDelegatingMessageListenerAdapter<String, String, ToBeConverted>(
65+
messageListener,
66+
new MappingJackson2MessageConverter(),
67+
ToBeConverted.class
68+
);
69+
70+
adapter.onMessage(consumerRecord, null, null);
71+
}
72+
73+
@Test
74+
public void testConversionFailsWhileUsingDefaultConverterForComplexObject() throws JsonProcessingException {
75+
var toBeConverted = new ToBeConverted("foo");
76+
var toBeConvertedJson = mapper.writeValueAsString(toBeConverted);
77+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", toBeConvertedJson);
78+
79+
var messageListener = (MessageListener<String, ToBeConverted>) (data) -> {
80+
assertThat(data.value()).isNotNull();
81+
assertThat(data.value().getA()).isEqualTo("foo");
82+
};
83+
var adapter = new ConvertingAndDelegatingMessageListenerAdapter<String, String, ToBeConverted>(
84+
messageListener,
85+
ToBeConverted.class
86+
);
87+
88+
assertThatThrownBy(() -> adapter.onMessage(consumerRecord, null, null)).isInstanceOf(MessageConversionException.class);
89+
}
90+
91+
private static class ToBeConverted {
92+
private String a;
93+
94+
ToBeConverted() {
95+
}
96+
97+
ToBeConverted(String a) {
98+
this.a = a;
99+
}
100+
101+
public String getA() {
102+
return a;
103+
}
104+
105+
public void setA(String a) {
106+
this.a = a;
107+
}
108+
}
109+
110+
}

0 commit comments

Comments
 (0)