Skip to content

Commit aee5bdc

Browse files
committed
Localstack for SqsMessageDrivenChannelAdapterTest
* Remove `AwsHeaders` which are now covered by the `SqsHeaders`
1 parent d003e1b commit aee5bdc

File tree

3 files changed

+46
-122
lines changed

3 files changed

+46
-122
lines changed

src/main/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
2323
import io.awspring.cloud.sqs.listener.MessageListener;
2424
import io.awspring.cloud.sqs.listener.SqsContainerOptions;
25+
import io.awspring.cloud.sqs.listener.SqsHeaders;
2526
import io.awspring.cloud.sqs.listener.SqsMessageListenerContainer;
2627
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2728

@@ -44,6 +45,7 @@
4445
* @see SqsMessageListenerContainerFactory
4546
* @see SqsMessageListenerContainerFactory
4647
* @see MessageListener
48+
* @see SqsHeaders
4749
*/
4850
@ManagedResource
4951
@IntegrationManagedResource
@@ -76,8 +78,7 @@ protected void onInit() {
7678
sqsContainerOptionsBuilder.fromBuilder(this.sqsContainerOptions.toBuilder()));
7779
}
7880
this.sqsMessageListenerContainerFactory.messageListener(new IntegrationMessageListener());
79-
SqsMessageListenerContainerFactory<?> containerFactory = this.sqsMessageListenerContainerFactory.build();
80-
this.listenerContainer = containerFactory.createContainer(this.queues);
81+
this.listenerContainer = this.sqsMessageListenerContainerFactory.build().createContainer(this.queues);
8182
}
8283

8384
@Override

