Skip to content

Commit b5f6c39

Browse files
siddharthjain210artembilan
authored andcommitted
GH-249 Add properties for handling backpressure in KplMessageHandler
Fixes: #249 * Simplification of back-pressure handling, introduced `KplBackpressureException`. * Javadoc corrections. * Code Review comments addressed. Added Test cases with RetryAdvice. * Added Javadoc for `KplBackpressureException` in the `KplMessageHandler` class level. * Javadoc related code review actions fixed. * Updated Copyright and Javadoc related comments. * Revert in `AbstractAwsMessageHandler`.
1 parent 510fe5b commit b5f6c39

File tree

3 files changed

+267
-4
lines changed

3 files changed

+267
-4
lines changed

src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@
4949
import org.springframework.expression.Expression;
5050
import org.springframework.expression.common.LiteralExpression;
5151
import org.springframework.integration.aws.support.AwsHeaders;
52+
import org.springframework.integration.aws.support.KplBackpressureException;
5253
import org.springframework.integration.aws.support.UserRecordResponse;
5354
import org.springframework.integration.expression.ValueExpression;
5455
import org.springframework.integration.handler.AbstractMessageHandler;
@@ -63,11 +64,15 @@
6364
import org.springframework.util.StringUtils;
6465

6566
/**
66-
* The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer
67-
* Library {@code putRecord(s)}.
67+
* The {@link AbstractMessageHandler} implementation for the Amazon Kinesis Producer Library {@code putRecord(s)}.
68+
* <p>
69+
* The {@link KplBackpressureException} is thrown when backpressure handling is enabled and buffer is at max capacity.
70+
* This exception can be handled with {@link org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice}.
71+
* </p>
6872
*
6973
* @author Arnaud Lecollaire
7074
* @author Artem Bilan
75+
* @author Siddharth Jain
7176
*
7277
* @since 2.2
7378
*
@@ -99,6 +104,8 @@ public class KplMessageHandler extends AbstractAwsMessageHandler<Void> implement
99104

100105
private volatile ScheduledFuture<?> flushFuture;
101106

107+
private long backPressureThreshold = 0;
108+
102109
public KplMessageHandler(KinesisProducer kinesisProducer) {
103110
Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null.");
104111
this.kinesisProducer = kinesisProducer;
@@ -115,6 +122,19 @@ public void setConverter(Converter<Object, byte[]> converter) {
115122
setMessageConverter(new ConvertingFromMessageConverter(converter));
116123
}
117124

125+
/**
126+
* Configure maximum records in flight for handling backpressure.
127+
* By default, backpressure handling is not enabled.
128+
* When backpressure handling is enabled and number of records in flight exceeds the threshold, a
129+
* {@link KplBackpressureException} would be thrown.
130+
* @param backPressureThreshold a value greater than {@code 0} to enable backpressure handling.
131+
* @since 3.0.9
132+
*/
133+
public void setBackPressureThreshold(long backPressureThreshold) {
134+
Assert.isTrue(backPressureThreshold >= 0, "'backPressureThreshold must be greater than or equal to 0.");
135+
this.backPressureThreshold = backPressureThreshold;
136+
}
137+
118138
/**
119139
* Configure a {@link MessageConverter} for converting payload to {@code byte[]} for Kinesis record.
120140
* @param messageConverter the {@link MessageConverter} to use.
@@ -368,6 +388,14 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message<?>
368388
}
369389

370390
private CompletableFuture<UserRecordResponse> handleUserRecord(UserRecord userRecord) {
391+
if (this.backPressureThreshold > 0) {
392+
var numberOfRecordsInFlight = this.kinesisProducer.getOutstandingRecordsCount();
393+
if (numberOfRecordsInFlight > this.backPressureThreshold) {
394+
throw new KplBackpressureException("Cannot send record to Kinesis since buffer is at max capacity.",
395+
userRecord);
396+
}
397+
}
398+
371399
ListenableFuture<UserRecordResult> recordResult = this.kinesisProducer.addUserRecord(userRecord);
372400
return listenableFutureToCompletableFuture(recordResult)
373401
.thenApply(UserRecordResponse::new);
@@ -403,7 +431,8 @@ private PutRecordRequest buildPutRecordRequest(Message<?> message) {
403431
if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) {
404432
partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class);
405433
}
406-
Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. "
434+
Assert.state(partitionKey != null,
435+
"'partitionKey' must not be null for sending a Kinesis record."
407436
+ "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " +
408437
"or supply an 'aws_partitionKey' message header.");
409438

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aws.support;
18+
19+
import java.io.Serial;
20+
21+
import com.amazonaws.services.kinesis.producer.UserRecord;
22+
23+
/**
24+
* An exception triggered from the {@link org.springframework.integration.aws.outbound.KplMessageHandler}
25+
* while sending records to Kinesis when maximum number of records in flight exceeds the backpressure threshold.
26+
*
27+
* @author Siddharth Jain
28+
* @author Artem Bilan
29+
*
30+
* @since 3.0.9
31+
*/
32+
public class KplBackpressureException extends RuntimeException {
33+
34+
@Serial
35+
private static final long serialVersionUID = 1L;
36+
37+
private final UserRecord userRecord;
38+
39+
public KplBackpressureException(String message, UserRecord userRecord) {
40+
super(message);
41+
this.userRecord = userRecord;
42+
}
43+
44+
/**
45+
* Get the {@link UserRecord} when this exception has been thrown.
46+
* @return the {@link UserRecord} when this exception has been thrown.
47+
*/
48+
public UserRecord getUserRecord() {
49+
return this.userRecord;
50+
}
51+
52+
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2019-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aws.outbound;
18+
19+
import com.amazonaws.services.kinesis.producer.KinesisProducer;
20+
import com.amazonaws.services.kinesis.producer.UserRecord;
21+
import org.junit.jupiter.api.AfterEach;
22+
import org.junit.jupiter.api.Test;
23+
import org.mockito.ArgumentCaptor;
24+
import org.mockito.Mockito;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.integration.annotation.ServiceActivator;
30+
import org.springframework.integration.aws.support.AwsHeaders;
31+
import org.springframework.integration.aws.support.KplBackpressureException;
32+
import org.springframework.integration.config.EnableIntegration;
33+
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
34+
import org.springframework.messaging.Message;
35+
import org.springframework.messaging.MessageChannel;
36+
import org.springframework.messaging.MessageHandler;
37+
import org.springframework.messaging.MessageHandlingException;
38+
import org.springframework.messaging.support.MessageBuilder;
39+
import org.springframework.retry.support.RetryTemplate;
40+
import org.springframework.test.annotation.DirtiesContext;
41+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
42+
43+
import static org.assertj.core.api.Assertions.assertThat;
44+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
45+
import static org.mockito.ArgumentMatchers.any;
46+
import static org.mockito.BDDMockito.given;
47+
import static org.mockito.Mockito.clearInvocations;
48+
import static org.mockito.Mockito.mock;
49+
import static org.mockito.Mockito.verify;
50+
51+
/** The class contains test cases for KplMessageHandler.
52+
*
53+
* @author Siddharth Jain
54+
*
55+
* @since 3.0.9
56+
*/
57+
@SpringJUnitConfig
58+
@DirtiesContext
59+
public class KplMessageHandlerTests {
60+
61+
@Autowired
62+
protected KinesisProducer kinesisProducer;
63+
64+
@Autowired
65+
protected MessageChannel kinesisSendChannel;
66+
67+
@Autowired
68+
protected KplMessageHandler kplMessageHandler;
69+
70+
@Test
71+
@SuppressWarnings("unchecked")
72+
void kplMessageHandlerWithRawPayloadBackpressureDisabledSuccess() {
73+
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
74+
.willReturn(mock());
75+
final Message<?> message = MessageBuilder
76+
.withPayload("someMessage")
77+
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
78+
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
79+
.setHeader("someHeaderKey", "someHeaderValue")
80+
.build();
81+
82+
ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
83+
.forClass(UserRecord.class);
84+
this.kplMessageHandler.setBackPressureThreshold(0);
85+
this.kinesisSendChannel.send(message);
86+
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
87+
verify(this.kinesisProducer, Mockito.never()).getOutstandingRecordsCount();
88+
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
89+
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
90+
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
91+
assertThat(userRecord.getExplicitHashKey()).isNull();
92+
}
93+
94+
@Test
95+
@SuppressWarnings("unchecked")
96+
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityAvailable() {
97+
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
98+
.willReturn(mock());
99+
this.kplMessageHandler.setBackPressureThreshold(2);
100+
given(this.kinesisProducer.getOutstandingRecordsCount())
101+
.willReturn(1);
102+
final Message<?> message = MessageBuilder
103+
.withPayload("someMessage")
104+
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
105+
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
106+
.setHeader("someHeaderKey", "someHeaderValue")
107+
.build();
108+
109+
ArgumentCaptor<UserRecord> userRecordRequestArgumentCaptor = ArgumentCaptor
110+
.forClass(UserRecord.class);
111+
112+
this.kinesisSendChannel.send(message);
113+
verify(this.kinesisProducer).addUserRecord(userRecordRequestArgumentCaptor.capture());
114+
verify(this.kinesisProducer).getOutstandingRecordsCount();
115+
UserRecord userRecord = userRecordRequestArgumentCaptor.getValue();
116+
assertThat(userRecord.getStreamName()).isEqualTo("someStream");
117+
assertThat(userRecord.getPartitionKey()).isEqualTo("somePartitionKey");
118+
assertThat(userRecord.getExplicitHashKey()).isNull();
119+
}
120+
121+
@Test
122+
@SuppressWarnings("unchecked")
123+
void kplMessageHandlerWithRawPayloadBackpressureEnabledCapacityInsufficient() {
124+
given(this.kinesisProducer.addUserRecord(any(UserRecord.class)))
125+
.willReturn(mock());
126+
this.kplMessageHandler.setBackPressureThreshold(2);
127+
given(this.kinesisProducer.getOutstandingRecordsCount())
128+
.willReturn(5);
129+
final Message<?> message = MessageBuilder
130+
.withPayload("someMessage")
131+
.setHeader(AwsHeaders.PARTITION_KEY, "somePartitionKey")
132+
.setHeader(AwsHeaders.SEQUENCE_NUMBER, "10")
133+
.setHeader("someHeaderKey", "someHeaderValue")
134+
.build();
135+
136+
assertThatExceptionOfType(RuntimeException.class)
137+
.isThrownBy(() -> this.kinesisSendChannel.send(message))
138+
.withCauseInstanceOf(MessageHandlingException.class)
139+
.withRootCauseExactlyInstanceOf(KplBackpressureException.class)
140+
.withStackTraceContaining("Cannot send record to Kinesis since buffer is at max capacity.");
141+
142+
verify(this.kinesisProducer, Mockito.never()).addUserRecord(any(UserRecord.class));
143+
verify(this.kinesisProducer).getOutstandingRecordsCount();
144+
}
145+
146+
@AfterEach
147+
public void tearDown() {
148+
clearInvocations(this.kinesisProducer);
149+
}
150+
151+
@Configuration
152+
@EnableIntegration
153+
public static class ContextConfiguration {
154+
155+
@Bean
156+
public KinesisProducer kinesisProducer() {
157+
return mock();
158+
}
159+
160+
@Bean
161+
public RequestHandlerRetryAdvice retryAdvice() {
162+
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
163+
requestHandlerRetryAdvice.setRetryTemplate(RetryTemplate.builder()
164+
.retryOn(KplBackpressureException.class)
165+
.exponentialBackoff(100, 2.0, 1000)
166+
.maxAttempts(3)
167+
.build());
168+
return requestHandlerRetryAdvice;
169+
}
170+
171+
@Bean
172+
@ServiceActivator(inputChannel = "kinesisSendChannel", adviceChain = "retryAdvice")
173+
public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) {
174+
KplMessageHandler kplMessageHandler = new KplMessageHandler(kinesisProducer);
175+
kplMessageHandler.setAsync(true);
176+
kplMessageHandler.setStream("someStream");
177+
return kplMessageHandler;
178+
}
179+
180+
}
181+
182+
}

0 commit comments

Comments
 (0)