Skip to content

Commit

Permalink
[ManagedLedger] Compress managed ledger info (apache#11490)
Browse files Browse the repository at this point in the history
* compress managed ledger info

* 1. Move the `ManagedLedgerInfoMetadata` to `MLDataFormats.proto`.
2. Add configuration managedLedgerInfoCompressionType to control ManagedLedgerInfo compression type.

* use ByteBuf wrap bytes array, release ByteBuf if needed.

* make the compressionType as a final field

* fix comment

* 1. throw exception if using a invalid compression type.
2. change compression magic number.
3. add a unit test to verify compression could work well.

* change compression magic number.

* fix test
  • Loading branch information
gaoran10 authored and LeBW committed Aug 9, 2021
1 parent 12f2d7f commit a6be850
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Data;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.pulsar.common.api.proto.CompressionType;

/**
* Configuration for a {@link ManagedLedgerFactory}.
Expand Down Expand Up @@ -75,4 +76,9 @@ public class ManagedLedgerFactoryConfig {
* cluster name for prometheus stats
*/
private String clusterName;

/**
* ManagedLedgerInfo compression type. If the compression type is null or invalid, don't compress data.
*/
private String managedLedgerInfoCompressionType = CompressionType.NONE.name();
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private ManagedLedgerFactoryImpl(MetadataStore metadataStore,
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.metadataStore = metadataStore;
this.store = new MetaStoreImpl(metadataStore, scheduledExecutor);
this.store = new MetaStoreImpl(metadataStore, scheduledExecutor, config.getManagedLedgerInfoCompressionType());
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,23 @@
import java.util.Optional;
import java.util.concurrent.CompletionException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
Expand All @@ -47,9 +55,30 @@ public class MetaStoreImpl implements MetaStore {
private final MetadataStore store;
private final OrderedExecutor executor;

private static final int MAGIC_MANAGED_LEDGER_INFO_METADATA = 0x4778; // 0100 0111 0111 1000
private final CompressionType compressionType;

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) {
this.store = store;
this.executor = executor;
this.compressionType = CompressionType.NONE;
}

public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String compressionType) {
this.store = store;
this.executor = executor;
CompressionType finalCompressionType;
if (compressionType != null) {
try {
finalCompressionType = CompressionType.valueOf(compressionType);
} catch (Exception e) {
log.error("Failed to get compression type {} error msg: {}.", compressionType, e.getMessage());
throw e;
}
} else {
finalCompressionType = CompressionType.NONE;
}
this.compressionType = finalCompressionType;
}

@Override
Expand All @@ -62,7 +91,7 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing,
if (optResult.isPresent()) {
ManagedLedgerInfo info;
try {
info = ManagedLedgerInfo.parseFrom(optResult.get().getValue());
info = parseManagedLedgerInfo(optResult.get().getValue());
info = updateMLInfoTimestamp(info);
callback.operationComplete(info, optResult.get().getStat());
} catch (InvalidProtocolBufferException e) {
Expand Down Expand Up @@ -101,9 +130,8 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St
log.debug("[{}] Updating metadata version={} with content={}", ledgerName, stat, mlInfo);
}

byte[] serializedMlInfo = mlInfo.toByteArray(); // Binary format
String path = PREFIX + ledgerName;
store.put(path, serializedMlInfo, Optional.of(stat.getVersion()))
store.put(path, compressLedgerInfo(mlInfo), Optional.of(stat.getVersion()))
.thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion), executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
Expand Down Expand Up @@ -264,4 +292,88 @@ private static MetaStoreException getException(Throwable t) {
return new MetaStoreException(t);
}
}

