diff --git a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java index 0b1a1579..df66275f 100644 --- a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java +++ b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcBatch.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong; import jakarta.validation.constraints.NotNull; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -89,11 +88,11 @@ public Output run(RunContext runContext) throws Exception { try ( Connection connection = this.connection(runContext); - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from))) + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE) ) { connection.setAutoCommit(false); - Flux flowable = Flux.create(FileSerde.reader(bufferedReader), FluxSink.OverflowStrategy.BUFFER) + Flux flowable = FileSerde.readAll(bufferedReader) .doOnNext(docWriteRequest -> { count.incrementAndGet(); }) diff --git a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcQuery.java b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcQuery.java index a64cc443..56b0e808 100644 --- a/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcQuery.java +++ b/plugin-jdbc/src/main/java/io/kestra/plugin/jdbc/AbstractJdbcQuery.java @@ -5,6 +5,7 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.tasks.Task; import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.FileSerde; import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.utils.Rethrow; import io.swagger.v3.oas.annotations.media.Schema; @@ -105,10 +106,9 @@ public AbstractJdbcQuery.Output run(RunContext runContext) throws Exception { } else if (this.store) { File tempFile = runContext.workingDir().createTempFile(".ion").toFile(); - BufferedWriter fileWriter = new BufferedWriter(new FileWriter(tempFile)); - size = fetchToFile(stmt, rs, fileWriter, cellConverter, conn); - fileWriter.flush(); - fileWriter.close(); + try (BufferedWriter fileWriter = new BufferedWriter(new FileWriter(tempFile),FileSerde.BUFFER_SIZE)) { + size = fetchToFile(stmt, rs, fileWriter, cellConverter, conn); + } output .uri(runContext.storage().putFile(tempFile)) .size(size);