2424import java .util .List ;
2525import java .util .Map ;
2626
27- import org .apache .commons .logging .LogFactory ;
2827import org .apache .kafka .clients .consumer .Consumer ;
2928import org .apache .kafka .clients .consumer .ConsumerRecord ;
3029import org .apache .kafka .clients .producer .ProducerRecord ;
3130import org .apache .kafka .common .header .Headers ;
31+ import org .apache .kafka .common .record .TimestampType ;
3232import org .apache .kafka .common .utils .Bytes ;
3333
3434import org .springframework .core .log .LogAccessor ;
35+ import org .springframework .core .log .LogMessage ;
3536import org .springframework .kafka .support .Acknowledgment ;
3637import org .springframework .kafka .support .DefaultKafkaHeaderMapper ;
3738import org .springframework .kafka .support .JacksonPresent ;
5455 * <p>
5556 * If a {@link RecordMessageConverter} is provided, and the batch type is a {@link ParameterizedType}
5657 * with a single generic type parameter, each record will be passed to the converter, thus supporting
57- * a method signature {@code List<Foo> foos }.
58+ * a method signature {@code List<MyType> myObjects }.
5859 *
5960 * @author Marius Bogoevici
6061 * @author Gary Russell
6364 * @author Sanghyeok An
6465 * @author Hope Kim
6566 * @author Borahm Lee
67+ * @author Artem Bilan
68+ *
6669 * @since 1.1
6770 */
6871public class BatchMessagingMessageConverter implements BatchMessageConverter {
6972
70- protected final LogAccessor logger = new LogAccessor (LogFactory . getLog ( getClass () )); // NOSONAR
73+ protected final LogAccessor logger = new LogAccessor (getClass ()); // NOSONAR
7174
7275 @ Nullable
7376 private final RecordMessageConverter recordConverter ;
@@ -102,7 +105,7 @@ public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordCon
102105
103106 /**
104107 * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
105- * will try to use a default value. By default set to {@code false}.
108+ * will try to use a default value. By default, set to {@code false}.
106109 * @param generateMessageId true if a message id should be generated
107110 */
108111 public void setGenerateMessageId (boolean generateMessageId ) {
@@ -111,7 +114,7 @@ public void setGenerateMessageId(boolean generateMessageId) {
111114
112115 /**
113116 * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
114- * used instead. By default set to {@code false}.
117+ * used instead. By default, set to {@code false}.
115118 * @param generateTimestamp true if a timestamp should be generated
116119 */
117120 public void setGenerateTimestamp (boolean generateTimestamp ) {
@@ -147,8 +150,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
147150 public Message <?> toMessage (List <ConsumerRecord <?, ?>> records , @ Nullable Acknowledgment acknowledgment ,
148151 Consumer <?, ?> consumer , Type type ) {
149152
150- KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders ( this . generateMessageId ,
151- this .generateTimestamp );
153+ KafkaMessageHeaders kafkaMessageHeaders =
154+ new KafkaMessageHeaders ( this . generateMessageId , this .generateTimestamp );
152155
153156 Map <String , Object > rawHeaders = kafkaMessageHeaders .getRawHeaders ();
154157 List <Object > payloads = new ArrayList <>();
@@ -169,16 +172,18 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
169172
170173 String listenerInfo = null ;
171174 for (ConsumerRecord <?, ?> record : records ) {
172- addRecordInfo (record , type , payloads , keys , topics , partitions , offsets , timestampTypes , timestamps , conversionFailures );
173- if (this .headerMapper != null && record .headers () != null ) {
174- Map <String , Object > converted = convertHeaders (record .headers (), convertedHeaders );
175+ addRecordInfo (record , type , payloads , keys , topics , partitions , offsets , timestampTypes , timestamps ,
176+ conversionFailures );
177+ Headers recordHeaders = record .headers ();
178+ if (this .headerMapper != null && recordHeaders != null ) {
179+ Map <String , Object > converted = convertHeaders (recordHeaders , convertedHeaders );
175180 Object obj = converted .get (KafkaHeaders .LISTENER_INFO );
176- if (obj instanceof String ) {
177- listenerInfo = ( String ) obj ;
181+ if (obj instanceof String info ) {
182+ listenerInfo = info ;
178183 }
179184 }
180185 else {
181- natives .add (record . headers () );
186+ natives .add (recordHeaders );
182187 }
183188 if (this .rawRecordHeader ) {
184189 raws .add (record );
@@ -198,6 +203,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
198203
199204 private void addToRawHeaders (Map <String , Object > rawHeaders , List <Map <String , Object >> convertedHeaders ,
200205 List <Headers > natives , List <ConsumerRecord <?, ?>> raws , List <ConversionException > conversionFailures ) {
206+
201207 if (this .headerMapper != null ) {
202208 rawHeaders .put (KafkaHeaders .BATCH_CONVERTED_HEADERS , convertedHeaders );
203209 }
@@ -211,16 +217,18 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
211217 }
212218
213219 private void addRecordInfo (ConsumerRecord <?, ?> record , Type type , List <Object > payloads , List <Object > keys ,
214- List <String > topics , List <Integer > partitions , List <Long > offsets , List <String > timestampTypes ,
215- List <Long > timestamps , List <ConversionException > conversionFailures ) {
220+ List <String > topics , List <Integer > partitions , List <Long > offsets , List <String > timestampTypes ,
221+ List <Long > timestamps , List <ConversionException > conversionFailures ) {
222+
216223 payloads .add (obtainPayload (type , record , conversionFailures ));
217224 keys .add (record .key ());
218225 topics .add (record .topic ());
219226 partitions .add (record .partition ());
220227 offsets .add (record .offset ());
221228 timestamps .add (record .timestamp ());
222- if (record .timestampType () != null ) {
223- timestampTypes .add (record .timestampType ().name ());
229+ TimestampType timestampType = record .timestampType ();
230+ if (timestampType != null ) {
231+ timestampTypes .add (timestampType .name ());
224232 }
225233 }
226234
@@ -264,24 +272,29 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
264272 protected Object convert (ConsumerRecord <?, ?> record , Type type , List <ConversionException > conversionFailures ) {
265273 try {
266274 Object payload = this .recordConverter
267- .toMessage (record , null , null , ((ParameterizedType ) type ).getActualTypeArguments ()[0 ]).getPayload ();
275+ .toMessage (record , null , null , ((ParameterizedType ) type ).getActualTypeArguments ()[0 ]).getPayload ();
268276 conversionFailures .add (null );
269277 return payload ;
270278 }
271279 catch (ConversionException ex ) {
272280 byte [] original = null ;
273- if (record .value () instanceof byte []) {
274- original = ( byte []) record . value () ;
281+ if (record .value () instanceof byte [] bytes ) {
282+ original = bytes ;
275283 }
276- else if (record .value () instanceof Bytes ) {
277- original = (( Bytes ) record . value ()) .get ();
284+ else if (record .value () instanceof Bytes bytes ) {
285+ original = bytes .get ();
278286 }
279- else if (record .value () instanceof String ) {
280- original = (( String ) record . value ()) .getBytes (StandardCharsets .UTF_8 );
287+ else if (record .value () instanceof String string ) {
288+ original = string .getBytes (StandardCharsets .UTF_8 );
281289 }
282290 if (original != null ) {
283291 SerializationUtils .deserializationException (record .headers (), original , ex , false );
284292 conversionFailures .add (ex );
293+ this .logger .warn (ex ,
294+ LogMessage .format ("Could not convert message for topic=%s, partition=%d, offset=%d" ,
295+ record .topic (),
296+ record .partition (),
297+ record .offset ()));
285298 return null ;
286299 }
287300 throw new ConversionException ("The batch converter can only report conversion failures to the listener "
@@ -296,8 +309,8 @@ else if (record.value() instanceof String) {
296309 * @return true if the conditions are met.
297310 */
298311 private boolean containerType (Type type ) {
299- return type instanceof ParameterizedType
300- && (( ParameterizedType ) type ) .getActualTypeArguments ().length == 1 ;
312+ return type instanceof ParameterizedType parameterizedType
313+ && parameterizedType .getActualTypeArguments ().length == 1 ;
301314 }
302315
303316}
0 commit comments