Skip to content

Commit

Permalink
[improve][tiered storage] Reduce cpu usage when offloading the ledger (
Browse files Browse the repository at this point in the history
…#15063)

* [imporve][tiered storage] Reduce cpu usage when offloading the ledger
---

*Motivation*

When offloading a ledger, the BlockAwareSegmentInputStreamImpl will
wrap the ledger handler and make it can stream output. Then the JCloud
will read the stream as the payload and upload to the storage.
In the JCloud implementation, it read the stream with a buffer
https://github.com/apache/jclouds/blob/36f351cd18925d2bb27bf7ad2c5d75e555da377a/core/src/main/java/org/jclouds/io/ByteStreams2.java#L68

In the current offload implementation, the read will call multiple times
to construct the buffer and then return the data.
After implement the read(byte[] b, int off, int len), the cpu usage reduced
almost 10%.

*Modifications*

- Add read(byte[] b, int off, int len) implementation in the BlockAwareSegmentInputStreamImpl

(cherry picked from commit 938ab7b)
  • Loading branch information
zymap authored and mattisonchao committed Jun 7, 2022
1 parent 362c126 commit 12ee3f5
Show file tree
Hide file tree
Showing 2 changed files with 410 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
Expand All @@ -44,6 +46,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
private static final Logger log = LoggerFactory.getLogger(BlockAwareSegmentInputStreamImpl.class);

static final int[] BLOCK_END_PADDING = new int[]{ 0xFE, 0xDC, 0xDE, 0xAD };
static final byte[] BLOCK_END_PADDING_BYTES = Ints.toByteArray(0xFEDCDEAD);

private final ByteBuf paddingBuf = PulsarByteBufAllocator.DEFAULT.buffer(128, 128);

private final ReadHandle ledger;
private final long startEntryId;
Expand All @@ -65,6 +70,9 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre
static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */;
// Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content.
private List<ByteBuf> entriesByteBuf = null;
private int currentOffset = 0;
private final AtomicBoolean close = new AtomicBoolean(false);


public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) {
this.ledger = ledger;
Expand All @@ -76,6 +84,52 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in
this.entriesByteBuf = Lists.newLinkedList();
}

private ByteBuf readEntries(int len) throws IOException {
checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
checkState(bytesReadOffset < blockSize);

// once reach the end of entry buffer, read more, if there is more
if (bytesReadOffset < dataBlockFullOffset
&& entriesByteBuf.isEmpty()
&& startEntryId + blockEntryCount <= ledger.getLastAddConfirmed()) {
entriesByteBuf = readNextEntriesFromLedger(startEntryId + blockEntryCount, ENTRIES_PER_READ);
}

if (!entriesByteBuf.isEmpty()
&& bytesReadOffset + entriesByteBuf.get(0).readableBytes() <= blockSize) {
// always read from the first ByteBuf in the list, once read all of its content remove it.
ByteBuf entryByteBuf = entriesByteBuf.get(0);
int readableBytes = entryByteBuf.readableBytes();
int read = Math.min(readableBytes, len);
ByteBuf buf = entryByteBuf.slice(currentOffset, read);
buf.retain();
currentOffset += read;
entryByteBuf.readerIndex(currentOffset);
bytesReadOffset += read;

if (entryByteBuf.readableBytes() == 0) {
entryByteBuf.release();
entriesByteBuf.remove(0);
blockEntryCount++;
currentOffset = 0;
}

return buf;
} else {
// no space for a new entry or there are no more entries
// set data block full, return end padding
if (dataBlockFullOffset == blockSize) {
dataBlockFullOffset = bytesReadOffset;
}
paddingBuf.clear();
for (int i = 0; i < Math.min(len, paddingBuf.capacity()); i++) {
paddingBuf.writeByte(BLOCK_END_PADDING_BYTES[(bytesReadOffset++ - dataBlockFullOffset)
% BLOCK_END_PADDING_BYTES.length]);
}
return paddingBuf.retain();
}
}

// read ledger entries.
private int readEntries() throws IOException {
checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset());
Expand Down Expand Up @@ -143,6 +197,46 @@ private List<ByteBuf> readNextEntriesFromLedger(long start, long maxNumberEntrie
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException("The given bytes are null");
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
} else if (len == 0) {
return 0;
}

int offset = off;
int readLen = len;
int readBytes = 0;
// reading header
if (dataBlockHeaderStream.available() > 0) {
int read = dataBlockHeaderStream.read(b, off, len);
offset += read;
readLen -= read;
readBytes += read;
bytesReadOffset += read;
}
if (readLen == 0) {
return readBytes;
}

// reading ledger entries
if (bytesReadOffset < blockSize) {
readLen = Math.min(readLen, blockSize - bytesReadOffset);
ByteBuf readEntries = readEntries(readLen);
int read = readEntries.readableBytes();
readEntries.readBytes(b, offset, read);
readEntries.release();
readBytes += read;
return readBytes;
}

// reached end
return -1;
}

@Override
public int read() throws IOException {
// reading header
Expand All @@ -162,11 +256,20 @@ public int read() throws IOException {

@Override
public void close() throws IOException {
super.close();
dataBlockHeaderStream.close();
if (!entriesByteBuf.isEmpty()) {
entriesByteBuf.forEach(buf -> buf.release());
entriesByteBuf.clear();
// The close method will be triggered twice in the BlobStoreManagedLedgerOffloader#offload method.
// The stream resource used by the try-with block which will called the close
// And through debug, writeBlobStore.uploadMultipartPart in the offload method also will trigger
// the close method.
// So we add the close variable to avoid release paddingBuf twice.
if (!close.compareAndSet(false, true)) {
super.close();
dataBlockHeaderStream.close();
if (!entriesByteBuf.isEmpty()) {
entriesByteBuf.forEach(buf -> buf.release());
entriesByteBuf.clear();
}
paddingBuf.clear();
paddingBuf.release();
}
}

Expand All @@ -185,6 +288,10 @@ public int getBlockSize() {
return blockSize;
}

public int getDataBlockFullOffset() {
return dataBlockFullOffset;
}

@Override
public int getBlockEntryCount() {
return blockEntryCount;
Expand Down
Loading

0 comments on commit 12ee3f5

Please sign in to comment.