Skip to content

Commit

Permalink
feat: use the new FileSerde.readAll() method that improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Aug 12, 2024
1 parent dac17d4 commit aa6ca11
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.atomic.AtomicLong;
import jakarta.validation.constraints.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import static io.kestra.core.utils.Rethrow.throwFunction;

Expand Down Expand Up @@ -89,11 +88,11 @@ public Output run(RunContext runContext) throws Exception {

try (
Connection connection = this.connection(runContext);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)
) {
connection.setAutoCommit(false);

Flux<Integer> flowable = Flux.create(FileSerde.reader(bufferedReader), FluxSink.OverflowStrategy.BUFFER)
Flux<Integer> flowable = FileSerde.readAll(bufferedReader)
.doOnNext(docWriteRequest -> {
count.incrementAndGet();
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.Rethrow;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -105,10 +106,9 @@ public AbstractJdbcQuery.Output run(RunContext runContext) throws Exception {

} else if (this.store) {
File tempFile = runContext.workingDir().createTempFile(".ion").toFile();
BufferedWriter fileWriter = new BufferedWriter(new FileWriter(tempFile));
size = fetchToFile(stmt, rs, fileWriter, cellConverter, conn);
fileWriter.flush();
fileWriter.close();
try (BufferedWriter fileWriter = new BufferedWriter(new FileWriter(tempFile),FileSerde.BUFFER_SIZE)) {
size = fetchToFile(stmt, rs, fileWriter, cellConverter, conn);
}
output
.uri(runContext.storage().putFile(tempFile))
.size(size);
Expand Down

0 comments on commit aa6ca11

Please sign in to comment.