[ManagedLedger] Compress managed ledger info #11490
Conversation
```
@@ -186,6 +186,11 @@ message BrokerEntryMetadata {
     optional uint64 index = 2;
 }

 message ManagedLedgerInfoMetadata {
```
@gaoran10 Should we move this to MLDataFormats.proto? PulsarApi.proto is used for the broker and client interaction.
Ok, I'll move it.
```
 String path = PREFIX + ledgerName;
-store.put(path, serializedMlInfo, Optional.of(stat.getVersion()))
+store.put(path, compressLedgerInfo(mlInfo, CompressionType.ZSTD), Optional.of(stat.getVersion()))
```
The compression type should be configurable
Good idea.
```
@@ -47,6 +54,15 @@
     private final MetadataStore store;
     private final OrderedExecutor executor;

     public static final short magicManagedLedgerInfoMetadata = 0x0b9c;
```
Suggested change:
```
-public static final short magicManagedLedgerInfoMetadata = 0x0b9c;
+public static final short MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x0b9c;
```
I'll fix this.
```
public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
    ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length, data.length);
    byteBuf.writeBytes(data);
```
It seems we can use wrappedBuffer to reduce one copy
Yes, I'll fix this.
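The zero-copy idea behind this suggestion can be illustrated with plain `java.nio` (Netty's `Unpooled.wrappedBuffer` behaves analogously for `ByteBuf`): wrapping an existing array shares its storage instead of copying it. This is a minimal stdlib sketch under that assumption, not the Pulsar code itself.

```java
import java.nio.ByteBuffer;

public class WrapVsCopy {
    // Copying: allocate a fresh buffer and write the bytes into it (one extra copy).
    static ByteBuffer copyOf(byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(data.length);
        buf.put(data);
        buf.flip();
        return buf;
    }

    // Wrapping: the buffer shares the backing array, so no copy is made.
    static ByteBuffer wrapOf(byte[] data) {
        return ByteBuffer.wrap(data);
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        ByteBuffer copied = copyOf(data);
        ByteBuffer wrapped = wrapOf(data);
        data[0] = 42; // mutate the original array
        System.out.println(copied.get(0));  // 1: the copy is independent
        System.out.println(wrapped.get(0)); // 42: the wrapper shares storage
    }
}
```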
```
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());

ByteBuf originalByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(originalBytes.length, originalBytes.length);
originalByteBuf.writeBytes(originalBytes);
```
It seems we can use wrappedBuffer to reduce one copy
good idea.
Thanks, I'll fix this.
```
byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
compositeByteBuf.readBytes(dataBytes);
return dataBytes;
```
Do these ByteBufs need to be released?
Releasing the compositeByteBuf raises io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1.
```
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);

long unpressedSize = metadata.getUnpressedSize();
ByteBuf decodeByteBuf = CompressionCodecProvider.getCompressionCodec(
```
Do we need a release here?
```
/**
 * Decompress a buffer.
 *
 * <p>The buffer needs to have been compressed with the matching Encoder.
 *
 * @param encoded
 *            the compressed content
 * @param uncompressedSize
 *            the size of the original content
 * @return a ByteBuf with the compressed content. The buffer needs to be released by the receiver
 * @throws IOException
 *            if the decompression fails
 */
ByteBuf decode(ByteBuf encoded, int uncompressedSize) throws IOException;
```
We need to release the ByteBuf.
I'll fix this.
```
@@ -47,11 +53,27 @@
     private final MetadataStore store;
     private final OrderedExecutor executor;

     public static final short MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x0b9c;
     private CompressionType compressionType = null;
```
Initialize it to NONE instead of null?
That's better.
```
@@ -124,3 +124,8 @@ message ManagedCursorInfo {
     // Store which index in the batch message has been deleted
     repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
 }

 message ManagedLedgerInfoMetadata {
     required string compressionType = 1;
```
Use a compression enum type instead of String?
The CompressionType enum is in PulsarApi.proto; if we want to use it, we need to import PulsarApi.proto. Maybe we could use the string type here?
I think we could just copy the same enum type here, or even declare it as int and use the other enum type.
It looks like we'd better copy the enum type into MLDataFormats.proto, because the managed ledger module can be an independent component.
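A sketch of what copying the enum into MLDataFormats.proto could look like (the message name `ManagedLedgerInfoMetadata` is from this PR; the enum name and field layout here are illustrative, mirroring the values of CompressionType in PulsarApi.proto):

```protobuf
// Illustrative sketch: a local copy of the compression enum so the
// managed ledger module does not need to import PulsarApi.proto.
enum MLCompressionType {
    NONE = 0;
    LZ4 = 1;
    ZLIB = 2;
    ZSTD = 3;
    SNAPPY = 4;
}

message ManagedLedgerInfoMetadata {
    required MLCompressionType compressionType = 1;
    required int32 uncompressedSize = 2;
}
```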
```
@@ -1601,6 +1601,11 @@
     private String managedLedgerDataReadPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST
             .getValue();

     @FieldContext(category = CATEGORY_STORAGE_ML,
         doc = "ManagedLedgerInfo compression type, option values (LZ4, ZLIB, ZSTD, SNAPPY). \n"
```
Add the NONE compression value to the doc, and set the default value to NONE instead of null?
Ok, I'll fix this.
```
/**
 * ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
 */
private String managedLedgerInfoCompressionType = null;
```
We'd better use an enum type to restrict the compression type to specific values instead of a string. I'm not sure whether it's easy to implement.
This field is set from the managedLedgerInfoCompressionType configuration in ServiceConfiguration; maybe we could convert the String to CompressionType in MetaStoreImpl.
```
ByteBuf originalByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(originalBytes.length, originalBytes.length);
originalByteBuf.writeBytes(originalBytes);
ByteBuf encodeByteBuf = CompressionCodecProvider.getCompressionCodec(compressionType).encode(originalByteBuf);
```
```
/**
 * Compress a buffer.
 *
 * @param raw
 *            a buffer with the uncompressed content. The reader/writer indexes will not be modified
 * @return a new buffer with the compressed content. The buffer needs to be released by the receiver
 */
ByteBuf encode(ByteBuf raw);
```
Needs to be released.
I'll fix this.
Overall looks good.
What is the upgrade story?
```
@@ -47,11 +53,27 @@
     private final MetadataStore store;
     private final OrderedExecutor executor;

     public static final short MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x0b9c;
     private CompressionType compressionType = null;
```
final?
That's better.
@codelipenghui This is a new feature with a significant impact, as you cannot roll back once you enable it. I believe we cannot cherry-pick this feature to the 2.8.x release line; we should cherry-pick to released versions only bug fixes and small features with a low impact. Does this make sense to you?
@eolivelli: the upgrade requires two steps:
Because it is a 2-step upgrade story, we can add the code to 2.8.1, because this feature would be turned off. With the code available in 2.8.1, it provides a smooth path to enable this feature.
We have to document it carefully, because it will ease switching from 2.8.1 to 2.9.0. If you feel strongly, then I am not against cherry-picking this into 2.8.x.
LGTM as soon as all the existing comments have been addressed
Marked it for
Should we swap their positions to avoid an NPE? Change metadata.getCompressionType().equals(CompressionType.ZLIB.name()) to CompressionType.ZLIB.name().equals(...)
Good catch, the parameter
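The constant-first equals idiom suggested above is plain Java: calling equals on a value known to be non-null tolerates a null argument. A minimal, self-contained sketch (the enum here is a local stand-in for Pulsar's CompressionType):

```java
public class NullSafeEquals {
    // Local stand-in for Pulsar's CompressionType enum, for illustration only.
    enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

    // Constant-first equals: safe even when `configured` is null.
    static boolean isZlib(String configured) {
        return CompressionType.ZLIB.name().equals(configured);
    }

    public static void main(String[] args) {
        System.out.println(isZlib("ZLIB")); // true
        System.out.println(isZlib(null));   // false, no NullPointerException
    }
}
```

Had the call been written the other way around, `configured.equals(...)` would throw a NullPointerException when the metadata field is absent.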
```
@@ -47,9 +54,31 @@
     private final MetadataStore store;
     private final OrderedExecutor executor;

     public static final short MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x0b9c;
```
private?
We should try to use a magic number with the most significant bit set to 1 because that should be guaranteed to be a non-valid protobuf sequence
Ok, I'll change this.
```
try {
    finalCompressionType = CompressionType.valueOf(compressionType);
} catch (Exception e) {
    log.warn("Failed to get compression type {}, disable managedLedgerInfo compression, error msg: {}.",
```
If the compression type is not valid, we should just throw an exception instead of trying to handle it by disabling compression. That could lead to a situation in which someone thinks compression is enabled while it is really failing.
Ok, I'll throw the exception.
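The agreed fail-fast behavior can be sketched as follows. This is a hypothetical stand-alone version, not the Pulsar code: the nested enum stands in for Pulsar's CompressionType, and the method name `parse` is illustrative.

```java
public class CompressionConfig {
    // Local stand-in for Pulsar's CompressionType enum.
    enum CompressionType { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

    // Fail fast: an invalid name throws instead of silently disabling compression.
    static CompressionType parse(String name) {
        if (name == null || name.isEmpty()) {
            return CompressionType.NONE; // unset config means "no compression"
        }
        try {
            return CompressionType.valueOf(name);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Invalid managedLedgerInfoCompressionType: " + name, e);
        }
    }
}
```

With this shape, a misconfigured broker refuses to start rather than quietly writing uncompressed data.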
```
message ManagedLedgerInfoMetadata {
    required string compressionType = 1;
    required int32 unpressedSize = 2;
```
Suggested change:
```
-required int32 unpressedSize = 2;
+required int32 uncompressedSize = 2;
```
Thanks.
@merlimat I'm not sure how to generate a non-valid protobuf sequence. I took a look at some docs; the basic protobuf encoding format is
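For background on why an MSB-set first byte helps: a protobuf message starts with a field tag encoded as a varint, `(field_number << 3) | wire_type`, so for field numbers up to 15 the tag fits in one byte and its most significant bit (the varint continuation bit) is clear. A quick stdlib check of that property (illustrative, not the Pulsar code):

```java
public class ProtoTagCheck {
    // First tag byte of a protobuf field: (fieldNumber << 3) | wireType.
    // For fieldNumber <= 15 this fits in a single varint byte, so the MSB is 0.
    static int firstTagByte(int fieldNumber, int wireType) {
        return (fieldNumber << 3) | wireType;
    }

    public static void main(String[] args) {
        // field 1, wire type 2 (length-delimited) -> 0x0A, MSB clear
        System.out.printf("0x%02X%n", firstTagByte(1, 2));
    }
}
```

So a magic number whose first byte has the MSB set cannot collide with the start of a message whose first field number is small, which is why such a value is a safer format marker.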
* compress managed ledger info
* 1. Move the `ManagedLedgerInfoMetadata` to `MLDataFormats.proto`. 2. Add configuration managedLedgerInfoCompressionType to control ManagedLedgerInfo compression type.
* use ByteBuf wrap bytes array, release ByteBuf if needed.
* make the compressionType as a final field
* fix comment
* 1. throw exception if using a invalid compression type. 2. change compression magic number. 3. add a unit test to verify compression could work well.
* change compression magic number.
* fix test

(cherry picked from commit 4361b6d)
* compress managed ledger info
* 1. Move the `ManagedLedgerInfoMetadata` to `MLDataFormats.proto`. 2. Add configuration managedLedgerInfoCompressionType to control ManagedLedgerInfo compression type.
* use ByteBuf wrap bytes array, release ByteBuf if needed.
* make the compressionType as a final field
* fix comment
* 1. throw exception if using a invalid compression type. 2. change compression magic number. 3. add a unit test to verify compression could work well.
* change compression magic number.
* fix test
…configuration doc (apache#11563)

Modifications: Add a new configuration managedLedgerInfoCompressionType in the broker configuration doc. Related to apache#11490
Motivation
Currently, the ManagedLedgerInfo contains offload context info. If there are too many ledgers in one ManagedLedger, the ZooKeeper ZNode data will grow, and it is hard for ZooKeeper to manage that much data. We could compress the ManagedLedgerInfo data to decrease the data size.

For example, if one ManagedLedgerInfo contains 30000 ledgers and each LedgerInfo contains offload context, the uncompressed data size is about 6 MB; after compressing with ZSTD, the data size can decrease to about 1.3 MB.

Modifications
Add a ManagedLedgerInfoMetadata before ManagedLedgerInfo, with the data structure as below.

Add a new configuration managedLedgerInfoCompressionType to control the ManagedLedgerInfo compression type; if this configuration is not set, ManagedLedgerInfo data is not compressed.

Migration
When reading data from ZooKeeper, check the magic number at the head of the data. If it matches, try to parse the metadata and uncompress the data; if an error occurs or the magic number does not match, fall back to parsing ManagedLedgerInfo directly.

Verifying this change
Verify compressing and uncompressing ManagedLedgerInfo data.

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes.

Documentation
For committer

For this PR, do we need to update docs?

If yes:
- if you update docs in this PR, label this PR with the doc label.
- if you plan to update docs later, label this PR with the doc-required label.
- if you need help on updating docs, create a follow-up issue with the doc-required label.

If no, label this PR with the no-need-doc label and explain why.