diff --git a/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java b/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java index dc6065c7..4a79448f 100644 --- a/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java +++ b/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java @@ -16,6 +16,8 @@ import io.kestra.core.serializers.JacksonMapper; import io.kestra.plugin.aws.AbstractConnection; import io.kestra.plugin.aws.eventbridge.model.Entry; +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; @@ -177,8 +179,8 @@ private List readEntryList(RunContext runContext, Object entries) throws throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry."); } try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) { - return MAPPER.readValue(inputStream, new TypeReference>() { - }); + return Flowable.create(FileSerde.reader(inputStream, Entry.class), BackpressureStrategy.BUFFER) + .toList().blockingGet(); } } else if (entries instanceof List) { return MAPPER.convertValue(entries, new TypeReference<>() { diff --git a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java index af3ef0f4..ac1d35a6 100644 --- a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java +++ b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; +import static io.kestra.core.utils.Rethrow.throwConsumer; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; @@ -162,9 +163,9 @@ void runStorage() throws Exception { )) .build(); - File tempFile = runContext.tempFile(".json").toFile(); + File tempFile = runContext.tempFile(".ion").toFile(); try (var stream = new FileOutputStream(tempFile)) { - MAPPER.writeValue(stream, List.of(entry, entry2, entry3)); + List.of(entry, entry2, entry3).forEach(throwConsumer(e -> FileSerde.write(stream, e))); } var put = PutEvents.builder()