Skip to content

Commit 1afba53

Browse files
nodecenicklixinyang
authored and
nicklixinyang
committed
[PIP-146] ManagedCursorInfo compression (apache#14542)
Fixes apache#14529 ### Motivation The cursor data is managed by the ZooKeeper/etcd metadata store. As the cursor data grows, its size increases and it takes a long time to pull the data. Therefore, it is necessary to add compression for the cursor data, which reduces both the size of the data and the time needed to pull it. ### Modifications - Add a message named `ManagedCursorInfoMetadata` to `MLDataFormats.proto` to serve as the compression metadata - Add `managedCursorInfoCompressionType` to `org.apache.pulsar.broker.ServiceConfiguration` and `org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig` - This feature follows the same approach as the ManagedLedgerInfo compression implementation, so the shared code is refactored to avoid duplication
1 parent bf2fbfb commit 1afba53

File tree

8 files changed

+236
-77
lines changed

8 files changed

+236
-77
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,9 @@ public class ManagedLedgerFactoryConfig {
8686
* ManagedLedgerInfo compression type. If the compression type is null or empty, data is not compressed; an unrecognized value is rejected with an exception when the metadata store is created.
8787
*/
8888
private String managedLedgerInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
89+
90+
/**
91+
* ManagedCursorInfo compression type. If the compression type is null or empty, data is not compressed; an unrecognized value is rejected with an exception when the metadata store is created.
92+
*/
93+
private String managedCursorInfoCompressionType = MLDataFormats.CompressionType.NONE.name();
8994
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
185185
this.bookkeeperFactory = bookKeeperGroupFactory;
186186
this.isBookkeeperManaged = isBookkeeperManaged;
187187
this.metadataStore = metadataStore;
188-
this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType());
188+
this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType(),
189+
config.getManagedCursorInfoCompressionType());
189190
this.config = config;
190191
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
191192
this.entryCacheManager = new EntryCacheManager(this);

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java

+115-68
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
4040
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
4141
import org.apache.bookkeeper.util.SafeRunnable;
42+
import org.apache.commons.lang.StringUtils;
4243
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
4344
import org.apache.pulsar.common.compression.CompressionCodec;
4445
import org.apache.pulsar.common.compression.CompressionCodecProvider;
@@ -55,30 +56,39 @@ public class MetaStoreImpl implements MetaStore {
5556
private final MetadataStore store;
5657
private final OrderedExecutor executor;
5758

58-
private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
59-
private final CompressionType compressionType;
59+
private static final int MAGIC_MANAGED_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
60+
private final CompressionType ledgerInfoCompressionType;
61+
private final CompressionType cursorInfoCompressionType;
6062

6163
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
6264
this.store = store;
6365
this.executor = executor;
64-
this.compressionType = CompressionType.NONE;
66+
this.ledgerInfoCompressionType = CompressionType.NONE;
67+
this.cursorInfoCompressionType = CompressionType.NONE;
6568
}
6669

67-
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) {
70+
public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType,
71+
String cursorInfoCompressionType) {
6872
this.store = store;
6973
this.executor = executor;
70-
CompressionType finalCompressionType;
71-
if (compressionType != null) {
72-
try {
73-
finalCompressionType = CompressionType.valueOf(compressionType);
74-
} catch (Exception e) {
75-
log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage());
76-
throw e;
77-
}
78-
} else {
79-
finalCompressionType = CompressionType.NONE;
74+
this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType);
75+
this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType);
76+
}
77+
78+
private CompressionType parseCompressionType(String value) {
79+
if (StringUtils.isEmpty(value)) {
80+
return CompressionType.NONE;
81+
}
82+
83+
CompressionType compressionType;
84+
try {
85+
compressionType = CompressionType.valueOf(value);
86+
} catch (Exception e) {
87+
log.error("Failed to get compression type {} error msg: {}.", value, e.getMessage());
88+
throw e;
8089
}
81-
this.compressionType = finalCompressionType;
90+
91+
return compressionType;
8292
}
8393

