diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index a00c161641083..25fcb377e3e11 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -86,4 +86,9 @@ public class ManagedLedgerFactoryConfig { * ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data. */ private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name(); + + /** + * ManagedCursorInfo compression type. If the compression type is null or invalid, don't compress data. + */ + private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f8806824d8f0c..e3d24f5522211 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -184,7 +184,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, this.bookkeeperFactory = bookKeeperGroupFactory; this.isBookkeeperManaged = isBookkeeperManaged; this.metadataStore = metadataStore; - this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType()); + this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType(), + config.getManagedCursorInfoCompressionType()); this.config = config; this.mbean = new ManagedLedgerFactoryMBeanImpl(this); this.entryCacheManager = new EntryCacheManager(this); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 5ad62b228bce0..e1c72d2fc0dcb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -41,6 +41,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; @@ -57,30 +58,39 @@ public class MetaStoreImpl implements MetaStore { private final MetadataStore store; private final OrderedExecutor executor; - private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000 - private final CompressionType compressionType; + private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778; // 0100 0111 0111 1000 + private final CompressionType ledgerInfoCompressionType; + private final CompressionType cursorInfoCompressionType; public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) { this.store = store; this.executor = executor; - this.compressionType = CompressionType.NONE; + this.ledgerInfoCompressionType = CompressionType.NONE; + this.cursorInfoCompressionType = CompressionType.NONE; } - public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) { + public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType, + String cursorInfoCompressionType) { this.store = store; this.executor = executor; - CompressionType finalCompressionType; - if (compressionType != null) { - try { - finalCompressionType = CompressionType.valueOf(compressionType); - } catch (Exception e) { - log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage()); - throw e; - } - } else { - finalCompressionType = CompressionType.NONE; + this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType); + this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType); + } + + private CompressionType parseCompressionType(String value) { + if (StringUtils.isEmpty(value)) { + return CompressionType.NONE; + } + + CompressionType compressionType; + try { + compressionType = CompressionType.valueOf(value); + } catch (Exception e) { + log.error("Failed to get compression type {} error msg: {}.", value, e.getMessage()); + throw e; } - this.compressionType = finalCompressionType; + + return compressionType; } @Override @@ -174,7 +184,7 @@ public void asyncGetCursorInfo(String ledgerName, String cursorName, .thenAcceptAsync(optRes -> { if (optRes.isPresent()) { try { - ManagedCursorInfo info = ManagedCursorInfo.parseFrom(optRes.get().getValue()); + ManagedCursorInfo info = parseManagedCursorInfo(optRes.get().getValue()); callback.operationComplete(info, optRes.get().getStat()); } catch (InvalidProtocolBufferException e) { callback.operationFailed(getException(e)); @@ -196,7 +206,7 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId()); String path = PREFIX + ledgerName + "/" + cursorName; - byte[] content = info.toByteArray(); // Binary format + byte[] content = compressCursorInfo(info); long expectedVersion; @@ -306,32 +316,97 @@ private static MetaStoreException getException(Throwable t) { } } + public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) { + if (ledgerInfoCompressionType.equals(CompressionType.NONE)) { + return managedLedgerInfo.toByteArray(); + } + MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata + .newBuilder() + .setCompressionType(ledgerInfoCompressionType) + .setUncompressedSize(managedLedgerInfo.getSerializedSize()) + .build(); + return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(), + mlInfoMetadata.getSerializedSize(), ledgerInfoCompressionType); + } + + public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) { + if (cursorInfoCompressionType.equals(CompressionType.NONE)) { + return managedCursorInfo.toByteArray(); + } + MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata + .newBuilder() + .setCompressionType(cursorInfoCompressionType) + .setUncompressedSize(managedCursorInfo.getSerializedSize()) + .build(); + return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(), + metadata.getSerializedSize(), cursorInfoCompressionType); + } + + public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException { + ByteBuf byteBuf = Unpooled.wrappedBuffer(data); + + byte[] metadataBytes = extractCompressMetadataBytes(byteBuf); + if (metadataBytes != null) { + try { + MLDataFormats.ManagedLedgerInfoMetadata metadata = + MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes); + return ManagedLedgerInfo.parseFrom(getCompressionCodec(metadata.getCompressionType()) + .decode(byteBuf, metadata.getUncompressedSize()).nioBuffer()); + } catch (Exception e) { + log.error("Failed to parse managedLedgerInfo metadata, " + + "fall back to parse managedLedgerInfo directly.", e); + return ManagedLedgerInfo.parseFrom(data); + } finally { + byteBuf.release(); + } + } else { + return ManagedLedgerInfo.parseFrom(data); + } + } + + public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException { + ByteBuf byteBuf = Unpooled.wrappedBuffer(data); + + byte[] metadataBytes = extractCompressMetadataBytes(byteBuf); + if (metadataBytes != null) { + try { + MLDataFormats.ManagedCursorInfoMetadata metadata = + MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes); + return ManagedCursorInfo.parseFrom(getCompressionCodec(metadata.getCompressionType()) + .decode(byteBuf, metadata.getUncompressedSize()).nioBuffer()); + } catch (Exception e) { + log.error("Failed to parse ManagedCursorInfo metadata, " + + "fall back to parse ManagedCursorInfo directly", e); + return ManagedCursorInfo.parseFrom(data); + } finally { + byteBuf.release(); + } + } else { + return ManagedCursorInfo.parseFrom(data); + } + } + /** - * Compress ManagedLedgerInfo data. + * Compress Managed Info data such as LedgerInfo, CursorInfo. * * compression data structure * [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD] - */ - public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) { + */ + private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSerializedSize, + MLDataFormats.CompressionType compressionType) { if (compressionType == null || compressionType.equals(CompressionType.NONE)) { - return managedLedgerInfo.toByteArray(); + return info; } ByteBuf metadataByteBuf = null; ByteBuf encodeByteBuf = null; try { - MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata - .newBuilder() - .setCompressionType(compressionType) - .setUncompressedSize(managedLedgerInfo.getSerializedSize()) - .build(); - metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer( - mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6); - metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA); - metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize()); - metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray()); - + metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6, + metadataSerializedSize + 6); + metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA); + metadataByteBuf.writeInt(metadataSerializedSize); + metadataByteBuf.writeBytes(metadata); encodeByteBuf = getCompressionCodec(compressionType) - .encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray())); + .encode(Unpooled.wrappedBuffer(info)); CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponent(true, metadataByteBuf); compositeByteBuf.addComponent(true, encodeByteBuf); @@ -348,42 +423,14 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) { } } - public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException { - ByteBuf byteBuf = Unpooled.wrappedBuffer(data); - if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) { - ByteBuf decodeByteBuf = null; - try { - int metadataSize = byteBuf.readInt(); - byte[] metadataBytes = new byte[metadataSize]; - byteBuf.readBytes(metadataBytes); - MLDataFormats.ManagedLedgerInfoMetadata metadata = - MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes); - - long unpressedSize = metadata.getUncompressedSize(); - decodeByteBuf = getCompressionCodec(metadata.getCompressionType()) - .decode(byteBuf, (int) unpressedSize); - byte[] decodeBytes; - // couldn't decode data by ZLIB compression byteBuf array() directly - if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) { - decodeBytes = decodeByteBuf.array(); - } else { - decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()]; - decodeByteBuf.readBytes(decodeBytes); - } - return ManagedLedgerInfo.parseFrom(decodeBytes); - } catch (Exception e) { - log.error("Failed to parse managedLedgerInfo metadata, " - + "fall back to parse managedLedgerInfo directly.", e); - return ManagedLedgerInfo.parseFrom(data); - } finally { - if (decodeByteBuf != null) { - decodeByteBuf.release(); - } - byteBuf.release(); - } - } else { - return ManagedLedgerInfo.parseFrom(data); + private byte[] extractCompressMetadataBytes(ByteBuf data) { + if (data.readableBytes() > 0 && data.readShort() == MAGIC_MANAGED_INFO_METADATA) { + int metadataSize = data.readInt(); + byte[] metadataBytes = new byte[metadataSize]; + data.readBytes(metadataBytes); + return metadataBytes; } + return null; } private CompressionCodec getCompressionCodec(CompressionType compressionType) { diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index a3528b664e29f..4671816c1a199 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -137,3 +137,8 @@ message ManagedLedgerInfoMetadata { required CompressionType compressionType = 1; required int32 uncompressedSize = 2; } + +message ManagedCursorInfoMetadata { + required CompressionType compressionType = 1; + required int32 uncompressedSize = 2; +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorInfoMetadataTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorInfoMetadataTest.java new file mode 100644 index 0000000000000..8b95876d0ae3c --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorInfoMetadataTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.junit.Assert.assertEquals; +import static org.testng.Assert.expectThrows; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.pulsar.common.api.proto.CompressionType; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * ManagedCursorInfo metadata test. + */ +@Slf4j +public class ManagedCursorInfoMetadataTest { + private final String INVALID_TYPE = "INVALID_TYPE"; + + @DataProvider(name = "compressionTypeProvider") + private Object[][] compressionTypeProvider() { + return new Object[][]{ + {null}, + {INVALID_TYPE}, + {CompressionType.NONE.name()}, + {CompressionType.LZ4.name()}, + {CompressionType.ZLIB.name()}, + {CompressionType.ZSTD.name()}, + {CompressionType.SNAPPY.name()} + }; + } + + @Test(dataProvider = "compressionTypeProvider") + public void testEncodeAndDecode(String compressionType) throws IOException { + long ledgerId = 10000; + MLDataFormats.ManagedCursorInfo.Builder builder = MLDataFormats.ManagedCursorInfo.newBuilder(); + + builder.setCursorsLedgerId(ledgerId); + builder.setMarkDeleteLedgerId(ledgerId); + + List batchedEntryDeletionIndexInfos = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + MLDataFormats.NestedPositionInfo nestedPositionInfo = MLDataFormats.NestedPositionInfo.newBuilder() + .setEntryId(i).setLedgerId(i).build(); + MLDataFormats.BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = MLDataFormats + .BatchedEntryDeletionIndexInfo.newBuilder().setPosition(nestedPositionInfo).build(); + batchedEntryDeletionIndexInfos.add(batchedEntryDeletionIndexInfo); + } + builder.addAllBatchedEntryDeletionIndexInfo(batchedEntryDeletionIndexInfos); + + MetaStoreImpl metaStore; + if (INVALID_TYPE.equals(compressionType)) { + IllegalArgumentException compressionTypeEx = expectThrows(IllegalArgumentException.class, () -> { + new MetaStoreImpl(null, null, null, compressionType); + }); + assertEquals("No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType." + + compressionType, compressionTypeEx.getMessage()); + return; + } else { + metaStore = new MetaStoreImpl(null, null, null, compressionType); + } + + MLDataFormats.ManagedCursorInfo managedCursorInfo = builder.build(); + byte[] compressionBytes = metaStore.compressCursorInfo(managedCursorInfo); + log.info("[{}] Uncompressed data size: {}, compressed data size: {}", + compressionType, managedCursorInfo.getSerializedSize(), compressionBytes.length); + if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) { + Assert.assertEquals(compressionBytes.length, managedCursorInfo.getSerializedSize()); + } + + // parse compression data and unCompression data, check their results. + MLDataFormats.ManagedCursorInfo info1 = metaStore.parseManagedCursorInfo(compressionBytes); + MLDataFormats.ManagedCursorInfo info2 = metaStore.parseManagedCursorInfo(managedCursorInfo.toByteArray()); + Assert.assertEquals(info1, info2); + } +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java index 2f27489aeb9f6..91bc7f143a4ae 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerInfoMetadataTest.java @@ -19,6 +19,12 @@ package org.apache.bookkeeper.mledger.impl; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; @@ -28,13 +34,6 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; - /** * ManagedLedgerInfo metadata test. */ @@ -91,7 +90,7 @@ public void testEncodeAndDecode(String compressionType) throws IOException { MetaStoreImpl metaStore; try { - metaStore = new MetaStoreImpl(null, null, compressionType); + metaStore = new MetaStoreImpl(null, null, compressionType, null); if ("INVALID_TYPE".equals(compressionType)) { Assert.fail("The managedLedgerInfo compression type is invalid, should fail."); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index dc5640f2315fd..bbdbc31b3f16c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1706,6 +1706,11 @@ public class ServiceConfiguration implements PulsarConfiguration { + "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.") private String managedLedgerInfoCompressionType = "NONE"; + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n" + + "If value is NONE, then save the ManagedCursorInfo bytes data directly.") + private String managedCursorInfoCompressionType = "NONE"; + /*** --- Load balancer --- ****/ @FieldContext( category = CATEGORY_LOAD_BALANCER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index b615628c08fa1..6a9aa8877e252 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -71,6 +71,7 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds()); managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType()); managedLedgerFactoryConfig.setStatsPeriodSeconds(conf.getManagedLedgerStatsPeriodSeconds()); + managedLedgerFactoryConfig.setManagedCursorInfoCompressionType(conf.getManagedCursorInfoCompressionType()); Configuration configuration = new ClientConfiguration(); if (conf.isBookkeeperClientExposeStatsToPrometheus()) {