diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 46222aa84f0..86321bd0466 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -44,6 +44,8 @@ import org.apache.bookkeeper.bookie.Journal.JournalScanner; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageRocksDB; import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -390,12 +392,26 @@ public int runCmd(CommandLine cmdLine) throws Exception { printUsage(); return -1; } - if (printMeta) { - // print meta - readLedgerMeta(ledgerId); + + if (bkConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName())) { + // dump ledger info + try { + DbLedgerStorage.readLedgerIndexEntries(ledgerId, bkConf, + (currentEntry, entryLogId, position) -> System.out.println( + "entry " + currentEntry + "\t:\t(log: " + entryLogId + ", pos: " + position + ")")); + } catch (IOException e) { + System.err.printf("ERROR: initializing dbLedgerStorage %s", e.getMessage()); + return -1; + } + } else { + if (printMeta) { + // print meta + readLedgerMeta(ledgerId); + } + // dump ledger info + readLedgerIndexEntries(ledgerId); } - // dump ledger info - readLedgerIndexEntries(ledgerId); + return 0; } @@ -2003,7 +2019,7 @@ protected void readLedgerMeta(long ledgerId) throws Exception { } /** - * Read ledger index entires + * Read ledger index entries * * @param ledgerId * Ledger Id diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index 8821e5d2b62..58024f902c9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -1,6 +1,7 @@ package org.apache.bookkeeper.bookie.storage.ldb; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; import java.util.SortedMap; @@ -24,10 +25,12 @@ import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData; import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch; +import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.MathUtils; @@ -708,7 +711,6 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey, return numberOfEntries.get(); } - @Override public void registerLedgerDeletionListener(LedgerDeletionListener listener) { ledgerDeletionListeners.add(listener); @@ -726,5 +728,46 @@ private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) { logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } + /** + * Reads ledger index entries to get list of entry-logger that contains given ledgerId + * + * @param ledgerId + * @param serverConf + * @param processor + * @throws IOException + */ + public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf, + LedgerLoggerProcessor processor) throws IOException { + + checkNotNull(serverConf, "ServerConfiguration can't be null"); + checkNotNull(processor, "LedgerLoggger info processor can't null"); + + LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs()); + String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString(); + + EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf, + (path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true), + ledgerBasePath, NullStatsLogger.INSTANCE); + try { + long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId); + for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) { + long offset = entryLocationIndex.getLocation(ledgerId, currentEntry); + if (offset <= 0) { + // entry not found in this bookie + continue; + } + long entryLogId = offset >> 32L; + long position = offset & 0xffffffffL; + processor.process(currentEntry, entryLogId, position); + } + } finally { + entryLocationIndex.close(); + } + } + + public interface LedgerLoggerProcessor { + void process(long entryId, long entryLogId, long position); + } + private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java new file mode 100644 index 00000000000..b43ce6dcbce --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java @@ -0,0 +1,96 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.bookkeeper.bookie.BookieShell; +import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import junit.framework.Assert; + +public class LedgerCmdTest extends BookKeeperClusterTestCase { + + private final static Logger LOG = LoggerFactory.getLogger(LedgerCmdTest.class); + private DigestType digestType = DigestType.CRC32; + private static final String PASSWORD = "testPasswd"; + + public LedgerCmdTest() { + super(1); + baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + baseConf.setGcWaitTime(60000); + baseConf.setFlushInterval(1); + } + + + /** + * list of entry logger files that contains given ledgerId + */ + @Test + public void testLedgerDbStorageCmd() throws Exception { + + BookKeeper bk = new BookKeeper(baseClientConf, zkc); + LOG.info("Create ledger and add entries to it"); + LedgerHandle lh1 = createLedgerWithEntries(bk, 10); + + String[] argv = new String[] { "ledger", Long.toString(lh1.getId()) }; + final ServerConfiguration conf = bsConfs.get(0); + conf.setUseHostNameAsBookieID(true); + + BookieShell bkShell = new BookieShell(); + bkShell.setConf(conf); + + Assert.assertEquals("Failed to return exit code!", 0, bkShell.run(argv)); + + } + + private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws Exception { + LedgerHandle lh = bk.createLedger(1, 1, digestType, PASSWORD.getBytes()); + final AtomicInteger rc = new AtomicInteger(BKException.Code.OK); + final CountDownLatch latch = new CountDownLatch(numOfEntries); + + final AddCallback cb = new AddCallback() { + public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx) { + rc.compareAndSet(BKException.Code.OK, rccb); + latch.countDown(); + } + }; + for (int i = 0; i < numOfEntries; i++) { + lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null); + } + if (!latch.await(30, TimeUnit.SECONDS)) { + throw new Exception("Entries took too long to add"); + } + if (rc.get() != BKException.Code.OK) { + throw BKException.create(rc.get()); + } + return lh; + } +}