Skip to content
This repository has been archived by the owner on Apr 30, 2019. It is now read-only.

Commit

Permalink
Support DbLedgerStorage in LedgerCmd to get list of logger files for …
Browse files Browse the repository at this point in the history
…a given ledgerId (#6)
  • Loading branch information
rdhabalia committed Jun 16, 2017
1 parent 9393225 commit d08f46e
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageRocksDB;
import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -391,12 +393,26 @@ public int runCmd(CommandLine cmdLine) throws Exception {
printUsage();
return -1;
}
if (printMeta) {
// print meta
readLedgerMeta(ledgerId);

if (bkConf.getLedgerStorageClass().equals(DbLedgerStorage.class.getName())) {
// dump ledger info
try {
DbLedgerStorage.readLedgerIndexEntries(ledgerId, bkConf,
(currentEntry, entryLogId, position) -> System.out.println(
"entry " + currentEntry + "\t:\t(log: " + entryLogId + ", pos: " + position + ")"));
} catch (IOException e) {
System.err.printf("ERROR: initializing dbLedgerStorage %s", e.getMessage());
return -1;
}
} else {
if (printMeta) {
// print meta
readLedgerMeta(ledgerId);
}
// dump ledger info
readLedgerIndexEntries(ledgerId);
}
// dump ledger info
readLedgerIndexEntries(ledgerId);

return 0;
}

Expand Down Expand Up @@ -2005,7 +2021,7 @@ protected void readLedgerMeta(long ledgerId) throws Exception {
}

/**
* Read ledger index entires
* Read ledger index entries
*
* @param ledgerId
* Ledger Id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.bookkeeper.bookie.storage.ldb;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.util.SortedMap;
Expand All @@ -24,10 +25,12 @@
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
Expand Down Expand Up @@ -708,7 +711,6 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,

return numberOfEntries.get();
}

@Override
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
ledgerDeletionListeners.add(listener);
Expand All @@ -726,5 +728,46 @@ private void recordFailedEvent(OpStatsLogger logger, long startTimeNanos) {
logger.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}

/**
* Reads ledger index entries to get list of entry-logger that contains given ledgerId
*
* @param ledgerId
* @param serverConf
* @param processor
* @throws IOException
*/
public static void readLedgerIndexEntries(long ledgerId, ServerConfiguration serverConf,
LedgerLoggerProcessor processor) throws IOException {

checkNotNull(serverConf, "ServerConfiguration can't be null");
checkNotNull(processor, "LedgerLoggger info processor can't null");

LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(serverConf, serverConf.getLedgerDirs());
String ledgerBasePath = ledgerDirsManager.getAllLedgerDirs().get(0).toString();

EntryLocationIndex entryLocationIndex = new EntryLocationIndex(serverConf,
(path, dbConfigType, conf1) -> new KeyValueStorageRocksDB(path, DbConfigType.Small, conf1, true),
ledgerBasePath, NullStatsLogger.INSTANCE);
try {
long lastEntryId = entryLocationIndex.getLastEntryInLedger(ledgerId);
for (long currentEntry = 0; currentEntry <= lastEntryId; currentEntry++) {
long offset = entryLocationIndex.getLocation(ledgerId, currentEntry);
if (offset <= 0) {
// entry not found in this bookie
continue;
}
long entryLogId = offset >> 32L;
long position = offset & 0xffffffffL;
processor.process(currentEntry, entryLogId, position);
}
} finally {
entryLocationIndex.close();
}
}

public interface LedgerLoggerProcessor {
void process(long entryId, long entryLogId, long position);
}

private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.bookie.BookieShell;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import junit.framework.Assert;

public class LedgerCmdTest extends BookKeeperClusterTestCase {

private final static Logger LOG = LoggerFactory.getLogger(LedgerCmdTest.class);
private DigestType digestType = DigestType.CRC32;
private static final String PASSWORD = "testPasswd";

public LedgerCmdTest() {
super(1);
baseConf.setLedgerStorageClass(DbLedgerStorage.class.getName());
baseConf.setGcWaitTime(60000);
baseConf.setFlushInterval(1);
}


/**
* list of entry logger files that contains given ledgerId
*/
@Test
public void testLedgerDbStorageCmd() throws Exception {

BookKeeper bk = new BookKeeper(baseClientConf, zkc);
LOG.info("Create ledger and add entries to it");
LedgerHandle lh1 = createLedgerWithEntries(bk, 10);

String[] argv = new String[] { "ledger", Long.toString(lh1.getId()) };
final ServerConfiguration conf = bsConfs.get(0);
conf.setUseHostNameAsBookieID(true);

BookieShell bkShell = new BookieShell();
bkShell.setConf(conf);

Assert.assertEquals("Failed to return exit code!", 0, bkShell.run(argv));

}

private LedgerHandle createLedgerWithEntries(BookKeeper bk, int numOfEntries) throws Exception {
LedgerHandle lh = bk.createLedger(1, 1, digestType, PASSWORD.getBytes());
final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
final CountDownLatch latch = new CountDownLatch(numOfEntries);

final AddCallback cb = new AddCallback() {
public void addComplete(int rccb, LedgerHandle lh, long entryId, Object ctx) {
rc.compareAndSet(BKException.Code.OK, rccb);
latch.countDown();
}
};
for (int i = 0; i < numOfEntries; i++) {
lh.asyncAddEntry(("foobar" + i).getBytes(), cb, null);
}
if (!latch.await(30, TimeUnit.SECONDS)) {
throw new Exception("Entries took too long to add");
}
if (rc.get() != BKException.Code.OK) {
throw BKException.create(rc.get());
}
return lh;
}
}

0 comments on commit d08f46e

Please sign in to comment.