Use pooled direct memory allocator when decoding Pulsar entry to Kafka records (#673)

### Motivation
When a Pulsar entry is decoded to Kafka records in `ByteBufUtils#decodePulsarEntryToKafkaRecords`, a NIO buffer with an initial capacity of 1 MB is allocated from heap memory. In other words, each time an entry is read, 1 MB of heap memory is allocated, so heap usage grows very quickly and GC happens frequently.

Kafka's `MemoryRecordsBuilder` uses its underlying `ByteBufferOutputStream` field as the internal buffer, whose capacity can grow inside the `write` methods. Even if a direct buffer were allocated by Netty's pooled direct memory allocator and its underlying `ByteBuffer` were passed to `ByteBufferOutputStream`'s constructor, any reallocation would still come from heap memory.
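
For illustration, a minimal sketch of that growth path (the class name `HeapExpansionSketch` and the 16/64-byte sizes are arbitrary illustration values; only Kafka's real `ByteBufferOutputStream` API is used, assuming the expansion behavior described above):

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.ByteBufferOutputStream;

public class HeapExpansionSketch {

    public static void main(String[] args) throws Exception {
        // Start from a direct buffer, as if it were taken from Netty's pooled allocator.
        final ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocateDirect(16));
        // Writing past the initial capacity forces ByteBufferOutputStream to expand its buffer.
        output.write(new byte[64], 0, 64);
        // The expanded buffer is a fresh heap buffer, so the original direct buffer is abandoned.
        System.out.println("still direct? " + output.buffer().isDirect()); // prints: still direct? false
    }
}
```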

### Modification

This PR adds a `DirectBufferOutputStream` class that inherits from `ByteBufferOutputStream` and overrides the methods that `MemoryRecordsBuilder` may call. The class uses Pulsar's default `ByteBufAllocator` to allocate memory; the remaining methods behave the same as in `ByteBufferOutputStream`.

A unit test is added to verify that `MemoryRecordsBuilder` builds the same records whether the underlying output stream is a `ByteBufferOutputStream` or a `DirectBufferOutputStream`. Three cases are tested:
1. The initial capacity is less than the size of the records header; in this case, the `position(int)` method is called to increase the capacity.
2. The initial capacity is greater than both the size of the records header and the total size of the records.
3. The initial capacity is greater than the size of the records header but less than the total size of the records; in this case, the `write()` method increases the capacity automatically.

Then, a `DirectBufferOutputStream` instance is passed to `MemoryRecordsBuilder`'s constructor in `ByteBufUtils#decodePulsarEntryToKafkaRecords`, and the return type is changed to `DecodeResult` because the allocated `ByteBuf` needs to be released later.
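
For reference, a hypothetical caller sketch of the new lifecycle (the variables `metadata`, `payload`, `baseOffset`, and `magic` are assumed to be in scope; the real call sites appear in the diffs below):

```java
final DecodeResult decodeResult =
        ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, payload, baseOffset, magic);
try {
    final ByteBuf kafkaBuffer = decodeResult.getOrCreateByteBuf();
    // ... write kafkaBuffer to the client response ...
} finally {
    // Release once kafkaBuffer has been fully consumed, returning the pooled
    // direct buffer to Netty's allocator.
    decodeResult.release();
}
```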
BewareMyPower authored Aug 24, 2021
1 parent cb3d180 commit 4de6212
Showing 6 changed files with 205 additions and 22 deletions.
DecodeResult.java
@@ -14,8 +14,10 @@
package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NonNull;
import org.apache.kafka.common.record.MemoryRecords;

/**
@@ -25,10 +27,10 @@
@AllArgsConstructor
public class DecodeResult {

-    private MemoryRecords records;
+    private @NonNull MemoryRecords records;
    private ByteBuf releasedByteBuf;

-    public DecodeResult(MemoryRecords records) {
+    public DecodeResult(@NonNull MemoryRecords records) {
        this.records = records;
    }

@@ -37,4 +39,12 @@ public void release() {
            releasedByteBuf.release();
        }
    }
+
+    public @NonNull ByteBuf getOrCreateByteBuf() {
+        if (releasedByteBuf != null) {
+            return releasedByteBuf;
+        } else {
+            return Unpooled.wrappedBuffer(records.buffer());
+        }
+    }
}
DirectBufferOutputStream.java
@@ -0,0 +1,85 @@
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.streamnative.pulsar.handlers.kop.format;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.ByteBuffer;
import lombok.Getter;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;

/**
 * The OutputStream class that uses direct buffer from Netty's buffer allocator as its underlying buffer.
 *
 * The methods that may be called in `MemoryRecordsBuilder` are all overridden.
 */
public class DirectBufferOutputStream extends ByteBufferOutputStream {

    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    private static final ByteBufAllocator ALLOCATOR = PulsarByteBufAllocator.DEFAULT;

    private final int initialCapacity;
    @Getter
    private final ByteBuf byteBuf;

    public DirectBufferOutputStream(int initialCapacity) {
        super(EMPTY_BUFFER);
        this.initialCapacity = initialCapacity;
        this.byteBuf = ALLOCATOR.directBuffer(initialCapacity);
    }

    @Override
    public void write(int b) {
        byteBuf.writeByte(b);
    }

    @Override
    public void write(byte[] bytes, int off, int len) {
        byteBuf.writeBytes(bytes, off, len);
    }

    @Override
    public void write(ByteBuffer sourceBuffer) {
        byteBuf.writeBytes(sourceBuffer);
    }

    @Override
    public ByteBuffer buffer() {
        // When this method is called, the internal NIO ByteBuffer should be treated as a buffer that has only been
        // written to. In this case, the position should be the same as the limit, because the caller will usually
        // call `ByteBuffer#flip()` to reset the position and limit.
        final ByteBuffer byteBuffer = byteBuf.nioBuffer();
        byteBuffer.position(byteBuffer.limit());
        return byteBuffer;
    }

    @Override
    public int position() {
        return byteBuf.readerIndex();
    }

    @Override
    public void position(int position) {
        if (position > byteBuf.capacity()) {
            byteBuf.capacity(position);
        }
        byteBuf.writerIndex(position);
    }

    @Override
    public int initialCapacity() {
        return initialCapacity;
    }
}
@@ -102,9 +102,9 @@ public DecodeResult decode(List<Entry> entries, byte magic) {
                orderedByteBuf.add(byteBuf.slice(byteBuf.readerIndex(), byteBuf.readableBytes()));
            }
        } else {
-            final MemoryRecords records =
+            final DecodeResult decodeResult =
                    ByteBufUtils.decodePulsarEntryToKafkaRecords(metadata, byteBuf, startOffset, magic);
-            final ByteBuf kafkaBuffer = Unpooled.wrappedBuffer(records.buffer());
+            final ByteBuf kafkaBuffer = decodeResult.getOrCreateByteBuf();
            orderedByteBuf.add(kafkaBuffer);
            if (!optionalByteBufs.isPresent()) {
                optionalByteBufs = Optional.of(new ArrayList<>());
PulsarEntryFormatter.java
@@ -48,6 +48,9 @@ public class PulsarEntryFormatter implements EntryFormatter {
    private static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
    private static final int MAX_MESSAGE_BATCH_SIZE_BYTES = 128 * 1024;

+    private static final DecodeResult EMPTY_DECODE_RESULT = new DecodeResult(
+            MemoryRecords.readableRecords(ByteBuffer.allocate(0)));

    @Override
    public ByteBuf encode(final MemoryRecords records, final int numMessages) {
        long currentBatchSizeBytes = 0;
@@ -100,7 +103,7 @@ public ByteBuf encode(final MemoryRecords records, final int numMessages) {

    @Override
    public DecodeResult decode(final List<Entry> entries, final byte magic) {
-        final List<MemoryRecords> recordsList = new ArrayList<>();
+        final List<DecodeResult> decodeResults = new ArrayList<>();

        entries.parallelStream().forEachOrdered(entry -> {
            try {
@@ -111,7 +114,7 @@ public DecodeResult decode(final List<Entry> entries, final byte magic) {
                Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
                MessageMetadata msgMetadata = Commands.parseMessageMetadata(metadataAndPayload);

-                recordsList.add(ByteBufUtils.decodePulsarEntryToKafkaRecords(
+                decodeResults.add(ByteBufUtils.decodePulsarEntryToKafkaRecords(
                        msgMetadata, metadataAndPayload, baseOffset, magic));
            } catch (KoPMessageMetadataNotFoundException | IOException e) { // skip failed decode entry
                log.error("[{}:{}] Failed to decode entry", entry.getLedgerId(), entry.getEntryId());
@@ -120,14 +123,17 @@
            }
        });

-        if (recordsList.isEmpty()) {
-            return new DecodeResult(MemoryRecords.readableRecords(ByteBuffer.allocate(0)));
-        } else if (recordsList.size() == 1) {
-            return new DecodeResult(recordsList.get(0));
+        if (decodeResults.isEmpty()) {
+            return EMPTY_DECODE_RESULT;
+        } else if (decodeResults.size() == 1) {
+            return decodeResults.get(0);
        } else {
-            final int totalSize = recordsList.stream().mapToInt(MemoryRecords::sizeInBytes).sum();
+            final int totalSize = decodeResults.stream()
+                    .mapToInt(decodeResult -> decodeResult.getRecords().sizeInBytes())
+                    .sum();
            final ByteBuf mergedBuffer = PulsarByteBufAllocator.DEFAULT.directBuffer(totalSize);
-            recordsList.forEach(records -> mergedBuffer.writeBytes(records.buffer()));
+            decodeResults.forEach(decodeResult -> mergedBuffer.writeBytes(decodeResult.getRecords().buffer()));
+            decodeResults.forEach(DecodeResult::release);
            return new DecodeResult(MemoryRecords.readableRecords(mergedBuffer.nioBuffer()), mergedBuffer);
        }
    }
ByteBufUtils.java
@@ -16,6 +16,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import io.netty.buffer.ByteBuf;
+import io.streamnative.pulsar.handlers.kop.format.DecodeResult;
+import io.streamnative.pulsar.handlers.kop.format.DirectBufferOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Base64;
@@ -93,28 +95,28 @@ public static ByteBuffer getNioBuffer(ByteBuf buffer) {
        return ByteBuffer.wrap(bytes);
    }

-    public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadata metadata,
-                                                                final ByteBuf payload,
-                                                                final long baseOffset,
-                                                                final byte magic) throws IOException {
+    public static DecodeResult decodePulsarEntryToKafkaRecords(final MessageMetadata metadata,
+                                                               final ByteBuf payload,
+                                                               final long baseOffset,
+                                                               final byte magic) throws IOException {
        if (metadata.hasMarkerType()
                && (metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE
                || metadata.getMarkerType() == MarkerType.TXN_ABORT_VALUE)) {
-            return MemoryRecords.withEndTransactionMarker(
+            return new DecodeResult(MemoryRecords.withEndTransactionMarker(
                    baseOffset,
                    metadata.getPublishTime(),
                    0,
                    metadata.getTxnidMostBits(),
                    (short) metadata.getTxnidLeastBits(),
                    new EndTransactionMarker(metadata.getMarkerType() == MarkerType.TXN_COMMIT_VALUE
-                            ? ControlRecordType.COMMIT : ControlRecordType.ABORT, 0));
+                            ? ControlRecordType.COMMIT : ControlRecordType.ABORT, 0)));
        }
        final int uncompressedSize = metadata.getUncompressedSize();
        final CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
        final ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);

-        final ByteBuffer byteBuffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-        final MemoryRecordsBuilder builder = new MemoryRecordsBuilder(byteBuffer,
+        final DirectBufferOutputStream directBufferOutputStream = new DirectBufferOutputStream(DEFAULT_BUFFER_SIZE);
+        final MemoryRecordsBuilder builder = new MemoryRecordsBuilder(directBufferOutputStream,
                magic,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
@@ -181,8 +183,7 @@ public static MemoryRecords decodePulsarEntryToKafkaRecords(final MessageMetadata

        final MemoryRecords records = builder.build();
        uncompressedPayload.release();
-        byteBuffer.flip();
-        return records;
+        return new DecodeResult(records, directBufferOutputStream.getByteBuf());
    }

@NonNull
DirectBufferOutputStreamTest.java
@@ -0,0 +1,81 @@
/**
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.streamnative.pulsar.handlers.kop.format;

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Test for {@link DirectBufferOutputStream}.
 */
public class DirectBufferOutputStreamTest {

    private static final int WRITE_LIMIT = 100 * 1024 * 1024;
    private static final long LOG_APPEND_TIME = System.currentTimeMillis();

    @DataProvider
    public static Object[][] initialCapacityAndNumRecords() {
        return new Object[][]{
                { 1, 10 },
                { 1024 * 1024, 10 },
                { 1024 * 1024, 1024 },
        };
    }

    @Test(dataProvider = "initialCapacityAndNumRecords")
    public void testBuildMemoryRecords(int initialCapacity, int numRecords) {
        final MemoryRecordsBuilder heapMemoryRecordsBuilder =
                newMemoryRecordsBuilder(new ByteBufferOutputStream(initialCapacity));
        // We must expose the DirectBufferOutputStream because we need to release the internal ByteBuf later
        final DirectBufferOutputStream directBufferOutputStream = new DirectBufferOutputStream(initialCapacity);
        final MemoryRecordsBuilder directMemoryRecordsBuilder = newMemoryRecordsBuilder(directBufferOutputStream);
        final ByteBuffer valueBuffer = ByteBuffer.allocate(1024);

        for (int i = 0; i < numRecords; i++) {
            heapMemoryRecordsBuilder.appendWithOffset(i, LOG_APPEND_TIME + i, null, valueBuffer.duplicate());
            directMemoryRecordsBuilder.appendWithOffset(i, LOG_APPEND_TIME + i, null, valueBuffer.duplicate());
        }

        final ByteBuffer heapBuffer = heapMemoryRecordsBuilder.build().buffer();
        final ByteBuffer directBuffer = directMemoryRecordsBuilder.build().buffer();
        System.out.println("heapBuffer size: " + heapBuffer.limit() + ", directBuffer size: " + directBuffer.limit());
        Assert.assertEquals(heapBuffer, directBuffer);

        Assert.assertEquals(directBufferOutputStream.getByteBuf().refCnt(), 1);
        directBufferOutputStream.getByteBuf().release();
    }

    private static MemoryRecordsBuilder newMemoryRecordsBuilder(final ByteBufferOutputStream bufferStream) {
        return new MemoryRecordsBuilder(bufferStream,
                RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE,
                TimestampType.CREATE_TIME,
                0,
                LOG_APPEND_TIME,
                RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH,
                RecordBatch.NO_SEQUENCE,
                false,
                false,
                RecordBatch.NO_PARTITION_LEADER_EPOCH,
                WRITE_LIMIT);
    }
}
