diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index 6760eb47f48..d5aa961b0e9 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -163,7 +163,7 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics stati rocksDBCheckpointDiffer.setCompactionLogTableCFHandle( compactionLogTableCF.getHandle()); // Set activeRocksDB in differ to access compaction log CF. - rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb().get()); + rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb()); // Load all previous compaction logs rocksDBCheckpointDiffer.loadAllCompactionLogs(); } diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSstFileReader.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSstFileReader.java index b49c6e7a9e4..38d09e601d2 100644 --- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSstFileReader.java +++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedSstFileReader.java @@ -18,18 +18,29 @@ */ package org.apache.hadoop.hdds.utils.db.managed; +import org.apache.ratis.util.UncheckedAutoCloseable; +import org.rocksdb.Options; import org.rocksdb.SstFileReader; +import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track; + /** * Managed SstFileReader. */ -public class ManagedSstFileReader extends ManagedObject { +public class ManagedSstFileReader extends SstFileReader { + + private final UncheckedAutoCloseable leakTracker = track(this); - ManagedSstFileReader(SstFileReader original) { - super(original); + public ManagedSstFileReader(final Options options) { + super(options); } - public static ManagedSstFileReader managed(SstFileReader reader) { - return new ManagedSstFileReader(reader); + @Override + public void close() { + try { + super.close(); + } finally { + leakTracker.close(); + } } } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml index 829c0d6ac36..e3d365b6505 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml +++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml @@ -106,43 +106,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> depcheck - - banned-rocksdb-imports - process-sources - - enforce - - - - - false - Use managed RocksObjects under org.apache.hadoop.hdds.utils.db.managed instead. - - org.rocksdb.** - - org.rocksdb.AbstractEventListener - org.rocksdb.Checkpoint - org.rocksdb.ColumnFamilyDescriptor - org.rocksdb.ColumnFamilyHandle - org.rocksdb.ColumnFamilyOptions - org.rocksdb.CompactionJobInfo - org.rocksdb.CompressionType - org.rocksdb.DBOptions - org.rocksdb.FlushOptions - org.rocksdb.LiveFileMetaData - org.rocksdb.Options - org.rocksdb.RocksDB - org.rocksdb.RocksDBException - org.rocksdb.SstFileReader - org.rocksdb.TableProperties - org.rocksdb.ReadOptions - org.rocksdb.SstFileReaderIterator - - org.apache.hadoop.hdds.utils.db.managed.* - - - - diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java index 913eeb73384..ea5060b22a4 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileSetReader.java @@ -23,13 +23,12 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator; import org.apache.hadoop.util.ClosableIterator; import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; -import org.rocksdb.ReadOptions; import org.rocksdb.RocksDBException; -import org.rocksdb.SstFileReader; -import org.rocksdb.SstFileReaderIterator; import java.io.IOException; import java.io.UncheckedIOException; @@ -78,7 +77,7 @@ public long getEstimatedTotalKeys() throws RocksDBException { try (ManagedOptions options = new ManagedOptions()) { for (String sstFile : sstFiles) { - try (SstFileReader fileReader = new SstFileReader(options)) { + try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) { fileReader.open(sstFile); estimatedSize += fileReader.getTableProperties().getNumEntries(); } @@ -95,7 +94,7 @@ public Stream getKeyStream(String lowerBound, // TODO: [SNAPSHOT] Check if default Options and ReadOptions is enough. final MultipleSstFileIterator itr = new MultipleSstFileIterator(sstFiles) { private ManagedOptions options; - private ReadOptions readOptions; + private ManagedReadOptions readOptions; private ManagedSlice lowerBoundSLice; @@ -122,9 +121,8 @@ protected void init() { protected ClosableIterator getKeyIteratorForFile(String file) throws RocksDBException { return new ManagedSstFileIterator(file, options, readOptions) { @Override - protected String getIteratorValue( - SstFileReaderIterator iterator) { - return new String(iterator.key(), UTF_8); + protected String getIteratorValue(ManagedSstFileReaderIterator iterator) { + return new String(iterator.get().key(), UTF_8); } }; } @@ -177,14 +175,15 @@ public void close() throws UncheckedIOException { } private abstract static class ManagedSstFileIterator implements ClosableIterator { - private final SstFileReader fileReader; - private final SstFileReaderIterator fileReaderIterator; + private final ManagedSstFileReader fileReader; + private final ManagedSstFileReaderIterator fileReaderIterator; - ManagedSstFileIterator(String path, ManagedOptions options, ReadOptions readOptions) throws RocksDBException { - this.fileReader = new SstFileReader(options); + ManagedSstFileIterator(String path, ManagedOptions options, ManagedReadOptions readOptions) + throws RocksDBException { + this.fileReader = new ManagedSstFileReader(options); this.fileReader.open(path); - this.fileReaderIterator = fileReader.newIterator(readOptions); - fileReaderIterator.seekToFirst(); + this.fileReaderIterator = ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions)); + fileReaderIterator.get().seekToFirst(); } @Override @@ -195,15 +194,15 @@ public void close() { @Override public boolean hasNext() { - return fileReaderIterator.isValid(); + return fileReaderIterator.get().isValid(); } - protected abstract String getIteratorValue(SstFileReaderIterator iterator); + protected abstract String getIteratorValue(ManagedSstFileReaderIterator iterator); @Override public String next() { String value = getIteratorValue(fileReaderIterator); - fileReaderIterator.next(); + fileReaderIterator.get().next(); return value; } } diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java index 3fa1459488e..fef6f05ae06 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.Scheduler; +import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; @@ -55,12 +56,9 @@ import org.rocksdb.AbstractEventListener; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactionJobInfo; -import org.rocksdb.DBOptions; import org.rocksdb.LiveFileMetaData; -import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.SstFileReader; import org.rocksdb.TableProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,7 +175,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable, private AtomicBoolean suspended; private ColumnFamilyHandle compactionLogTableCFHandle; - private RocksDB activeRocksDB; + private ManagedRocksDB activeRocksDB; /** * For snapshot diff calculation we only need to track following column @@ -349,32 +347,11 @@ public static void addDebugLevel(Integer level) { DEBUG_LEVEL.add(level); } - /** - * Takes {@link org.rocksdb.Options}. - */ - public void setRocksDBForCompactionTracking(Options rocksOptions, - List list) { - list.add(newCompactionBeginListener()); - list.add(newCompactionCompletedListener()); - rocksOptions.setListeners(list); - } - - public void setRocksDBForCompactionTracking(Options rocksOptions) { - setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>()); - } - - /** - * Takes {@link org.rocksdb.DBOptions}. - */ - public void setRocksDBForCompactionTracking(DBOptions rocksOptions, - List list) { - list.add(newCompactionBeginListener()); - list.add(newCompactionCompletedListener()); - rocksOptions.setListeners(list); - } - - public void setRocksDBForCompactionTracking(DBOptions rocksOptions) { - setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>()); + public void setRocksDBForCompactionTracking(ManagedDBOptions rocksOptions) { + List events = new ArrayList<>(); + events.add(newCompactionBeginListener()); + events.add(newCompactionCompletedListener()); + rocksOptions.setListeners(events); } /** @@ -403,7 +380,7 @@ public synchronized void setCompactionLogTableCFHandle( * Set activeRocksDB to access CompactionLogTable. * @param activeRocksDB RocksDB */ - public synchronized void setActiveRocksDB(RocksDB activeRocksDB) { + public synchronized void setActiveRocksDB(ManagedRocksDB activeRocksDB) { Preconditions.checkNotNull(activeRocksDB, "RocksDB should not be null."); this.activeRocksDB = activeRocksDB; } @@ -436,8 +413,7 @@ private boolean isSnapshotInfoTableEmpty(RocksDB db) { // Note the goal of compaction DAG is to track all compactions that happened // _after_ a DB checkpoint is taken. - try (ManagedRocksIterator it = ManagedRocksIterator.managed( - db.newIterator(snapshotInfoTableCFHandle))) { + try (ManagedRocksIterator it = ManagedRocksIterator.managed(db.newIterator(snapshotInfoTableCFHandle))) { it.get().seekToFirst(); return !it.get().isValid(); } @@ -499,7 +475,6 @@ public void onCompactionBegin(RocksDB db, }; } - private AbstractEventListener newCompactionCompletedListener() { return new AbstractEventListener() { @Override @@ -577,7 +552,7 @@ void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) { byte[] key = keyString.getBytes(UTF_8); byte[] value = compactionLogEntry.getProtobuf().toByteArray(); try { - activeRocksDB.put(compactionLogTableCFHandle, key, value); + activeRocksDB.get().put(compactionLogTableCFHandle, key, value); } catch (RocksDBException exception) { // TODO: Revisit exception handling before merging the PR. throw new RuntimeException(exception); @@ -631,11 +606,11 @@ private long getSSTFileSummary(String filename) } try (ManagedOptions option = new ManagedOptions(); - ManagedSstFileReader reader = ManagedSstFileReader.managed(new SstFileReader(option))) { + ManagedSstFileReader reader = new ManagedSstFileReader(option)) { - reader.get().open(getAbsoluteSstFilePath(filename)); + reader.open(getAbsoluteSstFilePath(filename)); - TableProperties properties = reader.get().getTableProperties(); + TableProperties properties = reader.getTableProperties(); if (LOG.isDebugEnabled()) { LOG.debug("{} has {} keys", filename, properties.getNumEntries()); } @@ -801,7 +776,7 @@ public void loadAllCompactionLogs() { preconditionChecksForLoadAllCompactionLogs(); addEntriesFromLogFilesToDagAndCompactionLogTable(); try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator( - activeRocksDB.newIterator(compactionLogTableCFHandle))) { + activeRocksDB.get().newIterator(compactionLogTableCFHandle))) { managedRocksIterator.get().seekToFirst(); while (managedRocksIterator.get().isValid()) { byte[] value = managedRocksIterator.get().value(); @@ -1252,7 +1227,7 @@ private synchronized Pair, List> getOlderFileNodes() { List keysToRemove = new ArrayList<>(); try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator( - activeRocksDB.newIterator(compactionLogTableCFHandle))) { + activeRocksDB.get().newIterator(compactionLogTableCFHandle))) { managedRocksIterator.get().seekToFirst(); while (managedRocksIterator.get().isValid()) { CompactionLogEntry compactionLogEntry = CompactionLogEntry @@ -1282,7 +1257,7 @@ private synchronized void removeKeyFromCompactionLogTable( List keysToRemove) { try { for (byte[] key: keysToRemove) { - activeRocksDB.delete(compactionLogTableCFHandle, key); + activeRocksDB.get().delete(compactionLogTableCFHandle, key); } } catch (RocksDBException exception) { // TODO Handle exception properly before merging the PR. @@ -1575,11 +1550,11 @@ private CompactionFileInfo toFileInfo(String sstFile, CompactionFileInfo.Builder fileInfoBuilder = new CompactionFileInfo.Builder(fileName); - try (ManagedSstFileReader fileReader = ManagedSstFileReader.managed(new SstFileReader(options))) { - fileReader.get().open(sstFile); - String columnFamily = StringUtils.bytes2String(fileReader.get().getTableProperties().getColumnFamilyName()); + try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) { + fileReader.open(sstFile); + String columnFamily = StringUtils.bytes2String(fileReader.getTableProperties().getColumnFamilyName()); try (ManagedSstFileReaderIterator iterator = - ManagedSstFileReaderIterator.managed(fileReader.get().newIterator(readOptions))) { + ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions))) { iterator.get().seekToFirst(); String startKey = StringUtils.bytes2String(iterator.get().key()); iterator.get().seekToLast(); diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java index 5ddcf8b7e6a..e116868410f 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader; import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator; -import org.rocksdb.SstFileReader; import org.rocksdb.TableProperties; import org.rocksdb.RocksDBException; import org.slf4j.Logger; @@ -90,9 +89,9 @@ public static boolean doesSstFileContainKeyRange(String filepath, try ( ManagedOptions options = new ManagedOptions(); - ManagedSstFileReader sstFileReader = ManagedSstFileReader.managed(new SstFileReader(options))) { - sstFileReader.get().open(filepath); - TableProperties properties = sstFileReader.get().getTableProperties(); + ManagedSstFileReader sstFileReader = new ManagedSstFileReader(options)) { + sstFileReader.open(filepath); + TableProperties properties = sstFileReader.getTableProperties(); String tableName = new String(properties.getColumnFamilyName(), UTF_8); if (tableToPrefixMap.containsKey(tableName)) { String prefix = tableToPrefixMap.get(tableName); @@ -100,7 +99,7 @@ public static boolean doesSstFileContainKeyRange(String filepath, try ( ManagedReadOptions readOptions = new ManagedReadOptions(); ManagedSstFileReaderIterator iterator = ManagedSstFileReaderIterator.managed( - sstFileReader.get().newIterator(readOptions))) { + sstFileReader.newIterator(readOptions))) { iterator.get().seek(prefix.getBytes(UTF_8)); String seekResultKey = new String(iterator.get().key(), UTF_8); return seekResultKey.startsWith(prefix); diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java index 7cb0e6f6253..0164e3a23bd 100644 --- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java +++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java @@ -56,9 +56,14 @@ import org.apache.hadoop.hdds.StringUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.hadoop.hdds.utils.db.managed.ManagedCheckpoint; +import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedFlushOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB; import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.ozone.compaction.log.CompactionFileInfo; import org.apache.ozone.compaction.log.CompactionLogEntry; @@ -70,18 +75,11 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; -import org.rocksdb.FlushOptions; import org.rocksdb.LiveFileMetaData; -import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; -import org.rocksdb.SstFileReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -138,7 +136,9 @@ public class TestRocksDBCheckpointDiffer { private ConfigurationSource config; private ExecutorService executorService = Executors.newCachedThreadPool(); private RocksDBCheckpointDiffer rocksDBCheckpointDiffer; - private RocksDB activeRocksDB; + private ManagedRocksDB activeRocksDB; + + private ManagedDBOptions dbOptions; private ColumnFamilyHandle keyTableCFHandle; private ColumnFamilyHandle directoryTableCFHandle; private ColumnFamilyHandle fileTableCFHandle; @@ -181,17 +181,16 @@ public void init() throws RocksDBException { ACTIVE_DB_DIR_NAME, config); - ColumnFamilyOptions cfOpts = new ColumnFamilyOptions() - .optimizeUniversalStyleCompaction(); + ManagedColumnFamilyOptions cfOpts = new ManagedColumnFamilyOptions(); + cfOpts.optimizeUniversalStyleCompaction(); List cfDescriptors = getCFDescriptorList(cfOpts); List cfHandles = new ArrayList<>(); - DBOptions dbOptions = new DBOptions() - .setCreateIfMissing(true) - .setCreateMissingColumnFamilies(true); + dbOptions = new ManagedDBOptions(); + dbOptions.setCreateIfMissing(true); + dbOptions.setCreateMissingColumnFamilies(true); rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions); - activeRocksDB = RocksDB.open(dbOptions, ACTIVE_DB_DIR_NAME, cfDescriptors, - cfHandles); + activeRocksDB = ManagedRocksDB.open(dbOptions, ACTIVE_DB_DIR_NAME, cfDescriptors, cfHandles); keyTableCFHandle = cfHandles.get(1); directoryTableCFHandle = cfHandles.get(2); fileTableCFHandle = cfHandles.get(3); @@ -600,12 +599,12 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ) /** * Helper function that creates an RDB checkpoint (= Ozone snapshot). */ - private void createCheckpoint(RocksDB rocksDB) throws RocksDBException { + private void createCheckpoint(ManagedRocksDB rocksDB) throws RocksDBException { LOG.trace("Current time: " + System.currentTimeMillis()); long t1 = System.currentTimeMillis(); - final long snapshotGeneration = rocksDB.getLatestSequenceNumber(); + final long snapshotGeneration = rocksDB.get().getLatestSequenceNumber(); final String cpPath = CP_PATH_PREFIX + snapshotGeneration; // Delete the checkpoint dir if it already exists for the test @@ -631,12 +630,12 @@ private void createCheckpoint(RocksDB rocksDB) throws RocksDBException { } // Flushes the WAL and Creates a RocksDB checkpoint - void createCheckPoint(String dbPathArg, String cpPathArg, RocksDB rocksDB) { + void createCheckPoint(String dbPathArg, String cpPathArg, ManagedRocksDB rocksDB) { LOG.debug("Creating RocksDB '{}' checkpoint at '{}'", dbPathArg, cpPathArg); - try { - rocksDB.flush(new FlushOptions()); - Checkpoint cp = Checkpoint.create(rocksDB); - cp.createCheckpoint(cpPathArg); + try (ManagedFlushOptions flushOptions = new ManagedFlushOptions()) { + rocksDB.get().flush(flushOptions); + ManagedCheckpoint cp = ManagedCheckpoint.create(rocksDB); + cp.get().createCheckpoint(cpPathArg); } catch (RocksDBException e) { throw new RuntimeException(e.getMessage()); } @@ -654,7 +653,7 @@ void printAllSnapshots() { * @return List of ColumnFamilyDescriptor */ static List getCFDescriptorList( - ColumnFamilyOptions cfOpts) { + ManagedColumnFamilyOptions cfOpts) { return asList( new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts), new ColumnFamilyDescriptor("keyTable".getBytes(UTF_8), cfOpts), @@ -671,7 +670,7 @@ private void writeKeysAndCheckpointing() throws RocksDBException { String valueStr = "Val-" + i + "-" + generatedString; byte[] key = keyStr.getBytes(UTF_8); // Put entry in keyTable - activeRocksDB.put(keyTableCFHandle, key, valueStr.getBytes(UTF_8)); + activeRocksDB.get().put(keyTableCFHandle, key, valueStr.getBytes(UTF_8)); if (i % SNAPSHOT_EVERY_SO_MANY_KEYS == 0) { createCheckpoint(activeRocksDB); } @@ -691,27 +690,32 @@ private boolean deleteDirectory(File directoryToBeDeleted) { return directoryToBeDeleted.delete(); } + public List getColumnFamilyDescriptors(String dbPath) throws RocksDBException { + try (ManagedOptions emptyOptions = new ManagedOptions()) { + List cfList = RocksDB.listColumnFamilies(emptyOptions, dbPath); + return cfList.stream().map(ColumnFamilyDescriptor::new).collect(Collectors.toList()); + } + } + // Read from a given RocksDB instance and optionally write all the // keys to a given file. private void readRocksDBInstance(String dbPathArg, - RocksDB rocksDB, + ManagedRocksDB rocksDB, FileWriter file, RocksDBCheckpointDiffer differ) { LOG.debug("Reading RocksDB: " + dbPathArg); boolean createdDB = false; - try (Options options = new Options() - .setParanoidChecks(true) - .setForceConsistencyChecks(false)) { - + try (ManagedDBOptions dbOptions = new ManagedDBOptions()) { + List cfDescriptors = getColumnFamilyDescriptors(dbPathArg); + List cfHandles = new ArrayList<>(); if (rocksDB == null) { - rocksDB = RocksDB.openReadOnly(options, dbPathArg); + rocksDB = ManagedRocksDB.openReadOnly(dbOptions, dbPathArg, cfDescriptors, cfHandles); createdDB = true; } - List liveFileMetaDataList = - rocksDB.getLiveFilesMetaData(); + List liveFileMetaDataList = rocksDB.get().getLiveFilesMetaData(); for (LiveFileMetaData m : liveFileMetaDataList) { LOG.debug("SST File: {}. ", m.fileName()); LOG.debug("\tLevel: {}", m.level()); @@ -726,17 +730,20 @@ private void readRocksDBInstance(String dbPathArg, } if (differ.debugEnabled(DEBUG_READ_ALL_DB_KEYS)) { - RocksIterator iter = rocksDB.newIterator(); - for (iter.seekToFirst(); iter.isValid(); iter.next()) { - LOG.debug("Iterator key:" + bytes2String(iter.key()) + ", iter value:" + bytes2String(iter.value())); - if (file != null) { - file.write("iterator key:" + bytes2String(iter.key()) + ", iter value:" + bytes2String(iter.value())); - file.write("\n"); + try (ManagedRocksIterator iter = new ManagedRocksIterator(rocksDB.get().newIterator())) { + for (iter.get().seekToFirst(); iter.get().isValid(); iter.get().next()) { + LOG.debug( + "Iterator key:" + bytes2String(iter.get().key()) + ", iter value:" + bytes2String(iter.get().value())); + if (file != null) { + file.write("iterator key:" + bytes2String(iter.get().key()) + ", iter value:" + + bytes2String(iter.get().value())); + file.write("\n"); + } } } } } catch (IOException | RocksDBException e) { - e.printStackTrace(); + LOG.error("Caught exception while reading from rocksDB.", e); } finally { if (createdDB) { rocksDB.close(); @@ -1313,7 +1320,7 @@ public void testPruneOlderSnapshotsWithCompactionHistory( private int countEntriesInCompactionLogTable() { try (ManagedRocksIterator iterator = new ManagedRocksIterator( - activeRocksDB.newIterator(compactionLogTableCFHandle))) { + activeRocksDB.get().newIterator(compactionLogTableCFHandle))) { iterator.get().seekToFirst(); int count = 0; while (iterator.get().isValid()) { @@ -1833,14 +1840,16 @@ private void createKeys(ColumnFamilyHandle cfh, String valuePrefix, int numberOfKeys) throws RocksDBException { - for (int i = 0; i < numberOfKeys; ++i) { - String generatedString = RandomStringUtils.randomAlphabetic(7); - String keyStr = keyPrefix + i + "-" + generatedString; - String valueStr = valuePrefix + i + "-" + generatedString; - byte[] key = keyStr.getBytes(UTF_8); - activeRocksDB.put(cfh, key, valueStr.getBytes(UTF_8)); - if (i % 10 == 0) { - activeRocksDB.flush(new FlushOptions(), cfh); + try (ManagedFlushOptions flushOptions = new ManagedFlushOptions()) { + for (int i = 0; i < numberOfKeys; ++i) { + String generatedString = RandomStringUtils.randomAlphabetic(7); + String keyStr = keyPrefix + i + "-" + generatedString; + String valueStr = valuePrefix + i + "-" + generatedString; + byte[] key = keyStr.getBytes(UTF_8); + activeRocksDB.get().put(cfh, key, valueStr.getBytes(UTF_8)); + if (i % 10 == 0) { + activeRocksDB.get().flush(flushOptions, cfh); + } } } } @@ -1877,7 +1886,7 @@ public void testDagOnlyContainsDesiredCfh() Stream pathStream = Files.list( Paths.get(rocksDBCheckpointDiffer.getSSTBackupDir()))) { pathStream.forEach(path -> { - try (SstFileReader fileReader = new SstFileReader(options)) { + try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) { fileReader.open(path.toAbsolutePath().toString()); String columnFamily = bytes2String(fileReader.getTableProperties().getColumnFamilyName()); assertThat(COLUMN_FAMILIES_TO_TRACK_IN_DAG).contains(columnFamily); diff --git a/pom.xml b/pom.xml index 140716abfe1..87916856c60 100644 --- a/pom.xml +++ b/pom.xml @@ -1440,6 +1440,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs org.rocksdb.OptionsUtil org.rocksdb.RocksDBException org.rocksdb.StatsLevel + org.rocksdb.TableProperties org.rocksdb.TransactionLogIterator.BatchResult org.rocksdb.TickerType org.rocksdb.LiveFileMetaData @@ -1452,7 +1453,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs org.rocksdb.RocksDB.* - org.apache.hadoop.hdds.utils.db.managed.* + + org.apache.hadoop.hdds.utils.db.managed.* + org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer +