1818
1919import java .util .Arrays ;
2020import java .util .Map ;
21- import java .util .Objects ;
2221import java .util .stream .Collectors ;
2322
2423import org .apache .kafka .clients .consumer .Consumer ;
3534import org .springframework .messaging .converter .MessageConverter ;
3635import org .springframework .messaging .converter .SimpleMessageConverter ;
3736import org .springframework .messaging .support .GenericMessage ;
37+ import org .springframework .util .Assert ;
3838
3939/**
4040 * A {@link AcknowledgingConsumerAwareMessageListener} adapter that implements
4949 * @since 3.0
5050 * @see AcknowledgingConsumerAwareMessageListener
5151 */
52- public class ConvertingAndDelegatingMessageListenerAdapter <T , U , V > implements AcknowledgingConsumerAwareMessageListener <T , U > {
52+ public class ConvertingMessageListener <T , U , V > implements AcknowledgingConsumerAwareMessageListener <T , U > {
5353
54- private final Object delegate ;
54+ private final MessageListener < T , V > delegate ;
5555 private final MessageConverter messageConverter ;
5656 private final Class <V > desiredValueType ;
5757
@@ -63,9 +63,9 @@ public class ConvertingAndDelegatingMessageListenerAdapter<T, U, V> implements A
6363 * @param delegate the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
6464 * @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
6565 */
66- public ConvertingAndDelegatingMessageListenerAdapter ( Object delegate , Class <V > desiredValueType ) {
67- validateMessageListener (delegate );
68- Objects . requireNonNull (desiredValueType );
66+ public ConvertingMessageListener ( MessageListener < T , V > delegate , Class <V > desiredValueType ) {
67+ Assert . notNull (delegate , "Delegate message listener cannot be null" );
68+ Assert . notNull (desiredValueType , "Desired value type cannot be null" );
6969
7070 this .delegate = delegate ;
7171 this .desiredValueType = desiredValueType ;
@@ -81,38 +81,30 @@ public ConvertingAndDelegatingMessageListenerAdapter(Object delegate, Class<V> d
8181 * @param messageConverter the {@link MessageConverter} to use for conversion.
8282 * @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
8383 */
84- public ConvertingAndDelegatingMessageListenerAdapter ( Object delegate , MessageConverter messageConverter , Class <V > desiredValueType ) {
85- validateMessageListener (delegate );
86- Objects . requireNonNull (messageConverter );
87- Objects . requireNonNull (desiredValueType );
84+ public ConvertingMessageListener ( MessageListener < T , V > delegate , MessageConverter messageConverter , Class <V > desiredValueType ) {
85+ Assert . notNull (delegate , "Delegate message listener cannot be null" );
86+ Assert . notNull (messageConverter , "Message converter cannot be null" );
87+ Assert . notNull (desiredValueType , "Desired value type cannot be null" );
8888
8989 this .delegate = delegate ;
9090 this .messageConverter = messageConverter ;
9191 this .desiredValueType = desiredValueType ;
9292 }
9393
94- private void validateMessageListener (Object messageListener ) {
95- Objects .requireNonNull (messageListener );
96- if (!(messageListener instanceof MessageListener )) {
97- throw new IllegalArgumentException ("Passed message listener must be of MessageListener type" );
98- }
99- }
100-
10194 @ Override
10295 public void onMessage (ConsumerRecord <T , U > data , Acknowledgment acknowledgment , Consumer <?, ?> consumer ) { // NOSONAR
10396 ConsumerRecord <T , V > convertedConsumerRecord = convertConsumerRecord (data );
10497 if (this .delegate instanceof AcknowledgingConsumerAwareMessageListener ) {
105- (( AcknowledgingConsumerAwareMessageListener < T , V >) this .delegate ) .onMessage (convertedConsumerRecord , acknowledgment , consumer );
98+ this .delegate .onMessage (convertedConsumerRecord , acknowledgment , consumer );
10699 }
107100 else if (this .delegate instanceof ConsumerAwareMessageListener ) {
108- (( ConsumerAwareMessageListener < T , V >) this .delegate ) .onMessage (convertedConsumerRecord , consumer );
101+ this .delegate .onMessage (convertedConsumerRecord , consumer );
109102 }
110103 else if (this .delegate instanceof AcknowledgingMessageListener ) {
111- ((AcknowledgingMessageListener <T , V >) this .delegate ).onMessage (convertedConsumerRecord , acknowledgment );
112- }
113- else if (this .delegate instanceof MessageListener ) {
114- ((MessageListener <T , V >) this .delegate ).onMessage (convertedConsumerRecord );
104+ this .delegate .onMessage (convertedConsumerRecord , acknowledgment );
115105 }
106+
107+ this .delegate .onMessage (convertedConsumerRecord );
116108 }
117109
118110 private ConsumerRecord <T , V > convertConsumerRecord (ConsumerRecord <T , U > data ) { // NOSONAR
0 commit comments