From f0362887305ee0cd07c1d4e6c3ccb7c7fac44dff Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Tue, 23 May 2017 11:59:21 -0700 Subject: [PATCH 1/4] Support DbLedgerStorage in LedgerCmd to get list of logger files for a given ledgerId --- .../apache/bookkeeper/bookie/BookieShell.java | 74 ++++++++++++-- .../bookie/storage/ldb/ArrayUtil.java | 6 +- .../bookkeeper/client/LedgerCmdTest.java | 96 +++++++++++++++++++ 3 files changed, 167 insertions(+), 9 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 46222aa84f0..08fb39f2eae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -42,8 +42,11 @@ import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; import org.apache.bookkeeper.bookie.Journal.JournalScanner; +import org.apache.bookkeeper.bookie.storage.ldb.ArrayUtil; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageRocksDB; import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -91,6 +94,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; import io.netty.buffer.ByteBuf; @@ -390,12 +394,19 @@ public int runCmd(CommandLine cmdLine) throws Exception { printUsage(); return -1; } - if (printMeta) { - // print meta - readLedgerMeta(ledgerId); + + if (bkConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName())) { + // dump ledger info + return readLedgerIndexEntriesFromDbLedgerStorage(ledgerId); + } else { + if (printMeta) { + // print meta + readLedgerMeta(ledgerId); + } + // dump ledger info + readLedgerIndexEntries(ledgerId); } - // dump ledger info - readLedgerIndexEntries(ledgerId); + return 0; } @@ -2003,7 +2014,7 @@ protected void readLedgerMeta(long ledgerId) throws Exception { } /** - * Read ledger index entires + * Read ledger index entries * * @param ledgerId * Ledger Id @@ -2049,6 +2060,57 @@ protected void readLedgerIndexEntries(long ledgerId) throws IOException { } } + /** + * Read ledger index entries + * + * @param ledgerId + * @return + * @throws IOException if failed to init KeyValueStorageRocksDB + */ + protected int readLedgerIndexEntriesFromDbLedgerStorage(long ledgerId) throws IOException { + + ServerConfiguration conf = new ServerConfiguration(bkConf); + LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs()); + + String ledgersPath = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); + String locationsDbPath = FileSystems.getDefault().getPath(ledgersPath, "locations").toFile().toString(); + KeyValueStorageRocksDB ledgersDb; + try { + ledgersDb = new KeyValueStorageRocksDB(locationsDbPath, DbConfigType.Small, conf, true); + } catch (IOException e) { + System.err.printf("ERROR: initializing rocksDb storage %s", e.getMessage()); + return -1; + } + try { + byte[] array = new byte[16]; + for (long curEntry = 0; curEntry <= Long.MAX_VALUE; curEntry++) { + ArrayUtil.setLong(array, 0, ledgerId); + ArrayUtil.setLong(array, 8, curEntry); + byte[] value = new byte[8]; + try { + int code = ledgersDb.get(array, value); + if (code <= 0) { + // no-more entry found + System.out.println("entry " + curEntry + "\t:\tN/A"); + break; + } + long offset = ArrayUtil.getLong(value, 0); + long entryLogId = offset >> 32L; + long pos = offset & 0xffffffffL; + System.out.println("entry " + curEntry + "\t:\t(log:" + entryLogId + ", pos: " + pos + ")"); + } catch (Exception e) { + System.err.println("ERROR: while getting location for entry " + ledgerId + ", " + curEntry + ", " + + e.getMessage()); + break; + } + } + } finally { + ledgersDb.close(); + } + + return 0; + } + /** * Get an iterable over pages of entries and locations for a ledger * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java index 2ab527e95fe..e541aa7cf2e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java @@ -7,13 +7,13 @@ /** * Utility to serialize/deserialize longs into byte arrays */ -class ArrayUtil { +public class ArrayUtil { private static final boolean UNALIGNED = PlatformDependent.isUnaligned(); private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); private static final boolean BIG_ENDIAN_NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; - static long getLong(byte[] array, int index) { + public static long getLong(byte[] array, int index) { if (HAS_UNSAFE && UNALIGNED) { long v = PlatformDependent.getLong(array, index); return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v); @@ -29,7 +29,7 @@ static long getLong(byte[] array, int index) { (long) array[index + 7] & 0xff; } - static void setLong(byte[] array, int index, long value) { + public static void setLong(byte[] array, int index, long value) { if (HAS_UNSAFE && UNALIGNED) { PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value)); } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java new file mode 100644 index 00000000000..babe2cb11d3 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java @@ -0,0 +1,96 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.bookie.BookieShell; +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import junit.framework.Assert; + +public class LedgerCmdTest extends BookKeeperClusterTestCase { + + private final static Logger LOG = LoggerFactory.getLogger(LedgerCmdTest.class); + private DigestType digestType = DigestType.CRC32; + private static final String PASSWORD = "testPasswd"; + + public LedgerCmdTest() { + super(1); + baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + baseConf.setGcWaitTime(60000); + } + + + /** + * list of entry logger files that contains given ledgerId + */ + @Test + public void testLedgerDbStorageCmd() throws Exception { + + BookKeeper bk = new BookKeeper(baseClientConf, zkc); + LOG.info("Create ledger and add entries to it"); + LedgerHandle lh1 = createLedgerWithEntries(bk, 10); + + Thread.sleep(1000); // sleep to flush entries to logger file + String[] argv = new String[] { "ledger", Long.toString(lh1.getId()) }; + final ServerConfiguration conf = bsConfs.get(0); + conf.setUseHostNameAsBookieID(true); + + BookieShell bkShell = new BookieShell(); + bkShell.setConf(conf); + + Assert.assertEquals("Failed to return exit code!", 0, bkShell.run(argv)); + + } + + private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws Exception { + LedgerHandle lh = bk.createLedger(1, 1, digestType, PASSWORD.getBytes()); + final AtomicInteger rc = new AtomicInteger(BKException.Code.OK); + final CountDownLatch latch = new CountDownLatch(numOfEntries); + + final AddCallback cb = new AddCallback() { + public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx) { + rc.compareAndSet(BKException.Code.OK, rccb); + latch.countDown(); + } + }; + for (int i = 0; i < numOfEntries; i++) { + lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null); + } + if (!latch.await(30, TimeUnit.SECONDS)) { + throw new Exception("Entries took too long to add"); + } + if (rc.get() != BKException.Code.OK) { + throw BKException.create(rc.get()); + } + return lh; + } +} From 63d1e75f09433ce7798b6b6f57f83a0a39106faa Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 24 May 2017 11:11:13 -0700 Subject: [PATCH 2/4] fix: test remove sleep by decrease flush-interval to sync logger immediately --- .../test/java/org/apache/bookkeeper/client/LedgerCmdTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java index babe2cb11d3..b43ce6dcbce 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java @@ -46,6 +46,7 @@ public LedgerCmdTest() { super(1); baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName()); baseConf.setGcWaitTime(60000); + baseConf.setFlushInterval(1); } @@ -59,7 +60,6 @@ public void testLedgerDbStorageCmd() throws Exception { LOG.info("Create ledger and add entries to it"); LedgerHandle lh1 = createLedgerWithEntries(bk, 10); - Thread.sleep(1000); // sleep to flush entries to logger file String[] argv = new String[] { "ledger", Long.toString(lh1.getId()) }; final ServerConfiguration conf = bsConfs.get(0); conf.setUseHostNameAsBookieID(true); From c41ac6d992eeb0562b78043fe2063dc60a8ffac6 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 24 May 2017 11:50:05 -0700 Subject: [PATCH 3/4] Use EntryLocationIndex to get ledger location --- .../apache/bookkeeper/bookie/BookieShell.java | 84 +++++++++---------- .../bookie/storage/ldb/ArrayUtil.java | 6 +- 2 files changed, 42 insertions(+), 48 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 08fb39f2eae..bb5d096da77 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -42,7 +42,6 @@ import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; import org.apache.bookkeeper.bookie.Journal.JournalScanner; -import org.apache.bookkeeper.bookie.storage.ldb.ArrayUtil; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; @@ -94,7 +93,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; import io.netty.buffer.ByteBuf; @@ -2067,49 +2065,45 @@ protected void readLedgerIndexEntries(long ledgerId) throws IOException { * @return * @throws IOException if failed to init KeyValueStorageRocksDB */ - protected int readLedgerIndexEntriesFromDbLedgerStorage(long ledgerId) throws IOException { - - ServerConfiguration conf = new ServerConfiguration(bkConf); - LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs()); - - String ledgersPath = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); - String locationsDbPath = FileSystems.getDefault().getPath(ledgersPath, "locations").toFile().toString(); - KeyValueStorageRocksDB ledgersDb; - try { - ledgersDb = new KeyValueStorageRocksDB(locationsDbPath, DbConfigType.Small, conf, true); - } catch (IOException e) { - System.err.printf("ERROR: initializing rocksDb storage %s", e.getMessage()); - return -1; - } - try { - byte[] array = new byte[16]; - for (long curEntry = 0; curEntry <= Long.MAX_VALUE; curEntry++) { - ArrayUtil.setLong(array, 0, ledgerId); - ArrayUtil.setLong(array, 8, curEntry); - byte[] value = new byte[8]; - try { - int code = ledgersDb.get(array, value); - if (code <= 0) { - // no-more entry found - System.out.println("entry " + curEntry + "\t:\tN/A"); - break; - } - long offset = ArrayUtil.getLong(value, 0); - long entryLogId = offset >> 32L; - long pos = offset & 0xffffffffL; - System.out.println("entry " + curEntry + "\t:\t(log:" + entryLogId + ", pos: " + pos + ")"); - } catch (Exception e) { - System.err.println("ERROR: while getting location for entry " + ledgerId + ", " + curEntry + ", " - + e.getMessage()); - break; - } - } - } finally { - ledgersDb.close(); - } - - return 0; - } + protected int readLedgerIndexEntriesFromDbLedgerStorage(long ledgerId) throws IOException { + + ServerConfiguration conf = new ServerConfiguration(bkConf); + LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs()); + String ledgersPath = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); + + EntryLocationIndex entryLocationIndex = null; + + try { + entryLocationIndex = new EntryLocationIndex(conf, + (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true), + ledgersPath, NullStatsLogger.INSTANCE); + } catch (IOException e) { + System.err.printf("ERROR: initializing rocksDb storage %s", e.getMessage()); + return -1; + } + try { + long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId); + for (long curEntry = 0; curEntry <= lastEntryId; curEntry++) { + try { + long offset = entryLocationIndex.getLocation(ledgerId, curEntry); + if (offset <= 0) { + // entry not found in this bookie + continue; + } + long entryLogId = offset >> 32L; + long pos = offset & 0xffffffffL; + System.out.println("entry " + curEntry + "\t:\t(log: " + entryLogId + ", pos: " + pos + ")"); + } catch (Exception e) { + System.err.println("ERROR: while getting location for entry " + ledgerId + ", " + curEntry + ", " + + e.getMessage()); + } + } + } finally { + entryLocationIndex.close(); + } + + return 0; + } /** * Get an iterable over pages of entries and locations for a ledger diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java index e541aa7cf2e..2ab527e95fe 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ArrayUtil.java @@ -7,13 +7,13 @@ /** * Utility to serialize/deserialize longs into byte arrays */ -public class ArrayUtil { +class ArrayUtil { private static final boolean UNALIGNED = PlatformDependent.isUnaligned(); private static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); private static final boolean BIG_ENDIAN_NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; - public static long getLong(byte[] array, int index) { + static long getLong(byte[] array, int index) { if (HAS_UNSAFE && UNALIGNED) { long v = PlatformDependent.getLong(array, index); return BIG_ENDIAN_NATIVE_ORDER ? v : Long.reverseBytes(v); @@ -29,7 +29,7 @@ public static long getLong(byte[] array, int index) { (long) array[index + 7] & 0xff; } - public static void setLong(byte[] array, int index, long value) { + static void setLong(byte[] array, int index, long value) { if (HAS_UNSAFE && UNALIGNED) { PlatformDependent.putLong(array, index, BIG_ENDIAN_NATIVE_ORDER ? value : Long.reverseBytes(value)); } else { From 2d9827c9a11d62b8a8a40812946df5252f0e0c35 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 24 May 2017 23:33:08 -0700 Subject: [PATCH 4/4] Move readLedgerIndexEntries to DbLedgerStorage --- .../apache/bookkeeper/bookie/BookieShell.java | 60 ++++--------------- .../bookie/storage/ldb/DbLedgerStorage.java | 45 +++++++++++++- 2 files changed, 54 insertions(+), 51 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index bb5d096da77..86321bd0466 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -392,10 +392,17 @@ public int runCmd(CommandLine cmdLine) throws Exception { printUsage(); return -1; } - + if (bkConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName())) { // dump ledger info - return readLedgerIndexEntriesFromDbLedgerStorage(ledgerId); + try { + DbLedgerStorage.readLedgerIndexEntries(ledgerId, bkConf, + (currentEntry, entryLogId, position) -> System.out.println( + "entry " + currentEntry + "\t:\t(log: " + entryLogId + ", pos: " + position + ")")); + } catch (IOException e) { + System.err.printf("ERROR: initializing dbLedgerStorage %s", e.getMessage()); + return -1; + } } else { if (printMeta) { // print meta @@ -404,7 +411,7 @@ public int runCmd(CommandLine cmdLine) throws Exception { // dump ledger info readLedgerIndexEntries(ledgerId); } - + return 0; } @@ -2058,53 +2065,6 @@ protected void readLedgerIndexEntries(long ledgerId) throws IOException { } } - /** - * Read ledger index entries - * - * @param ledgerId - * @return - * @throws IOException if failed to init KeyValueStorageRocksDB - */ - protected int readLedgerIndexEntriesFromDbLedgerStorage(long ledgerId) throws IOException { - - ServerConfiguration conf = new ServerConfiguration(bkConf); - LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs()); - String ledgersPath = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); - - EntryLocationIndex entryLocationIndex = null; - - try { - entryLocationIndex = new EntryLocationIndex(conf, - (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true), - ledgersPath, NullStatsLogger.INSTANCE); - } catch (IOException e) { - System.err.printf("ERROR: initializing rocksDb storage %s", e.getMessage()); - return -1; - } - try { - long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId); - for (long curEntry = 0; curEntry <= lastEntryId; curEntry++) { - try { - long offset = entryLocationIndex.getLocation(ledgerId, curEntry); - if (offset <= 0) { - // entry not found in this bookie - continue; - } - long entryLogId = offset >> 32L; - long pos = offset & 0xffffffffL; - System.out.println("entry " + curEntry + "\t:\t(log: " + entryLogId + ", pos: " + pos + ")"); - } catch (Exception e) { - System.err.println("ERROR: while getting location for entry " + ledgerId + ", " + curEntry + ", " - + e.getMessage()); - } - } - } finally { - entryLocationIndex.close(); - } - - return 0; - } - /** * Get an iterable over pages of entries and locations for a ledger * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 8821e5d2b62..58024f902c9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -1,6 +1,7 @@ package org.apache.bookkeeper.bookie.storage.ldb; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; import java.util.SortedMap; @@ -24,10 +25,12 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; @@ -708,7 +711,6 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey, return numberOfEntries.get(); } - @Override public void registerLedgerDeletionListener(LedgerDeletionListener listener) { ledgerDeletionListeners.add(listener); @@ -726,5 +728,46 @@ private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) { logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } + /** + * Reads ledger index entries to get list of entry-logger that contains given ledgerId + * + * @param ledgerId + * @param serverConf + * @param processor + * @throws IOException + */ + public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf, + LedgerLoggerProcessor processor) throws IOException { + + checkNotNull(serverConf, "ServerConfiguration can't be null"); + checkNotNull(processor, "LedgerLoggger info processor can't null"); + + LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs()); + String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); + + EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf, + (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true), + ledgerBasePath, NullStatsLogger.INSTANCE); + try { + long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId); + for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) { + long offset = entryLocationIndex.getLocation(ledgerId, currentEntry); + if (offset <= 0) { + // entry not found in this bookie + continue; + } + long entryLogId = offset >> 32L; + long position = offset & 0xffffffffL; + processor.process(currentEntry, entryLogId, position); + } + } finally { + entryLocationIndex.close(); + } + } + + public interface LedgerLoggerProcessor { + void process(long entryId, long entryLogId, long position); + } + private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class); }