Skip to content

Commit 3771e2a

Browse files
garyrussellartembilan
authored andcommitted
Fix @sendto Runtime SpEL Detection
Previously, `setReplyTopic` detected runtime SpEL by checking if the expression starts with `!{`. Since we use a `TemplateParserContext` the topic can be evaluated even if the `!{` is not at the beginning. This is not a breaking change since `!` and `{` are not valid topic characters.
1 parent ac323ec commit 3771e2a

File tree

2 files changed

+7
-7
lines changed

2 files changed

+7
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -158,7 +158,7 @@ protected boolean isConsumerRecordList() {
158158
* @since 2.0
159159
*/
160160
public void setReplyTopic(String replyTopic) {
161-
if (replyTopic.startsWith(PARSER_CONTEXT.getExpressionPrefix())) {
161+
if (replyTopic.contains(PARSER_CONTEXT.getExpressionPrefix())) {
162162
this.replyTopicExpression = PARSER.parseExpression(replyTopic, PARSER_CONTEXT);
163163
}
164164
else {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -477,10 +477,10 @@ public void testReplyingListener() throws Exception {
477477
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
478478
Consumer<Integer, String> consumer = cf.createConsumer();
479479
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "annotated21reply");
480-
template.send("annotated21", 0, "annotated21reply");
480+
template.send("annotated21", 0, "nnotated21reply"); // drop the leading 'a'.
481481
template.flush();
482482
ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated21reply");
483-
assertThat(reply.value()).isEqualTo("ANNOTATED21REPLY");
483+
assertThat(reply.value()).isEqualTo("NNOTATED21REPLY");
484484
consumer.close();
485485
}
486486

@@ -1019,7 +1019,7 @@ public KafkaListenerErrorHandler voidSendToErrorHandler() {
10191019

10201020
private Object listen16Message;
10211021

1022-
private CountDownLatch listen16ErrorLatch = new CountDownLatch(1);
1022+
private final CountDownLatch listen16ErrorLatch = new CountDownLatch(1);
10231023

10241024
@Bean
10251025
public ConsumerAwareErrorHandler listen16ErrorHandler() {
@@ -1290,13 +1290,13 @@ public String errorHandler(String data) throws Exception {
12901290
}
12911291

12921292
@KafkaListener(id = "replyingListener", topics = "annotated21")
1293-
@SendTo("!{request.value()}") // runtime SpEL - test payload is the reply queue
1293+
@SendTo("a!{request.value()}") // runtime SpEL - test payload is the reply queue minus leading 'a'
12941294
public String replyingListener(String in) {
12951295
return in.toUpperCase();
12961296
}
12971297

12981298
@KafkaListener(id = "replyingBatchListener", topics = "annotated22", containerFactory = "batchFactory")
1299-
@SendTo("#{'annotated22reply'}") // config time SpEL
1299+
@SendTo("a#{'nnotated22reply'}") // config time SpEL
13001300
public Collection<String> replyingBatchListener(List<String> in) {
13011301
return in.stream().map(String::toUpperCase).collect(Collectors.toList());
13021302
}

0 commit comments

Comments
 (0)