Skip to content

Commit 2446056

Browse files
garyrussellartembilan
authored andcommitted
GH-1066: Protect against null Headers
Resolves #1066 Certain clients (e.g. mapR) that emulate the Kafka clients do not properly populate the `ConsumerRecord.headers()` field. Check for null before mapping; also check `timestampType`. **cherry-pick to 2.2.x, 2.1.x**
1 parent 63c51f1 commit 2446056

File tree

5 files changed

+73
-5
lines changed

5 files changed

+73
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public RecordMessageConverter getRecordMessageConverter() {
125125
@Override
126126
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment,
127127
Consumer<?, ?> consumer, Type type) {
128+
128129
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
129130
this.generateTimestamp);
130131

@@ -156,9 +157,11 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, Acknowledgment a
156157
topics.add(record.topic());
157158
partitions.add(record.partition());
158159
offsets.add(record.offset());
159-
timestampTypes.add(record.timestampType().name());
160+
if (record.timestampType() != null) {
161+
timestampTypes.add(record.timestampType().name());
162+
}
160163
timestamps.add(record.timestamp());
161-
if (this.headerMapper != null) {
164+
if (this.headerMapper != null && record.headers() != null) {
162165
Map<String, Object> converted = new HashMap<>();
163166
this.headerMapper.toHeaders(record.headers(), converted);
164167
convertedHeaders.add(converted);

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ static String getGroupid() {
4242
}
4343

4444
default void commonHeaders(Acknowledgment acknowledgment, Consumer<?, ?> consumer, Map<String, Object> rawHeaders,
45-
Object theKey, Object topic, Object partition, Object offset, Object timestampType, Object timestamp) {
45+
Object theKey, Object topic, Object partition, Object offset,
46+
@Nullable Object timestampType, Object timestamp) {
4647

4748
rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey);
4849
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic);

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,12 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
101101
@Override
102102
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
103103
Type type) {
104+
104105
KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
105106
this.generateTimestamp);
106107

107108
Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
108-
if (this.headerMapper != null) {
109+
if (this.headerMapper != null && record.headers() != null) {
109110
this.headerMapper.toHeaders(record.headers(), rawHeaders);
110111
}
111112
else {
@@ -117,8 +118,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
117118
}
118119
rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, record.headers());
119120
}
121+
String ttName = record.timestampType() != null ? record.timestampType().name() : null;
120122
commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(),
121-
record.offset(), record.timestampType().name(), record.timestamp());
123+
record.offset(), ttName, record.timestamp());
122124

123125
return MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders);
124126
}

spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.util.ArrayList;
2323
import java.util.Arrays;
24+
import java.util.Collections;
2425
import java.util.Iterator;
2526
import java.util.List;
2627
import java.util.Map;
@@ -115,4 +116,18 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) {
115116
return headers;
116117
}
117118

119+
@SuppressWarnings("unchecked")
120+
@Test
121+
public void missingHeaders() {
122+
BatchMessageConverter converter = new BatchMessagingMessageConverter();
123+
Headers nullHeaders = null;
124+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz",
125+
nullHeaders);
126+
List<ConsumerRecord<?, ?>> records = Collections.singletonList(record);
127+
Message<?> message = converter.toMessage(records, null, null, null);
128+
assertThat(((List<String>) message.getPayload())).contains("baz");
129+
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, List.class)).contains("foo");
130+
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, List.class)).contains("bar");
131+
}
132+
118133
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.converter;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.common.header.Headers;
23+
import org.junit.jupiter.api.Test;
24+
25+
import org.springframework.kafka.support.KafkaHeaders;
26+
import org.springframework.messaging.Message;
27+
28+
/**
29+
* @author Gary Russell
30+
* @since 2.1.13
31+
*
32+
*/
33+
public class MessagingMessageConverterTests {
34+
35+
@Test
36+
void missingHeaders() {
37+
MessagingMessageConverter converter = new MessagingMessageConverter();
38+
Headers nullHeaders = null;
39+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz",
40+
nullHeaders);
41+
Message<?> message = converter.toMessage(record, null, null, null);
42+
assertThat(message.getPayload()).isEqualTo("baz");
43+
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo");
44+
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar");
45+
}
46+
47+
}

0 commit comments

Comments
 (0)