Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Use pooled direct memory allocator when decoding Pulsar entry to Kafka records #673

Conversation

BewareMyPower
Copy link
Collaborator

Motivation

When a Pulsar entry is decoded to Kafka record in ByteBufUtils#decodePulsarEntryToKafkaRecords, a NIO buffer whose initial capacity is 1 MB will be allocated from heap memory. Therefore, each time an entry is read, 1 MB heap memory will be allocated. Then the heap memory will increase very quickly and GC will happen frequently.

Kafka MemoryRecordsBuilder uses its underlying ByteBufferOutputStream field as the internal buffer whose capacity can be increased in write method. Even if a direct buffer was allocated by Netty's pooled direct memory allocator and its underlying ByteBuffer was passed to ByteBufferOutputStream's constructor, if the reallocation happened, the new buffer could still be allocated from heap memory.

Modification

This PR adds a DirectBufferOutputStream class that inherits from ByteBufferOutputStream and overrides some methods that can be called in MemoryRecordsBuilder. This class uses Pulsar's default ByteBufAllocator to allocate memory. The other methods' behaviors are the same with ByteBufferOutputStream.

A unit test is added to verify that the MemoryRecordsBuilder will build the same records no matter the underlying ByteBufferOutputStream is ByteBufferOutputStream or DirectBufferOutputStream. Three cases are tested in this test:

  1. The initial capacity is less than the size of records header, in this case, position(int) method will be called to increase the capacity.
  2. The initial capacity is greater than both the size of records header and the total size of records.
  3. The initial capacity is greater than the size of records header but less than the total size of records, in this case, write() method will increase the capacity automatically.

Then, a DirectBufferOutputStream instance is passed to MemoryRecordsBuilder's constructor in ByteBufUtils#decodePulsarEntryToKafkaRecords and the return value's type is changed to DecodeResult because we need to release the ByteBuf later.

@BewareMyPower BewareMyPower added the type/enhancement Indicates an improvement to an existing feature label Aug 23, 2021
@BewareMyPower BewareMyPower self-assigned this Aug 23, 2021
@BewareMyPower BewareMyPower changed the title Use pooled direct memory allocator when decoding Pulsar entry to Kafka records [WIP] Use pooled direct memory allocator when decoding Pulsar entry to Kafka records Aug 23, 2021
@BewareMyPower BewareMyPower changed the title [WIP] Use pooled direct memory allocator when decoding Pulsar entry to Kafka records Use pooled direct memory allocator when decoding Pulsar entry to Kafka records Aug 23, 2021
@BewareMyPower
Copy link
Collaborator Author

Here're the comparison between KoP before this PR and KoP after this PR

截屏2021-08-23 下午7 14 26

截屏2021-08-23 下午7 49 07

@BewareMyPower BewareMyPower merged commit 4de6212 into streamnative:master Aug 24, 2021
@BewareMyPower BewareMyPower deleted the bewaremypower/direct-buffer-decode branch August 24, 2021 03:16
BewareMyPower added a commit that referenced this pull request Aug 25, 2021
…a records (#673)

### Motivation
When a Pulsar entry is decoded to Kafka record in `ByteBufUtils#decodePulsarEntryToKafkaRecords`, a NIO buffer whose initial capacity is 1 MB will be allocated from heap memory. Therefore, each time an entry is read, 1 MB heap memory will be allocated. Then the heap memory will increase very quickly and GC will happen frequently.

Kafka `MemoryRecordsBuilder` uses its underlying `ByteBufferOutputStream` field as the internal buffer whose capacity can be increased in `write` method. Even if a direct buffer was allocated by Netty's pooled direct memory allocator and its underlying `ByteBuffer` was passed to `ByteBufferOutputStream`'s constructor, if the reallocation happened, the new buffer could still be allocated from heap memory.

### Modification

This PR adds a `DirectBufferOutputStream` class that inherits from `ByteBufferOutputStream` and overrides some methods that can be called in `MemoryRecordsBuilder`. This class uses Pulsar's default `ByteBufAllocator` to allocate memory. The other methods' behaviors are the same with `ByteBufferOutputStream`.

A unit test is added to verify that the `MemoryRecordsBuilder` will build the same records no matter the underlying `ByteBufferOutputStream` is `ByteBufferOutputStream` or `DirectBufferOutputStream`. Three cases are tested in this test:
1. The initial capacity is less than the size of records header, in this case, `position(int)` method will be called to increase the capacity.
2. The initial capacity is greater than both the size of records header and the total size of records.
3. The initial capacity is greater than the size of records header but less than the total size of records, in this case, `write()` method will increase the capacity automatically.

Then, a `DirectBufferOutputStream` instance is passed to `MemoryRecordsBuilder`'s constructor in `ByteBufUtils#decodePulsarEntryToKafkaRecords` and the return value's type is changed to `DecodeResult` because we need to release the `ByteBuf` later.
@BewareMyPower BewareMyPower mentioned this pull request Aug 27, 2021
9 tasks
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
type/enhancement Indicates an improvement to an existing feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants