Skip to content

Commit

Permalink
chore: change input from internal storage to be ion
Browse files Browse the repository at this point in the history
closes #254
  • Loading branch information
Martin GUIBERT committed Sep 25, 2023
1 parent f5d4bb0 commit 569677d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
6 changes: 4 additions & 2 deletions src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.aws.AbstractConnection;
import io.kestra.plugin.aws.eventbridge.model.Entry;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -177,8 +179,8 @@ private List<Entry> readEntryList(RunContext runContext, Object entries) throws
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entry.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
return MAPPER.readValue(inputStream, new TypeReference<List<Entry>>() {
});
return Flowable.create(FileSerde.reader(inputStream, Entry.class), BackpressureStrategy.BUFFER)
.toList().blockingGet();
}
} else if (entries instanceof List) {
return MAPPER.convertValue(entries, new TypeReference<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;

import static io.kestra.core.utils.Rethrow.throwConsumer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

Expand Down Expand Up @@ -162,9 +163,9 @@ void runStorage() throws Exception {
))
.build();

File tempFile = runContext.tempFile(".json").toFile();
File tempFile = runContext.tempFile(".ion").toFile();
try (var stream = new FileOutputStream(tempFile)) {
MAPPER.writeValue(stream, List.of(entry, entry2, entry3));
List.of(entry, entry2, entry3).forEach(throwConsumer(e -> FileSerde.write(stream, e)));
}

var put = PutEvents.builder()
Expand Down

0 comments on commit 569677d

Please sign in to comment.