Skip to content

Commit

Permalink
fix(sqs) fix client error for sqs tasks (#SQS)
Browse files Browse the repository at this point in the history
This commit re-enables tests for SQS tasks

Fix: #514
  • Loading branch information
fhussonnois committed Aug 8, 2024
1 parent 9bc25f8 commit 13d176e
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 33 deletions.
18 changes: 6 additions & 12 deletions src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.retries.internal.BaseRetryStrategy;
import software.amazon.awssdk.retries.internal.DefaultStandardRetryStrategy;
import software.amazon.awssdk.awscore.retry.AwsRetryStrategy;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.SqsClient;
Expand Down Expand Up @@ -53,14 +50,11 @@ protected SqsAsyncClient asyncClient(final RunContext runContext,
);

clientBuilder = clientBuilder.overrideConfiguration(builder ->
builder.retryStrategy(DefaultStandardRetryStrategy
.builder()
.maxAttempts(retryMaxAttempts)
.backoffStrategy(BackoffStrategy.exponentialDelay(
RETRY_STRATEGY_BACKOFF_BASE_DELAY,
RETRY_STRATEGY_BACKOFF_MAX_DELAY
))
.build()
builder.retryStrategy(
AwsRetryStrategy.standardRetryStrategy()
.toBuilder()
.maxAttempts(retryMaxAttempts)
.build()
)
);
return clientBuilder.build();
Expand Down
25 changes: 20 additions & 5 deletions src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import io.kestra.plugin.aws.AbstractLocalStackTest;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;

Expand All @@ -20,13 +24,24 @@ public class AbstractSqsTest extends AbstractLocalStackTest {
@Inject
protected StorageInterface storageInterface;

@BeforeEach
void beforeEach() {
try(SqsClient sqsClient = SqsClient
.builder()
.endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS))
.region(Region.of(localstack.getRegion()))
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())
))
.build()) {
if (!sqsClient.listQueues().queueUrls().contains(queueUrl())) {
sqsClient.createQueue(CreateQueueRequest.builder().queueName("test-queue").build());
}
}
}

String queueUrl() {
return localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString() + "/000000000000/test-queue";
}

void createQueue(SqsClient client) {
if (!client.listQueues().queueUrls().contains(queueUrl())) {
client.createQueue(CreateQueueRequest.builder().queueName("test-queue").build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.kestra.plugin.aws.sqs.model.Message;
import io.kestra.plugin.aws.sqs.model.SerdeType;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;

Expand All @@ -11,7 +10,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@Disabled("Issue with LocalStack, see https://github.com/localstack/localstack/issues/8267")
class PublishThenConsumeTest extends AbstractSqsTest {
@Test
void runText() throws Exception {
Expand All @@ -31,9 +29,6 @@ void runText() throws Exception {
)
.build();

var client = publish.client(runContext);
createQueue(client);

var publishOutput = publish.run(runContext);
assertThat(publishOutput.getMessagesCount(), is(2));

Expand Down Expand Up @@ -70,9 +65,6 @@ void runJson() throws Exception {
)
.build();

var client = publish.client(runContext);
createQueue(client);

var publishOutput = publish.run(runContext);
assertThat(publishOutput.getMessagesCount(), is(2));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import reactor.core.publisher.Flux;
Expand All @@ -28,7 +27,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@Disabled("Issue with LocalStack, see https://github.com/localstack/localstack/issues/8267")
class RealtimeTriggerTest extends AbstractSqsTest {
@Inject
private ApplicationContext applicationContext;
Expand Down Expand Up @@ -85,8 +83,6 @@ void flow() throws Exception {
.build();

var runContext = runContextFactory.of();
var client = task.client(runContext);
createQueue(client);

task.run(runContext);

Expand Down
4 changes: 0 additions & 4 deletions src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import reactor.core.publisher.Flux;
Expand All @@ -29,7 +28,6 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@Disabled("Issue with LocalStack, see https://github.com/localstack/localstack/issues/8267")
class TriggerTest extends AbstractSqsTest {
@Inject
private ApplicationContext applicationContext;
Expand Down Expand Up @@ -88,8 +86,6 @@ void flow() throws Exception {
.build();

var runContext = runContextFactory.of();
var client = task.client(runContext);
createQueue(client);

task.run(runContext);

Expand Down

0 comments on commit 13d176e

Please sign in to comment.