8494
@Override
@@ -185,7 +195,7 @@ public void asyncGetCursorInfo(String ledgerName, String cursorName,
185195
.thenAcceptAsync(optRes -> {
186196
if (optRes.isPresent()) {
187197
try {
188-
ManagedCursorInfo info = ManagedCursorInfo.parseFrom(optRes.get().getValue());
198+
ManagedCursorInfo info = parseManagedCursorInfo(optRes.get().getValue());
189199
callback.operationComplete(info, optRes.get().getStat());
190200
} catch (InvalidProtocolBufferException e) {
191201
callback.operationFailed(getException(e));
@@ -208,7 +218,7 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC
208218
info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
209219

210220
String path = PREFIX + ledgerName + "/" + cursorName;
211-
byte[] content = info.toByteArray(); // Binary format
221+
byte[] content = compressCursorInfo(info);
212222

213223
long expectedVersion;
214224

@@ -322,32 +332,97 @@ private static MetaStoreException getException(Throwable t) {
322332
}
323333
}
324334

335+
public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
336+
if (ledgerInfoCompressionType.equals(CompressionType.NONE)) {
337+
return managedLedgerInfo.toByteArray();
338+
}
339+
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
340+
.newBuilder()
341+
.setCompressionType(ledgerInfoCompressionType)
342+
.setUncompressedSize(managedLedgerInfo.getSerializedSize())
343+
.build();
344+
return compressManagedInfo(managedLedgerInfo.toByteArray(), mlInfoMetadata.toByteArray(),
345+
mlInfoMetadata.getSerializedSize(), ledgerInfoCompressionType);
346+
}
347+
348+
public byte[] compressCursorInfo(ManagedCursorInfo managedCursorInfo) {
349+
if (cursorInfoCompressionType.equals(CompressionType.NONE)) {
350+
return managedCursorInfo.toByteArray();
351+
}
352+
MLDataFormats.ManagedCursorInfoMetadata metadata = MLDataFormats.ManagedCursorInfoMetadata
353+
.newBuilder()
354+
.setCompressionType(cursorInfoCompressionType)
355+
.setUncompressedSize(managedCursorInfo.getSerializedSize())
356+
.build();
357+
return compressManagedInfo(managedCursorInfo.toByteArray(), metadata.toByteArray(),
358+
metadata.getSerializedSize(), cursorInfoCompressionType);
359+
}
360+
361+
public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
362+
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
363+
364+
byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
365+
if (metadataBytes != null) {
366+
try {
367+
MLDataFormats.ManagedLedgerInfoMetadata metadata =
368+
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
369+
return ManagedLedgerInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
370+
.decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
371+
} catch (Exception e) {
372+
log.error("Failed to parse managedLedgerInfo metadata, "
373+
+ "fall back to parse managedLedgerInfo directly.", e);
374+
return ManagedLedgerInfo.parseFrom(data);
375+
} finally {
376+
byteBuf.release();
377+
}
378+
} else {
379+
return ManagedLedgerInfo.parseFrom(data);
380+
}
381+
}
382+
383+
public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProtocolBufferException {
384+
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
385+
386+
byte[] metadataBytes = extractCompressMetadataBytes(byteBuf);
387+
if (metadataBytes != null) {
388+
try {
389+
MLDataFormats.ManagedCursorInfoMetadata metadata =
390+
MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
391+
return ManagedCursorInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
392+
.decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
393+
} catch (Exception e) {
394+
log.error("Failed to parse ManagedCursorInfo metadata, "
395+
+ "fall back to parse ManagedCursorInfo directly", e);
396+
return ManagedCursorInfo.parseFrom(data);
397+
} finally {
398+
byteBuf.release();
399+
}
400+
} else {
401+
return ManagedCursorInfo.parseFrom(data);
402+
}
403+
}
404+
325405
/**
326-
* Compress ManagedLedgerInfo data.
406+
* Compress Managed Info data such as LedgerInfo, CursorInfo.
327407
*
328408
* compression data structure
329409
* [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
330-
*/
331-
public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
410+
*/
411+
private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSerializedSize,
412+
MLDataFormats.CompressionType compressionType) {
332413
if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
333-
return managedLedgerInfo.toByteArray();
414+
return info;
334415
}
335416
ByteBuf metadataByteBuf = null;
336417
ByteBuf encodeByteBuf = null;
337418
try {
338-
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
339-
.newBuilder()
340-
.setCompressionType(compressionType)
341-
.setUncompressedSize(managedLedgerInfo.getSerializedSize())
342-
.build();
343-
metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(
344-
mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6);
345-
metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
346-
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
347-
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());
348-
419+
metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
420+
metadataSerializedSize + 6);
421+
metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA);
422+
metadataByteBuf.writeInt(metadataSerializedSize);
423+
metadataByteBuf.writeBytes(metadata);
349424
encodeByteBuf = getCompressionCodec(compressionType)
350-
.encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));
425+
.encode(Unpooled.wrappedBuffer(info));
351426
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
352427
compositeByteBuf.addComponent(true, metadataByteBuf);
353428
compositeByteBuf.addComponent(true, encodeByteBuf);
@@ -364,42 +439,14 @@ public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
364439
}
365440
}
366441

