Skip to content

Commit 878448e

Browse files
tsalsosobychacko
authored andcommitted
GH-2246: Partition attribute SpEL expression
Fixes: gh-2246 When partition attribute is specified as a SpEL expression on PartitionOffset annotation, it doesn not resolve the value from the expression. Fixing this issue. Adding test to verify. **Cherry-pick to 3.0.x**
1 parent 3758e62 commit 878448e

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
879879
for (String partition : partitions) {
880880
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false);
881881
}
882-
if (partitionOffsets.length == 1 && partitionOffsets[0].partition().equals("*")) {
882+
if (partitionOffsets.length == 1 && resolveExpression(partitionOffsets[0].partition()).equals("*")) {
883883
result.forEach(tpo -> {
884884
tpo.setOffset(resolveInitialOffset(tpo.getTopic(), partitionOffsets[0]));
885885
tpo.setRelativeToCurrent(isRelative(tpo.getTopic(), partitionOffsets[0]));

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,11 @@
176176
* @author Venil Noronha
177177
* @author Dimitri Penner
178178
* @author Nakul Mishra
179+
* @author Soby Chacko
179180
*/
180181
@SpringJUnitConfig
181182
@DirtiesContext
182-
@EmbeddedKafka(topics = { "annotated1", "annotated2", "annotated3",
183+
@EmbeddedKafka(topics = { "annotated1", "annotated2", "annotated3", "annotated3x",
183184
"annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated8reply",
184185
"annotated9", "annotated10",
185186
"annotated11", "annotated12", "annotated13", "annotated14", "annotated15", "annotated16", "annotated17",
@@ -313,6 +314,10 @@ public void manyTests() throws Exception {
313314
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
314315
assertThat(this.config.listen3Exception).isNotNull();
315316

317+
template.send("annotated3x", 0, "foo");
318+
assertThat(this.listener.latch3x.await(60, TimeUnit.SECONDS)).isTrue();
319+
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
320+
316321
template.send("annotated4", 0, "foo");
317322
assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue();
318323
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
@@ -1840,6 +1845,8 @@ static class Listener implements ConsumerSeekAware {
18401845

18411846
final CountDownLatch latch3 = new CountDownLatch(1);
18421847

1848+
final CountDownLatch latch3x = new CountDownLatch(1);
1849+
18431850
final CountDownLatch latch4 = new CountDownLatch(1);
18441851

18451852
final CountDownLatch latch5 = new CountDownLatch(1);
@@ -2018,6 +2025,14 @@ public void listen3(ConsumerRecord<?, ?> record) {
20182025
this.latch3.countDown();
20192026
}
20202027

2028+
@KafkaListener(id = "partitionExpression", topicPartitions = @TopicPartition(topic = "${topicThree:annotated3x}",
2029+
partitions = "${zero:0}",
2030+
partitionOffsets = @PartitionOffset(partition = "#{'*'}", initialOffset = "0")))
2031+
public void listenPartitionSpelExpression(ConsumerRecord<?, ?> record) {
2032+
this.capturedRecord = record;
2033+
this.latch3x.countDown();
2034+
}
2035+
20212036
@KafkaListener(id = "#{'qux'}", topics = "annotated4",
20222037
containerFactory = "kafkaManualAckListenerContainerFactory", containerGroup = "qux#{'Group'}",
20232038
properties = {

0 commit comments

Comments
 (0)