Skip to content

Commit

Permalink
HDDS-10787. Updated rocksdb-checkpoint-differ to use managed RocksDB …
Browse files Browse the repository at this point in the history
…objects (apache#6625)

(cherry picked from commit c435a7f)
  • Loading branch information
hemantk-12 authored and xichen01 committed Jul 18, 2024
1 parent 8f67db5 commit 808672f
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics stati
rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(
compactionLogTableCF.getHandle());
// Set activeRocksDB in differ to access compaction log CF.
rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb().get());
rocksDBCheckpointDiffer.setActiveRocksDB(db.getManagedRocksDb());
// Load all previous compaction logs
rocksDBCheckpointDiffer.loadAllCompactionLogs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,29 @@
*/
package org.apache.hadoop.hdds.utils.db.managed;

import org.apache.ratis.util.UncheckedAutoCloseable;
import org.rocksdb.Options;
import org.rocksdb.SstFileReader;

import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils.track;

/**
* Managed SstFileReader.
*/
public class ManagedSstFileReader extends ManagedObject<SstFileReader> {
public class ManagedSstFileReader extends SstFileReader {

private final UncheckedAutoCloseable leakTracker = track(this);

ManagedSstFileReader(SstFileReader original) {
super(original);
public ManagedSstFileReader(final Options options) {
super(options);
}

public static ManagedSstFileReader managed(SstFileReader reader) {
return new ManagedSstFileReader(reader);
@Override
public void close() {
try {
super.close();
} finally {
leakTracker.close();
}
}
}
37 changes: 0 additions & 37 deletions hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -157,43 +157,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<id>depcheck</id>
<phase></phase>
</execution>
<execution>
<id>banned-rocksdb-imports</id>
<phase>process-sources</phase>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<RestrictImports>
<includeTestCode>false</includeTestCode>
<reason>Use managed RocksObjects under org.apache.hadoop.hdds.utils.db.managed instead.</reason>
<!-- By default, ban all the classes in org.rocksdb -->
<bannedImport>org.rocksdb.**</bannedImport>
<allowedImports>
<allowedImport>org.rocksdb.AbstractEventListener</allowedImport>
<allowedImport>org.rocksdb.Checkpoint</allowedImport>
<allowedImport>org.rocksdb.ColumnFamilyDescriptor</allowedImport>
<allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
<allowedImport>org.rocksdb.ColumnFamilyOptions</allowedImport>
<allowedImport>org.rocksdb.CompactionJobInfo</allowedImport>
<allowedImport>org.rocksdb.CompressionType</allowedImport>
<allowedImport>org.rocksdb.DBOptions</allowedImport>
<allowedImport>org.rocksdb.FlushOptions</allowedImport>
<allowedImport>org.rocksdb.LiveFileMetaData</allowedImport>
<allowedImport>org.rocksdb.Options</allowedImport>
<allowedImport>org.rocksdb.RocksDB</allowedImport>
<allowedImport>org.rocksdb.RocksDBException</allowedImport>
<allowedImport>org.rocksdb.SstFileReader</allowedImport>
<allowedImport>org.rocksdb.TableProperties</allowedImport>
<allowedImport>org.rocksdb.ReadOptions</allowedImport>
<allowedImport>org.rocksdb.SstFileReaderIterator</allowedImport>
</allowedImports>
<exclusion>org.apache.hadoop.hdds.utils.db.managed.*</exclusion>
</RestrictImports>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSlice;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.apache.hadoop.util.ClosableIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -78,7 +77,7 @@ public long getEstimatedTotalKeys() throws RocksDBException {

try (ManagedOptions options = new ManagedOptions()) {
for (String sstFile : sstFiles) {
try (SstFileReader fileReader = new SstFileReader(options)) {
try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) {
fileReader.open(sstFile);
estimatedSize += fileReader.getTableProperties().getNumEntries();
}
Expand All @@ -96,7 +95,7 @@ public Stream<String> getKeyStream(String lowerBound,
final MultipleSstFileIterator<String> itr =
new MultipleSstFileIterator<String>(sstFiles) {
private ManagedOptions options;
private ReadOptions readOptions;
private ManagedReadOptions readOptions;

private ManagedSlice lowerBoundSLice;

Expand Down Expand Up @@ -125,8 +124,8 @@ protected ClosableIterator<String> getKeyIteratorForFile(String file)
return new ManagedSstFileIterator(file, options, readOptions) {
@Override
protected String getIteratorValue(
SstFileReaderIterator iterator) {
return new String(iterator.key(), UTF_8);
ManagedSstFileReaderIterator iterator) {
return new String(iterator.get().key(), UTF_8);
}
};
}
Expand Down Expand Up @@ -188,18 +187,17 @@ public void close() throws UncheckedIOException {
return getStreamFromIterator(itr);
}

private abstract static class ManagedSstFileIterator implements
ClosableIterator<String> {
private SstFileReader fileReader;
private SstFileReaderIterator fileReaderIterator;
private abstract static class ManagedSstFileIterator implements ClosableIterator<String> {
private final ManagedSstFileReader fileReader;
private final ManagedSstFileReaderIterator fileReaderIterator;

ManagedSstFileIterator(String path, ManagedOptions options,
ReadOptions readOptions)
ManagedReadOptions readOptions)
throws RocksDBException {
this.fileReader = new SstFileReader(options);
this.fileReader = new ManagedSstFileReader(options);
this.fileReader.open(path);
this.fileReaderIterator = fileReader.newIterator(readOptions);
fileReaderIterator.seekToFirst();
this.fileReaderIterator = ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions));
fileReaderIterator.get().seekToFirst();
}

@Override
Expand All @@ -210,15 +208,15 @@ public void close() {

@Override
public boolean hasNext() {
return fileReaderIterator.isValid();
return fileReaderIterator.get().isValid();
}

protected abstract String getIteratorValue(SstFileReaderIterator iterator);
protected abstract String getIteratorValue(ManagedSstFileReaderIterator iterator);

@Override
public String next() {
String value = getIteratorValue(fileReaderIterator);
fileReaderIterator.next();
fileReaderIterator.get().next();
return value;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
Expand All @@ -55,12 +56,9 @@
import org.rocksdb.AbstractEventListener;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.DBOptions;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.TableProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -177,7 +175,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
private AtomicBoolean suspended;

private ColumnFamilyHandle compactionLogTableCFHandle;
private RocksDB activeRocksDB;
private ManagedRocksDB activeRocksDB;

/**
* For snapshot diff calculation we only need to track following column
Expand Down Expand Up @@ -349,32 +347,11 @@ public static void addDebugLevel(Integer level) {
DEBUG_LEVEL.add(level);
}

/**
* Takes {@link org.rocksdb.Options}.
*/
public void setRocksDBForCompactionTracking(Options rocksOptions,
List<AbstractEventListener> list) {
list.add(newCompactionBeginListener());
list.add(newCompactionCompletedListener());
rocksOptions.setListeners(list);
}

public void setRocksDBForCompactionTracking(Options rocksOptions) {
setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
}

/**
* Takes {@link org.rocksdb.DBOptions}.
*/
public void setRocksDBForCompactionTracking(DBOptions rocksOptions,
List<AbstractEventListener> list) {
list.add(newCompactionBeginListener());
list.add(newCompactionCompletedListener());
rocksOptions.setListeners(list);
}

public void setRocksDBForCompactionTracking(DBOptions rocksOptions) {
setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
public void setRocksDBForCompactionTracking(ManagedDBOptions rocksOptions) {
List<AbstractEventListener> events = new ArrayList<>();
events.add(newCompactionBeginListener());
events.add(newCompactionCompletedListener());
rocksOptions.setListeners(events);
}

/**
Expand Down Expand Up @@ -403,7 +380,7 @@ public synchronized void setCompactionLogTableCFHandle(
* Set activeRocksDB to access CompactionLogTable.
* @param activeRocksDB RocksDB
*/
public synchronized void setActiveRocksDB(RocksDB activeRocksDB) {
public synchronized void setActiveRocksDB(ManagedRocksDB activeRocksDB) {
Preconditions.checkNotNull(activeRocksDB, "RocksDB should not be null.");
this.activeRocksDB = activeRocksDB;
}
Expand Down Expand Up @@ -436,8 +413,7 @@ private boolean isSnapshotInfoTableEmpty(RocksDB db) {
// Note the goal of compaction DAG is to track all compactions that happened
// _after_ a DB checkpoint is taken.

try (ManagedRocksIterator it = ManagedRocksIterator.managed(
db.newIterator(snapshotInfoTableCFHandle))) {
try (ManagedRocksIterator it = ManagedRocksIterator.managed(db.newIterator(snapshotInfoTableCFHandle))) {
it.get().seekToFirst();
return !it.get().isValid();
}
Expand Down Expand Up @@ -499,7 +475,6 @@ public void onCompactionBegin(RocksDB db,
};
}


private AbstractEventListener newCompactionCompletedListener() {
return new AbstractEventListener() {
@Override
Expand Down Expand Up @@ -577,7 +552,7 @@ void addToCompactionLogTable(CompactionLogEntry compactionLogEntry) {
byte[] key = keyString.getBytes(UTF_8);
byte[] value = compactionLogEntry.getProtobuf().toByteArray();
try {
activeRocksDB.put(compactionLogTableCFHandle, key, value);
activeRocksDB.get().put(compactionLogTableCFHandle, key, value);
} catch (RocksDBException exception) {
// TODO: Revisit exception handling before merging the PR.
throw new RuntimeException(exception);
Expand Down Expand Up @@ -631,11 +606,11 @@ private long getSSTFileSummary(String filename)
}

try (ManagedOptions option = new ManagedOptions();
ManagedSstFileReader reader = ManagedSstFileReader.managed(new SstFileReader(option))) {
ManagedSstFileReader reader = new ManagedSstFileReader(option)) {

reader.get().open(getAbsoluteSstFilePath(filename));
reader.open(getAbsoluteSstFilePath(filename));

TableProperties properties = reader.get().getTableProperties();
TableProperties properties = reader.getTableProperties();
if (LOG.isDebugEnabled()) {
LOG.debug("{} has {} keys", filename, properties.getNumEntries());
}
Expand Down Expand Up @@ -801,7 +776,7 @@ public void loadAllCompactionLogs() {
preconditionChecksForLoadAllCompactionLogs();
addEntriesFromLogFilesToDagAndCompactionLogTable();
try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator(
activeRocksDB.newIterator(compactionLogTableCFHandle))) {
activeRocksDB.get().newIterator(compactionLogTableCFHandle))) {
managedRocksIterator.get().seekToFirst();
while (managedRocksIterator.get().isValid()) {
byte[] value = managedRocksIterator.get().value();
Expand Down Expand Up @@ -1252,7 +1227,7 @@ private synchronized Pair<Set<String>, List<byte[]>> getOlderFileNodes() {
List<byte[]> keysToRemove = new ArrayList<>();

try (ManagedRocksIterator managedRocksIterator = new ManagedRocksIterator(
activeRocksDB.newIterator(compactionLogTableCFHandle))) {
activeRocksDB.get().newIterator(compactionLogTableCFHandle))) {
managedRocksIterator.get().seekToFirst();
while (managedRocksIterator.get().isValid()) {
CompactionLogEntry compactionLogEntry = CompactionLogEntry
Expand Down Expand Up @@ -1282,7 +1257,7 @@ private synchronized void removeKeyFromCompactionLogTable(
List<byte[]> keysToRemove) {
try {
for (byte[] key: keysToRemove) {
activeRocksDB.delete(compactionLogTableCFHandle, key);
activeRocksDB.get().delete(compactionLogTableCFHandle, key);
}
} catch (RocksDBException exception) {
// TODO Handle exception properly before merging the PR.
Expand Down Expand Up @@ -1575,11 +1550,11 @@ private CompactionFileInfo toFileInfo(String sstFile,
CompactionFileInfo.Builder fileInfoBuilder =
new CompactionFileInfo.Builder(fileName);

try (ManagedSstFileReader fileReader = ManagedSstFileReader.managed(new SstFileReader(options))) {
fileReader.get().open(sstFile);
String columnFamily = StringUtils.bytes2String(fileReader.get().getTableProperties().getColumnFamilyName());
try (ManagedSstFileReader fileReader = new ManagedSstFileReader(options)) {
fileReader.open(sstFile);
String columnFamily = StringUtils.bytes2String(fileReader.getTableProperties().getColumnFamilyName());
try (ManagedSstFileReaderIterator iterator =
ManagedSstFileReaderIterator.managed(fileReader.get().newIterator(readOptions))) {
ManagedSstFileReaderIterator.managed(fileReader.newIterator(readOptions))) {
iterator.get().seekToFirst();
String startKey = StringUtils.bytes2String(iterator.get().key());
iterator.get().seekToLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReader;
import org.apache.hadoop.hdds.utils.db.managed.ManagedSstFileReaderIterator;
import org.rocksdb.SstFileReader;
import org.rocksdb.TableProperties;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,17 +89,17 @@ public static boolean doesSstFileContainKeyRange(String filepath,

try (
ManagedOptions options = new ManagedOptions();
ManagedSstFileReader sstFileReader = ManagedSstFileReader.managed(new SstFileReader(options))) {
sstFileReader.get().open(filepath);
TableProperties properties = sstFileReader.get().getTableProperties();
ManagedSstFileReader sstFileReader = new ManagedSstFileReader(options)) {
sstFileReader.open(filepath);
TableProperties properties = sstFileReader.getTableProperties();
String tableName = new String(properties.getColumnFamilyName(), UTF_8);
if (tableToPrefixMap.containsKey(tableName)) {
String prefix = tableToPrefixMap.get(tableName);

try (
ManagedReadOptions readOptions = new ManagedReadOptions();
ManagedSstFileReaderIterator iterator = ManagedSstFileReaderIterator.managed(
sstFileReader.get().newIterator(readOptions))) {
sstFileReader.newIterator(readOptions))) {
iterator.get().seek(prefix.getBytes(UTF_8));
String seekResultKey = new String(iterator.get().key(), UTF_8);
return seekResultKey.startsWith(prefix);
Expand Down
Loading

0 comments on commit 808672f

Please sign in to comment.