367-
public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
368-
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
369-
if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
370-
ByteBuf decodeByteBuf = null;
371-
try {
372-
int metadataSize = byteBuf.readInt();
373-
byte[] metadataBytes = new byte[metadataSize];
374-
byteBuf.readBytes(metadataBytes);
375-
MLDataFormats.ManagedLedgerInfoMetadata metadata =
376-
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
377-
378-
long unpressedSize = metadata.getUncompressedSize();
379-
decodeByteBuf = getCompressionCodec(metadata.getCompressionType())
380-
.decode(byteBuf, (int) unpressedSize);
381-
byte[] decodeBytes;
382-
// couldn't decode data by ZLIB compression byteBuf array() directly
383-
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
384-
decodeBytes = decodeByteBuf.array();
385-
} else {
386-
decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()];
387-
decodeByteBuf.readBytes(decodeBytes);
388-
}
389-
return ManagedLedgerInfo.parseFrom(decodeBytes);
390-
} catch (Exception e) {
391-
log.error("Failed to parse managedLedgerInfo metadata, "
392-
+ "fall back to parse managedLedgerInfo directly.", e);
393-
return ManagedLedgerInfo.parseFrom(data);
394-
} finally {
395-
if (decodeByteBuf != null) {
396-
decodeByteBuf.release();
397-
}
398-
byteBuf.release();
399-
}
400-
} else {
401-
return ManagedLedgerInfo.parseFrom(data);
442+
private byte[] extractCompressMetadataBytes(ByteBuf data) {
443+
if (data.readableBytes() > 0 && data.readShort() == MAGIC_MANAGED_INFO_METADATA) {
444+
int metadataSize = data.readInt();
445+
byte[] metadataBytes = new byte[metadataSize];
446+
data.readBytes(metadataBytes);
447+
return metadataBytes;
402448
}
449+
return null;
403450
}
404451

405452
private CompressionCodec getCompressionCodec(CompressionType compressionType) {

managed-ledger/src/main/proto/MLDataFormats.proto

+5
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,8 @@ message ManagedLedgerInfoMetadata {
137137
required CompressionType compressionType = 1;
138138
required int32 uncompressedSize = 2;
139139
}
140+
141+
message ManagedCursorInfoMetadata {
142+
required CompressionType compressionType = 1;
143+
required int32 uncompressedSize = 2;
144+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.impl;
20+
21+
import static org.junit.Assert.assertEquals;
22+
import static org.testng.Assert.expectThrows;
23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import lombok.extern.slf4j.Slf4j;
27+
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
28+
import org.apache.pulsar.common.api.proto.CompressionType;
29+
import org.testng.Assert;
30+
import org.testng.annotations.DataProvider;
31+
import org.testng.annotations.Test;
32+
33+
/**
34+
* ManagedCursorInfo metadata test.
35+
*/
36+
@Slf4j
37+
public class ManagedCursorInfoMetadataTest {
38+
private final String INVALID_TYPE = "INVALID_TYPE";
39+
40+
@DataProvider(name = "compressionTypeProvider")
41+
private Object[][] compressionTypeProvider() {
42+
return new Object[][]{
43+
{null},
44+
{INVALID_TYPE},
45+
{CompressionType.NONE.name()},
46+
{CompressionType.LZ4.name()},
47+
{CompressionType.ZLIB.name()},
48+
{CompressionType.ZSTD.name()},
49+
{CompressionType.SNAPPY.name()}
50+
};
51+
}
52+
53+
@Test(dataProvider = "compressionTypeProvider")
54+
public void testEncodeAndDecode(String compressionType) throws IOException {
55+
long ledgerId = 10000;
56+
MLDataFormats.ManagedCursorInfo.Builder builder = MLDataFormats.ManagedCursorInfo.newBuilder();
57+
58+
builder.setCursorsLedgerId(ledgerId);
59+
builder.setMarkDeleteLedgerId(ledgerId);
60+
61+
List<MLDataFormats.BatchedEntryDeletionIndexInfo> batchedEntryDeletionIndexInfos = new ArrayList<>();
62+
for (int i = 0; i < 1000; i++) {
63+
MLDataFormats.NestedPositionInfo nestedPositionInfo = MLDataFormats.NestedPositionInfo.newBuilder()
64+
.setEntryId(i).setLedgerId(i).build();
65+
MLDataFormats.BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = MLDataFormats
66+
.BatchedEntryDeletionIndexInfo.newBuilder().setPosition(nestedPositionInfo).build();
67+
batchedEntryDeletionIndexInfos.add(batchedEntryDeletionIndexInfo);
68+
}
69+
builder.addAllBatchedEntryDeletionIndexInfo(batchedEntryDeletionIndexInfos);
70+
71+
MetaStoreImpl metaStore;
72+
if (INVALID_TYPE.equals(compressionType)) {
73+
IllegalArgumentException compressionTypeEx = expectThrows(IllegalArgumentException.class, () -> {
74+
new MetaStoreImpl(null, null, null, compressionType);
75+
});
76+
assertEquals("No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType."
77+
+ compressionType, compressionTypeEx.getMessage());
78+
return;
79+
} else {
80+
metaStore = new MetaStoreImpl(null, null, null, compressionType);
81+
}
82+
83+
MLDataFormats.ManagedCursorInfo managedCursorInfo = builder.build();
84+
byte[] compressionBytes = metaStore.compressCursorInfo(managedCursorInfo);
85+
log.info("[{}] Uncompressed data size: {}, compressed data size: {}",
86+
compressionType, managedCursorInfo.getSerializedSize(), compressionBytes.length);
87+
if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) {
88+
Assert.assertEquals(compressionBytes.length, managedCursorInfo.getSerializedSize());
89+
}
90+
91+
// parse compression data and unCompression data, check their results.
92+
MLDataFormats.ManagedCursorInfo info1 = metaStore.parseManagedCursorInfo(compressionBytes);
93+
MLDataFormats.ManagedCursorInfo info2 = metaStore.parseManagedCursorInfo(managedCursorInfo.toByteArray());
94+
Assert.assertEquals(info1, info2);
95+
}
96+
}

0 commit comments

Comments
 (0)