src/main/java/org/springframework/integration/aws/support/AwsHeaders.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@ public abstract class AwsHeaders {
3030
*/
3131
public static final String QUEUE = PREFIX + "queue";
3232

33-
/**
34-
* The {@value RECEIVED_QUEUE} header for receiving data from SQS.
35-
*/
36-
public static final String RECEIVED_QUEUE = PREFIX + "receivedQueue";
37-
3833
/**
3934
* The {@value TOPIC} header for sending/receiving data over SNS.
4035
*/
@@ -45,16 +40,6 @@ public abstract class AwsHeaders {
4540
*/
4641
public static final String MESSAGE_ID = PREFIX + "messageId";
4742

48-
/**
49-
* The {@value RECEIPT_HANDLE} header for received SQS message.
50-
*/
51-
public static final String RECEIPT_HANDLE = PREFIX + "receiptHandle";
52-
53-
/**
54-
* The {@value ACKNOWLEDGMENT} header for received SQS message.
55-
*/
56-
public static final String ACKNOWLEDGMENT = PREFIX + "acknowledgment";
57-
5843
/**
5944
* The {@value NOTIFICATION_STATUS} header for SNS notification status.
6045
*/

src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java

Lines changed: 43 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -16,161 +16,99 @@
1616

1717
package org.springframework.integration.aws.inbound;
1818

19-
import java.util.concurrent.CompletableFuture;
19+
import java.util.Map;
2020

21-
import org.junit.jupiter.api.Disabled;
21+
import io.awspring.cloud.sqs.listener.SqsHeaders;
22+
import org.junit.jupiter.api.BeforeAll;
2223
import org.junit.jupiter.api.Test;
2324
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
24-
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
25-
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
26-
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
27-
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
28-
import software.amazon.awssdk.services.sqs.model.Message;
29-
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
30-
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
25+
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
26+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
3127

3228
import org.springframework.beans.factory.annotation.Autowired;
3329
import org.springframework.context.annotation.Bean;
3430
import org.springframework.context.annotation.Configuration;
35-
import org.springframework.integration.annotation.ServiceActivator;
36-
import org.springframework.integration.aws.support.AwsHeaders;
31+
import org.springframework.integration.aws.LocalstackContainerTest;
3732
import org.springframework.integration.channel.QueueChannel;
3833
import org.springframework.integration.config.EnableIntegration;
39-
import org.springframework.integration.config.ExpressionControlBusFactoryBean;
4034
import org.springframework.integration.core.MessageProducer;
41-
import org.springframework.integration.test.util.TestUtils;
42-
import org.springframework.messaging.MessageChannel;
4335
import org.springframework.messaging.PollableChannel;
44-
import org.springframework.messaging.support.GenericMessage;
4536
import org.springframework.test.annotation.DirtiesContext;
4637
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4738

4839
import static org.assertj.core.api.Assertions.assertThat;
49-
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
50-
import static org.mockito.ArgumentMatchers.any;
51-
import static org.mockito.BDDMockito.given;
52-
import static org.mockito.BDDMockito.mock;
5340

5441
/**
5542
* @author Artem Bilan
5643
*/
57-
@Disabled("Revise in favor of Local Stack")
5844
@SpringJUnitConfig
5945
@DirtiesContext
60-
public class SqsMessageDrivenChannelAdapterTests {
46+
public class SqsMessageDrivenChannelAdapterTests implements LocalstackContainerTest {
6147

62-
@Autowired
63-
private PollableChannel inputChannel;
48+
private static SqsAsyncClient AMAZON_SQS;
6449

65-
@Autowired
66-
private SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter;
50+
private static String testQueueUrl;
6751

6852
@Autowired
69-
private MessageChannel controlBusInput;
53+
private PollableChannel inputChannel;
7054

71-
@Autowired
72-
private PollableChannel controlBusOutput;
55+
@BeforeAll
56+
static void setup() {
57+
AMAZON_SQS = LocalstackContainerTest.sqsClient();
58+
testQueueUrl = AMAZON_SQS.createQueue(request -> request.queueName("testQueue")).join().queueUrl();
59+
}
7360

7461
@Test
7562
void testSqsMessageDrivenChannelAdapter() {
76-
assertThat(
77-
TestUtils.getPropertyValue(this.sqsMessageDrivenChannelAdapter, "listenerContainer.queueStopTimeout"))
78-
.isEqualTo(20000L);
79-
org.springframework.messaging.Message<?> receive = this.inputChannel.receive(1000);
80-
assertThat(receive).isNotNull();
81-
assertThat((String) receive.getPayload()).isIn("messageContent", "messageContent2");
82-
assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_QUEUE)).isEqualTo("testQueue");
83-
receive = this.inputChannel.receive(1000);
63+
Map<String, MessageAttributeValue> attributes =
64+
Map.of("someAttribute",
65+
MessageAttributeValue.builder()
66+
.stringValue("someValue")
67+
.dataType("String")
68+
.build());
69+
70+
AMAZON_SQS.sendMessageBatch(request ->
71+
request.queueUrl(testQueueUrl)
72+
.entries(SendMessageBatchRequestEntry.builder()
73+
.messageBody("messageContent")
74+
.id("messageContent_id")
75+
.messageAttributes(attributes)
76+
.build(),
77+
SendMessageBatchRequestEntry.builder()
78+
.messageBody("messageContent2")
79+
.id("messageContent2_id")
80+
.messageAttributes(attributes)
81+
.build()));
82+
83+
org.springframework.messaging.Message<?> receive = this.inputChannel.receive(10000);
8484
assertThat(receive).isNotNull();
8585
assertThat((String) receive.getPayload()).isIn("messageContent", "messageContent2");
86-
assertThat(receive.getHeaders().get(AwsHeaders.RECEIVED_QUEUE)).isEqualTo("testQueue");
87-
88-
try {
89-
this.controlBusInput.send(new GenericMessage<>("@sqsMessageDrivenChannelAdapter.stop('testQueue')"));
90-
}
91-
catch (Exception e) {
92-
// May fail with NPE. See
93-
// https://github.com/spring-cloud/spring-cloud-aws/issues/232
94-
}
95-
this.controlBusInput.send(new GenericMessage<>("@sqsMessageDrivenChannelAdapter.isRunning('testQueue')"));
96-
97-
receive = this.controlBusOutput.receive(1000);
98-
assertThat(receive).isNotNull();
99-
assertThat((Boolean) receive.getPayload()).isFalse();
100-
101-
this.controlBusInput.send(new GenericMessage<>("@sqsMessageDrivenChannelAdapter.start('testQueue')"));
102-
this.controlBusInput.send(new GenericMessage<>("@sqsMessageDrivenChannelAdapter.isRunning('testQueue')"));
86+
assertThat(receive.getHeaders().get(SqsHeaders.SQS_QUEUE_NAME_HEADER)).isEqualTo("testQueue");
87+
assertThat(receive.getHeaders().get("someAttribute")).isEqualTo("someValue");
10388

104-
receive = this.controlBusOutput.receive(1000);
89+
receive = this.inputChannel.receive(10000);
10590
assertThat(receive).isNotNull();
106-
assertThat((Boolean) receive.getPayload()).isTrue();
107-
108-
assertThatThrownBy(
109-
() -> this.controlBusInput.send(new GenericMessage<>("@sqsMessageDrivenChannelAdapter.start('foo')")))
110-
.hasCauseExactlyInstanceOf(IllegalArgumentException.class)
111-
.hasStackTraceContaining("Queue with name 'foo' does not exist");
112-
113-
assertThat(this.sqsMessageDrivenChannelAdapter.getQueues()).isEqualTo(new String[] {"testQueue"});
91+
assertThat((String) receive.getPayload()).isIn("messageContent", "messageContent2");
92+
assertThat(receive.getHeaders().get(SqsHeaders.SQS_QUEUE_NAME_HEADER)).isEqualTo("testQueue");
93+
assertThat(receive.getHeaders().get("someAttribute")).isEqualTo("someValue");
11494
}
11595

11696
@Configuration
11797
@EnableIntegration
11898
public static class ContextConfiguration {
11999

120-
@Bean
121-
public SqsAsyncClient amazonSqs() {
122-
SqsAsyncClient sqs = mock(SqsAsyncClient.class);
123-
given(sqs.getQueueUrl(GetQueueUrlRequest.builder().queueName("testQueue").build()))
124-
.willReturn(CompletableFuture.completedFuture(
125-
GetQueueUrlResponse.builder().queueUrl("http://testQueue.amazonaws.com").build()));
126-
127-
given(sqs.receiveMessage(
128-
ReceiveMessageRequest.builder()
129-
.queueUrl("http://testQueue.amazonaws.com")
130-
.maxNumberOfMessages(10)
131-
.attributeNamesWithStrings("All")
132-
.messageAttributeNames("All")
133-
.waitTimeSeconds(20)
134-
.build()))
135-
.willReturn(
136-
CompletableFuture.completedFuture(
137-
ReceiveMessageResponse.builder()
138-
.messages(Message.builder().body("messageContent").build(),
139-
Message.builder().body("messageContent2").build())
140-
.build()))
141-
.willReturn(CompletableFuture.completedFuture(ReceiveMessageResponse.builder().build()));
142-
143-
given(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
144-
.willReturn(CompletableFuture.completedFuture(GetQueueAttributesResponse.builder().build()));
145-
146-
return sqs;
147-
}
148-
149100
@Bean
150101
public PollableChannel inputChannel() {
151102
return new QueueChannel();
152103
}
153104

154105
@Bean
155106
public MessageProducer sqsMessageDrivenChannelAdapter() {
156-
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs(), "testQueue");
107+
SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(AMAZON_SQS, "testQueue");
157108
adapter.setOutputChannel(inputChannel());
158109
return adapter;
159110
}
160111

161-
@Bean
162-
@ServiceActivator(inputChannel = "controlBusInput")
163-
public ExpressionControlBusFactoryBean controlBus() {
164-
ExpressionControlBusFactoryBean controlBusFactoryBean = new ExpressionControlBusFactoryBean();
165-
controlBusFactoryBean.setOutputChannel(controlBusOutput());
166-
return controlBusFactoryBean;
167-
}
168-
169-
@Bean
170-
public PollableChannel controlBusOutput() {
171-
return new QueueChannel();
172-
}
173-
174112
}
175113

176114
}

0 commit comments

Comments
 (0)