Skip to content

Commit

Permalink
fix(consume): In Trigger, do not wait for incoming message
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye committed Apr 16, 2024
1 parent dc213a1 commit e568bef
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
13 changes: 8 additions & 5 deletions src/main/java/io/kestra/plugin/aws/sqs/Consume.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.aws.sqs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -9,6 +10,7 @@
import io.kestra.core.serializers.FileSerde;
import io.kestra.plugin.aws.sqs.model.SerdeType;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
Expand All @@ -21,8 +23,6 @@
import java.time.ZonedDateTime;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.validation.constraints.NotNull;

import static io.kestra.core.utils.Rethrow.throwConsumer;

@SuperBuilder
Expand Down Expand Up @@ -62,6 +62,9 @@ public class Consume extends AbstractSqs implements RunnableTask<Consume.Output>
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;

@JsonIgnore
protected Boolean isTrigger = false;


@SuppressWarnings("BusyWait")
@Override
Expand All @@ -77,7 +80,7 @@ public Output run(RunContext runContext) throws Exception {
var tempFile = runContext.tempFile(".ion").toFile();

try (var outputFile = new BufferedOutputStream(new FileOutputStream(tempFile))) {
while (!this.ended(total, started)) {
do {
// TODO if we have a maxNumber we can pass the number to avoid too many network calls
var receiveRequest = ReceiveMessageRequest.builder().queueUrl(queueUrl).build();
var msg = sqsClient.receiveMessage(receiveRequest);
Expand All @@ -91,7 +94,7 @@ public Output run(RunContext runContext) throws Exception {
}));

Thread.sleep(100);
}
} while (!this.ended(total, started));

runContext.metric(Counter.of("records", total.get(), "queue", queueUrl));
outputFile.flush();
Expand All @@ -113,7 +116,7 @@ private boolean ended(AtomicInteger count, ZonedDateTime start) {
return true;
}

return false;
return count.get() == 0 && isTrigger;
}

@Builder
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/kestra/plugin/aws/sqs/Trigger.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.aws.sqs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand Down Expand Up @@ -71,6 +72,9 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;

@JsonIgnore
protected Boolean isTrigger = false;

// Configuration for AWS STS AssumeRole
protected String stsRoleArn;
protected String stsRoleExternalId;
Expand Down Expand Up @@ -99,6 +103,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.stsRoleExternalId(this.stsRoleExternalId)
.stsRoleSessionDuration(this.stsRoleSessionDuration)
.stsEndpointOverride(this.stsEndpointOverride)
.isTrigger(true)
.build();

Consume.Output run = task.run(runContext);
Expand Down

0 comments on commit e568bef

Please sign in to comment.