/**
* Compress ManagedLedgerInfo data.
*
* compression data structure
* [MAGIC_NUMBER](2) + [METADATA_SIZE](4) + [METADATA_PAYLOAD] + [MANAGED_LEDGER_INFO_PAYLOAD]
*/
public byte[] compressLedgerInfo(ManagedLedgerInfo managedLedgerInfo) {
if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
return managedLedgerInfo.toByteArray();
}
ByteBuf metadataByteBuf = null;
ByteBuf encodeByteBuf = null;
try {
MLDataFormats.ManagedLedgerInfoMetadata mlInfoMetadata = MLDataFormats.ManagedLedgerInfoMetadata
.newBuilder()
.setCompressionType(compressionType)
.setUncompressedSize(managedLedgerInfo.getSerializedSize())
.build();
metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(
mlInfoMetadata.getSerializedSize() + 6, mlInfoMetadata.getSerializedSize() + 6);
metadataByteBuf.writeShort(MAGIC_MANAGED_LEDGER_INFO_METADATA);
metadataByteBuf.writeInt(mlInfoMetadata.getSerializedSize());
metadataByteBuf.writeBytes(mlInfoMetadata.toByteArray());

encodeByteBuf = getCompressionCodec().encode(Unpooled.wrappedBuffer(managedLedgerInfo.toByteArray()));

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
compositeByteBuf.addComponent(true, encodeByteBuf);
byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
compositeByteBuf.readBytes(dataBytes);
return dataBytes;
} finally {
if (metadataByteBuf != null) {
metadataByteBuf.release();
}
if (encodeByteBuf != null) {
encodeByteBuf.release();
}
}
}

public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProtocolBufferException {
ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
if (byteBuf.readableBytes() > 0 && byteBuf.readShort() == MAGIC_MANAGED_LEDGER_INFO_METADATA) {
ByteBuf decodeByteBuf = null;
try {
int metadataSize = byteBuf.readInt();
byte[] metadataBytes = new byte[metadataSize];
byteBuf.readBytes(metadataBytes);
MLDataFormats.ManagedLedgerInfoMetadata metadata =
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);

long unpressedSize = metadata.getUncompressedSize();
decodeByteBuf = getCompressionCodec().decode(byteBuf, (int) unpressedSize);
byte[] decodeBytes;
// couldn't decode data by ZLIB compression byteBuf array() directly
if (decodeByteBuf.hasArray() && !CompressionType.ZLIB.equals(metadata.getCompressionType())) {
decodeBytes = decodeByteBuf.array();
} else {
decodeBytes = new byte[decodeByteBuf.readableBytes() - decodeByteBuf.readerIndex()];
decodeByteBuf.readBytes(decodeBytes);
}
return ManagedLedgerInfo.parseFrom(decodeBytes);
} catch (Exception e) {
log.error("Failed to parse managedLedgerInfo metadata, "
+ "fall back to parse managedLedgerInfo directly.", e);
return ManagedLedgerInfo.parseFrom(data);
} finally {
if (decodeByteBuf != null) {
decodeByteBuf.release();
}
}
} else {
return ManagedLedgerInfo.parseFrom(data);
}
}

private CompressionCodec getCompressionCodec() {
return CompressionCodecProvider.getCompressionCodec(
org.apache.pulsar.common.api.proto.CompressionType.valueOf(compressionType.name()));
}

}
13 changes: 13 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,16 @@ message ManagedCursorInfo {
// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
}

enum CompressionType {
NONE = 0;
LZ4 = 1;
ZLIB = 2;
ZSTD = 3;
SNAPPY = 4;
}

