Skip to content

Commit

Permalink
revert PR 11594 to avoid copy data to direct buffer afer aircompresso…
Browse files Browse the repository at this point in the history
…r upgrade to 0.20 (#11792)

### Motivation
Due to aircompressor 0.19 can't work with heap buffer on JDK1.8, so #11594 use copy data to direct buffer to avoid NoSuchMethodError exception. Now aircompressor released 0.20 and #11790 has upgrade the aircompressor version to 0.20 to fix this issue, we can revert #11594 to avoid copy data to direct buffer to improve performance.

### Modification
1. revert Fix java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer when enabling topic metadata compression #11594, but keep the tests.

(cherry picked from commit cc1b983)
  • Loading branch information
hangc0276 committed Aug 26, 2021
1 parent f9850fb commit b817fcd
Showing 1 changed file with 3 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
return managedLedgerInfo.toByteArray();
}
ByteBuf metadataByteBuf = null;
ByteBuf uncompressedByteBuf = null;
ByteBuf encodeByteBuf = null;
try {
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
Expand All @@ -323,13 +322,9 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
byte[] byteArray = managedLedgerInfo.toByteArray();
// The reason for copy the data to a direct buffer here is to ensure the metadata compression feature can
// work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593
uncompressedByteBuf = Unpooled.directBuffer(byteArray.length);
uncompressedByteBuf.writeBytes(byteArray);

encodeByteBuf = getCompressionCodec(compressionType)
.encode(uncompressedByteBuf);
.encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
compositeByteBuf.addComponent(true, encodeByteBuf);
Expand All @@ -340,9 +335,6 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
if (metadataByteBuf != null) {
metadataByteBuf.release();
}
if (uncompressedByteBuf != null) {
uncompressedByteBuf.release();
}
if (encodeByteBuf != null) {
encodeByteBuf.release();
}
Expand All @@ -353,7 +345,6 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
ByteBuf decodeByteBuf = null;
ByteBuf compressedByteBuf = null;
try {
int metadataSize = byteBuf.readInt();
byte[] metadataBytes = new byte[metadataSize];
Expand All @@ -362,12 +353,8 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);

long unpressedSize = metadata.getUncompressedSize();
// The reason for copy the data to a direct buffer here is to ensure the metadata compression feature
// can work on JDK1.8, for more details to see: https://github.com/apache/pulsar/issues/11593
compressedByteBuf = Unpooled.directBuffer(byteBuf.readableBytes());
compressedByteBuf.writeBytes(byteBuf);
decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
.decode(compressedByteBuf, (int) unpressedSize);
.decode(byteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
Expand All @@ -385,9 +372,6 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
if (decodeByteBuf != null) {
decodeByteBuf.release();
}
if (compressedByteBuf != null) {
compressedByteBuf.release();
}
byteBuf.release();
}
} else {
Expand Down

0 comments on commit b817fcd

Please sign in to comment.