From 2a0af99359a61ce839cee76b7c6849bc86827d32 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 2 Jul 2024 08:46:56 +0800 Subject: [PATCH] [fix][broker] Fix broker OOM when upload a large package. (#22989) (cherry picked from commit da2a1910a32e622ea609ff7b9e91711ecaf36de6) (cherry picked from commit 311b2a771969b17d012ee6da7dfb565459794125) --- .../storage/bookkeeper/DLOutputStream.java | 53 +++++++++---------- .../bookkeeper/DLOutputStreamTest.java | 14 ++--- 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java index 222987aa49d43..67345ebd47e31 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java @@ -22,8 +22,6 @@ import io.netty.buffer.Unpooled; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.LogRecord; @@ -38,6 +36,7 @@ class DLOutputStream { private final DistributedLogManager distributedLogManager; private final AsyncLogWriter writer; + private final byte[] readBuffer = new byte[8192]; private long offset = 0L; private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) { @@ -50,42 +49,38 @@ static CompletableFuture openWriterAsync(DistributedLogManager d return distributedLogManager.openAsyncLogWriter().thenApply(w -> new DLOutputStream(distributedLogManager, w)); } - private CompletableFuture> getRecords(InputStream inputStream) { - CompletableFuture> future = new CompletableFuture<>(); - CompletableFuture.runAsync(() -> { - byte[] readBuffer = new byte[8192]; - List records = new ArrayList<>(); - try { - int read = 0; - while ((read = inputStream.read(readBuffer)) != -1) { - log.info("write something into the ledgers offset: {}, length: {}", offset, read); - ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, read); - offset += writeBuf.readableBytes(); - LogRecord record = new LogRecord(offset, writeBuf); - records.add(record); - } - future.complete(records); - } catch (IOException e) { - log.error("Failed to get all records from the input stream", e); - future.completeExceptionally(e); + private void writeAsyncHelper(InputStream is, CompletableFuture result) { + try { + int read = is.read(readBuffer); + if (read != -1) { + log.info("write something into the ledgers offset: {}, length: {}", offset, read); + final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read); + offset += writeBuf.readableBytes(); + final LogRecord record = new LogRecord(offset, writeBuf); + writer.write(record).thenAccept(v -> writeAsyncHelper(is, result)) + .exceptionally(e -> { + result.completeExceptionally(e); + return null; + }); + } else { + result.complete(this); } - }); - return future; + } catch (IOException e) { + log.error("Failed to get all records from the input stream", e); + result.completeExceptionally(e); + } } /** * Write all input stream data to the distribute log. * * @param inputStream the data we need to write - * @return + * @return CompletableFuture */ CompletableFuture writeAsync(InputStream inputStream) { - return getRecords(inputStream) - .thenCompose(this::writeAsync); - } - - private CompletableFuture writeAsync(List records) { - return writer.writeBulk(records).thenApply(ignore -> this); + CompletableFuture result = new CompletableFuture<>(); + writeAsyncHelper(inputStream, result); + return result; } /** diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java index 63fcf5e46ebe1..b55e0e0d34a4f 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java @@ -21,17 +21,18 @@ import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.api.AsyncLogWriter; import org.apache.distributedlog.api.DistributedLogManager; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.anyList; @@ -53,9 +54,8 @@ public void setup() { when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null)); when(writer.markEndOfStream()).thenReturn(CompletableFuture.completedFuture(null)); when(writer.asyncClose()).thenReturn(CompletableFuture.completedFuture(null)); - when(writer.writeBulk(anyList())) - .thenReturn(CompletableFuture.completedFuture( - Collections.singletonList(CompletableFuture.completedFuture(DLSN.InitialDLSN)))); + when(writer.write(any(LogRecord.class))) + .thenReturn(CompletableFuture.completedFuture(DLSN.InitialDLSN)); } @AfterMethod(alwaysRun = true) @@ -75,7 +75,7 @@ public void writeInputStreamData() throws ExecutionException, InterruptedExcepti .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)) .thenCompose(DLOutputStream::closeAsync)).get(); - verify(writer, times(1)).writeBulk(anyList()); + verify(writer, times(1)).write(any(LogRecord.class)); verify(writer, times(1)).markEndOfStream(); verify(writer, times(1)).asyncClose(); verify(dlm, times(1)).asyncClose(); @@ -91,7 +91,7 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)) .thenCompose(DLOutputStream::closeAsync)).get(); - verify(writer, times(1)).writeBulk(anyList()); + verify(writer, times(1)).write(any(LogRecord.class)); verify(writer, times(1)).markEndOfStream(); verify(writer, times(1)).asyncClose(); verify(dlm, times(1)).asyncClose(); @@ -104,7 +104,7 @@ public void writeLongBytesArrayData() throws ExecutionException, InterruptedExce .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)) .thenCompose(DLOutputStream::closeAsync)).get(); - verify(writer, times(1)).writeBulk(anyList()); + verify(writer, times(4)).write(any(LogRecord.class)); verify(writer, times(1)).markEndOfStream(); verify(writer, times(1)).asyncClose(); verify(dlm, times(1)).asyncClose();