diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java index c3a0cc6b4324c..b1fbbfcf4a7ae 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadUtils.java @@ -19,12 +19,27 @@ package org.apache.bookkeeper.mledger.offload; import com.google.common.collect.Maps; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import com.google.protobuf.ByteString; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerMetadataBuilder; +import org.apache.bookkeeper.client.api.DigestType; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.proto.MLDataFormats.KeyValue; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadContext; import org.apache.bookkeeper.mledger.proto.MLDataFormats.OffloadDriverMetadata; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.DataFormats; +@Slf4j public final class OffloadUtils { private OffloadUtils() {} @@ -87,4 +102,81 @@ public static void setOffloadDriverMetadata(LedgerInfo.Builder infoBuilder, .setValue(v) .build())); } + + public static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) { + DataFormats.LedgerMetadataFormat.Builder builder = DataFormats.LedgerMetadataFormat.newBuilder(); + builder.setQuorumSize(metadata.getWriteQuorumSize()) + .setAckQuorumSize(metadata.getAckQuorumSize()) + .setEnsembleSize(metadata.getEnsembleSize()) + .setLength(metadata.getLength()) + .setState(metadata.isClosed() ? DataFormats.LedgerMetadataFormat.State.CLOSED : DataFormats.LedgerMetadataFormat.State.OPEN) + .setLastEntryId(metadata.getLastEntryId()) + .setCtime(metadata.getCtime()) + .setDigestType(BookKeeper.DigestType.toProtoDigestType( + BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType()))); + + for (Map.Entry e : metadata.getCustomMetadata().entrySet()) { + builder.addCustomMetadataBuilder() + .setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue())); + } + + for (Map.Entry> e : metadata.getAllEnsembles().entrySet()) { + builder.addSegmentBuilder() + .setFirstEntryId(e.getKey()) + .addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList())); + } + + return builder.build().toByteArray(); + } + + public static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException { + DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build(); + LedgerMetadataBuilder builder = LedgerMetadataBuilder.create() + .withLastEntryId(ledgerMetadataFormat.getLastEntryId()) + .withPassword(ledgerMetadataFormat.getPassword().toByteArray()) + .withClosedState() + .withMetadataFormatVersion(2) + .withLength(ledgerMetadataFormat.getLength()) + .withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize()) + .withCreationTime(ledgerMetadataFormat.getCtime()) + .withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize()) + .withEnsembleSize(ledgerMetadataFormat.getEnsembleSize()); + ledgerMetadataFormat.getSegmentList().forEach(segment -> { + ArrayList addressArrayList = new ArrayList<>(); + segment.getEnsembleMemberList().forEach(address -> { + try { + addressArrayList.add(new BookieSocketAddress(address)); + } catch (IOException e) { + log.error("Exception when create BookieSocketAddress. ", e); + } + }); + builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList); + }); + + if (ledgerMetadataFormat.getCustomMetadataCount() > 0) { + Map customMetadata = Maps.newHashMap(); + ledgerMetadataFormat.getCustomMetadataList().forEach( + entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray())); + builder.withCustomMetadata(customMetadata); + } + + switch (ledgerMetadataFormat.getDigestType()) { + case HMAC: + builder.withDigestType(DigestType.MAC); + break; + case CRC32: + builder.withDigestType(DigestType.CRC32); + break; + case CRC32C: + builder.withDigestType(DigestType.CRC32C); + break; + case DUMMY: + builder.withDigestType(DigestType.DUMMY); + break; + default: + throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType()); + } + + return builder.build(); + } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index b1663d229fd6a..64399d4c2666c 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -18,12 +18,9 @@ */ package org.apache.bookkeeper.mledger.offload.filesystem.impl; -import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerMetadataBuilder; -import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -32,8 +29,6 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.DataFormats; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; @@ -43,11 +38,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata; + public class FileStoreBackedReadHandleImpl implements ReadHandle { private static final Logger log = LoggerFactory.getLogger(FileStoreBackedReadHandleImpl.class); private final ExecutorService executor; @@ -184,56 +180,4 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn public static ReadHandle open(ScheduledExecutorService executor, MapFile.Reader reader, long ledgerId) throws IOException { return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId); } - - private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException { - DataFormats.LedgerMetadataFormat ledgerMetadataFormat = DataFormats.LedgerMetadataFormat.newBuilder().mergeFrom(bytes).build(); - LedgerMetadataBuilder builder = LedgerMetadataBuilder.create() - .withLastEntryId(ledgerMetadataFormat.getLastEntryId()) - .withPassword(ledgerMetadataFormat.getPassword().toByteArray()) - .withClosedState() - .withMetadataFormatVersion(2) - .withLength(ledgerMetadataFormat.getLength()) - .withAckQuorumSize(ledgerMetadataFormat.getAckQuorumSize()) - .withCreationTime(ledgerMetadataFormat.getCtime()) - .withWriteQuorumSize(ledgerMetadataFormat.getQuorumSize()) - .withEnsembleSize(ledgerMetadataFormat.getEnsembleSize()); - ledgerMetadataFormat.getSegmentList().forEach(segment -> { - ArrayList addressArrayList = new ArrayList<>(); - segment.getEnsembleMemberList().forEach(address -> { - try { - addressArrayList.add(new BookieSocketAddress(address)); - } catch (IOException e) { - log.error("Exception when create BookieSocketAddress. ", e); - } - }); - builder.newEnsembleEntry(segment.getFirstEntryId(), addressArrayList); - }); - - if (ledgerMetadataFormat.getCustomMetadataCount() > 0) { - Map customMetadata = Maps.newHashMap(); - ledgerMetadataFormat.getCustomMetadataList().forEach( - entry -> customMetadata.put(entry.getKey(), entry.getValue().toByteArray())); - builder.withCustomMetadata(customMetadata); - } - - switch (ledgerMetadataFormat.getDigestType()) { - case HMAC: - builder.withDigestType(DigestType.MAC); - break; - case CRC32: - builder.withDigestType(DigestType.CRC32); - break; - case CRC32C: - builder.withDigestType(DigestType.CRC32C); - break; - case DUMMY: - builder.withDigestType(DigestType.DUMMY); - break; - default: - throw new IllegalArgumentException("Unable to convert digest type " + ledgerMetadataFormat.getDigestType()); - } - - return builder.build(); - } - } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index eda747ded8712..bbee8289b58ae 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -20,17 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.ByteString; -import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; -import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.DataFormats; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -44,7 +39,6 @@ import java.io.IOException; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.UUID; @@ -52,8 +46,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; +import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat; public class FileSystemManagedLedgerOffloader implements LedgerOffloader { @@ -189,7 +183,7 @@ public void run() { //store the ledgerMetadata in -1 index key.set(METADATA_KEY_INDEX); byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata()); - value.set(buildLedgerMetadataFormat(readHandle.getLedgerMetadata()), 0, ledgerMetadata.length); + value.set(ledgerMetadata, 0, ledgerMetadata.length); dataWriter.append(key, value); AtomicLong haveOffloadEntryNumber = new AtomicLong(0); long needToOffloadFirstEntryNumber = 0; @@ -307,32 +301,6 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, Map e : metadata.getCustomMetadata().entrySet()) { - builder.addCustomMetadataBuilder() - .setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue())); - } - - for (Map.Entry> e : metadata.getAllEnsembles().entrySet()) { - builder.addSegmentBuilder() - .setFirstEntryId(e.getKey()) - .addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList())); - } - - return builder.build().toByteArray(); - } - @Override public OffloadPolicies getOffloadPolicies() { return offloadPolicies; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java index 16492bc0f6dfb..42a5d8da67130 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/OffloadIndexBlockImpl.java @@ -18,10 +18,7 @@ */ package org.apache.bookkeeper.mledger.offload.jcloud.impl; -import static com.google.common.base.Preconditions.checkState; - import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; @@ -31,25 +28,20 @@ import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; import java.util.TreeMap; -import java.util.stream.Collectors; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.DataFormats; -import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat; +import static org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata; + public class OffloadIndexBlockImpl implements OffloadIndexBlock { private static final Logger log = LoggerFactory.getLogger(OffloadIndexBlockImpl.class); @@ -136,32 +128,6 @@ public long getDataBlockHeaderLength() { return this.dataHeaderLength; } - private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) { - LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder(); - builder.setQuorumSize(metadata.getWriteQuorumSize()) - .setAckQuorumSize(metadata.getAckQuorumSize()) - .setEnsembleSize(metadata.getEnsembleSize()) - .setLength(metadata.getLength()) - .setState(metadata.isClosed() ? LedgerMetadataFormat.State.CLOSED : LedgerMetadataFormat.State.OPEN) - .setLastEntryId(metadata.getLastEntryId()) - .setCtime(metadata.getCtime()) - .setDigestType(BookKeeper.DigestType.toProtoDigestType( - BookKeeper.DigestType.fromApiDigestType(metadata.getDigestType()))); - - for (Map.Entry e : metadata.getCustomMetadata().entrySet()) { - builder.addCustomMetadataBuilder() - .setKey(e.getKey()).setValue(ByteString.copyFrom(e.getValue())); - } - - for (Map.Entry> e : metadata.getAllEnsembles().entrySet()) { - builder.addSegmentBuilder() - .setFirstEntryId(e.getKey()) - .addAllEnsembleMember(e.getValue().stream().map(a -> a.toString()).collect(Collectors.toList())); - } - - return builder.build().toByteArray(); - } - /** * Get the content of the index block as InputStream. * Read out in format: @@ -203,145 +169,6 @@ public OffloadIndexBlock.IndexInputStream toStream() throws IOException { return new OffloadIndexBlock.IndexInputStream(new ByteBufInputStream(out, true), indexBlockLength); } - static private class InternalLedgerMetadata implements LedgerMetadata { - private LedgerMetadataFormat ledgerMetadataFormat; - - private int ensembleSize; - private int writeQuorumSize; - private int ackQuorumSize; - private long lastEntryId; - private long length; - private DataFormats.LedgerMetadataFormat.DigestType digestType; - private long ctime; - private byte[] password; - private State state; - private Map customMetadata = Maps.newHashMap(); - private TreeMap> ensembles = new TreeMap>(); - - InternalLedgerMetadata(LedgerMetadataFormat ledgerMetadataFormat) { - this.ensembleSize = ledgerMetadataFormat.getEnsembleSize(); - this.writeQuorumSize = ledgerMetadataFormat.getQuorumSize(); - this.ackQuorumSize = ledgerMetadataFormat.getAckQuorumSize(); - this.lastEntryId = ledgerMetadataFormat.getLastEntryId(); - this.length = ledgerMetadataFormat.getLength(); - this.digestType = ledgerMetadataFormat.getDigestType(); - this.ctime = ledgerMetadataFormat.getCtime(); - this.state = State.CLOSED; - this.password = ledgerMetadataFormat.getPassword().toByteArray(); - - if (ledgerMetadataFormat.getCustomMetadataCount() > 0) { - ledgerMetadataFormat.getCustomMetadataList().forEach( - entry -> this.customMetadata.put(entry.getKey(), entry.getValue().toByteArray())); - } - - ledgerMetadataFormat.getSegmentList().forEach(segment -> { - ArrayList addressArrayList = new ArrayList(); - segment.getEnsembleMemberList().forEach(address -> { - try { - addressArrayList.add(new BookieSocketAddress(address)); - } catch (IOException e) { - log.error("Exception when create BookieSocketAddress. ", e); - } - }); - this.ensembles.put(segment.getFirstEntryId(), addressArrayList); - }); - } - - @Override - public boolean hasPassword() { return true; } - - @Override - public byte[] getPassword() { return password; } - - @Override - public State getState() { return state; } - - @Override - public int getMetadataFormatVersion() { return 2; } - - @Override - public long getCToken() { - return 0; - } - - @Override - public int getEnsembleSize() { - return this.ensembleSize; - } - - @Override - public int getWriteQuorumSize() { - return this.writeQuorumSize; - } - - @Override - public int getAckQuorumSize() { - return this.ackQuorumSize; - } - - @Override - public long getLastEntryId() { - return this.lastEntryId; - } - - @Override - public long getLength() { - return this.length; - } - - @Override - public DigestType getDigestType() { - switch (this.digestType) { - case HMAC: - return DigestType.MAC; - case CRC32: - return DigestType.CRC32; - case CRC32C: - return DigestType.CRC32C; - case DUMMY: - return DigestType.DUMMY; - default: - throw new IllegalArgumentException("Unable to convert digest type " + digestType); - } - } - - @Override - public long getCtime() { - return this.ctime; - } - - @Override - public boolean isClosed() { - return this.state == State.CLOSED; - } - - @Override - public Map getCustomMetadata() { - return this.customMetadata; - } - - @Override - public List getEnsembleAt(long entryId) { - return ensembles.get(ensembles.headMap(entryId + 1).lastKey()); - } - - @Override - public NavigableMap> getAllEnsembles() { - return this.ensembles; - } - - @Override - public String toSafeString() { - return toString(); - } - } - - private static LedgerMetadata parseLedgerMetadata(byte[] bytes) throws IOException { - LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder(); - builder.mergeFrom(bytes); - return new InternalLedgerMetadata(builder.build()); - } - private OffloadIndexBlock fromStream(InputStream stream) throws IOException { DataInputStream dis = new DataInputStream(stream); int magic = dis.readInt();