message ManagedLedgerInfoMetadata {
required CompressionType compressionType = 1;
required int32 uncompressedSize = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;

import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* ManagedLedgerInfo metadata test.
*/
@Slf4j
public class ManagedLedgerInfoMetadataTest {

@DataProvider(name = "compressionTypeProvider")
private Object[][] compressionTypeProvider() {
return new Object[][] {
{null},
{"INVALID_TYPE"},
{CompressionType.NONE.name()},
{CompressionType.LZ4.name()},
{CompressionType.ZLIB.name()},
{CompressionType.ZSTD.name()},
{CompressionType.SNAPPY.name()}
};
}

@Test(dataProvider = "compressionTypeProvider")
public void testEncodeAndDecode(String compressionType) throws IOException {
long ledgerId = 10000;
List<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder builder = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder();
builder.setLedgerId(ledgerId);
builder.setEntries(RandomUtils.nextInt());
builder.setSize(RandomUtils.nextLong());
builder.setTimestamp(System.currentTimeMillis());

UUID uuid = UUID.randomUUID();
builder.getOffloadContextBuilder()
.setUidMsb(uuid.getMostSignificantBits())
.setUidLsb(uuid.getLeastSignificantBits());
Map<String, String> offloadDriverMetadata = new HashMap<>();
offloadDriverMetadata.put("bucket", "test-bucket");
offloadDriverMetadata.put("managedLedgerOffloadDriver", "pulsar-offload-dev");
offloadDriverMetadata.put("serviceEndpoint", "https://s3.eu-west-1.amazonaws.com");
offloadDriverMetadata.put("region", "eu-west-1");
OffloadUtils.setOffloadDriverMetadata(
builder,
"aws-s3",
offloadDriverMetadata
);

MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = builder.build();
ledgerInfoList.add(ledgerInfo);
ledgerId ++;
}

MLDataFormats.ManagedLedgerInfo managedLedgerInfo = MLDataFormats.ManagedLedgerInfo.newBuilder()
.addAllLedgerInfo(ledgerInfoList)
.build();

MetaStoreImpl metaStore;
try {
metaStore = new MetaStoreImpl(null, null, compressionType);
if ("INVALID_TYPE".equals(compressionType)) {
Assert.fail("The managedLedgerInfo compression type is invalid, should fail.");
}
} catch (Exception e) {
if ("INVALID_TYPE".equals(compressionType)) {
Assert.assertEquals(e.getClass(), IllegalArgumentException.class);
Assert.assertEquals(
"No enum constant org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType."
+ compressionType, e.getMessage());
return;
} else {
throw e;
}
}

byte[] compressionBytes = metaStore.compressLedgerInfo(managedLedgerInfo);
log.info("[{}] Uncompressed data size: {}, compressed data size: {}",
compressionType, managedLedgerInfo.getSerializedSize(), compressionBytes.length);
if (compressionType == null || compressionType.equals(CompressionType.NONE.name())) {
Assert.assertEquals(compressionBytes.length, managedLedgerInfo.getSerializedSize());
}

// parse compression data and unCompression data, check their results.
MLDataFormats.ManagedLedgerInfo info1 = metaStore.parseManagedLedgerInfo(compressionBytes);
MLDataFormats.ManagedLedgerInfo info2 = metaStore.parseManagedLedgerInfo(managedLedgerInfo.toByteArray());
Assert.assertEquals(info1, info2);
}

@Test
public void testParseEmptyData() throws InvalidProtocolBufferException {
MetaStoreImpl metaStore = new MetaStoreImpl(null, null);
MLDataFormats.ManagedLedgerInfo managedLedgerInfo = metaStore.parseManagedLedgerInfo(new byte[0]);
Assert.assertEquals(managedLedgerInfo.toString(), "");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1608,6 +1608,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
private String managedLedgerDataReadPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST
.getValue();

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
+ "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
private String managedLedgerInfoCompressionType = "NONE";

/*** --- Load balancer --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void initialize(ServiceConfiguration conf, MetadataStore metadataStore,
conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
managedLedgerFactoryConfig.setCursorPositionFlushSeconds(conf.getManagedLedgerCursorPositionFlushSeconds());
managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(conf.getManagedLedgerInfoCompressionType());

Configuration configuration = new ClientConfiguration();
if (conf.isBookkeeperClientExposeStatsToPrometheus()) {
Expand Down
Loading

0 comments on commit a6be850

Please sign in to comment.