Skip to content

Commit 6e96d5a

Browse files
authored
MMConverter Support Custom Partition Selection
Allow the user to specify a custom partition selection function.
1 parent 2b4b570 commit 6e96d5a

File tree

2 files changed

+36
-3
lines changed

2 files changed

+36
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.lang.reflect.Type;
2121
import java.nio.charset.StandardCharsets;
2222
import java.util.Map;
23+
import java.util.function.Function;
2324

2425
import org.apache.commons.logging.LogFactory;
2526
import org.apache.kafka.clients.consumer.Consumer;
@@ -61,6 +62,8 @@ public class MessagingMessageConverter implements RecordMessageConverter {
6162

6263
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
6364

65+
private final Function<Message<?>, Integer> partitionProvider;
66+
6467
private boolean generateMessageId = false;
6568

6669
private boolean generateTimestamp = false;
@@ -71,13 +74,29 @@ public class MessagingMessageConverter implements RecordMessageConverter {
7174

7275
private SmartMessageConverter messagingConverter;
7376

77+
/**
78+
* Construct an instance that uses the {@link KafkaHeaders#PARTITION} to determine the
79+
* target partition.
80+
*/
7481
public MessagingMessageConverter() {
82+
this(msg -> msg.getHeaders().get(KafkaHeaders.PARTITION, Integer.class));
83+
}
84+
85+
/**
86+
* Construct an instance that uses the supplied partition provider function. The
87+
* function can return null to delegate the partition selection to the kafka client.
88+
* @param partitionProvider the provider.
89+
* @since 3.0.8
90+
*/
91+
public MessagingMessageConverter(Function<Message<?>, Integer> partitionProvider) {
92+
Assert.notNull(partitionProvider, "'partitionProvider' cannot be null");
7593
if (JacksonPresent.isJackson2Present()) {
7694
this.headerMapper = new DefaultKafkaHeaderMapper();
7795
}
7896
else {
7997
this.headerMapper = new SimpleKafkaHeaderMapper();
8098
}
99+
this.partitionProvider = partitionProvider;
81100
}
82101

83102
/**
@@ -227,7 +246,7 @@ else if (topicHeader == null) {
227246
throw new IllegalStateException(KafkaHeaders.TOPIC + " must be a String or byte[], not "
228247
+ topicHeader.getClass());
229248
}
230-
Integer partition = headers.get(KafkaHeaders.PARTITION, Integer.class);
249+
Integer partition = this.partitionProvider.apply(message);
231250
Object key = headers.get(KafkaHeaders.KEY);
232251
Object payload = convertPayload(message);
233252
Long timestamp = headers.get(KafkaHeaders.TIMESTAMP, Long.class);

spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import java.util.Arrays;
2222
import java.util.Collection;
23+
import java.util.Map;
2324
import java.util.Optional;
2425

2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -36,6 +37,7 @@
3637
import org.springframework.messaging.converter.AbstractMessageConverter;
3738
import org.springframework.messaging.converter.CompositeMessageConverter;
3839
import org.springframework.messaging.converter.MessageConverter;
40+
import org.springframework.messaging.support.GenericMessage;
3941
import org.springframework.util.MimeType;
4042

4143
/**
@@ -47,6 +49,18 @@
4749
*/
4850
public class MessagingMessageConverterTests {
4951

52+
@Test
53+
void partition() {
54+
MessagingMessageConverter converter = new MessagingMessageConverter();
55+
Message<?> msg = new GenericMessage<>("foo", Map.of(KafkaHeaders.PARTITION, 42));
56+
ProducerRecord<?, ?> record = converter.fromMessage(msg, "topic");
57+
assertThat(record.partition()).isEqualTo(42);
58+
converter = new MessagingMessageConverter(m -> m.getHeaders().get("part", Integer.class));
59+
msg = new GenericMessage<>("foo", Map.of(KafkaHeaders.PARTITION, 42, "part", 43));
60+
record = converter.fromMessage(msg, "topic");
61+
assertThat(record.partition()).isEqualTo(43);
62+
}
63+
5064
@Test
5165
void missingHeaders() {
5266
MessagingMessageConverter converter = new MessagingMessageConverter();

0 commit comments

Comments
 (0)